diff options
author | 2014-12-20 13:17:27 -0800 | |
---|---|---|
committer | 2014-12-22 14:57:48 -0800 | |
commit | a153cacf6b47788c9a017c37f78469e009e4ffff (patch) | |
tree | 249a0ea9461276ed921ff5c56b08a474626080f8 /gkeys/gkeys | |
parent | Merge pull request #35 from gentoo/dol-sen-PR (diff) | |
download | gentoo-keys-a153cacf6b47788c9a017c37f78469e009e4ffff.tar.gz gentoo-keys-a153cacf6b47788c9a017c37f78469e009e4ffff.tar.bz2 gentoo-keys-a153cacf6b47788c9a017c37f78469e009e4ffff.zip |
Move the 3 pkgs into their own *-pkg dir
This makes releasing each pkg independently easier.
testpath: Update paths for the new directory structure
Diffstat (limited to 'gkeys/gkeys')
-rw-r--r-- | gkeys/gkeys/__init__.py | 5 | ||||
-rw-r--r-- | gkeys/gkeys/actions.py | 767 | ||||
-rw-r--r-- | gkeys/gkeys/base.py | 253 | ||||
-rw-r--r-- | gkeys/gkeys/checks.py | 437 | ||||
-rw-r--r-- | gkeys/gkeys/cli.py | 58 | ||||
-rw-r--r-- | gkeys/gkeys/config.py | 181 | ||||
-rw-r--r-- | gkeys/gkeys/fileops.py | 56 | ||||
-rw-r--r-- | gkeys/gkeys/lib.py | 345 | ||||
-rw-r--r-- | gkeys/gkeys/log.py | 72 | ||||
-rw-r--r-- | gkeys/gkeys/seed.py | 192 | ||||
-rw-r--r-- | gkeys/gkeys/seedhandler.py | 189 | ||||
-rw-r--r-- | gkeys/gkeys/utils.py | 161 |
12 files changed, 2716 insertions, 0 deletions
diff --git a/gkeys/gkeys/__init__.py b/gkeys/gkeys/__init__.py new file mode 100644 index 0000000..7e8b64e --- /dev/null +++ b/gkeys/gkeys/__init__.py @@ -0,0 +1,5 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +__version__ = 'Git' +__license__ = 'GPLv2' diff --git a/gkeys/gkeys/actions.py b/gkeys/gkeys/actions.py new file mode 100644 index 0000000..a224372 --- /dev/null +++ b/gkeys/gkeys/actions.py @@ -0,0 +1,767 @@ +# +#-*- coding:utf-8 -*- + +""" + Gentoo-keys - actions.py + + Primary api interface module + + @copyright: 2012 by Brian Dolbec <dol-sen@gentoo.org> + @license: GNU GPL2, see COPYING for details. +""" + +from __future__ import print_function + +import os + +from collections import defaultdict +from json import load +from shutil import rmtree +from sslfetch.connections import Connector + +from gkeys.lib import GkeysGPG +from gkeys.seedhandler import SeedHandler +from gkeys.config import GKEY +from gkeys.checks import SPECCHECK_SUMMARY, convert_pf, convert_yn + +Available_Actions = ['listseed', 'addseed', 'removeseed', 'moveseed', 'fetchseed', + 'listseedfiles', 'listkey', 'installkey', 'removekey', 'movekey', + 'installed', 'importkey', 'verify', 'checkkey', 'sign', 'speccheck'] + 'refreshkey'] + +Action_Options = { + 'listseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile', '1file'], + 'addseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile'], + 'removeseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile'], + 'moveseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile', 'dest'], + 'fetchseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile'], + 'listseedfiles': [], + 'listkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'gpgsearch', 'keyid'], + 'installkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'seedfile', '1file'], + 'removekey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring'], + 'movekey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'dest'], + 'installed': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring'], + 'importkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring'], + 'verify': ['dest', 'nick', 'name', 'keydir', 'fingerprint', 'category', '1file', 'signature', 'keyring', 'timestamp'], + 'checkkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'keyid'], + 'sign': ['nick', 'name', 'keydir', 'fingerprint', 'file', 'keyring'], + 'speccheck': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'keyid'], + 'refreshkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'keyid'], +} + + +class Actions(object): + '''Primary API actions''' + + def __init__(self, config, output=None, logger=None): + self.config = config + self.output = output + self.logger = logger + self.seeds = None + + + def listseed(self, args): + '''Pretty-print the selected seed file(s)''' + handler = SeedHandler(self.logger, self.config) + kwargs = handler.build_gkeydict(args) + self.logger.debug("ACTIONS: listseed; kwargs: %s" % str(kwargs)) + if not self.seeds: + try: + self.seeds = handler.load_seeds(args.seedfile, args.filename) + except ValueError: + return (False, ["Failed to load seed file. Consider fetching seedfiles."]) + if self.seeds: + results = self.seeds.list(**kwargs) + else: + results = '' + return (True, ['', results]) + + + def fetchseed(self, args): + '''Download the selected seed file(s)''' + self.logger.debug("ACTIONS: fetchseed; args: %s" % str(args)) + handler = SeedHandler(self.logger, self.config) + success, messages = handler.fetch_seeds(args.seedfile, args, self.verify) + + messages.append("") + messages.append("Fetch operation completed") + return (False not in success, messages) + + + def addseed(self, args): + '''Add or replace a key in the selected seed file(s)''' + handler = SeedHandler(self.logger, self.config) + gkeys = self.listseed(args)[1] + if not args.nick or not args.name or not args.fingerprint: + return (False, ["Provide a nickname, a name and a fingerprint."]) + gkey = handler.new(args, checkgkey=True) + if not gkey: + return (False, ["Failed to create a valid GKEY instance.", + "Check for invalid data entries"]) + if len(gkeys) == 0: + self.logger.debug("ACTIONS: installkey; now adding gkey: %s" % str(gkey)) + success = self.seeds.add(getattr(gkey, 'nick'), gkey) + if success: + success = self.seeds.save() + messages = ["Successfully added new seed."] + else: + messages = ["Matching seeds found in seeds file", + "Aborting... \nMatching seeds:", gkeys] + success = False + return (success, messages) + + + def removeseed(self, args): + '''Remove a key from the selected seed file(s)''' + gkeys = self.listseed(args)[1] + if not gkeys: + return (False, ["Failed to remove seed: No gkeys returned from listseed()", + []]) + if len(gkeys) == 1: + self.logger.debug("ACTIONS: removeseed; now deleting gkey: %s" % str(gkeys)) + success = self.seeds.delete(gkeys[0]) + if success: + success = self.seeds.save() + return (success, ["Successfully removed seed: %s" % str(success), + gkeys]) + elif len(gkeys): + return (False, ["Too many seeds found to remove", gkeys]) + return (False, ["Failed to remove seed:", args, + "No matching seed found"]) + + + def moveseed(self, args): + '''Move keys between seed files''' + handler = SeedHandler(self.logger) + searchkey = handler.new(args, needkeyid=False, checkintegrity=False) + self.logger.debug("ACTIONS: moveseed; gkey: %s" % str(searchkey)) + if not self.seeds: + self.seeds = self.load_seeds(args.category) + kwargs = handler.build_gkeydict(args) + sourcekeys = self.seeds.list(**kwargs) + dest = self.load_seeds(args.destination) + destkeys = dest.list(**kwargs) + messages = [] + if len(sourcekeys) == 1 and destkeys == []: + self.logger.debug("ACTIONS: moveseed; now adding destination gkey: %s" + % str(sourcekeys[0])) + success = dest.add(sourcekeys[0]) + self.logger.debug("ACTIONS: moveseed; success: %s" %str(success)) + self.logger.debug("ACTIONS: moveseed; now deleting sourcekey: %s" % str(sourcekeys[0])) + success = self.seeds.delete(sourcekeys[0]) + if success: + success = dest.save() + self.logger.debug("ACTIONS: moveseed; destination saved... %s" %str(success)) + success = self.seeds.save() + messages.extend(["Successfully Moved %s seed: %s" + % (args.category, str(success)), sourcekeys[0]]) + return (success, messages) + elif len(sourcekeys): + messages = ["Too many seeds found to move"] + messages.extend(sourcekeys) + return (False, messages) + messages.append("Failed to move seed:") + messages.append(searchkey) + messages.append('\n') + messages.append("Source seeds found...") + messages.extend(sourcekeys or ["None\n"]) + messages.append("Destination seeds found...") + messages.extend(destkeys or ["None\n"]) + return (False, messages) + + + def listkey(self, args): + '''Pretty-print the selected seed file or nick''' + # get confirmation + # fill in code here + if not args.category: + args.category = 'rel' + catdir = self.config.get_key(args.category + "-category") + self.logger.debug("ACTIONS: listkey; catdir = %s" % catdir) + self.gpg = GkeysGPG(self.config, catdir) + handler = SeedHandler(self.logger, self.config) + if args.keydir: + self.gpg.set_keydir(args.keydir, "list-keys") + self.gpg.set_keyseedfile() + seeds = self.gpg.seedfile + else: + seeds = handler.load_category(args.category) + results = {} + success = [] + messages = [] + if args.gpgsearch: + keyresults = seeds.seeds + # pick any key + key = keyresults[sorted(keyresults)[0]] + result = self.gpg.list_keys(key.keydir, args.gpgsearch) + # now split the results and reverse lookup the gkey + lines = result.output.split('\n') + while lines: + # determine the end of the first key listing + index = lines.index('') + keyinfo = lines[:index] + # trim off the first keys info + lines = lines[index + 1:] + # make sure it is a key listing + if len(keyinfo) < 2: + break + # get the fingerprint from the line + fpr = keyinfo[1].split('= ')[1] + # search for the matching gkey + kwargs = {'keydir': args.keydir, 'fingerprint': [fpr]} + keyresults = seeds.list(**kwargs) + # list the results + for key in sorted(keyresults): + ls, lr = self._list_it(key, '\n'.join(keyinfo)) + success.append(ls) + results[key.name] = lr + messages = ["Done."] + else: + kwargs = handler.build_gkeydict(args) + keyresults = seeds.list(**kwargs) + for key in sorted(keyresults): + result = self.gpg.list_keys(key.keydir, key.fingerprint) + ls, lr = self._list_it(key, result.output) + success.append(ls) + results[key.name] = lr + messages = ["Done."] + + if not messages: + messages = ['No results found meeting criteria', "Did you specify -n foo or -n '*'"] + return (False not in success, messages) + + + def _list_it(self, key, result, print_key=True): + self.logger.debug("ACTIONS: _list_it; listing key:" + str(key.nick)) + if self.config.options['print_results']: + if print_key: + print() + print("Nick.....:", key.nick) + print("Name.....:", key.name) + print("Keydir...:", key.keydir) + c = 0 + for line in result.split('\n'): + if c == 0: + print("Gpg info.:", line) + else: + print(" ", line) + c += 1 + self.logger.debug("data output:\n" + str(result)) + return (True, result) + + + def installkey(self, args): + '''Install a key from the seed(s)''' + self.logger.debug("ACTIONS: installkey; args: %s" % str(args)) + success, gkey = self.listseed(args)[1] + if gkey: + if gkey and not args.nick == '*' and self.output: + self.output(['', gkey], "\n Found GKEY seeds:") + elif gkey and self.output: + self.output(['all'], "\n Installing seeds:") + else: + self.logger.info("ACTIONS: installkey; " + "Matching seed entry not found") + if args.nick: + return (False, ["Search failed for: %s" % args.nick]) + elif args.name: + return (False, ["Search failed for: %s" % args.name]) + else: + return (False, ["Search failed for search term"]) + # get confirmation + # fill in code here + catdir = self.config.get_key(args.category + "-category") + self.logger.debug("ACTIONS: installkey; catdir = %s" % catdir) + self.gpg = GkeysGPG(self.config, catdir) + results = {} + failed = [] + for key in gkey: + self.logger.debug("ACTIONS: installkey; adding key:") + self.logger.debug("ACTIONS: " + str(key)) + results[key.name] = self.gpg.add_key(key) + for result in results[key.name]: + self.logger.debug("ACTIONS: installkey; result.failed = " + + str(result.failed)) + if self.config.options['print_results']: + for result in results[key.name]: + print("key desired:", key.name, ", key added:", + result.username, ", succeeded:", + not result.failed, ", fingerprint:", result.fingerprint) + self.logger.debug("stderr_out: " + str(result.stderr_out)) + if result.failed: + failed.append(key) + if failed and self.output: + self.output([failed], "\n Failed to install:") + if failed: + success = False + return (success, ["Completed"]) + return (success, ["No seeds to search or install"]) + + + def checkkey(self, args): + '''Check keys actions''' + if not args.category: + return (False, ["Please specify seeds type."]) + self.logger.debug("ACTIONS: checkkey; args: %s" % str(args)) + handler = SeedHandler(self.logger, self.config) + seeds = handler.load_category(args.category) + catdir = self.config.get_key(args.category + "-category") + self.logger.debug("ACTIONS: checkkey; catdir = %s" % catdir) + self.gpg = GkeysGPG(self.config, catdir) + results = {} + failed = defaultdict(list) + kwargs = handler.build_gkeydict(args) + keyresults = seeds.list(**kwargs) + self.output('', '\n Checking keys...') + for gkey in sorted(keyresults): + self.logger.info("Checking key %s, %s" % (gkey.nick, gkey.keyid)) + self.output('', + "\n %s, %s: %s" % (gkey.nick, gkey.name, ', '.join(gkey.keyid)) + + "\n ==============================================") + self.logger.debug("ACTIONS: checkkey; gkey = %s" % str(gkey)) + for key in gkey.keyid: + results[gkey.name] = self.gpg.check_keys(gkey.keydir, key) + if results[gkey.name].expired: + failed['expired'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key)) + if results[gkey.name].revoked: + failed['revoked'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key)) + if results[gkey.name].invalid: + failed['invalid'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key)) + if not results[gkey.name].sign: + failed['sign'].append("%s <%s>: %s " % (gkey.name, gkey.nick, key)) + if failed['expired']: + self.output([failed['expired']], '\n Expired keys:\n') + if failed['revoked']: + self.output([failed['revoked']], '\n Revoked keys:\n') + if failed['invalid']: + self.output([failed['invalid']], '\n Invalid keys:\n') + if failed['sign']: + self.output([failed['sign']], '\n No signing capable subkeys:\n') + return (len(failed) <1, + ['\nFound:\n-------', 'Expired: %d' % len(failed['expired']), + 'Revoked: %d' % len(failed['revoked']), + 'Invalid: %d' % len(failed['invalid']), + 'No signing capable subkeys: %d' % len(failed['sign']) + ]) + + + def speccheck(self, args): + '''Check keys actions''' + if not args.category: + return (False, ["Please specify seeds type."]) + self.logger.debug("ACTIONS: speccheck; args: %s" % str(args)) + handler = SeedHandler(self.logger, self.config) + seeds = handler.load_category(args.category) + catdir = self.config.get_key(args.category + "-category") + self.logger.debug("ACTIONS: speccheck; catdir = %s" % catdir) + self.gpg = GkeysGPG(self.config, catdir) + results = {} + failed = defaultdict(list) + kwargs = handler.build_gkeydict(args) + keyresults = seeds.list(**kwargs) + self.output('', '\n Checking keys...') + for gkey in sorted(keyresults): + self.logger.info("Checking key %s, %s" % (gkey.nick, gkey.keyid)) + self.output('', + "\n %s, %s: %s" % (gkey.nick, gkey.name, ', '.join(gkey.keyid)) + + "\n ==============================================") + self.logger.debug("ACTIONS: speccheck; gkey = %s" % str(gkey)) + for key in gkey.keyid: + results = self.gpg.speccheck(gkey.keydir, key) + for g in results: + pub_pass = {} + for key in results[g]: + self.output('', key.pretty_print()) + + if key.key is "PUB": + pub_pass = { + 'key': key, + 'pub': key.passed_spec, + 'sign': False, + 'encrypt': False, + 'auth': False, + 'signs': [], + 'encrypts': [], + 'authens': [], + 'final': False, + } + if key.key is "SUB": + if key.sign_capable and key.passed_spec: + pub_pass['signs'].append(key.passed_spec) + pub_pass['sign'] = True + if key.encrypt_capable: + pub_pass['encrypts'].append(key.passed_spec) + pub_pass['encrypt'] = True + if key.capabilities == 'a': + pub_pass['authens'].append(key.passed_spec) + if key.passed_spec: + pub_pass['auth'] = True + validity = key.validity.split(',')[0] + if not key.expire and not 'r' in validity: + failed['expired'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint)) + if 'r' in validity: + failed['revoked'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint)) + if 'i' in validity: + failed['invalid'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint)) + if key.capabilities not in ['a', 'e']: + if not key.algo: + failed['algo'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint)) + if not key.bits: + failed['bits'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint)) + if "Warning" in key.expire_reason: + failed['warn'].append("%s <%s>: %s " % (gkey.name, gkey.nick, key.fingerprint)) + if True in pub_pass['signs']: + pub_pass['sign'] = True + if True in pub_pass['encrypts']: + pub_pass['encrypt'] = True + if not pub_pass['sign']: + failed['sign'].append("%s <%s>: %s" % (gkey.name, gkey.nick, pub_pass['key'].fingerprint)) + if not pub_pass['encrypt']: + failed['encrypt'].append("%s <%s>: %s" % (gkey.name, gkey.nick, pub_pass['key'].fingerprint)) + spec = "%s <%s>: %s" % (gkey.name, gkey.nick, pub_pass['key'].fingerprint) + for k in ['pub', 'sign']: + if pub_pass[k]: + pub_pass['final'] = True + else: + pub_pass['final'] = False + break + if pub_pass['final']: + if spec not in failed['spec-approved']: + failed['spec-approved'].append(spec) + else: + if spec not in failed['spec']: + failed['spec'].append(spec) + sdata = convert_pf(pub_pass, ['pub', 'sign', 'final']) + sdata = convert_yn(sdata, ['auth', 'encrypt']) + self.output('', SPECCHECK_SUMMARY % sdata) + + if failed['revoked']: + self.output([sorted(set(failed['revoked']))], '\n Revoked keys:') + if failed['invalid']: + self.output([sorted(set(failed['invalid']))], '\n Invalid keys:') + if failed['sign']: + self.output([sorted(set(failed['sign']))], '\n No signing capable subkey:') + if failed['encrypt']: + self.output([sorted(set(failed['encrypt']))], '\n No Encryption capable subkey (Notice only):') + if failed['algo']: + self.output([sorted(set(failed['algo']))], '\n Incorrect Algorithm:') + if failed['bits']: + self.output([sorted(set(failed['bits']))], '\n Incorrect bit length:') + if failed['expired']: + self.output([sorted(set(failed['expired']))], '\n Expiry keys:') + if failed['warn']: + self.output([sorted(set(failed['warn']))], '\n Expiry Warnings:') + if failed['spec']: + self.output([sorted(set(failed['spec']))], '\n Failed to pass SPEC requirements:') + if failed['spec-approved']: + self.output([sorted(set(failed['spec-approved']))], '\n SPEC Approved:') + + return (len(failed) <1, + ['\nFound Failures:\n-------', + 'Revoked................: %d' % len(set(failed['revoked'])), + 'Invalid................: %d' % len(set(failed['invalid'])), + 'No Signing subkey......: %d' % len(set(failed['sign'])), + 'No Encryption subkey...: %d' % len(set(failed['encrypt'])), + 'Algorithm..............: %d' % len(set(failed['algo'])), + 'Bit length.............: %d' % len(set(failed['bits'])), + 'Expiry.................: %d' % len(set(failed['expired'])), + 'Expiry Warnings........: %d' % len(set(failed['warn'])), + 'SPEC requirements......: %d' % len(set(failed['spec'])), + '=============================', + 'SPEC Approved..........: %d' % len(set(failed['spec-approved'])), + ]) + + + def removekey(self, args): + '''Remove an installed key''' + if not args.nick: + return (False, ["Please provide a nickname or -n *"]) + handler = SeedHandler(self.logger, self.config) + kwargs = handler.build_gkeydict(args) + self.logger.debug("ACTIONS: addkey; kwargs: %s" % str(kwargs)) + success, installed_keys = self.installed(args)[1] + for gkey in installed_keys: + if kwargs['nick'] not in gkey.nick: + messages = ["%s does not seem to be a valid key." % kwargs['nick']] + success = False + else: + self.output(['', [gkey]], '\n Found GKEY seed:') + ans = raw_input("Do you really want to remove %s?[y/n]: " + % kwargs['nick']).lower() + while ans not in ["yes", "y", "no", "n"]: + ans = raw_input("Do you really want to remove %s?[y/n]: " + % kwargs['nick']).lower() + if ans in ["no", "n"]: + messages = ["Key removal aborted... Nothing to be done."] + else: + catdir = self.config.get_key(args.category + "-category") + rm_candidate = os.path.join(catdir, gkey.nick) + self.logger.debug("ACTIONS: removekey; catdir = %s" % catdir) + if args.category: + try: + rmtree(rm_candidate) + messages = ["Done removing %s key." % kwargs['nick']] + except OSError: + messages = ["%s directory does not exist." % rm_candidate] + success = False + return (success, messages) + + + def movekey(self, args): + '''Rename an installed key''' + return (False, []) + + + def importkey(self, args): + '''Add a specified key to a specified keyring''' + if args.category: + catdir = self.config.get_key(args.category + "-category") + keyring_dir = self.config.get_key("keyring") + self.logger.debug("ACTIONS: importkey; catdir = %s" % catdir) + self.gpg = GkeysGPG(self.config, catdir) + success, gkeys = self.listseed(args)[1] + results = {} + failed = [] + print("Importing specified keys to keyring.") + for gkey in gkeys: + self.logger.debug("ACTIONS: importkey; adding key: %s", gkey.name) + results[gkey.name] = self.gpg.add_key(gkey) + if self.config.options['print_results']: + for result in results[gkey.name]: + print("key desired:", gkey.name, ", key added:", + result.username, ", succeeded:", + not result.failed, ", fingerprint:", result.fingerprint) + self.logger.debug("stderr_out: " + str(result.stderr_out)) + if result.failed: + self.logger.debug("ACTIONS: importkey; result.failed = " + str(result.failed)) + failed.append(gkey) + if not results[gkey.name][0].failed: + print("Importing: ", gkey.name) + self.logger.debug("ACTIONS: importkey; importing key: %s", gkey.name) + keyring = os.path.join(keyring_dir,args.keyring + '.gpg') + self.gpg.add_to_keyring(gkey, catdir, keyring) + if failed and self.output: + self.output([failed], "\n Failed to install:") + if len(failed): + success = False + return (success, ["Completed."]) + return (False, ["No seeds to search or install", + "You must specify a category"]) + + + def installed(self, args): + '''Lists the installed key directories''' + if args.category: + catdir = self.config.get_key(args.category + "-category") + else: + return (False, ["Please specify a category."]) + self.logger.debug("ACTIONS: installed; catdir = %s" % catdir) + installed_keys = [] + try: + if args.nick: + keys = [args.nick] + else: + keys = os.listdir(catdir) + for key in keys: + seed_path = os.path.join(catdir, key) + gkey_path = os.path.join(seed_path, 'gkey.seeds') + seed = None + try: + with open(gkey_path, 'r') as fileseed: + seed = load(fileseed) + except IOError: + return ["No seed file found in %s." % gkey_path, ""] + if seed: + for val in list(seed.values()): + installed_keys.append(GKEY(**val)) + except OSError: + return (False, ["%s directory does not exist." % catdir, ""]) + return (True, ['Found Key(s):', installed_keys]) + + + def user_confirm(self, message): + '''Prompt a user to confirm an action + + @param message: string, user promt message to display + @return boolean: confirmation to proceed or abort + ''' + pass + + + def verify(self, args): + '''File verification action''' + connector_output = { + 'info': self.logger.debug, + 'error': self.logger.error, + 'kwargs-info': {}, + 'kwargs-error': {}, + } + if not args.filename: + return (False, ['Please provide a signed file.']) + if not args.category: + args.category = 'rel' + (success, data) = self.installed(args) + keys = data[1] + if not keys: + return (False, ['No installed keys found, try installkey action.']) + catdir = self.config.get_key(args.category + "-category") + self.logger.debug("ACTIONS: verify; catdir = %s" % catdir) + self.gpg = GkeysGPG(self.config, catdir) + filepath, signature = args.filename, args.signature + timestamp_path = None + isurl = success = verified = False + if filepath.startswith('http'): + isurl = True + url = filepath + filepath = args.destination + # a bit hackish, but save it to current directory + # with download file name + if not filepath: + filepath = url.split('/')[-1] + self.logger.debug("ACTIONS: verify; destination filepath was " + "not supplied, using current directory ./%s" % filepath) + if args.timestamp: + timestamp_path = filepath + ".timestamp" + if isurl: + from sslfetch.connections import Connector + connector_output = { + 'info': self.logger.info, + 'debug': self.logger.debug, + 'error': self.logger.error, + 'kwargs-info': {}, + 'kwargs-debug': {}, + 'kwargs-error': {}, + } + fetcher = Connector(connector_output, None, "Gentoo Keys") + self.logger.debug("ACTIONS: verify; fetching %s signed file " % filepath) + self.logger.debug("ACTIONS: verify; timestamp path: %s" % timestamp_path) + success, signedfile, timestamp = fetcher.fetch_file(url, filepath, timestamp_path) + else: + filepath = os.path.abspath(filepath) + self.logger.debug("ACTIONS: verify; local file %s" % filepath) + success = os.path.isfile(filepath) + if not success: + messages = ["File %s cannot be retrieved." % filepath] + else: + if not signature: + EXTENSIONS = ['.sig', '.asc', 'gpg','.gpgsig'] + success_fetch = False + for ext in EXTENSIONS: + sig_path = filepath + ext + if isurl: + signature = url + ext + self.logger.debug("ACTIONS: verify; fetching %s signature " % signature) + success_fetch, sig, timestamp = fetcher.fetch_file(signature, sig_path) + else: + signature = filepath + ext + signature = os.path.abspath(signature) + self.logger.debug("ACTIONS: verify; checking %s signature " % signature) + success_fetch = os.path.isfile(signature) + if success_fetch: + break + else: + sig_path = signature + messages = [] + self.logger.info("Verifying file...") + verified = False + # get correct key to use + use_gkey = self.config.get_key('seedurls', 'gkey') + for key in keys: + if key.nick == use_gkey: + break + results = self.gpg.verify_file(key, sig_path, filepath) + keyid = key.keyid[0] + (valid, trust) = results.verified + if valid: + verified = True + messages = ["Verification succeeded.: %s" % (filepath), + "Key info...............: %s <%s>, %s" + % ( key.name, key.nick,keyid)] + else: + messages = ["Verification failed.....:" % (filepath), + "Key info................: %s <%s>, %s" + % ( key.name, key.nick,keyid)] + return (verified, messages) + + + def listseedfiles(self, args): + '''List seed files found in the configured seed directory''' + seedsdir = self.config.get_key('seedsdir') + seedfile = [f for f in os.listdir(seedsdir) if f[-5:] == 'seeds'] + return (True, {"Seed files found at path: %s\n %s" + % (seedsdir, "\n ".join(seedfile)): True}) + + + def sign(self, args): + '''Sign a file''' + if not args.filename: + return (False, ['Please provide a file to sign.']) + + if isinstance(args.nick, str): + nicks = [args.nick] + else: + nicks = args.nick + # load our installed signing keys db + handler = SeedHandler(self.logger, self.config) + self.seeds = handler.load_category('sign', nicks) + if not self.seeds.seeds: + return (False, ['No installed keys, try installkey action.', '']) + basedir = self.config.get_key("sign-category") + keydir = self.config.get_key("sign", "keydir") + task = self.config.get_key("sign", "type") + keyring = self.config.get_key("sign", "keyring") + + self.config.options['gpg_defaults'] = ['--status-fd', '2'] + + self.logger.debug("ACTIONS: sign; keydir = %s" % keydir) + + self.gpg = GkeysGPG(self.config, basedir) + self.gpg.set_keydir(keydir, task) + if keyring not in ['', None]: + self.gpg.set_keyring(keyring, task) + msgs = [] + success = [] + for fname in args.filename: + results = self.gpg.sign(task, None, fname) + verified, trust = results.verified + if not results.verified[0]: + msgs.extend( + ['Failed Signature for %s verified: %s, trust: %s' + % (fname, verified, trust), 'GPG output:', "\n".join(results.stderr_out)] + ) + success.append(False) + else: + msgs.extend( + ['Signature result for: %s -- verified: %s, trust: %s' + % (fname, verified, trust)] #, 'GPG output:', "\n".join(results.stderr_out)] + ) + success.append(True) + return (False not in success, ['', msgs]) + + + def refreshkey(self, args): + '''Calls gpg with the --refresh-keys option + for in place updates of the installed keys''' + if not args.category: + return (False, ["Please specify seeds type."]) + self.logger.debug("ACTIONS: refreshkey; args: %s" % str(args)) + handler = SeedHandler(self.logger, self.config) + seeds = handler.load_category(args.category) + catdir = self.config.get_key(args.category + "-category") + self.logger.debug("ACTIONS: refreshkey; catdir = %s" % catdir) + self.gpg = GkeysGPG(self.config, catdir) + results = {} + kwargs = handler.build_gkeydict(args) + keyresults = seeds.list(**kwargs) + self.output('', '\n Refreshig keys...') + for gkey in sorted(keyresults): + self.logger.info("Refreshig key %s, %s" % (gkey.nick, gkey.keyid)) + self.output('', " %s: %s" % (gkey.name, ', '.join(gkey.keyid))) + #self.output('', " ===============") + self.logger.debug("ACTIONS: refreshkey; gkey = %s" % str(gkey)) + results[gkey.keydir] = self.gpg.refresh_key(gkey) + return (True, ['Completed']) + + + + diff --git a/gkeys/gkeys/base.py b/gkeys/gkeys/base.py new file mode 100644 index 0000000..95142f9 --- /dev/null +++ b/gkeys/gkeys/base.py @@ -0,0 +1,253 @@ +# +#-*- coding:utf-8 -*- + +""" + Gentoo-keys - base.py + + Command line interface argsparse options module + and common functions + + @copyright: 2012 by Brian Dolbec <dol-sen@gentoo.org> + @license: GNU GPL2, see COPYING for details. +""" + +from __future__ import print_function + + +import argparse + +from gkeys import config, fileops, seed, lib +from gkeys.log import log_levels, set_logger + + +class CliBase(object): + '''Common cli and argsparse options class''' + + + def __init__(self): + self.cli_config = { + 'Actions': [], + 'Available_Actions': [], + 'Action_Options': [], + 'prog': 'gkeys', + 'description': 'Gentoo-keys manager program', + 'epilog': '''Caution: adding untrusted keys to these keyrings can + be hazardous to your system!''' + } + self.config = None + self.args = None + self.seeds = None + self.actions = None + + + @staticmethod + def _option_dest(parser=None): + parser.add_argument('-d', '--dest', dest='destination', default=None, + help='The destination seed file or keydir for move, copy operations') + + @staticmethod + def _option_fingerprint(parser=None): + parser.add_argument('-f', '--fingerprint', dest='fingerprint', + default=None, nargs='+', + help='The fingerprint of the the key') + + @staticmethod + def _option_gpgsearch(parser=None): + parser.add_argument('-g', '--gpgsearch', dest='gpgsearch', default=None, + help='Do a gpg search operations, rather than a gkey search') + + @staticmethod + def _option_keyid(parser=None): + parser.add_argument('-i', '--keyid', dest='keyid', default=None, + nargs='+', + help='The long keyid of the gpg key to search for') + + @staticmethod + def _option_keyring(parser=None): + parser.add_argument('-k', '--keyring', dest='keyring', default='trusted_keyring', + help='The name of the keyring to use') + + @staticmethod + def _option_nick(parser=None): + parser.add_argument('-n', '--nick', dest='nick', default=None, + help='The nick associated with the the key') + + @staticmethod + def _option_name(parser=None): + parser.add_argument('-N', '--name', dest='name', nargs='*', + default=None, help='The name of the the key') + + @staticmethod + def _option_category(parser=None): + parser.add_argument('-C', '--category', + choices=['rel', 'dev', 'overlays', 'sign'], dest='category', default=None, + help='The key or seed directory category to use or update') + + @staticmethod + def _option_cleankey(parser=None): + parser.add_argument('--clean-key', + dest='cleankey', default=False, + help='Clean the key from the keyring due to failures') + + @staticmethod + def _option_cleanseed(parser=None): + parser.add_argument('--clean-seed', + dest='cleanseed', default=False, + help='Clean the seed from the seedfile due to failures. ' + 'Used during binary keyring release creation.') + + @staticmethod + def _option_keydir(parser=None): + parser.add_argument('-r', '--keydir', dest='keydir', default=None, + help='The keydir to use, update or search for/in') + + @staticmethod + def _option_seedfile(parser=None): + parser.add_argument('-S', '--seedfile', dest='seedfile', default=None, + help='The seedfile to use from the gkeys.conf file') + + @staticmethod + def _option_file(parser=None): + parser.add_argument('-F', '--file', dest='filename', default=None, + nargs='+', + help='The path/URL to use for the (signed) file') + + @staticmethod + def _option_1file(parser=None): + parser.add_argument('-F', '--file', dest='filename', default=None, + help='The path/URL to use for the (signed) file') + + @staticmethod + def _option_signature(parser=None): + parser.add_argument('-s','--signature', dest='signature', default=None, + help='The path/URL to use for the signature') + + @staticmethod + def _option_timestamp(parser=None): + parser.add_argument('-t', '--timestamp', dest='timestamp', type=bool, + default=False, + help='Turn on timestamp use') + + @staticmethod + def _option_mail(parser=None): + parser.add_argument('-m', '--mail', dest='mail', default=None, + help='The email address to search for') + + @staticmethod + def _option_status(parser=None): + parser.add_argument('-A', '--status', default=False, + help='The active status of the member') + + + def parse_args(self, args): + '''Parse a list of aruments + + @param args: list + @returns argparse.Namespace object + ''' + #logger.debug('CliBase: parse_args; args: %s' % args) + parser = argparse.ArgumentParser( + prog=self.cli_config['prog'], + description=self.cli_config['description'], + epilog=self.cli_config['epilog']) + + # options + parser.add_argument('-c', '--config', dest='config', default=None, + help='The path to an alternate config file') + parser.add_argument('-D', '--debug', default='DEBUG', + choices=list(log_levels), + help='The logging level to set for the logfile') + + + subparsers = parser.add_subparsers(help='actions') + for name in self.cli_config['Available_Actions']: + action_method = getattr(self.cli_config['Actions'], name) + actiondoc = action_method.__doc__ + try: + text = actiondoc.splitlines()[0] + except AttributeError: + text = "" + action_parser = subparsers.add_parser( + name, + help=text, + description=actiondoc, + formatter_class=argparse.RawDescriptionHelpFormatter) + action_parser.set_defaults(action=name) + self._add_options(action_parser, self.cli_config['Action_Options'][name]) + + return parser.parse_args(args) + + + def _add_options(self, parser, options): + for opt in options: + getattr(self, '_option_%s' % opt)(parser) + + + def run(self, args): + '''Run the args passed in + + @param args: list or argparse.Namespace object + ''' + global logger + message = None + if not args: + message = "Main: run; invalid args argument passed in" + if isinstance(args, list): + args = self.parse_args(args) + if args.config: + self.config.defaults['config'] = args.config + # now make it load the config file + self.config.read_config() + + # establish our logger and update it in the imported files + logger = set_logger('gkeys', self.config['logdir'], args.debug, + dirmode=int(self.config.get_key('permissions', 'directories'),0), + filemask=int(self.config.get_key('permissions', 'files'),0)) + config.logger = logger + fileops.logger = logger + seed.logger = logger + lib.logger = logger + + if message: + logger.error(message) + + # now that we have a logger, record the alternate config setting + if args.config: + logger.debug("Main: run; Found alternate config request: %s" + % args.config) + + # establish our actions instance + self.actions = self.cli_config['Actions'](self.config, self.output_results, logger) + + # run the action + func = getattr(self.actions, '%s' % args.action) + logger.debug('Main: run; Found action: %s' % args.action) + success, results = func(args) + if not results: + print("No results found. Check your configuration and that the", + "seed file exists.") + return success + if self.config.options['print_results'] and 'done' not in list(results): + self.output_results(results, '\n Gkey task results:') + return success + + + @staticmethod + def output_results(results, header): + # super simple output for the time being + if header: + print(header) + for msg in results: + if isinstance(msg, str) or isinstance(msg, unicode): + print(' ', msg) + else: + try: + print("\n".join([x.pretty_print for x in msg])) + except AttributeError: + for x in msg: + print(' ', x) + print() + + + def output_failed(self, failed): + pass diff --git a/gkeys/gkeys/checks.py b/gkeys/gkeys/checks.py new file mode 100644 index 0000000..db3d59f --- /dev/null +++ b/gkeys/gkeys/checks.py @@ -0,0 +1,437 @@ +# +#-*- coding:utf-8 -*- + +""" + Gentoo-Keys - gkeygen/checks.py + + Primary key checks module + @copyright: 2014 by Brian Dolbec <dolsen@gentoo.org> + @license: GNU GPL2, see COPYING for details +""" + +import time +from collections import namedtuple, OrderedDict + +from gkeys.config import GKEY_CHECK + +from pyGPG.mappings import (ALGORITHM_CODES, CAPABILITY_MAP, + KEY_VERSION_FPR_LEN, VALIDITY_MAP, INVALID_LIST, + VALID_LIST) + + +SPEC_INDEX = { + 'key': 0, + 'capabilities': 1, + 'fingerprint': 2, + 'bits': 3, + 'created': 4, + 'expire': 5, + 'encrypt_capable': 6, + 'sign_capable': 7, + 'algo': 8, + 'version': 9, + 'id': 10, + 'days': 11, + 'validity': 12, + 'expire_reason': 13, + 'long_caps': 14, # long version of the capbilities + 'caps': 15, + 'caps_reason': 16, + 'id_reason': 17, + 'is_valid': 18, + 'passed_spec': 19, +} + +SPEC_INDEX = OrderedDict(sorted(SPEC_INDEX.items(), key=lambda t: t[1])) + +SPEC_STAT = ['', '','', False, False, False, False, False, False, False, False, + 0, '', '', '', True, '', '', False, False] + +# Default glep 63 minimum gpg key specification +# and approved options, limits +TEST_SPEC = { + 'bits': { + 'DSA': 2048, + 'RSA': 2048, + }, + 'expire': 3 * 365, # in days + 'subkeys': { # warning/error mode + 'encrypt': { + 'mode': 'notice', + 'expire': 3 * 365, + }, + 'sign': { + 'mode': 'error', + 'expire': 365, + }, + }, + 'algorithms': ['DSA', 'RSA', '1', '2', '3', '17'], + 'versions': ['4'], + 'qualified_id': '@gentoo.org', +} + +# Final pass/fail fields and the pass value required +TEST_REQUIREMENTS = { + 'bits': True, + 'created': True, + 'expire': True, + 'sign_capable': True, + 'algo': True, + 'version': True, + 'id': True, + 'is_valid': True, + 'caps': True, +} + +SECONDS_PER_DAY = 86400 + + +SPECCHECK_STRING = ''' ---------- + Fingerprint......: %(fingerprint)s + Key type ........: %(key)s Capabilities.: %(capabilities)s %(long_caps)s + Algorithm........: %(algo)s Bit Length...: %(bits)s + Create Date......: %(created)s Expire Date..: %(expire)s + Key Version......: %(version)s Validity.....: %(validity)s + Days till expiry.: %(days)s %(expire_reason)s + Capability.......: %(caps)s %(caps_reason)s + Qualified ID.....: %(id)s %(id_reason)s + This %(pub_sub)s.: %(passed_spec)s''' + +SPECCHECK_SUMMARY = ''' Key summary + primary..........: %(pub)s signing subkey: %(sign)s + encryption subkey: %(encrypt)s authentication subkey: %(auth)s + SPEC requirements: %(final)s +''' + +def convert_pf(data, fields): + '''Converts dictionary items from True/False to Pass/Fail strings + + @param data: dict + @param fields: list + @returns: dict + ''' + for f in fields: + if data[f]: + data[f] = 'Pass' + else: + data[f] = 'Fail' + return data + +def convert_yn(data, fields): + '''Converts dictionary items from True/False to Yes/No strings + + @param data: dict + @param fields: list + @returns: dict + ''' + for f in fields: + if data[f]: + data[f] = 'Yes ' + else: + data[f] = 'No ' + return data + + +class SpecCheck(namedtuple("SpecKey", list(SPEC_INDEX))): + + __slots__ = () + + def pretty_print(self): + data = self.convert_data() + output = SPECCHECK_STRING % (data) + return output + + + def convert_data(self): + data = dict(self._asdict()) + data = convert_pf(data, ['algo', 'bits', 'caps', 'created', 'expire', 'id', + 'passed_spec', 'version']) + for f in ['caps', 'id']: + data[f] = data[f].ljust(10) + data['validity'] += ', %s' % (VALIDITY_MAP[data['validity']]) + days = data['days'] + if days == float("inf"): + data['days'] = "infinite".ljust(10) + else: + data['days'] = str(int(data['days'])).ljust(10) + if data['capabilities'] == 'e': + data['algo'] = '----' + data['bits'] = '----' + if data['key'] =='PUB': + data['pub_sub'] = 'primary key' + else: + data['pub_sub'] = 'subkey.....' + return data + + +class KeyChecks(object): + '''Primary gpg key validation and specifications checks class''' + + def __init__(self, logger, spec=TEST_SPEC, qualified_id_check=True): + '''@param spec: optional gpg specification to test against + Defaults to TEST_SPEC + + ''' + self.logger = logger + self.spec = spec + self.check_id = qualified_id_check + + + def validity_checks(self, keydir, keyid, result): + '''Check the specified result based on the seed type + + @param keydir: the keydir to list the keys for + @param keyid: the keyid to check + @param result: pyGPG.output.GPGResult object + @returns: GKEY_CHECK instance + ''' + revoked = expired = invalid = sign = False + for data in result.status.data: + if data.name == "PUB": + if data.long_keyid == keyid[2:]: + # check if revoked + if 'r' in data.validity: + revoked = True + self.logger.debug("ERROR in key %s : revoked" % data.long_keyid) + break + # if primary key expired, all subkeys expire + if 'e' in data.validity: + expired = True + self.logger.debug("ERROR in key %s : expired" % data.long_keyid) + break + # check if invalid + if 'i' in data.validity: + invalid = True + self.logger.debug("ERROR in key %s : invalid" % data.long_keyid) + break + if 's' in data.key_capabilities: + sign = True + self.logger.debug("INFO primary key %s : key signing capabilities" % data.long_keyid) + if data.name == "SUB": + # check if invalid + if 'i' in data.validity: + self.logger.debug("WARNING in subkey %s : invalid" % data.long_keyid) + continue + # check if expired + if 'e' in data.validity: + self.logger.debug("WARNING in subkey %s : expired" % data.long_keyid) + continue + # check if revoked + if 'r' in data.validity: + self.logger.debug("WARNING in subkey %s : revoked" % data.long_keyid) + continue + # check if subkey has signing capabilities + if 's' in data.key_capabilities: + sign = True + self.logger.debug("INFO subkey %s : subkey signing capabilities" % data.long_keyid) + return GKEY_CHECK(keyid, revoked, expired, invalid, sign) + + + def spec_check(self, keydir, keyid, result): + '''Performs the minimum specifications checks on the key''' + self.logger.debug("SPEC_CHECK() : CHECKING: %s" % keyid) + results = {} + pub = None + stats = None + pub_days = 0 + for data in result.status.data: + if data.name == "PUB": + if stats: + stats = self._test_final(data, stats) + results[pub.long_keyid].append(SpecCheck._make(stats)) + pub = data + found_id = False + found_id_reason = '' + results[data.long_keyid] = [] + stats = SPEC_STAT[:] + stats[SPEC_INDEX['key']] = data.name + stats[SPEC_INDEX['capabilities']] = data.key_capabilities + stats[SPEC_INDEX['validity']] = data.validity + stats = self._test_created(data, stats) + stats = self._test_algo(data, stats) + stats = self._test_bits(data, stats) + stats = self._test_expire(data, stats, pub_days) + pub_days = stats[SPEC_INDEX['days']] + stats = self._test_caps(data, stats) + stats = self._test_validity(data, stats) + elif data.name == "FPR": + pub = pub._replace(**{'fingerprint': data.fingerprint}) + stats[SPEC_INDEX['fingerprint']] = data.fingerprint + stats = self._test_version(data, stats) + elif data.name == "UID": + stats = self._test_uid(data, stats) + if stats[SPEC_INDEX['id']] in [True, '-----']: + found_id = stats[SPEC_INDEX['id']] + found_id_reason = '' + stats[SPEC_INDEX['id_reason']] = '' + else: + found_id_reason = stats[SPEC_INDEX['id_reason']] + elif data.name == "SUB": + if stats: + stats = self._test_final(data, stats) + results[pub.long_keyid].append(SpecCheck._make(stats)) + stats = SPEC_STAT[:] + stats[SPEC_INDEX['key']] = data.name + stats[SPEC_INDEX['capabilities']] = data.key_capabilities + stats[SPEC_INDEX['fingerprint']] = '%s' \ + % (data.long_keyid) + stats[SPEC_INDEX['id']] = found_id + stats[SPEC_INDEX['id_reason']] = found_id_reason + stats[SPEC_INDEX['validity']] = data.validity + stats = self._test_validity(data, stats) + stats = self._test_created(data, stats) + stats = self._test_algo(data, stats) + stats = self._test_bits(data, stats) + stats = self._test_expire(data, stats, pub_days) + stats = self._test_caps(data, stats) + if stats: + stats = self._test_final(data, stats) + results[pub.long_keyid].append(SpecCheck._make(stats)) + stats = None + self.logger.debug("SPEC_CHECK() : COMPLETED: %s" % keyid) + return results + + + def _test_algo(self, data, stats): + algo = data.pubkey_algo + if algo in TEST_SPEC['algorithms']: + stats[SPEC_INDEX['algo']] = True + else: + self.logger.debug("ERROR in key %s : invalid Type: %s" + % (data.long_keyid, ALGORITHM_CODES[algo])) + return stats + + + def _test_bits(self, data, stats): + bits = int(data.keylength) + if data.pubkey_algo in TEST_SPEC['algorithms']: + if bits >= TEST_SPEC['bits'][ALGORITHM_CODES[data.pubkey_algo]]: + stats[SPEC_INDEX['bits']] = True + else: + self.logger.debug("ERROR in key %s : invalid Bit length: %d" + % (data.long_keyid, bits)) + return stats + + + def _test_version(self, data, stats): + fpr_l = len(data.fingerprint) + if KEY_VERSION_FPR_LEN[fpr_l] in TEST_SPEC['versions']: + stats[SPEC_INDEX['version']] = True + else: + self.logger.debug("ERROR in key %s : invalid gpg key version: %s" + % (data.long_keyid, KEY_VERSION_FPR_LEN[fpr_l])) + return stats + + + def _test_created(self, data, stats): + try: + created = float(data.creation_date) + except ValueError: + created = 0 + if created <= time.time() : + stats[SPEC_INDEX['created']] = True + else: + self.logger.debug("ERROR in key %s : invalid gpg key creation date: %s" + % (data.long_keyid, data.creation_date)) + return stats + + + def _test_expire(self, data, stats, pub_days): + if data.name in ["PUB"]: + delta_t = TEST_SPEC['expire'] + stats = self._expire_check(data, stats, delta_t, pub_days) + return stats + else: + for cap in data.key_capabilities: + try: + delta_t = TEST_SPEC['subkeys'][CAPABILITY_MAP[cap]]['expire'] + except KeyError: + self.logger.debug( + "WARNING in capability key %s : setting delta_t to main expiry: %d" + % (cap, TEST_SPEC['expire'])) + delta_t = TEST_SPEC['expire'] + stats = self._expire_check(data, stats, delta_t, pub_days) + return stats + + + def _expire_check(self, data, stats, delta_t, pub_days): + today = time.time() + try: + expires = float(data.expiredate) + except ValueError: + expires = float("inf") + if data.name == 'SUB' and expires == float("inf"): + days = stats[SPEC_INDEX['days']] = pub_days + elif expires == float("inf"): + days = stats[SPEC_INDEX['days']] = expires + else: + days = stats[SPEC_INDEX['days']] = max(0, int((expires - today)/SECONDS_PER_DAY)) + if days <= delta_t: + stats[SPEC_INDEX['expire']] = True + elif days > delta_t and not ('i' in data.validity or 'r' in data.validity): + stats[SPEC_INDEX['expire_reason']] = '<== Exceeds specification' + else: + self.logger.debug("ERROR in key %s : invalid gpg key expire date: %s" + % (data.long_keyid, data.expiredate)) + if 0 < days < 30 and not ('i' in data.validity or 'r' in data.validity): + stats[SPEC_INDEX['expire_reason']] = '<== WARNING < 30 days' + + return stats + + + def _test_caps(self, data, stats): + if 'e' in data.key_capabilities: + if 's' in data.key_capabilities or 'a' in data.key_capabilities: + stats[SPEC_INDEX['caps']] = False + stats[SPEC_INDEX['caps_reason']] = "<== Mixing of 'e' with 's' and/or 'a'" + if not stats[SPEC_INDEX['is_valid']]: + return stats + kcaps = [] + for cap in data.key_capabilities: + if CAPABILITY_MAP[cap] and stats[SPEC_INDEX['caps']]: + kcaps.append(CAPABILITY_MAP[cap]) + if cap in ["s"] and not data.name == "PUB": + stats[SPEC_INDEX['sign_capable']] = True + elif cap in ["e"]: + stats[SPEC_INDEX['encrypt_capable']] = True + elif cap not in CAPABILITY_MAP: + stats[SPEC_INDEX['caps']] = False + self.logger.debug("ERROR in key %s : unknown gpg key capability: %s" + % (data.long_keyid, cap)) + stats[SPEC_INDEX['long_caps']] = ', '.join(kcaps) + return stats + + + def _test_uid(self, data, stats): + if not self.check_id: + stats[SPEC_INDEX['id']] = '-----' + stats[SPEC_INDEX['id_reason']] = '' + return stats + if TEST_SPEC['qualified_id'] in data.user_ID : + stats[SPEC_INDEX['id']] = True + stats[SPEC_INDEX['id_reason']] = '' + else: + stats[SPEC_INDEX['id_reason']] = "<== '%s' user id not found" % TEST_SPEC['qualified_id'] + self.logger.debug("Warning: No qualified ID found in key %s" + % (data.user_ID)) + return stats + + + def _test_validity(self, data, stats): + if data.validity in VALID_LIST: + stats[SPEC_INDEX['is_valid']] = True + return stats + + + def _test_final(self, data, stats): + for test, result in TEST_REQUIREMENTS.items(): + if ((stats[SPEC_INDEX['key']] == 'PUB' and test == 'sign_capable') or + (stats[SPEC_INDEX['capabilities']] == 'e' and test in ['algo', 'bits', 'sign_capable']) + or (stats[SPEC_INDEX['capabilities']] == 'a' and test in ['sign_capable'])): + continue + if stats[SPEC_INDEX[test]] == result: + stats[SPEC_INDEX['passed_spec']] = True + else: + stats[SPEC_INDEX['passed_spec']] = False + break + return stats diff --git a/gkeys/gkeys/cli.py b/gkeys/gkeys/cli.py new file mode 100644 index 0000000..32d2ec4 --- /dev/null +++ b/gkeys/gkeys/cli.py @@ -0,0 +1,58 @@ +# +#-*- coding:utf-8 -*- + +""" + Gentoo-keys - cli.py + + Command line interface module + + @copyright: 2012 by Brian Dolbec <dol-sen@gentoo.org> + @license: GNU GPL2, see COPYING for details. +""" + +from __future__ import print_function + + +import sys + +from gkeys.base import CliBase +from gkeys.actions import Actions, Available_Actions, Action_Options +from gkeys.config import GKeysConfig + + + +class Main(CliBase): + '''Main command line interface class''' + + + def __init__(self, root=None, config=None, print_results=True): + """ Main class init function. + + @param root: string, root path to use + """ + CliBase.__init__(self) + self.root = root or "/" + self.config = config or GKeysConfig(root=root) + self.config.options['print_results'] = print_results + self.cli_config = { + 'Actions': Actions, + 'Available_Actions': Available_Actions, + 'Action_Options': Action_Options, + 'prog': 'gkeys', + 'description': 'Gentoo-keys manager program', + 'epilog': '''Caution: adding untrusted keys to these keyrings can + be hazardous to your system!''' + } + + + def __call__(self, args=None): + """Main class call function + + @param args: Optional list of argumanets to parse and action to run + Defaults to sys.argv[1:] + """ + if args: + return self.run(self.parse_args(args)) + else: + return self.run(self.parse_args(sys.argv[1:])) + diff --git a/gkeys/gkeys/config.py b/gkeys/gkeys/config.py new file mode 100644 index 0000000..775ea1f --- /dev/null +++ b/gkeys/gkeys/config.py @@ -0,0 +1,181 @@ +# +#-*- coding:utf-8 -*- + +""" + Gentoo-keys - config.py + + Holds configuration keys and values + + @copyright: 2012 by Brian Dolbec <dol-sen@gentoo.org> + @license: GNU GNU GPL2, see COPYING for details. +""" + +import os +import sys + +# py3.2 +if sys.hexversion >= 0x30200f0: + import configparser as ConfigParser +else: + import ConfigParser + +from collections import namedtuple + + +from pyGPG.config import GPGConfig + +from gkeys import log +from gkeys.utils import path + +logger = log.logger + + +# establish the eprefix, initially set so eprefixify can +# set it on install +EPREFIX = "@GENTOO_PORTAGE_EPREFIX@" + +# check and set it if it wasn't +if "GENTOO_PORTAGE_EPREFIX" in EPREFIX: + EPREFIX = '' + +GKEY_STRING = ''' ---------- + Name.........: %(name)s + Nick.........: %(nick)s + Keydir.......: %(keydir)s +''' + +GKEY_FINGERPRINTS = \ +''' Keyid........: %(keyid)s + Fingerprint: %(fingerprint)s +''' + +MAPSEEDS = { 'dev' : 'gentoodevs.seeds', 'rel': 'gentoo.seeds' } + + +class GKeysConfig(GPGConfig): + """ Configuration superclass which holds our gentoo-keys + config settings for pygpg """ + + def __init__(self, config=None, root=None, read_configfile=False): + """ Class initialiser """ + GPGConfig.__init__(self) + + self.root = root or '' + if config: + self.defaults['config'] = config + self.defaults['configdir'] = os.path.dirname(config) + else: + homedir = os.path.expanduser('~') + self.defaults['configdir'] = homedir + self.defaults['config']= os.path.join(homedir, '.gkeys.conf') + if not os.path.exists(self.defaults['config']): + self.defaults['configdir'] = path([self.root, EPREFIX, '/etc/gkeys']) + self.defaults['config'] = '%(configdir)s/gkeys.conf' + self.configparser = None + self._add_gkey_defaults() + if read_configfile: + self.read_config() + + + def _add_gkey_defaults(self): + self.defaults['gkeysdir'] = path([self.root, EPREFIX, '/var/lib/gentoo/gkeys']) + self.defaults['dev-keydir'] = '%(gkeysdir)s/devs' + self.defaults['rel-keydir'] = '%(gkeysdir)s/release' + self.defaults['keyring'] = '%(gkeysdir)s/keyring' + self.defaults['overlays-keydir'] = '%(gkeysdir)s/overlays' + self.defaults['sign-keydir'] = '%(gkeysdir)s/sign', + self.defaults['logdir'] = '/var/log/gkeys' + # local directory to scan for seed files installed via ebuild, layman + # or manual install. + self.defaults['seedsdir'] = '%(gkeysdir)s/seeds' + self.defaults['seeds'] = { + 'gentoo': '%(seedsdir)s/gentoo.seeds', + 'gentoodevs': '%(seedsdir)s/gentoodevs.seeds', + } + self.defaults['keyserver'] = 'pool.sks-keyservers.net' + # NOTE: files is umask mode in octal, directories is chmod mode in octal + self.defaults['permissions'] = {'files': '0o002', 'directories': '0o775',} + self.defaults['seedurls'] = { + 'gentoo': 'https://api.gentoo.org/gentoo-keys/seeds/gentoo.seeds', + 'gentoodevs': 'https://api.gentoo.org/gentoo-keys/seeds/gentoodevs.seeds', + 'gkey': 'gkeys', + } + self.defaults['sign'] = { + 'key': 'fingerprint', + 'keydir': '~/.gkeys', + 'keyring': None, + 'type': 'clearsign', + } + + + def read_config(self): + '''Reads the config file into memory + ''' + if "%(configdir)s" in self.defaults['config']: + # fix the config path + self.defaults['config'] = self.defaults['config'] \ + % {'configdir': self.defaults['configdir']} + defaults = self.get_defaults() + # remove some defaults from being entered into the configparser + for key in ['gpg_defaults', 'only_usable', 'refetch', 'tasks']: + defaults.pop(key) + self.configparser = ConfigParser.ConfigParser(defaults) + self.configparser.read(defaults['config']) + + + def get_key(self, key, subkey=None): + return self._get_(key, subkey) + + + def _get_(self, key, subkey=None): + if subkey: + if self.configparser and self.configparser.has_option(key, subkey): + if logger: + logger.debug("Found %s in configparser... %s" + % (key, str(self.configparser.get(key, subkey)))) + return self._sub_(self.configparser.get(key, subkey)) + #print("CONFIG: key, subkey", key, subkey) + if key in self.options and subkey in self.options[key]: + return self._sub_(self.options[key][subkey]) + elif key in self.defaults and subkey in self.defaults[key]: + return self._sub_(self.defaults[key][subkey]) + else: + return super(GKeysConfig, self)._get_(key, subkey) + elif self.configparser and self.configparser.has_option('DEFAULT', key): + if logger: + logger.debug("Found %s in configparser... %s" + % (key, str(self.configparser.get('DEFAULT', key)))) + #logger.debug("type(key)= %s" + # % str(type(self.configparser.get('DEFAULT', key)))) + return self.configparser.get('DEFAULT', key) + else: + return super(GKeysConfig, self)._get_(key, subkey) + + +class GKEY(namedtuple('GKEY', ['nick', 'name', 'keydir', 'fingerprint'])): + '''Class to hold the relavent info about a key''' + + field_types = {'nick': str, 'name': str, 'keydir': str, 'fingerprint': list} + __slots__ = () + + + @property + def keyid(self): + '''Keyid is a substring value of the fingerprint''' + return ['0x' + x[-16:] for x in self.fingerprint] + + + @property + def pretty_print(self): + '''Pretty printing a GKEY''' + gkey = {'name': self.name, 'nick': self.nick, 'keydir': self.keydir} + output = GKEY_STRING % gkey + for f in self.fingerprint: + fingerprint = {'fingerprint': f, 'keyid': '0x' + f[-16:]} + output += GKEY_FINGERPRINTS % fingerprint + return output + + +class GKEY_CHECK(namedtuple('GKEY_CHECK', ['keyid', 'revoked', 'expired', 'invalid', 'sign'])): + + __slots__ = () diff --git a/gkeys/gkeys/fileops.py b/gkeys/gkeys/fileops.py new file mode 100644 index 0000000..7cb244f --- /dev/null +++ b/gkeys/gkeys/fileops.py @@ -0,0 +1,56 @@ +import os +from snakeoil.osutils import (ensure_dirs as snakeoil_ensure_dirs) + + +def ensure_dirs(path, gid=-1, uid=-1, mode=0o700, minimal=True, failback=None, fatal=False): + '''Wrapper to snakeoil.osutil's ensure_dirs() + This additionally allows for failures to run + cleanup or other code and/or raise fatal errors. + + @param path: directory to ensure exists on disk + @param gid: a valid GID to set any created directories to + @param uid: a valid UID to set any created directories to + @param mode: permissions to set any created directories to + @param minimal: boolean controlling whether or not the specified mode + must be enforced, or is the minimal permissions necessary. For example, + if mode=0755, minimal=True, and a directory exists with mode 0707, + this will restore the missing group perms resulting in 757. + @param failback: function to run in the event of a failed attemp + to create the directory. + @return: True if the directory could be created/ensured to have those + permissions, False if not. + ''' + succeeded = snakeoil_ensure_dirs(path, gid=-1, uid=-1, mode=mode, minimal=True) + if not succeeded: + if failback: + failback() + if fatal: + raise IOError( + "Failed to create directory: %s" % path) + return succeeded + + +def updatefiles(config, logger): + filename = config['dev-seedfile'] + old = filename + '.old' + try: + logger.info("Backing up existing file...") + if os.path.exists(old): + logger.debug( + "MAIN: _action_updatefile; Removing 'old' seed file: %s" + % old) + os.unlink(old) + if os.path.exists(filename): + logger.debug( + "MAIN: _action_updatefile; Renaming current seed file to: " + "%s" % old) + os.rename(filename, old) + if os.path.exists(filename + '.new'): + logger.debug( + "MAIN: _action_updatefile; Renaming '.new' seed file to: %s" + % filename) + os.rename(filename + '.new', filename) + except IOError: + raise + return False + return True diff --git a/gkeys/gkeys/lib.py b/gkeys/gkeys/lib.py new file mode 100644 index 0000000..50ed63e --- /dev/null +++ b/gkeys/gkeys/lib.py @@ -0,0 +1,345 @@ +# +#-*- coding:utf-8 -*- + +'''Gentoo-keys - lib.py +This is gentoo-keys superclass which wraps the pyGPG lib +with gentoo-keys specific convienience functions. + + Distributed under the terms of the GNU General Public License v2 + + Copyright: + (c) 2011 Brian Dolbec + Distributed under the terms of the GNU General Public License v2 + + Author(s): + Brian Dolbec <dolsen@gentoo.org> + +''' + +# for py 2.6 compatibility +from __future__ import print_function + + +from os.path import abspath, pardir +from os.path import join as pjoin + +from pyGPG.gpg import GPG +from gkeys.checks import KeyChecks +from gkeys.fileops import ensure_dirs +from gkeys.log import logger +from gkeys.seed import Seeds + +class GkeysGPG(GPG): + '''Gentoo-keys primary gpg class''' + + + def __init__(self, config, basedir): + '''class init function + + @param config: GKeysConfig config instance to use + @param keydir: string, the path to the keydir to be used + for all operations. + ''' + GPG.__init__(self, config, logger) + self.config = config + self.basedir = basedir + self.keydir = None + self.server = None + + + def set_keyserver(self, server=None): + '''Set the keyserver and add the --keyserver option to the gpg defaults + ''' + if self.server and not server: + return + self.server = server or self.config['keyserver'] + self.config.options['gpg_defaults'] = self.config.defaults['gpg_defaults'][:] + logger.debug("keyserver: %s" % (self.server)) + server_value = ['--keyserver', self.server] + self.config.options['gpg_defaults'].extend(server_value) + logger.debug("self.config.options['gpg_defaults']: %s" + % (self.config.options['gpg_defaults'])) + return + + + def set_keyring(self, keyring, task, importkey=False, reset=True): + '''Sets the keyring to use as well as related task options + ''' + logger.debug("keydir: %s, keyring: %s" % (self.keydir, keyring)) + if reset: + self.config.options['tasks'][task] = self.config.defaults['tasks'][task][:] + # --keyring file | Note that this adds a keyring to the current list. + # If the intent is to use the specified keyring alone, + # use --keyring along with --no-default-keyring. + if importkey: + task_value = ['--import-options', 'import-clean'] + self.config.options['tasks'][task].extend(task_value) + parent_dir = abspath(pjoin(keyring, pardir)) + ensure_dirs(parent_dir, + mode=int(self.config.get_key('permissions', 'directories'),0)) + task_value = ['--no-default-keyring', '--keyring', keyring] + self.config.options['tasks'][task].extend(task_value) + logger.debug("set_keyring: New task options: %s" %str(self.config.options['tasks'][task])) + return + + + def set_keydir(self, keydir, task, fingerprint=True, reset=True): + logger.debug("basedir: %s, keydir: %s" % (self.basedir, keydir)) + self.keydir = pjoin(self.basedir, keydir) + self.task = task + if reset: + self.config.options['tasks'][task] = self.config.defaults['tasks'][task][:] + task_value = [] + if fingerprint: + task_value.append('--fingerprint') + task_value.extend(['--homedir', self.keydir]) + self.config.options['tasks'][task].extend(task_value) + logger.debug("set_keydir: New task options: %s" %str(self.config.options['tasks'][task])) + return + + + def add_to_keyring(self, gkey, keydir, keyring): + '''Add the specified key to the specified keyring + + @param gkey: GKEY namedtuple with + (name, keyid/longkeyid, keydir, fingerprint) + @param keydir: path with the specified keydir + @param keyring: string with the specified keyring + ''' + self.set_keydir(keydir, 'import', reset=True) + self.set_keyring(keyring, 'import', importkey=True, reset=False) + results = [] + logger.debug("LIB: import_to_keyring; name: " + gkey.name) + logger.debug("** Calling runGPG with Running: gpg %s --import' for: %s" + % (' '.join(self.config.get_key('tasks', 'import')), + gkey.name)) + pubring_path = pjoin(self.keydir, gkey.keydir, 'pubring.gpg') + result = self.runGPG(task='import', inputfile=pubring_path) + logger.info('GPG return code: ' + str(result.returncode)) + results.append(result) + print(result.stderr_out) + return results + + + def add_key(self, gkey): + '''Add the specified key to the specified keydir + + @param gkey: GKEY namedtuple with + (name, nick, keydir, fingerprint) + ''' + self.config.defaults['gpg_defaults'].append('--no-permission-warning') + self.set_keyserver() + self.set_keydir(gkey.keydir, 'recv-keys', reset=True) + self.set_keyring('pubring.gpg', 'recv-keys', reset=False) + logger.debug("LIB: add_key; ensure dirs: " + self.keydir) + ensure_dirs(str(self.keydir)) + self.set_keyseedfile(trap_errors=False) + results = [] + for fingerprint in gkey.fingerprint: + logger.debug("LIB: add_key; adding fingerprint " + fingerprint) + logger.debug("** Calling runGPG with Running 'gpg %s --recv-keys %s' for: %s" + % (' '.join(self.config.get_key('tasks', 'recv-keys')), + fingerprint, gkey.name)) + result = self.runGPG(task='recv-keys', inputfile=fingerprint) + logger.info('GPG return code: ' + str(result.returncode)) + if result.fingerprint in gkey.fingerprint: + result.failed = False + message = "Fingerprints match... Import successful: " + message += "%s, fingerprint: %s" % (gkey.nick, fingerprint) + message += "\n result len: %s, %s" % (len(result.fingerprint), result.fingerprint) + message += "\n gkey len: %s, %s" % (len(gkey.fingerprint[0]), gkey.fingerprint[0]) + logger.info(message) + else: + result.failed = True + message = "Fingerprints do not match... Import failed for " + message += "%s, fingerprint: %s" % (gkey.nick, fingerprint) + message += "\n result: %s" % (result.fingerprint) + message += "\n gkey..: %s" % (str(gkey.fingerprint)) + logger.error(message) + # Save the gkey seed to the installed db + self.seedfile.update(gkey) + if not self.seedfile.save(): + logger.error("GkeysGPG.add_key(); failed to save seed: " + gkey.nick) + return [] + results.append(result) + return results + + + def del_key(self, gkey, keydir): + '''Delete the specified key in the specified keydir + + @param gkey: GKEY namedtuple with (name, nick, keydir, fingerprint) + ''' + return [] + + + def del_keydir(self, keydir): + '''Delete the specified keydir + ''' + return [] + + + def refresh_key(self, gkey): + '''Refresh the specified key in the specified keydir + + @param key: tuple of (name, nick, keydir, fingerprint) + @param keydir: the keydir to add the key to + ''' + self.config.defaults['gpg_defaults'].append('--no-permission-warning') + self.set_keyserver() + self.set_keydir(gkey.keydir, 'refresh-keys', reset=True) + self.set_keyring('pubring.gpg', 'refresh-keys', reset=False) + logger.debug("LIB: refresh_key, gkey: %s" % str(gkey)) + logger.debug("** Calling runGPG with Running 'gpg %s --refresh-keys' for: %s" + % (' '.join(self.config.get_key('tasks', 'refresh-keys')), str(gkey))) + result = self.runGPG(task='refresh-keys', inputfile='') + logger.info('GPG return code: ' + str(result.returncode)) + return result + + + def update_key(self, gkey, keydir): + '''Update the specified key in the specified keydir + + @param key: tuple of (name, nick, keydir, fingerprint) + @param keydir: the keydir to add the key to + ''' + return [] + + + def list_keys(self, keydir, fingerprint=None, colons=False): + '''List all keys in the specified keydir or + all keys in all keydir if keydir=None + + @param keydir: the keydir to list the keys for + @param colons: bool to enable colon listing + ''' + if not keydir: + logger.debug("LIB: list_keys(), invalid keydir parameter: %s" + % str(keydir)) + return [] + if fingerprint: + task = 'list-key' + target = fingerprint + else: + task = 'list-keys' + target = keydir + self.set_keydir(keydir, task, fingerprint=True) + self.config.options['tasks'][task].extend(['--keyid-format', 'long', '--with-fingerprint']) + if colons: + task_value = ['--with-colons'] + self.config.options['tasks'][task].extend(task_value) + logger.debug("** Calling runGPG with Running 'gpg %s --%s %s'" + % (' '.join(self.config['tasks'][task]), task, keydir) + ) + result = self.runGPG(task=task, inputfile=target) + logger.info('GPG return code: ' + str(result.returncode)) + return result + + + def check_keys(self, keydir, keyid, result=None): + '''Check specified or all keys based on the seed type + + @param keydir: the keydir to list the keys for + @param keyid: the keyid to check + @param result: optional pyGPG.output.GPGResult object + @returns: GKEY_CHECK instance + ''' + if not result: + result = self.list_keys(keydir, fingerprint=keyid, colons=True) + checker = KeyChecks(logger, qualified_id_check=True) + return checker.validity_checks(keydir, keyid, result) + + + def speccheck(self, keydir, keyid, result=None): + '''Check specified or all keys based on the seed type + specifications are met. + + @param keydir: the keydir to list the keys for + @param keyid: the keyid to check + @param result: optional pyGPG.output.GPGResult object + @returns: SpecCheck instance + ''' + if not result: + result = self.list_keys(keydir, fingerprint=keyid, colons=True) + checker = KeyChecks(logger, qualified_id_check=True) + specchecks = checker.spec_check(keydir, keyid, result) + return specchecks + + + def list_keydirs(self): + '''List all available keydirs + ''' + return [] + + + def verify_key(self, gkey): + '''Verify the specified key from the specified keydir + + @param gkey: GKEY namedtuple with (name, keyid/longkeyid, fingerprint) + ''' + pass + + + def verify_text(self, text): + '''Verify a text block in memory + ''' + pass + + + def verify_file(self, gkey, signature, filepath): + '''Verify the file specified at filepath or url + + @param gkey: GKEY instance of the gpg key used to verify it + @param signature: string with the signature file + @param filepath: string with the path or url of the signed file + ''' + if signature: + self.set_keydir(gkey.keydir, 'verify', reset=True) + logger.debug("** Calling runGPG with Running 'gpg %s --verify %s and %s'" + % (' '.join(self.config['tasks']['verify']), signature, filepath)) + results = self.runGPG(task='verify', inputfile=[signature,filepath]) + else: + self.set_keydir(gkey.keydir, 'decrypt', reset=True) + logger.debug("** Calling runGPG with Running 'gpg %s --decrypt %s and %s'" + % (' '.join(self.config['tasks']['decrypt']), filepath)) + results = self.runGPG(task='decrypt', inputfile=filepath) + keyid = gkey.keyid[0] + if results.verified[0]: + logger.info("GPG verification succeeded. Name: %s / Key: %s" % (gkey.name, keyid)) + logger.info("\tSignature result:" + str(results.verified)) + else: + logger.debug("GPG verification failed. Name: %s / Key: %s" % (gkey.name, keyid)) + logger.debug("\t Signature result:"+ str(results.verified)) + logger.debug("LIB: verify; stderr_out:" + str(results.stderr_out)) + return results + + + def set_keyseedfile(self, trap_errors): + if not self.keydir: + logger.debug("GkeysGPG.set_keyseedfile(); self.keydir error") + self.seedfile = Seeds(pjoin(self.keydir, 'gkey.seeds'), self.config) + self.seedfile.load(trap_errors=trap_errors) + + + def sign_file(self, gkey, mode, fingerprint, filepath): + '''Verify the file specified at filepath or url + + @param gkey: GKEY instance + @param mode: string of the signing task to use + @param fingerprint: string of the fingerprint to sign with + @param filepath: string with the path of the file to sign + ''' + keyid = gkey.keyid[0] + self.set_keydir(gkey.keydir, mode, reset=True) + logger.debug("** Calling runGPG with Running 'gpg %s --%s %s %s'" + % (' '.join(self.config['tasks'][mode]), mode, fingerprint, filepath)) + results = self.runGPG(task=mode, inputfile=filepath) + + if results.verified[0]: + logger.info("GPG signing succeeded. Name: %s / Key: %s" % (str(gkey.name), str(keyid))) + logger.info("\tSignature result:" + str(results.verified)) + else: + logger.debug("GPG signing failed. Name: %s / Key: %s" % (str(gkey.name), str(keyid))) + logger.debug("\t Signature result:"+ str(results.verified)) + logger.debug("LIB: sign; stderr_out:" + str(results.stderr_out)) + return results diff --git a/gkeys/gkeys/log.py b/gkeys/gkeys/log.py new file mode 100644 index 0000000..a16767e --- /dev/null +++ b/gkeys/gkeys/log.py @@ -0,0 +1,72 @@ +# +#-*- coding:utf-8 -*- + +""" + Gentoo-Keys - Log.py + + Logging module, placeholder for our site-wide logging module + + @copyright: 2012 by Brian Dolbec <dol-sen> <dol-sen@users.sourceforge.net> + @license: GNU GPL2, see COPYING for details. +""" + +import logging +import time +import os + +from gkeys.fileops import ensure_dirs + + +NAMESPACE = 'gentoo-keys' +logger = None +Console_handler = None + +log_levels = { + 'CRITICAL': logging.CRITICAL, + 'DEBUG': logging.DEBUG, + 'ERROR': logging.ERROR, + 'FATAL': logging.FATAL, + 'INFO': logging.INFO, + 'NOTSET': logging.NOTSET, + 'WARN': logging.WARN, + 'WARNING': logging.WARNING, +} + + +def set_logger(namespace=None, logpath='', level=None, + dirmode=0o775, filemask=0o002): + global logger, NAMESPACE, Console_handler + if not namespace: + namespace = NAMESPACE + else: + NAMESPACE = namespace + logger = logging.getLogger(namespace) + logger.setLevel(log_levels['DEBUG']) + # create formatter and add it to the handlers + log_format = '%(asctime)s %(name)-12s %(levelname)-8s %(message)s' + formatter = logging.Formatter(log_format) + # add the handlers to logger + if logpath: + ensure_dirs(logpath, mode=dirmode, fatal=True) + os.umask(filemask) + logname = os.path.join(logpath, + '%s-%s.log' % (namespace, time.strftime('%Y%m%d-%H:%M'))) + file_handler = logging.FileHandler(logname) + if level: + #print "Setting cli log level", level, log_levels[level] + file_handler.setLevel(log_levels[level]) + else: + #print "Create file handler which logs even debug messages" + file_handler.setLevel(log_levels['DEBUG']) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + + # create console handler with a higher log level + Console_handler = logging.StreamHandler() + Console_handler.setLevel(logging.ERROR) + #Console_handler.setFormatter(formatter) + logger.addHandler(Console_handler) + #print "File logger suppose to be initialized", logger, Console_handler + logger.debug("Loggers initialized") + + return logger diff --git a/gkeys/gkeys/seed.py b/gkeys/gkeys/seed.py new file mode 100644 index 0000000..f0cb019 --- /dev/null +++ b/gkeys/gkeys/seed.py @@ -0,0 +1,192 @@ +# +#-*- coding:utf-8 -*- + +'''Gentoo-keys - seed.py +This is gentoo-keys superclass which wraps the pyGPG lib +with gentoo-keys specific convienience functions. + + Distributed under the terms of the GNU General Public License v2 + + Copyright: + (c) 2011 Brian Dolbec + Distributed under the terms of the GNU General Public License v2 + + Author(s): + Brian Dolbec <dolsen@gentoo.org> + +''' + +import json +import os + +from gkeys.log import logger +from gkeys.config import GKEY +from gkeys.fileops import ensure_dirs + + +class Seeds(object): + '''Handles all seed key file operations''' + + + def __init__(self, filepath=None, config=None): + '''Seeds class init function + + @param filepath: string of the file to load + ''' + self.filename = filepath + self.config = config + self.seeds = {} + + + def load(self, filename=None, trap_errors=True): + '''Load the seed file into memory''' + if filename: + self.filename = filename + if not self.filename: + logger.debug("Seed: load; Not a valid filename: '%s'" % str(self.filename)) + return False + logger.debug("Seeds: load; Begin loading seed file %s" % self.filename) + seedlines = None + self.seeds = {} + try: + with open(self.filename, "r+") as seedfile: + seedlines = json.load(seedfile) + except IOError as err: + logger.debug("Seed: load; IOError occurred while loading file") + if trap_errors: + self._error(err) + return False + for seed in list(seedlines.items()): + #try: + self.seeds[seed[0]] = GKEY(**seed[1]) + #except Exception as err: + #logger.debug("Seed: load; Error splitting seed: %s" % seed) + #logger.debug("Seed: load; ...............parts: %s" % str(parts)) + #self._error(err) + logger.debug("Seed: load; Completed loading seed file %s" % self.filename) + return True + + + def save(self, filename=None): + '''Save the seeds to the file''' + if filename: + self.filename = filename + if not self.filename: + logger.debug("Seed: save; Not a valid filename: '%s'" % str(self.filename)) + return False + logger.debug("Seed: save; Begin saving seed file %s" % self.filename) + ensure_dirs(os.path.split(self.filename)[0], + mode=int(self.config.get_key('permissions', "directories"),0), + fatal=True) + os.umask(int(self.config.get_key("permissions", "files"),0)) + try: + with open(self.filename, 'w') as seedfile: + seedfile.write(self._seeds2json(self.seeds)) + seedfile.write("\n") + except IOError as err: + self._error(err) + return False + return True + + + def add(self, dev, gkey): + '''Add a new seed key to memory''' + if isinstance(gkey, dict) or isinstance(gkey, GKEY): + self.seeds[dev] = gkey + return True + return False + + + def delete(self, gkey=None): + '''Delete the key from the seeds in memory + + @param gkey: GKEY, the matching GKEY to delete + ''' + if gkey: + if isinstance(gkey, dict): + nick = gkey['nick'] + elif isinstance(gkey, GKEY): + nick = gkey.nick + try: + self.seeds.pop(nick, None) + except ValueError: + return False + return True + + + def list(self, **kwargs): + '''List the key or keys matching the kwargs argument or all + + @param kwargs: dict of GKEY._fields and values + @returns list + ''' + if not kwargs or ('nick' in kwargs and kwargs['nick'] == '*'): + return sorted(self.seeds.values()) + # proceed with the search + # discard any invalid keys + keys = kwargs + result = self.seeds + for key in keys: + if key in ['fingerprint', 'keyid']: + kwargs[key] = [x.replace(' ', '').upper() for x in kwargs[key]] + if key in ['fingerprint']: + result = {dev: gkey for dev, gkey in list(result.items()) if kwargs[key][0] in getattr(gkey, key)} + elif key in ['keyid']: + searchids = [x.lstrip('0X') for x in kwargs[key]] + res = {} + for dev, gkey in list(result.items()): + keyids = [x.lstrip("0x") for x in getattr(gkey, key)] + for keyid in searchids: + if keyid in keyids: + res[dev] = gkey + break + result = res + else: + result = {dev: gkey for dev, gkey in list(result.items()) if kwargs[key].lower() in getattr(gkey, key).lower()} + return sorted(result.values()) + + + def search(self, pattern): + '''Search for the keys matching the regular expression pattern''' + pass + + + def nick_search(self, nick): + '''Searches the seeds for a matching nick + + @param nick: string + @returns GKEY instance or None + ''' + try: + return self.seeds[nick] + except KeyError: + return None + + + def _error(self, err): + '''Class error logging function''' + logger.error("Seed: Error processing seed file %s" % self.filename) + logger.error("Seed: Error was: %s" % str(err)) + + + def _seeds2json(self, seeds): + is_gkey = False + if not seeds: + seeds = {} + elif isinstance(list(seeds.values())[0], GKEY): + is_gkey = True + for dev, value in list(seeds.items()): + if is_gkey: + seeds[dev] = dict(value._asdict()) + return json.dumps(seeds, sort_keys=True, indent=4) + + + def update(self, gkey): + '''Looks for existance of a matching nick already in the seedfile + if it exists. Then either adds or replaces the gkey + @param gkey: GKEY instance + ''' + oldkey = self.nick_search(gkey.nick) + if oldkey: + self.delete(oldkey) + self.add(gkey.nick, gkey) diff --git a/gkeys/gkeys/seedhandler.py b/gkeys/gkeys/seedhandler.py new file mode 100644 index 0000000..cc797b9 --- /dev/null +++ b/gkeys/gkeys/seedhandler.py @@ -0,0 +1,189 @@ +# +#-*- coding:utf-8 -*- + +""" + Gentoo-keys - seedhandler.py + + Seed handling interface module + + @copyright: 2012 by Brian Dolbec <dol-sen@gentoo.org> + @license: GNU GPL2, see COPYING for details. +""" + +import os +import re +from json import load + +from gkeys.config import GKEY +from gkeys.seed import Seeds +from gkeys.fileops import ensure_dirs + + +class SeedHandler(object): + + def __init__(self, logger, config): + self.config = config + self.logger = logger + self.fingerprint_re = re.compile('[0-9A-Fa-f]{40}') + self.finerprint_re2 = re.compile('[0-9A-Fa-f]{4}( [0-9A-Fa-f]{4}){9}') + + + def new(self, args, checkgkey=False): + newgkey = self.build_gkeydict(args) + if checkgkey: + newgkey, is_good = self.check_gkey(newgkey) + if is_good: + newgkey = GKEY(**newgkey) + self.logger.debug("SeedHandler: new; new gkey: %s" % str(newgkey)) + else: + return None + else: + newgkey = GKEY(**newgkey) + self.logger.debug("SeedHandler: new; NON-checked new gkey: %s" % str(newgkey)) + return newgkey + + + @staticmethod + def build_gkeydict(args): + keyinfo = {} + for attr in GKEY._fields + ('keyid',): + try: + value = getattr(args, attr) + if attr == 'name' and value: + value = " ".join(value) + if value: + keyinfo[attr] = value + except AttributeError: + pass + return keyinfo + + def load_seeds(self, seedfile=None, filepath=None): + '''Load seed file + + @param seeds: string of the short name seed file + @param seedfile: string filepath of the file to load + @return Seeds class instance of the file loaded + ''' + if not seedfile and not filepath: + self.logger.error("SeedHandler: load_seeds; no filename to load: " + "setting = %s. Please use the -S or -F option to indicate: which seed " + "file to use." % seedfile) + return False + if seedfile: + filepath = self.config.get_key('seeds', seedfile) + elif not filepath: + self.logger.error("SeedHandler: load_seeds; No filepath to load") + self.logger.debug("SeedHandler: load_seeds; seeds filepath to load: " + "%s" % filepath) + seeds = Seeds(config=self.config) + seeds.load(filepath) + return seeds + + def load_category(self, category, nicks=None): + '''Loads the designated key directories + + @param category: string + @param nicks: list of string nick ids to load + @return Seeds class object + ''' + seeds = Seeds(config=self.config) + if category: + catdir = self.config.get_key(category + "-category") + else: + self.logger.debug("SeedHandler: load_category; Error invalid category: %s." % (str(category))) + return seeds + self.logger.debug("SeedHandler: load_category; catdir = %s" % catdir) + try: + if not nicks: + nicks = os.listdir(catdir) + for nick in nicks: + seed_path = os.path.join(catdir, nick) + gkey_path = os.path.join(seed_path, 'gkey.seeds') + seed = None + try: + with open(gkey_path, 'r') as fileseed: + seed = load(fileseed) + except IOError as error: + self.logger.debug("SeedHandler: load_category; IOError loading seed file %s." % gkey_path) + self.logger.debug("Error was: %s" % str(error)) + if seed: + for nick in sorted(seed): + key = seed[nick] + seeds.add(nick, GKEY(**key)) + except OSError as error: + self.logger.debug("SeedHandler: load_category; OSError for %s" % catdir) + self.logger.debug("Error was: %s" % str(error)) + return seeds + + def fetch_seeds(self, seeds, args, verified_dl=None): + '''Fetch new seed files + + @param seeds: list of seed nicks to download + @param verified_dl: Function pointer to the Actions.verify() + instance needed to do the download and verification + ''' + http_check = re.compile(r'^(http|https)://') + urls = [] + messages = [] + try: + for seed in [seeds]: + seedurl = self.config.get_key('seedurls', seed) + seedpath = self.config.get_key('seeds', seed) + if http_check.match(seedurl): + urls.extend([(seedurl, seedpath)]) + else: + self.logger.info("Wrong seed file URLs... Switching to default URLs.") + urls.extend([(self.config['seedurls'][seed], seedpath)]) + except KeyError: + pass + succeeded = [] + seedsdir = self.config.get_key('seedsdir') + mode = int(self.config.get_key('permissions', 'directories'),0) + ensure_dirs(seedsdir, mode=mode) + for (url, filepath) in urls: + args.category = 'rel' + args.filename = url + args.signature = None + args.timestamp = True + args.destination = filepath + verified, messages_ = verified_dl(args) + succeeded.append(verified) + messages.append(messages_) + return (succeeded, messages) + + def check_gkey(self, args): + # assume it's good until an error is found + is_good = True + try: + args['keydir'] = args.get('keydir', args['nick']) + fprs = [] + if args['fingerprint']: + for fpr in args['fingerprint']: + is_good, fingerprint = self._check_fingerprint_integrity(fpr) + if is_good: + fprs.append(fingerprint) + else: + self.logger.error('Bad fingerprint from command line args: %s' % fpr) + if is_good: + args['fingerprint'] = fprs + except KeyError: + self.logger.error('GPG fingerprint not found.') + is_good = False + if not is_good: + self.logger.error('A valid fingerprint ' + 'was not found for %s' % args['name']) + return args, is_good + + def _check_fingerprint_integrity(self, fpr): + # assume it's good unti an error is found + is_good = True + fingerprint = fpr.replace(" ", "") + # check fingerprint integrity + if len(fingerprint) != 40: + self.logger.error(' GPGKey incorrect fingerprint ' + + 'length (%s) for fingerprint: %s' % (len(fingerprint), fingerprint)) + is_good = False + if not self.fingerprint_re.match(fingerprint): + self.logger.error(' GPGKey: Non hexadecimal digits in ' + 'fingerprint for fingerprint: ' + fingerprint) + is_good = False + return is_good, fingerprint diff --git a/gkeys/gkeys/utils.py b/gkeys/gkeys/utils.py new file mode 100644 index 0000000..92abc50 --- /dev/null +++ b/gkeys/gkeys/utils.py @@ -0,0 +1,161 @@ +# +# -*- coding: utf-8 -*- + +'''# File: utils.py + + Utilities to deal with things... + copied/edited from app-portage/layman's xml.py + + + Copyright: + (c) 2005 - 2008 Gunnar Wrobel + (c) 2009 Sebastian Pipping + (c) 2009 Christian Groschupp + Distributed under the terms of the GNU General Public License v2 + + Author(s): + Gunnar Wrobel <wrobel@gentoo.org> + Sebastian Pipping <sebastian@pipping.org> + Christian Groschupp <christian@groschupp.org> + +Utility functions''' + + + +import types +import re +import os +import sys +import locale +import codecs + +try: + StringTypes = types.StringTypes +except AttributeError: + StringTypes = [str] + + +def encoder(text, _encoding_): + return codecs.encode(text, _encoding_, 'replace') + + +def decode_selection(selection): + '''utility function to decode a list of strings + accoring to the filesystem encoding + ''' + # fix None passed in, return an empty list + selection = selection or [] + enc = sys.getfilesystemencoding() + if enc is not None: + return [encoder(i, enc) for i in selection] + return selection + + +def get_encoding(output): + if hasattr(output, 'encoding') \ + and output.encoding != None: + return output.encoding + else: + encoding = locale.getpreferredencoding() + # Make sure that python knows the encoding. Bug 350156 + try: + # We don't care about what is returned, we just want to + # verify that we can find a codec. + codecs.lookup(encoding) + except LookupError: + # Python does not know the encoding, so use utf-8. + encoding = 'utf_8' + return encoding + + +def pad(string, length): + '''Pad a string with spaces.''' + if len(string) <= length: + return string + ' ' * (length - len(string)) + else: + return string[:length - 3] + '...' + + +def terminal_width(): + '''Determine width of terminal window.''' + try: + width = int(os.environ['COLUMNS']) + if width > 0: + return width + except: + pass + try: + import struct, fcntl, termios + query = struct.pack('HHHH', 0, 0, 0, 0) + response = fcntl.ioctl(1, termios.TIOCGWINSZ, query) + width = struct.unpack('HHHH', response)[1] + if width > 0: + return width + except: + pass + return 80 + + +# From <http://effbot.org/zone/element-lib.htm> +# BEGIN +def indent(elem, level=0): + i = "\n" + level*" " + if len(elem): + if not elem.text or not elem.text.strip(): + elem.text = i + " " + if not elem.tail or not elem.tail.strip(): + elem.tail = i + for elem in elem: + indent(elem, level+1) + if not elem.tail or not elem.tail.strip(): + elem.tail = i + else: + if level and (not elem.tail or not elem.tail.strip()): + elem.tail = i +# END + +def path(path_elements): + ''' + Concatenate a path from several elements. + + >>> path([]) + '' + >>> path(['a']) + 'a' + >>> path(['a','b']) + 'a/b' + >>> path(['a/','b']) + 'a/b' + >>> path(['/a/','b']) + '/a/b' + >>> path(['/a/','b/']) + '/a/b' + >>> path(['/a/','b/']) + '/a/b' + >>> path(['/a/','/b/']) + '/a/b' + >>> path(['/a/','/b','c/']) + '/a/b/c' + ''' + pathname = '' + + if type(path_elements) in StringTypes: + path_elements = [path_elements] + + # Concatenate elements and seperate with / + for i in path_elements: + pathname += i + '/' + + # Replace multiple consecutive slashes + pathname = re.compile('/+').sub('/', pathname) + + # Remove the final / if there is one + if pathname and pathname[-1] == '/': + pathname = pathname[:-1] + + return pathname + + +if __name__ == '__main__': + import doctest + doctest.testmod(sys.modules[__name__]) |