aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2017-03-21 00:06:12 -0700
committerZac Medico <zmedico@gentoo.org>2017-03-24 13:32:25 -0700
commit61878e4fbdfef5f8512b34640089e954a14e6d12 (patch)
tree977fbbb7761ed80df785f0c2198d461a53ffb382
parentPollScheduler: terminate via call_soon for asyncio compat (diff)
downloadportage-61878e4fbdfef5f8512b34640089e954a14e6d12.tar.gz
portage-61878e4fbdfef5f8512b34640089e954a14e6d12.tar.bz2
portage-61878e4fbdfef5f8512b34640089e954a14e6d12.zip
MirrorDistTask: terminate via call_soon for asyncio compat
These changes are analogous to the PollScheduler changes in the previous commit.
-rw-r--r--pym/portage/_emirrordist/MirrorDistTask.py39
1 files changed, 27 insertions, 12 deletions
diff --git a/pym/portage/_emirrordist/MirrorDistTask.py b/pym/portage/_emirrordist/MirrorDistTask.py
index e23a11b3c..d6b3decc0 100644
--- a/pym/portage/_emirrordist/MirrorDistTask.py
+++ b/pym/portage/_emirrordist/MirrorDistTask.py
@@ -24,15 +24,16 @@ if sys.hexversion >= 0x3000000:
class MirrorDistTask(CompositeTask):
- __slots__ = ('_config', '_terminated', '_term_check_id')
+ __slots__ = ('_config', '_fetch_iterator', '_term_rlock',
+ '_term_callback_handle')
def __init__(self, config):
CompositeTask.__init__(self, scheduler=config.event_loop)
self._config = config
- self._terminated = threading.Event()
+ self._term_rlock = threading.RLock()
+ self._term_callback_handle = None
def _start(self):
- self._term_check_id = self.scheduler.idle_add(self._termination_check)
fetch = TaskScheduler(iter(FetchIterator(self._config)),
max_jobs=self._config.options.jobs,
max_load=self._config.options.load_average,
@@ -203,17 +204,31 @@ class MirrorDistTask(CompositeTask):
logging.info("added %i files" % added_file_count)
logging.info("added %i bytes total" % added_byte_count)
+ def _cleanup(self):
+ """
+ Cleanup any callbacks that have been registered with the global
+ event loop.
+ """
+ # The self._term_callback_handle attribute requires locking
+ # since it's modified by the thread safe terminate method.
+ with self._term_rlock:
+ if self._term_callback_handle not in (None, False):
+ self._term_callback_handle.cancel()
+ # This prevents the terminate method from scheduling
+ # any more callbacks (since _cleanup must eliminate all
+ # callbacks in order to ensure complete cleanup).
+ self._term_callback_handle = False
+
def terminate(self):
- self._terminated.set()
+ with self._term_rlock:
+ if self._term_callback_handle is None:
+ self._term_callback_handle = self.scheduler.call_soon_threadsafe(
+ self._term_callback)
- def _termination_check(self):
- if self._terminated.is_set():
- self.cancel()
- self.wait()
- return True
+ def _term_callback(self):
+ self.cancel()
+ self.wait()
def _wait(self):
CompositeTask._wait(self)
- if self._term_check_id is not None:
- self.scheduler.source_remove(self._term_check_id)
- self._term_check_id = None
+ self._cleanup()