"""
Filters are used by queries to refine how results are returned.
QueryFilter is the top level 'interface' for all filter types. The ``filter``
attribute of a QueryFilter provides access to the compiled query string used
to build the filter. Each QueryFilter also has an ``update_filter`` method
that can be used to swap new filters in and out of an existing query.
Filters can be added to queries using the add_XXX methods of the query, or by
building the filters and adding to the query using query.update_filter(). Filters
can be swapped in and out of a query.
Examples:
Build a query to return all records of alert severity high or critical::
query = LogQuery(fetch_size=50)
query.add_in_filter(
FieldValue(LogField.ALERTSEVERITY), [ConstantValue(Alerts.HIGH, Alerts.CRITICAL)])
If you prefer building your filters individually, it is not required to call the
add_XX_filter methods of the query. You can also insert filters by building the filter
and calling the ``update_filter`` method on the query::
query = LogQuery(fetch_size=50)
query.update_filter(
InFilter(FieldValue(LogField.SERVICE), [ServiceValue('UDP/53', 'TCP/80')]))
You can also replace existing query filters with new filters to re-use the base level
query parameters such as fetch_size, format style, time/date ranges, etc.
Replace the existing query filter with a different filter::
new_filter = InFilter(FieldValue(LogField.SERVICE), [ServiceValue('UDP/53', 'TCP/80')])
query.update_filter(new_filter)
.. note:: it is also possible to update a filter by calling query.add_XX_filter methods
multiple times. Each time will replace an existing filter if it exists.
For example, calling add_XX_filter methods multiple times to refine filter results::
query = LogQuery(fetch_size=50)
query.add_in_filter( # First filter query - look for alert severity high and critical
FieldValue(LogField.ALERTSEVERITY), [ConstantValue(Alerts.HIGH, Alerts.CRITICAL)])
query.add_and_filter([ # Change filter to AND filter for further granularity
InFilter(FieldValue(LogField.ALERTSEVERITY), [ConstantValue(Alerts.HIGH, Alerts.CRITICAL)]),
InFilter(FieldValue(LogField.SRC), [IPValue('192.168.4.84')])])
"""
class QueryFilter(object):
    """
    Base class shared by every filter type.

    The compiled filter is held in the ``filter`` dict, which is created
    with the filter type stored under the ``'type'`` key.

    :param filter_type: identifier stored under the ``'type'`` key of the
        compiled filter (for example ``'in'``)
    """

    def __init__(self, filter_type):
        self.filter = dict(type=filter_type)

    def update_filter(self, value):
        """
        Store ``value`` under the ``'value'`` key of the compiled filter,
        replacing whatever was stored there before.

        :param value: new value for the filter
        """
        self.filter['value'] = value
class InFilter(QueryFilter):
    """
    An InFilter has two parts, a left and a right. It is considered a
    match if evaluation of the left part is equivalent to one of the
    elements of the right part. The left part is a target of type
    :class:`smc.monitoring.values.Value`; the right part is a list of
    the same type.

    Search the Source field for IP addresses 192.168.4.84 or 10.0.0.252::

        query = LogQuery(fetch_size=50)
        query.add_in_filter(
            FieldValue(LogField.SRC), [IPValue('192.168.4.84', '10.0.0.252')])

    Reverse the logic and search for IP address 192.168.4.84 in the source
    and dest log fields::

        query = LogQuery(fetch_size=50)
        query.add_in_filter(
            IPValue('192.168.4.84'), [FieldValue(LogField.SRC, LogField.DST)])

    InFilters are one of the most common filters and are often added to
    AND, OR or NOT filters for more specific matching.

    :param left: single value for leftmost portion of filter
    :type left: Values: any value type in :py:mod:`smc_monitoring.models.values`
    :param right: list of values for rightmost portion of filter
    :type right: list(Values): any value type in :py:mod:`smc_monitoring.models.values`
    """

    def __init__(self, left, right):
        super(InFilter, self).__init__('in')
        self.update_filter(left, right)

    def update_filter(self, left_filter, right_filter):
        """
        Replace the ``'left'`` and ``'right'`` entries of the compiled filter.

        :param left_filter: value object; only the first element of its
            ``value`` attribute is used as the left side
        :param right_filter: iterable of value objects; the elements of each
            object's ``value`` attribute are concatenated, in order, to form
            the right side
        """
        # Flatten every right-hand value object into one list.
        flattened = [
            element
            for member in right_filter
            for element in member.value
        ]
        left_operand = left_filter.value[0]
        self.filter.update({'left': left_operand, 'right': flattened})
