diff options
author | Alexander Bersenev <bay@hackerdom.ru> | 2014-02-17 17:55:51 +0600 |
---|---|---|
committer | Alexander Bersenev <bay@hackerdom.ru> | 2014-02-17 17:55:51 +0600 |
commit | 5a3f506c9ef1cfd78940b0509f10ef94b4434e29 (patch) | |
tree | 147c35a17a8bcd8ff467bb3063adab623da51fac /portage_with_autodep/pym/_emerge/PollScheduler.py | |
parent | fixed a deadlock (diff) | |
download | autodep-5a3f506c9ef1cfd78940b0509f10ef94b4434e29.tar.gz autodep-5a3f506c9ef1cfd78940b0509f10ef94b4434e29.tar.bz2 autodep-5a3f506c9ef1cfd78940b0509f10ef94b4434e29.zip |
updated portage to 2.2.8-r1
Diffstat (limited to 'portage_with_autodep/pym/_emerge/PollScheduler.py')
-rw-r--r-- | portage_with_autodep/pym/_emerge/PollScheduler.py | 346 |
1 files changed, 94 insertions, 252 deletions
diff --git a/portage_with_autodep/pym/_emerge/PollScheduler.py b/portage_with_autodep/pym/_emerge/PollScheduler.py index a2b5c24..965dc20 100644 --- a/portage_with_autodep/pym/_emerge/PollScheduler.py +++ b/portage_with_autodep/pym/_emerge/PollScheduler.py @@ -1,11 +1,8 @@ -# Copyright 1999-2011 Gentoo Foundation +# Copyright 1999-2012 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 import gzip import errno -import logging -import select -import time try: import threading @@ -15,36 +12,55 @@ except ImportError: from portage import _encodings from portage import _unicode_encode from portage.util import writemsg_level +from portage.util.SlotObject import SlotObject +from portage.util._eventloop.EventLoop import EventLoop +from portage.util._eventloop.global_event_loop import global_event_loop -from _emerge.SlotObject import SlotObject from _emerge.getloadavg import getloadavg -from _emerge.PollConstants import PollConstants -from _emerge.PollSelectAdapter import PollSelectAdapter class PollScheduler(object): class _sched_iface_class(SlotObject): - __slots__ = ("output", "register", "schedule", "unregister") + __slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT", + "IO_PRI", "child_watch_add", + "idle_add", "io_add_watch", "iteration", + "output", "register", "run", + "source_remove", "timeout_add", "unregister") - def __init__(self): + def __init__(self, main=False): + """ + @param main: If True then use global_event_loop(), otherwise use + a local EventLoop instance (default is False, for safe use in + a non-main thread) + @type main: bool + """ self._terminated = threading.Event() self._terminated_tasks = False self._max_jobs = 1 self._max_load = None self._jobs = 0 - self._poll_event_queue = [] - self._poll_event_handlers = {} - self._poll_event_handler_ids = {} - # Increment id for each new handler. - self._event_handler_id = 0 - self._poll_obj = create_poll_instance() self._scheduling = False self._background = False + if main: + self._event_loop = global_event_loop() + else: + self._event_loop = EventLoop(main=False) self.sched_iface = self._sched_iface_class( + IO_ERR=self._event_loop.IO_ERR, + IO_HUP=self._event_loop.IO_HUP, + IO_IN=self._event_loop.IO_IN, + IO_NVAL=self._event_loop.IO_NVAL, + IO_OUT=self._event_loop.IO_OUT, + IO_PRI=self._event_loop.IO_PRI, + child_watch_add=self._event_loop.child_watch_add, + idle_add=self._event_loop.idle_add, + io_add_watch=self._event_loop.io_add_watch, + iteration=self._event_loop.iteration, output=self._task_output, - register=self._register, - schedule=self._schedule_wait, - unregister=self._unregister) + register=self._event_loop.io_add_watch, + source_remove=self._event_loop.source_remove, + timeout_add=self._event_loop.timeout_add, + unregister=self._event_loop.source_remove) def terminate(self): """ @@ -55,17 +71,47 @@ class PollScheduler(object): """ self._terminated.set() + def _termination_check(self): + """ + Calls _terminate_tasks() if appropriate. It's guaranteed not to + call it while _schedule_tasks() is being called. The check should + be executed for each iteration of the event loop, for response to + termination signals at the earliest opportunity. It always returns + True, for continuous scheduling via idle_add. + """ + if not self._scheduling and \ + self._terminated.is_set() and \ + not self._terminated_tasks: + self._scheduling = True + try: + self._terminated_tasks = True + self._terminate_tasks() + finally: + self._scheduling = False + return True + def _terminate_tasks(self): """ Send signals to terminate all tasks. This is called once - from self._schedule() in the event dispatching thread. This - prevents it from being called while the _schedule_tasks() + from _keep_scheduling() or _is_work_scheduled() in the event + dispatching thread. It will not be called while the _schedule_tasks() implementation is running, in order to avoid potential interference. All tasks should be cleaned up at the earliest opportunity, but not necessarily before this method returns. + Typically, this method will send kill signals and return without + waiting for exit status. This allows basic cleanup to occur, such as + flushing of buffered output to logs. """ raise NotImplementedError() + def _keep_scheduling(self): + """ + @rtype: bool + @return: True if there may be remaining tasks to schedule, + False otherwise. + """ + return False + def _schedule_tasks(self): """ This is called from inside the _schedule() method, which @@ -79,10 +125,10 @@ class PollScheduler(object): Unless this method is used to perform user interface updates, or something like that, the first thing it should do is check the state of _terminated_tasks and if that is True then it - should return False immediately (since there's no need to + should return immediately (since there's no need to schedule anything after _terminate_tasks() has been called). """ - raise NotImplementedError() + pass def _schedule(self): """ @@ -95,15 +141,32 @@ class PollScheduler(object): return False self._scheduling = True try: + self._schedule_tasks() + finally: + self._scheduling = False - if self._terminated.is_set() and \ - not self._terminated_tasks: - self._terminated_tasks = True - self._terminate_tasks() + def _main_loop(self): + term_check_id = self.sched_iface.idle_add(self._termination_check) + try: + # Populate initial event sources. We only need to do + # this once here, since it can be called during the + # loop from within event handlers. + self._schedule() + + # Loop while there are jobs to be scheduled. + while self._keep_scheduling(): + self.sched_iface.iteration() - return self._schedule_tasks() + # Clean shutdown of previously scheduled jobs. In the + # case of termination, this allows for basic cleanup + # such as flushing of buffered output to logs. + while self._is_work_scheduled(): + self.sched_iface.iteration() finally: - self._scheduling = False + self.sched_iface.source_remove(term_check_id) + + def _is_work_scheduled(self): + return bool(self._running_job_count()) def _running_job_count(self): return self._jobs @@ -132,183 +195,6 @@ class PollScheduler(object): return True - def _poll(self, timeout=None): - """ - All poll() calls pass through here. The poll events - are added directly to self._poll_event_queue. - In order to avoid endless blocking, this raises - StopIteration if timeout is None and there are - no file descriptors to poll. - """ - if not self._poll_event_handlers: - self._schedule() - if timeout is None and \ - not self._poll_event_handlers: - raise StopIteration( - "timeout is None and there are no poll() event handlers") - - # The following error is known to occur with Linux kernel versions - # less than 2.6.24: - # - # select.error: (4, 'Interrupted system call') - # - # This error has been observed after a SIGSTOP, followed by SIGCONT. - # Treat it similar to EAGAIN if timeout is None, otherwise just return - # without any events. - while True: - try: - self._poll_event_queue.extend(self._poll_obj.poll(timeout)) - break - except select.error as e: - writemsg_level("\n!!! select error: %s\n" % (e,), - level=logging.ERROR, noiselevel=-1) - del e - if timeout is not None: - break - - def _next_poll_event(self, timeout=None): - """ - Since the _schedule_wait() loop is called by event - handlers from _poll_loop(), maintain a central event - queue for both of them to share events from a single - poll() call. In order to avoid endless blocking, this - raises StopIteration if timeout is None and there are - no file descriptors to poll. - """ - if not self._poll_event_queue: - self._poll(timeout) - if not self._poll_event_queue: - raise StopIteration() - return self._poll_event_queue.pop() - - def _poll_loop(self): - - event_handlers = self._poll_event_handlers - event_handled = False - - try: - while event_handlers: - f, event = self._next_poll_event() - handler, reg_id = event_handlers[f] - handler(f, event) - event_handled = True - except StopIteration: - event_handled = True - - if not event_handled: - raise AssertionError("tight loop") - - def _schedule_yield(self): - """ - Schedule for a short period of time chosen by the scheduler based - on internal state. Synchronous tasks should call this periodically - in order to allow the scheduler to service pending poll events. The - scheduler will call poll() exactly once, without blocking, and any - resulting poll events will be serviced. - """ - event_handlers = self._poll_event_handlers - events_handled = 0 - - if not event_handlers: - return bool(events_handled) - - if not self._poll_event_queue: - self._poll(0) - - try: - while event_handlers and self._poll_event_queue: - f, event = self._next_poll_event() - handler, reg_id = event_handlers[f] - handler(f, event) - events_handled += 1 - except StopIteration: - events_handled += 1 - - return bool(events_handled) - - def _register(self, f, eventmask, handler): - """ - @rtype: Integer - @return: A unique registration id, for use in schedule() or - unregister() calls. - """ - if f in self._poll_event_handlers: - raise AssertionError("fd %d is already registered" % f) - self._event_handler_id += 1 - reg_id = self._event_handler_id - self._poll_event_handler_ids[reg_id] = f - self._poll_event_handlers[f] = (handler, reg_id) - self._poll_obj.register(f, eventmask) - return reg_id - - def _unregister(self, reg_id): - f = self._poll_event_handler_ids[reg_id] - self._poll_obj.unregister(f) - if self._poll_event_queue: - # Discard any unhandled events that belong to this file, - # in order to prevent these events from being erroneously - # delivered to a future handler that is using a reallocated - # file descriptor of the same numeric value (causing - # extremely confusing bugs). - remaining_events = [] - discarded_events = False - for event in self._poll_event_queue: - if event[0] == f: - discarded_events = True - else: - remaining_events.append(event) - - if discarded_events: - self._poll_event_queue[:] = remaining_events - - del self._poll_event_handlers[f] - del self._poll_event_handler_ids[reg_id] - - def _schedule_wait(self, wait_ids=None, timeout=None, condition=None): - """ - Schedule until wait_id is not longer registered - for poll() events. - @type wait_id: int - @param wait_id: a task id to wait for - """ - event_handlers = self._poll_event_handlers - handler_ids = self._poll_event_handler_ids - event_handled = False - - if isinstance(wait_ids, int): - wait_ids = frozenset([wait_ids]) - - start_time = None - remaining_timeout = timeout - timed_out = False - if timeout is not None: - start_time = time.time() - try: - while (wait_ids is None and event_handlers) or \ - (wait_ids is not None and wait_ids.intersection(handler_ids)): - f, event = self._next_poll_event(timeout=remaining_timeout) - handler, reg_id = event_handlers[f] - handler(f, event) - event_handled = True - if condition is not None and condition(): - break - if timeout is not None: - elapsed_time = time.time() - start_time - if elapsed_time < 0: - # The system clock has changed such that start_time - # is now in the future, so just assume that the - # timeout has already elapsed. - timed_out = True - break - remaining_timeout = timeout - 1000 * elapsed_time - if remaining_timeout <= 0: - timed_out = True - break - except StopIteration: - event_handled = True - - return event_handled - def _task_output(self, msg, log_path=None, background=None, level=0, noiselevel=-1): """ @@ -333,6 +219,7 @@ class PollScheduler(object): f = open(_unicode_encode(log_path, encoding=_encodings['fs'], errors='strict'), mode='ab') + f_real = f except IOError as e: if e.errno not in (errno.ENOENT, errno.ESTALE): raise @@ -349,50 +236,5 @@ class PollScheduler(object): f.write(_unicode_encode(msg)) f.close() - -_can_poll_device = None - -def can_poll_device(): - """ - Test if it's possible to use poll() on a device such as a pty. This - is known to fail on Darwin. - @rtype: bool - @returns: True if poll() on a device succeeds, False otherwise. - """ - - global _can_poll_device - if _can_poll_device is not None: - return _can_poll_device - - if not hasattr(select, "poll"): - _can_poll_device = False - return _can_poll_device - - try: - dev_null = open('/dev/null', 'rb') - except IOError: - _can_poll_device = False - return _can_poll_device - - p = select.poll() - p.register(dev_null.fileno(), PollConstants.POLLIN) - - invalid_request = False - for f, event in p.poll(): - if event & PollConstants.POLLNVAL: - invalid_request = True - break - dev_null.close() - - _can_poll_device = not invalid_request - return _can_poll_device - -def create_poll_instance(): - """ - Create an instance of select.poll, or an instance of - PollSelectAdapter there is no poll() implementation or - it is broken somehow. - """ - if can_poll_device(): - return select.poll() - return PollSelectAdapter() + if f_real is not f: + f_real.close() |