# Source code for smc_monitoring.monitors.connections

"""
A connection query returns all currently connected sessions on the
given target.

Create a query to obtain all connections for a given engine::

    query = ConnectionQuery('sg_vm')
    
Add a timezone to the query::

    query.format.timezone('CST')
    
Add a filter to only get connections if the source address is 172.18.1.252::

    query.add_in_filter(FieldValue(LogField.SRC), [IPValue('172.18.1.252')])

Only connections that match a specific service::

    query.add_in_filter(FieldValue(LogField.SERVICE), [ServiceValue('TCP/443', 'UDP/53')])
    
Execute query and return raw results::

    for records in query.fetch_raw():
        ...

Execute query and return as a :class:`.Connection` element::

    for records in query.fetch_as_element():
        ...

Retrieving live streaming results::

    for records in query.fetch_live():
        ...
    
.. seealso:: :class:`smc_monitoring.models.filters` for more information on creating filters

"""
from smc_monitoring.models.query import Query
from smc_monitoring.models.constants import LogField


class ConnectionQuery(Query):
    """
    Query that shows all current connections on the specified target.

    :ivar list field_ids: default fields for this entry type; the values
        are constants taken from
        :class:`smc_monitoring.models.constants.LogField`
    :ivar str location: value of the ``location`` class attribute
        (NOTE(review): presumably the monitoring endpoint consumed by
        :class:`Query` -- confirm against the base class)

    :param str target: name of target engine/cluster
    """
    location = '/monitoring/session/socket'

    # Default fields requested for a connection entry, in display order.
    field_ids = [
        LogField.TIMESTAMP,
        LogField.NODEID,
        LogField.SRC,
        LogField.SPORT,
        LogField.SRCZONE,
        LogField.DST,
        LogField.DPORT,
        LogField.DSTZONE,
        LogField.SERVICE,
        LogField.IPSAPPID,
        LogField.PROTOCOL,
        LogField.STATE,
    ]

    def __init__(self, target, **kw):
        """
        Initialize the base query with the ``'CONNECTIONS'`` identifier.

        :param str target: name of target engine/cluster
        :param kw: extra keyword arguments forwarded unchanged to
            :class:`Query`
        """
        super(ConnectionQuery, self).__init__('CONNECTIONS', target, **kw)

    def fetch_as_element(self, **kw):
        """
        Fetch the results and yield each one as a Connection element.

        The work is done on a copy of this query, so the original query
        is not modified.

        :param kw: keyword arguments forwarded unchanged to ``fetch_raw``
        :return: generator of elements
        :rtype: :class:`.Connection`
        """
        # Work on a copy so the format changes below stay local.
        query = self.copy()
        query.format.field_format('id')

        # Drop any custom field selection keys from the copy's format data.
        for key in ('field_ids', 'field_names'):
            query.format.data.pop(key, None)

        # fetch_raw yields batches of entries; each entry is expanded into
        # keyword arguments for Connection.
        for batch in query.fetch_raw(**kw):
            for record in batch:
                yield Connection(**record)
class Connection(object):
    """
    Connection represents a state table entry. This is the result of
    making a :class:`~ConnectionQuery` and using
    :meth:`~ConnectionQuery.fetch_as_element`.

    The keyword arguments given to the constructor are stored as a dict
    on ``cxn``; every property looks up ``str(<LogField constant>)`` in
    that dict.
    """

    def __init__(self, **data):
        self.cxn = data

    def _field(self, log_field, default=None):
        """
        Look up a single value in the entry.

        :param log_field: LogField constant; its ``str()`` form is the key
        :param default: value returned when the key is absent
        :return: stored value or ``default``
        """
        return self.cxn.get(str(log_field), default)

    @property
    def timestamp(self):
        """
        Timestamp of this connection. It is recommended to set the
        timezone on the query to view this timestamp in the systems
        local time. For example::

            query.format.timezone('CST')

        :return: timestamp in string format
        :rtype: str
        """
        return self._field(LogField.TIMESTAMP)

    @property
    def engine(self):
        """
        The engine/cluster for this state table entry

        :return: engine or cluster for this entry
        :rtype: str
        """
        return self._field(LogField.NODEID)

    @property
    def source_addr(self):
        """
        Source address for this entry

        :rtype: str
        """
        return self._field(LogField.SRC)

    @property
    def dest_addr(self):
        """
        Destination address for this entry

        :rtype: str
        """
        return self._field(LogField.DST)

    @property
    def service(self):
        """
        Service for this entry

        :return: service (HTTP/HTTPS, etc)
        :rtype: str
        """
        return self._field(LogField.SERVICE)

    @property
    def protocol(self):
        """
        Protocol for this entry; ``'ANY'`` when the entry has none.

        :return: protocol (UDP/TCP/ICMP, etc)
        :rtype: str
        """
        return self._field(LogField.PROTOCOL, 'ANY')

    @property
    def source_port(self):
        """
        Source port for the entry, converted to int; 0 when absent.

        :rtype: int
        """
        port = self._field(LogField.SPORT, 0)
        return int(port)

    @property
    def dest_port(self):
        """
        Destination port for the entry, converted to int; 0 when absent.

        :rtype: int
        """
        port = self._field(LogField.DPORT, 0)
        return int(port)

    @property
    def state(self):
        """
        State of the connection.

        :return: state, i.e. UDP established, TCP established, etc.
        :rtype: str
        """
        return self._field(LogField.STATE)

    def __str__(self):
        template = '{}(src={},dst={},proto={},dst_port={},state={})'
        return template.format(
            type(self).__name__,
            self.source_addr,
            self.dest_addr,
            self.protocol,
            self.dest_port,
            self.state,
        )

    def __repr__(self):
        # The repr is deliberately the same text as the str form.
        return str(self)