class AndFilter(QueryFilter):
    """
    An AND filter combines other filter types and requires that every one
    of them matches. It is a collection of QueryFilters, typically IN or
    NOT filters, that are AND'd together.

    Example of fetching 50 records for sources matching '192.168.4.84' and
    a service of 'TCP/80'::

        query = LogQuery(fetch_size=50)
        query.add_and_filter([
            InFilter(FieldValue(LogField.SRC), [IPValue('192.168.4.84')]),
            InFilter(FieldValue(LogField.SERVICE), [ServiceValue('TCP/80')])])

    The filters are passed as one collection (a single list or tuple
    argument); the constructor hands its positional arguments straight to
    :meth:`update_filter`, which accepts exactly one collection.

    :param QueryFilter filters: Any filter type in :py:mod:`smc.monitoring.filters`.
    :type filters: list or tuple
    """

    def __init__(self, *filters):
        super(AndFilter, self).__init__('and')
        if not filters:
            # Nothing supplied yet; filters can be set later via update_filter.
            return
        self.update_filter(*filters)

    def update_filter(self, filters):
        """
        Replace the ``'values'`` entry of the compiled filter with the
        compiled form (``.filter``) of each supplied filter.

        :param filters: iterable of QueryFilter instances to AND together
        """
        compiled = [member.filter for member in filters]
        self.filter['values'] = compiled
class OrFilter(QueryFilter):
    """
    An OR filter matches if any of the combined filters match. It is a
    collection of QueryFilters, typically IN or NOT filters, that are
    OR'd together.

    Example of fetching 50 records for sources matching '192.168.4.84' or
    a service of 'TCP/80'::

        query = LogQuery(fetch_size=50)
        query.add_or_filter([
            InFilter(FieldValue(LogField.SRC), [IPValue('192.168.4.84')]),
            InFilter(FieldValue(LogField.SERVICE), [ServiceValue('TCP/80')])])

    The filters are passed as one collection (a single list or tuple
    argument); the constructor hands its positional arguments straight to
    :meth:`update_filter`, which accepts exactly one collection.

    :param QueryFilter filters: Any filter type in :py:mod:`smc.monitoring.filters`.
    :type filters: list or tuple
    """

    def __init__(self, *filters):
        super(OrFilter, self).__init__('or')
        if not filters:
            # Nothing supplied yet; filters can be set later via update_filter.
            return
        self.update_filter(*filters)

    def update_filter(self, filters):
        """
        Replace the ``'values'`` entry of the compiled filter with the
        compiled form (``.filter``) of each supplied filter.

        :param filters: iterable of QueryFilter instances to OR together
        """
        compiled = [member.filter for member in filters]
        self.filter['values'] = compiled
class NotFilter(QueryFilter):
    """
    A NOT filter provides the ability to suppress auditing based on a
    specific filter. It is typically added to an AND filter to remove
    unwanted entries from the response.

    Use only a NOT filter in a query to ignore DNS traffic::

        query = LogQuery(fetch_size=50)
        query.add_not_filter(
            [InFilter(FieldValue(LogField.SERVICE), [ServiceValue('UDP/53')])])

    The above example by itself is not overly useful, however NOT filters
    can be combined with AND filters to achieve logic like "find source IP
    172.18.1.20 and not service UDP/53 or TCP/80"::

        query = LogQuery(fetch_size=50)
        not_dns = NotFilter(
            [InFilter(FieldValue(LogField.SERVICE), [ServiceValue('UDP/53', 'TCP/80')])])
        by_ip = InFilter(
            FieldValue(LogField.SRC), [IPValue('172.18.1.20')])
        query.add_and_filter([not_dns, by_ip])

    :param QueryFilter filters: Any filter type in :py:mod:`smc.monitoring.filters`.
    :type filters: list or tuple
    """

    def __init__(self, *filters):
        super(NotFilter, self).__init__('not')
        if not filters:
            # Nothing supplied yet; the filter can be set later via update_filter.
            return
        self.update_filter(*filters)

    def update_filter(self, filters):
        """
        Replace the ``'value'`` entry of the compiled filter.

        Only the first element of ``filters`` is used; its compiled form
        (``.filter``) becomes the negated expression.

        :param filters: indexable collection whose first element is the
            QueryFilter to negate
        """
        negated = filters[0]
        self.filter['value'] = negated.filter
