Source code for smc_monitoring.monitors.blacklist

"""
Blacklist Query provides the ability to view current blacklist entries in the
SMC by target. Target is defined as the cluster or engine. Retrieved results
will have a reference to the entry and hence be possible to remove the entry.
::
    
    query = BlacklistQuery('sg_vm')
    query.format.timezone('CST')

Optionally add an "InFilter" to restrict search to a specific field::
 
    query.add_in_filter(
        FieldValue(LogField.BLACKLISTENTRYSOURCEIP), [IPValue('2.2.2.2')])

An InFilter can also use a network based syntax::

    query.add_in_filter(
        FieldValue(LogField.BLACKLISTENTRYSOURCEIP), [IPValue('2.2.2.0/24')])

Or combine filters using "AndFilter" or "OrFilter". Find an entry with
source IP 2.2.2.2 OR 2.2.2.5::

    ip1 = InFilter(FieldValue(LogField.BLACKLISTENTRYSOURCEIP), [IPValue('2.2.2.2')])
    ip2 = InFilter(FieldValue(LogField.BLACKLISTENTRYSOURCEIP), [IPValue('2.2.2.5')])
    query.add_or_filter([in_filter, or_filter])

Get the results of the query in the default TableFormat::

    for entry in query.fetch_batch():
        print(entry)

Delete any blacklist entries with a source IP within a network range of 3.3.3.0/24::
    
        query = BlacklistQuery('sg_vm')
        query.add_in_filter(
            FieldValue(LogField.BLACKLISTENTRYSOURCEIP), [IPValue('3.3.3.0/24')])
        
        for record in query.fetch_as_element():    # <-- must get as element to obtain delete() method
            record.delete()
        
.. seealso:: :class:`smc_monitoring.models.filters` for more information on creating filters
        
"""
from smc_monitoring.models.query import Query
from smc_monitoring.models.formats import TextFormat, CombinedFormat,\
    DetailedFormat
from smc_monitoring.models.constants import LogField
from smc.base.model import prepared_request
from smc.api.exceptions import DeleteElementFailed


[docs]class BlacklistQuery(Query): """ Query existing blacklist entries for a given cluster/engine. It is generally recommended to set your local timezone when making a query to convert the timestamp into a relevant format. :param str target: NAME of the engine or cluster :param str timezone: timezone for timestamps. .. note:: Timezone can be in the following formats: 'US/Eastern', 'PST', 'Europe/Helsinki'. More example time zone formats are available in the SMC Log Viewer -> Settings. """ location = '/monitoring/session/socket' field_ids = [ LogField.TIMESTAMP, LogField.BLACKLISTER, LogField.BLACKLISTENTRYSOURCEIP, LogField.BLACKLISTENTRYDESTINATIONIP, LogField.PROTOCOL, LogField.BLACKLISTENTRYDURATION, LogField.NODEID, LogField.SENDERDOMAIN, LogField.BLACKLISTENTRYID] def __init__(self, target, timezone=None, **kw): super(BlacklistQuery, self).__init__('BLACKLIST', target, **kw) if timezone is not None: self.format.set_resolving(timezone=timezone)
[docs] def fetch_as_element(self, **kw): """ Fetch the blacklist and return as an instance of Element. :return: generator returning element instances :rtype: BlacklistEntry """ clone = self.copy() # Replace all filters with a combined filter bldata = TextFormat(field_format='name') # Preserve resolving fields for new filter if 'resolving' in self.format.data: bldata.set_resolving(**self.format.data['resolving']) # Resolve the entry ID to match SMC blid = TextFormat(field_format='pretty') blid.field_ids([LogField.BLACKLISTENTRYID]) combined = CombinedFormat(bldata=bldata, blid=blid) clone.update_format(combined) for list_of_results in clone.fetch_raw(**kw): for entry in list_of_results: data = entry.get('bldata') data.update(**entry.get('blid')) yield BlacklistEntry(**data)
[docs]class BlacklistEntry(object): """ A blacklist entry represents an entry in the engines kernel table indicating that a source/destination/port/protocol mapping is currently being blocked by the engine. To remove a blacklist entry from an engine, retrieve all entries as element and remove the entry of interest by called ``delete`` on the element. The simplest way to use search filters with a blacklist entry is to examine the BlacklistQuery ``field_ids`` and use these constant fields as InFilter definitions on the query. """ def __init__(self, **kw): self.blacklist = kw @property def blacklist_id(self): """ Blacklist entry ID. Useful if you want to locate the entry within the SMC UI. :rtype: str """ return self.blacklist.get('Blacklist Entry ID') @property def timestamp(self): """ Timestamp when this blacklist entry was added. :rtype: str """ return self.blacklist.get('Timestamp') @property def engine(self): """ The engine for this blacklist entry. :rtype: str """ return self.blacklist.get('NodeId') @property def href(self): """ The href for this blacklist entry. This is the reference to the entry for deleting the entry. :rtype: str """ return self.blacklist.get('blacklist_href') @property def source(self): """ Source address/netmask for this blacklist entry. :rtype: str """ return '{}/{}'.format( self.blacklist.get('BlacklistEntrySourceIp'), self.blacklist.get('BlacklistEntrySourceIpPrefixlen')) @property def destination(self): """ Destination network/netmask for this blacklist entry. :rtype: str """ return '{}/{}'.format( self.blacklist.get('BlacklistEntryDestinationIp'), self.blacklist.get('BlacklistEntryDestinationIpPrefixlen')) @property def protocol(self): """ Specified protocol for the blacklist entry. If none is specified, 'ANY' is returned. :rtype: str """ proto = self.blacklist.get('BlacklistEntryProtocol') if proto is None: return 'ANY' return proto @property def source_ports(self): """ Source ports for this blacklist entry. If no ports are specified (i.e. ALL ports), 'ANY' is returned. :rtype: str """ start_port = self.blacklist.get('BlacklistEntrySourcePort') if start_port is not None: return '{}-{}'.format( start_port, self.blacklist.get('BlacklistEntrySourcePortRange')) return 'ANY' @property def dest_ports(self): """ Destination ports for this blacklist entry. If no ports are specified, 'ANY' is returned. :rtype: str """ start_port = self.blacklist.get('BlacklistEntryDestinationPort') if start_port is not None: return '{}-{}'.format( start_port, self.blacklist.get('BlacklistEntryDestinationPortRange')) return 'ANY' @property def duration(self): """ Duration for the blacklist entry. :rtype: int """ return int(self.blacklist.get('BlacklistEntryDuration'))
[docs] def delete(self): """ Delete the entry from the engine where the entry is applied. :raises: DeleteElementFailed :return: None """ return prepared_request( DeleteElementFailed, href=self.href).delete()
def __str__(self): return '{0}(id={1},src={2},dst={3})'.format( self.__class__.__name__, self.blacklist_id, self.source, self.destination) def __repr__(self): return str(self)