diff options
author | Alexander Bersenev <bay@hackerdom.ru> | 2014-02-17 17:57:05 +0600 |
---|---|---|
committer | Alexander Bersenev <bay@hackerdom.ru> | 2014-02-17 17:57:05 +0600 |
commit | 6563293d18daed502ccdb663f3c72b4bae5fe23a (patch) | |
tree | d0a7d53a7c137feb4073c963408829f88ea75c92 /portage_with_autodep/pym/_emerge/Scheduler.py | |
parent | updated portage to 2.2.8-r1 (diff) | |
download | autodep-master.tar.gz autodep-master.tar.bz2 autodep-master.zip |
Diffstat (limited to 'portage_with_autodep/pym/_emerge/Scheduler.py')
-rw-r--r-- | portage_with_autodep/pym/_emerge/Scheduler.py | 234 |
1 files changed, 170 insertions, 64 deletions
diff --git a/portage_with_autodep/pym/_emerge/Scheduler.py b/portage_with_autodep/pym/_emerge/Scheduler.py index 30a7e10..d663e97 100644 --- a/portage_with_autodep/pym/_emerge/Scheduler.py +++ b/portage_with_autodep/pym/_emerge/Scheduler.py @@ -1,7 +1,7 @@ -# Copyright 1999-2012 Gentoo Foundation +# Copyright 1999-2013 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 -from __future__ import print_function +from __future__ import print_function, unicode_literals from collections import deque import gc @@ -18,7 +18,7 @@ import zlib import portage from portage import os from portage import _encodings -from portage import _unicode_decode, _unicode_encode +from portage import _unicode_encode from portage.cache.mappings import slot_dict_class from portage.elog.messages import eerror from portage.localization import _ @@ -28,6 +28,8 @@ from portage._sets import SETPREFIX from portage._sets.base import InternalPackageSet from portage.util import ensure_dirs, writemsg, writemsg_level from portage.util.SlotObject import SlotObject +from portage.util._async.SchedulerInterface import SchedulerInterface +from portage.util._eventloop.EventLoop import EventLoop from portage.package.ebuild.digestcheck import digestcheck from portage.package.ebuild.digestgen import digestgen from portage.package.ebuild.doebuild import (_check_temp_dir, @@ -50,6 +52,7 @@ from _emerge.EbuildFetcher import EbuildFetcher from _emerge.EbuildPhase import EbuildPhase from _emerge.emergelog import emergelog from _emerge.FakeVartree import FakeVartree +from _emerge.getloadavg import getloadavg from _emerge._find_deep_system_runtime_deps import _find_deep_system_runtime_deps from _emerge._flush_elog_mod_echo import _flush_elog_mod_echo from _emerge.JobStatusDisplay import JobStatusDisplay @@ -64,6 +67,9 @@ if sys.hexversion >= 0x3000000: class Scheduler(PollScheduler): + # max time between loadavg checks (milliseconds) + _loadavg_latency = 30000 + # max time between display status updates (milliseconds) _max_display_latency = 3000 @@ -79,7 +85,7 @@ class Scheduler(PollScheduler): _opts_no_self_update = frozenset(["--buildpkgonly", "--fetchonly", "--fetch-all-uri", "--pretend"]) - class _iface_class(PollScheduler._sched_iface_class): + class _iface_class(SchedulerInterface): __slots__ = ("fetch", "scheduleSetup", "scheduleUnpack") @@ -135,8 +141,7 @@ class Scheduler(PollScheduler): portage.exception.PortageException.__init__(self, value) def __init__(self, settings, trees, mtimedb, myopts, - spinner, mergelist=None, favorites=None, graph_config=None, - uninstall_only=False): + spinner, mergelist=None, favorites=None, graph_config=None): PollScheduler.__init__(self, main=True) if mergelist is not None: @@ -152,7 +157,6 @@ class Scheduler(PollScheduler): self._spinner = spinner self._mtimedb = mtimedb self._favorites = favorites - self._uninstall_only = uninstall_only self._args_set = InternalPackageSet(favorites, allow_repo=True) self._build_opts = self._build_opts_class() @@ -217,14 +221,15 @@ class Scheduler(PollScheduler): fetch_iface = self._fetch_iface_class(log_file=self._fetch_log, schedule=self._schedule_fetch) self._sched_iface = self._iface_class( + self._event_loop, + is_background=self._is_background, fetch=fetch_iface, scheduleSetup=self._schedule_setup, - scheduleUnpack=self._schedule_unpack, - **dict((k, getattr(self.sched_iface, k)) - for k in self.sched_iface.__slots__)) + scheduleUnpack=self._schedule_unpack) self._prefetchers = weakref.WeakValueDictionary() self._pkg_queue = [] + self._jobs = 0 self._running_tasks = {} self._completed_tasks = set() @@ -243,10 +248,15 @@ class Scheduler(PollScheduler): # The load average takes some time to respond when new # jobs are added, so we need to limit the rate of adding # new jobs. - self._job_delay_max = 10 - self._job_delay_factor = 1.0 - self._job_delay_exp = 1.5 + self._job_delay_max = 5 self._previous_job_start_time = None + self._job_delay_timeout_id = None + + # The load average takes some time to respond when after + # a SIGSTOP/SIGCONT cycle, so delay scheduling for some + # time after SIGCONT is received. + self._sigcont_delay = 5 + self._sigcont_time = None # This is used to memoize the _choose_pkg() result when # no packages can be chosen until one of the existing @@ -300,15 +310,10 @@ class Scheduler(PollScheduler): if not portage.dep.match_from_list( portage.const.PORTAGE_PACKAGE_ATOM, [x]): continue - if self._running_portage is None or \ - self._running_portage.cpv != x.cpv or \ - '9999' in x.cpv or \ - 'git' in x.inherited or \ - 'git-2' in x.inherited: - rval = _check_temp_dir(self.settings) - if rval != os.EX_OK: - return rval - _prepare_self_update(self.settings) + rval = _check_temp_dir(self.settings) + if rval != os.EX_OK: + return rval + _prepare_self_update(self.settings) break return os.EX_OK @@ -328,12 +333,13 @@ class Scheduler(PollScheduler): self._set_graph_config(graph_config) self._blocker_db = {} dynamic_deps = self.myopts.get("--dynamic-deps", "y") != "n" + ignore_built_slot_operator_deps = self.myopts.get( + "--ignore-built-slot-operator-deps", "n") == "y" for root in self.trees: - if self._uninstall_only: - continue if graph_config is None: fake_vartree = FakeVartree(self.trees[root]["root_config"], - pkg_cache=self._pkg_cache, dynamic_deps=dynamic_deps) + pkg_cache=self._pkg_cache, dynamic_deps=dynamic_deps, + ignore_built_slot_operator_deps=ignore_built_slot_operator_deps) fake_vartree.sync() else: fake_vartree = graph_config.trees[root]['vartree'] @@ -410,7 +416,7 @@ class Scheduler(PollScheduler): if not (isinstance(task, Package) and \ task.operation == "merge"): continue - if 'interactive' in task.metadata.properties: + if 'interactive' in task.properties: interactive_tasks.append(task) return interactive_tasks @@ -655,10 +661,11 @@ class Scheduler(PollScheduler): if value and value.strip(): continue msg = _("%(var)s is not set... " - "Are you missing the '%(configroot)setc/make.profile' symlink? " + "Are you missing the '%(configroot)s%(profile_path)s' symlink? " "Is the symlink correct? " "Is your portage tree complete?") % \ - {"var": var, "configroot": settings["PORTAGE_CONFIGROOT"]} + {"var": var, "configroot": settings["PORTAGE_CONFIGROOT"], + "profile_path": portage.const.PROFILE_PATH} out = portage.output.EOutput() for line in textwrap.wrap(msg, 70): @@ -718,7 +725,6 @@ class Scheduler(PollScheduler): return if self._parallel_fetch: - self._status_msg("Starting parallel fetch") prefetchers = self._prefetchers @@ -771,10 +777,10 @@ class Scheduler(PollScheduler): failures = 0 - # Use a local PollScheduler instance here, since we don't + # Use a local EventLoop instance here, since we don't # want tasks here to trigger the usual Scheduler callbacks # that handle job scheduling and status display. - sched_iface = PollScheduler().sched_iface + sched_iface = SchedulerInterface(EventLoop(main=False)) for x in self._mergelist: if not isinstance(x, Package): @@ -783,10 +789,10 @@ class Scheduler(PollScheduler): if x.operation == "uninstall": continue - if x.metadata["EAPI"] in ("0", "1", "2", "3"): + if x.eapi in ("0", "1", "2", "3"): continue - if "pretend" not in x.metadata.defined_phases: + if "pretend" not in x.defined_phases: continue out_str =">>> Running pre-merge checks for " + colorize("INFORM", x.cpv) + "\n" @@ -805,7 +811,7 @@ class Scheduler(PollScheduler): build_dir_path = os.path.join( os.path.realpath(settings["PORTAGE_TMPDIR"]), "portage", x.category, x.pf) - existing_buildir = os.path.isdir(build_dir_path) + existing_builddir = os.path.isdir(build_dir_path) settings["PORTAGE_BUILDDIR"] = build_dir_path build_dir = EbuildBuildDir(scheduler=sched_iface, settings=settings) @@ -816,7 +822,7 @@ class Scheduler(PollScheduler): # Clean up the existing build dir, in case pkg_pretend # checks for available space (bug #390711). - if existing_buildir: + if existing_builddir: if x.built: tree = "bintree" infloc = os.path.join(build_dir_path, "build-info") @@ -905,13 +911,18 @@ class Scheduler(PollScheduler): failures += 1 portage.elog.elog_process(x.cpv, settings) finally: - if current_task is not None and current_task.isAlive(): - current_task.cancel() - current_task.wait() - clean_phase = EbuildPhase(background=False, - phase='clean', scheduler=sched_iface, settings=settings) - clean_phase.start() - clean_phase.wait() + + if current_task is not None: + if current_task.isAlive(): + current_task.cancel() + current_task.wait() + if current_task.returncode == os.EX_OK: + clean_phase = EbuildPhase(background=False, + phase='clean', scheduler=sched_iface, + settings=settings) + clean_phase.start() + clean_phase.wait() + build_dir.unlock() if failures: @@ -1001,6 +1012,8 @@ class Scheduler(PollScheduler): earlier_sigint_handler = signal.signal(signal.SIGINT, sighandler) earlier_sigterm_handler = signal.signal(signal.SIGTERM, sighandler) + earlier_sigcont_handler = \ + signal.signal(signal.SIGCONT, self._sigcont_handler) try: rval = self._merge() @@ -1014,6 +1027,10 @@ class Scheduler(PollScheduler): signal.signal(signal.SIGTERM, earlier_sigterm_handler) else: signal.signal(signal.SIGTERM, signal.SIG_DFL) + if earlier_sigcont_handler is not None: + signal.signal(signal.SIGCONT, earlier_sigcont_handler) + else: + signal.signal(signal.SIGCONT, signal.SIG_DFL) if received_signal: sys.exit(received_signal[0]) @@ -1060,7 +1077,8 @@ class Scheduler(PollScheduler): printer = portage.output.EOutput() background = self._background failure_log_shown = False - if background and len(self._failed_pkgs_all) == 1: + if background and len(self._failed_pkgs_all) == 1 and \ + self.myopts.get('--quiet-fail', 'n') != 'y': # If only one package failed then just show it's # whole log for easy viewing. failed_pkg = self._failed_pkgs_all[-1] @@ -1139,9 +1157,9 @@ class Scheduler(PollScheduler): printer.eerror(line) printer.eerror("") for failed_pkg in self._failed_pkgs_all: - # Use _unicode_decode() to force unicode format string so + # Use unicode_literals to force unicode format string so # that Package.__unicode__() is called in python2. - msg = _unicode_decode(" %s") % (failed_pkg.pkg,) + msg = " %s" % (failed_pkg.pkg,) log_path = self._locate_failure_log(failed_pkg) if log_path is not None: msg += ", Log file:" @@ -1338,6 +1356,38 @@ class Scheduler(PollScheduler): blocker_db = self._blocker_db[pkg.root] blocker_db.discardBlocker(pkg) + def _main_loop(self): + term_check_id = self._event_loop.idle_add(self._termination_check) + loadavg_check_id = None + if self._max_load is not None and \ + self._loadavg_latency is not None and \ + (self._max_jobs is True or self._max_jobs > 1): + # We have to schedule periodically, in case the load + # average has changed since the last call. + loadavg_check_id = self._event_loop.timeout_add( + self._loadavg_latency, self._schedule) + + try: + # Populate initial event sources. Unless we're scheduling + # based on load average, we only need to do this once + # here, since it can be called during the loop from within + # event handlers. + self._schedule() + + # Loop while there are jobs to be scheduled. + while self._keep_scheduling(): + self._event_loop.iteration() + + # Clean shutdown of previously scheduled jobs. In the + # case of termination, this allows for basic cleanup + # such as flushing of buffered output to logs. + while self._is_work_scheduled(): + self._event_loop.iteration() + finally: + self._event_loop.source_remove(term_check_id) + if loadavg_check_id is not None: + self._event_loop.source_remove(loadavg_check_id) + def _merge(self): if self._opts_no_background.intersection(self.myopts): @@ -1348,8 +1398,10 @@ class Scheduler(PollScheduler): failed_pkgs = self._failed_pkgs portage.locks._quiet = self._background portage.elog.add_listener(self._elog_listener) - display_timeout_id = self.sched_iface.timeout_add( - self._max_display_latency, self._status_display.display) + display_timeout_id = None + if self._status_display._isatty and not self._status_display.quiet: + display_timeout_id = self._event_loop.timeout_add( + self._max_display_latency, self._status_display.display) rval = os.EX_OK try: @@ -1358,7 +1410,8 @@ class Scheduler(PollScheduler): self._main_loop_cleanup() portage.locks._quiet = False portage.elog.remove_listener(self._elog_listener) - self.sched_iface.source_remove(display_timeout_id) + if display_timeout_id is not None: + self._event_loop.source_remove(display_timeout_id) if failed_pkgs: rval = failed_pkgs[-1].returncode @@ -1490,12 +1543,15 @@ class Scheduler(PollScheduler): self._config_pool[settings['EROOT']].append(settings) def _keep_scheduling(self): - return bool(not self._terminated_tasks and self._pkg_queue and \ + return bool(not self._terminated.is_set() and self._pkg_queue and \ not (self._failed_pkgs and not self._build_opts.fetchonly)) def _is_work_scheduled(self): return bool(self._running_tasks) + def _running_job_count(self): + return self._jobs + def _schedule_tasks(self): while True: @@ -1536,6 +1592,9 @@ class Scheduler(PollScheduler): not self._task_queues.merge)): break + def _sigcont_handler(self, signum, frame): + self._sigcont_time = time.time() + def _job_delay(self): """ @rtype: bool @@ -1546,14 +1605,53 @@ class Scheduler(PollScheduler): current_time = time.time() - delay = self._job_delay_factor * self._jobs ** self._job_delay_exp + if self._sigcont_time is not None: + + elapsed_seconds = current_time - self._sigcont_time + # elapsed_seconds < 0 means the system clock has been adjusted + if elapsed_seconds > 0 and \ + elapsed_seconds < self._sigcont_delay: + + if self._job_delay_timeout_id is not None: + self._event_loop.source_remove( + self._job_delay_timeout_id) + + self._job_delay_timeout_id = self._event_loop.timeout_add( + 1000 * (self._sigcont_delay - elapsed_seconds), + self._schedule_once) + return True + + # Only set this to None after the delay has expired, + # since this method may be called again before the + # delay has expired. + self._sigcont_time = None + + try: + avg1, avg5, avg15 = getloadavg() + except OSError: + return False + + delay = self._job_delay_max * avg1 / self._max_load if delay > self._job_delay_max: delay = self._job_delay_max - if (current_time - self._previous_job_start_time) < delay: + elapsed_seconds = current_time - self._previous_job_start_time + # elapsed_seconds < 0 means the system clock has been adjusted + if elapsed_seconds > 0 and elapsed_seconds < delay: + + if self._job_delay_timeout_id is not None: + self._event_loop.source_remove( + self._job_delay_timeout_id) + + self._job_delay_timeout_id = self._event_loop.timeout_add( + 1000 * (delay - elapsed_seconds), self._schedule_once) return True return False + def _schedule_once(self): + self._schedule() + return False + def _schedule_tasks_imp(self): """ @rtype: bool @@ -1735,7 +1833,7 @@ class Scheduler(PollScheduler): # scope e = exc mydepgraph = e.depgraph - dropped_tasks = set() + dropped_tasks = {} if e is not None: def unsatisfied_resume_dep_msg(): @@ -1772,11 +1870,7 @@ class Scheduler(PollScheduler): return False if success and self._show_list(): - mylist = mydepgraph.altlist() - if mylist: - if "--tree" in self.myopts: - mylist.reverse() - mydepgraph.display(mylist, favorites=self._favorites) + mydepgraph.display(mydepgraph.altlist(), favorites=self._favorites) if not success: self._post_mod_echo_msgs.append(mydepgraph.display_problems) @@ -1785,7 +1879,7 @@ class Scheduler(PollScheduler): self._init_graph(mydepgraph.schedulerGraph()) msg_width = 75 - for task in dropped_tasks: + for task, atoms in dropped_tasks.items(): if not (isinstance(task, Package) and task.operation == "merge"): continue pkg = task @@ -1793,7 +1887,10 @@ class Scheduler(PollScheduler): " %s" % (pkg.cpv,) if pkg.root_config.settings["ROOT"] != "/": msg += " for %s" % (pkg.root,) - msg += " dropped due to unsatisfied dependency." + if not atoms: + msg += " dropped because it is masked or unavailable" + else: + msg += " dropped because it requires %s" % ", ".join(atoms) for line in textwrap.wrap(msg, msg_width): eerror(line, phase="other", key=pkg.cpv) settings = self.pkgsettings[pkg.root] @@ -1838,11 +1935,21 @@ class Scheduler(PollScheduler): root_config = pkg.root_config world_set = root_config.sets["selected"] world_locked = False - if hasattr(world_set, "lock"): - world_set.lock() - world_locked = True + atom = None + + if pkg.operation != "uninstall": + # Do this before acquiring the lock, since it queries the + # portdbapi which can call the global event loop, triggering + # a concurrent call to this method or something else that + # needs an exclusive (non-reentrant) lock on the world file. + atom = create_world_atom(pkg, args_set, root_config) try: + + if hasattr(world_set, "lock"): + world_set.lock() + world_locked = True + if hasattr(world_set, "load"): world_set.load() # maybe it's changed on disk @@ -1854,8 +1961,7 @@ class Scheduler(PollScheduler): for s in pkg.root_config.setconfig.active: world_set.remove(SETPREFIX+s) else: - atom = create_world_atom(pkg, args_set, root_config) - if atom: + if atom is not None: if hasattr(world_set, "add"): self._status_msg(('Recording %s in "world" ' + \ 'favorites file...') % atom) |