From a4de48a129ff6526ae19533af76730c4707d8a53 Mon Sep 17 00:00:00 2001 From: Allain Legacy Date: Wed, 31 May 2017 16:18:19 -0400 Subject: [PATCH] Permit aborting loopingcall while sleeping Some of the openstack services implement worker tasks that are based on the oslo-service LoopingCallBase objects. They do this as a way to have a task that runs periodically as a greenthread within a child worker process. For example, the neutron-server runs AgentStatusCheckWorker() objects as base service workers in its child worker processes. When the parent server process handles a SIGTERM signal it attempts to stop all services launched on each of the child worker processes (i.e., ProcessLauncher.stop()). That results in a stop() being called on each of the underlying base services and then a wait() to ensure that they complete before shutdown. If any service that is implemented on a LoopingCallBase related object is suspended on a greenthread.sleep() the previous call to stop() will have no effect and so the wait() will block until the sleep() finishes. For tasks that either have a frequent FixedLoopingBase interface or a short initial_delay this may not be a problem, but for those with a long delay this could mean that the wait() blocks for minutes before the process is allowed to shutdown. To solve this the LoopingCallBase calls to greenthread.sleep() are being replaced with a threading.Event() object's wait() method. This allows a caller of stop() to interrupt the sleep and expedite the shutdown. Closes-Bug: #1660210 Change-Id: I5835f9595826df5349e4cc8b1da8529bb960ee04 Signed-off-by: Allain Legacy --- oslo_service/loopingcall.py | 19 +++++++++++++------ oslo_service/tests/test_loopingcall.py | 14 +++++++------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/oslo_service/loopingcall.py b/oslo_service/loopingcall.py index 1747fda..ee2813d 100644 --- a/oslo_service/loopingcall.py +++ b/oslo_service/loopingcall.py @@ -18,6 +18,7 @@ import random import sys import time +import threading from eventlet import event from eventlet import greenthread @@ -85,19 +86,25 @@ class LoopingCallBase(object): self.args = args self.kw = kw self.f = f - self._running = False self._thread = None self.done = None + self.abort = threading.Event() + + @property + def _running(self): + return not self.abort.is_set() def stop(self): - self._running = False + self.abort.set() def wait(self): return self.done.wait() def _on_done(self, gt, *args, **kwargs): self._thread = None - self._running = False + + def _sleep(self, timeout): + return self.abort.wait(timeout) def _start(self, idle_for, initial_delay=None, stop_on_exception=True): """Start the looping @@ -114,8 +121,8 @@ class LoopingCallBase(object): """ if self._thread is not None: raise RuntimeError(self._RUN_ONLY_ONE_MESSAGE) - self._running = True self.done = event.Event() + self.abort.clear() self._thread = greenthread.spawn( self._run_loop, idle_for, initial_delay=initial_delay, stop_on_exception=stop_on_exception) @@ -129,7 +136,7 @@ class LoopingCallBase(object): func = self.f if stop_on_exception else _safe_wrapper(self.f, kind, func_name) if initial_delay: - greenthread.sleep(initial_delay) + self._sleep(initial_delay) try: watch = timeutils.StopWatch() while self._running: @@ -143,7 +150,7 @@ class LoopingCallBase(object): 'for %(idle).02f seconds', {'func_name': func_name, 'idle': idle, 'kind': kind}) - greenthread.sleep(idle) + self._sleep(idle) except LoopingCallDone as e: self.done.send(e.retvalue) except Exception: diff --git a/oslo_service/tests/test_loopingcall.py b/oslo_service/tests/test_loopingcall.py index 7ac8025..218e9d1 100644 --- a/oslo_service/tests/test_loopingcall.py +++ b/oslo_service/tests/test_loopingcall.py @@ -285,7 +285,7 @@ class DynamicLoopingCallTestCase(test_base.BaseTestCase): else: self.num_runs = self.num_runs - 1 - @mock.patch('eventlet.greenthread.sleep') + @mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep') def test_timeout_task_without_return(self, sleep_mock): self.num_runs = 1 timer = loopingcall.DynamicLoopingCall( @@ -294,7 +294,7 @@ class DynamicLoopingCallTestCase(test_base.BaseTestCase): timer.start(periodic_interval_max=5).wait() sleep_mock.assert_has_calls([mock.call(5)]) - @mock.patch('eventlet.greenthread.sleep') + @mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep') def test_interval_adjustment(self, sleep_mock): self.num_runs = 2 @@ -303,7 +303,7 @@ class DynamicLoopingCallTestCase(test_base.BaseTestCase): sleep_mock.assert_has_calls([mock.call(5), mock.call(1)]) - @mock.patch('eventlet.greenthread.sleep') + @mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep') def test_initial_delay(self, sleep_mock): self.num_runs = 1 @@ -315,7 +315,7 @@ class DynamicLoopingCallTestCase(test_base.BaseTestCase): class TestBackOffLoopingCall(test_base.BaseTestCase): @mock.patch('random.SystemRandom.gauss') - @mock.patch('eventlet.greenthread.sleep') + @mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep') def test_exponential_backoff(self, sleep_mock, random_mock): def false(): return False @@ -366,7 +366,7 @@ class TestBackOffLoopingCall(test_base.BaseTestCase): self.assertEqual(expected_times, sleep_mock.call_args_list) @mock.patch('random.SystemRandom.gauss') - @mock.patch('eventlet.greenthread.sleep') + @mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep') def test_no_backoff(self, sleep_mock, random_mock): random_mock.return_value = 1 func = mock.Mock() @@ -381,7 +381,7 @@ class TestBackOffLoopingCall(test_base.BaseTestCase): self.assertTrue(retvalue, 'return value') @mock.patch('random.SystemRandom.gauss') - @mock.patch('eventlet.greenthread.sleep') + @mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep') def test_no_sleep(self, sleep_mock, random_mock): # Any call that executes properly the first time shouldn't sleep random_mock.return_value = 1 @@ -394,7 +394,7 @@ class TestBackOffLoopingCall(test_base.BaseTestCase): self.assertTrue(retvalue, 'return value') @mock.patch('random.SystemRandom.gauss') - @mock.patch('eventlet.greenthread.sleep') + @mock.patch('oslo_service.loopingcall.LoopingCallBase._sleep') def test_max_interval(self, sleep_mock, random_mock): def false(): return False -- 2.7.4