Source code for smc.policy.rule_elements

from smc.base.model import Element, ElementCreator
from smc.base.structs import NestedDict
from smc.api.exceptions import ElementNotFound
from smc.base.util import element_resolver


[docs]class RuleElement(object): """ Rule Element encapsulates actions for source, destination and service fields. """ @property def is_any(self): """ Is the field set to any :rtype: bool """ return 'any' in self
[docs] def set_any(self): """ Set field to any """ self.clear() self.update({'any': True})
@property def is_none(self): """ Is the field set to none :rtype: bool """ return 'none' in self
[docs] def set_none(self): """ Set field to none """ self.clear() self.update({'none': True})
[docs] def add(self, data): """ Add a single entry to field. Entries can be added to a rule using the href of the element or by loading the element directly. Element should be of type :py:mod:`smc.elements.network`. After modifying rule, call :py:meth:`~.save`. Example of adding entry by element:: policy = FirewallPolicy('policy') for rule in policy.fw_ipv4_nat_rules.all(): if rule.name == 'therule': rule.sources.add(Host('myhost')) rule.save() .. note:: If submitting type Element and the element cannot be found, it will be skipped. :param data: entry to add :type data: Element or str """ if self.is_none or self.is_any: self.clear() self.data[self.typeof] = [] try: self.get(self.typeof).append(element_resolver(data)) except ElementNotFound: pass
[docs] def add_many(self, data): """ Add multiple entries to field. Entries should be list format. Entries can be of types relavant to the field type. For example, for source and destination fields, elements may be of type :py:mod:`smc.elements.network` or be the elements direct href, or a combination of both. Add several entries to existing rule:: policy = FirewallPolicy('policy') for rule in policy.fw_ipv4_nat_rules.all(): if rule.name == 'therule': rule.sources.add_many([Host('myhost'), 'http://1.1.1.1/hosts/12345']) rule.save() :param list data: list of sources .. note:: If submitting type Element and the element cannot be found, it will be skipped. """ assert isinstance(data, list), "Incorrect format. Expecting list." if self.is_none or self.is_any: self.clear() self.data[self.typeof] = [] data = element_resolver(data, do_raise=False) self.data[self.typeof] = data
def __eq__(self, other): if type(self) is type(other): if self.is_none or self.is_any: if self.is_none and other.is_none or self.is_any and other.is_any: return True return False if other.is_none or other.is_any: # Current sources not any or none return False if set(self.all_as_href()) == set(other.all_as_href()): return True return False def __ne__(self, other): return not self.__eq__(other)
[docs] def update_field(self, elements): """ Update the field with a list of provided values but only if the values are different. Return a boolean indicating whether a change was made indicating whether `save` should be called. If the field is currently set to any or none, then no comparison is made and field is updated. :param list elements: list of elements in href or Element format to compare to existing field :rtype: bool """ changed = False if isinstance(elements, list): if self.is_any or self.is_none: self.add_many(elements) changed = True else: _elements = element_resolver(elements, do_raise=False) if set(self.all_as_href()) ^ set(_elements): self.data[self.typeof] = _elements changed = True if changed and self.rule and (isinstance(self, (Source, Destination)) and \ self.rule.typeof in ('fw_ipv4_nat_rule', 'fw_ipv6_nat_rule')): # Modify NAT cell if necessary self.rule._update_nat_field(self) return changed
[docs] def all_as_href(self): """ Return all elements without resolving to :class:`smc.elements.network` or :class:`smc.elements.service`. Just raw representation as href. :return: elements in href form :rtype: list """ if not self.is_any and not self.is_none: return [element for element in self.get(self.typeof)] return []
[docs] def all(self): """ Return all destinations for this rule. Elements returned are of the object type for the given element for further introspection. Search the fields in rule:: for sources in rule.sources.all(): print('My source: %s' % sources) :return: elements by resolved object type :rtype: list(Element) """ if not self.is_any and not self.is_none: return [Element.from_href(href) for href in self.get(self.typeof)] return []
[docs]class Destination(RuleElement, NestedDict): """ Destination fields for a rule. """ typeof = 'dst' def __init__(self, rule=None): dests = dict(none=True) if not rule else \ rule.data.get('destinations') self.rule = rule super(Destination, self).__init__(data=dests)
[docs]class Source(RuleElement, NestedDict): """ Source fields for a rule """ typeof = 'src' def __init__(self, rule=None): sources = dict(none=True) if not rule else \ rule.data.get('sources') self.rule = rule super(Source, self).__init__(data=sources)
[docs]class Service(RuleElement, NestedDict): """ Service fields for a rule """ typeof = 'service' def __init__(self, rule=None): services = dict(none=True) if not rule else \ rule.data.get('services') self.rule = rule super(Service, self).__init__(data=services)
[docs]class Action(NestedDict): """ This represents the action associated with the rule. """ def __init__(self, rule=None): if rule is None: action = dict(action='allow') conn_tracking = ConnectionTracking() action.update(connection_tracking_options=conn_tracking.data) action.update(scan_detection='undefined') else: action = rule.data.get('action', {}) super(Action, self).__init__(data=action) @property def action(self): """ Action set for this rule :param str value: allow\|discard\|continue\|refuse\|jump\|apply_vpn \|enforce_vpn\|forward_vpn\|blacklist :rtype: str """ return self.get('action') @action.setter def action(self, value): self.update(action=value) @property def connection_tracking_options(self): """ Enables connection tracking. The firewall allows or discards packets according to the selected Connection Tracking mode. Reply packets are allowed as part of the allowed connection without an explicit Access rule. Protocols that use a dynamic port assignment must be allowed using a Service with the appropriate Protocol Agent for that protocol (in Access rules and NAT rules). :rtype: ConnectionTracking """ return ConnectionTracking(self) @property def decrypting(self): """ .. versionadded:: 0.6.0 Requires SMC version >= 6.3.3 Whether the decryption is enabled on this rule. :param bool value: True, False, None (inherit from continue rule) :rtype: bool """ return self.get('decrypting') @decrypting.setter def decrypting(self, value): self.update(decrypting=value) @property def deep_inspection(self): """ Selects traffic that matches this rule for checking against the Inspection Policy referenced by this policy. Traffic is inspected as the Protocol that is attached to the Service element in this rule. :param bool value: True, False, None (inherit from continue rule) :rtype: bool """ return self.get('deep_inspection') @deep_inspection.setter def deep_inspection(self, value): self.update(deep_inspection=value) @property def file_filtering(self): """ (IPv4 Only) Inspects matching traffic against the File Filtering policy. Selecting this option should also activates the Deep Inspection option. You can further adjust virus scanning in the Inspection Policy. :param bool value: True, False, None (inherit from continue rule) :rtype: bool """ return self.get('file_filtering') @file_filtering.setter def file_filtering(self, value): self.update(file_filtering=value) @property def dos_protection(self): """ Enable or disable DOS protection mode :param bool value: True, False, None (inherit from continue rule) :rtype: bool """ return self.get('dos_protection') @dos_protection.setter def dos_protection(self, value): self.update(dos_protection=value) @property def scan_detection(self): """ Enable or disable Scan Detection for traffic that matches the rule. This overrides the option set in the Engine properties. Enable scan detection on this rule:: for rule in policy.fw_ipv4_access_rules.all(): rule.action.scan_detection = 'on' :param str value: on\|off\|undefined :return: scan detection setting (on,off,undefined) :rtype: str """ return self.get('scan_detection') @scan_detection.setter def scan_detection(self, value): if value in ('on', 'off', 'undefined'): self.update(scan_detection=value) @property def sub_policy(self): """ Sub policy is used when ``action=jump``. :rtype: FirewallSubPolicy """ if 'sub_policy' in self: return Element.from_href(self.get('sub_policy')) @sub_policy.setter def sub_policy(self, value): self.update(sub_policy=element_resolver(value)) @property def user_response(self): """ Read-only user response setting """ return self.get('user_response') @property def vpn(self): """ Return vpn reference. Only used if 'enforce_vpn', 'apply_vpn', or 'forward_vpn' is the action type. :param PolicyVPN value: set the policy VPN for VPN action :rtype: PolicyVPN """ if 'vpn' in self: return Element.from_href(self.get('vpn')) @vpn.setter def vpn(self, value): self.update(vpn=element_resolver(value)) @property def mobile_vpn(self): """ Mobile VPN only applies to engines that support VPN and that have the action of 'enforce_vpn', 'apply_vpn' or 'forward_vpn' set. This will enable mobile VPN traffic on this VPN rule. :param boolean value: set mobile vpn on or off :rtype: boolean """ return self.get('mobile_vpn') @mobile_vpn.setter def mobile_vpn(self, value): self.update(mobile_vpn=value)
[docs]class ConnectionTracking(NestedDict): """ Connection tracking settings can be configured on a per rule basis to control settings such as enforced MSS and how to handle connection states. Configuring a rule to enable MSS and set connection state tracking to normal:: for rule in policy.fw_ipv4_access_rules.all(): rule.action.connection_tracking_options.mss_enforced = True rule.action.connection_tracking_options.state = 'normal' rule.action.connection_tracking_options.mss_enforced_min_max = (1400, 1450) rule.action.connection_tracking_options.sync_connections = True rule.save() """ def __init__(self, rule=None): if rule is None: ct = dict( mss_enforced=False, mss_enforced_max=0, mss_enforced_min=0, timeout=-1) else: ct = rule.data.get('connection_tracking_options', {}) super(ConnectionTracking, self).__init__(data=ct) @property def mss_enforced(self): """ Is MSS enforced :param bool value: True, False :return: bool """ return self.get('mss_enforced') @mss_enforced.setter def mss_enforced(self, value): self.update(mss_enforced=value) @property def mss_enforced_min_max(self): """ Allows entering the Minimum and Maximum value for the MSS in bytes. Headers are not included in the MSS value; MSS concerns only the payload portion of the packet. :param tuple int value: tuple containing (min, max) in bytes :return: (min, max) values :rtype: tuple """ return (self.get('mss_enforced_min'), self.get('mss_enforced_max')) @mss_enforced_min_max.setter def mss_enforced_min_max(self, value): if isinstance(value, tuple): minimum, maximum = value self.update(mss_enforced_min=minimum) self.update(mss_enforced_max=maximum) @property def state(self): """ Connection tracking mode. See Stonesoft documentation for more info. :param str value: no,loose,normal,strict :return: str """ return self.get('state') @state.setter def state(self, value): self.update(state=value) @property def timeout(self): """ The timeout (in seconds) after which inactive connections are closed. This timeout only concerns idle connections. Connections are not cut because of timeouts while the hosts are still communicating. :param int value: time in seconds :return: int """ return self.get('timeout') @timeout.setter def timeout(self, value): """ Set the idle timeout for connections in seconds :param int value: idle connection timeout """ self.update(timeout=value) @property def sync_connections(self): """ Are sync connections enabled for this engine. If None, then this is set to inherit from a continue rule. :return True, False, None (inherit from continue rule) """ return self.get('sync_connections') @sync_connections.setter def sync_connections(self, value): self.update(sync_connections=value)
[docs]class LogOptions(NestedDict): """ Log Options represent the settings related to per rule logging. Example of obtaining a rule reference and turning logging on for a particular rule:: policy = FirewallPolicy('smcpython') for rule in policy.fw_ipv4_access_rules.all(): if rule.name == 'foo': rule.options.log_accounting_info_mode = True rule.options.log_level = 'stored' rule.options.application_logging = 'enforced' rule.options.user_logging = 'enforced' rule.save() """ def __init__(self, rule=None): if rule is None: logopts = dict( log_accounting_info_mode=False, log_closing_mode=True, log_level='undefined', log_payload_additionnal=False, log_payload_excerpt=False, log_payload_record=False, log_severity=-1) else: logopts = rule.data.get('options', {}) super(LogOptions, self).__init__(data=logopts) @property def application_logging(self): """ Stores information about Application use. You can log spplication use even if you do not use Applications for access control. :param str value: off\|default\|enforced :return: str """ return self.get('application_logging') @application_logging.setter def application_logging(self, value): if value in ('off', 'default', 'enforced'): self.update(application_logging=value) @property def log_accounting_info_mode(self): """ Both connection opening and closing are logged and information on the volume of traffic is collected. This option is not available for rules that issue alerts. If you want to create reports that are based on traffic volume, you must select this option for all rules that allow traffic that you want to include in the reports. :param bool value: log accounting information (bits/bytes transferred) :return: bool """ return self.get('log_accounting_info_mode') @log_accounting_info_mode.setter def log_accounting_info_mode(self, value): self.update(log_accounting_info_mode=value) @property def log_closing_mode(self): """ Specifying False means no log entries are created when connections are closed. True will mean both connection opening and closing are logged, but no information is collected on the volume of traffic. :param bool value: enable/disable accounting data :return: bool """ return self.get('log_closing_mode') @log_closing_mode.setter def log_closing_mode(self, value): self.update(log_closing_mode=value) @property def log_level(self): """ Configure per rule logging. It is recommended to configure an Any/Any/Any/Continue rule in position 1 if global logging is required. This can be used to override any global logging setting. :param str value: none\|stored\|transient\|essential\|alert\|undefined :return: str """ return self.get('log_level') @log_level.setter def log_level(self, value): if value in ('none', 'stored', 'transient', 'essential', 'alert'): self.update(log_level=value) if not self.log_accounting_info_mode: self.log_accounting_info_mode = True @property def log_payload_additional(self): """ Stores packet payload extracted from the traffic. The collected payload provides information for some of the additional log fields depending on the type of traffic. :param bool value: True, False :return: bool """ return self.get('log_payload_additionnal') @log_payload_additional.setter def log_payload_additional(self, value): self.update(log_payload_additionnal=value) @property def log_payload_excerpt(self): """ Stores an excerpt of the packet that matched. The maximum recorded excerpt size is 4 KB. This allows quick viewing of the payload in the logs view. :param bool value: collect excerpt or not :return: bool """ return self.get('log_payload_excerpt') @log_payload_excerpt.setter def log_payload_excerpt(self, value): self.update(log_payload_excerpt=value) @property def log_payload_record(self): """ Records the traffic up to the limit that is set in the Record Length field. :param bool value: True, False :return: bool """ return self.get('log_payload_record') @log_payload_record.setter def log_payload_record(self, value): self.update(log_payload_record=value) @property def log_severity(self): """ Read only log severity level :return: str """ return self.get('log_severity') @property def user_logging(self): """ Stores information about Users when they are used as the Source or Destination of an Access rule. You must select this option if you want Users to be referenced by name in log entries, statistics, reports, and user monitoring. Otherwise, only the IP address associated with the User at the time the log was created is stored. :param str value: off\|default\|enforced :return: str """ return self.get('user_logging') @user_logging.setter def user_logging(self, value): if value in ('off', 'default', 'enforced'): self.update(user_logging=value)
[docs]class AuthenticationOptions(NestedDict): """ Authentication options are set on a per rule basis and dictate whether a user requires identification to match. """ def __init__(self, rule=None): if rule is None: auth = dict( methods=[], require_auth=False, users=[]) else: auth = rule.data.get('authentication_options', {}) super(AuthenticationOptions, self).__init__(data=auth) def __eq__(self, other): if isinstance(other, AuthenticationOptions): if self.require_auth != other.require_auth: return False for values in ('users', 'methods'): if set(self.data.get(values, [])) != \ set(other.data.get(values, [])): return False return True return False def __ne__(self, other): return not self.__eq__(other) @property def methods(self): """ Read only authentication methods enabled :return: list value: auth methods enabled """ return [Element.from_href(method) for method in self.get('methods')] @property def require_auth(self): """ Ready only authentication required :return: boolean """ return self.get('require_auth') @property def timeout(self): """ Timeout between authentications :return: int """ return self.get('timeout') @property def users(self): """ List of users required to authenticate :return: list """ return [Element.from_href(user) for user in self.get('users', [])]
class TimeRange(object): """ Represents a time range setting for a given rule. Time ranges can currently be set up to support rules based on starting month and ending month. At that time the rule will be disabled automatically. """ def __init__(self, data): self.data = data @property def day_ranges(self): """ Not Yet Implemented """ pass @property def month_range_start(self): """ Starting month for rule validity. Use this with month_range_end. :param str jan,feb,mar,apr,may,jun,jul,aug,sep,oct,nov,dec """ return self.data.get('month_range_start') @month_range_start.setter def month_range_start(self, value): self.data['month_range_start'] = value @property def month_range_end(self): """ Set month end range. Use this with month_range_start. :param str jan,feb,mar,apr,may,jun,jul,aug,sep,oct,nov,dec """ return self.data.get('month_range_end') @month_range_end.setter def month_range_end(self, value): self.data['month_range_end'] = value
[docs]class MatchExpression(Element): """ A match expression is used in the source / destination / service fields to group together elements into an 'AND'ed configuration. For example, a normal rule might have a source field that could include network=172.18.1.0/24 and zone=Internal objects. A match expression enables you to AND these elements together to enforce the match requires both. Logically it would be represented as (network 172.18.1.0/24 AND zone Internal). >>> from smc.elements.network import Host, Zone >>> from smc.policy.rule_elements import MatchExpression >>> from smc.policy.layer3 import FirewallPolicy >>> match = MatchExpression.create(name='mymatch', network_element=Host('kali'), zone=Zone('Mail')) >>> policy = FirewallPolicy('smcpython') >>> policy.fw_ipv4_access_rules.create(name='myrule', sources=[match], destinations='any', services='any') 'http://172.18.1.150:8082/6.2/elements/fw_policy/261/fw_ipv4_access_rule/2099740' >>> rule = policy.search_rule('myrule') ... >>> for source in rule[0].sources.all(): ... print(source, source.values()) ... MatchExpression(name=MatchExpression _1491760686976_2) [Zone(name=Mail), Host(name=kali)] .. note:: MatchExpression is currently only supported on source and destination fields. """ typeof = 'match_expression'
[docs] @classmethod def create(cls, name, user=None, network_element=None, domain_name=None, zone=None, executable=None): """ Create a match expression :param str name: name of match expression :param str user: name of user or user group :param Element network_element: valid network element type, i.e. host, network, etc :param DomainName domain_name: domain name network element :param Zone zone: zone to use :param str executable: name of executable or group :raises ElementNotFound: specified object does not exist :return: instance with meta :rtype: MatchExpression """ ref_list = [] if user: pass if network_element: ref_list.append(network_element.href) if domain_name: ref_list.append(domain_name.href) if zone: ref_list.append(zone.href) if executable: pass json = {'name': name, 'ref': ref_list} return ElementCreator(cls, json)
def values(self): return [Element.from_href(ref) for ref in self.data.get('ref')]