"""
Rule module is a base class for all access control and NAT rules.
::
Policy (base)
|
FirewallPolicy ------------> fw_ipv4_access_rules
|
|
IPv4Rule / IPv4NATRule (smc.policy.rule.Rule)
| |
------------
|
name
comment
sources (smc.policy.rule_elements.Source)
destinations (smc.policy.rule_elements.Destination)
services (smc.policy.rule_elements.Service)
action (smc.policy.rule_elements.Action)
authentication_options (smc.policy.rule_elements.AuthenticationOptions)
is_disabled
disable
enable
options (smc.policy.rule_elements.LogOptions)
parent_policy
tag
...
Examples of rule operations::
>>> from smc.policy.layer3 import FirewallPolicy
>>> from smc.policy.rule_elements import LogOptions
>>> from smc.policy.rule_elements import Action
>>> from smc.elements.other import Alias
...
>>> options = LogOptions()
>>> options.log_accounting_info_mode=True
>>> options.log_level='stored'
...
>>> policy = FirewallPolicy('AWS_Default')
>>> options = LogOptions()
>>> options.log_accounting_info_mode=True
>>> options.log_level='stored'
>>> policy.fw_ipv4_access_rules.create(name='mylogrule',services='any',sources='any',destinations='any',actions='continue',log_options=options)
'http://172.18.1.150:8082/6.2/elements/fw_policy/272/fw_ipv4_access_rule/2099703'
...
>>> actions = Action()
>>> actions.deep_inspection = True
>>> actions.file_filtering=False
...
>>> policy.fw_ipv4_access_rules.create(name='outbound',sources=[Alias('$$ Interface ID 1.net')],destinations='any',services='any',action=actions,log_options=options)
'http://172.18.1.150:8082/6.2/elements/fw_policy/272/fw_ipv4_access_rule/2099704'
>>> for rule in policy.fw_ipv4_access_rules.all():
... print(rule)
...
IPv4Rule(name=outbound)
IPv4Rule(name=mylogrule)
...
>>> policy.search_rule('outbound')
[IPv4Rule(name=outbound)]
...
>>> policy.fw_ipv4_access_rules.create(name='discard at bottom', sources='any',destinations='any',services='any',action='discard',add_pos=50)
'http://172.18.1.150:8082/6.2/elements/fw_policy/272/fw_ipv4_access_rule/2099705'
>>> for rule in policy.fw_ipv4_access_rules.all():
... print(rule, rule.name, rule.action.action)
...
IPv4Rule(name=outbound) outbound allow
IPv4Rule(name=mylogrule) mylogrule allow
IPv4Rule(name=discard at bottom) discard at bottom discard
"""
from smc.base.model import Element, SubElement, ElementCreator
from smc.elements.other import LogicalInterface
from smc.api.exceptions import ElementNotFound, MissingRequiredInput,\
CreateRuleFailed, PolicyCommandFailed
from smc.policy.rule_elements import Action, LogOptions, Destination, Source,\
Service, AuthenticationOptions, TimeRange
from smc.base.util import element_resolver
from smc.base.decorators import cacheable_resource
[docs]class Rule(object):
"""
Top level rule construct with methods required to modify common
behavior of any rule types. To retrieve a rule, access by reference::
policy = FirewallPolicy('mypolicy')
for rule in policy.fw_ipv4_nat_rules.all():
print(rule.name, rule.comment, rule.is_disabled)
"""
@property
def name(self):
"""
Name attribute of rule element
"""
return self._meta.name if self._meta.name else \
'Rule @%s' % self.tag
[docs] def move_rule_after(self, other_rule):
"""
Add this rule after another. This process will make a copy of
the existing rule and add after the specified rule. If this
raises an exception, processing is stopped. Otherwise the original
rule is then deleted.
You must re-retrieve the new element after running this operation
as new references will be created.
:param other_rule Rule: rule where this rule will be positioned after
:raises CreateRuleFailed: failed to duplicate this rule, no move
is made
"""
self.make_request(
CreateRuleFailed,
href=other_rule.get_relation('add_after'),
method='create',
json=self)
self.delete()
[docs] def move_rule_before(self, other_rule):
"""
Move this rule after another. This process will make a copy of
the existing rule and add after the specified rule. If this
raises an exception, processing is stopped. Otherwise the original
rule is then deleted.
You must re-retrieve the new element after running this operation
as new references will be created.
:param other_rule Rule: rule where this rule will be positioned before
:raises CreateRuleFailed: failed to duplicate this rule, no move
is made
"""
self.make_request(
CreateRuleFailed,
href=other_rule.get_relation('add_before'),
method='create',
json=self)
self.delete()
@cacheable_resource
def action(self):
"""
Action for this rule.
:rtype: Action
"""
return Action(self)
@cacheable_resource
def authentication_options(self):
"""
Read only authentication options field
:rtype: AuthenticationOptions
"""
return AuthenticationOptions(self)
@property
def comment(self):
"""
Optional comment for this rule.
:param str value: string comment
:rtype: str
"""
return self.data.get('comment')
@comment.setter
def comment(self, value):
self.data['comment'] = value
@property
def is_rule_section(self):
"""
Is this rule considered a rule section
:rtype: bool
"""
return not any(field for field in ('sources', 'destinations')
if field in self.data)
@property
def is_disabled(self):
"""
Whether the rule is enabled or disabled
:param bool value: True, False
:rtype: bool
"""
return self.data.get('is_disabled')
[docs] def disable(self):
"""
Disable this rule
"""
self.data['is_disabled'] = True
[docs] def enable(self):
"""
Enable this rule
"""
self.data['is_disabled'] = False
@cacheable_resource
def sources(self):
"""
Sources assigned to this rule
:rtype: Source
"""
return Source(self)
@cacheable_resource
def destinations(self):
"""
Destinations for this rule
:rtype: Destination
"""
return Destination(self)
@cacheable_resource
def services(self):
"""
Services assigned to this rule
:rtype: Service
"""
return Service(self)
@cacheable_resource
def options(self):
"""
Rule based options for logging. Enabling and
disabled specific log settings.
:rtype: LogOptions
"""
return LogOptions(self)
@property
def parent_policy(self):
"""
Read-only name of the parent policy
:return: :class:`smc.base.model.Element` of type policy
"""
return Element.from_href(self.data.get('parent_policy'))
[docs] def save(self):
"""
After making changes to a rule element, you must call save
to apply the changes. Rule changes are made to cache before
sending to SMC.
:raises PolicyCommandFailed: failed to save with reason
:return: None
"""
self.update(PolicyCommandFailed)
try:
del self._cache
except AttributeError:
pass
@property
def tag(self):
"""
Value of rule tag. Read only.
:return: rule tag
:rtype: str
"""
return self.data.get('tag')
#@property
# def time_range(self):
# """
# Time range/s assigned to this rule. May be None if
# no time range configured.
# :return: :py:class:`smc.policy.rule_elements.TimeRange`
# """
# time_range = self.data.get('time_range')
# if time_range:
# return TimeRange(self.data.get('time_range'))
class RuleCommon(object):
"""
Functionality common to all rules
"""
def create_rule_section(self, name, add_pos=None, after=None, before=None):
"""
Create a rule section in a Firewall Policy. To specify a specific numbering
position for the rule section, use the `add_pos` field. If no position or
before/after is specified, the rule section will be placed at the top which
will encapsulate all rules below.
Create a rule section for the relavant policy::
policy = FirewallPolicy('mypolicy')
policy.fw_ipv4_access_rules.create_rule_section(name='attop')
# For NAT rules
policy.fw_ipv4_nat_rules.create_rule_section(name='mysection', add_pos=5)
:param str name: create a rule section by name
:param int add_pos: position to insert the rule, starting with position 1.
If the position value is greater than the number of rules, the rule is
inserted at the bottom. If add_pos is not provided, rule is inserted in
position 1. Mutually exclusive with ``after`` and ``before`` params.
:param str after: Rule tag to add this rule after. Mutually exclusive with
``add_pos`` and ``before`` params.
:param str before: Rule tag to add this rule before. Mutually exclusive with
``add_pos`` and ``after`` params.
:raises MissingRequiredInput: when options are specified the need additional
setting, i.e. use_vpn action requires a vpn policy be specified.
:raises CreateRuleFailed: rule creation failure
:return: the created ipv4 rule
:rtype: IPv4Rule
"""
href = self.href
params = None
if add_pos is not None:
href = self.add_at_position(add_pos)
elif before or after:
params = self.add_before_after(before, after)
return ElementCreator(
self.__class__,
exception=CreateRuleFailed,
href=href,
params=params,
json={'comment': name})
def add_at_position(self, pos):
if pos <= 0:
pos = 1
rules = self.make_request(href=self.href)
if rules:
if len(rules) >= pos: # Position somewhere in the list
for position, entry in enumerate(rules):
if position + 1 == pos:
return self.__class__(**entry).get_relation('add_before')
else: # Put at the end
last_rule = rules.pop()
return self.__class__(**last_rule).get_relation('add_after')
return self.href
def add_before_after(self, before=None, after=None):
params = None
if after is not None:
params = {'after': after}
elif before is not None:
params = {'before': before}
return params
def update_targets(self, sources, destinations, services):
source = Source()
destination = Destination()
service = Service()
if sources is not None:
if isinstance(sources, str) and sources.lower() == 'any':
source.set_any()
else:
source.add_many(sources)
else:
source.set_none()
if destinations is not None:
if isinstance(destinations, str) and destinations.lower() == 'any':
destination.set_any()
else:
destination.add_many(destinations)
else:
destination.set_none()
if services is not None:
if isinstance(services, str) and services.lower() == 'any':
service.set_any()
else:
service.add_many(services)
else:
service.set_none()
e = {}
#e.update(source())
e.update(sources=source.data)
e.update(destinations=destination.data)
e.update(services=service.data)
return e
def update_logical_if(self, logical_interfaces):
e = {}
if logical_interfaces is None:
e.update(logical_interfaces={'any': True})
else:
try:
logicals = []
for interface in logical_interfaces:
logicals.append(LogicalInterface(interface).href)
e.update(logical_interfaces={
'logical_interface': logicals})
except ElementNotFound:
raise MissingRequiredInput(
'Cannot find Logical interface specified '
': {}'.format(logical_interfaces))
return e
[docs]class IPv4Rule(RuleCommon, Rule, SubElement):
"""
Represents an IPv4 Rule for a layer 3 engine.
Create a rule::
policy = FirewallPolicy('mypolicy')
policy.fw_ipv4_access_rules.create(name='smcpython',
sources='any',
destinations='any',
services='any')
Sources and Destinations can be one of any valid network element types defined
in :py:class:`smc.elements.network`.
Source entries by href::
sources=['http://1.1.1.1:8082/elements/network/myelement',
'http://1.1.1.1:8082/elements/host/myhost'], etc
Source entries using network elements::
sources=[Host('myhost'), Network('thenetwork'), AddressRange('range')]
Services have a similar syntax and can take any type of :py:class:`smc.elements.service`
or the element href or both::
services=[TCPService('myservice'),
'http://1.1.1.1/8082/elements/tcp_service/mytcpservice',
'http://1.1.1.1/8082/elements/udp_server/myudpservice'], etc
You can obtain services and href for the elements by using the
:py:class:`smc.base.collection` collections::
>>> services = list(TCPService.objects.filter('80'))
>>> for service in services:
... print(service, service.href)
...
(TCPService(name=tcp80443), u'http://172.18.1.150:8082/6.1/elements/tcp_service/3535')
(TCPService(name=HTTP to Web SaaS), u'http://172.18.1.150:8082/6.1/elements/tcp_service/589')
(TCPService(name=HTTP), u'http://172.18.1.150:8082/6.1/elements/tcp_service/440')
Services by application (get all facebook applications)::
>>> applications = Search.objects.entry_point('application_situation').filter('facebook')
>>> print(list(applications))
[ApplicationSituation(name=Facebook-Plugins-Share-Button), ApplicationSituation(name=Facebook-Plugins]
...
Sources / Destinations and Services can also take the string value 'any' to
allow all. For example::
sources='any'
"""
typeof = 'fw_ipv4_access_rule'
_actions = ('allow', 'discard', 'continue', 'refuse', 'jump',
'apply_vpn', 'enforce_vpn', 'forward_vpn', 'blacklist')
[docs] def create(self, name, sources=None, destinations=None,
services=None, action='allow', log_options=None,
authentication_options=None, connection_tracking=None,
is_disabled=False, vpn_policy=None, mobile_vpn=False,
add_pos=None, after=None, before=None,
sub_policy=None, comment=None, **kw):
"""
Create a layer 3 firewall rule
:param str name: name of rule
:param sources: source/s for rule
:type sources: list[str, Element]
:param destinations: destination/s for rule
:type destinations: list[str, Element]
:param services: service/s for rule
:type services: list[str, Element]
:param action: allow,continue,discard,refuse,enforce_vpn,
apply_vpn,forward_vpn, blacklist (default: allow)
:type action: Action or str
:param LogOptions log_options: LogOptions object
:param ConnectionTracking connection_tracking: custom connection tracking settings
:param AuthenticationOptions authentication_options: options for auth if any
:param PolicyVPN,str vpn_policy: policy element or str href; required for
enforce_vpn, use_vpn and apply_vpn actions
:param bool mobile_vpn: if using a vpn action, you can set mobile_vpn to True and
omit the vpn_policy setting if you want this VPN to apply to any mobile VPN based
on the policy VPN associated with the engine
:param str,Element sub_policy: sub policy required when rule has an action of 'jump'.
Can be the FirewallSubPolicy element or href.
:param int add_pos: position to insert the rule, starting with position 1. If
the position value is greater than the number of rules, the rule is inserted at
the bottom. If add_pos is not provided, rule is inserted in position 1. Mutually
exclusive with ``after`` and ``before`` params.
:param str after: Rule tag to add this rule after. Mutually exclusive with ``add_pos``
and ``before`` params.
:param str before: Rule tag to add this rule before. Mutually exclusive with ``add_pos``
and ``after`` params.
:param str comment: optional comment for this rule
:raises MissingRequiredInput: when options are specified the need additional
setting, i.e. use_vpn action requires a vpn policy be specified.
:raises CreateRuleFailed: rule creation failure
:return: the created ipv4 rule
:rtype: IPv4Rule
"""
rule_values = self.update_targets(sources, destinations, services)
rule_values.update(name=name, comment=comment)
if isinstance(action, Action):
rule_action = action
else:
rule_action = Action()
rule_action.action = action
if not rule_action.action in self._actions:
raise CreateRuleFailed('Action specified is not valid for this '
'rule type; action: {}'.format(rule_action.action))
if rule_action.action in ('apply_vpn', 'enforce_vpn', 'forward_vpn'):
if vpn_policy is None and not mobile_vpn:
raise MissingRequiredInput('You must either specify a vpn_policy or set '
'mobile_vpn when using a rule with a VPN action')
if mobile_vpn:
rule_action.mobile_vpn = True
else:
try:
vpn = element_resolver(vpn_policy) # VPNPolicy
rule_action.vpn = vpn
except ElementNotFound:
raise MissingRequiredInput('Cannot find VPN policy specified: {}, '
.format(vpn_policy))
elif rule_action.action == 'jump':
try:
rule_action.sub_policy = element_resolver(sub_policy)
except ElementNotFound:
raise MissingRequiredInput('Cannot find sub policy specified: {} '
.format(sub_policy))
#rule_values.update(action=rule_action.data)
log_options = LogOptions() if not log_options else log_options
if connection_tracking is not None:
rule_action.connection_tracking_options.update(**connection_tracking)
auth_options = AuthenticationOptions() if not authentication_options \
else authentication_options
rule_values.update(
action=rule_action.data,
options=log_options.data,
authentication_options=auth_options.data,
is_disabled=is_disabled)
params = None
href = self.href
if add_pos is not None:
href = self.add_at_position(add_pos)
elif before or after:
params = self.add_before_after(before, after)
return ElementCreator(
self.__class__,
exception=CreateRuleFailed,
href=href,
params=params,
json=rule_values)
[docs]class IPv4Layer2Rule(RuleCommon, Rule, SubElement):
"""
Create IPv4 rules for Layer 2 Firewalls
Example of creating an allow all rule::
policy = Layer2Policy('mylayer2')
policy.layer2_ipv4_access_rules.create(name='myrule',
sources='any',
destinations='any',
services='any')
"""
typeof = 'layer2_ipv4_access_rule'
_actions = ('allow', 'continue', 'discard',
'refuse', 'jump', 'blacklist')
[docs] def create(self, name, sources=None, destinations=None,
services=None, action='allow', is_disabled=False,
logical_interfaces=None, add_pos=None,
after=None, before=None, comment=None):
"""
Create an IPv4 Layer 2 FW rule
:param str name: name of rule
:param sources: source/s for rule
:type sources: list[str, Element]
:param destinations: destination/s for rule
:type destinations: list[str, Element]
:param services: service/s for rule
:type services: list[str, Element]
:param str, Action action: \|allow\|continue\|discard\|refuse\|blacklist
:param bool is_disabled: whether to disable rule or not
:param list logical_interfaces: logical interfaces by name
:param int add_pos: position to insert the rule, starting with position 1. If
the position value is greater than the number of rules, the rule is inserted at
the bottom. If add_pos is not provided, rule is inserted in position 1. Mutually
exclusive with ``after`` and ``before`` params.
:param str after: Rule tag to add this rule after. Mutually exclusive with ``add_pos``
and ``before`` params.
:param str before: Rule tag to add this rule before. Mutually exclusive with ``add_pos``
and ``after`` params.
:param str comment: optional comment for this rule
:raises MissingRequiredInput: when options are specified the need additional
setting, i.e. use_vpn action requires a vpn policy be specified.
:raises CreateRuleFailed: rule creation failure
:return: newly created rule
:rtype: IPv4Layer2Rule
"""
rule_values = self.update_targets(sources, destinations, services)
rule_values.update(name=name)
rule_values.update(is_disabled=is_disabled)
if isinstance(action, Action):
rule_action = action
else:
rule_action = Action()
rule_action.action = action
if not rule_action.action in self._actions:
raise CreateRuleFailed('Action specified is not valid for this '
'rule type; action: {}'.format(rule_action.action))
rule_values.update(action=rule_action.data)
rule_values.update(self.update_logical_if(logical_interfaces))
params = None
href = self.href
if add_pos is not None:
href = self.add_at_position(add_pos)
elif before or after:
params = self.add_before_after(before, after)
return ElementCreator(
self.__class__,
exception=CreateRuleFailed,
href=href,
params=params,
json=rule_values)
[docs]class EthernetRule(RuleCommon, Rule, SubElement):
"""
Ethernet Rule represents a policy on a layer 2 or IPS engine.
If logical_interfaces parameter is left blank, 'any' logical
interface is used.
Create an ethernet rule for a layer 2 policy::
policy = Layer2Policy('layer2policy')
policy.layer2_ethernet_rules.create(name='l2rule',
logical_interfaces=['dmz'],
sources='any',
action='discard')
"""
typeof = 'ethernet_rule'
_actions = ('allow', 'discard')
[docs] def create(self, name, sources=None, destinations=None,
services=None, action='allow', is_disabled=False,
logical_interfaces=None, add_pos=None,
after=None, before=None, comment=None):
"""
Create an Ethernet rule
:param str name: name of rule
:param sources: source/s for rule
:type sources: list[str, Element]
:param destinations: destination/s for rule
:type destinations: list[str, Element]
:param services: service/s for rule
:type services: list[str, Element]
:param str action: \|allow\|continue\|discard\|refuse\|blacklist
:param bool is_disabled: whether to disable rule or not
:param list logical_interfaces: logical interfaces by name
:param int add_pos: position to insert the rule, starting with position 1. If
the position value is greater than the number of rules, the rule is inserted at
the bottom. If add_pos is not provided, rule is inserted in position 1. Mutually
exclusive with ``after`` and ``before`` params.
:param str after: Rule tag to add this rule after. Mutually exclusive with ``add_pos``
and ``before`` params.
:param str before: Rule tag to add this rule before. Mutually exclusive with ``add_pos``
and ``after`` params.
:raises MissingReuqiredInput: when options are specified the need additional
setting, i.e. use_vpn action requires a vpn policy be specified.
:raises CreateRuleFailed: rule creation failure
:return: newly created rule
:rtype: EthernetRule
"""
rule_values = self.update_targets(sources, destinations, services)
rule_values.update(name=name, comment=comment)
rule_values.update(is_disabled=is_disabled)
if isinstance(action, Action):
rule_action = action
else:
rule_action = Action()
rule_action.action = action
if not rule_action.action in self._actions:
raise CreateRuleFailed('Action specified is not valid for this '
'rule type; action: {}'
.format(rule_action.action))
rule_values.update(action=rule_action.data)
rule_values.update(self.update_logical_if(logical_interfaces))
params = None
href = self.href
if add_pos is not None:
href = self.add_at_position(add_pos)
else:
params = self.add_before_after(before, after)
return ElementCreator(
self.__class__,
exception=CreateRuleFailed,
href=href,
params=params,
json=rule_values)
[docs]class IPv6Rule(IPv4Rule):
"""
IPv6 access rule defines sources and destinations that must be
in IPv6 format.
.. note:: It is possible to submit a source or destination in
IPv4 format, however this will fail validation when
attempting to push policy.
"""
typeof = 'fw_ipv6_access_rule'