diff options
author | André Erdmann <dywi@mailerd.de> | 2012-07-13 12:39:41 +0200 |
---|---|---|
committer | André Erdmann <dywi@mailerd.de> | 2012-07-13 12:39:41 +0200 |
commit | bedc1a5d0029f17c255422310bd5ef98aa7d3fb8 (patch) | |
tree | 07bd5ea25608ef5ef2c03f45173b6bf863ffb7b0 /roverlay | |
parent | return negative score if not resolved (diff) | |
download | R_overlay-bedc1a5d0029f17c255422310bd5ef98aa7d3fb8.tar.gz R_overlay-bedc1a5d0029f17c255422310bd5ef98aa7d3fb8.tar.bz2 R_overlay-bedc1a5d0029f17c255422310bd5ef98aa7d3fb8.zip |
depresolver: separate run method for threads
the run-method code for the depresolver is being split up into
a threaded and not-threaded variant, to increase readability,
stability and performance.
* threaded resolving temporarily removed
Diffstat (limited to 'roverlay')
-rw-r--r-- | roverlay/depres/depresolver.py | 355 |
1 files changed, 179 insertions, 176 deletions
diff --git a/roverlay/depres/depresolver.py b/roverlay/depres/depresolver.py index bbe58b4..5f85ad1 100644 --- a/roverlay/depres/depresolver.py +++ b/roverlay/depres/depresolver.py @@ -16,10 +16,8 @@ from roverlay import config from roverlay.depres import communication, deptype, events #from roverlay.depres import simpledeprule -#from roverlay.depres.depenv import DepEnv (implicit) - -# if false: do not using the "negative" result caching which stores +# if false: do not use the "negative" result caching which stores # unresolvable deps in a set for should-be faster lookups USING_DEPRES_CACHE = True @@ -29,9 +27,6 @@ SAFE_CHANNEL_IDS = True class DependencyResolver ( object ): """Main object for dependency resolution.""" - - NUMTHREADS = config.get ( "DEPRES.jobcount", 0 ) - def __init__ ( self, err_queue ): """Initializes a DependencyResolver.""" @@ -44,16 +39,17 @@ class DependencyResolver ( object ): 'RESOLVED', 'UNRESOLVABLE' ) - # this lock tells whether a dep res 'master' thread is running (locked) - self.runlock = threading.Lock() - self.startlock = threading.Lock() - # the dep res main thread - self._mainthread = None - # the dep res worker threads - self._threads = None + self._jobs = config.get ( "DEPRES.jobcount", 0 ) - self.err_queue = err_queue + # used to lock the run methods, + self._runlock = threading.Lock() + if self._jobs > 0: + # the dep res main thread + self._mainthread = None + + + self.err_queue = err_queue # the list of registered listener modules self.listeners = list () @@ -139,13 +135,13 @@ class DependencyResolver ( object ): # log this event if event_type == events.DEPRES_EVENTS ['RESOLVED']: self.logger_resolved.info ( - "'%s' as '%s'" % ( dep_env.dep_str, dep_env.resolved_by ) + "{!r} as {!r}".format ( dep_env.dep_str, dep_env.resolved_by ) ) elif event_type == events.DEPRES_EVENTS ['UNRESOLVABLE']: - self.logger_unresolvable.info ( "'%s'" % dep_env.dep_str ) + self.logger_unresolvable.info ( "{!r}".format ( dep_env.dep_str ) ) else: # "generic" event, expects that kw msg is set - self.logger.debug ( "event %s : %s" % ( event, msg ) ) + self.logger.debug ( "event {}: {}".format ( event, msg ) ) # --- if if self.listenermask & event_type: @@ -274,48 +270,180 @@ class DependencyResolver ( object ): # --- end of _queue_previously_failed (...) --- def start ( self ): - # -- verify whether resolver has to be started - if self._depqueue.empty(): - # nothing to resolve - return + if self._jobs == 0: + if not self._depqueue.empty(): + self._run_resolver() + if not self.err_queue.really_empty(): + self.err_queue.unblock_queues() + else: + # new resolver threads run async and + # can be started with an empty depqueue + if self._runlock.acquire ( False ): + # else resolver is running + + self._mainthread = threading.Thread ( + target=self._thread_run_resolver + ) + self._mainthread.start() + # _thread_run_resolver has to release the lock when done + # --- end of start (...) --- + + def _process_unresolvable_queue ( self ): + # iterate over _depqueue_failed and report unresolved + while not self._depqueue_failed.empty() and self.err_queue.empty: + try: + channel_id, dep_env = self._depqueue_failed.get_nowait() + except queue.Empty: + # race cond empty() <-> get_nowait() + return + + dep_env.set_unresolvable() + self._report_event ( 'UNRESOLVABLE', dep_env ) + + try: + if channel_id in self._depqueue_done: + self._depqueue_done [channel_id].put_nowait ( dep_env ) + except KeyError: + # channel has been closed before calling put, ignore this + pass + # --- end of _process_unresolvable_queue (...) --- + + def _process_dep ( self, queue_item ): + channel_id, dep_env = queue_item + + # drop dep if channel closed + if not channel_id in self._depqueue_done: return + + self.logger.debug ( + "Trying to resolve {!r}.".format ( dep_env.dep_str ) + ) - if not self.startlock.acquire ( False ): - # another channel/.. is starting the resolver - return - elif self._depqueue.empty(): - self.startlock.release() - return + resolved = None + # resolved can be None, so use a tri-state int for checking + # 0 -> unresolved, but resolvable + # 1 -> unresolved and (currently, new rules may change this) + # not resolvable + # 2 -> resolved + is_resolved = 0 - # -- verify... + if USING_DEPRES_CACHE and dep_env.dep_str_low in self._dep_unresolvable: + # cannot resolve + is_resolved = 1 - # acquire the run lock (that locks _run_main) + else: + # search for a match in the rule pools that accept the dep type + for rulepool in ( + p for p in self.static_rule_pools \ + if p.deptype_mask & dep_env.deptype_mask + ): + result = rulepool.matches ( dep_env ) + if result [0] > 0: + resolved = result [1] + is_resolved = 2 + break + + if is_resolved == 0 and dep_env.deptype_mask & deptype.try_other: + ## TRY_OTHER bit is set + # search for a match in the rule pools + # that (normally) don't accept the dep type + for rulepool in ( + p for p in self.static_rule_pools \ + if p.deptype_mask & ~dep_env.deptype_mask + ): + result = rulepool.matches ( dep_env ) + if result [0] > 0: + resolved = result [1] + is_resolved = 2 + break + # -- + + + # -- done with resolving + + if is_resolved != 2: + # could not resolve dep_env + self._depqueue_failed.put ( queue_item ) + if USING_DEPRES_CACHE: + # does not work when adding new rules is possible + self._dep_unresolvable.add ( dep_env.dep_str_low ) + else: + # successfully resolved + dep_env.set_resolved ( resolved, append=False ) + self._report_event ( 'RESOLVED', dep_env ) + try: + self._depqueue_done [channel_id].put ( dep_env ) + except KeyError: + # channel gone while resolving + pass + + """ + ## only useful if new rules can be created + # new rule found, requeue all previously + # failed dependency searches + if have_new_rule: + self._queue_previously_failed + if USING_DEPRES_CACHE: + self._dep_unresolvable.clear() #? + """ + # --- end of _process_dep (...) --- + + def _run_resolver ( self ): + # single-threaded variant of run + # still checking err_queue 'cause other modules + # could be run with threads + if self._depqueue.empty(): return try: - self.runlock.acquire() + self._runlock.acquire() + while not self._depqueue.empty() and self.err_queue.empty: + to_resolve = self._depqueue.get_nowait() + self._process_dep ( queue_item=to_resolve ) + self._depqueue.task_done() + + self._process_unresolvable_queue() + except ( Exception, KeyboardInterrupt ) as e: + # single-threaded exception catcher: + # * push exception to inform other threads (if any) + # * unblock queues (automatically when calling push) + # * reraise + self.err_queue.push ( id ( self ), e ) + raise e finally: - self.startlock.release() + self._runlock.release() + # --- end of _run_resolver (...) --- - if self._depqueue.empty(): - self.runlock.release() - return + def _thread_run_resolver ( self ): + raise Exception ( "method stub" ) + # --- end of _thread_run_resolver (...) --- - if DependencyResolver.NUMTHREADS > 0: - # no need to wait for the old thread - # FIXME: could remove the following block - if self._mainthread is not None: - self._mainthread.join() - del self._mainthread + def enqueue ( self, dep_env, channel_id ): + """Adds a DepEnv to the queue of deps to resolve. - self._mainthread = threading.Thread ( target=self._thread_run_main ) - self._mainthread.start() + arguments: + * dep_env -- to add + * channel_id -- identifier of the channel associated with the dep_env - else: - self._thread_run_main() + returns: None (implicit) + """ + self._depqueue.put ( ( channel_id, dep_env ) ) + + # --- end of enqueue (...) --- + + def close ( self ): + if self._jobs > 0: + if self._mainthread: + self._mainthread.join() + for lis in self.listeners: lis.close() + del self.listeners + if SAFE_CHANNEL_IDS: + self.logger.debug ( + "{} channels were in use.".format ( len ( self.all_channel_ids ) ) + ) + # --- end of close (...) --- - # self.runlock is released when _thread_run_main is done - # --- end of start (...) --- def _thread_run_main ( self ): """Tells the resolver to run.""" + raise Exception ( "to be removed" ) jobcount = self.__class__.NUMTHREADS try: @@ -351,26 +479,7 @@ class DependencyResolver ( object ): # iterate over _depqueue_failed and report unresolved ## todo can thread this - while not self._depqueue_failed.empty() and self.err_queue.empty: - try: - channel_id, dep_env = self._depqueue_failed.get_nowait() - - except queue.Empty: - # race cond empty() <-> get_nowait() - return - - - dep_env.set_unresolvable() - - self._report_event ( 'UNRESOLVABLE', dep_env ) - - - try: - if channel_id in self._depqueue_done: - self._depqueue_done [channel_id].put_nowait ( dep_env ) - except KeyError: - # channel has been closed before calling put, ignore this - pass + self._process_unresolvable_queue() if not self.err_queue.really_empty(): self.err_queue.unblock_queues() @@ -384,7 +493,7 @@ class DependencyResolver ( object ): finally: # release the lock - self.runlock.release() + self._runlock.release() # --- end of _thread_run_main (...) --- @@ -393,100 +502,18 @@ class DependencyResolver ( object ): returns: None (implicit) """ + raise Exception ( "to be removed" ) try: while self.err_queue.empty and not self._depqueue.empty(): try: to_resolve = self._depqueue.get_nowait() + self._process_dep ( queue_item=to_resolve ) + self._depqueue.task_done() except queue.Empty: # this thread is done when the queue is empty, so this is # no error, but just the result of the race condition between # queue.empty() and queue.get(False) return - - channel_id, dep_env = to_resolve - - if channel_id in self._depqueue_done: - # else channel has been closed, drop dep - - self.logger.debug ( - "Trying to resolve '%s'." % dep_env.dep_str - ) - - #have_new_rule = False - - resolved = None - # resolved can be None, so use a tri-state int for checking - # 0 -> unresolved, but resolvable - # 1 -> unresolved and (currently, new rules may change this) - # not resolvable - # 2 -> resolved - is_resolved = 0 - - # TODO: - # (threading: could search the pools in parallel) - - if USING_DEPRES_CACHE: - if dep_env.dep_str_low in self._dep_unresolvable: - # cannot resolve - is_resolved = 1 - - if is_resolved == 0: - # search for a match in the rule pools that accept - # the dep type - for rulepool in ( p for p in self.static_rule_pools \ - if p.deptype_mask & dep_env.deptype_mask - ): - result = rulepool.matches ( dep_env ) - if not result is None and result [0] > 0: - resolved = result [1] - is_resolved = 2 - break - - if is_resolved == 0 and \ - dep_env.deptype_mask & deptype.try_other \ - : - # search for a match in the rule pools that (normally) - # don't accept the dep type - for rulepool in ( p for p in self.static_rule_pools \ - if p.deptype_mask & ~dep_env.deptype_mask - ): - result = rulepool.matches ( dep_env ) - if not result is None and result [0] > 0: - resolved = result [1] - is_resolved = 2 - break - - - - - - if is_resolved == 2: - dep_env.set_resolved ( resolved, append=False ) - self._report_event ( 'RESOLVED', dep_env ) - try: - self._depqueue_done [channel_id].put ( dep_env ) - except KeyError: - # channel gone while resolving - pass - else: - self._depqueue_failed.put ( to_resolve ) - - if USING_DEPRES_CACHE: - # does not work when adding new rules is possible - self._dep_unresolvable.add ( dep_env.dep_str_low ) - - """ - ## only useful if new rules can be created - # new rule found, requeue all previously - # failed dependency searches - if have_new_rule: - self._queue_previously_failed - if USING_DEPRES_CACHE: - self._dep_unresolvable.clear() #? - """ - # --- end if channel_id in self._depqueue_done - - self._depqueue.task_done() # --- end while except ( Exception, KeyboardInterrupt ) as e: @@ -498,27 +525,3 @@ class DependencyResolver ( object ): # --- end of _thread_run_resolve (...) --- - - def enqueue ( self, dep_env, channel_id ): - """Adds a DepEnv to the queue of deps to resolve. - - arguments: - * dep_env -- to add - * channel_id -- identifier of the channel associated with the dep_env - - returns: None (implicit) - """ - self._depqueue.put ( ( channel_id, dep_env ) ) - - # --- end of enqueue (...) --- - - def close ( self ): - if isinstance ( self._mainthread, threading.Thread ): - self._mainthread.join() - for lis in self.listeners: lis.close() - del self.listeners - if SAFE_CHANNEL_IDS: - self.logger.debug ( - "%i channels were in use." % len ( self.all_channel_ids ) - ) - # --- end of close (...) --- |