# Distributed under the terms of the GNU General Public License v2 import portage from portage.dbapi._expand_new_virt import expand_new_virt from portage import os import subprocess import re class PortageUtils: """ class for accessing the portage api """ def __init__(self, settings): """ test """ self.settings=settings self.vartree=portage.vartree(settings=settings) self.vardbapi=portage.vardbapi(settings=settings, vartree=self.vartree) self.portdbapi=portage.portdbapi(mysettings=settings) self.metadata_keys = [k for k in portage.auxdbkeys if not k.startswith("UNUSED_")] self.use=self.settings["USE"] def get_best_visible_pkg(self,pkg): """ Gets best candidate on installing. Returns empty string if no found :param pkg: package name """ try: return self.portdbapi.xmatch("bestmatch-visible", pkg) except: return '' # non-recursive dependency getter def get_dep(self,pkg,dep_type=["RDEPEND","DEPEND"]): """ Gets current dependencies of a package. Looks in portage db :param pkg: name of package :param dep_type: type of dependencies to recurse. Can be ["DEPEND"] or ["RDEPEND", "DEPEND"] :returns: **set** of packages names """ ret=set() pkg = self.get_best_visible_pkg(pkg) if not pkg: return ret # we found the best visible match in common tree metadata = dict(zip(self.metadata_keys, self.portdbapi.aux_get(pkg, self.metadata_keys))) dep_str = " ".join(metadata[k] for k in dep_type) # the IUSE default are very important for us iuse_defaults=[ u[1:] for u in metadata.get("IUSE",'').split() if u.startswith("+")] use=self.use.split() for u in iuse_defaults: if u not in use: use.append(u) success, atoms = portage.dep_check(dep_str, None, self.settings, myuse=use, myroot=self.settings["ROOT"], trees={self.settings["ROOT"]:{"vartree":self.vartree, "porttree": self.vartree}}) if not success: return ret for atom in atoms: atomname = self.vartree.dep_bestmatch(atom) if not atomname: continue for unvirt_pkg in expand_new_virt(self.vardbapi,'='+atomname): for pkg in self.vartree.dep_match(unvirt_pkg): ret.add(pkg) return ret # recursive dependency getter def get_deps(self,pkg,dep_type=["RDEPEND","DEPEND"]): """ Gets current dependencies of a package on any depth All dependencies **must** be installed :param pkg: name of package :param dep_type: type of dependencies to recurse. Can be ["DEPEND"] or ["RDEPEND", "DEPEND"] :returns: **set** of packages names """ ret=set() # get porttree dependencies on the first package pkg = self.portdbapi.xmatch("bestmatch-visible", pkg) if not pkg: return ret known_packages=set() unknown_packages=self.get_dep(pkg,dep_type) ret=ret.union(unknown_packages) while unknown_packages: p=unknown_packages.pop() if p in known_packages: continue known_packages.add(p) metadata = dict(zip(self.metadata_keys, self.vardbapi.aux_get(p, self.metadata_keys))) dep_str = " ".join(metadata[k] for k in dep_type) # the IUSE default are very important for us iuse_defaults=[ u[1:] for u in metadata.get("IUSE",'').split() if u.startswith("+")] use=self.use.split() for u in iuse_defaults: if u not in use: use.append(u) success, atoms = portage.dep_check(dep_str, None, self.settings, myuse=use, myroot=self.settings["ROOT"], trees={self.settings["ROOT"]:{"vartree":self.vartree,"porttree": self.vartree}}) if not success: continue for atom in atoms: atomname = self.vartree.dep_bestmatch(atom) if not atomname: continue for unvirt_pkg in expand_new_virt(self.vardbapi,'='+atomname): for pkg in self.vartree.dep_match(unvirt_pkg): ret.add(pkg) unknown_packages.add(pkg) return ret def get_deps_for_package_building(self, pkg): """ returns buildtime dependencies of current package and all runtime dependencies of that buildtime dependencies """ buildtime_deps=self.get_dep(pkg, ["DEPEND"]) runtime_deps=set() for dep in buildtime_deps: runtime_deps=runtime_deps.union(self.get_deps(dep,["RDEPEND"])) ret=buildtime_deps.union(runtime_deps) return ret def get_system_packages_list(self): """ returns all packages from system set. They are always implicit dependencies :returns: **list** of package names """ ret=[] for atom in self.settings.packages: for pre_pkg in self.vartree.dep_match(atom): for unvirt_pkg in expand_new_virt(self.vardbapi,'='+pre_pkg): for pkg in self.vartree.dep_match(unvirt_pkg): ret.append(pkg) return ret class GentoolkitUtils: """ Interface with qfile and qlist utils. They are much faster than internals. """ def getpackagesbyfiles(self,files): """ :param files: list of filenames :returns: **dictionary** file->package, if file doesn't belong to any package it not returned as key of this dictionary """ ret={} listtocheck=[] for f in files: if os.path.isdir(f): ret[f]="directory" else: listtocheck.append(f) try: proc=subprocess.Popen(['qfile']+['--nocolor','--exact','','--from','-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE, bufsize=4096) out,err=proc.communicate("\n".join(listtocheck).encode("utf8")) lines=out.decode("utf8").split("\n") #print lines line_re=re.compile(r"^([^ ]+)\s+\(([^)]+)\)$") for line in lines: if len(line)==0: continue match=line_re.match(line) if match: ret[match.group(2)]=match.group(1) else: portage.util.writemsg("Util qfile returned unparsable string: %s\n" % line) except OSError as e: portage.util.writemsg("Error while launching qfile: %s\n" % e) return ret def getfilesbypackages(self,packagenames): """ :param packagename: name of package :returns: **list** of files in package with name *packagename* """ ret=[] try: proc=subprocess.Popen(['qlist']+['--nocolor',"--obj"]+packagenames, stdout=subprocess.PIPE,stderr=subprocess.PIPE, bufsize=4096) out,err=proc.communicate() ret=out.decode("utf8").split("\n") if ret==['']: ret=[] except OSError as e: portage.util.writemsg("Error while launching qfile: %s\n" % e) return ret def get_all_packages_files(self): """ Memory-hungry operation :returns: **set** of all files that belongs to package """ ret=[] try: proc=subprocess.Popen(['qlist']+['--all',"--obj"], stdout=subprocess.PIPE,stderr=subprocess.PIPE, bufsize=4096) out,err=proc.communicate() ret=out.decode("utf8").split("\n") except OSError as e: portage.util.writemsg("Error while launching qfile: %s\n" % e) return set(ret) class FilterProcGenerator: def __init__(self, pkgname, settings): portageutils=PortageUtils(settings=settings) deps_all=portageutils.get_deps_for_package_building(pkgname) deps_portage=portageutils.get_dep('portage',["RDEPEND"]) system_packages=portageutils.get_system_packages_list() allfiles=GentoolkitUtils.get_all_packages_files() portage.util.writemsg("All files list recieved, waiting for " \ "a list of allowed files\n") allowedpkgs=system_packages+list(deps_portage)+list(deps_all) allowedfiles=GentoolkitUtils.getfilesbypackages(allowedpkgs) #for pkg in allowedpkgs: # allowedfiles+=GentoolkitUtils.getfilesbypackage(pkg) #import pdb; pdb.set_trace() # manually add all python interpreters to this list allowedfiles+=GentoolkitUtils.getfilesbypackages(['python']) allowedfiles=set(allowedfiles) deniedfiles=allfiles-allowedfiles def filter_proc(eventname,filename,stage): if filename in deniedfiles: return False return True self.filter_proc=filter_proc def get_filter_proc(self): return self.filter_proc class EventsAnalyser: def __init__(self, pkgname, events, settings): self.pkgname=pkgname self.events=events self.settings=settings self.portageutils=PortageUtils(settings=settings) self.deps_all=self.portageutils.get_deps_for_package_building(pkgname) self.deps_direct=self.portageutils.get_dep(pkgname,["DEPEND"]) self.deps_portage=self.portageutils.get_dep('portage',["RDEPEND"]) self.system_packages=self.portageutils.get_system_packages_list() # All analyse work is here # get unique filenames filenames=set() for stage in events: succ_events=set(events[stage][0]) fail_events=set(events[stage][1]) filenames=filenames.union(succ_events) filenames=filenames.union(fail_events) filenames=list(filenames) file_to_package=GentoolkitUtils.getpackagesbyfiles(filenames) # This part is completly unreadable. # It converting one complex struct(returned by getfsevents) to another complex # struct which good for generating output. # # Old struct is also used during output packagesinfo={} for stage in sorted(events): succ_events=events[stage][0] fail_events=events[stage][1] for filename in succ_events: if filename in file_to_package: package=file_to_package[filename] else: package="unknown" if not package in packagesinfo: packagesinfo[package]={} stageinfo=packagesinfo[package] if not stage in stageinfo: stageinfo[stage]={} filesinfo=stageinfo[stage] if not filename in filesinfo: filesinfo[filename]={"found":[],"notfound":[]} filesinfo[filename]["found"]=succ_events[filename] for filename in fail_events: if filename in file_to_package: package=file_to_package[filename] else: package="unknown" if not package in packagesinfo: packagesinfo[package]={} stageinfo=packagesinfo[package] if not stage in stageinfo: stageinfo[stage]={} filesinfo=stageinfo[stage] if not filename in filesinfo: filesinfo[filename]={"found":[],"notfound":[]} filesinfo[filename]["notfound"]=fail_events[filename] self.packagesinfo=packagesinfo def display(self): portage.util.writemsg( portage.output.colorize( "WARN", "\nFile access report for %s:\n" % self.pkgname)) stagesorder={"clean":1,"setup":2,"unpack":3,"prepare":4,"configure":5,"compile":6,"test":7, "install":8,"preinst":9,"postinst":10,"prerm":11,"postrm":12,"unknown":13} packagesinfo=self.packagesinfo # print information grouped by package for package in sorted(packagesinfo): # not showing special directory package if package=="directory": continue if package=="unknown": continue is_pkg_in_dep=package in self.deps_all is_pkg_in_portage_dep=package in self.deps_portage is_pkg_in_system=package in self.system_packages is_pkg_python="dev-lang/python" in package stages=[] for stage in sorted(packagesinfo[package].keys(), key=stagesorder.get): if stage!="unknown": stages.append(stage) if len(stages)==0: continue filenames={} for stage in stages: for filename in packagesinfo[package][stage]: if len(packagesinfo[package][stage][filename]["found"])!=0: was_readed,was_writed=packagesinfo[package][stage][filename]["found"] if not filename in filenames: filenames[filename]=['ok',was_readed,was_writed] else: status, old_was_readed, old_was_writed=filenames[filename] filenames[filename]=[ 'ok',old_was_readed | was_readed, old_was_writed | was_writed ] if len(packagesinfo[package][stage][filename]["notfound"])!=0: was_notfound,was_blocked=packagesinfo[package][stage][filename]["notfound"] if not filename in filenames: filenames[filename]=['err',was_notfound,was_blocked] else: status, old_was_notfound, old_was_blocked=filenames[filename] filenames[filename]=[ 'err',old_was_notfound | was_notfound, old_was_blocked | was_blocked ] if is_pkg_in_dep: portage.util.writemsg("[OK]") elif is_pkg_in_system: portage.util.writemsg("[SYSTEM]") elif is_pkg_in_portage_dep: portage.util.writemsg("[PORTAGE DEP]") elif is_pkg_python: portage.util.writemsg("[INTERPRETER]") elif not self.is_package_useful(package,stages,filenames.keys()): portage.util.writemsg("[LIKELY OK]") else: portage.util.writemsg(portage.output.colorize("BAD", "[NOT IN DEPS]")) # show information about accessed files portage.util.writemsg(" %-40s: %s\n" % (package,stages)) # this is here for readability action={ ('ok',False,False):"accessed", ('ok',True,False):"readed", ('ok',False,True):"writed", ('ok',True,True):"readed and writed", ('err',False,False):"other error", ('err',True,False):"not found", ('err',False,True):"blocked", ('err',True,True):"not found and blocked" } filescounter=0 for filename in filenames: event_info=tuple(filenames[filename]) portage.util.writemsg(" %-56s %-21s\n" % (filename,action[event_info])) filescounter+=1 if filescounter>10: portage.util.writemsg(" ... and %d more ...\n" % (len(filenames)-10)) break # ... and one more check. Making sure that direct build time # dependencies were accessed #import pdb; pdb.set_trace() not_accessed_deps=set(self.deps_direct)-set(self.packagesinfo.keys()) if not_accessed_deps: portage.util.writemsg(portage.output.colorize("WARN", "!!! ")) portage.util.writemsg("Warning! Some build time dependencies " + \ "of packages were not accessed: " + \ " ".join(not_accessed_deps) + "\n") def is_package_useful(self,pkg,stages,files): """ some basic heuristics here to cut part of packages """ excluded_paths=set( ['/etc/sandbox.d/'] ) excluded_packages=set( # autodep shows these two packages every time ['net-zope/zope-fixers', 'net-zope/zope-interface'] ) def is_pkg_excluded(p): for pkg in excluded_packages: if p.startswith(pkg): # if package is excluded return True return False def is_file_excluded(f): for path in excluded_paths: if f.startswith(path): # if path is excluded return True return False if is_pkg_excluded(pkg): return False for f in files: if is_file_excluded(f): continue # test 1: package is not useful if all files are *.desktop or *.xml or *.m4 if not (f.endswith(".desktop") or f.endswith(".xml") or f.endswith(".m4") or f.endswith(".pc")): break else: return False # we get here if cycle ends not with break return True