aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'portage_with_autodep/pym/_emerge/SequentialTaskQueue.py')
-rw-r--r--portage_with_autodep/pym/_emerge/SequentialTaskQueue.py72
1 files changed, 32 insertions, 40 deletions
diff --git a/portage_with_autodep/pym/_emerge/SequentialTaskQueue.py b/portage_with_autodep/pym/_emerge/SequentialTaskQueue.py
index c1c98c4..8090893 100644
--- a/portage_with_autodep/pym/_emerge/SequentialTaskQueue.py
+++ b/portage_with_autodep/pym/_emerge/SequentialTaskQueue.py
@@ -1,13 +1,15 @@
-# Copyright 1999-2009 Gentoo Foundation
+# Copyright 1999-2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
-import sys
-from _emerge.SlotObject import SlotObject
from collections import deque
+import sys
+
+from portage.util.SlotObject import SlotObject
+
class SequentialTaskQueue(SlotObject):
__slots__ = ("max_jobs", "running_tasks") + \
- ("_dirty", "_scheduling", "_task_queue")
+ ("_scheduling", "_task_queue")
def __init__(self, **kwargs):
SlotObject.__init__(self, **kwargs)
@@ -15,50 +17,34 @@ class SequentialTaskQueue(SlotObject):
self.running_tasks = set()
if self.max_jobs is None:
self.max_jobs = 1
- self._dirty = True
def add(self, task):
self._task_queue.append(task)
- self._dirty = True
+ self.schedule()
def addFront(self, task):
self._task_queue.appendleft(task)
- self._dirty = True
+ self.schedule()
def schedule(self):
- if not self._dirty:
- return False
-
- if not self:
- return False
-
if self._scheduling:
# Ignore any recursive schedule() calls triggered via
# self._task_exit().
- return False
+ return
self._scheduling = True
-
- task_queue = self._task_queue
- running_tasks = self.running_tasks
- max_jobs = self.max_jobs
- state_changed = False
-
- while task_queue and \
- (max_jobs is True or len(running_tasks) < max_jobs):
- task = task_queue.popleft()
- cancelled = getattr(task, "cancelled", None)
- if not cancelled:
- running_tasks.add(task)
- task.addExitListener(self._task_exit)
- task.start()
- state_changed = True
-
- self._dirty = False
- self._scheduling = False
-
- return state_changed
+ try:
+ while self._task_queue and (self.max_jobs is True or
+ len(self.running_tasks) < self.max_jobs):
+ task = self._task_queue.popleft()
+ cancelled = getattr(task, "cancelled", None)
+ if not cancelled:
+ self.running_tasks.add(task)
+ task.addExitListener(self._task_exit)
+ task.start()
+ finally:
+ self._scheduling = False
def _task_exit(self, task):
"""
@@ -68,16 +54,22 @@ class SequentialTaskQueue(SlotObject):
"""
self.running_tasks.remove(task)
if self._task_queue:
- self._dirty = True
+ self.schedule()
def clear(self):
+ """
+ Clear the task queue and asynchronously terminate any running tasks.
+ """
self._task_queue.clear()
- running_tasks = self.running_tasks
- while running_tasks:
- task = running_tasks.pop()
- task.removeExitListener(self._task_exit)
+ for task in list(self.running_tasks):
task.cancel()
- self._dirty = False
+
+ def wait(self):
+ """
+ Synchronously wait for all running tasks to exit.
+ """
+ while self.running_tasks:
+ next(iter(self.running_tasks)).wait()
def __bool__(self):
return bool(self._task_queue or self.running_tasks)