157 lines
5.4 KiB
Python
Executable File
157 lines
5.4 KiB
Python
Executable File
#
|
|
# Copyright (c) 2015-2016,2024 Wind River Systems, Inc.
|
|
#
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
#
|
|
from nfv_common import debug
|
|
|
|
from nfv_common.tasks._task_work import TaskWork
|
|
|
|
DLOG = debug.debug_get_logger('nfv_common.tasks.task_future')
|
|
|
|
|
|
class TaskFuture(object):
|
|
"""
|
|
Task Future
|
|
"""
|
|
def __init__(self, scheduler):
|
|
"""
|
|
Create a task future
|
|
"""
|
|
self._scheduler = scheduler
|
|
self._result = None
|
|
self._timeouts = None
|
|
|
|
def set_timeouts(self, timeouts):
|
|
"""
|
|
Set the timeout values to be used when work is to be done
|
|
Parameter timeouts is a dictionary of target and the timeout in seconds
|
|
"""
|
|
self._timeouts = timeouts
|
|
|
|
def work(self, target, *args, **kwargs):
|
|
"""
|
|
Schedule work in the future
|
|
"""
|
|
timeout_in_secs = None
|
|
if self._timeouts is not None:
|
|
# Look for a target specific timeout
|
|
module_name = target.__module__.split('.')[-1]
|
|
timeout_name = "%s.%s" % (module_name, target.__name__)
|
|
timeout_in_secs = self._timeouts.get(timeout_name, None)
|
|
if timeout_in_secs is not None:
|
|
timeout_in_secs = int(timeout_in_secs)
|
|
else:
|
|
# Look for a module level timeout
|
|
timeout_name = "%s" % module_name
|
|
timeout_in_secs = self._timeouts.get(timeout_name, None)
|
|
if timeout_in_secs is not None:
|
|
timeout_in_secs = int(timeout_in_secs)
|
|
|
|
if timeout_in_secs is None:
|
|
if kwargs:
|
|
timeout_in_secs = kwargs.get('timeout_in_secs', None)
|
|
if timeout_in_secs is not None:
|
|
del kwargs['timeout_in_secs']
|
|
|
|
if timeout_in_secs is None:
|
|
# WARNING: Any change to the default timeout must be reflected in
|
|
# the timeouts used for any work being done.
|
|
timeout_in_secs = 600
|
|
|
|
elif 0 >= timeout_in_secs:
|
|
timeout_in_secs = None # No timeout wanted, wait forever
|
|
|
|
# Note about timeouts. When the timeout expires, the VIM will terminate
|
|
# the worker process doing the work. Unfortunately, the python
|
|
# multiprocessing library used to manage these processes results in
|
|
# leaked file descriptors each time a process is terminated. That
|
|
# means this timeout should be a last resort - the work being done
|
|
# (e.g. sending a REST API request) must have its own timeout
|
|
# mechanism to ensure it completes before the worker process times
|
|
# out. Adding 5 seconds to the configured (or default) timeout to
|
|
# ensure the underlying timeout mechanism has the opportunity to
|
|
# abort the work being done.
|
|
if timeout_in_secs is not None:
|
|
timeout_in_secs += 5
|
|
|
|
if self._scheduler.running_task is not None:
|
|
task_work = TaskWork(timeout_in_secs, target, *args, **kwargs)
|
|
self._scheduler.running_task.add_task_work(task_work)
|
|
self._result = None
|
|
return task_work.id
|
|
else:
|
|
raise LookupError("Running task no longer running")
|
|
|
|
def timer(self, name, interval_secs):
|
|
"""
|
|
Schedule a timer to be fired after so many milliseconds,
|
|
callback is a co-routine that is sent the timer identifier
|
|
that has fired
|
|
"""
|
|
if self._scheduler.running_task is not None:
|
|
timer_id = self._scheduler.running_task.add_timer(name,
|
|
interval_secs)
|
|
return timer_id
|
|
else:
|
|
raise LookupError("Running task no longer running")
|
|
|
|
def cancel_timer(self, timer_id):
|
|
"""
|
|
Cancel a scheduled timer
|
|
"""
|
|
if self._scheduler.running_task is not None:
|
|
self._scheduler.running_task.cancel_timer(timer_id)
|
|
else:
|
|
raise LookupError("Running task no longer running")
|
|
|
|
def io_read_wait(self, select_obj):
|
|
"""
|
|
Wait on a read selection object
|
|
"""
|
|
if self._scheduler.running_task is not None:
|
|
self._scheduler.running_task.add_io_read_wait(select_obj)
|
|
else:
|
|
raise LookupError("Running task no longer running")
|
|
|
|
def io_read_wait_cancel(self, select_obj):
|
|
"""
|
|
Cancel a wait on a read selection object
|
|
"""
|
|
if self._scheduler.running_task is not None:
|
|
self._scheduler.running_task.cancel_io_read_wait(select_obj)
|
|
else:
|
|
raise LookupError("Running task no longer running")
|
|
|
|
def io_write_wait(self, select_obj):
|
|
"""
|
|
Wait on a write selection object
|
|
"""
|
|
if self._scheduler.running_task is not None:
|
|
self._scheduler.running_task.add_io_write_wait(select_obj)
|
|
else:
|
|
raise LookupError("Running task no longer running")
|
|
|
|
def io_write_wait_cancel(self, select_obj):
|
|
"""
|
|
Cancel a wait on a write selection object
|
|
"""
|
|
if self._scheduler.running_task is not None:
|
|
self._scheduler.running_task.cancel_io_write_wait(select_obj)
|
|
else:
|
|
raise LookupError("Running task no longer running")
|
|
|
|
@property
|
|
def result(self):
|
|
"""
|
|
Returns the result of a future
|
|
"""
|
|
return self._result
|
|
|
|
@result.setter
|
|
def result(self, result):
|
|
"""
|
|
Set the result of a future
|
|
"""
|
|
self._result = result
|