#!/usr/bin/python """Analysis module""" import os import re import time from portage import _encodings, _unicode_encode from portage.output import bold, blue, yellow, green from .stuff import scan from .collect import ( prepare_search_dirs, parse_revdep_config, collect_libraries_from_dir, collect_binaries_from_dir, ) from .assign import assign_packages from .cache import save_cache current_milli_time = lambda: int(round(time.time() * 1000)) def scan_files(libs_and_bins, cmd_max_args, logger, searchbits): """Calls stuff.scan() and processes the data into a dictionary of scanned files information. @param libs_and_bins: set of libraries and binaries to scan for lib links. @param cmd_max_args: maximum number of files to pass into scanelf calls. @param logger: python style Logging function to use for output. @returns dict: {bit_length: {soname: {filename: set(needed)}}} """ stime = current_milli_time() scanned_files = {} # {bits: {soname: (filename, needed), ...}, ...} lines = scan(["-BF", "%F;%f;%S;%n;%M"], libs_and_bins, cmd_max_args, logger) ftime = current_milli_time() logger.debug( "\tscan_files(); total time to get scanelf data is " "%d milliseconds" % (ftime - stime) ) stime = current_milli_time() count = 0 for line in lines: parts = line.split(";") if len(parts) != 5: logger.error("\tscan_files(); error processing lib: %s" % line) logger.error("\tscan_files(); parts = %s" % str(parts)) continue filename, sfilename, soname, needed, bits = parts filename = os.path.realpath(filename) needed = needed.split(",") bits = bits[8:] # 8: -> strlen('ELFCLASS') if bits not in searchbits: continue if not soname: soname = sfilename if bits not in scanned_files: scanned_files[bits] = {} if soname not in scanned_files[bits]: scanned_files[bits][soname] = {} if filename not in scanned_files[bits][soname]: scanned_files[bits][soname][filename] = set(needed) count += 1 else: scanned_files[bits][soname][filename].update(needed) ftime = current_milli_time() logger.debug( "\tscan_files(); total filenames found: %d in %d milliseconds" % (count, ftime - stime) ) return scanned_files def extract_dependencies_from_la(la, libraries, to_check, logger): broken = [] libnames = [] for lib in libraries: match = re.match(r".+\/(.+)\.(so|la|a)(\..+)?", lib) if match is not None: libname = match.group(1) if libname not in libnames: libnames += [ libname, ] for _file in la: if not os.path.exists(_file): continue for line in open( _unicode_encode(_file, encoding=_encodings["fs"]), mode="r", encoding=_encodings["content"], ).readlines(): line = line.strip() if line.startswith("dependency_libs="): match = re.match(r"dependency_libs='([^']+)'", line) if match is not None: for el in match.group(1).split(" "): el = el.strip() if len(el) < 1 or el.startswith("-L") or el.startswith("-R"): continue if el.startswith("-l") and "lib" + el[2:] in libnames: pass elif el in la or el in libraries: pass else: if to_check: _break = False for tc in to_check: if tc in el: _break = True break if not _break: continue logger.info( "\t" + yellow(" * ") + _file + " is broken (requires: " + bold(el) + ")" ) broken.append(_file) return broken class LibCheck: def __init__( self, scanned_files, logger, searchlibs=None, searchbits=None, all_masks=None, masked_dirs=None, ): """LibCheck init function. @param scanned_files: optional dictionary if the type created by scan_files(). Defaults to the class instance of scanned_files @param logger: python style Logging function to use for output. @param searchlibs: optional set() of libraries to search for. If defined it toggles several settings to configure this class for a target search rather than a broken libs search. """ self.scanned_files = scanned_files self.logger = logger self.searchlibs = searchlibs self.searchbits = sorted(searchbits) or ["32", "64"] self.all_masks = all_masks self.masked_dirs = masked_dirs self.logger.debug( "\tLibCheck.__init__(), new searchlibs: %s" % (self.searchbits) ) if searchlibs: self.smsg = "\tLibCheck.search(), Checking for %s bit dependants" self.pmsg = yellow(" * ") + "Files that depend on: %s (%s bits)" self.setlibs = self._setslibs self.check = self._checkforlib else: self.smsg = "\tLibCheck.search(), Checking for broken %s bit libs" self.pmsg = ( green(" * ") + bold("Broken files that require:") + " %s (%s bits)" ) self.setlibs = self._setlibs self.check = self._checkbroken self.sfmsg = "\tLibCheck.search(); Total found: %(count)d libs, %(deps)d files in %(time)d milliseconds" self.alllibs = None def _setslibs(self, l, b): """Internal function. Use the class's setlibs variable""" sonames = [] for s in self.searchlibs: if s in self.scanned_files[b].keys(): sonames.append(s) continue found_partial = [a for a in self.scanned_files[b].keys() if s in a] if found_partial: sonames += found_partial continue for k, v in self.scanned_files[b].items(): for vv in v.keys(): if s in vv: sonames.append(k) break self.alllibs = "|".join(sonames) + "|" self.logger.debug("\tLibCheck._setslibs(), new alllibs: %s" % (self.alllibs)) def _setlibs(self, l, b): """Internal function. Use the class's setlibs variable""" self.alllibs = "|".join(l) + "|" def _checkforlib(self, l): """Internal function. Use the class's check variable""" if l: return l + "|" in self.alllibs return False def _checkbroken(self, l): """Internal function. Use the class's check variable""" if l: return l + "|" not in self.alllibs return False def search(self, scanned_files=None): """Searches the scanned files for broken lib links or for libs to search for @param scanned_files: optional dictionary of the type created by scan_files(). Defaults to the class instance of scanned_files @ returns: dict: {bit_length: {found_lib: set(file_paths)}}. """ stime = current_milli_time() count = 0 fcount = 0 if not scanned_files: scanned_files = self.scanned_files found_libs = {} for bits in self.searchbits: try: scanned = scanned_files[bits] except KeyError: self.logger.debug("There are no %s-bit libraries" % bits) continue self.logger.debug(self.smsg % bits) self.setlibs(sorted(scanned), bits) for soname, filepaths in scanned.items(): for filename, needed in filepaths.items(): for l in needed: if self.check(l): if l in self.all_masks: self.logger.debug( "\tLibrary %s ignored as it is masked" % l ) continue if ( filename in self.all_masks or os.path.realpath(filename) in self.all_masks or self.is_masked(os.path.realpath(filename)) ): self.logger.debug( "\tFile %s ignored as it is masked" % filename ) continue if not bits in found_libs: found_libs[bits] = {} try: found_libs[bits][l].add(filename) except KeyError: found_libs[bits][l] = set([filename]) count += 1 fcount += 1 self.logger.debug( "\tLibCheck.search(); FOUND:" " %sbit, %s, %s" % (bits, l, filename) ) ftime = current_milli_time() self.logger.debug( self.sfmsg % {"count": count, "deps": fcount, "time": ftime - stime} ) return found_libs def is_masked(self, filename): for m in self.masked_dirs: t = os.path.realpath(m).split(os.sep) f = filename.split(os.sep) # self.logger.debug("\tis_masked(); %s, %s" % (t, f)) if t == f[: min(len(t), len(f))]: return True return False def process_results(self, found_libs, scanned_files=None): """Processes the search results, logs the files found @param found_libs: dictionary of the type returned by search() @param scanned_files: optional dictionary if the type created by scan_files(). Defaults to the class instance of scanned_files @ returns: list: of filepaths from teh search results. """ stime = current_milli_time() if not scanned_files: scanned_files = self.scanned_files found_pathes = [] for bits, found in found_libs.items(): for lib, files in found.items(): self.logger.info(self.pmsg % (bold(lib), bits)) for fp in sorted(files): self.logger.info("\t" + yellow("* ") + fp) found_pathes.append(fp) ftime = current_milli_time() self.logger.debug( "\tLibCheck.process_results(); total filepaths found: " "%d in %d milliseconds" % (len(found_pathes), ftime - stime) ) return found_pathes def analyse( settings, logger, libraries=None, la_libraries=None, libraries_links=None, binaries=None, _libs_to_check=None, ): """Main program body. It will collect all info and determine the pkgs needing rebuilding. @param logger: logger used for logging messages, instance of logging.Logger class. Can be logging (RootLogger). @param _libs_to_check Libraries that need to be checked only @rtype list: list of pkgs that need rebuilding """ searchbits = set() """if _libs_to_check: for lib in _libs_to_check: if "lib64" in lib: searchbits.add('64') elif "lib32" in lib: searchbits.add('32') else: _libs_to_check = set()""" searchbits.update(["64", "32"]) masked_dirs, masked_files, ld = parse_revdep_config(settings["REVDEP_CONFDIR"]) masked_dirs.update( [ "/lib/modules", "/lib32/modules", "/lib64/modules", ] ) if "64" not in searchbits: masked_dirs.update(["/lib64", "/usr/lib64"]) elif "32" not in searchbits: masked_dirs.update(["/lib32", "/usr/lib32"]) all_masks = masked_dirs.copy() all_masks.update(masked_files) logger.debug("\tall_masks:") for x in sorted(all_masks): logger.debug("\t\t%s" % (x)) if libraries and la_libraries and libraries_links and binaries: logger.info( blue(" * ") + bold("Found a valid cache, skipping collecting phase") ) else: # TODO: add partial cache (for ex. only libraries) # when found for some reason stime = current_milli_time() logger.warning(green(" * ") + bold("Collecting system binaries and libraries")) bin_dirs, lib_dirs = prepare_search_dirs(logger, settings) lib_dirs.update(ld) bin_dirs.update(ld) logger.debug("\tanalyse(), bin directories:") for x in sorted(bin_dirs): logger.debug("\t\t%s" % (x)) logger.debug("\tanalyse(), lib directories:") for x in sorted(lib_dirs): logger.debug("\t\t%s" % (x)) logger.debug("\tanalyse(), masked directories:") for x in sorted(masked_dirs): logger.debug("\t\t%s" % (x)) logger.debug("\tanalyse(), masked files:") for x in sorted(masked_files): logger.debug("\t\t%s" % (x)) ftime = current_milli_time() logger.debug("\ttime to complete task: %d milliseconds" % (ftime - stime)) stime = current_milli_time() logger.info(green(" * ") + bold("Collecting dynamic linking informations")) libraries, la_libraries, libraries_links = collect_libraries_from_dir( lib_dirs, all_masks, logger ) binaries = collect_binaries_from_dir(bin_dirs, all_masks, logger) ftime = current_milli_time() logger.debug("\ttime to complete task: %d milliseconds" % (ftime - stime)) if settings["USE_TMP_FILES"]: save_cache( logger=logger, to_save={ "libraries": libraries, "la_libraries": la_libraries, "libraries_links": libraries_links, "binaries": binaries, }, temp_path=settings["DEFAULT_TMP_DIR"], ) logger.debug( "\tanalyse(), Found %i libraries (+%i symlinks) and %i binaries" % (len(libraries), len(libraries_links), len(binaries)) ) logger.info(green(" * ") + bold("Scanning files")) libs_and_bins = libraries.union(binaries) scanned_files = scan_files( libs_and_bins, settings["CMD_MAX_ARGS"], logger, searchbits ) logger.warning(green(" * ") + bold("Checking dynamic linking consistency")) logger.debug( "\tanalyse(), Searching for %i libs, bins within %i libraries and links" % (len(libs_and_bins), len(libraries) + len(libraries_links)) ) libcheck = LibCheck( scanned_files, logger, _libs_to_check, searchbits, all_masks, masked_dirs ) broken_pathes = libcheck.process_results(libcheck.search()) broken_la = extract_dependencies_from_la( la_libraries, libraries.union(libraries_links), _libs_to_check, logger ) broken_pathes += broken_la if broken_pathes: logger.warning(green(" * ") + bold("Assign files to packages")) return assign_packages(broken_pathes, logger, settings) return None, None # no need to assign anything if __name__ == "__main__": print("This script shouldn't be called directly")