diff options
Diffstat (limited to 'pym/gentoolkit/equery/check.py')
-rw-r--r-- | pym/gentoolkit/equery/check.py | 323 |
1 files changed, 191 insertions, 132 deletions
diff --git a/pym/gentoolkit/equery/check.py b/pym/gentoolkit/equery/check.py index 2531970..011fd5d 100644 --- a/pym/gentoolkit/equery/check.py +++ b/pym/gentoolkit/equery/check.py @@ -4,7 +4,7 @@ # # $Header: $ -"""Check timestamps and MD5sums for files owned by a given installed package""" +"""Check timestamps and MD5 sums for files owned by a given installed package""" __docformat__ = 'epytext' @@ -14,43 +14,166 @@ __docformat__ = 'epytext' import os import sys +from functools import partial from getopt import gnu_getopt, GetoptError -try: - import portage.checksum as checksum -except ImportError: - import portage_checksum as checksum +import portage.checksum as checksum import gentoolkit.pprinter as pp -from gentoolkit.equery import format_options, mod_usage, Config -from gentoolkit.helpers2 import do_lookup +from gentoolkit import errors +from gentoolkit.equery import format_options, mod_usage, CONFIG +from gentoolkit.helpers import do_lookup # ======= # Globals # ======= QUERY_OPTS = { - "categoryFilter": None, - "includeInstalled": False, + "includeInstalled": True, "includeOverlayTree": False, "includePortTree": False, "checkMD5sum": True, "checkTimestamp" : True, "isRegex": False, - "matchExact": True, + "onlyFailures": False, "printMatchInfo": False, "showSummary" : True, "showPassedFiles" : False, "showFailedFiles" : True } +# ======= +# Classes +# ======= + +class VerifyContents(object): + """Verify installed packages' CONTENTS files. + + The CONTENTS file contains timestamps and MD5 sums for each file owned + by a package. + """ + def __init__(self, printer_fn=None): + """Create a VerifyObjects instance. + + @type printer_fn: callable + @param printer_fn: if defined, will be applied to each result as found + """ + self.check_sums = True + self.check_timestamps = True + self.printer_fn = printer_fn + + self.is_regex = False + + def __call__( + self, + pkgs, + is_regex=False, + check_sums=True, + check_timestamps=True + ): + self.is_regex = is_regex + self.check_sums = check_sums + self.check_timestamps = check_timestamps + + result = {} + for pkg in pkgs: + # _run_checks returns tuple(n_passed, n_checked, err) + check_results = self._run_checks(pkg.get_contents()) + result[pkg.cpv] = check_results + if self.printer_fn is not None: + self.printer_fn(pkg.cpv, check_results) + + return result + + def _run_checks(self, files): + """Run some basic sanity checks on a package's contents. + + If the file type (ftype) is not a directory or symlink, optionally + verify MD5 sums or mtimes via L{self._verify_obj}. + + @see: gentoolkit.packages.get_contents() + @type files: dict + @param files: in form {'PATH': ['TYPE', 'TIMESTAMP', 'MD5SUM']} + @rtype: tuple + @return: + n_passed (int): number of files that passed all checks + n_checked (int): number of files checked + errs (list): check errors' descriptions + """ + n_checked = 0 + n_passed = 0 + errs = [] + for cfile in files: + n_checked += 1 + ftype = files[cfile][0] + if not os.path.exists(cfile): + errs.append("%s does not exist" % cfile) + continue + elif ftype == "dir": + if not os.path.isdir(cfile): + err = "%(cfile)s exists, but is not a directory" + errs.append(err % locals()) + continue + elif ftype == "obj": + obj_errs = self._verify_obj(files, cfile, errs) + if len(obj_errs) > len(errs): + errs = obj_errs[:] + continue + elif ftype == "sym": + target = files[cfile][2].strip() + if not os.path.islink(cfile): + err = "%(cfile)s exists, but is not a symlink" + errs.append(err % locals()) + continue + tgt = os.readlink(cfile) + if tgt != target: + err = "%(cfile)s does not point to %(target)s" + errs.append(err % locals()) + continue + else: + err = "%(cfile)s has unknown type %(ftype)s" + errs.append(err % locals()) + continue + n_passed += 1 + + return n_passed, n_checked, errs + + def _verify_obj(self, files, cfile, errs): + """Verify the MD5 sum and/or mtime and return any errors.""" + + obj_errs = errs[:] + if self.check_sums: + md5sum = files[cfile][2] + try: + cur_checksum = checksum.perform_md5(cfile, calc_prelink=1) + except IOError: + err = "Insufficient permissions to read %(cfile)s" + obj_errs.append(err % locals()) + return obj_errs + if cur_checksum != md5sum: + err = "%(cfile)s has incorrect MD5sum" + obj_errs.append(err % locals()) + return obj_errs + if self.check_timestamps: + mtime = int(files[cfile][1]) + st_mtime = int(os.lstat(cfile).st_mtime) + if st_mtime != mtime: + err = ( + "%(cfile)s has wrong mtime (is %(st_mtime)d, should be " + "%(mtime)d)" + ) + obj_errs.append(err % locals()) + return obj_errs + + return obj_errs + # ========= # Functions # ========= def print_help(with_description=True): """Print description, usage and a detailed help message. - + @type with_description: bool @param with_description: if true, print module's __doc__ string """ @@ -60,9 +183,14 @@ def print_help(with_description=True): print # Deprecation warning added by djanderson, 12/2008 - pp.print_warn("Default action for this module has changed in Gentoolkit 0.3.") - pp.print_warn("Use globbing to simulate the old behavior (see man equery).") - pp.print_warn("Use '*' to check all installed packages.") + depwarning = ( + "Default action for this module has changed in Gentoolkit 0.3.", + "Use globbing to simulate the old behavior (see man equery).", + "Use '*' to check all installed packages.", + "Use 'foo-bar/*' to filter by category." + ) + for line in depwarning: + sys.stderr.write(pp.warn(line)) print print mod_usage(mod_name="check") @@ -70,135 +198,73 @@ def print_help(with_description=True): print pp.command("options") print format_options(( (" -h, --help", "display this help message"), - (" -c, --category CAT", "only check files from packages in CAT"), (" -f, --full-regex", "query is a regular expression"), + (" -o, --only-failures", "only display packages that do not pass"), )) +def checks_printer(cpv, data, verbose=True, only_failures=False): + """Output formatted results of pkg file(s) checks""" + seen = [] + + n_passed, n_checked, errs = data + n_failed = n_checked - n_passed + if only_failures and not n_failed: + return + else: + if verbose: + if not cpv in seen: + print "* Checking %s ..." % (pp.emph(str(cpv))) + seen.append(cpv) + else: + print "%s:" % cpv, + + if verbose: + for err in errs: + sys.stderr.write(pp.error(err)) + + if verbose: + n_passed = pp.number(str(n_passed)) + n_checked = pp.number(str(n_checked)) + info = " %(n_passed)s out of %(n_checked)s files passed" + print info % locals() + else: + print "failed(%s)" % n_failed + + def parse_module_options(module_opts): - """Parse module options and update GLOBAL_OPTS""" + """Parse module options and update QUERY_OPTS""" opts = (x[0] for x in module_opts) - posargs = (x[1] for x in module_opts) - for opt, posarg in zip(opts, posargs): + for opt in opts: if opt in ('-h', '--help'): print_help() sys.exit(0) - elif opt in ('-c', '--category'): - QUERY_OPTS['categoryFilter'] = posarg elif opt in ('-f', '--full-regex'): QUERY_OPTS['isRegex'] = True - - -def run_checks(files): - """Run some basic sanity checks on a package's contents. - - If the file type (ftype) is not a directory or symlink, optionally - verify MD5 sums or mtimes via verify_obj(). - - @see: gentoolkit.packages.get_contents() - @type files: dict - @param files: in form {'PATH': ['TYPE', 'TIMESTAMP', 'MD5SUM']} - @rtype: tuple - @return: - passed (int): number of files that passed all checks - checked (int): number of files checked - errs (list): check errors' descriptions - """ - - checked = 0 - passed = 0 - errs = [] - for cfile in files: - checked += 1 - ftype = files[cfile][0] - if not os.path.exists(cfile): - errs.append("%s does not exist" % cfile) - continue - elif ftype == "dir": - if not os.path.isdir(cfile): - err = "%(cfile)s exists, but is not a directory" - errs.append(err % locals()) - continue - elif ftype == "obj": - new_errs = verify_obj(files, cfile, errs) - if new_errs != errs: - errs = new_errs - continue - elif ftype == "sym": - target = files[cfile][2].strip() - if not os.path.islink(cfile): - err = "%(cfile)s exists, but is not a symlink" - errs.append(err % locals()) - continue - tgt = os.readlink(cfile) - if tgt != target: - err = "%(cfile)s does not point to %(target)s" - errs.append(err % locals()) - continue - else: - err = "%(cfile)s has unknown type %(ftype)s" - errs.append(err % locals()) - continue - passed += 1 - - return passed, checked, errs - - -def verify_obj(files, cfile, errs): - """Verify the MD5 sum and/or mtime and return any errors.""" - - if QUERY_OPTS["checkMD5sum"]: - md5sum = files[cfile][2] - try: - cur_checksum = checksum.perform_md5(cfile, calc_prelink=1) - except IOError: - err = "Insufficient permissions to read %(cfile)s" - errs.append(err % locals()) - return errs - if cur_checksum != md5sum: - err = "%(cfile)s has incorrect MD5sum" - errs.append(err % locals()) - return errs - if QUERY_OPTS["checkTimestamp"]: - mtime = int(files[cfile][1]) - st_mtime = os.lstat(cfile).st_mtime - if st_mtime != mtime: - err = "%(cfile)s has wrong mtime (is %(st_mtime)d, " + \ - "should be %(mtime)d)" - errs.append(err % locals()) - return errs - - return errs + elif opt in ('-o', '--only-failures'): + QUERY_OPTS['onlyFailures'] = True def main(input_args): """Parse input and run the program""" - short_opts = "hac:f" - long_opts = ('help', 'all', 'category=', 'full-regex') + short_opts = "hof" + long_opts = ('help', 'only-failures', 'full-regex') try: module_opts, queries = gnu_getopt(input_args, short_opts, long_opts) except GetoptError, err: - pp.print_error("Module %s" % err) + sys.stderr.write(pp.error("Module %s" % err)) print print_help(with_description=False) sys.exit(2) parse_module_options(module_opts) - - if not queries and not QUERY_OPTS["includeInstalled"]: + + if not queries: print_help() sys.exit(2) - elif queries and not QUERY_OPTS["includeInstalled"]: - QUERY_OPTS["includeInstalled"] = True - elif QUERY_OPTS["includeInstalled"]: - queries = ["*"] - - # - # Output - # first_run = True for query in queries: @@ -208,25 +274,18 @@ def main(input_args): matches = do_lookup(query, QUERY_OPTS) if not matches: - pp.print_error("No package found matching %s" % query) + raise errors.GentoolkitNoMatches(query, in_installed=True) matches.sort() - for pkg in matches: - if Config['verbose']: - print " * Checking %s ..." % pp.emph(pkg.cpv) - else: - print "%s:" % pkg.cpv - - passed, checked, errs = run_checks(pkg.get_contents()) - - if Config['verbose']: - for err in errs: - pp.print_error(err) + printer = partial( + checks_printer, + verbose=CONFIG['verbose'], + only_failures=QUERY_OPTS['onlyFailures'] + ) + check = VerifyContents(printer_fn=printer) + check(matches) - passed = pp.number(str(passed)) - checked = pp.number(str(checked)) - info = " %(passed)s out of %(checked)s files passed" - print info % locals() + first_run = False - first_run = False +# vim: set ts=4 sw=4 tw=79: |