"""
Tasks will be fired when executing specific actions such as a policy
upload, refresh, or making backups.
This module provides that ability to access task specific attributes
and optionally poll for status of an operation.
An example of using a task poller when uploading an engine policy
(use `wait_for_finish=True`)::
engine = Engine('myfirewall')
poller = engine.upload(policy=fwpolicy, wait_for_finish=True)
while not poller.done():
poller.wait(5)
print("Task Progress {}%".format(poller.task.progress))
print(poller.last_message())
"""
import re
import time
import threading
from smc.base.model import ElementCache, Element, SubElement
from smc.api.exceptions import TaskRunFailed, ActionCommandFailed,\
ResourceNotFound
from smc.base.collection import Search
from smc.base.util import millis_to_utc
clean_html = re.compile(r'<.*?>')
[docs]def TaskHistory():
"""
Task history retrieves a list of tasks in an event queue.
:return: list of task events
:rtype: list(TaskProgress)
"""
events = Search.objects.entry_point('task_progress')
return [event for event in events]
[docs]class TaskProgress(Element):
"""
Task Progress represents a task event queue. These
tasks may be completed or still running. The task event
queue events can be retrieved by calling :func:`~TaskHistory`.
"""
typeof = 'task_progress'
@property
def task(self):
"""
Return the task associated with this event
:rtype: Task
"""
return Task(self.data)
[docs]class Task(SubElement):
"""
Task representation. This is generic and the format is used for
any calls to SMC that return an asynchronous follower link to
check the status of the task.
:param str last_message: Last message received on this task
:param bool in_progress: Whether the task is in progress or finished
:param bool success: Whether the task succeeded or not
:param str follower: Fully qualified path to the follower link to track
this task.
"""
def __init__(self, task):
super(Task, self).__init__(
href=task.get('follower', None),
name=task.get('type', None))
self.data = ElementCache(task)
@property
def resource(self):
"""
The resource/s associated with this task
:rtype: list(Element)
"""
return [Element.from_href(resource)
for resource in self.data.get('resource', [])]
@property
def progress(self):
"""
Percentage of completion
:rtype: int
"""
return self.data.get('progress', 0)
@property
def start_time(self):
"""
Task start time in UTC datetime format
:rtype: datetime
"""
start_time = self.data.get('start_time')
if start_time:
return millis_to_utc(start_time)
@property
def end_time(self):
"""
Task end time in UTC datetime format
:rtype: datetime
"""
end_time = self.data.get('end_time')
if end_time:
return millis_to_utc(end_time)
[docs] def abort(self):
"""
Abort existing task.
:raises ActionCommandFailed: aborting task failed with reason
:return: None
"""
try:
self.make_request(
method='delete',
resource='abort')
except ResourceNotFound:
pass
except ActionCommandFailed:
pass
@property
def result_url(self):
"""
Link to result (this task)
:rtype: str
"""
return self.get_relation('result')
[docs] def update_status(self):
"""
Gets the current status of this task and returns a
new task object.
:raises TaskRunFailed: fail to update task status
"""
task = self.make_request(
TaskRunFailed,
href=self.href)
return Task(task)
def __getattr__(self, key):
return self.data.get(key)
@staticmethod
def execute(self, resource, **kw):
"""
Execute the task and return a TaskOperationPoller.
:rtype: TaskOperationPoller
"""
params = kw.pop('params', {})
json = kw.pop('json', None)
task = self.make_request(
TaskRunFailed,
method='create',
params=params,
json=json,
resource=resource)
timeout = kw.pop('timeout', 5)
wait_for_finish = kw.pop('wait_for_finish', True)
return TaskOperationPoller(
task=task, timeout=timeout,
wait_for_finish=wait_for_finish,
**kw)
@staticmethod
def download(self, resource, filename, **kw):
"""
Start and return a Download Task
:rtype: DownloadTask(TaskOperationPoller)
"""
params = kw.pop('params', {})
task = self.make_request(
TaskRunFailed,
method='create',
resource=resource,
params=params)
return DownloadTask(
filename=filename, task=task)
[docs]class TaskOperationPoller(object):
"""
Task Operation Poller provides a way to poll the SMC
for the status of the task operation. This is returned
by functions that return a task. Typically these will be
operations like refreshing policy, uploading policy, etc.
"""
def __init__(self, task, timeout=5, max_tries=36,
wait_for_finish=False):
self._task = Task(task)
self._thread = None
self._done = None
self._exception = None
self.callbacks = [] # Call after operation completes
if wait_for_finish:
self._max_tries = max_tries
self._timeout = timeout
self._done = threading.Event()
self._thread = threading.Thread(
target=self._start)
self._thread.daemon = True
self._thread.start()
def _start(self):
while not self.finished():
try:
time.sleep(self._timeout)
self._task = self._task.update_status()
self._max_tries -= 1
except Exception as e:
self._exception = e
break
self._done.set()
for call in self.callbacks:
call(self.task)
def finished(self):
return self._done.is_set() or not self._task.in_progress or \
self._max_tries == 0
[docs] def add_done_callback(self, callback):
"""
Add a callback to run after the task completes.
The callable must take 1 argument which will be
the completed Task.
:param callback: a callable that takes a single argument which
will be the completed Task.
"""
if self._done is None or self._done.is_set():
raise ValueError('Task has already finished')
if callable(callback):
self.callbacks.append(callback)
[docs] def result(self, timeout=None):
"""
Return the current Task after waiting for timeout
:rtype: Task
"""
self.wait(timeout)
return self._task
[docs] def wait(self, timeout=None):
"""
Blocking wait for task status.
"""
if self._thread is None:
return
self._thread.join(timeout=timeout)
[docs] def last_message(self, timeout=5):
"""
Wait a specified amount of time and return
the last message from the task
:rtype: str
"""
if self._thread is not None:
self._thread.join(timeout=timeout)
return self._task.last_message
[docs] def done(self):
"""
Is the task done yet
:rtype: bool
"""
return self._thread is None or not self._thread.isAlive()
@property
def task(self):
"""
Access to task
:rtype: Task
"""
return self._task
[docs] def stop(self):
"""
Stop the running task
"""
if self._thread is not None and self._thread.isAlive():
self._done.set()
[docs]class DownloadTask(TaskOperationPoller):
"""
A download task handles tasks that have files associated, for example
exporting an element to a specified file.
"""
def __init__(self, filename, task, **kw):
super(DownloadTask, self).__init__(task, wait_for_finish=True, **kw)
self.type = 'download_task'
self.filename = filename
self.download(None)
def download(self, timeout):
self.wait(timeout)
if not self.task.in_progress and not self.task.success:
raise TaskRunFailed(self.task.last_message)
try:
result = self.task.make_request(
TaskRunFailed,
raw_result=True,
href=self.task.result_url,
filename=self.filename)
self.filename = result.content
except IOError as io:
raise TaskRunFailed(
'Export task failed with message: {}'.format(io))