"""
Get all active VPN SA's.
Create a query to obtain all connections for a given engine::
query = VPNSAQuery('sg_vm')
Add a timezone to the query::
query.format.timezone('CST')
Execute query and return raw results::
for records in query.fetch_batch():
...
Execute query and return as a :class:`.VPNSecurityAssoc` element::
for records in query.fetch_as_element():
...
Delete a VPN SA::
query = VPNSAQuery('sg_vm')
for sa in query.fetch_as_element():
sa.delete()
.. seealso:: :class:`smc_monitoring.models.filters` for more information on creating filters
"""
from smc_monitoring.models.query import Query
from smc_monitoring.models.constants import LogField
from smc.base.model import prepared_request
from smc.api.exceptions import DeleteElementFailed
[docs]class VPNSAQuery(Query):
"""
Show all current VPN SA's on the specified target.
:ivar list field_ids: field IDs are the default fields for this entry type
and are constants found in :class:`smc_monitoring.models.constants.LogField`
:param str target: name of target engine/cluster
"""
location = '/monitoring/session/socket'
field_ids = [
LogField.TIMESTAMP,
LogField.NODEID,
LogField.VPNID,
LogField.SECURITYGATEWAY,
LogField.PEERSECURITYGATEWAY,
LogField.IKECOOKIE,
LogField.ENDPOINT,
LogField.PEERENDPOINT,
LogField.SACLASS,
LogField.CIPHERALG,
LogField.NEGOTIATIONROLE,
LogField.SRCADDRS,
LogField.DSTADDRS,
LogField.PROTOCOL,
LogField.NUMBYTESSENT,
LogField.NUMBYTESRECEIVED,
LogField.EXPIRATIONTIME]
def __init__(self, target, **kw):
super(VPNSAQuery, self).__init__('VPN_SA', target, **kw)
[docs] def fetch_as_element(self, **kw):
"""
Fetch the results and return as a VPNSecurityAssoc element.
The original query is not modified.
:return: generator of elements
:rtype: :class:`~VPNSecurityAssoc`
"""
clone = self.copy()
clone.format.field_format('id')
for custom_field in ['field_ids', 'field_names']:
clone.format.data.pop(custom_field, None)
for list_of_results in clone.fetch_raw(**kw):
for entry in list_of_results:
yield VPNSecurityAssoc(**entry)
[docs]class VPNSecurityAssoc(object):
"""
A VPN Security Association represents a currently connected VPN
endpoint. This is the result of making a :class:`.VPNSAQuery` and
using :meth:`~VPNSAQuery.fetch_as_element`.
"""
def __init__(self, **data):
self.vpn = data
@property
def href(self):
return self.vpn.get('vpn_sa_href')
def delete(self):
return prepared_request(
DeleteElementFailed,
href=self.href).delete()
@property
def timestamp(self):
"""
Timestamp of this connection. It is recommended to set the timezone
on the query to view this timestamp in the systems local time.
For example::
query.format.timezone('CST')
:rtype: str
"""
return self.vpn.get(str(LogField.TIMESTAMP))
@property
def engine(self):
"""
The engine/cluster for this VPN
:rtype: str
"""
return self.vpn.get(str(LogField.NODEID))
@property
def local_gateway(self):
"""
Local gateway for this VPN.
:rtype: str
"""
return self.vpn.get(str(LogField.SECURITYGATEWAY))
@property
def peer_gateway(self):
"""
Peer gateway for this VPN.
:rtype: str
"""
return self.vpn.get(str(LogField.PEERSECURITYGATEWAY))
@property
def local_endpoint(self):
"""
Local endpoint (IP address) for this VPN tunnel.
:rtype: str
"""
return self.vpn.get(str(LogField.ENDPOINT))
@property
def peer_endpoint(self):
"""
Peer endpoint element and IP Address for this tunnel.
:rtype: str
"""
return self.vpn.get(str(LogField.PEERENDPOINT))
@property
def local_networks(self):
"""
Local protected networks
:rtype: str
"""
return self.vpn.get(str(LogField.SRCADDRS))
@property
def peer_networks(self):
"""
Remote protected networks
:rtype: str
"""
return self.vpn.get(str(LogField.DSTADDRS))
@property
def vpn_id(self):
return self.vpn.get(str(LogField.VPNID))
@property
def sa_type(self):
"""
SA Type for this VPN tunnel. Each VPN tunnel will typically have
at least two entries, one for IPSEC and another for IKE.
:rtype: str
"""
return self.vpn.get(str(LogField.SACLASS))
@property
def protocol(self):
"""
WHich protocol is associated with this tunnel entry.
:return: IP protocol for tunnel, i.e. ESP/UDP
:rtype: str
"""
return self.vpn.get(str(LogField.PROTOCOL))
@property
def negotiation_role(self):
"""
Role for this tunnel entry.
:return: Negotiation role, i.e. Initiator, Responder, etc.
:rtype: str
"""
return self.vpn.get(str(LogField.NEGOTIATIONROLE))
@property
def bytes_sent(self):
"""
Number of bytes sent.
:rtype: int
"""
return int(self.vpn.get(str(LogField.NUMBYTESSENT), 0))
@property
def bytes_received(self):
"""
Number of bytes received.
:rtype: int
"""
return int(self.vpn.get(str(LogField.NUMBYTESRECEIVED), 0))
@property
def expiration(self):
"""
Expiration time for this tunnel Security Association
:rtype: str
"""
return self.vpn.get(str(LogField.EXPIRATIONTIME))
def __str__(self):
return '{}(local={},peer={},localip={},peerip={},satype={})'.format(
self.__class__.__name__, self.local_gateway, self.peer_gateway,
self.local_endpoint, self.peer_endpoint, self.sa_type)
def __repr__(self):
return str(self)