aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndré Erdmann <dywi@mailerd.de>2012-07-13 12:39:41 +0200
committerAndré Erdmann <dywi@mailerd.de>2012-07-13 12:39:41 +0200
commitbedc1a5d0029f17c255422310bd5ef98aa7d3fb8 (patch)
tree07bd5ea25608ef5ef2c03f45173b6bf863ffb7b0 /roverlay
parentreturn negative score if not resolved (diff)
downloadR_overlay-bedc1a5d0029f17c255422310bd5ef98aa7d3fb8.tar.gz
R_overlay-bedc1a5d0029f17c255422310bd5ef98aa7d3fb8.tar.bz2
R_overlay-bedc1a5d0029f17c255422310bd5ef98aa7d3fb8.zip
depresolver: separate run method for threads
the run-method code for the depresolver is being split up into a threaded and not-threaded variant, to increase readability, stability and performance. * threaded resolving temporarily removed
Diffstat (limited to 'roverlay')
-rw-r--r--roverlay/depres/depresolver.py355
1 files changed, 179 insertions, 176 deletions
diff --git a/roverlay/depres/depresolver.py b/roverlay/depres/depresolver.py
index bbe58b4..5f85ad1 100644
--- a/roverlay/depres/depresolver.py
+++ b/roverlay/depres/depresolver.py
@@ -16,10 +16,8 @@ from roverlay import config
from roverlay.depres import communication, deptype, events
#from roverlay.depres import simpledeprule
-#from roverlay.depres.depenv import DepEnv (implicit)
-
-# if false: do not using the "negative" result caching which stores
+# if false: do not use the "negative" result caching which stores
# unresolvable deps in a set for should-be faster lookups
USING_DEPRES_CACHE = True
@@ -29,9 +27,6 @@ SAFE_CHANNEL_IDS = True
class DependencyResolver ( object ):
"""Main object for dependency resolution."""
-
- NUMTHREADS = config.get ( "DEPRES.jobcount", 0 )
-
def __init__ ( self, err_queue ):
"""Initializes a DependencyResolver."""
@@ -44,16 +39,17 @@ class DependencyResolver ( object ):
'RESOLVED', 'UNRESOLVABLE'
)
- # this lock tells whether a dep res 'master' thread is running (locked)
- self.runlock = threading.Lock()
- self.startlock = threading.Lock()
- # the dep res main thread
- self._mainthread = None
- # the dep res worker threads
- self._threads = None
+ self._jobs = config.get ( "DEPRES.jobcount", 0 )
- self.err_queue = err_queue
+ # used to lock the run methods,
+ self._runlock = threading.Lock()
+ if self._jobs > 0:
+ # the dep res main thread
+ self._mainthread = None
+
+
+ self.err_queue = err_queue
# the list of registered listener modules
self.listeners = list ()
@@ -139,13 +135,13 @@ class DependencyResolver ( object ):
# log this event
if event_type == events.DEPRES_EVENTS ['RESOLVED']:
self.logger_resolved.info (
- "'%s' as '%s'" % ( dep_env.dep_str, dep_env.resolved_by )
+ "{!r} as {!r}".format ( dep_env.dep_str, dep_env.resolved_by )
)
elif event_type == events.DEPRES_EVENTS ['UNRESOLVABLE']:
- self.logger_unresolvable.info ( "'%s'" % dep_env.dep_str )
+ self.logger_unresolvable.info ( "{!r}".format ( dep_env.dep_str ) )
else:
# "generic" event, expects that kw msg is set
- self.logger.debug ( "event %s : %s" % ( event, msg ) )
+ self.logger.debug ( "event {}: {}".format ( event, msg ) )
# --- if
if self.listenermask & event_type:
@@ -274,48 +270,180 @@ class DependencyResolver ( object ):
# --- end of _queue_previously_failed (...) ---
def start ( self ):
- # -- verify whether resolver has to be started
- if self._depqueue.empty():
- # nothing to resolve
- return
+ if self._jobs == 0:
+ if not self._depqueue.empty():
+ self._run_resolver()
+ if not self.err_queue.really_empty():
+ self.err_queue.unblock_queues()
+ else:
+ # new resolver threads run async and
+ # can be started with an empty depqueue
+ if self._runlock.acquire ( False ):
+ # else resolver is running
+
+ self._mainthread = threading.Thread (
+ target=self._thread_run_resolver
+ )
+ self._mainthread.start()
+ # _thread_run_resolver has to release the lock when done
+ # --- end of start (...) ---
+
+ def _process_unresolvable_queue ( self ):
+ # iterate over _depqueue_failed and report unresolved
+ while not self._depqueue_failed.empty() and self.err_queue.empty:
+ try:
+ channel_id, dep_env = self._depqueue_failed.get_nowait()
+ except queue.Empty:
+ # race cond empty() <-> get_nowait()
+ return
+
+ dep_env.set_unresolvable()
+ self._report_event ( 'UNRESOLVABLE', dep_env )
+
+ try:
+ if channel_id in self._depqueue_done:
+ self._depqueue_done [channel_id].put_nowait ( dep_env )
+ except KeyError:
+ # channel has been closed before calling put, ignore this
+ pass
+ # --- end of _process_unresolvable_queue (...) ---
+
+ def _process_dep ( self, queue_item ):
+ channel_id, dep_env = queue_item
+
+ # drop dep if channel closed
+ if not channel_id in self._depqueue_done: return
+
+ self.logger.debug (
+ "Trying to resolve {!r}.".format ( dep_env.dep_str )
+ )
- if not self.startlock.acquire ( False ):
- # another channel/.. is starting the resolver
- return
- elif self._depqueue.empty():
- self.startlock.release()
- return
+ resolved = None
+ # resolved can be None, so use a tri-state int for checking
+ # 0 -> unresolved, but resolvable
+ # 1 -> unresolved and (currently, new rules may change this)
+ # not resolvable
+ # 2 -> resolved
+ is_resolved = 0
- # -- verify...
+ if USING_DEPRES_CACHE and dep_env.dep_str_low in self._dep_unresolvable:
+ # cannot resolve
+ is_resolved = 1
- # acquire the run lock (that locks _run_main)
+ else:
+ # search for a match in the rule pools that accept the dep type
+ for rulepool in (
+ p for p in self.static_rule_pools \
+ if p.deptype_mask & dep_env.deptype_mask
+ ):
+ result = rulepool.matches ( dep_env )
+ if result [0] > 0:
+ resolved = result [1]
+ is_resolved = 2
+ break
+
+ if is_resolved == 0 and dep_env.deptype_mask & deptype.try_other:
+ ## TRY_OTHER bit is set
+ # search for a match in the rule pools
+ # that (normally) don't accept the dep type
+ for rulepool in (
+ p for p in self.static_rule_pools \
+ if p.deptype_mask & ~dep_env.deptype_mask
+ ):
+ result = rulepool.matches ( dep_env )
+ if result [0] > 0:
+ resolved = result [1]
+ is_resolved = 2
+ break
+ # --
+
+
+ # -- done with resolving
+
+ if is_resolved != 2:
+ # could not resolve dep_env
+ self._depqueue_failed.put ( queue_item )
+ if USING_DEPRES_CACHE:
+ # does not work when adding new rules is possible
+ self._dep_unresolvable.add ( dep_env.dep_str_low )
+ else:
+ # successfully resolved
+ dep_env.set_resolved ( resolved, append=False )
+ self._report_event ( 'RESOLVED', dep_env )
+ try:
+ self._depqueue_done [channel_id].put ( dep_env )
+ except KeyError:
+ # channel gone while resolving
+ pass
+
+ """
+ ## only useful if new rules can be created
+ # new rule found, requeue all previously
+ # failed dependency searches
+ if have_new_rule:
+ self._queue_previously_failed
+ if USING_DEPRES_CACHE:
+ self._dep_unresolvable.clear() #?
+ """
+ # --- end of _process_dep (...) ---
+
+ def _run_resolver ( self ):
+ # single-threaded variant of run
+ # still checking err_queue 'cause other modules
+ # could be run with threads
+ if self._depqueue.empty(): return
try:
- self.runlock.acquire()
+ self._runlock.acquire()
+ while not self._depqueue.empty() and self.err_queue.empty:
+ to_resolve = self._depqueue.get_nowait()
+ self._process_dep ( queue_item=to_resolve )
+ self._depqueue.task_done()
+
+ self._process_unresolvable_queue()
+ except ( Exception, KeyboardInterrupt ) as e:
+ # single-threaded exception catcher:
+ # * push exception to inform other threads (if any)
+ # * unblock queues (automatically when calling push)
+ # * reraise
+ self.err_queue.push ( id ( self ), e )
+ raise e
finally:
- self.startlock.release()
+ self._runlock.release()
+ # --- end of _run_resolver (...) ---
- if self._depqueue.empty():
- self.runlock.release()
- return
+ def _thread_run_resolver ( self ):
+ raise Exception ( "method stub" )
+ # --- end of _thread_run_resolver (...) ---
- if DependencyResolver.NUMTHREADS > 0:
- # no need to wait for the old thread
- # FIXME: could remove the following block
- if self._mainthread is not None:
- self._mainthread.join()
- del self._mainthread
+ def enqueue ( self, dep_env, channel_id ):
+ """Adds a DepEnv to the queue of deps to resolve.
- self._mainthread = threading.Thread ( target=self._thread_run_main )
- self._mainthread.start()
+ arguments:
+ * dep_env -- to add
+ * channel_id -- identifier of the channel associated with the dep_env
- else:
- self._thread_run_main()
+ returns: None (implicit)
+ """
+ self._depqueue.put ( ( channel_id, dep_env ) )
+
+ # --- end of enqueue (...) ---
+
+ def close ( self ):
+ if self._jobs > 0:
+ if self._mainthread:
+ self._mainthread.join()
+ for lis in self.listeners: lis.close()
+ del self.listeners
+ if SAFE_CHANNEL_IDS:
+ self.logger.debug (
+ "{} channels were in use.".format ( len ( self.all_channel_ids ) )
+ )
+ # --- end of close (...) ---
- # self.runlock is released when _thread_run_main is done
- # --- end of start (...) ---
def _thread_run_main ( self ):
"""Tells the resolver to run."""
+ raise Exception ( "to be removed" )
jobcount = self.__class__.NUMTHREADS
try:
@@ -351,26 +479,7 @@ class DependencyResolver ( object ):
# iterate over _depqueue_failed and report unresolved
## todo can thread this
- while not self._depqueue_failed.empty() and self.err_queue.empty:
- try:
- channel_id, dep_env = self._depqueue_failed.get_nowait()
-
- except queue.Empty:
- # race cond empty() <-> get_nowait()
- return
-
-
- dep_env.set_unresolvable()
-
- self._report_event ( 'UNRESOLVABLE', dep_env )
-
-
- try:
- if channel_id in self._depqueue_done:
- self._depqueue_done [channel_id].put_nowait ( dep_env )
- except KeyError:
- # channel has been closed before calling put, ignore this
- pass
+ self._process_unresolvable_queue()
if not self.err_queue.really_empty():
self.err_queue.unblock_queues()
@@ -384,7 +493,7 @@ class DependencyResolver ( object ):
finally:
# release the lock
- self.runlock.release()
+ self._runlock.release()
# --- end of _thread_run_main (...) ---
@@ -393,100 +502,18 @@ class DependencyResolver ( object ):
returns: None (implicit)
"""
+ raise Exception ( "to be removed" )
try:
while self.err_queue.empty and not self._depqueue.empty():
try:
to_resolve = self._depqueue.get_nowait()
+ self._process_dep ( queue_item=to_resolve )
+ self._depqueue.task_done()
except queue.Empty:
# this thread is done when the queue is empty, so this is
# no error, but just the result of the race condition between
# queue.empty() and queue.get(False)
return
-
- channel_id, dep_env = to_resolve
-
- if channel_id in self._depqueue_done:
- # else channel has been closed, drop dep
-
- self.logger.debug (
- "Trying to resolve '%s'." % dep_env.dep_str
- )
-
- #have_new_rule = False
-
- resolved = None
- # resolved can be None, so use a tri-state int for checking
- # 0 -> unresolved, but resolvable
- # 1 -> unresolved and (currently, new rules may change this)
- # not resolvable
- # 2 -> resolved
- is_resolved = 0
-
- # TODO:
- # (threading: could search the pools in parallel)
-
- if USING_DEPRES_CACHE:
- if dep_env.dep_str_low in self._dep_unresolvable:
- # cannot resolve
- is_resolved = 1
-
- if is_resolved == 0:
- # search for a match in the rule pools that accept
- # the dep type
- for rulepool in ( p for p in self.static_rule_pools \
- if p.deptype_mask & dep_env.deptype_mask
- ):
- result = rulepool.matches ( dep_env )
- if not result is None and result [0] > 0:
- resolved = result [1]
- is_resolved = 2
- break
-
- if is_resolved == 0 and \
- dep_env.deptype_mask & deptype.try_other \
- :
- # search for a match in the rule pools that (normally)
- # don't accept the dep type
- for rulepool in ( p for p in self.static_rule_pools \
- if p.deptype_mask & ~dep_env.deptype_mask
- ):
- result = rulepool.matches ( dep_env )
- if not result is None and result [0] > 0:
- resolved = result [1]
- is_resolved = 2
- break
-
-
-
-
-
- if is_resolved == 2:
- dep_env.set_resolved ( resolved, append=False )
- self._report_event ( 'RESOLVED', dep_env )
- try:
- self._depqueue_done [channel_id].put ( dep_env )
- except KeyError:
- # channel gone while resolving
- pass
- else:
- self._depqueue_failed.put ( to_resolve )
-
- if USING_DEPRES_CACHE:
- # does not work when adding new rules is possible
- self._dep_unresolvable.add ( dep_env.dep_str_low )
-
- """
- ## only useful if new rules can be created
- # new rule found, requeue all previously
- # failed dependency searches
- if have_new_rule:
- self._queue_previously_failed
- if USING_DEPRES_CACHE:
- self._dep_unresolvable.clear() #?
- """
- # --- end if channel_id in self._depqueue_done
-
- self._depqueue.task_done()
# --- end while
except ( Exception, KeyboardInterrupt ) as e:
@@ -498,27 +525,3 @@ class DependencyResolver ( object ):
# --- end of _thread_run_resolve (...) ---
-
- def enqueue ( self, dep_env, channel_id ):
- """Adds a DepEnv to the queue of deps to resolve.
-
- arguments:
- * dep_env -- to add
- * channel_id -- identifier of the channel associated with the dep_env
-
- returns: None (implicit)
- """
- self._depqueue.put ( ( channel_id, dep_env ) )
-
- # --- end of enqueue (...) ---
-
- def close ( self ):
- if isinstance ( self._mainthread, threading.Thread ):
- self._mainthread.join()
- for lis in self.listeners: lis.close()
- del self.listeners
- if SAFE_CHANNEL_IDS:
- self.logger.debug (
- "%i channels were in use." % len ( self.all_channel_ids )
- )
- # --- end of close (...) ---