Source code for smc.vpn.elements

"""
VPN Elements are used in conjunction with Policy or Route Based VPN configurations.
VPN elements consist of external gateway and VPN site settings that identify 3rd party
gateways to be used as a VPN termination endpoint.

There are several ways to create an external gateway configuration.
A step by step process which first creates a network element to be used in a 
VPN site, then creates the ExternalGatway, an ExternalEndpoint for the gateway,
and inserts the VPN site into the configuration::

    Network.create(name='mynetwork', ipv4_network='172.18.1.0/24')
    gw = ExternalGateway.create(name='mygw')
    gw.external_endpoint.create(name='myendpoint', address='10.10.10.10')
    gw.vpn_site.create(name='mysite', site_element=[Network('mynetwork')])

You can also use the convenience method `update_or_create` on the ExternalGateway
to fully provision in a single step. Note that the ExternalEndpoint and VPNSite also
have an `update_or_create` method to limit the update to those respective
configurations::

    >>> from smc.elements.network import Network
    >>> from smc.vpn.elements import ExternalGateway
    >>> network = Network.get_or_create(name='network-172.18.1.0/24', ipv4_network='172.18.1.0/24')
    >>> 
    >>> g = ExternalGateway.update_or_create(name='newgw',
        external_endpoint=[
            {'name': 'endpoint1', 'address': '1.1.1.1', 'enabled': True},
            {'name': 'endpoint2', 'address': '2.2.2.2', 'enabled': True}],
        vpn_site=[{'name': 'sitea', 'site_element':[network]}])
    >>> g
    ExternalGateway(name=newgw)
    >>> for endpoint in g.external_endpoint:
    ...   endpoint
    ... 
    ExternalEndpoint(name=endpoint1 (1.1.1.1))
    ExternalEndpoint(name=endpoint2 (2.2.2.2))
    >>> for site in g.vpn_site:
    ...   site, site.site_element
    ... 
    (VPNSite(name=sitea), [Network(name=network-172.18.1.0/24)])

.. note:: When calling `update_or_create` from the ExternalGateway, providing the
    parameters for external_endpoints and vpn_site is optional.
"""

from smc.api.exceptions import ElementNotFound
from smc.base.model import SubElement, Element, ElementRef, ElementCreator
from smc.base.collection import create_collection
from smc.base.util import element_resolver


