diff options
author | Zac Medico <zmedico@gentoo.org> | 2017-03-21 00:06:12 -0700 |
---|---|---|
committer | Zac Medico <zmedico@gentoo.org> | 2017-03-24 13:32:25 -0700 |
commit | 61878e4fbdfef5f8512b34640089e954a14e6d12 (patch) | |
tree | 977fbbb7761ed80df785f0c2198d461a53ffb382 | |
parent | PollScheduler: terminate via call_soon for asyncio compat (diff) | |
download | portage-61878e4fbdfef5f8512b34640089e954a14e6d12.tar.gz portage-61878e4fbdfef5f8512b34640089e954a14e6d12.tar.bz2 portage-61878e4fbdfef5f8512b34640089e954a14e6d12.zip |
MirrorDistTask: terminate via call_soon for asyncio compat
These changes are analogous to the PollScheduler changes in
the previous commit.
-rw-r--r-- | pym/portage/_emirrordist/MirrorDistTask.py | 39 |
1 files changed, 27 insertions, 12 deletions
diff --git a/pym/portage/_emirrordist/MirrorDistTask.py b/pym/portage/_emirrordist/MirrorDistTask.py index e23a11b3c..d6b3decc0 100644 --- a/pym/portage/_emirrordist/MirrorDistTask.py +++ b/pym/portage/_emirrordist/MirrorDistTask.py @@ -24,15 +24,16 @@ if sys.hexversion >= 0x3000000: class MirrorDistTask(CompositeTask): - __slots__ = ('_config', '_terminated', '_term_check_id') + __slots__ = ('_config', '_fetch_iterator', '_term_rlock', + '_term_callback_handle') def __init__(self, config): CompositeTask.__init__(self, scheduler=config.event_loop) self._config = config - self._terminated = threading.Event() + self._term_rlock = threading.RLock() + self._term_callback_handle = None def _start(self): - self._term_check_id = self.scheduler.idle_add(self._termination_check) fetch = TaskScheduler(iter(FetchIterator(self._config)), max_jobs=self._config.options.jobs, max_load=self._config.options.load_average, @@ -203,17 +204,31 @@ class MirrorDistTask(CompositeTask): logging.info("added %i files" % added_file_count) logging.info("added %i bytes total" % added_byte_count) + def _cleanup(self): + """ + Cleanup any callbacks that have been registered with the global + event loop. + """ + # The self._term_callback_handle attribute requires locking + # since it's modified by the thread safe terminate method. + with self._term_rlock: + if self._term_callback_handle not in (None, False): + self._term_callback_handle.cancel() + # This prevents the terminate method from scheduling + # any more callbacks (since _cleanup must eliminate all + # callbacks in order to ensure complete cleanup). + self._term_callback_handle = False + def terminate(self): - self._terminated.set() + with self._term_rlock: + if self._term_callback_handle is None: + self._term_callback_handle = self.scheduler.call_soon_threadsafe( + self._term_callback) - def _termination_check(self): - if self._terminated.is_set(): - self.cancel() - self.wait() - return True + def _term_callback(self): + self.cancel() + self.wait() def _wait(self): CompositeTask._wait(self) - if self._term_check_id is not None: - self.scheduler.source_remove(self._term_check_id) - self._term_check_id = None + self._cleanup() |