class DefinedFilter(QueryFilter):
    """
    A Defined Filter applied to a query will only match if the value
    specified has a value in the audit record/s.

    Show only records that have a defined Action (read as 'match if action
    has a value')::

        query = LogQuery(fetch_size=50)
        query.add_defined_filter(FieldValue(LogField.ACTION))

    DefinedFilters can be used in AND, OR or NOT filter queries as well.
    Fetch the most recent 50 records for source 192.168.4.84 that have
    an application defined::

        query = LogQuery(fetch_size=50)
        query.add_and_filter([
            DefinedFilter(FieldValue(LogField.IPSAPPID)),
            InFilter(FieldValue(LogField.SRC), [IPValue('192.168.4.84')])])

    :param Value value: single value type to require on filter; when
        omitted (``None``) no value is set at construction time
    """

    def __init__(self, value=None):
        super(DefinedFilter, self).__init__('defined')
        if value is None:
            return
        self.update_filter(value)

    def update_filter(self, value):
        """
        Replace the ``'value'`` entry of the compiled filter.

        :param value: value object; only the first element of its
            ``value`` attribute is used
        """
        required = value.value[0]
        self.filter['value'] = required
class CSLikeFilter(QueryFilter):
    """
    A CSLikeFilter is a case sensitive LIKE string match filter.

    The compiled filter is created with type ``'cs_like'``; the
    constructor takes no arguments and sets no value.
    """

    def __init__(self):
        super(CSLikeFilter, self).__init__('cs_like')
class CILikeFilter(QueryFilter):
    """
    A CILikeFilter is a case insensitive LIKE string match filter.

    The compiled filter is created with type ``'ci_like'``; the
    constructor takes no arguments and sets no value.
    """

    def __init__(self):
        # Must be 'ci_like': registering as 'cs_like' would make this
        # filter compile to the case sensitive variant.
        super(CILikeFilter, self).__init__('ci_like')
class TranslatedFilter(QueryFilter):
    """
    Translated filters use the SMC internal name alias and build
    expressions to make more complex queries.

    Example of using built in filter methods::

        query = LogQuery(fetch_size=50)
        query.format.timezone('CST')
        query.format.field_format('name')
        translated_filter = query.add_translated_filter()
        translated_filter.within_ipv4_network('$Dst', ['192.168.4.0/24'])
        translated_filter.within_ipv4_range('$Src', ['1.1.1.1-192.168.1.254'])
        translated_filter.exact_ipv4_match('$Src', ['172.18.1.152', '192.168.4.84'])

    Each helper method stores its expression through ``update_filter``,
    so a later call replaces the expression set by an earlier one.
    """

    def __init__(self):
        super(TranslatedFilter, self).__init__('translated')

    def within_ipv4_network(self, field, values):
        """
        Set the filter expression to check ``field`` for inclusion in
        the specified networks.

        :param str field: name of field to filter on. Taken from 'Show Filter
            Expression' within SMC.
        :param list values: network definitions, in cidr format, i.e: 1.1.1.0/24.
        """
        networks = ','.join('ipv4_net("%s")' % net for net in values)
        expression = '{} IN union({})'.format(field, networks)
        self.update_filter(expression)

    def within_ipv4_range(self, field, values):
        """
        Set the filter expression to an IP range match on ``field``.

        Every entry of ``values`` is split on '-' and all resulting
        addresses are placed, in order, inside a single ``range(...)``
        expression. Range (between) filters are documented as allowing
        only one range; this method does not enforce that.

        :param str field: name of field to filter on. Taken from 'Show Filter
            Expression' within SMC.
        :param list values: IP range values. Values would be a list of IP's
            separated by a '-', i.e. ['1.1.1.1-1.1.1.254']
        """
        endpoints = []
        for iprange in values:
            for part in iprange.split('-'):
                endpoints.append('ipv4("%s")' % part)
        expression = '{} IN range({})'.format(field, ','.join(endpoints))
        self.update_filter(expression)

    def exact_ipv4_match(self, field, values):
        """
        Set the filter expression to an exact IPv4 address match on
        ``field``.

        :param str field: name of field to filter on. Taken from 'Show Filter
            Expression' within SMC.
        :param list values: value/s to match; must contain at least one
            address. A single value produces an ``==`` comparison, more
            than one value produces an ``IN union(...)`` expression.
        """
        if len(values) <= 1:
            # Single address: plain equality against the first entry.
            expression = '{} == ipv4("{}")'.format(field, values[0])
        else:
            addresses = ','.join('ipv4("%s")' % ip for ip in values)
            expression = '{} IN union({})'.format(field, addresses)
        self.update_filter(expression)