diff options
author | 2014-12-20 21:08:02 +0200 | |
---|---|---|
committer | 2014-12-20 21:08:02 +0200 | |
commit | 1456cc998790edefa9cdef8a21bf47882c7d3574 (patch) | |
tree | c123da8f424d779043ad7d58f2b78311cab18e08 /gkeys | |
parent | Merge pull request #34 from gentoo/local_conf (diff) | |
parent | gkeys.base.py: Fix a unicode handling error in print_results() (diff) | |
download | gentoo-keys-1456cc998790edefa9cdef8a21bf47882c7d3574.tar.gz gentoo-keys-1456cc998790edefa9cdef8a21bf47882c7d3574.tar.bz2 gentoo-keys-1456cc998790edefa9cdef8a21bf47882c7d3574.zip |
Merge pull request #35 from gentoo/dol-sen-PR
Dol sen pr, new glepcheck action and other fixes
Diffstat (limited to 'gkeys')
-rw-r--r-- | gkeys/actions.py | 182 | ||||
-rw-r--r-- | gkeys/base.py | 253 | ||||
-rw-r--r-- | gkeys/checks.py | 353 | ||||
-rw-r--r-- | gkeys/cli.py | 207 | ||||
-rw-r--r-- | gkeys/config.py | 6 | ||||
-rw-r--r-- | gkeys/lib.py | 41 | ||||
-rw-r--r-- | gkeys/seedhandler.py | 14 |
7 files changed, 833 insertions, 223 deletions
diff --git a/gkeys/actions.py b/gkeys/actions.py index dc36f7c..a224372 100644 --- a/gkeys/actions.py +++ b/gkeys/actions.py @@ -22,27 +22,31 @@ from sslfetch.connections import Connector from gkeys.lib import GkeysGPG from gkeys.seedhandler import SeedHandler from gkeys.config import GKEY +from gkeys.checks import SPECCHECK_SUMMARY, convert_pf, convert_yn Available_Actions = ['listseed', 'addseed', 'removeseed', 'moveseed', 'fetchseed', 'listseedfiles', 'listkey', 'installkey', 'removekey', 'movekey', - 'installed', 'importkey', 'verify', 'checkkey', 'sign'] + 'installed', 'importkey', 'verify', 'checkkey', 'sign', 'speccheck'] + 'refreshkey'] Action_Options = { - 'listseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile', 'file'], + 'listseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile', '1file'], 'addseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile'], 'removeseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile'], 'moveseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile', 'dest'], 'fetchseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile'], 'listseedfiles': [], 'listkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'gpgsearch', 'keyid'], - 'installkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'seedfile'], + 'installkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'seedfile', '1file'], 'removekey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring'], 'movekey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'dest'], 'installed': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring'], 'importkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring'], - 'verify': ['dest', 'nick', 'name', 'keydir', 'fingerprint', 'category', 'file', 'signature', 'keyring', 'timestamp'], + 'verify': ['dest', 'nick', 'name', 'keydir', 'fingerprint', 'category', '1file', 'signature', 'keyring', 'timestamp'], 'checkkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'keyid'], 'sign': ['nick', 'name', 'keydir', 'fingerprint', 'file', 'keyring'], + 'speccheck': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'keyid'], + 'refreshkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'keyid'], } @@ -77,7 +81,7 @@ class Actions(object): '''Download the selected seed file(s)''' self.logger.debug("ACTIONS: fetchseed; args: %s" % str(args)) handler = SeedHandler(self.logger, self.config) - success, messages = handler.fetch_seeds(args.category, args, self.verify) + success, messages = handler.fetch_seeds(args.seedfile, args, self.verify) messages.append("") messages.append("Fetch operation completed") @@ -311,7 +315,9 @@ class Actions(object): self.output('', '\n Checking keys...') for gkey in sorted(keyresults): self.logger.info("Checking key %s, %s" % (gkey.nick, gkey.keyid)) - self.output('', " %s: %s" % (gkey.name, ', '.join(gkey.keyid))) + self.output('', + "\n %s, %s: %s" % (gkey.nick, gkey.name, ', '.join(gkey.keyid)) + + "\n ==============================================") self.logger.debug("ACTIONS: checkkey; gkey = %s" % str(gkey)) for key in gkey.keyid: results[gkey.name] = self.gpg.check_keys(gkey.keydir, key) @@ -332,9 +338,137 @@ class Actions(object): if failed['sign']: self.output([failed['sign']], '\n No signing capable subkeys:\n') return (len(failed) <1, - ['\nFound:\n-------', 'Expired: %d\nRevoked: %d\nInvalid: %d\nNo signing capable subkeys: %d' - % (len(failed['expired']), len(failed['revoked']), - len(failed['invalid']), len(failed['sign'])) + ['\nFound:\n-------', 'Expired: %d' % len(failed['expired']), + 'Revoked: %d' % len(failed['revoked']), + 'Invalid: %d' % len(failed['invalid']), + 'No signing capable subkeys: %d' % len(failed['sign']) + ]) + + + def speccheck(self, args): + '''Check keys actions''' + if not args.category: + return (False, ["Please specify seeds type."]) + self.logger.debug("ACTIONS: speccheck; args: %s" % str(args)) + handler = SeedHandler(self.logger, self.config) + seeds = handler.load_category(args.category) + catdir = self.config.get_key(args.category + "-category") + self.logger.debug("ACTIONS: speccheck; catdir = %s" % catdir) + self.gpg = GkeysGPG(self.config, catdir) + results = {} + failed = defaultdict(list) + kwargs = handler.build_gkeydict(args) + keyresults = seeds.list(**kwargs) + self.output('', '\n Checking keys...') + for gkey in sorted(keyresults): + self.logger.info("Checking key %s, %s" % (gkey.nick, gkey.keyid)) + self.output('', + "\n %s, %s: %s" % (gkey.nick, gkey.name, ', '.join(gkey.keyid)) + + "\n ==============================================") + self.logger.debug("ACTIONS: speccheck; gkey = %s" % str(gkey)) + for key in gkey.keyid: + results = self.gpg.speccheck(gkey.keydir, key) + for g in results: + pub_pass = {} + for key in results[g]: + self.output('', key.pretty_print()) + + if key.key is "PUB": + pub_pass = { + 'key': key, + 'pub': key.passed_spec, + 'sign': False, + 'encrypt': False, + 'auth': False, + 'signs': [], + 'encrypts': [], + 'authens': [], + 'final': False, + } + if key.key is "SUB": + if key.sign_capable and key.passed_spec: + pub_pass['signs'].append(key.passed_spec) + pub_pass['sign'] = True + if key.encrypt_capable: + pub_pass['encrypts'].append(key.passed_spec) + pub_pass['encrypt'] = True + if key.capabilities == 'a': + pub_pass['authens'].append(key.passed_spec) + if key.passed_spec: + pub_pass['auth'] = True + validity = key.validity.split(',')[0] + if not key.expire and not 'r' in validity: + failed['expired'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint)) + if 'r' in validity: + failed['revoked'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint)) + if 'i' in validity: + failed['invalid'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint)) + if key.capabilities not in ['a', 'e']: + if not key.algo: + failed['algo'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint)) + if not key.bits: + failed['bits'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint)) + if "Warning" in key.expire_reason: + failed['warn'].append("%s <%s>: %s " % (gkey.name, gkey.nick, key.fingerprint)) + if True in pub_pass['signs']: + pub_pass['sign'] = True + if True in pub_pass['encrypts']: + pub_pass['encrypt'] = True + if not pub_pass['sign']: + failed['sign'].append("%s <%s>: %s" % (gkey.name, gkey.nick, pub_pass['key'].fingerprint)) + if not pub_pass['encrypt']: + failed['encrypt'].append("%s <%s>: %s" % (gkey.name, gkey.nick, pub_pass['key'].fingerprint)) + spec = "%s <%s>: %s" % (gkey.name, gkey.nick, pub_pass['key'].fingerprint) + for k in ['pub', 'sign']: + if pub_pass[k]: + pub_pass['final'] = True + else: + pub_pass['final'] = False + break + if pub_pass['final']: + if spec not in failed['spec-approved']: + failed['spec-approved'].append(spec) + else: + if spec not in failed['spec']: + failed['spec'].append(spec) + sdata = convert_pf(pub_pass, ['pub', 'sign', 'final']) + sdata = convert_yn(sdata, ['auth', 'encrypt']) + self.output('', SPECCHECK_SUMMARY % sdata) + + if failed['revoked']: + self.output([sorted(set(failed['revoked']))], '\n Revoked keys:') + if failed['invalid']: + self.output([sorted(set(failed['invalid']))], '\n Invalid keys:') + if failed['sign']: + self.output([sorted(set(failed['sign']))], '\n No signing capable subkey:') + if failed['encrypt']: + self.output([sorted(set(failed['encrypt']))], '\n No Encryption capable subkey (Notice only):') + if failed['algo']: + self.output([sorted(set(failed['algo']))], '\n Incorrect Algorithm:') + if failed['bits']: + self.output([sorted(set(failed['bits']))], '\n Incorrect bit length:') + if failed['expired']: + self.output([sorted(set(failed['expired']))], '\n Expiry keys:') + if failed['warn']: + self.output([sorted(set(failed['warn']))], '\n Expiry Warnings:') + if failed['spec']: + self.output([sorted(set(failed['spec']))], '\n Failed to pass SPEC requirements:') + if failed['spec-approved']: + self.output([sorted(set(failed['spec-approved']))], '\n SPEC Approved:') + + return (len(failed) <1, + ['\nFound Failures:\n-------', + 'Revoked................: %d' % len(set(failed['revoked'])), + 'Invalid................: %d' % len(set(failed['invalid'])), + 'No Signing subkey......: %d' % len(set(failed['sign'])), + 'No Encryption subkey...: %d' % len(set(failed['encrypt'])), + 'Algorithm..............: %d' % len(set(failed['algo'])), + 'Bit length.............: %d' % len(set(failed['bits'])), + 'Expiry.................: %d' % len(set(failed['expired'])), + 'Expiry Warnings........: %d' % len(set(failed['warn'])), + 'SPEC requirements......: %d' % len(set(failed['spec'])), + '=============================', + 'SPEC Approved..........: %d' % len(set(failed['spec-approved'])), ]) @@ -525,6 +659,8 @@ class Actions(object): success_fetch = os.path.isfile(signature) if success_fetch: break + else: + sig_path = signature messages = [] self.logger.info("Verifying file...") verified = False @@ -601,3 +737,31 @@ class Actions(object): ) success.append(True) return (False not in success, ['', msgs]) + + + def refreshkey(self, args): + '''Calls gpg with the --refresh-keys option + for in place updates of the installed keys''' + if not args.category: + return (False, ["Please specify seeds type."]) + self.logger.debug("ACTIONS: refreshkey; args: %s" % str(args)) + handler = SeedHandler(self.logger, self.config) + seeds = handler.load_category(args.category) + catdir = self.config.get_key(args.category + "-category") + self.logger.debug("ACTIONS: refreshkey; catdir = %s" % catdir) + self.gpg = GkeysGPG(self.config, catdir) + results = {} + kwargs = handler.build_gkeydict(args) + keyresults = seeds.list(**kwargs) + self.output('', '\n Refreshig keys...') + for gkey in sorted(keyresults): + self.logger.info("Refreshig key %s, %s" % (gkey.nick, gkey.keyid)) + self.output('', " %s: %s" % (gkey.name, ', '.join(gkey.keyid))) + #self.output('', " ===============") + self.logger.debug("ACTIONS: refreshkey; gkey = %s" % str(gkey)) + results[gkey.keydir] = self.gpg.refresh_key(gkey) + return (True, ['Completed']) + + + + diff --git a/gkeys/base.py b/gkeys/base.py new file mode 100644 index 0000000..95142f9 --- /dev/null +++ b/gkeys/base.py @@ -0,0 +1,253 @@ +# +#-*- coding:utf-8 -*- + +""" + Gentoo-keys - base.py + + Command line interface argsparse options module + and common functions + + @copyright: 2012 by Brian Dolbec <dol-sen@gentoo.org> + @license: GNU GPL2, see COPYING for details. +""" + +from __future__ import print_function + + +import argparse + +from gkeys import config, fileops, seed, lib +from gkeys.log import log_levels, set_logger + + +class CliBase(object): + '''Common cli and argsparse options class''' + + + def __init__(self): + self.cli_config = { + 'Actions': [], + 'Available_Actions': [], + 'Action_Options': [], + 'prog': 'gkeys', + 'description': 'Gentoo-keys manager program', + 'epilog': '''Caution: adding untrusted keys to these keyrings can + be hazardous to your system!''' + } + self.config = None + self.args = None + self.seeds = None + self.actions = None + + + @staticmethod + def _option_dest(parser=None): + parser.add_argument('-d', '--dest', dest='destination', default=None, + help='The destination seed file or keydir for move, copy operations') + + @staticmethod + def _option_fingerprint(parser=None): + parser.add_argument('-f', '--fingerprint', dest='fingerprint', + default=None, nargs='+', + help='The fingerprint of the the key') + + @staticmethod + def _option_gpgsearch(parser=None): + parser.add_argument('-g', '--gpgsearch', dest='gpgsearch', default=None, + help='Do a gpg search operations, rather than a gkey search') + + @staticmethod + def _option_keyid(parser=None): + parser.add_argument('-i', '--keyid', dest='keyid', default=None, + nargs='+', + help='The long keyid of the gpg key to search for') + + @staticmethod + def _option_keyring(parser=None): + parser.add_argument('-k', '--keyring', dest='keyring', default='trusted_keyring', + help='The name of the keyring to use') + + @staticmethod + def _option_nick(parser=None): + parser.add_argument('-n', '--nick', dest='nick', default=None, + help='The nick associated with the the key') + + @staticmethod + def _option_name(parser=None): + parser.add_argument('-N', '--name', dest='name', nargs='*', + default=None, help='The name of the the key') + + @staticmethod + def _option_category(parser=None): + parser.add_argument('-C', '--category', + choices=['rel', 'dev', 'overlays', 'sign'], dest='category', default=None, + help='The key or seed directory category to use or update') + + @staticmethod + def _option_cleankey(parser=None): + parser.add_argument('--clean-key', + dest='cleankey', default=False, + help='Clean the key from the keyring due to failures') + + @staticmethod + def _option_cleanseed(parser=None): + parser.add_argument('--clean-seed', + dest='cleanseed', default=False, + help='Clean the seed from the seedfile due to failures. ' + 'Used during binary keyring release creation.') + + @staticmethod + def _option_keydir(parser=None): + parser.add_argument('-r', '--keydir', dest='keydir', default=None, + help='The keydir to use, update or search for/in') + + @staticmethod + def _option_seedfile(parser=None): + parser.add_argument('-S', '--seedfile', dest='seedfile', default=None, + help='The seedfile to use from the gkeys.conf file') + + @staticmethod + def _option_file(parser=None): + parser.add_argument('-F', '--file', dest='filename', default=None, + nargs='+', + help='The path/URL to use for the (signed) file') + + @staticmethod + def _option_1file(parser=None): + parser.add_argument('-F', '--file', dest='filename', default=None, + help='The path/URL to use for the (signed) file') + + @staticmethod + def _option_signature(parser=None): + parser.add_argument('-s','--signature', dest='signature', default=None, + help='The path/URL to use for the signature') + + @staticmethod + def _option_timestamp(parser=None): + parser.add_argument('-t', '--timestamp', dest='timestamp', type=bool, + default=False, + help='Turn on timestamp use') + + @staticmethod + def _option_mail(parser=None): + parser.add_argument('-m', '--mail', dest='mail', default=None, + help='The email address to search for') + + @staticmethod + def _option_status(parser=None): + parser.add_argument('-A', '--status', default=False, + help='The active status of the member') + + + def parse_args(self, args): + '''Parse a list of aruments + + @param args: list + @returns argparse.Namespace object + ''' + #logger.debug('CliBase: parse_args; args: %s' % args) + parser = argparse.ArgumentParser( + prog=self.cli_config['prog'], + description=self.cli_config['description'], + epilog=self.cli_config['epilog']) + + # options + parser.add_argument('-c', '--config', dest='config', default=None, + help='The path to an alternate config file') + parser.add_argument('-D', '--debug', default='DEBUG', + choices=list(log_levels), + help='The logging level to set for the logfile') + + + subparsers = parser.add_subparsers(help='actions') + for name in self.cli_config['Available_Actions']: + action_method = getattr(self.cli_config['Actions'], name) + actiondoc = action_method.__doc__ + try: + text = actiondoc.splitlines()[0] + except AttributeError: + text = "" + action_parser = subparsers.add_parser( + name, + help=text, + description=actiondoc, + formatter_class=argparse.RawDescriptionHelpFormatter) + action_parser.set_defaults(action=name) + self._add_options(action_parser, self.cli_config['Action_Options'][name]) + + return parser.parse_args(args) + + + def _add_options(self, parser, options): + for opt in options: + getattr(self, '_option_%s' % opt)(parser) + + + def run(self, args): + '''Run the args passed in + + @param args: list or argparse.Namespace object + ''' + global logger + message = None + if not args: + message = "Main: run; invalid args argument passed in" + if isinstance(args, list): + args = self.parse_args(args) + if args.config: + self.config.defaults['config'] = args.config + # now make it load the config file + self.config.read_config() + + # establish our logger and update it in the imported files + logger = set_logger('gkeys', self.config['logdir'], args.debug, + dirmode=int(self.config.get_key('permissions', 'directories'),0), + filemask=int(self.config.get_key('permissions', 'files'),0)) + config.logger = logger + fileops.logger = logger + seed.logger = logger + lib.logger = logger + + if message: + logger.error(message) + + # now that we have a logger, record the alternate config setting + if args.config: + logger.debug("Main: run; Found alternate config request: %s" + % args.config) + + # establish our actions instance + self.actions = self.cli_config['Actions'](self.config, self.output_results, logger) + + # run the action + func = getattr(self.actions, '%s' % args.action) + logger.debug('Main: run; Found action: %s' % args.action) + success, results = func(args) + if not results: + print("No results found. Check your configuration and that the", + "seed file exists.") + return success + if self.config.options['print_results'] and 'done' not in list(results): + self.output_results(results, '\n Gkey task results:') + return success + + + @staticmethod + def output_results(results, header): + # super simple output for the time being + if header: + print(header) + for msg in results: + if isinstance(msg, str) or isinstance(msg, unicode): + print(' ', msg) + else: + try: + print("\n".join([x.pretty_print for x in msg])) + except AttributeError: + for x in msg: + print(' ', x) + print() + + + def output_failed(self, failed): + pass diff --git a/gkeys/checks.py b/gkeys/checks.py index 2d4be4c..db3d59f 100644 --- a/gkeys/checks.py +++ b/gkeys/checks.py @@ -9,42 +9,172 @@ @license: GNU GPL2, see COPYING for details """ +import time +from collections import namedtuple, OrderedDict from gkeys.config import GKEY_CHECK +from pyGPG.mappings import (ALGORITHM_CODES, CAPABILITY_MAP, + KEY_VERSION_FPR_LEN, VALIDITY_MAP, INVALID_LIST, + VALID_LIST) + + +SPEC_INDEX = { + 'key': 0, + 'capabilities': 1, + 'fingerprint': 2, + 'bits': 3, + 'created': 4, + 'expire': 5, + 'encrypt_capable': 6, + 'sign_capable': 7, + 'algo': 8, + 'version': 9, + 'id': 10, + 'days': 11, + 'validity': 12, + 'expire_reason': 13, + 'long_caps': 14, # long version of the capbilities + 'caps': 15, + 'caps_reason': 16, + 'id_reason': 17, + 'is_valid': 18, + 'passed_spec': 19, +} + +SPEC_INDEX = OrderedDict(sorted(SPEC_INDEX.items(), key=lambda t: t[1])) + +SPEC_STAT = ['', '','', False, False, False, False, False, False, False, False, + 0, '', '', '', True, '', '', False, False] # Default glep 63 minimum gpg key specification +# and approved options, limits TEST_SPEC = { 'bits': { 'DSA': 2048, 'RSA': 2048, }, - 'expire': 36, # in months + 'expire': 3 * 365, # in days 'subkeys': { # warning/error mode - 'encryption': { + 'encrypt': { 'mode': 'notice', - 'expire': -1, # -1 is the primary key expirery + 'expire': 3 * 365, }, 'sign': { 'mode': 'error', - 'expire': 12, + 'expire': 365, }, }, - 'type': ['DSA', 'RSA'], - 'version': 4, + 'algorithms': ['DSA', 'RSA', '1', '2', '3', '17'], + 'versions': ['4'], + 'qualified_id': '@gentoo.org', } +# Final pass/fail fields and the pass value required +TEST_REQUIREMENTS = { + 'bits': True, + 'created': True, + 'expire': True, + 'sign_capable': True, + 'algo': True, + 'version': True, + 'id': True, + 'is_valid': True, + 'caps': True, +} + +SECONDS_PER_DAY = 86400 + + +SPECCHECK_STRING = ''' ---------- + Fingerprint......: %(fingerprint)s + Key type ........: %(key)s Capabilities.: %(capabilities)s %(long_caps)s + Algorithm........: %(algo)s Bit Length...: %(bits)s + Create Date......: %(created)s Expire Date..: %(expire)s + Key Version......: %(version)s Validity.....: %(validity)s + Days till expiry.: %(days)s %(expire_reason)s + Capability.......: %(caps)s %(caps_reason)s + Qualified ID.....: %(id)s %(id_reason)s + This %(pub_sub)s.: %(passed_spec)s''' + +SPECCHECK_SUMMARY = ''' Key summary + primary..........: %(pub)s signing subkey: %(sign)s + encryption subkey: %(encrypt)s authentication subkey: %(auth)s + SPEC requirements: %(final)s +''' + +def convert_pf(data, fields): + '''Converts dictionary items from True/False to Pass/Fail strings + + @param data: dict + @param fields: list + @returns: dict + ''' + for f in fields: + if data[f]: + data[f] = 'Pass' + else: + data[f] = 'Fail' + return data + +def convert_yn(data, fields): + '''Converts dictionary items from True/False to Yes/No strings + + @param data: dict + @param fields: list + @returns: dict + ''' + for f in fields: + if data[f]: + data[f] = 'Yes ' + else: + data[f] = 'No ' + return data + + +class SpecCheck(namedtuple("SpecKey", list(SPEC_INDEX))): + + __slots__ = () + + def pretty_print(self): + data = self.convert_data() + output = SPECCHECK_STRING % (data) + return output + + + def convert_data(self): + data = dict(self._asdict()) + data = convert_pf(data, ['algo', 'bits', 'caps', 'created', 'expire', 'id', + 'passed_spec', 'version']) + for f in ['caps', 'id']: + data[f] = data[f].ljust(10) + data['validity'] += ', %s' % (VALIDITY_MAP[data['validity']]) + days = data['days'] + if days == float("inf"): + data['days'] = "infinite".ljust(10) + else: + data['days'] = str(int(data['days'])).ljust(10) + if data['capabilities'] == 'e': + data['algo'] = '----' + data['bits'] = '----' + if data['key'] =='PUB': + data['pub_sub'] = 'primary key' + else: + data['pub_sub'] = 'subkey.....' + return data + class KeyChecks(object): - '''Primary gpg key validation and glep spec checks class''' + '''Primary gpg key validation and specifications checks class''' - def __init__(self, logger, spec=TEST_SPEC): + def __init__(self, logger, spec=TEST_SPEC, qualified_id_check=True): '''@param spec: optional gpg specification to test against Defaults to TEST_SPEC ''' self.logger = logger self.spec = spec + self.check_id = qualified_id_check def validity_checks(self, keydir, keyid, result): @@ -97,8 +227,211 @@ class KeyChecks(object): return GKEY_CHECK(keyid, revoked, expired, invalid, sign) - def glep_check(self, keydir, keyid, result): + def spec_check(self, keydir, keyid, result): '''Performs the minimum specifications checks on the key''' - pass + self.logger.debug("SPEC_CHECK() : CHECKING: %s" % keyid) + results = {} + pub = None + stats = None + pub_days = 0 + for data in result.status.data: + if data.name == "PUB": + if stats: + stats = self._test_final(data, stats) + results[pub.long_keyid].append(SpecCheck._make(stats)) + pub = data + found_id = False + found_id_reason = '' + results[data.long_keyid] = [] + stats = SPEC_STAT[:] + stats[SPEC_INDEX['key']] = data.name + stats[SPEC_INDEX['capabilities']] = data.key_capabilities + stats[SPEC_INDEX['validity']] = data.validity + stats = self._test_created(data, stats) + stats = self._test_algo(data, stats) + stats = self._test_bits(data, stats) + stats = self._test_expire(data, stats, pub_days) + pub_days = stats[SPEC_INDEX['days']] + stats = self._test_caps(data, stats) + stats = self._test_validity(data, stats) + elif data.name == "FPR": + pub = pub._replace(**{'fingerprint': data.fingerprint}) + stats[SPEC_INDEX['fingerprint']] = data.fingerprint + stats = self._test_version(data, stats) + elif data.name == "UID": + stats = self._test_uid(data, stats) + if stats[SPEC_INDEX['id']] in [True, '-----']: + found_id = stats[SPEC_INDEX['id']] + found_id_reason = '' + stats[SPEC_INDEX['id_reason']] = '' + else: + found_id_reason = stats[SPEC_INDEX['id_reason']] + elif data.name == "SUB": + if stats: + stats = self._test_final(data, stats) + results[pub.long_keyid].append(SpecCheck._make(stats)) + stats = SPEC_STAT[:] + stats[SPEC_INDEX['key']] = data.name + stats[SPEC_INDEX['capabilities']] = data.key_capabilities + stats[SPEC_INDEX['fingerprint']] = '%s' \ + % (data.long_keyid) + stats[SPEC_INDEX['id']] = found_id + stats[SPEC_INDEX['id_reason']] = found_id_reason + stats[SPEC_INDEX['validity']] = data.validity + stats = self._test_validity(data, stats) + stats = self._test_created(data, stats) + stats = self._test_algo(data, stats) + stats = self._test_bits(data, stats) + stats = self._test_expire(data, stats, pub_days) + stats = self._test_caps(data, stats) + if stats: + stats = self._test_final(data, stats) + results[pub.long_keyid].append(SpecCheck._make(stats)) + stats = None + self.logger.debug("SPEC_CHECK() : COMPLETED: %s" % keyid) + return results + + + def _test_algo(self, data, stats): + algo = data.pubkey_algo + if algo in TEST_SPEC['algorithms']: + stats[SPEC_INDEX['algo']] = True + else: + self.logger.debug("ERROR in key %s : invalid Type: %s" + % (data.long_keyid, ALGORITHM_CODES[algo])) + return stats + + + def _test_bits(self, data, stats): + bits = int(data.keylength) + if data.pubkey_algo in TEST_SPEC['algorithms']: + if bits >= TEST_SPEC['bits'][ALGORITHM_CODES[data.pubkey_algo]]: + stats[SPEC_INDEX['bits']] = True + else: + self.logger.debug("ERROR in key %s : invalid Bit length: %d" + % (data.long_keyid, bits)) + return stats + + + def _test_version(self, data, stats): + fpr_l = len(data.fingerprint) + if KEY_VERSION_FPR_LEN[fpr_l] in TEST_SPEC['versions']: + stats[SPEC_INDEX['version']] = True + else: + self.logger.debug("ERROR in key %s : invalid gpg key version: %s" + % (data.long_keyid, KEY_VERSION_FPR_LEN[fpr_l])) + return stats + + + def _test_created(self, data, stats): + try: + created = float(data.creation_date) + except ValueError: + created = 0 + if created <= time.time() : + stats[SPEC_INDEX['created']] = True + else: + self.logger.debug("ERROR in key %s : invalid gpg key creation date: %s" + % (data.long_keyid, data.creation_date)) + return stats + + + def _test_expire(self, data, stats, pub_days): + if data.name in ["PUB"]: + delta_t = TEST_SPEC['expire'] + stats = self._expire_check(data, stats, delta_t, pub_days) + return stats + else: + for cap in data.key_capabilities: + try: + delta_t = TEST_SPEC['subkeys'][CAPABILITY_MAP[cap]]['expire'] + except KeyError: + self.logger.debug( + "WARNING in capability key %s : setting delta_t to main expiry: %d" + % (cap, TEST_SPEC['expire'])) + delta_t = TEST_SPEC['expire'] + stats = self._expire_check(data, stats, delta_t, pub_days) + return stats + + + def _expire_check(self, data, stats, delta_t, pub_days): + today = time.time() + try: + expires = float(data.expiredate) + except ValueError: + expires = float("inf") + if data.name == 'SUB' and expires == float("inf"): + days = stats[SPEC_INDEX['days']] = pub_days + elif expires == float("inf"): + days = stats[SPEC_INDEX['days']] = expires + else: + days = stats[SPEC_INDEX['days']] = max(0, int((expires - today)/SECONDS_PER_DAY)) + if days <= delta_t: + stats[SPEC_INDEX['expire']] = True + elif days > delta_t and not ('i' in data.validity or 'r' in data.validity): + stats[SPEC_INDEX['expire_reason']] = '<== Exceeds specification' + else: + self.logger.debug("ERROR in key %s : invalid gpg key expire date: %s" + % (data.long_keyid, data.expiredate)) + if 0 < days < 30 and not ('i' in data.validity or 'r' in data.validity): + stats[SPEC_INDEX['expire_reason']] = '<== WARNING < 30 days' + + return stats + + + def _test_caps(self, data, stats): + if 'e' in data.key_capabilities: + if 's' in data.key_capabilities or 'a' in data.key_capabilities: + stats[SPEC_INDEX['caps']] = False + stats[SPEC_INDEX['caps_reason']] = "<== Mixing of 'e' with 's' and/or 'a'" + if not stats[SPEC_INDEX['is_valid']]: + return stats + kcaps = [] + for cap in data.key_capabilities: + if CAPABILITY_MAP[cap] and stats[SPEC_INDEX['caps']]: + kcaps.append(CAPABILITY_MAP[cap]) + if cap in ["s"] and not data.name == "PUB": + stats[SPEC_INDEX['sign_capable']] = True + elif cap in ["e"]: + stats[SPEC_INDEX['encrypt_capable']] = True + elif cap not in CAPABILITY_MAP: + stats[SPEC_INDEX['caps']] = False + self.logger.debug("ERROR in key %s : unknown gpg key capability: %s" + % (data.long_keyid, cap)) + stats[SPEC_INDEX['long_caps']] = ', '.join(kcaps) + return stats + + + def _test_uid(self, data, stats): + if not self.check_id: + stats[SPEC_INDEX['id']] = '-----' + stats[SPEC_INDEX['id_reason']] = '' + return stats + if TEST_SPEC['qualified_id'] in data.user_ID : + stats[SPEC_INDEX['id']] = True + stats[SPEC_INDEX['id_reason']] = '' + else: + stats[SPEC_INDEX['id_reason']] = "<== '%s' user id not found" % TEST_SPEC['qualified_id'] + self.logger.debug("Warning: No qualified ID found in key %s" + % (data.user_ID)) + return stats + + + def _test_validity(self, data, stats): + if data.validity in VALID_LIST: + stats[SPEC_INDEX['is_valid']] = True + return stats + def _test_final(self, data, stats): + for test, result in TEST_REQUIREMENTS.items(): + if ((stats[SPEC_INDEX['key']] == 'PUB' and test == 'sign_capable') or + (stats[SPEC_INDEX['capabilities']] == 'e' and test in ['algo', 'bits', 'sign_capable']) + or (stats[SPEC_INDEX['capabilities']] == 'a' and test in ['sign_capable'])): + continue + if stats[SPEC_INDEX[test]] == result: + stats[SPEC_INDEX['passed_spec']] = True + else: + stats[SPEC_INDEX['passed_spec']] = False + break + return stats diff --git a/gkeys/cli.py b/gkeys/cli.py index 4c1e946..32d2ec4 100644 --- a/gkeys/cli.py +++ b/gkeys/cli.py @@ -13,17 +13,15 @@ from __future__ import print_function -import argparse import sys -from gkeys import config, fileops, seed, lib +from gkeys.base import CliBase from gkeys.actions import Actions, Available_Actions, Action_Options from gkeys.config import GKeysConfig -from gkeys.log import log_levels, set_logger -class Main(object): +class Main(CliBase): '''Main command line interface class''' @@ -32,200 +30,29 @@ class Main(object): @param root: string, root path to use """ + CliBase.__init__(self) self.root = root or "/" self.config = config or GKeysConfig(root=root) self.config.options['print_results'] = print_results - self.args = None - self.seeds = None - self.actions = None + self.cli_config = { + 'Actions': Actions, + 'Available_Actions': Available_Actions, + 'Action_Options': Action_Options, + 'prog': 'gkeys', + 'description': 'Gentoo-keys manager program', + 'epilog': '''Caution: adding untrusted keys to these keyrings can + be hazardous to your system!''' + } def __call__(self, args=None): + """Main class call function + + @param args: Optional list of argumanets to parse and action to run + Defaults to sys.argv[1:] + """ if args: return self.run(self.parse_args(args)) else: return self.run(self.parse_args(sys.argv[1:])) - - def _add_options(self, parser, options): - for opt in options: - getattr(self, '_option_%s' % opt)(parser) - - @staticmethod - def _option_dest(parser=None): - parser.add_argument('-d', '--dest', dest='destination', default=None, - help='The destination seed file or keydir for move, copy operations') - - @staticmethod - def _option_fingerprint(parser=None): - parser.add_argument('-f', '--fingerprint', dest='fingerprint', - default=None, nargs='+', - help='The fingerprint of the the key') - - @staticmethod - def _option_gpgsearch(parser=None): - parser.add_argument('-g', '--gpgsearch', dest='gpgsearch', default=None, - help='Do a gpg search operations, rather than a gkey search') - - @staticmethod - def _option_keyid(parser=None): - parser.add_argument('-i', '--keyid', dest='keyid', default=None, - nargs='+', - help='The long keyid of the gpg key to search for') - - @staticmethod - def _option_keyring(parser=None): - parser.add_argument('-k', '--keyring', dest='keyring', default='trusted_keyring', - help='The name of the keyring to use') - - @staticmethod - def _option_nick(parser=None): - parser.add_argument('-n', '--nick', dest='nick', default=None, - help='The nick associated with the the key') - - @staticmethod - def _option_name(parser=None): - parser.add_argument('-N', '--name', dest='name', nargs='*', - default=None, help='The name of the the key') - - @staticmethod - def _option_category(parser=None): - parser.add_argument('-C', '--category', - choices=['rel', 'dev', 'overlays', 'sign'], dest='category', default=None, - help='The key or seed directory category to use or update') - - @staticmethod - def _option_keydir(parser=None): - parser.add_argument('-r', '--keydir', dest='keydir', default=None, - help='The keydir to use, update or search for/in') - - @staticmethod - def _option_seedfile(parser=None): - parser.add_argument('-S', '--seedfile', dest='seedfile', default=None, - help='The seedfile to use from the gkeys.conf file') - - @staticmethod - def _option_file(parser=None): - parser.add_argument('-F', '--file', dest='filename', default=None, - nargs='+', - help='The path/URL to use for the (signed) file') - - @staticmethod - def _option_signature(parser=None): - parser.add_argument('-s','--signature', dest='signature', default=None, - help='The path/URL to use for the signature') - - @staticmethod - def _option_timestamp(parser=None): - parser.add_argument('-t', '--timestamp', dest='timestamp', type=bool, - default=False, - help='Turn on timestamp use') - - - def parse_args(self, args): - '''Parse a list of aruments - - @param args: list - @returns argparse.Namespace object - ''' - #logger.debug('MAIN: parse_args; args: %s' % args) - parser = argparse.ArgumentParser( - prog='gkeys', - description='Gentoo-keys manager program', - epilog='''Caution: adding untrusted keys to these keyrings can - be hazardous to your system!''') - - # options - parser.add_argument('-c', '--config', dest='config', default=None, - help='The path to an alternate config file') - parser.add_argument('-D', '--debug', default='DEBUG', - choices=list(log_levels), - help='The logging level to set for the logfile') - - - subparsers = parser.add_subparsers(help='actions') - for name in Available_Actions: - action_method = getattr(Actions, name) - actiondoc = action_method.__doc__ - try: - text = actiondoc.splitlines()[0] - except AttributeError: - text = "" - action_parser = subparsers.add_parser( - name, - help=text, - description=actiondoc, - formatter_class=argparse.RawDescriptionHelpFormatter) - action_parser.set_defaults(action=name) - self._add_options(action_parser, Action_Options[name]) - - return parser.parse_args(args) - - - def run(self, args): - '''Run the args passed in - - @param args: list or argparse.Namespace object - ''' - global logger - message = None - if not args: - message = "Main: run; invalid args argument passed in" - if isinstance(args, list): - args = self.parse_args(args) - if args.config: - self.config.defaults['config'] = args.config - # now make it load the config file - self.config.read_config() - - # establish our logger and update it in the imported files - logger = set_logger('gkeys', self.config['logdir'], args.debug, - dirmode=int(self.config.get_key('permissions', 'directories'),0), - filemask=int(self.config.get_key('permissions', 'files'),0)) - config.logger = logger - fileops.logger = logger - seed.logger = logger - lib.logger = logger - - if message: - logger.error(message) - - # now that we have a logger, record the alternate config setting - if args.config: - logger.debug("Main: run; Found alternate config request: %s" - % args.config) - - # establish our actions instance - self.actions = Actions(self.config, self.output_results, logger) - - # run the action - func = getattr(self.actions, '%s' % args.action) - logger.debug('Main: run; Found action: %s' % args.action) - success, results = func(args) - if not results: - print("No results found. Check your configuration and that the", - "seed file exists.") - return success - if self.config.options['print_results'] and 'done' not in list(results): - self.output_results(results, '\n Gkey task results:') - return success - - - @staticmethod - def output_results(results, header): - # super simple output for the time being - if header: - print(header) - for msg in results: - if isinstance(msg, str): - print(' ', msg) - else: - try: - print("\n".join([x.pretty_print for x in msg])) - except AttributeError: - for x in msg: - print(' ', x) - - - def output_failed(self, failed): - pass diff --git a/gkeys/config.py b/gkeys/config.py index 7cefe92..775ea1f 100644 --- a/gkeys/config.py +++ b/gkeys/config.py @@ -49,7 +49,7 @@ GKEY_FINGERPRINTS = \ Fingerprint: %(fingerprint)s ''' -MAPSEEDS = { 'dev' : 'developers.seeds', 'rel': 'release.seeds' } +MAPSEEDS = { 'dev' : 'gentoodevs.seeds', 'rel': 'gentoo.seeds' } class GKeysConfig(GPGConfig): @@ -96,8 +96,8 @@ class GKeysConfig(GPGConfig): # NOTE: files is umask mode in octal, directories is chmod mode in octal self.defaults['permissions'] = {'files': '0o002', 'directories': '0o775',} self.defaults['seedurls'] = { - 'gentoo.seeds': 'https://api.gentoo.org/gentoo-keys/seeds/gentoo.seeds', - 'gentoodevs.seeds': 'https://api.gentoo.org/gentoo-keys/seeds/gentoodevs.seeds', + 'gentoo': 'https://api.gentoo.org/gentoo-keys/seeds/gentoo.seeds', + 'gentoodevs': 'https://api.gentoo.org/gentoo-keys/seeds/gentoodevs.seeds', 'gkey': 'gkeys', } self.defaults['sign'] = { diff --git a/gkeys/lib.py b/gkeys/lib.py index 31afbce..50ed63e 100644 --- a/gkeys/lib.py +++ b/gkeys/lib.py @@ -179,6 +179,24 @@ class GkeysGPG(GPG): return [] + def refresh_key(self, gkey): + '''Refresh the specified key in the specified keydir + + @param key: tuple of (name, nick, keydir, fingerprint) + @param keydir: the keydir to add the key to + ''' + self.config.defaults['gpg_defaults'].append('--no-permission-warning') + self.set_keyserver() + self.set_keydir(gkey.keydir, 'refresh-keys', reset=True) + self.set_keyring('pubring.gpg', 'refresh-keys', reset=False) + logger.debug("LIB: refresh_key, gkey: %s" % str(gkey)) + logger.debug("** Calling runGPG with Running 'gpg %s --refresh-keys' for: %s" + % (' '.join(self.config.get_key('tasks', 'refresh-keys')), str(gkey))) + result = self.runGPG(task='refresh-keys', inputfile='') + logger.info('GPG return code: ' + str(result.returncode)) + return result + + def update_key(self, gkey, keydir): '''Update the specified key in the specified keydir @@ -206,7 +224,7 @@ class GkeysGPG(GPG): task = 'list-keys' target = keydir self.set_keydir(keydir, task, fingerprint=True) - self.config.options['tasks'][task].extend(['--keyid-format', 'long']) + self.config.options['tasks'][task].extend(['--keyid-format', 'long', '--with-fingerprint']) if colons: task_value = ['--with-colons'] self.config.options['tasks'][task].extend(task_value) @@ -227,11 +245,27 @@ class GkeysGPG(GPG): @returns: GKEY_CHECK instance ''' if not result: - result = self.list_keys(keydir, colons=True) - checker = KeyChecks(logger) + result = self.list_keys(keydir, fingerprint=keyid, colons=True) + checker = KeyChecks(logger, qualified_id_check=True) return checker.validity_checks(keydir, keyid, result) + def speccheck(self, keydir, keyid, result=None): + '''Check specified or all keys based on the seed type + specifications are met. + + @param keydir: the keydir to list the keys for + @param keyid: the keyid to check + @param result: optional pyGPG.output.GPGResult object + @returns: SpecCheck instance + ''' + if not result: + result = self.list_keys(keydir, fingerprint=keyid, colons=True) + checker = KeyChecks(logger, qualified_id_check=True) + specchecks = checker.spec_check(keydir, keyid, result) + return specchecks + + def list_keydirs(self): '''List all available keydirs ''' @@ -255,6 +289,7 @@ class GkeysGPG(GPG): def verify_file(self, gkey, signature, filepath): '''Verify the file specified at filepath or url + @param gkey: GKEY instance of the gpg key used to verify it @param signature: string with the signature file @param filepath: string with the path or url of the signed file ''' diff --git a/gkeys/seedhandler.py b/gkeys/seedhandler.py index 8e06fab..cc797b9 100644 --- a/gkeys/seedhandler.py +++ b/gkeys/seedhandler.py @@ -14,7 +14,7 @@ import os import re from json import load -from gkeys.config import GKEY, MAPSEEDS +from gkeys.config import GKEY from gkeys.seed import Seeds from gkeys.fileops import ensure_dirs @@ -64,7 +64,7 @@ class SeedHandler(object): @param seedfile: string filepath of the file to load @return Seeds class instance of the file loaded ''' - if not seedfile and not filename: + if not seedfile and not filepath: self.logger.error("SeedHandler: load_seeds; no filename to load: " "setting = %s. Please use the -S or -F option to indicate: which seed " "file to use." % seedfile) @@ -127,17 +127,15 @@ class SeedHandler(object): messages = [] try: for seed in [seeds]: - seedurl = self.config.get_key('seedurls', MAPSEEDS[seed]) - seedpath = self.config.get_key('%s-seedfile' % seed) + seedurl = self.config.get_key('seedurls', seed) + seedpath = self.config.get_key('seeds', seed) if http_check.match(seedurl): urls.extend([(seedurl, seedpath)]) else: self.logger.info("Wrong seed file URLs... Switching to default URLs.") - urls.extend([(self.config['seedurls'][MAPSEEDS[seed]], seedpath)]) + urls.extend([(self.config['seedurls'][seed], seedpath)]) except KeyError: - for key, value in list(MAPSEEDS.items()): - seedpath = self.config.get_key('%s-seedfile' % key) - urls.extend([(self.config['seedurls'][value], seedpath)]) + pass succeeded = [] seedsdir = self.config.get_key('seedsdir') mode = int(self.config.get_key('permissions', 'directories'),0) |