aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander Bersenev <bay@hackerdom.ru>2014-02-17 17:55:51 +0600
committerAlexander Bersenev <bay@hackerdom.ru>2014-02-17 17:55:51 +0600
commit5a3f506c9ef1cfd78940b0509f10ef94b4434e29 (patch)
tree147c35a17a8bcd8ff467bb3063adab623da51fac /portage_with_autodep/pym/_emerge/PollScheduler.py
parentfixed a deadlock (diff)
downloadautodep-5a3f506c9ef1cfd78940b0509f10ef94b4434e29.tar.gz
autodep-5a3f506c9ef1cfd78940b0509f10ef94b4434e29.tar.bz2
autodep-5a3f506c9ef1cfd78940b0509f10ef94b4434e29.zip
updated portage to 2.2.8-r1
Diffstat (limited to 'portage_with_autodep/pym/_emerge/PollScheduler.py')
-rw-r--r--portage_with_autodep/pym/_emerge/PollScheduler.py346
1 files changed, 94 insertions, 252 deletions
diff --git a/portage_with_autodep/pym/_emerge/PollScheduler.py b/portage_with_autodep/pym/_emerge/PollScheduler.py
index a2b5c24..965dc20 100644
--- a/portage_with_autodep/pym/_emerge/PollScheduler.py
+++ b/portage_with_autodep/pym/_emerge/PollScheduler.py
@@ -1,11 +1,8 @@
-# Copyright 1999-2011 Gentoo Foundation
+# Copyright 1999-2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
import gzip
import errno
-import logging
-import select
-import time
try:
import threading
@@ -15,36 +12,55 @@ except ImportError:
from portage import _encodings
from portage import _unicode_encode
from portage.util import writemsg_level
+from portage.util.SlotObject import SlotObject
+from portage.util._eventloop.EventLoop import EventLoop
+from portage.util._eventloop.global_event_loop import global_event_loop
-from _emerge.SlotObject import SlotObject
from _emerge.getloadavg import getloadavg
-from _emerge.PollConstants import PollConstants
-from _emerge.PollSelectAdapter import PollSelectAdapter
class PollScheduler(object):
class _sched_iface_class(SlotObject):
- __slots__ = ("output", "register", "schedule", "unregister")
+ __slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT",
+ "IO_PRI", "child_watch_add",
+ "idle_add", "io_add_watch", "iteration",
+ "output", "register", "run",
+ "source_remove", "timeout_add", "unregister")
- def __init__(self):
+ def __init__(self, main=False):
+ """
+ @param main: If True then use global_event_loop(), otherwise use
+ a local EventLoop instance (default is False, for safe use in
+ a non-main thread)
+ @type main: bool
+ """
self._terminated = threading.Event()
self._terminated_tasks = False
self._max_jobs = 1
self._max_load = None
self._jobs = 0
- self._poll_event_queue = []
- self._poll_event_handlers = {}
- self._poll_event_handler_ids = {}
- # Increment id for each new handler.
- self._event_handler_id = 0
- self._poll_obj = create_poll_instance()
self._scheduling = False
self._background = False
+ if main:
+ self._event_loop = global_event_loop()
+ else:
+ self._event_loop = EventLoop(main=False)
self.sched_iface = self._sched_iface_class(
+ IO_ERR=self._event_loop.IO_ERR,
+ IO_HUP=self._event_loop.IO_HUP,
+ IO_IN=self._event_loop.IO_IN,
+ IO_NVAL=self._event_loop.IO_NVAL,
+ IO_OUT=self._event_loop.IO_OUT,
+ IO_PRI=self._event_loop.IO_PRI,
+ child_watch_add=self._event_loop.child_watch_add,
+ idle_add=self._event_loop.idle_add,
+ io_add_watch=self._event_loop.io_add_watch,
+ iteration=self._event_loop.iteration,
output=self._task_output,
- register=self._register,
- schedule=self._schedule_wait,
- unregister=self._unregister)
+ register=self._event_loop.io_add_watch,
+ source_remove=self._event_loop.source_remove,
+ timeout_add=self._event_loop.timeout_add,
+ unregister=self._event_loop.source_remove)
def terminate(self):
"""
@@ -55,17 +71,47 @@ class PollScheduler(object):
"""
self._terminated.set()
+ def _termination_check(self):
+ """
+ Calls _terminate_tasks() if appropriate. It's guaranteed not to
+ call it while _schedule_tasks() is being called. The check should
+ be executed for each iteration of the event loop, for response to
+ termination signals at the earliest opportunity. It always returns
+ True, for continuous scheduling via idle_add.
+ """
+ if not self._scheduling and \
+ self._terminated.is_set() and \
+ not self._terminated_tasks:
+ self._scheduling = True
+ try:
+ self._terminated_tasks = True
+ self._terminate_tasks()
+ finally:
+ self._scheduling = False
+ return True
+
def _terminate_tasks(self):
"""
Send signals to terminate all tasks. This is called once
- from self._schedule() in the event dispatching thread. This
- prevents it from being called while the _schedule_tasks()
+ from _keep_scheduling() or _is_work_scheduled() in the event
+ dispatching thread. It will not be called while the _schedule_tasks()
implementation is running, in order to avoid potential
interference. All tasks should be cleaned up at the earliest
opportunity, but not necessarily before this method returns.
+ Typically, this method will send kill signals and return without
+ waiting for exit status. This allows basic cleanup to occur, such as
+ flushing of buffered output to logs.
"""
raise NotImplementedError()
+ def _keep_scheduling(self):
+ """
+ @rtype: bool
+ @return: True if there may be remaining tasks to schedule,
+ False otherwise.
+ """
+ return False
+
def _schedule_tasks(self):
"""
This is called from inside the _schedule() method, which
@@ -79,10 +125,10 @@ class PollScheduler(object):
Unless this method is used to perform user interface updates,
or something like that, the first thing it should do is check
the state of _terminated_tasks and if that is True then it
- should return False immediately (since there's no need to
+ should return immediately (since there's no need to
schedule anything after _terminate_tasks() has been called).
"""
- raise NotImplementedError()
+ pass
def _schedule(self):
"""
@@ -95,15 +141,32 @@ class PollScheduler(object):
return False
self._scheduling = True
try:
+ self._schedule_tasks()
+ finally:
+ self._scheduling = False
- if self._terminated.is_set() and \
- not self._terminated_tasks:
- self._terminated_tasks = True
- self._terminate_tasks()
+ def _main_loop(self):
+ term_check_id = self.sched_iface.idle_add(self._termination_check)
+ try:
+ # Populate initial event sources. We only need to do
+ # this once here, since it can be called during the
+ # loop from within event handlers.
+ self._schedule()
+
+ # Loop while there are jobs to be scheduled.
+ while self._keep_scheduling():
+ self.sched_iface.iteration()
- return self._schedule_tasks()
+ # Clean shutdown of previously scheduled jobs. In the
+ # case of termination, this allows for basic cleanup
+ # such as flushing of buffered output to logs.
+ while self._is_work_scheduled():
+ self.sched_iface.iteration()
finally:
- self._scheduling = False
+ self.sched_iface.source_remove(term_check_id)
+
+ def _is_work_scheduled(self):
+ return bool(self._running_job_count())
def _running_job_count(self):
return self._jobs
@@ -132,183 +195,6 @@ class PollScheduler(object):
return True
- def _poll(self, timeout=None):
- """
- All poll() calls pass through here. The poll events
- are added directly to self._poll_event_queue.
- In order to avoid endless blocking, this raises
- StopIteration if timeout is None and there are
- no file descriptors to poll.
- """
- if not self._poll_event_handlers:
- self._schedule()
- if timeout is None and \
- not self._poll_event_handlers:
- raise StopIteration(
- "timeout is None and there are no poll() event handlers")
-
- # The following error is known to occur with Linux kernel versions
- # less than 2.6.24:
- #
- # select.error: (4, 'Interrupted system call')
- #
- # This error has been observed after a SIGSTOP, followed by SIGCONT.
- # Treat it similar to EAGAIN if timeout is None, otherwise just return
- # without any events.
- while True:
- try:
- self._poll_event_queue.extend(self._poll_obj.poll(timeout))
- break
- except select.error as e:
- writemsg_level("\n!!! select error: %s\n" % (e,),
- level=logging.ERROR, noiselevel=-1)
- del e
- if timeout is not None:
- break
-
- def _next_poll_event(self, timeout=None):
- """
- Since the _schedule_wait() loop is called by event
- handlers from _poll_loop(), maintain a central event
- queue for both of them to share events from a single
- poll() call. In order to avoid endless blocking, this
- raises StopIteration if timeout is None and there are
- no file descriptors to poll.
- """
- if not self._poll_event_queue:
- self._poll(timeout)
- if not self._poll_event_queue:
- raise StopIteration()
- return self._poll_event_queue.pop()
-
- def _poll_loop(self):
-
- event_handlers = self._poll_event_handlers
- event_handled = False
-
- try:
- while event_handlers:
- f, event = self._next_poll_event()
- handler, reg_id = event_handlers[f]
- handler(f, event)
- event_handled = True
- except StopIteration:
- event_handled = True
-
- if not event_handled:
- raise AssertionError("tight loop")
-
- def _schedule_yield(self):
- """
- Schedule for a short period of time chosen by the scheduler based
- on internal state. Synchronous tasks should call this periodically
- in order to allow the scheduler to service pending poll events. The
- scheduler will call poll() exactly once, without blocking, and any
- resulting poll events will be serviced.
- """
- event_handlers = self._poll_event_handlers
- events_handled = 0
-
- if not event_handlers:
- return bool(events_handled)
-
- if not self._poll_event_queue:
- self._poll(0)
-
- try:
- while event_handlers and self._poll_event_queue:
- f, event = self._next_poll_event()
- handler, reg_id = event_handlers[f]
- handler(f, event)
- events_handled += 1
- except StopIteration:
- events_handled += 1
-
- return bool(events_handled)
-
- def _register(self, f, eventmask, handler):
- """
- @rtype: Integer
- @return: A unique registration id, for use in schedule() or
- unregister() calls.
- """
- if f in self._poll_event_handlers:
- raise AssertionError("fd %d is already registered" % f)
- self._event_handler_id += 1
- reg_id = self._event_handler_id
- self._poll_event_handler_ids[reg_id] = f
- self._poll_event_handlers[f] = (handler, reg_id)
- self._poll_obj.register(f, eventmask)
- return reg_id
-
- def _unregister(self, reg_id):
- f = self._poll_event_handler_ids[reg_id]
- self._poll_obj.unregister(f)
- if self._poll_event_queue:
- # Discard any unhandled events that belong to this file,
- # in order to prevent these events from being erroneously
- # delivered to a future handler that is using a reallocated
- # file descriptor of the same numeric value (causing
- # extremely confusing bugs).
- remaining_events = []
- discarded_events = False
- for event in self._poll_event_queue:
- if event[0] == f:
- discarded_events = True
- else:
- remaining_events.append(event)
-
- if discarded_events:
- self._poll_event_queue[:] = remaining_events
-
- del self._poll_event_handlers[f]
- del self._poll_event_handler_ids[reg_id]
-
- def _schedule_wait(self, wait_ids=None, timeout=None, condition=None):
- """
- Schedule until wait_id is not longer registered
- for poll() events.
- @type wait_id: int
- @param wait_id: a task id to wait for
- """
- event_handlers = self._poll_event_handlers
- handler_ids = self._poll_event_handler_ids
- event_handled = False
-
- if isinstance(wait_ids, int):
- wait_ids = frozenset([wait_ids])
-
- start_time = None
- remaining_timeout = timeout
- timed_out = False
- if timeout is not None:
- start_time = time.time()
- try:
- while (wait_ids is None and event_handlers) or \
- (wait_ids is not None and wait_ids.intersection(handler_ids)):
- f, event = self._next_poll_event(timeout=remaining_timeout)
- handler, reg_id = event_handlers[f]
- handler(f, event)
- event_handled = True
- if condition is not None and condition():
- break
- if timeout is not None:
- elapsed_time = time.time() - start_time
- if elapsed_time < 0:
- # The system clock has changed such that start_time
- # is now in the future, so just assume that the
- # timeout has already elapsed.
- timed_out = True
- break
- remaining_timeout = timeout - 1000 * elapsed_time
- if remaining_timeout <= 0:
- timed_out = True
- break
- except StopIteration:
- event_handled = True
-
- return event_handled
-
def _task_output(self, msg, log_path=None, background=None,
level=0, noiselevel=-1):
"""
@@ -333,6 +219,7 @@ class PollScheduler(object):
f = open(_unicode_encode(log_path,
encoding=_encodings['fs'], errors='strict'),
mode='ab')
+ f_real = f
except IOError as e:
if e.errno not in (errno.ENOENT, errno.ESTALE):
raise
@@ -349,50 +236,5 @@ class PollScheduler(object):
f.write(_unicode_encode(msg))
f.close()
-
-_can_poll_device = None
-
-def can_poll_device():
- """
- Test if it's possible to use poll() on a device such as a pty. This
- is known to fail on Darwin.
- @rtype: bool
- @returns: True if poll() on a device succeeds, False otherwise.
- """
-
- global _can_poll_device
- if _can_poll_device is not None:
- return _can_poll_device
-
- if not hasattr(select, "poll"):
- _can_poll_device = False
- return _can_poll_device
-
- try:
- dev_null = open('/dev/null', 'rb')
- except IOError:
- _can_poll_device = False
- return _can_poll_device
-
- p = select.poll()
- p.register(dev_null.fileno(), PollConstants.POLLIN)
-
- invalid_request = False
- for f, event in p.poll():
- if event & PollConstants.POLLNVAL:
- invalid_request = True
- break
- dev_null.close()
-
- _can_poll_device = not invalid_request
- return _can_poll_device
-
-def create_poll_instance():
- """
- Create an instance of select.poll, or an instance of
- PollSelectAdapter there is no poll() implementation or
- it is broken somehow.
- """
- if can_poll_device():
- return select.poll()
- return PollSelectAdapter()
+ if f_real is not f:
+ f_real.close()