[docs]class GatewaySettings(Element): """ Gateway settings define various VPN related settings that are applied at the firewall level such as negotiation timers and mobike settings. A gateway setting is a property of an engine:: engine = Engine('myfw') engine.vpn.gateway_settings """ typeof = 'gateway_settings'
[docs] @classmethod def create(cls, name, negotiation_expiration=200000, negotiation_retry_timer=500, negotiation_retry_max_number=32, negotiation_retry_timer_max=7000, certificate_cache_crl_validity=90000, mobike_after_sa_update=False, mobike_before_sa_update=False, mobike_no_rrc=True): """ Create a new gateway setting profile. :param str name: name of profile :param int negotiation_expiration: expire after (ms) :param int negotiation_retry_timer: retry time length (ms) :param int negotiation_retry_max_num: max number of retries allowed :param int negotiation_retry_timer_max: maximum length for retry (ms) :param int certificate_cache_crl_validity: cert cache validity (seconds) :param boolean mobike_after_sa_update: Whether the After SA flag is set for Mobike Policy :param boolean mobike_before_sa_update: Whether the Before SA flag is set for Mobike Policy :param boolean mobike_no_rrc: Whether the No RRC flag is set for Mobike Policy :raises CreateElementFailed: failed creating profile :return: instance with meta :rtype: GatewaySettings """ json = {'name': name, 'negotiation_expiration': negotiation_expiration, 'negotiation_retry_timer': negotiation_retry_timer, 'negotiation_retry_max_number': negotiation_retry_max_number, 'negotiation_retry_timer_max': negotiation_retry_timer_max, 'certificate_cache_crl_validity': certificate_cache_crl_validity, 'mobike_after_sa_update': mobike_after_sa_update, 'mobike_before_sa_update': mobike_before_sa_update, 'mobike_no_rrc': mobike_no_rrc} return ElementCreator(cls, json)
[docs]class GatewayProfile(Element): """ Gateway Profiles describe the capabilities of a Gateway, i.e. supported cipher, hash, etc. Gateway Profiles of Internal Gateways are read-only and computed from the firewall version and FIPS mode. Gateway Profiles of External Gateways are user-defined. """ typeof = 'gateway_profile' def capabilities(self): pass
[docs]class ExternalGateway(Element): """ External Gateway defines an VPN Gateway for a non-SMC managed device. This will specify details such as the endpoint IP, and VPN site protected networks. Example of manually provisioning each step:: Network.create(name='mynetwork', ipv4_network='172.18.1.0/24') gw = ExternalGateway.create(name='mygw') gw.external_endpoint.create(name='myendpoint', address='10.10.10.10') gw.vpn_site.create(name='mysite', site_element=[Network('mynetwork')]) :ivar GatewayProfile gateway_profile: A gateway profile will define the capabilities (i.e. crypto) allowed for this VPN. """ typeof = 'external_gateway' gateway_profile = ElementRef('gateway_profile')
[docs] @classmethod def create(cls, name, trust_all_cas=True): """ Create new External Gateway :param str name: name of test_external gateway :param bool trust_all_cas: whether to trust all internal CA's (default: True) :return: instance with meta :rtype: ExternalGateway """ json = {'name': name, 'trust_all_cas': trust_all_cas} return ElementCreator(cls, json)
[docs] @classmethod def update_or_create(cls, name, external_endpoint=None, vpn_site=None, trust_all_cas=True, with_status=False): """ Update or create an ExternalGateway. The ``external_endpoint`` and ``vpn_site`` parameters are expected to be a list of dicts with key/value pairs to satisfy the respective elements create constructor. VPN Sites will represent the final state of the VPN site list. ExternalEndpoint that are pre-existing will not be deleted if not provided in the ``external_endpoint`` parameter, however existing elements will be updated as specified. :param str name: name of external gateway :param list(dict) external_endpoint: list of dict items with key/value to satisfy ExternalEndpoint.create constructor :param list(dict) vpn_site: list of dict items with key/value to satisfy VPNSite.create constructor :param bool with_status: If set to True, returns a 3-tuple of (ExternalGateway, modified, created), where modified and created is the boolean status for operations performed. :raises ValueError: missing required argument/s for constructor argument :rtype: ExternalGateway """ if external_endpoint: for endpoint in external_endpoint: if 'name' not in endpoint: raise ValueError('External endpoints are configured ' 'but missing the name parameter.') if vpn_site: for site in vpn_site: if 'name' not in site: raise ValueError('VPN sites are configured but missing ' 'the name parameter.') # Make sure VPN sites are resolvable before continuing sites = [element_resolver(element, do_raise=True) for element in site.get('site_element', [])] site.update(site_element=sites) updated = False created = False try: extgw = ExternalGateway.get(name) except ElementNotFound: extgw = ExternalGateway.create(name, trust_all_cas) created = True if external_endpoint: for endpoint in external_endpoint: _, modified, was_created = ExternalEndpoint.update_or_create( extgw, with_status=True, **endpoint) if was_created or modified: updated = True if vpn_site: for site in vpn_site: _, modified, was_created = VPNSite.update_or_create(extgw, name=site['name'], site_element=site.get('site_element'), with_status=True) if was_created or modified: updated = True if with_status: return extgw, updated, created return extgw
@property def vpn_site(self): """ A VPN site defines a collection of IP's or networks that identify address space that is defined on the other end of the VPN tunnel. :rtype: CreateCollection(VPNSite) """ return create_collection( self.get_relation('vpn_site'), VPNSite) @property def external_endpoint(self): """ An External Endpoint is the IP based definition for the destination VPN peers. There may be multiple per External Gateway. Add a new endpoint to an existing test_external gateway:: >>> list(ExternalGateway.objects.all()) [ExternalGateway(name=cisco-remote-side), ExternalGateway(name=remoteside)] >>> gateway.external_endpoint.create('someendpoint', '12.12.12.12') 'http://1.1.1.1:8082/6.1/elements/external_gateway/22961/external_endpoint/27467' :rtype: CreateCollection(ExternalEndpoint) """ return create_collection( self.get_relation('external_endpoint'), ExternalEndpoint) @property def trust_all_cas(self): """ Gateway setting identifying whether all CA's specified in the profile are supported or only specific ones. :rtype: bool """ return self.data.get('trust_all_cas')
[docs]class ExternalEndpoint(SubElement): """ External Endpoint is used by the External Gateway and defines the IP and other VPN related settings to identify the VPN peer. This is created to define the details of the non-SMC managed gateway. This class is a property of :py:class:`smc.vpn.elements.ExternalGateway` and should not be called directly. Add an endpoint to existing External Gateway:: gw = ExternalGateway('aws') gw.external_endpoint.create(name='aws01', address='2.2.2.2') """ typeof = 'external_endpoint'
[docs] def create(self, name, address=None, enabled=True, balancing_mode='active', ipsec_vpn=True, nat_t=False, force_nat_t=False, dynamic=False, ike_phase1_id_type=None, ike_phase1_id_value=None): """ Create an test_external endpoint. Define common settings for that specify the address, enabled, nat_t, name, etc. You can also omit the IP address if the endpoint is dynamic. In that case, you must also specify the ike_phase1 settings. :param str name: name of test_external endpoint :param str address: address of remote host :param bool enabled: True|False (default: True) :param str balancing_mode: active :param bool ipsec_vpn: True|False (default: True) :param bool nat_t: True|False (default: False) :param bool force_nat_t: True|False (default: False) :param bool dynamic: is a dynamic VPN (default: False) :param int ike_phase1_id_type: If using a dynamic endpoint, you must set this value. Valid options: 0=DNS name, 1=Email, 2=DN, 3=IP Address :param str ike_phase1_id_value: value of ike_phase1_id. Required if ike_phase1_id_type and dynamic set. :raises CreateElementFailed: create element with reason :return: newly created element :rtype: ExternalEndpoint """ json = {'name': name, 'address': address, 'balancing_mode': balancing_mode, 'dynamic': dynamic, 'enabled': enabled, 'nat_t': nat_t, 'force_nat_t': force_nat_t, 'ipsec_vpn': ipsec_vpn} if dynamic: json.pop('address') json.update( ike_phase1_id_type=ike_phase1_id_type, ike_phase1_id_value=ike_phase1_id_value) return ElementCreator( self.__class__, href=self.href, json=json)
[docs] @classmethod def update_or_create(cls, external_gateway, name, with_status=False, **kw): """ Update or create external endpoints for the specified external gateway. An ExternalEndpoint is considered unique based on the IP address for the endpoint (you cannot add two external endpoints with the same IP). If the external endpoint is dynamic, then the name is the unique identifier. :param ExternalGateway external_gateway: external gateway reference :param str name: name of the ExternalEndpoint. This is only used as a direct match if the endpoint is dynamic. Otherwise the address field in the keyword arguments will be used as you cannot add multiple external endpoints with the same IP address. :param bool with_status: If set to True, returns a 3-tuple of (ExternalEndpoint, modified, created), where modified and created is the boolean status for operations performed. :param dict kw: keyword arguments to satisfy ExternalEndpoint.create constructor :raises CreateElementFailed: Failed to create external endpoint with reason :raises ElementNotFound: If specified ExternalGateway is not valid :return: if with_status=True, return tuple(ExternalEndpoint, created). Otherwise return only ExternalEndpoint. """ if 'address' in kw: external_endpoint = external_gateway.external_endpoint.get_contains( '({})'.format(kw['address'])) else: external_endpoint = external_gateway.external_endpoint.get_contains(name) updated = False created = False if external_endpoint: # Check for changes for name, value in kw.items(): # Check for differences before updating if getattr(external_endpoint, name, None) != value: external_endpoint.data[name] = value updated = True if updated: external_endpoint.update() else: external_endpoint = external_gateway.external_endpoint.create( name, **kw) created = True if with_status: return external_endpoint, updated, created return external_endpoint
@property def force_nat_t(self): """ Whether force_nat_t is enabled on this endpoint. :rtype: bool """ return self.data.get('force_nat_t') @property def enabled(self): """ Whether this endpoint is enabled. :rtype: bool """ return self.data.get('enabled')
[docs] def enable_disable(self): """ Enable or disable this endpoint. If enabled, it will be disabled and vice versa. :return: None """ if self.enabled: self.data['enabled'] = False else: self.data['enabled'] = True self.update()
[docs] def enable_disable_force_nat_t(self): """ Enable or disable NAT-T on this endpoint. If enabled, it will be disabled and vice versa. :return: None """ if self.force_nat_t: self.data['force_nat_t'] = False else: self.data['force_nat_t'] = True self.update()
[docs]class VPNSite(SubElement): """ VPN Site information for an internal or test_external gateway Sites are used to encapsulate hosts or networks as 'protected' for VPN policy configuration. Create a new vpn site for an engine:: engine = Engine('myengine') network = Network('network-192.168.5.0/25') #get resource engine.vpn.sites.create('newsite', [network.href]) Sites can also be added to ExternalGateway's as well:: extgw = ExternalGateway('mygw') extgw.vpn_site.create('newsite', [Network('foo')]) This class is a property of :py:class:`smc.core.engine.InternalGateway` or :py:class:`smc.vpn.elements.ExternalGateway` and should not be accessed directly. :ivar InternalGateway,ExternalGateway gateway: gateway referenced """ typeof = 'vpn_site' gateway = ElementRef('gateway')
[docs] def create(self, name, site_element): """ Create a VPN site for an internal or external gateway :param str name: name of site :param list site_element: list of protected networks/hosts :type site_element: list[str,Element] :raises CreateElementFailed: create element failed with reason :return: href of new element :rtype: str """ site_element = element_resolver(site_element) json = { 'name': name, 'site_element': site_element} return ElementCreator( self.__class__, href=self.href, json=json)
[docs] @classmethod def update_or_create(cls, external_gateway, name, site_element=None, with_status=False): """ Update or create a VPN Site elements or modify an existing VPN site based on value of provided site_element list. The resultant VPN site end result will be what is provided in the site_element argument (can also be an empty list to clear existing). :param ExternalGateway external_gateway: The external gateway for this VPN site :param str name: name of the VPN site :param list(str,Element) site_element: list of resolved Elements to add to the VPN site :param bool with_status: If set to True, returns a 3-tuple of (VPNSite, modified, created), where modified and created is the boolean status for operations performed. :raises ElementNotFound: ExternalGateway or unresolved site_element """ site_element = [] if not site_element else site_element site_elements = [element_resolver(element) for element in site_element] vpn_site = external_gateway.vpn_site.get_exact(name) updated = False created = False if vpn_site: # If difference, reset if set(site_elements) != set(vpn_site.data.get('site_element', [])): vpn_site.data['site_element'] = site_elements vpn_site.update() updated = True else: vpn_site = external_gateway.vpn_site.create( name=name, site_element=site_elements) created = True if with_status: return vpn_site, updated, created return vpn_site
@property def name(self): name = super(VPNSite, self).name if not name: return self.data.get('name') return name @property def site_element(self): """ Site elements for this VPN Site. :return: Elements used in this VPN site :rtype: list(Element) """ return [Element.from_href(element) for element in self.data.get('site_element')]
[docs] def add_site_element(self, element): """ Add a site element or list of elements to this VPN. :param list element: list of Elements or href's of vpn site elements :type element: list(str,Network) :raises UpdateElementFailed: fails due to reason :return: None """ element = element_resolver(element) self.data['site_element'].extend(element) self.update()
class VPNProfile(Element): """ Represents a VPNProfile configuration used by the PolicyVPN """ typeof = 'vpn_profile'