Source code for smc.core.general

"""
General configuration areas for an engine. This correlates to service level
settings such as DNSRelay, SNMP, etc.
"""

import collections
from smc.base.model import Element
from smc.base.util import element_resolver
from smc.elements.profiles import DNSRelayProfile
from smc.base.structs import NestedDict
from smc.policy.interface import InterfacePolicy
from smc.api.exceptions import LoadPolicyFailed


[docs]class SNMP(object): """ SNMP configuration details for applying SNMP on an engine. SNMP requires at minimum an assigned SNMPAgent configuration which defines the SNMP specific settings (version, community string, etc). You can also define specific interfaces to enable SNMP on. By default, if no addresses are specified, SNMP will be defined on all interfaces. .. seealso:: :class:`smc.elements.profiles.SNMPAgent` """ def __init__(self, engine): self.engine = engine @property def agent(self): """ The SNMP agent profile used for this engine. :rtype: SNMPAgent """ return self.engine.from_href( getattr(self.engine, 'snmp_agent_ref', None)) @property def status(self): return bool(getattr(self.engine, 'snmp_agent_ref', False))
[docs] def disable(self): """ Disable SNMP on this engine. You must call `update` on the engine for this to take effect. :return: None """ self.engine.data.update( snmp_agent_ref=None, snmp_location='', snmp_interface=[])
[docs] def enable(self, snmp_agent, snmp_location=None, snmp_interface=None): """ Enable SNMP on the engine. Specify a list of interfaces by ID to enable only on those interfaces. Only interfaces that have NDI's are supported. :param str,Element snmp_agent: the SNMP agent reference for this engine :param str snmp_location: the SNMP location identifier for the engine :param list snmp_interface: list of interface IDs to enable SNMP :raises ElementNotFound: unable to resolve snmp_agent :raises InterfaceNotFound: specified interface by ID not found """ agent = element_resolver(snmp_agent) snmp_interface = [] if not snmp_interface else snmp_interface interfaces = self._iface_dict(snmp_interface) self.engine.data.update( snmp_agent_ref=agent, snmp_location=snmp_location if snmp_location else '', snmp_interface=interfaces)
def _iface_dict(self, snmp_interface): return [values for interface in snmp_interface for values in self.engine.interface.get(interface).ndi_interfaces] @property def _nicids(self): return [str(nic.get('nicid')) for nic in getattr(self.engine, 'snmp_interface', [])]
[docs] def update_configuration(self, **kwargs): """ Update the SNMP configuration using any kwargs supported in the `enable` constructor. Return whether a change was made. You must call update on the engine to commit any changes. :param dict kwargs: keyword arguments supported by enable constructor :rtype: bool """ updated = False if 'snmp_agent' in kwargs: kwargs.update(snmp_agent_ref=kwargs.pop('snmp_agent')) snmp_interface = kwargs.pop('snmp_interface', None) for name, value in kwargs.items(): _value = element_resolver(value) if getattr(self.engine, name, None) != _value: self.engine.data[name] = _value updated = True if snmp_interface is not None: _snmp_interface = getattr(self.engine, 'snmp_interface', []) if not len(snmp_interface) and len(_snmp_interface): self.engine.data.update(snmp_interface=[]) updated = True elif len(snmp_interface): if set(self._nicids) ^ set(map(str, snmp_interface)): self.engine.data.update( snmp_interface=self._iface_dict(snmp_interface)) updated = True return updated
@property def location(self): """ Return the SNMP location string :rtype: str """ return getattr(self.engine, 'snmp_location', None) @property def interface(self): """ Return a list of physical interfaces that the SNMP agent is bound to. :rtype: list(PhysicalInterface) """ nics = set([nic.get('nicid') for nic in \ getattr(self.engine, 'snmp_interface', [])]) return [self.engine.interface.get(nic) for nic in nics] def __repr__(self): return '{0}(enabled={1})'.format( self.__class__.__name__, self.status)
[docs]class DNSRelay(object): """ DNS Relay allows the engine to provide DNS caching or specific host, IP and domain replies to clients. It can also be used to sinkhole specific DNS requests. .. seealso:: :class:`smc.elements.profiles.DNSRelayProfile` """ def __init__(self, engine): self.engine = engine @property def status(self): """ Status of DNS Relay on this engine. :rtype: bool """ return getattr(self.engine, 'dns_relay_profile_ref', False)
[docs] def enable(self, interface_id, dns_relay_profile=None): """ Enable the DNS Relay service on this engine. :param int interface_id: interface id to enable relay :param str,DNSRelayProfile dns_relay_profile: DNSRelayProfile element or str href :raises EngineCommandFailed: interface not found :raises ElementNotFound: profile not found :return: None """ if not dns_relay_profile: # Use default href = DNSRelayProfile('Cache Only').href else: href = element_resolver(dns_relay_profile) intf = self.engine.interface.get(interface_id) self.engine.data.update(dns_relay_profile_ref=href) self.engine.data.update(dns_relay_interface=intf.ndi_interfaces)
[docs] def disable(self): """ Disable DNS Relay on this engine :return: None """ self.engine.data.update(dns_relay_interface=[]) self.engine.data.pop('dns_relay_profile_ref', None)
def __repr__(self): return '{0}(enabled={1})'.format( self.__class__.__name__, self.status)
[docs]class DefaultNAT(object): """ Default NAT on the engine is used to automatically create NAT configurations based on internal routing. This simplifies the need to create specific NAT rules, primarily for outbound traffic. .. note:: You must call engine.update() to commit any changes. """ def __init__(self, engine): self.engine = engine @property def status(self): """ Status of default nat on the engine. :rtype: bool """ return self.engine.data['default_nat']
[docs] def enable(self): """ Enable default NAT on this engine """ self.engine.data['default_nat'] = True
[docs] def disable(self): """ Disable default NAT on this engine """ self.engine.data['default_nat'] = False
def __repr__(self): return '{0}(enabled={1})'.format( self.__class__.__name__, self.status)
[docs]class RankedDNSAddress(object): """ A RankedDNSAddress represents a list of DNS entries used as a ranked list to provide an ordered way to perform DNS queries. DNS entries can be added as raw IP addresses, or as elements of type :class:`smc.elements.network.Host` or :class:`smc.elements.servers.DNSServer` (or combination of both). This is an iterable class yielding namedtuples of type :class:`.DNSEntry`. Normal access is done through an engine reference:: >>> list(engine.dns) [DNSEntry(rank=0,value=8.8.8.8,ne_ref=None), DNSEntry(rank=1,value=None,ne_ref=DNSServer(name=mydnsserver))] >>> engine.dns.append(['8.8.8.8', '9.9.9.9']) >>> engine.dns.prepend(['1.1.1.1']) >>> engine.dns.remove(['8.8.8.8', DNSServer('mydnsserver')]) .. note:: You must call engine.update() to commit any changes. """ def __init__(self, entries): self.entries = entries def __iter__(self): for entry in self.entries: yield DNSEntry(**entry) def __len__(self): return len(self.entries) def __contains__(self, entry): for e in self: try: if e.ne_ref == entry.href: return True except AttributeError: if e.value == entry: return True return False def _rank_dns(self, entry, prepend=False): if prepend and len(self) or not len(self): start_rank = 0 else: start_rank = self.entries[-1].get('rank')+1 additions = [] for e in entry: if e not in self and e not in additions: additions.append(e) if not additions: return if prepend: # Rerank for e in self.entries: e.update((k, v+1) for k, v in e.items() if k == 'rank') for num, addr in enumerate(additions, int(start_rank)): try: self.entries.append({'rank': float(num), 'ne_ref': addr.href}) except AttributeError: self.entries.append({'rank': float(num), 'value': addr}) def add(self, values): return self.append(values)
[docs] def append(self, values): """ Add DNS entries to the engine at the end of the existing list (if any). A DNS entry can be either a raw IP Address, or an element of type :class:`smc.elements.network.Host` or :class:`smc.elements.servers.DNSServer`. :param list values: list of IP addresses, Host and/or DNSServer elements. :return: None .. note:: If the DNS entry added already exists, it will not be added. It's not a valid configuration to enter the same DNS IP multiple times. This is also true if the element is assigned the same address as a raw IP address already defined. """ self._rank_dns(values)
[docs] def prepend(self, values): """ Prepend DNS entries to the engine at the beginning of the existing list (if any). A DNS entry can be either a raw IP Address, or an element of type :class:`smc.elements.network.Host` or :class:`smc.elements.servers.DNSServer`. :param list values: list of IP addresses, Host and/or DNSServer elements. :return: None """ self._rank_dns(values, prepend=True)
[docs] def remove(self, values): """ Remove DNS entries from this ranked DNS list. A DNS entry can be either a raw IP Address, or an element of type :class:`smc.elements.network.Host` or :class:`smc.elements.servers.DNSServer`. :param list values: list of IP addresses, Host and/or DNSServer elements. :return: None """ removables = [] for value in values: if value in self: removables.append(value) if removables: self.entries[:] = [entry._asdict() for entry in self if entry.value not in removables and not entry.element in removables] # Rerank to maintain order for i, entry in enumerate(self.entries): entry.update(rank='{}'.format(i))
[docs]class DNSEntry(collections.namedtuple('DNSEntry', 'value rank ne_ref')): """ DNSEntry represents a single DNS entry within an engine DNSAddress list. :ivar str value: IP address value of this entry (None if type Element is used) :ivar int rank: order rank for the entry :ivar str ne_ref: network element href of entry. Use element property to resolve to type Element. :ivar Element element: If the DNS entry is an element type, this property will returned a resolved version of the ne_ref field. """ __slots__ = () def __new__(cls, rank, value=None, ne_ref=None): # @ReservedAssignment return super(DNSEntry, cls).__new__(cls, value, rank, ne_ref) @property def element(self): return Element.from_href(self.ne_ref) def __repr__(self): return 'DNSEntry(rank={0},value={1},ne_ref={2})'\ .format(self.rank, self.value, self.element)
[docs]class Layer2Settings(NestedDict): """ Layer 2 Settings are only applicable on Layer 3 Firewall engines that want to run specific interfaces in layer 2 mode. This requires that a Layer 2 Interface Policy is applied to the engine. You can also set connection tracking and bypass on overload settings for these interfaces as well. Set policy for the engine:: engine.l2fw_settings.enable(InterfacePolicy('mylayer2')) :ivar bool bypass_overload_traffic: whether to bypass traffic on overload :ivar str tracking_mode: connection tracking mode .. note:: You must call engine.update() to commit any changes. .. warning:: This feature requires SMC and engine version >= 6.3 """ def __init__(self, engine): l2 = engine.data['l2fw_settings'] super(Layer2Settings, self).__init__(data=l2)
[docs] def connection_tracking(self, mode): """ Set the connection tracking mode for these layer 2 settings. :param str mode: normal, strict, loose :return: None """ if mode in ('normal', 'strict', 'loose'): self.update(tracking_mode=mode)
[docs] def bypass_on_overload(self, value): """ Set the l2fw settings to bypass on overload. :param bool value: boolean to indicate bypass setting :return: None """ self.update(bypass_overload_traffic=value)
[docs] def disable(self): """ Disable the layer 2 interface policy """ self.pop('l2_interface_policy_ref', None)
[docs] def enable(self, policy): """ Set a layer 2 interface policy. :param str,Element policy: an InterfacePolicy or str href :raises LoadPolicyFailed: Invalid policy specified :raises ElementNotFound: InterfacePolicy not found :return: None """ if hasattr(policy, 'href'): if not isinstance(policy, InterfacePolicy): raise LoadPolicyFailed('Invalid policy type specified. The policy' 'type must be InterfacePolicy') self.update(l2_interface_policy_ref=element_resolver(policy))
@property def policy(self): """ Return the InterfacePolicy for this layer 3 firewall. :rtype: InterfacePolicy """ return InterfacePolicy.from_href(self.get('l2_interface_policy_ref')) def __repr__(self): return '{0}(policy={1})'.format( self.__class__.__name__, self.policy)