aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'gkeys/gkeys')
-rw-r--r--gkeys/gkeys/__init__.py5
-rw-r--r--gkeys/gkeys/actions.py767
-rw-r--r--gkeys/gkeys/base.py253
-rw-r--r--gkeys/gkeys/checks.py437
-rw-r--r--gkeys/gkeys/cli.py58
-rw-r--r--gkeys/gkeys/config.py181
-rw-r--r--gkeys/gkeys/fileops.py56
-rw-r--r--gkeys/gkeys/lib.py345
-rw-r--r--gkeys/gkeys/log.py72
-rw-r--r--gkeys/gkeys/seed.py192
-rw-r--r--gkeys/gkeys/seedhandler.py189
-rw-r--r--gkeys/gkeys/utils.py161
12 files changed, 2716 insertions, 0 deletions
diff --git a/gkeys/gkeys/__init__.py b/gkeys/gkeys/__init__.py
new file mode 100644
index 0000000..7e8b64e
--- /dev/null
+++ b/gkeys/gkeys/__init__.py
@@ -0,0 +1,5 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+__version__ = 'Git'
+__license__ = 'GPLv2'
diff --git a/gkeys/gkeys/actions.py b/gkeys/gkeys/actions.py
new file mode 100644
index 0000000..a224372
--- /dev/null
+++ b/gkeys/gkeys/actions.py
@@ -0,0 +1,767 @@
+#
+#-*- coding:utf-8 -*-
+
+"""
+ Gentoo-keys - actions.py
+
+ Primary api interface module
+
+ @copyright: 2012 by Brian Dolbec <dol-sen@gentoo.org>
+ @license: GNU GPL2, see COPYING for details.
+"""
+
+from __future__ import print_function
+
+import os
+
+from collections import defaultdict
+from json import load
+from shutil import rmtree
+from sslfetch.connections import Connector
+
+from gkeys.lib import GkeysGPG
+from gkeys.seedhandler import SeedHandler
+from gkeys.config import GKEY
+from gkeys.checks import SPECCHECK_SUMMARY, convert_pf, convert_yn
+
+Available_Actions = ['listseed', 'addseed', 'removeseed', 'moveseed', 'fetchseed',
+ 'listseedfiles', 'listkey', 'installkey', 'removekey', 'movekey',
+ 'installed', 'importkey', 'verify', 'checkkey', 'sign', 'speccheck']
+ 'refreshkey']
+
+Action_Options = {
+ 'listseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile', '1file'],
+ 'addseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile'],
+ 'removeseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile'],
+ 'moveseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile', 'dest'],
+ 'fetchseed': ['nick', 'name', 'keydir', 'fingerprint', 'seedfile'],
+ 'listseedfiles': [],
+ 'listkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'gpgsearch', 'keyid'],
+ 'installkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'seedfile', '1file'],
+ 'removekey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring'],
+ 'movekey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'dest'],
+ 'installed': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring'],
+ 'importkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring'],
+ 'verify': ['dest', 'nick', 'name', 'keydir', 'fingerprint', 'category', '1file', 'signature', 'keyring', 'timestamp'],
+ 'checkkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'keyid'],
+ 'sign': ['nick', 'name', 'keydir', 'fingerprint', 'file', 'keyring'],
+ 'speccheck': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'keyid'],
+ 'refreshkey': ['nick', 'name', 'keydir', 'fingerprint', 'category', 'keyring', 'keyid'],
+}
+
+
+class Actions(object):
+ '''Primary API actions'''
+
+ def __init__(self, config, output=None, logger=None):
+ self.config = config
+ self.output = output
+ self.logger = logger
+ self.seeds = None
+
+
+ def listseed(self, args):
+ '''Pretty-print the selected seed file(s)'''
+ handler = SeedHandler(self.logger, self.config)
+ kwargs = handler.build_gkeydict(args)
+ self.logger.debug("ACTIONS: listseed; kwargs: %s" % str(kwargs))
+ if not self.seeds:
+ try:
+ self.seeds = handler.load_seeds(args.seedfile, args.filename)
+ except ValueError:
+ return (False, ["Failed to load seed file. Consider fetching seedfiles."])
+ if self.seeds:
+ results = self.seeds.list(**kwargs)
+ else:
+ results = ''
+ return (True, ['', results])
+
+
+ def fetchseed(self, args):
+ '''Download the selected seed file(s)'''
+ self.logger.debug("ACTIONS: fetchseed; args: %s" % str(args))
+ handler = SeedHandler(self.logger, self.config)
+ success, messages = handler.fetch_seeds(args.seedfile, args, self.verify)
+
+ messages.append("")
+ messages.append("Fetch operation completed")
+ return (False not in success, messages)
+
+
+ def addseed(self, args):
+ '''Add or replace a key in the selected seed file(s)'''
+ handler = SeedHandler(self.logger, self.config)
+ gkeys = self.listseed(args)[1]
+ if not args.nick or not args.name or not args.fingerprint:
+ return (False, ["Provide a nickname, a name and a fingerprint."])
+ gkey = handler.new(args, checkgkey=True)
+ if not gkey:
+ return (False, ["Failed to create a valid GKEY instance.",
+ "Check for invalid data entries"])
+ if len(gkeys) == 0:
+ self.logger.debug("ACTIONS: installkey; now adding gkey: %s" % str(gkey))
+ success = self.seeds.add(getattr(gkey, 'nick'), gkey)
+ if success:
+ success = self.seeds.save()
+ messages = ["Successfully added new seed."]
+ else:
+ messages = ["Matching seeds found in seeds file",
+ "Aborting... \nMatching seeds:", gkeys]
+ success = False
+ return (success, messages)
+
+
+ def removeseed(self, args):
+ '''Remove a key from the selected seed file(s)'''
+ gkeys = self.listseed(args)[1]
+ if not gkeys:
+ return (False, ["Failed to remove seed: No gkeys returned from listseed()",
+ []])
+ if len(gkeys) == 1:
+ self.logger.debug("ACTIONS: removeseed; now deleting gkey: %s" % str(gkeys))
+ success = self.seeds.delete(gkeys[0])
+ if success:
+ success = self.seeds.save()
+ return (success, ["Successfully removed seed: %s" % str(success),
+ gkeys])
+ elif len(gkeys):
+ return (False, ["Too many seeds found to remove", gkeys])
+ return (False, ["Failed to remove seed:", args,
+ "No matching seed found"])
+
+
+ def moveseed(self, args):
+ '''Move keys between seed files'''
+ handler = SeedHandler(self.logger)
+ searchkey = handler.new(args, needkeyid=False, checkintegrity=False)
+ self.logger.debug("ACTIONS: moveseed; gkey: %s" % str(searchkey))
+ if not self.seeds:
+ self.seeds = self.load_seeds(args.category)
+ kwargs = handler.build_gkeydict(args)
+ sourcekeys = self.seeds.list(**kwargs)
+ dest = self.load_seeds(args.destination)
+ destkeys = dest.list(**kwargs)
+ messages = []
+ if len(sourcekeys) == 1 and destkeys == []:
+ self.logger.debug("ACTIONS: moveseed; now adding destination gkey: %s"
+ % str(sourcekeys[0]))
+ success = dest.add(sourcekeys[0])
+ self.logger.debug("ACTIONS: moveseed; success: %s" %str(success))
+ self.logger.debug("ACTIONS: moveseed; now deleting sourcekey: %s" % str(sourcekeys[0]))
+ success = self.seeds.delete(sourcekeys[0])
+ if success:
+ success = dest.save()
+ self.logger.debug("ACTIONS: moveseed; destination saved... %s" %str(success))
+ success = self.seeds.save()
+ messages.extend(["Successfully Moved %s seed: %s"
+ % (args.category, str(success)), sourcekeys[0]])
+ return (success, messages)
+ elif len(sourcekeys):
+ messages = ["Too many seeds found to move"]
+ messages.extend(sourcekeys)
+ return (False, messages)
+ messages.append("Failed to move seed:")
+ messages.append(searchkey)
+ messages.append('\n')
+ messages.append("Source seeds found...")
+ messages.extend(sourcekeys or ["None\n"])
+ messages.append("Destination seeds found...")
+ messages.extend(destkeys or ["None\n"])
+ return (False, messages)
+
+
+ def listkey(self, args):
+ '''Pretty-print the selected seed file or nick'''
+ # get confirmation
+ # fill in code here
+ if not args.category:
+ args.category = 'rel'
+ catdir = self.config.get_key(args.category + "-category")
+ self.logger.debug("ACTIONS: listkey; catdir = %s" % catdir)
+ self.gpg = GkeysGPG(self.config, catdir)
+ handler = SeedHandler(self.logger, self.config)
+ if args.keydir:
+ self.gpg.set_keydir(args.keydir, "list-keys")
+ self.gpg.set_keyseedfile()
+ seeds = self.gpg.seedfile
+ else:
+ seeds = handler.load_category(args.category)
+ results = {}
+ success = []
+ messages = []
+ if args.gpgsearch:
+ keyresults = seeds.seeds
+ # pick any key
+ key = keyresults[sorted(keyresults)[0]]
+ result = self.gpg.list_keys(key.keydir, args.gpgsearch)
+ # now split the results and reverse lookup the gkey
+ lines = result.output.split('\n')
+ while lines:
+ # determine the end of the first key listing
+ index = lines.index('')
+ keyinfo = lines[:index]
+ # trim off the first keys info
+ lines = lines[index + 1:]
+ # make sure it is a key listing
+ if len(keyinfo) < 2:
+ break
+ # get the fingerprint from the line
+ fpr = keyinfo[1].split('= ')[1]
+ # search for the matching gkey
+ kwargs = {'keydir': args.keydir, 'fingerprint': [fpr]}
+ keyresults = seeds.list(**kwargs)
+ # list the results
+ for key in sorted(keyresults):
+ ls, lr = self._list_it(key, '\n'.join(keyinfo))
+ success.append(ls)
+ results[key.name] = lr
+ messages = ["Done."]
+ else:
+ kwargs = handler.build_gkeydict(args)
+ keyresults = seeds.list(**kwargs)
+ for key in sorted(keyresults):
+ result = self.gpg.list_keys(key.keydir, key.fingerprint)
+ ls, lr = self._list_it(key, result.output)
+ success.append(ls)
+ results[key.name] = lr
+ messages = ["Done."]
+
+ if not messages:
+ messages = ['No results found meeting criteria', "Did you specify -n foo or -n '*'"]
+ return (False not in success, messages)
+
+
+ def _list_it(self, key, result, print_key=True):
+ self.logger.debug("ACTIONS: _list_it; listing key:" + str(key.nick))
+ if self.config.options['print_results']:
+ if print_key:
+ print()
+ print("Nick.....:", key.nick)
+ print("Name.....:", key.name)
+ print("Keydir...:", key.keydir)
+ c = 0
+ for line in result.split('\n'):
+ if c == 0:
+ print("Gpg info.:", line)
+ else:
+ print(" ", line)
+ c += 1
+ self.logger.debug("data output:\n" + str(result))
+ return (True, result)
+
+
+ def installkey(self, args):
+ '''Install a key from the seed(s)'''
+ self.logger.debug("ACTIONS: installkey; args: %s" % str(args))
+ success, gkey = self.listseed(args)[1]
+ if gkey:
+ if gkey and not args.nick == '*' and self.output:
+ self.output(['', gkey], "\n Found GKEY seeds:")
+ elif gkey and self.output:
+ self.output(['all'], "\n Installing seeds:")
+ else:
+ self.logger.info("ACTIONS: installkey; "
+ "Matching seed entry not found")
+ if args.nick:
+ return (False, ["Search failed for: %s" % args.nick])
+ elif args.name:
+ return (False, ["Search failed for: %s" % args.name])
+ else:
+ return (False, ["Search failed for search term"])
+ # get confirmation
+ # fill in code here
+ catdir = self.config.get_key(args.category + "-category")
+ self.logger.debug("ACTIONS: installkey; catdir = %s" % catdir)
+ self.gpg = GkeysGPG(self.config, catdir)
+ results = {}
+ failed = []
+ for key in gkey:
+ self.logger.debug("ACTIONS: installkey; adding key:")
+ self.logger.debug("ACTIONS: " + str(key))
+ results[key.name] = self.gpg.add_key(key)
+ for result in results[key.name]:
+ self.logger.debug("ACTIONS: installkey; result.failed = " +
+ str(result.failed))
+ if self.config.options['print_results']:
+ for result in results[key.name]:
+ print("key desired:", key.name, ", key added:",
+ result.username, ", succeeded:",
+ not result.failed, ", fingerprint:", result.fingerprint)
+ self.logger.debug("stderr_out: " + str(result.stderr_out))
+ if result.failed:
+ failed.append(key)
+ if failed and self.output:
+ self.output([failed], "\n Failed to install:")
+ if failed:
+ success = False
+ return (success, ["Completed"])
+ return (success, ["No seeds to search or install"])
+
+
+ def checkkey(self, args):
+ '''Check keys actions'''
+ if not args.category:
+ return (False, ["Please specify seeds type."])
+ self.logger.debug("ACTIONS: checkkey; args: %s" % str(args))
+ handler = SeedHandler(self.logger, self.config)
+ seeds = handler.load_category(args.category)
+ catdir = self.config.get_key(args.category + "-category")
+ self.logger.debug("ACTIONS: checkkey; catdir = %s" % catdir)
+ self.gpg = GkeysGPG(self.config, catdir)
+ results = {}
+ failed = defaultdict(list)
+ kwargs = handler.build_gkeydict(args)
+ keyresults = seeds.list(**kwargs)
+ self.output('', '\n Checking keys...')
+ for gkey in sorted(keyresults):
+ self.logger.info("Checking key %s, %s" % (gkey.nick, gkey.keyid))
+ self.output('',
+ "\n %s, %s: %s" % (gkey.nick, gkey.name, ', '.join(gkey.keyid)) +
+ "\n ==============================================")
+ self.logger.debug("ACTIONS: checkkey; gkey = %s" % str(gkey))
+ for key in gkey.keyid:
+ results[gkey.name] = self.gpg.check_keys(gkey.keydir, key)
+ if results[gkey.name].expired:
+ failed['expired'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key))
+ if results[gkey.name].revoked:
+ failed['revoked'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key))
+ if results[gkey.name].invalid:
+ failed['invalid'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key))
+ if not results[gkey.name].sign:
+ failed['sign'].append("%s <%s>: %s " % (gkey.name, gkey.nick, key))
+ if failed['expired']:
+ self.output([failed['expired']], '\n Expired keys:\n')
+ if failed['revoked']:
+ self.output([failed['revoked']], '\n Revoked keys:\n')
+ if failed['invalid']:
+ self.output([failed['invalid']], '\n Invalid keys:\n')
+ if failed['sign']:
+ self.output([failed['sign']], '\n No signing capable subkeys:\n')
+ return (len(failed) <1,
+ ['\nFound:\n-------', 'Expired: %d' % len(failed['expired']),
+ 'Revoked: %d' % len(failed['revoked']),
+ 'Invalid: %d' % len(failed['invalid']),
+ 'No signing capable subkeys: %d' % len(failed['sign'])
+ ])
+
+
+ def speccheck(self, args):
+ '''Check keys actions'''
+ if not args.category:
+ return (False, ["Please specify seeds type."])
+ self.logger.debug("ACTIONS: speccheck; args: %s" % str(args))
+ handler = SeedHandler(self.logger, self.config)
+ seeds = handler.load_category(args.category)
+ catdir = self.config.get_key(args.category + "-category")
+ self.logger.debug("ACTIONS: speccheck; catdir = %s" % catdir)
+ self.gpg = GkeysGPG(self.config, catdir)
+ results = {}
+ failed = defaultdict(list)
+ kwargs = handler.build_gkeydict(args)
+ keyresults = seeds.list(**kwargs)
+ self.output('', '\n Checking keys...')
+ for gkey in sorted(keyresults):
+ self.logger.info("Checking key %s, %s" % (gkey.nick, gkey.keyid))
+ self.output('',
+ "\n %s, %s: %s" % (gkey.nick, gkey.name, ', '.join(gkey.keyid)) +
+ "\n ==============================================")
+ self.logger.debug("ACTIONS: speccheck; gkey = %s" % str(gkey))
+ for key in gkey.keyid:
+ results = self.gpg.speccheck(gkey.keydir, key)
+ for g in results:
+ pub_pass = {}
+ for key in results[g]:
+ self.output('', key.pretty_print())
+
+ if key.key is "PUB":
+ pub_pass = {
+ 'key': key,
+ 'pub': key.passed_spec,
+ 'sign': False,
+ 'encrypt': False,
+ 'auth': False,
+ 'signs': [],
+ 'encrypts': [],
+ 'authens': [],
+ 'final': False,
+ }
+ if key.key is "SUB":
+ if key.sign_capable and key.passed_spec:
+ pub_pass['signs'].append(key.passed_spec)
+ pub_pass['sign'] = True
+ if key.encrypt_capable:
+ pub_pass['encrypts'].append(key.passed_spec)
+ pub_pass['encrypt'] = True
+ if key.capabilities == 'a':
+ pub_pass['authens'].append(key.passed_spec)
+ if key.passed_spec:
+ pub_pass['auth'] = True
+ validity = key.validity.split(',')[0]
+ if not key.expire and not 'r' in validity:
+ failed['expired'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint))
+ if 'r' in validity:
+ failed['revoked'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint))
+ if 'i' in validity:
+ failed['invalid'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint))
+ if key.capabilities not in ['a', 'e']:
+ if not key.algo:
+ failed['algo'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint))
+ if not key.bits:
+ failed['bits'].append("%s <%s>: %s" % (gkey.name, gkey.nick, key.fingerprint))
+ if "Warning" in key.expire_reason:
+ failed['warn'].append("%s <%s>: %s " % (gkey.name, gkey.nick, key.fingerprint))
+ if True in pub_pass['signs']:
+ pub_pass['sign'] = True
+ if True in pub_pass['encrypts']:
+ pub_pass['encrypt'] = True
+ if not pub_pass['sign']:
+ failed['sign'].append("%s <%s>: %s" % (gkey.name, gkey.nick, pub_pass['key'].fingerprint))
+ if not pub_pass['encrypt']:
+ failed['encrypt'].append("%s <%s>: %s" % (gkey.name, gkey.nick, pub_pass['key'].fingerprint))
+ spec = "%s <%s>: %s" % (gkey.name, gkey.nick, pub_pass['key'].fingerprint)
+ for k in ['pub', 'sign']:
+ if pub_pass[k]:
+ pub_pass['final'] = True
+ else:
+ pub_pass['final'] = False
+ break
+ if pub_pass['final']:
+ if spec not in failed['spec-approved']:
+ failed['spec-approved'].append(spec)
+ else:
+ if spec not in failed['spec']:
+ failed['spec'].append(spec)
+ sdata = convert_pf(pub_pass, ['pub', 'sign', 'final'])
+ sdata = convert_yn(sdata, ['auth', 'encrypt'])
+ self.output('', SPECCHECK_SUMMARY % sdata)
+
+ if failed['revoked']:
+ self.output([sorted(set(failed['revoked']))], '\n Revoked keys:')
+ if failed['invalid']:
+ self.output([sorted(set(failed['invalid']))], '\n Invalid keys:')
+ if failed['sign']:
+ self.output([sorted(set(failed['sign']))], '\n No signing capable subkey:')
+ if failed['encrypt']:
+ self.output([sorted(set(failed['encrypt']))], '\n No Encryption capable subkey (Notice only):')
+ if failed['algo']:
+ self.output([sorted(set(failed['algo']))], '\n Incorrect Algorithm:')
+ if failed['bits']:
+ self.output([sorted(set(failed['bits']))], '\n Incorrect bit length:')
+ if failed['expired']:
+ self.output([sorted(set(failed['expired']))], '\n Expiry keys:')
+ if failed['warn']:
+ self.output([sorted(set(failed['warn']))], '\n Expiry Warnings:')
+ if failed['spec']:
+ self.output([sorted(set(failed['spec']))], '\n Failed to pass SPEC requirements:')
+ if failed['spec-approved']:
+ self.output([sorted(set(failed['spec-approved']))], '\n SPEC Approved:')
+
+ return (len(failed) <1,
+ ['\nFound Failures:\n-------',
+ 'Revoked................: %d' % len(set(failed['revoked'])),
+ 'Invalid................: %d' % len(set(failed['invalid'])),
+ 'No Signing subkey......: %d' % len(set(failed['sign'])),
+ 'No Encryption subkey...: %d' % len(set(failed['encrypt'])),
+ 'Algorithm..............: %d' % len(set(failed['algo'])),
+ 'Bit length.............: %d' % len(set(failed['bits'])),
+ 'Expiry.................: %d' % len(set(failed['expired'])),
+ 'Expiry Warnings........: %d' % len(set(failed['warn'])),
+ 'SPEC requirements......: %d' % len(set(failed['spec'])),
+ '=============================',
+ 'SPEC Approved..........: %d' % len(set(failed['spec-approved'])),
+ ])
+
+
+ def removekey(self, args):
+ '''Remove an installed key'''
+ if not args.nick:
+ return (False, ["Please provide a nickname or -n *"])
+ handler = SeedHandler(self.logger, self.config)
+ kwargs = handler.build_gkeydict(args)
+ self.logger.debug("ACTIONS: addkey; kwargs: %s" % str(kwargs))
+ success, installed_keys = self.installed(args)[1]
+ for gkey in installed_keys:
+ if kwargs['nick'] not in gkey.nick:
+ messages = ["%s does not seem to be a valid key." % kwargs['nick']]
+ success = False
+ else:
+ self.output(['', [gkey]], '\n Found GKEY seed:')
+ ans = raw_input("Do you really want to remove %s?[y/n]: "
+ % kwargs['nick']).lower()
+ while ans not in ["yes", "y", "no", "n"]:
+ ans = raw_input("Do you really want to remove %s?[y/n]: "
+ % kwargs['nick']).lower()
+ if ans in ["no", "n"]:
+ messages = ["Key removal aborted... Nothing to be done."]
+ else:
+ catdir = self.config.get_key(args.category + "-category")
+ rm_candidate = os.path.join(catdir, gkey.nick)
+ self.logger.debug("ACTIONS: removekey; catdir = %s" % catdir)
+ if args.category:
+ try:
+ rmtree(rm_candidate)
+ messages = ["Done removing %s key." % kwargs['nick']]
+ except OSError:
+ messages = ["%s directory does not exist." % rm_candidate]
+ success = False
+ return (success, messages)
+
+
+ def movekey(self, args):
+ '''Rename an installed key'''
+ return (False, [])
+
+
+ def importkey(self, args):
+ '''Add a specified key to a specified keyring'''
+ if args.category:
+ catdir = self.config.get_key(args.category + "-category")
+ keyring_dir = self.config.get_key("keyring")
+ self.logger.debug("ACTIONS: importkey; catdir = %s" % catdir)
+ self.gpg = GkeysGPG(self.config, catdir)
+ success, gkeys = self.listseed(args)[1]
+ results = {}
+ failed = []
+ print("Importing specified keys to keyring.")
+ for gkey in gkeys:
+ self.logger.debug("ACTIONS: importkey; adding key: %s", gkey.name)
+ results[gkey.name] = self.gpg.add_key(gkey)
+ if self.config.options['print_results']:
+ for result in results[gkey.name]:
+ print("key desired:", gkey.name, ", key added:",
+ result.username, ", succeeded:",
+ not result.failed, ", fingerprint:", result.fingerprint)
+ self.logger.debug("stderr_out: " + str(result.stderr_out))
+ if result.failed:
+ self.logger.debug("ACTIONS: importkey; result.failed = " + str(result.failed))
+ failed.append(gkey)
+ if not results[gkey.name][0].failed:
+ print("Importing: ", gkey.name)
+ self.logger.debug("ACTIONS: importkey; importing key: %s", gkey.name)
+ keyring = os.path.join(keyring_dir,args.keyring + '.gpg')
+ self.gpg.add_to_keyring(gkey, catdir, keyring)
+ if failed and self.output:
+ self.output([failed], "\n Failed to install:")
+ if len(failed):
+ success = False
+ return (success, ["Completed."])
+ return (False, ["No seeds to search or install",
+ "You must specify a category"])
+
+
+ def installed(self, args):
+ '''Lists the installed key directories'''
+ if args.category:
+ catdir = self.config.get_key(args.category + "-category")
+ else:
+ return (False, ["Please specify a category."])
+ self.logger.debug("ACTIONS: installed; catdir = %s" % catdir)
+ installed_keys = []
+ try:
+ if args.nick:
+ keys = [args.nick]
+ else:
+ keys = os.listdir(catdir)
+ for key in keys:
+ seed_path = os.path.join(catdir, key)
+ gkey_path = os.path.join(seed_path, 'gkey.seeds')
+ seed = None
+ try:
+ with open(gkey_path, 'r') as fileseed:
+ seed = load(fileseed)
+ except IOError:
+ return ["No seed file found in %s." % gkey_path, ""]
+ if seed:
+ for val in list(seed.values()):
+ installed_keys.append(GKEY(**val))
+ except OSError:
+ return (False, ["%s directory does not exist." % catdir, ""])
+ return (True, ['Found Key(s):', installed_keys])
+
+
+ def user_confirm(self, message):
+ '''Prompt a user to confirm an action
+
+ @param message: string, user promt message to display
+ @return boolean: confirmation to proceed or abort
+ '''
+ pass
+
+
+ def verify(self, args):
+ '''File verification action'''
+ connector_output = {
+ 'info': self.logger.debug,
+ 'error': self.logger.error,
+ 'kwargs-info': {},
+ 'kwargs-error': {},
+ }
+ if not args.filename:
+ return (False, ['Please provide a signed file.'])
+ if not args.category:
+ args.category = 'rel'
+ (success, data) = self.installed(args)
+ keys = data[1]
+ if not keys:
+ return (False, ['No installed keys found, try installkey action.'])
+ catdir = self.config.get_key(args.category + "-category")
+ self.logger.debug("ACTIONS: verify; catdir = %s" % catdir)
+ self.gpg = GkeysGPG(self.config, catdir)
+ filepath, signature = args.filename, args.signature
+ timestamp_path = None
+ isurl = success = verified = False
+ if filepath.startswith('http'):
+ isurl = True
+ url = filepath
+ filepath = args.destination
+ # a bit hackish, but save it to current directory
+ # with download file name
+ if not filepath:
+ filepath = url.split('/')[-1]
+ self.logger.debug("ACTIONS: verify; destination filepath was "
+ "not supplied, using current directory ./%s" % filepath)
+ if args.timestamp:
+ timestamp_path = filepath + ".timestamp"
+ if isurl:
+ from sslfetch.connections import Connector
+ connector_output = {
+ 'info': self.logger.info,
+ 'debug': self.logger.debug,
+ 'error': self.logger.error,
+ 'kwargs-info': {},
+ 'kwargs-debug': {},
+ 'kwargs-error': {},
+ }
+ fetcher = Connector(connector_output, None, "Gentoo Keys")
+ self.logger.debug("ACTIONS: verify; fetching %s signed file " % filepath)
+ self.logger.debug("ACTIONS: verify; timestamp path: %s" % timestamp_path)
+ success, signedfile, timestamp = fetcher.fetch_file(url, filepath, timestamp_path)
+ else:
+ filepath = os.path.abspath(filepath)
+ self.logger.debug("ACTIONS: verify; local file %s" % filepath)
+ success = os.path.isfile(filepath)
+ if not success:
+ messages = ["File %s cannot be retrieved." % filepath]
+ else:
+ if not signature:
+ EXTENSIONS = ['.sig', '.asc', 'gpg','.gpgsig']
+ success_fetch = False
+ for ext in EXTENSIONS:
+ sig_path = filepath + ext
+ if isurl:
+ signature = url + ext
+ self.logger.debug("ACTIONS: verify; fetching %s signature " % signature)
+ success_fetch, sig, timestamp = fetcher.fetch_file(signature, sig_path)
+ else:
+ signature = filepath + ext
+ signature = os.path.abspath(signature)
+ self.logger.debug("ACTIONS: verify; checking %s signature " % signature)
+ success_fetch = os.path.isfile(signature)
+ if success_fetch:
+ break
+ else:
+ sig_path = signature
+ messages = []
+ self.logger.info("Verifying file...")
+ verified = False
+ # get correct key to use
+ use_gkey = self.config.get_key('seedurls', 'gkey')
+ for key in keys:
+ if key.nick == use_gkey:
+ break
+ results = self.gpg.verify_file(key, sig_path, filepath)
+ keyid = key.keyid[0]
+ (valid, trust) = results.verified
+ if valid:
+ verified = True
+ messages = ["Verification succeeded.: %s" % (filepath),
+ "Key info...............: %s <%s>, %s"
+ % ( key.name, key.nick,keyid)]
+ else:
+ messages = ["Verification failed.....:" % (filepath),
+ "Key info................: %s <%s>, %s"
+ % ( key.name, key.nick,keyid)]
+ return (verified, messages)
+
+
+ def listseedfiles(self, args):
+ '''List seed files found in the configured seed directory'''
+ seedsdir = self.config.get_key('seedsdir')
+ seedfile = [f for f in os.listdir(seedsdir) if f[-5:] == 'seeds']
+ return (True, {"Seed files found at path: %s\n %s"
+ % (seedsdir, "\n ".join(seedfile)): True})
+
+
+ def sign(self, args):
+ '''Sign a file'''
+ if not args.filename:
+ return (False, ['Please provide a file to sign.'])
+
+ if isinstance(args.nick, str):
+ nicks = [args.nick]
+ else:
+ nicks = args.nick
+ # load our installed signing keys db
+ handler = SeedHandler(self.logger, self.config)
+ self.seeds = handler.load_category('sign', nicks)
+ if not self.seeds.seeds:
+ return (False, ['No installed keys, try installkey action.', ''])
+ basedir = self.config.get_key("sign-category")
+ keydir = self.config.get_key("sign", "keydir")
+ task = self.config.get_key("sign", "type")
+ keyring = self.config.get_key("sign", "keyring")
+
+ self.config.options['gpg_defaults'] = ['--status-fd', '2']
+
+ self.logger.debug("ACTIONS: sign; keydir = %s" % keydir)
+
+ self.gpg = GkeysGPG(self.config, basedir)
+ self.gpg.set_keydir(keydir, task)
+ if keyring not in ['', None]:
+ self.gpg.set_keyring(keyring, task)
+ msgs = []
+ success = []
+ for fname in args.filename:
+ results = self.gpg.sign(task, None, fname)
+ verified, trust = results.verified
+ if not results.verified[0]:
+ msgs.extend(
+ ['Failed Signature for %s verified: %s, trust: %s'
+ % (fname, verified, trust), 'GPG output:', "\n".join(results.stderr_out)]
+ )
+ success.append(False)
+ else:
+ msgs.extend(
+ ['Signature result for: %s -- verified: %s, trust: %s'
+ % (fname, verified, trust)] #, 'GPG output:', "\n".join(results.stderr_out)]
+ )
+ success.append(True)
+ return (False not in success, ['', msgs])
+
+
+ def refreshkey(self, args):
+ '''Calls gpg with the --refresh-keys option
+ for in place updates of the installed keys'''
+ if not args.category:
+ return (False, ["Please specify seeds type."])
+ self.logger.debug("ACTIONS: refreshkey; args: %s" % str(args))
+ handler = SeedHandler(self.logger, self.config)
+ seeds = handler.load_category(args.category)
+ catdir = self.config.get_key(args.category + "-category")
+ self.logger.debug("ACTIONS: refreshkey; catdir = %s" % catdir)
+ self.gpg = GkeysGPG(self.config, catdir)
+ results = {}
+ kwargs = handler.build_gkeydict(args)
+ keyresults = seeds.list(**kwargs)
+ self.output('', '\n Refreshig keys...')
+ for gkey in sorted(keyresults):
+ self.logger.info("Refreshig key %s, %s" % (gkey.nick, gkey.keyid))
+ self.output('', " %s: %s" % (gkey.name, ', '.join(gkey.keyid)))
+ #self.output('', " ===============")
+ self.logger.debug("ACTIONS: refreshkey; gkey = %s" % str(gkey))
+ results[gkey.keydir] = self.gpg.refresh_key(gkey)
+ return (True, ['Completed'])
+
+
+
+
diff --git a/gkeys/gkeys/base.py b/gkeys/gkeys/base.py
new file mode 100644
index 0000000..95142f9
--- /dev/null
+++ b/gkeys/gkeys/base.py
@@ -0,0 +1,253 @@
+#
+#-*- coding:utf-8 -*-
+
+"""
+ Gentoo-keys - base.py
+
+ Command line interface argsparse options module
+ and common functions
+
+ @copyright: 2012 by Brian Dolbec <dol-sen@gentoo.org>
+ @license: GNU GPL2, see COPYING for details.
+"""
+
+from __future__ import print_function
+
+
+import argparse
+
+from gkeys import config, fileops, seed, lib
+from gkeys.log import log_levels, set_logger
+
+
+class CliBase(object):
+ '''Common cli and argsparse options class'''
+
+
+ def __init__(self):
+ self.cli_config = {
+ 'Actions': [],
+ 'Available_Actions': [],
+ 'Action_Options': [],
+ 'prog': 'gkeys',
+ 'description': 'Gentoo-keys manager program',
+ 'epilog': '''Caution: adding untrusted keys to these keyrings can
+ be hazardous to your system!'''
+ }
+ self.config = None
+ self.args = None
+ self.seeds = None
+ self.actions = None
+
+
+ @staticmethod
+ def _option_dest(parser=None):
+ parser.add_argument('-d', '--dest', dest='destination', default=None,
+ help='The destination seed file or keydir for move, copy operations')
+
+ @staticmethod
+ def _option_fingerprint(parser=None):
+ parser.add_argument('-f', '--fingerprint', dest='fingerprint',
+ default=None, nargs='+',
+ help='The fingerprint of the the key')
+
+ @staticmethod
+ def _option_gpgsearch(parser=None):
+ parser.add_argument('-g', '--gpgsearch', dest='gpgsearch', default=None,
+ help='Do a gpg search operations, rather than a gkey search')
+
+ @staticmethod
+ def _option_keyid(parser=None):
+ parser.add_argument('-i', '--keyid', dest='keyid', default=None,
+ nargs='+',
+ help='The long keyid of the gpg key to search for')
+
+ @staticmethod
+ def _option_keyring(parser=None):
+ parser.add_argument('-k', '--keyring', dest='keyring', default='trusted_keyring',
+ help='The name of the keyring to use')
+
+ @staticmethod
+ def _option_nick(parser=None):
+ parser.add_argument('-n', '--nick', dest='nick', default=None,
+ help='The nick associated with the the key')
+
+ @staticmethod
+ def _option_name(parser=None):
+ parser.add_argument('-N', '--name', dest='name', nargs='*',
+ default=None, help='The name of the the key')
+
+ @staticmethod
+ def _option_category(parser=None):
+ parser.add_argument('-C', '--category',
+ choices=['rel', 'dev', 'overlays', 'sign'], dest='category', default=None,
+ help='The key or seed directory category to use or update')
+
+ @staticmethod
+ def _option_cleankey(parser=None):
+ parser.add_argument('--clean-key',
+ dest='cleankey', default=False,
+ help='Clean the key from the keyring due to failures')
+
+ @staticmethod
+ def _option_cleanseed(parser=None):
+ parser.add_argument('--clean-seed',
+ dest='cleanseed', default=False,
+ help='Clean the seed from the seedfile due to failures. '
+ 'Used during binary keyring release creation.')
+
+ @staticmethod
+ def _option_keydir(parser=None):
+ parser.add_argument('-r', '--keydir', dest='keydir', default=None,
+ help='The keydir to use, update or search for/in')
+
+ @staticmethod
+ def _option_seedfile(parser=None):
+ parser.add_argument('-S', '--seedfile', dest='seedfile', default=None,
+ help='The seedfile to use from the gkeys.conf file')
+
+ @staticmethod
+ def _option_file(parser=None):
+ parser.add_argument('-F', '--file', dest='filename', default=None,
+ nargs='+',
+ help='The path/URL to use for the (signed) file')
+
+ @staticmethod
+ def _option_1file(parser=None):
+ parser.add_argument('-F', '--file', dest='filename', default=None,
+ help='The path/URL to use for the (signed) file')
+
+ @staticmethod
+ def _option_signature(parser=None):
+ parser.add_argument('-s','--signature', dest='signature', default=None,
+ help='The path/URL to use for the signature')
+
+ @staticmethod
+ def _option_timestamp(parser=None):
+ parser.add_argument('-t', '--timestamp', dest='timestamp', type=bool,
+ default=False,
+ help='Turn on timestamp use')
+
+ @staticmethod
+ def _option_mail(parser=None):
+ parser.add_argument('-m', '--mail', dest='mail', default=None,
+ help='The email address to search for')
+
+ @staticmethod
+ def _option_status(parser=None):
+ parser.add_argument('-A', '--status', default=False,
+ help='The active status of the member')
+
+
+ def parse_args(self, args):
+ '''Parse a list of aruments
+
+ @param args: list
+ @returns argparse.Namespace object
+ '''
+ #logger.debug('CliBase: parse_args; args: %s' % args)
+ parser = argparse.ArgumentParser(
+ prog=self.cli_config['prog'],
+ description=self.cli_config['description'],
+ epilog=self.cli_config['epilog'])
+
+ # options
+ parser.add_argument('-c', '--config', dest='config', default=None,
+ help='The path to an alternate config file')
+ parser.add_argument('-D', '--debug', default='DEBUG',
+ choices=list(log_levels),
+ help='The logging level to set for the logfile')
+
+
+ subparsers = parser.add_subparsers(help='actions')
+ for name in self.cli_config['Available_Actions']:
+ action_method = getattr(self.cli_config['Actions'], name)
+ actiondoc = action_method.__doc__
+ try:
+ text = actiondoc.splitlines()[0]
+ except AttributeError:
+ text = ""
+ action_parser = subparsers.add_parser(
+ name,
+ help=text,
+ description=actiondoc,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ action_parser.set_defaults(action=name)
+ self._add_options(action_parser, self.cli_config['Action_Options'][name])
+
+ return parser.parse_args(args)
+
+
+ def _add_options(self, parser, options):
+ for opt in options:
+ getattr(self, '_option_%s' % opt)(parser)
+
+
+ def run(self, args):
+ '''Run the args passed in
+
+ @param args: list or argparse.Namespace object
+ '''
+ global logger
+ message = None
+ if not args:
+ message = "Main: run; invalid args argument passed in"
+ if isinstance(args, list):
+ args = self.parse_args(args)
+ if args.config:
+ self.config.defaults['config'] = args.config
+ # now make it load the config file
+ self.config.read_config()
+
+ # establish our logger and update it in the imported files
+ logger = set_logger('gkeys', self.config['logdir'], args.debug,
+ dirmode=int(self.config.get_key('permissions', 'directories'),0),
+ filemask=int(self.config.get_key('permissions', 'files'),0))
+ config.logger = logger
+ fileops.logger = logger
+ seed.logger = logger
+ lib.logger = logger
+
+ if message:
+ logger.error(message)
+
+ # now that we have a logger, record the alternate config setting
+ if args.config:
+ logger.debug("Main: run; Found alternate config request: %s"
+ % args.config)
+
+ # establish our actions instance
+ self.actions = self.cli_config['Actions'](self.config, self.output_results, logger)
+
+ # run the action
+ func = getattr(self.actions, '%s' % args.action)
+ logger.debug('Main: run; Found action: %s' % args.action)
+ success, results = func(args)
+ if not results:
+ print("No results found. Check your configuration and that the",
+ "seed file exists.")
+ return success
+ if self.config.options['print_results'] and 'done' not in list(results):
+ self.output_results(results, '\n Gkey task results:')
+ return success
+
+
+ @staticmethod
+ def output_results(results, header):
+ # super simple output for the time being
+ if header:
+ print(header)
+ for msg in results:
+ if isinstance(msg, str) or isinstance(msg, unicode):
+ print(' ', msg)
+ else:
+ try:
+ print("\n".join([x.pretty_print for x in msg]))
+ except AttributeError:
+ for x in msg:
+ print(' ', x)
+ print()
+
+
+ def output_failed(self, failed):
+ pass
diff --git a/gkeys/gkeys/checks.py b/gkeys/gkeys/checks.py
new file mode 100644
index 0000000..db3d59f
--- /dev/null
+++ b/gkeys/gkeys/checks.py
@@ -0,0 +1,437 @@
+#
+#-*- coding:utf-8 -*-
+
+"""
+ Gentoo-Keys - gkeygen/checks.py
+
+ Primary key checks module
+ @copyright: 2014 by Brian Dolbec <dolsen@gentoo.org>
+ @license: GNU GPL2, see COPYING for details
+"""
+
+import time
+from collections import namedtuple, OrderedDict
+
+from gkeys.config import GKEY_CHECK
+
+from pyGPG.mappings import (ALGORITHM_CODES, CAPABILITY_MAP,
+ KEY_VERSION_FPR_LEN, VALIDITY_MAP, INVALID_LIST,
+ VALID_LIST)
+
+
+SPEC_INDEX = {
+ 'key': 0,
+ 'capabilities': 1,
+ 'fingerprint': 2,
+ 'bits': 3,
+ 'created': 4,
+ 'expire': 5,
+ 'encrypt_capable': 6,
+ 'sign_capable': 7,
+ 'algo': 8,
+ 'version': 9,
+ 'id': 10,
+ 'days': 11,
+ 'validity': 12,
+ 'expire_reason': 13,
+ 'long_caps': 14, # long version of the capbilities
+ 'caps': 15,
+ 'caps_reason': 16,
+ 'id_reason': 17,
+ 'is_valid': 18,
+ 'passed_spec': 19,
+}
+
+SPEC_INDEX = OrderedDict(sorted(SPEC_INDEX.items(), key=lambda t: t[1]))
+
+SPEC_STAT = ['', '','', False, False, False, False, False, False, False, False,
+ 0, '', '', '', True, '', '', False, False]
+
+# Default glep 63 minimum gpg key specification
+# and approved options, limits
+TEST_SPEC = {
+ 'bits': {
+ 'DSA': 2048,
+ 'RSA': 2048,
+ },
+ 'expire': 3 * 365, # in days
+ 'subkeys': { # warning/error mode
+ 'encrypt': {
+ 'mode': 'notice',
+ 'expire': 3 * 365,
+ },
+ 'sign': {
+ 'mode': 'error',
+ 'expire': 365,
+ },
+ },
+ 'algorithms': ['DSA', 'RSA', '1', '2', '3', '17'],
+ 'versions': ['4'],
+ 'qualified_id': '@gentoo.org',
+}
+
+# Final pass/fail fields and the pass value required
+TEST_REQUIREMENTS = {
+ 'bits': True,
+ 'created': True,
+ 'expire': True,
+ 'sign_capable': True,
+ 'algo': True,
+ 'version': True,
+ 'id': True,
+ 'is_valid': True,
+ 'caps': True,
+}
+
+SECONDS_PER_DAY = 86400
+
+
+SPECCHECK_STRING = ''' ----------
+ Fingerprint......: %(fingerprint)s
+ Key type ........: %(key)s Capabilities.: %(capabilities)s %(long_caps)s
+ Algorithm........: %(algo)s Bit Length...: %(bits)s
+ Create Date......: %(created)s Expire Date..: %(expire)s
+ Key Version......: %(version)s Validity.....: %(validity)s
+ Days till expiry.: %(days)s %(expire_reason)s
+ Capability.......: %(caps)s %(caps_reason)s
+ Qualified ID.....: %(id)s %(id_reason)s
+ This %(pub_sub)s.: %(passed_spec)s'''
+
+SPECCHECK_SUMMARY = ''' Key summary
+ primary..........: %(pub)s signing subkey: %(sign)s
+ encryption subkey: %(encrypt)s authentication subkey: %(auth)s
+ SPEC requirements: %(final)s
+'''
+
+def convert_pf(data, fields):
+ '''Converts dictionary items from True/False to Pass/Fail strings
+
+ @param data: dict
+ @param fields: list
+ @returns: dict
+ '''
+ for f in fields:
+ if data[f]:
+ data[f] = 'Pass'
+ else:
+ data[f] = 'Fail'
+ return data
+
+def convert_yn(data, fields):
+ '''Converts dictionary items from True/False to Yes/No strings
+
+ @param data: dict
+ @param fields: list
+ @returns: dict
+ '''
+ for f in fields:
+ if data[f]:
+ data[f] = 'Yes '
+ else:
+ data[f] = 'No '
+ return data
+
+
+class SpecCheck(namedtuple("SpecKey", list(SPEC_INDEX))):
+
+ __slots__ = ()
+
+ def pretty_print(self):
+ data = self.convert_data()
+ output = SPECCHECK_STRING % (data)
+ return output
+
+
+ def convert_data(self):
+ data = dict(self._asdict())
+ data = convert_pf(data, ['algo', 'bits', 'caps', 'created', 'expire', 'id',
+ 'passed_spec', 'version'])
+ for f in ['caps', 'id']:
+ data[f] = data[f].ljust(10)
+ data['validity'] += ', %s' % (VALIDITY_MAP[data['validity']])
+ days = data['days']
+ if days == float("inf"):
+ data['days'] = "infinite".ljust(10)
+ else:
+ data['days'] = str(int(data['days'])).ljust(10)
+ if data['capabilities'] == 'e':
+ data['algo'] = '----'
+ data['bits'] = '----'
+ if data['key'] =='PUB':
+ data['pub_sub'] = 'primary key'
+ else:
+ data['pub_sub'] = 'subkey.....'
+ return data
+
+
+class KeyChecks(object):
+ '''Primary gpg key validation and specifications checks class'''
+
+ def __init__(self, logger, spec=TEST_SPEC, qualified_id_check=True):
+ '''@param spec: optional gpg specification to test against
+ Defaults to TEST_SPEC
+
+ '''
+ self.logger = logger
+ self.spec = spec
+ self.check_id = qualified_id_check
+
+
+ def validity_checks(self, keydir, keyid, result):
+ '''Check the specified result based on the seed type
+
+ @param keydir: the keydir to list the keys for
+ @param keyid: the keyid to check
+ @param result: pyGPG.output.GPGResult object
+ @returns: GKEY_CHECK instance
+ '''
+ revoked = expired = invalid = sign = False
+ for data in result.status.data:
+ if data.name == "PUB":
+ if data.long_keyid == keyid[2:]:
+ # check if revoked
+ if 'r' in data.validity:
+ revoked = True
+ self.logger.debug("ERROR in key %s : revoked" % data.long_keyid)
+ break
+ # if primary key expired, all subkeys expire
+ if 'e' in data.validity:
+ expired = True
+ self.logger.debug("ERROR in key %s : expired" % data.long_keyid)
+ break
+ # check if invalid
+ if 'i' in data.validity:
+ invalid = True
+ self.logger.debug("ERROR in key %s : invalid" % data.long_keyid)
+ break
+ if 's' in data.key_capabilities:
+ sign = True
+ self.logger.debug("INFO primary key %s : key signing capabilities" % data.long_keyid)
+ if data.name == "SUB":
+ # check if invalid
+ if 'i' in data.validity:
+ self.logger.debug("WARNING in subkey %s : invalid" % data.long_keyid)
+ continue
+ # check if expired
+ if 'e' in data.validity:
+ self.logger.debug("WARNING in subkey %s : expired" % data.long_keyid)
+ continue
+ # check if revoked
+ if 'r' in data.validity:
+ self.logger.debug("WARNING in subkey %s : revoked" % data.long_keyid)
+ continue
+ # check if subkey has signing capabilities
+ if 's' in data.key_capabilities:
+ sign = True
+ self.logger.debug("INFO subkey %s : subkey signing capabilities" % data.long_keyid)
+ return GKEY_CHECK(keyid, revoked, expired, invalid, sign)
+
+
+ def spec_check(self, keydir, keyid, result):
+ '''Performs the minimum specifications checks on the key'''
+ self.logger.debug("SPEC_CHECK() : CHECKING: %s" % keyid)
+ results = {}
+ pub = None
+ stats = None
+ pub_days = 0
+ for data in result.status.data:
+ if data.name == "PUB":
+ if stats:
+ stats = self._test_final(data, stats)
+ results[pub.long_keyid].append(SpecCheck._make(stats))
+ pub = data
+ found_id = False
+ found_id_reason = ''
+ results[data.long_keyid] = []
+ stats = SPEC_STAT[:]
+ stats[SPEC_INDEX['key']] = data.name
+ stats[SPEC_INDEX['capabilities']] = data.key_capabilities
+ stats[SPEC_INDEX['validity']] = data.validity
+ stats = self._test_created(data, stats)
+ stats = self._test_algo(data, stats)
+ stats = self._test_bits(data, stats)
+ stats = self._test_expire(data, stats, pub_days)
+ pub_days = stats[SPEC_INDEX['days']]
+ stats = self._test_caps(data, stats)
+ stats = self._test_validity(data, stats)
+ elif data.name == "FPR":
+ pub = pub._replace(**{'fingerprint': data.fingerprint})
+ stats[SPEC_INDEX['fingerprint']] = data.fingerprint
+ stats = self._test_version(data, stats)
+ elif data.name == "UID":
+ stats = self._test_uid(data, stats)
+ if stats[SPEC_INDEX['id']] in [True, '-----']:
+ found_id = stats[SPEC_INDEX['id']]
+ found_id_reason = ''
+ stats[SPEC_INDEX['id_reason']] = ''
+ else:
+ found_id_reason = stats[SPEC_INDEX['id_reason']]
+ elif data.name == "SUB":
+ if stats:
+ stats = self._test_final(data, stats)
+ results[pub.long_keyid].append(SpecCheck._make(stats))
+ stats = SPEC_STAT[:]
+ stats[SPEC_INDEX['key']] = data.name
+ stats[SPEC_INDEX['capabilities']] = data.key_capabilities
+ stats[SPEC_INDEX['fingerprint']] = '%s' \
+ % (data.long_keyid)
+ stats[SPEC_INDEX['id']] = found_id
+ stats[SPEC_INDEX['id_reason']] = found_id_reason
+ stats[SPEC_INDEX['validity']] = data.validity
+ stats = self._test_validity(data, stats)
+ stats = self._test_created(data, stats)
+ stats = self._test_algo(data, stats)
+ stats = self._test_bits(data, stats)
+ stats = self._test_expire(data, stats, pub_days)
+ stats = self._test_caps(data, stats)
+ if stats:
+ stats = self._test_final(data, stats)
+ results[pub.long_keyid].append(SpecCheck._make(stats))
+ stats = None
+ self.logger.debug("SPEC_CHECK() : COMPLETED: %s" % keyid)
+ return results
+
+
+ def _test_algo(self, data, stats):
+ algo = data.pubkey_algo
+ if algo in TEST_SPEC['algorithms']:
+ stats[SPEC_INDEX['algo']] = True
+ else:
+ self.logger.debug("ERROR in key %s : invalid Type: %s"
+ % (data.long_keyid, ALGORITHM_CODES[algo]))
+ return stats
+
+
+ def _test_bits(self, data, stats):
+ bits = int(data.keylength)
+ if data.pubkey_algo in TEST_SPEC['algorithms']:
+ if bits >= TEST_SPEC['bits'][ALGORITHM_CODES[data.pubkey_algo]]:
+ stats[SPEC_INDEX['bits']] = True
+ else:
+ self.logger.debug("ERROR in key %s : invalid Bit length: %d"
+ % (data.long_keyid, bits))
+ return stats
+
+
+ def _test_version(self, data, stats):
+ fpr_l = len(data.fingerprint)
+ if KEY_VERSION_FPR_LEN[fpr_l] in TEST_SPEC['versions']:
+ stats[SPEC_INDEX['version']] = True
+ else:
+ self.logger.debug("ERROR in key %s : invalid gpg key version: %s"
+ % (data.long_keyid, KEY_VERSION_FPR_LEN[fpr_l]))
+ return stats
+
+
+ def _test_created(self, data, stats):
+ try:
+ created = float(data.creation_date)
+ except ValueError:
+ created = 0
+ if created <= time.time() :
+ stats[SPEC_INDEX['created']] = True
+ else:
+ self.logger.debug("ERROR in key %s : invalid gpg key creation date: %s"
+ % (data.long_keyid, data.creation_date))
+ return stats
+
+
+ def _test_expire(self, data, stats, pub_days):
+ if data.name in ["PUB"]:
+ delta_t = TEST_SPEC['expire']
+ stats = self._expire_check(data, stats, delta_t, pub_days)
+ return stats
+ else:
+ for cap in data.key_capabilities:
+ try:
+ delta_t = TEST_SPEC['subkeys'][CAPABILITY_MAP[cap]]['expire']
+ except KeyError:
+ self.logger.debug(
+ "WARNING in capability key %s : setting delta_t to main expiry: %d"
+ % (cap, TEST_SPEC['expire']))
+ delta_t = TEST_SPEC['expire']
+ stats = self._expire_check(data, stats, delta_t, pub_days)
+ return stats
+
+
+ def _expire_check(self, data, stats, delta_t, pub_days):
+ today = time.time()
+ try:
+ expires = float(data.expiredate)
+ except ValueError:
+ expires = float("inf")
+ if data.name == 'SUB' and expires == float("inf"):
+ days = stats[SPEC_INDEX['days']] = pub_days
+ elif expires == float("inf"):
+ days = stats[SPEC_INDEX['days']] = expires
+ else:
+ days = stats[SPEC_INDEX['days']] = max(0, int((expires - today)/SECONDS_PER_DAY))
+ if days <= delta_t:
+ stats[SPEC_INDEX['expire']] = True
+ elif days > delta_t and not ('i' in data.validity or 'r' in data.validity):
+ stats[SPEC_INDEX['expire_reason']] = '<== Exceeds specification'
+ else:
+ self.logger.debug("ERROR in key %s : invalid gpg key expire date: %s"
+ % (data.long_keyid, data.expiredate))
+ if 0 < days < 30 and not ('i' in data.validity or 'r' in data.validity):
+ stats[SPEC_INDEX['expire_reason']] = '<== WARNING < 30 days'
+
+ return stats
+
+
+ def _test_caps(self, data, stats):
+ if 'e' in data.key_capabilities:
+ if 's' in data.key_capabilities or 'a' in data.key_capabilities:
+ stats[SPEC_INDEX['caps']] = False
+ stats[SPEC_INDEX['caps_reason']] = "<== Mixing of 'e' with 's' and/or 'a'"
+ if not stats[SPEC_INDEX['is_valid']]:
+ return stats
+ kcaps = []
+ for cap in data.key_capabilities:
+ if CAPABILITY_MAP[cap] and stats[SPEC_INDEX['caps']]:
+ kcaps.append(CAPABILITY_MAP[cap])
+ if cap in ["s"] and not data.name == "PUB":
+ stats[SPEC_INDEX['sign_capable']] = True
+ elif cap in ["e"]:
+ stats[SPEC_INDEX['encrypt_capable']] = True
+ elif cap not in CAPABILITY_MAP:
+ stats[SPEC_INDEX['caps']] = False
+ self.logger.debug("ERROR in key %s : unknown gpg key capability: %s"
+ % (data.long_keyid, cap))
+ stats[SPEC_INDEX['long_caps']] = ', '.join(kcaps)
+ return stats
+
+
+ def _test_uid(self, data, stats):
+ if not self.check_id:
+ stats[SPEC_INDEX['id']] = '-----'
+ stats[SPEC_INDEX['id_reason']] = ''
+ return stats
+ if TEST_SPEC['qualified_id'] in data.user_ID :
+ stats[SPEC_INDEX['id']] = True
+ stats[SPEC_INDEX['id_reason']] = ''
+ else:
+ stats[SPEC_INDEX['id_reason']] = "<== '%s' user id not found" % TEST_SPEC['qualified_id']
+ self.logger.debug("Warning: No qualified ID found in key %s"
+ % (data.user_ID))
+ return stats
+
+
+ def _test_validity(self, data, stats):
+ if data.validity in VALID_LIST:
+ stats[SPEC_INDEX['is_valid']] = True
+ return stats
+
+
+ def _test_final(self, data, stats):
+ for test, result in TEST_REQUIREMENTS.items():
+ if ((stats[SPEC_INDEX['key']] == 'PUB' and test == 'sign_capable') or
+ (stats[SPEC_INDEX['capabilities']] == 'e' and test in ['algo', 'bits', 'sign_capable'])
+ or (stats[SPEC_INDEX['capabilities']] == 'a' and test in ['sign_capable'])):
+ continue
+ if stats[SPEC_INDEX[test]] == result:
+ stats[SPEC_INDEX['passed_spec']] = True
+ else:
+ stats[SPEC_INDEX['passed_spec']] = False
+ break
+ return stats
diff --git a/gkeys/gkeys/cli.py b/gkeys/gkeys/cli.py
new file mode 100644
index 0000000..32d2ec4
--- /dev/null
+++ b/gkeys/gkeys/cli.py
@@ -0,0 +1,58 @@
+#
+#-*- coding:utf-8 -*-
+
+"""
+ Gentoo-keys - cli.py
+
+ Command line interface module
+
+ @copyright: 2012 by Brian Dolbec <dol-sen@gentoo.org>
+ @license: GNU GPL2, see COPYING for details.
+"""
+
+from __future__ import print_function
+
+
+import sys
+
+from gkeys.base import CliBase
+from gkeys.actions import Actions, Available_Actions, Action_Options
+from gkeys.config import GKeysConfig
+
+
+
+class Main(CliBase):
+ '''Main command line interface class'''
+
+
+ def __init__(self, root=None, config=None, print_results=True):
+ """ Main class init function.
+
+ @param root: string, root path to use
+ """
+ CliBase.__init__(self)
+ self.root = root or "/"
+ self.config = config or GKeysConfig(root=root)
+ self.config.options['print_results'] = print_results
+ self.cli_config = {
+ 'Actions': Actions,
+ 'Available_Actions': Available_Actions,
+ 'Action_Options': Action_Options,
+ 'prog': 'gkeys',
+ 'description': 'Gentoo-keys manager program',
+ 'epilog': '''Caution: adding untrusted keys to these keyrings can
+ be hazardous to your system!'''
+ }
+
+
+ def __call__(self, args=None):
+ """Main class call function
+
+ @param args: Optional list of argumanets to parse and action to run
+ Defaults to sys.argv[1:]
+ """
+ if args:
+ return self.run(self.parse_args(args))
+ else:
+ return self.run(self.parse_args(sys.argv[1:]))
+
diff --git a/gkeys/gkeys/config.py b/gkeys/gkeys/config.py
new file mode 100644
index 0000000..775ea1f
--- /dev/null
+++ b/gkeys/gkeys/config.py
@@ -0,0 +1,181 @@
+#
+#-*- coding:utf-8 -*-
+
+"""
+ Gentoo-keys - config.py
+
+ Holds configuration keys and values
+
+ @copyright: 2012 by Brian Dolbec <dol-sen@gentoo.org>
+ @license: GNU GNU GPL2, see COPYING for details.
+"""
+
+import os
+import sys
+
+# py3.2
+if sys.hexversion >= 0x30200f0:
+ import configparser as ConfigParser
+else:
+ import ConfigParser
+
+from collections import namedtuple
+
+
+from pyGPG.config import GPGConfig
+
+from gkeys import log
+from gkeys.utils import path
+
+logger = log.logger
+
+
+# establish the eprefix, initially set so eprefixify can
+# set it on install
+EPREFIX = "@GENTOO_PORTAGE_EPREFIX@"
+
+# check and set it if it wasn't
+if "GENTOO_PORTAGE_EPREFIX" in EPREFIX:
+ EPREFIX = ''
+
+GKEY_STRING = ''' ----------
+ Name.........: %(name)s
+ Nick.........: %(nick)s
+ Keydir.......: %(keydir)s
+'''
+
+GKEY_FINGERPRINTS = \
+''' Keyid........: %(keyid)s
+ Fingerprint: %(fingerprint)s
+'''
+
+MAPSEEDS = { 'dev' : 'gentoodevs.seeds', 'rel': 'gentoo.seeds' }
+
+
+class GKeysConfig(GPGConfig):
+ """ Configuration superclass which holds our gentoo-keys
+ config settings for pygpg """
+
+ def __init__(self, config=None, root=None, read_configfile=False):
+ """ Class initialiser """
+ GPGConfig.__init__(self)
+
+ self.root = root or ''
+ if config:
+ self.defaults['config'] = config
+ self.defaults['configdir'] = os.path.dirname(config)
+ else:
+ homedir = os.path.expanduser('~')
+ self.defaults['configdir'] = homedir
+ self.defaults['config']= os.path.join(homedir, '.gkeys.conf')
+ if not os.path.exists(self.defaults['config']):
+ self.defaults['configdir'] = path([self.root, EPREFIX, '/etc/gkeys'])
+ self.defaults['config'] = '%(configdir)s/gkeys.conf'
+ self.configparser = None
+ self._add_gkey_defaults()
+ if read_configfile:
+ self.read_config()
+
+
+ def _add_gkey_defaults(self):
+ self.defaults['gkeysdir'] = path([self.root, EPREFIX, '/var/lib/gentoo/gkeys'])
+ self.defaults['dev-keydir'] = '%(gkeysdir)s/devs'
+ self.defaults['rel-keydir'] = '%(gkeysdir)s/release'
+ self.defaults['keyring'] = '%(gkeysdir)s/keyring'
+ self.defaults['overlays-keydir'] = '%(gkeysdir)s/overlays'
+ self.defaults['sign-keydir'] = '%(gkeysdir)s/sign',
+ self.defaults['logdir'] = '/var/log/gkeys'
+ # local directory to scan for seed files installed via ebuild, layman
+ # or manual install.
+ self.defaults['seedsdir'] = '%(gkeysdir)s/seeds'
+ self.defaults['seeds'] = {
+ 'gentoo': '%(seedsdir)s/gentoo.seeds',
+ 'gentoodevs': '%(seedsdir)s/gentoodevs.seeds',
+ }
+ self.defaults['keyserver'] = 'pool.sks-keyservers.net'
+ # NOTE: files is umask mode in octal, directories is chmod mode in octal
+ self.defaults['permissions'] = {'files': '0o002', 'directories': '0o775',}
+ self.defaults['seedurls'] = {
+ 'gentoo': 'https://api.gentoo.org/gentoo-keys/seeds/gentoo.seeds',
+ 'gentoodevs': 'https://api.gentoo.org/gentoo-keys/seeds/gentoodevs.seeds',
+ 'gkey': 'gkeys',
+ }
+ self.defaults['sign'] = {
+ 'key': 'fingerprint',
+ 'keydir': '~/.gkeys',
+ 'keyring': None,
+ 'type': 'clearsign',
+ }
+
+
+ def read_config(self):
+ '''Reads the config file into memory
+ '''
+ if "%(configdir)s" in self.defaults['config']:
+ # fix the config path
+ self.defaults['config'] = self.defaults['config'] \
+ % {'configdir': self.defaults['configdir']}
+ defaults = self.get_defaults()
+ # remove some defaults from being entered into the configparser
+ for key in ['gpg_defaults', 'only_usable', 'refetch', 'tasks']:
+ defaults.pop(key)
+ self.configparser = ConfigParser.ConfigParser(defaults)
+ self.configparser.read(defaults['config'])
+
+
+ def get_key(self, key, subkey=None):
+ return self._get_(key, subkey)
+
+
+ def _get_(self, key, subkey=None):
+ if subkey:
+ if self.configparser and self.configparser.has_option(key, subkey):
+ if logger:
+ logger.debug("Found %s in configparser... %s"
+ % (key, str(self.configparser.get(key, subkey))))
+ return self._sub_(self.configparser.get(key, subkey))
+ #print("CONFIG: key, subkey", key, subkey)
+ if key in self.options and subkey in self.options[key]:
+ return self._sub_(self.options[key][subkey])
+ elif key in self.defaults and subkey in self.defaults[key]:
+ return self._sub_(self.defaults[key][subkey])
+ else:
+ return super(GKeysConfig, self)._get_(key, subkey)
+ elif self.configparser and self.configparser.has_option('DEFAULT', key):
+ if logger:
+ logger.debug("Found %s in configparser... %s"
+ % (key, str(self.configparser.get('DEFAULT', key))))
+ #logger.debug("type(key)= %s"
+ # % str(type(self.configparser.get('DEFAULT', key))))
+ return self.configparser.get('DEFAULT', key)
+ else:
+ return super(GKeysConfig, self)._get_(key, subkey)
+
+
+class GKEY(namedtuple('GKEY', ['nick', 'name', 'keydir', 'fingerprint'])):
+ '''Class to hold the relavent info about a key'''
+
+ field_types = {'nick': str, 'name': str, 'keydir': str, 'fingerprint': list}
+ __slots__ = ()
+
+
+ @property
+ def keyid(self):
+ '''Keyid is a substring value of the fingerprint'''
+ return ['0x' + x[-16:] for x in self.fingerprint]
+
+
+ @property
+ def pretty_print(self):
+ '''Pretty printing a GKEY'''
+ gkey = {'name': self.name, 'nick': self.nick, 'keydir': self.keydir}
+ output = GKEY_STRING % gkey
+ for f in self.fingerprint:
+ fingerprint = {'fingerprint': f, 'keyid': '0x' + f[-16:]}
+ output += GKEY_FINGERPRINTS % fingerprint
+ return output
+
+
+class GKEY_CHECK(namedtuple('GKEY_CHECK', ['keyid', 'revoked', 'expired', 'invalid', 'sign'])):
+
+ __slots__ = ()
diff --git a/gkeys/gkeys/fileops.py b/gkeys/gkeys/fileops.py
new file mode 100644
index 0000000..7cb244f
--- /dev/null
+++ b/gkeys/gkeys/fileops.py
@@ -0,0 +1,56 @@
+import os
+from snakeoil.osutils import (ensure_dirs as snakeoil_ensure_dirs)
+
+
+def ensure_dirs(path, gid=-1, uid=-1, mode=0o700, minimal=True, failback=None, fatal=False):
+ '''Wrapper to snakeoil.osutil's ensure_dirs()
+ This additionally allows for failures to run
+ cleanup or other code and/or raise fatal errors.
+
+ @param path: directory to ensure exists on disk
+ @param gid: a valid GID to set any created directories to
+ @param uid: a valid UID to set any created directories to
+ @param mode: permissions to set any created directories to
+ @param minimal: boolean controlling whether or not the specified mode
+ must be enforced, or is the minimal permissions necessary. For example,
+ if mode=0755, minimal=True, and a directory exists with mode 0707,
+ this will restore the missing group perms resulting in 757.
+ @param failback: function to run in the event of a failed attemp
+ to create the directory.
+ @return: True if the directory could be created/ensured to have those
+ permissions, False if not.
+ '''
+ succeeded = snakeoil_ensure_dirs(path, gid=-1, uid=-1, mode=mode, minimal=True)
+ if not succeeded:
+ if failback:
+ failback()
+ if fatal:
+ raise IOError(
+ "Failed to create directory: %s" % path)
+ return succeeded
+
+
+def updatefiles(config, logger):
+ filename = config['dev-seedfile']
+ old = filename + '.old'
+ try:
+ logger.info("Backing up existing file...")
+ if os.path.exists(old):
+ logger.debug(
+ "MAIN: _action_updatefile; Removing 'old' seed file: %s"
+ % old)
+ os.unlink(old)
+ if os.path.exists(filename):
+ logger.debug(
+ "MAIN: _action_updatefile; Renaming current seed file to: "
+ "%s" % old)
+ os.rename(filename, old)
+ if os.path.exists(filename + '.new'):
+ logger.debug(
+ "MAIN: _action_updatefile; Renaming '.new' seed file to: %s"
+ % filename)
+ os.rename(filename + '.new', filename)
+ except IOError:
+ raise
+ return False
+ return True
diff --git a/gkeys/gkeys/lib.py b/gkeys/gkeys/lib.py
new file mode 100644
index 0000000..50ed63e
--- /dev/null
+++ b/gkeys/gkeys/lib.py
@@ -0,0 +1,345 @@
+#
+#-*- coding:utf-8 -*-
+
+'''Gentoo-keys - lib.py
+This is gentoo-keys superclass which wraps the pyGPG lib
+with gentoo-keys specific convienience functions.
+
+ Distributed under the terms of the GNU General Public License v2
+
+ Copyright:
+ (c) 2011 Brian Dolbec
+ Distributed under the terms of the GNU General Public License v2
+
+ Author(s):
+ Brian Dolbec <dolsen@gentoo.org>
+
+'''
+
+# for py 2.6 compatibility
+from __future__ import print_function
+
+
+from os.path import abspath, pardir
+from os.path import join as pjoin
+
+from pyGPG.gpg import GPG
+from gkeys.checks import KeyChecks
+from gkeys.fileops import ensure_dirs
+from gkeys.log import logger
+from gkeys.seed import Seeds
+
+class GkeysGPG(GPG):
+ '''Gentoo-keys primary gpg class'''
+
+
+ def __init__(self, config, basedir):
+ '''class init function
+
+ @param config: GKeysConfig config instance to use
+ @param keydir: string, the path to the keydir to be used
+ for all operations.
+ '''
+ GPG.__init__(self, config, logger)
+ self.config = config
+ self.basedir = basedir
+ self.keydir = None
+ self.server = None
+
+
+ def set_keyserver(self, server=None):
+ '''Set the keyserver and add the --keyserver option to the gpg defaults
+ '''
+ if self.server and not server:
+ return
+ self.server = server or self.config['keyserver']
+ self.config.options['gpg_defaults'] = self.config.defaults['gpg_defaults'][:]
+ logger.debug("keyserver: %s" % (self.server))
+ server_value = ['--keyserver', self.server]
+ self.config.options['gpg_defaults'].extend(server_value)
+ logger.debug("self.config.options['gpg_defaults']: %s"
+ % (self.config.options['gpg_defaults']))
+ return
+
+
+ def set_keyring(self, keyring, task, importkey=False, reset=True):
+ '''Sets the keyring to use as well as related task options
+ '''
+ logger.debug("keydir: %s, keyring: %s" % (self.keydir, keyring))
+ if reset:
+ self.config.options['tasks'][task] = self.config.defaults['tasks'][task][:]
+ # --keyring file | Note that this adds a keyring to the current list.
+ # If the intent is to use the specified keyring alone,
+ # use --keyring along with --no-default-keyring.
+ if importkey:
+ task_value = ['--import-options', 'import-clean']
+ self.config.options['tasks'][task].extend(task_value)
+ parent_dir = abspath(pjoin(keyring, pardir))
+ ensure_dirs(parent_dir,
+ mode=int(self.config.get_key('permissions', 'directories'),0))
+ task_value = ['--no-default-keyring', '--keyring', keyring]
+ self.config.options['tasks'][task].extend(task_value)
+ logger.debug("set_keyring: New task options: %s" %str(self.config.options['tasks'][task]))
+ return
+
+
+ def set_keydir(self, keydir, task, fingerprint=True, reset=True):
+ logger.debug("basedir: %s, keydir: %s" % (self.basedir, keydir))
+ self.keydir = pjoin(self.basedir, keydir)
+ self.task = task
+ if reset:
+ self.config.options['tasks'][task] = self.config.defaults['tasks'][task][:]
+ task_value = []
+ if fingerprint:
+ task_value.append('--fingerprint')
+ task_value.extend(['--homedir', self.keydir])
+ self.config.options['tasks'][task].extend(task_value)
+ logger.debug("set_keydir: New task options: %s" %str(self.config.options['tasks'][task]))
+ return
+
+
+ def add_to_keyring(self, gkey, keydir, keyring):
+ '''Add the specified key to the specified keyring
+
+ @param gkey: GKEY namedtuple with
+ (name, keyid/longkeyid, keydir, fingerprint)
+ @param keydir: path with the specified keydir
+ @param keyring: string with the specified keyring
+ '''
+ self.set_keydir(keydir, 'import', reset=True)
+ self.set_keyring(keyring, 'import', importkey=True, reset=False)
+ results = []
+ logger.debug("LIB: import_to_keyring; name: " + gkey.name)
+ logger.debug("** Calling runGPG with Running: gpg %s --import' for: %s"
+ % (' '.join(self.config.get_key('tasks', 'import')),
+ gkey.name))
+ pubring_path = pjoin(self.keydir, gkey.keydir, 'pubring.gpg')
+ result = self.runGPG(task='import', inputfile=pubring_path)
+ logger.info('GPG return code: ' + str(result.returncode))
+ results.append(result)
+ print(result.stderr_out)
+ return results
+
+
+ def add_key(self, gkey):
+ '''Add the specified key to the specified keydir
+
+ @param gkey: GKEY namedtuple with
+ (name, nick, keydir, fingerprint)
+ '''
+ self.config.defaults['gpg_defaults'].append('--no-permission-warning')
+ self.set_keyserver()
+ self.set_keydir(gkey.keydir, 'recv-keys', reset=True)
+ self.set_keyring('pubring.gpg', 'recv-keys', reset=False)
+ logger.debug("LIB: add_key; ensure dirs: " + self.keydir)
+ ensure_dirs(str(self.keydir))
+ self.set_keyseedfile(trap_errors=False)
+ results = []
+ for fingerprint in gkey.fingerprint:
+ logger.debug("LIB: add_key; adding fingerprint " + fingerprint)
+ logger.debug("** Calling runGPG with Running 'gpg %s --recv-keys %s' for: %s"
+ % (' '.join(self.config.get_key('tasks', 'recv-keys')),
+ fingerprint, gkey.name))
+ result = self.runGPG(task='recv-keys', inputfile=fingerprint)
+ logger.info('GPG return code: ' + str(result.returncode))
+ if result.fingerprint in gkey.fingerprint:
+ result.failed = False
+ message = "Fingerprints match... Import successful: "
+ message += "%s, fingerprint: %s" % (gkey.nick, fingerprint)
+ message += "\n result len: %s, %s" % (len(result.fingerprint), result.fingerprint)
+ message += "\n gkey len: %s, %s" % (len(gkey.fingerprint[0]), gkey.fingerprint[0])
+ logger.info(message)
+ else:
+ result.failed = True
+ message = "Fingerprints do not match... Import failed for "
+ message += "%s, fingerprint: %s" % (gkey.nick, fingerprint)
+ message += "\n result: %s" % (result.fingerprint)
+ message += "\n gkey..: %s" % (str(gkey.fingerprint))
+ logger.error(message)
+ # Save the gkey seed to the installed db
+ self.seedfile.update(gkey)
+ if not self.seedfile.save():
+ logger.error("GkeysGPG.add_key(); failed to save seed: " + gkey.nick)
+ return []
+ results.append(result)
+ return results
+
+
+ def del_key(self, gkey, keydir):
+ '''Delete the specified key in the specified keydir
+
+ @param gkey: GKEY namedtuple with (name, nick, keydir, fingerprint)
+ '''
+ return []
+
+
+ def del_keydir(self, keydir):
+ '''Delete the specified keydir
+ '''
+ return []
+
+
+ def refresh_key(self, gkey):
+ '''Refresh the specified key in the specified keydir
+
+ @param key: tuple of (name, nick, keydir, fingerprint)
+ @param keydir: the keydir to add the key to
+ '''
+ self.config.defaults['gpg_defaults'].append('--no-permission-warning')
+ self.set_keyserver()
+ self.set_keydir(gkey.keydir, 'refresh-keys', reset=True)
+ self.set_keyring('pubring.gpg', 'refresh-keys', reset=False)
+ logger.debug("LIB: refresh_key, gkey: %s" % str(gkey))
+ logger.debug("** Calling runGPG with Running 'gpg %s --refresh-keys' for: %s"
+ % (' '.join(self.config.get_key('tasks', 'refresh-keys')), str(gkey)))
+ result = self.runGPG(task='refresh-keys', inputfile='')
+ logger.info('GPG return code: ' + str(result.returncode))
+ return result
+
+
+ def update_key(self, gkey, keydir):
+ '''Update the specified key in the specified keydir
+
+ @param key: tuple of (name, nick, keydir, fingerprint)
+ @param keydir: the keydir to add the key to
+ '''
+ return []
+
+
+ def list_keys(self, keydir, fingerprint=None, colons=False):
+ '''List all keys in the specified keydir or
+ all keys in all keydir if keydir=None
+
+ @param keydir: the keydir to list the keys for
+ @param colons: bool to enable colon listing
+ '''
+ if not keydir:
+ logger.debug("LIB: list_keys(), invalid keydir parameter: %s"
+ % str(keydir))
+ return []
+ if fingerprint:
+ task = 'list-key'
+ target = fingerprint
+ else:
+ task = 'list-keys'
+ target = keydir
+ self.set_keydir(keydir, task, fingerprint=True)
+ self.config.options['tasks'][task].extend(['--keyid-format', 'long', '--with-fingerprint'])
+ if colons:
+ task_value = ['--with-colons']
+ self.config.options['tasks'][task].extend(task_value)
+ logger.debug("** Calling runGPG with Running 'gpg %s --%s %s'"
+ % (' '.join(self.config['tasks'][task]), task, keydir)
+ )
+ result = self.runGPG(task=task, inputfile=target)
+ logger.info('GPG return code: ' + str(result.returncode))
+ return result
+
+
+ def check_keys(self, keydir, keyid, result=None):
+ '''Check specified or all keys based on the seed type
+
+ @param keydir: the keydir to list the keys for
+ @param keyid: the keyid to check
+ @param result: optional pyGPG.output.GPGResult object
+ @returns: GKEY_CHECK instance
+ '''
+ if not result:
+ result = self.list_keys(keydir, fingerprint=keyid, colons=True)
+ checker = KeyChecks(logger, qualified_id_check=True)
+ return checker.validity_checks(keydir, keyid, result)
+
+
+ def speccheck(self, keydir, keyid, result=None):
+ '''Check specified or all keys based on the seed type
+ specifications are met.
+
+ @param keydir: the keydir to list the keys for
+ @param keyid: the keyid to check
+ @param result: optional pyGPG.output.GPGResult object
+ @returns: SpecCheck instance
+ '''
+ if not result:
+ result = self.list_keys(keydir, fingerprint=keyid, colons=True)
+ checker = KeyChecks(logger, qualified_id_check=True)
+ specchecks = checker.spec_check(keydir, keyid, result)
+ return specchecks
+
+
+ def list_keydirs(self):
+ '''List all available keydirs
+ '''
+ return []
+
+
+ def verify_key(self, gkey):
+ '''Verify the specified key from the specified keydir
+
+ @param gkey: GKEY namedtuple with (name, keyid/longkeyid, fingerprint)
+ '''
+ pass
+
+
+ def verify_text(self, text):
+ '''Verify a text block in memory
+ '''
+ pass
+
+
+ def verify_file(self, gkey, signature, filepath):
+ '''Verify the file specified at filepath or url
+
+ @param gkey: GKEY instance of the gpg key used to verify it
+ @param signature: string with the signature file
+ @param filepath: string with the path or url of the signed file
+ '''
+ if signature:
+ self.set_keydir(gkey.keydir, 'verify', reset=True)
+ logger.debug("** Calling runGPG with Running 'gpg %s --verify %s and %s'"
+ % (' '.join(self.config['tasks']['verify']), signature, filepath))
+ results = self.runGPG(task='verify', inputfile=[signature,filepath])
+ else:
+ self.set_keydir(gkey.keydir, 'decrypt', reset=True)
+ logger.debug("** Calling runGPG with Running 'gpg %s --decrypt %s and %s'"
+ % (' '.join(self.config['tasks']['decrypt']), filepath))
+ results = self.runGPG(task='decrypt', inputfile=filepath)
+ keyid = gkey.keyid[0]
+ if results.verified[0]:
+ logger.info("GPG verification succeeded. Name: %s / Key: %s" % (gkey.name, keyid))
+ logger.info("\tSignature result:" + str(results.verified))
+ else:
+ logger.debug("GPG verification failed. Name: %s / Key: %s" % (gkey.name, keyid))
+ logger.debug("\t Signature result:"+ str(results.verified))
+ logger.debug("LIB: verify; stderr_out:" + str(results.stderr_out))
+ return results
+
+
+ def set_keyseedfile(self, trap_errors):
+ if not self.keydir:
+ logger.debug("GkeysGPG.set_keyseedfile(); self.keydir error")
+ self.seedfile = Seeds(pjoin(self.keydir, 'gkey.seeds'), self.config)
+ self.seedfile.load(trap_errors=trap_errors)
+
+
+ def sign_file(self, gkey, mode, fingerprint, filepath):
+ '''Verify the file specified at filepath or url
+
+ @param gkey: GKEY instance
+ @param mode: string of the signing task to use
+ @param fingerprint: string of the fingerprint to sign with
+ @param filepath: string with the path of the file to sign
+ '''
+ keyid = gkey.keyid[0]
+ self.set_keydir(gkey.keydir, mode, reset=True)
+ logger.debug("** Calling runGPG with Running 'gpg %s --%s %s %s'"
+ % (' '.join(self.config['tasks'][mode]), mode, fingerprint, filepath))
+ results = self.runGPG(task=mode, inputfile=filepath)
+
+ if results.verified[0]:
+ logger.info("GPG signing succeeded. Name: %s / Key: %s" % (str(gkey.name), str(keyid)))
+ logger.info("\tSignature result:" + str(results.verified))
+ else:
+ logger.debug("GPG signing failed. Name: %s / Key: %s" % (str(gkey.name), str(keyid)))
+ logger.debug("\t Signature result:"+ str(results.verified))
+ logger.debug("LIB: sign; stderr_out:" + str(results.stderr_out))
+ return results
diff --git a/gkeys/gkeys/log.py b/gkeys/gkeys/log.py
new file mode 100644
index 0000000..a16767e
--- /dev/null
+++ b/gkeys/gkeys/log.py
@@ -0,0 +1,72 @@
+#
+#-*- coding:utf-8 -*-
+
+"""
+ Gentoo-Keys - Log.py
+
+ Logging module, placeholder for our site-wide logging module
+
+ @copyright: 2012 by Brian Dolbec <dol-sen> <dol-sen@users.sourceforge.net>
+ @license: GNU GPL2, see COPYING for details.
+"""
+
+import logging
+import time
+import os
+
+from gkeys.fileops import ensure_dirs
+
+
+NAMESPACE = 'gentoo-keys'
+logger = None
+Console_handler = None
+
+log_levels = {
+ 'CRITICAL': logging.CRITICAL,
+ 'DEBUG': logging.DEBUG,
+ 'ERROR': logging.ERROR,
+ 'FATAL': logging.FATAL,
+ 'INFO': logging.INFO,
+ 'NOTSET': logging.NOTSET,
+ 'WARN': logging.WARN,
+ 'WARNING': logging.WARNING,
+}
+
+
+def set_logger(namespace=None, logpath='', level=None,
+ dirmode=0o775, filemask=0o002):
+ global logger, NAMESPACE, Console_handler
+ if not namespace:
+ namespace = NAMESPACE
+ else:
+ NAMESPACE = namespace
+ logger = logging.getLogger(namespace)
+ logger.setLevel(log_levels['DEBUG'])
+ # create formatter and add it to the handlers
+ log_format = '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
+ formatter = logging.Formatter(log_format)
+ # add the handlers to logger
+ if logpath:
+ ensure_dirs(logpath, mode=dirmode, fatal=True)
+ os.umask(filemask)
+ logname = os.path.join(logpath,
+ '%s-%s.log' % (namespace, time.strftime('%Y%m%d-%H:%M')))
+ file_handler = logging.FileHandler(logname)
+ if level:
+ #print "Setting cli log level", level, log_levels[level]
+ file_handler.setLevel(log_levels[level])
+ else:
+ #print "Create file handler which logs even debug messages"
+ file_handler.setLevel(log_levels['DEBUG'])
+ file_handler.setFormatter(formatter)
+ logger.addHandler(file_handler)
+
+ # create console handler with a higher log level
+ Console_handler = logging.StreamHandler()
+ Console_handler.setLevel(logging.ERROR)
+ #Console_handler.setFormatter(formatter)
+ logger.addHandler(Console_handler)
+ #print "File logger suppose to be initialized", logger, Console_handler
+ logger.debug("Loggers initialized")
+
+ return logger
diff --git a/gkeys/gkeys/seed.py b/gkeys/gkeys/seed.py
new file mode 100644
index 0000000..f0cb019
--- /dev/null
+++ b/gkeys/gkeys/seed.py
@@ -0,0 +1,192 @@
+#
+#-*- coding:utf-8 -*-
+
+'''Gentoo-keys - seed.py
+This is gentoo-keys superclass which wraps the pyGPG lib
+with gentoo-keys specific convienience functions.
+
+ Distributed under the terms of the GNU General Public License v2
+
+ Copyright:
+ (c) 2011 Brian Dolbec
+ Distributed under the terms of the GNU General Public License v2
+
+ Author(s):
+ Brian Dolbec <dolsen@gentoo.org>
+
+'''
+
+import json
+import os
+
+from gkeys.log import logger
+from gkeys.config import GKEY
+from gkeys.fileops import ensure_dirs
+
+
+class Seeds(object):
+ '''Handles all seed key file operations'''
+
+
+ def __init__(self, filepath=None, config=None):
+ '''Seeds class init function
+
+ @param filepath: string of the file to load
+ '''
+ self.filename = filepath
+ self.config = config
+ self.seeds = {}
+
+
+ def load(self, filename=None, trap_errors=True):
+ '''Load the seed file into memory'''
+ if filename:
+ self.filename = filename
+ if not self.filename:
+ logger.debug("Seed: load; Not a valid filename: '%s'" % str(self.filename))
+ return False
+ logger.debug("Seeds: load; Begin loading seed file %s" % self.filename)
+ seedlines = None
+ self.seeds = {}
+ try:
+ with open(self.filename, "r+") as seedfile:
+ seedlines = json.load(seedfile)
+ except IOError as err:
+ logger.debug("Seed: load; IOError occurred while loading file")
+ if trap_errors:
+ self._error(err)
+ return False
+ for seed in list(seedlines.items()):
+ #try:
+ self.seeds[seed[0]] = GKEY(**seed[1])
+ #except Exception as err:
+ #logger.debug("Seed: load; Error splitting seed: %s" % seed)
+ #logger.debug("Seed: load; ...............parts: %s" % str(parts))
+ #self._error(err)
+ logger.debug("Seed: load; Completed loading seed file %s" % self.filename)
+ return True
+
+
+ def save(self, filename=None):
+ '''Save the seeds to the file'''
+ if filename:
+ self.filename = filename
+ if not self.filename:
+ logger.debug("Seed: save; Not a valid filename: '%s'" % str(self.filename))
+ return False
+ logger.debug("Seed: save; Begin saving seed file %s" % self.filename)
+ ensure_dirs(os.path.split(self.filename)[0],
+ mode=int(self.config.get_key('permissions', "directories"),0),
+ fatal=True)
+ os.umask(int(self.config.get_key("permissions", "files"),0))
+ try:
+ with open(self.filename, 'w') as seedfile:
+ seedfile.write(self._seeds2json(self.seeds))
+ seedfile.write("\n")
+ except IOError as err:
+ self._error(err)
+ return False
+ return True
+
+
+ def add(self, dev, gkey):
+ '''Add a new seed key to memory'''
+ if isinstance(gkey, dict) or isinstance(gkey, GKEY):
+ self.seeds[dev] = gkey
+ return True
+ return False
+
+
+ def delete(self, gkey=None):
+ '''Delete the key from the seeds in memory
+
+ @param gkey: GKEY, the matching GKEY to delete
+ '''
+ if gkey:
+ if isinstance(gkey, dict):
+ nick = gkey['nick']
+ elif isinstance(gkey, GKEY):
+ nick = gkey.nick
+ try:
+ self.seeds.pop(nick, None)
+ except ValueError:
+ return False
+ return True
+
+
+ def list(self, **kwargs):
+ '''List the key or keys matching the kwargs argument or all
+
+ @param kwargs: dict of GKEY._fields and values
+ @returns list
+ '''
+ if not kwargs or ('nick' in kwargs and kwargs['nick'] == '*'):
+ return sorted(self.seeds.values())
+ # proceed with the search
+ # discard any invalid keys
+ keys = kwargs
+ result = self.seeds
+ for key in keys:
+ if key in ['fingerprint', 'keyid']:
+ kwargs[key] = [x.replace(' ', '').upper() for x in kwargs[key]]
+ if key in ['fingerprint']:
+ result = {dev: gkey for dev, gkey in list(result.items()) if kwargs[key][0] in getattr(gkey, key)}
+ elif key in ['keyid']:
+ searchids = [x.lstrip('0X') for x in kwargs[key]]
+ res = {}
+ for dev, gkey in list(result.items()):
+ keyids = [x.lstrip("0x") for x in getattr(gkey, key)]
+ for keyid in searchids:
+ if keyid in keyids:
+ res[dev] = gkey
+ break
+ result = res
+ else:
+ result = {dev: gkey for dev, gkey in list(result.items()) if kwargs[key].lower() in getattr(gkey, key).lower()}
+ return sorted(result.values())
+
+
+ def search(self, pattern):
+ '''Search for the keys matching the regular expression pattern'''
+ pass
+
+
+ def nick_search(self, nick):
+ '''Searches the seeds for a matching nick
+
+ @param nick: string
+ @returns GKEY instance or None
+ '''
+ try:
+ return self.seeds[nick]
+ except KeyError:
+ return None
+
+
+ def _error(self, err):
+ '''Class error logging function'''
+ logger.error("Seed: Error processing seed file %s" % self.filename)
+ logger.error("Seed: Error was: %s" % str(err))
+
+
+ def _seeds2json(self, seeds):
+ is_gkey = False
+ if not seeds:
+ seeds = {}
+ elif isinstance(list(seeds.values())[0], GKEY):
+ is_gkey = True
+ for dev, value in list(seeds.items()):
+ if is_gkey:
+ seeds[dev] = dict(value._asdict())
+ return json.dumps(seeds, sort_keys=True, indent=4)
+
+
+ def update(self, gkey):
+ '''Looks for existance of a matching nick already in the seedfile
+ if it exists. Then either adds or replaces the gkey
+ @param gkey: GKEY instance
+ '''
+ oldkey = self.nick_search(gkey.nick)
+ if oldkey:
+ self.delete(oldkey)
+ self.add(gkey.nick, gkey)
diff --git a/gkeys/gkeys/seedhandler.py b/gkeys/gkeys/seedhandler.py
new file mode 100644
index 0000000..cc797b9
--- /dev/null
+++ b/gkeys/gkeys/seedhandler.py
@@ -0,0 +1,189 @@
+#
+#-*- coding:utf-8 -*-
+
+"""
+ Gentoo-keys - seedhandler.py
+
+ Seed handling interface module
+
+ @copyright: 2012 by Brian Dolbec <dol-sen@gentoo.org>
+ @license: GNU GPL2, see COPYING for details.
+"""
+
+import os
+import re
+from json import load
+
+from gkeys.config import GKEY
+from gkeys.seed import Seeds
+from gkeys.fileops import ensure_dirs
+
+
+class SeedHandler(object):
+
+ def __init__(self, logger, config):
+ self.config = config
+ self.logger = logger
+ self.fingerprint_re = re.compile('[0-9A-Fa-f]{40}')
+ self.finerprint_re2 = re.compile('[0-9A-Fa-f]{4}( [0-9A-Fa-f]{4}){9}')
+
+
+ def new(self, args, checkgkey=False):
+ newgkey = self.build_gkeydict(args)
+ if checkgkey:
+ newgkey, is_good = self.check_gkey(newgkey)
+ if is_good:
+ newgkey = GKEY(**newgkey)
+ self.logger.debug("SeedHandler: new; new gkey: %s" % str(newgkey))
+ else:
+ return None
+ else:
+ newgkey = GKEY(**newgkey)
+ self.logger.debug("SeedHandler: new; NON-checked new gkey: %s" % str(newgkey))
+ return newgkey
+
+
+ @staticmethod
+ def build_gkeydict(args):
+ keyinfo = {}
+ for attr in GKEY._fields + ('keyid',):
+ try:
+ value = getattr(args, attr)
+ if attr == 'name' and value:
+ value = " ".join(value)
+ if value:
+ keyinfo[attr] = value
+ except AttributeError:
+ pass
+ return keyinfo
+
+ def load_seeds(self, seedfile=None, filepath=None):
+ '''Load seed file
+
+ @param seeds: string of the short name seed file
+ @param seedfile: string filepath of the file to load
+ @return Seeds class instance of the file loaded
+ '''
+ if not seedfile and not filepath:
+ self.logger.error("SeedHandler: load_seeds; no filename to load: "
+ "setting = %s. Please use the -S or -F option to indicate: which seed "
+ "file to use." % seedfile)
+ return False
+ if seedfile:
+ filepath = self.config.get_key('seeds', seedfile)
+ elif not filepath:
+ self.logger.error("SeedHandler: load_seeds; No filepath to load")
+ self.logger.debug("SeedHandler: load_seeds; seeds filepath to load: "
+ "%s" % filepath)
+ seeds = Seeds(config=self.config)
+ seeds.load(filepath)
+ return seeds
+
+ def load_category(self, category, nicks=None):
+ '''Loads the designated key directories
+
+ @param category: string
+ @param nicks: list of string nick ids to load
+ @return Seeds class object
+ '''
+ seeds = Seeds(config=self.config)
+ if category:
+ catdir = self.config.get_key(category + "-category")
+ else:
+ self.logger.debug("SeedHandler: load_category; Error invalid category: %s." % (str(category)))
+ return seeds
+ self.logger.debug("SeedHandler: load_category; catdir = %s" % catdir)
+ try:
+ if not nicks:
+ nicks = os.listdir(catdir)
+ for nick in nicks:
+ seed_path = os.path.join(catdir, nick)
+ gkey_path = os.path.join(seed_path, 'gkey.seeds')
+ seed = None
+ try:
+ with open(gkey_path, 'r') as fileseed:
+ seed = load(fileseed)
+ except IOError as error:
+ self.logger.debug("SeedHandler: load_category; IOError loading seed file %s." % gkey_path)
+ self.logger.debug("Error was: %s" % str(error))
+ if seed:
+ for nick in sorted(seed):
+ key = seed[nick]
+ seeds.add(nick, GKEY(**key))
+ except OSError as error:
+ self.logger.debug("SeedHandler: load_category; OSError for %s" % catdir)
+ self.logger.debug("Error was: %s" % str(error))
+ return seeds
+
+ def fetch_seeds(self, seeds, args, verified_dl=None):
+ '''Fetch new seed files
+
+ @param seeds: list of seed nicks to download
+ @param verified_dl: Function pointer to the Actions.verify()
+ instance needed to do the download and verification
+ '''
+ http_check = re.compile(r'^(http|https)://')
+ urls = []
+ messages = []
+ try:
+ for seed in [seeds]:
+ seedurl = self.config.get_key('seedurls', seed)
+ seedpath = self.config.get_key('seeds', seed)
+ if http_check.match(seedurl):
+ urls.extend([(seedurl, seedpath)])
+ else:
+ self.logger.info("Wrong seed file URLs... Switching to default URLs.")
+ urls.extend([(self.config['seedurls'][seed], seedpath)])
+ except KeyError:
+ pass
+ succeeded = []
+ seedsdir = self.config.get_key('seedsdir')
+ mode = int(self.config.get_key('permissions', 'directories'),0)
+ ensure_dirs(seedsdir, mode=mode)
+ for (url, filepath) in urls:
+ args.category = 'rel'
+ args.filename = url
+ args.signature = None
+ args.timestamp = True
+ args.destination = filepath
+ verified, messages_ = verified_dl(args)
+ succeeded.append(verified)
+ messages.append(messages_)
+ return (succeeded, messages)
+
+ def check_gkey(self, args):
+ # assume it's good until an error is found
+ is_good = True
+ try:
+ args['keydir'] = args.get('keydir', args['nick'])
+ fprs = []
+ if args['fingerprint']:
+ for fpr in args['fingerprint']:
+ is_good, fingerprint = self._check_fingerprint_integrity(fpr)
+ if is_good:
+ fprs.append(fingerprint)
+ else:
+ self.logger.error('Bad fingerprint from command line args: %s' % fpr)
+ if is_good:
+ args['fingerprint'] = fprs
+ except KeyError:
+ self.logger.error('GPG fingerprint not found.')
+ is_good = False
+ if not is_good:
+ self.logger.error('A valid fingerprint '
+ 'was not found for %s' % args['name'])
+ return args, is_good
+
+ def _check_fingerprint_integrity(self, fpr):
+ # assume it's good unti an error is found
+ is_good = True
+ fingerprint = fpr.replace(" ", "")
+ # check fingerprint integrity
+ if len(fingerprint) != 40:
+ self.logger.error(' GPGKey incorrect fingerprint ' +
+ 'length (%s) for fingerprint: %s' % (len(fingerprint), fingerprint))
+ is_good = False
+ if not self.fingerprint_re.match(fingerprint):
+ self.logger.error(' GPGKey: Non hexadecimal digits in ' + 'fingerprint for fingerprint: ' + fingerprint)
+ is_good = False
+ return is_good, fingerprint
diff --git a/gkeys/gkeys/utils.py b/gkeys/gkeys/utils.py
new file mode 100644
index 0000000..92abc50
--- /dev/null
+++ b/gkeys/gkeys/utils.py
@@ -0,0 +1,161 @@
+#
+# -*- coding: utf-8 -*-
+
+'''# File: utils.py
+
+ Utilities to deal with things...
+ copied/edited from app-portage/layman's xml.py
+
+
+ Copyright:
+ (c) 2005 - 2008 Gunnar Wrobel
+ (c) 2009 Sebastian Pipping
+ (c) 2009 Christian Groschupp
+ Distributed under the terms of the GNU General Public License v2
+
+ Author(s):
+ Gunnar Wrobel <wrobel@gentoo.org>
+ Sebastian Pipping <sebastian@pipping.org>
+ Christian Groschupp <christian@groschupp.org>
+
+Utility functions'''
+
+
+
+import types
+import re
+import os
+import sys
+import locale
+import codecs
+
+try:
+ StringTypes = types.StringTypes
+except AttributeError:
+ StringTypes = [str]
+
+
+def encoder(text, _encoding_):
+ return codecs.encode(text, _encoding_, 'replace')
+
+
+def decode_selection(selection):
+ '''utility function to decode a list of strings
+ accoring to the filesystem encoding
+ '''
+ # fix None passed in, return an empty list
+ selection = selection or []
+ enc = sys.getfilesystemencoding()
+ if enc is not None:
+ return [encoder(i, enc) for i in selection]
+ return selection
+
+
+def get_encoding(output):
+ if hasattr(output, 'encoding') \
+ and output.encoding != None:
+ return output.encoding
+ else:
+ encoding = locale.getpreferredencoding()
+ # Make sure that python knows the encoding. Bug 350156
+ try:
+ # We don't care about what is returned, we just want to
+ # verify that we can find a codec.
+ codecs.lookup(encoding)
+ except LookupError:
+ # Python does not know the encoding, so use utf-8.
+ encoding = 'utf_8'
+ return encoding
+
+
+def pad(string, length):
+ '''Pad a string with spaces.'''
+ if len(string) <= length:
+ return string + ' ' * (length - len(string))
+ else:
+ return string[:length - 3] + '...'
+
+
+def terminal_width():
+ '''Determine width of terminal window.'''
+ try:
+ width = int(os.environ['COLUMNS'])
+ if width > 0:
+ return width
+ except:
+ pass
+ try:
+ import struct, fcntl, termios
+ query = struct.pack('HHHH', 0, 0, 0, 0)
+ response = fcntl.ioctl(1, termios.TIOCGWINSZ, query)
+ width = struct.unpack('HHHH', response)[1]
+ if width > 0:
+ return width
+ except:
+ pass
+ return 80
+
+
+# From <http://effbot.org/zone/element-lib.htm>
+# BEGIN
+def indent(elem, level=0):
+ i = "\n" + level*" "
+ if len(elem):
+ if not elem.text or not elem.text.strip():
+ elem.text = i + " "
+ if not elem.tail or not elem.tail.strip():
+ elem.tail = i
+ for elem in elem:
+ indent(elem, level+1)
+ if not elem.tail or not elem.tail.strip():
+ elem.tail = i
+ else:
+ if level and (not elem.tail or not elem.tail.strip()):
+ elem.tail = i
+# END
+
+def path(path_elements):
+ '''
+ Concatenate a path from several elements.
+
+ >>> path([])
+ ''
+ >>> path(['a'])
+ 'a'
+ >>> path(['a','b'])
+ 'a/b'
+ >>> path(['a/','b'])
+ 'a/b'
+ >>> path(['/a/','b'])
+ '/a/b'
+ >>> path(['/a/','b/'])
+ '/a/b'
+ >>> path(['/a/','b/'])
+ '/a/b'
+ >>> path(['/a/','/b/'])
+ '/a/b'
+ >>> path(['/a/','/b','c/'])
+ '/a/b/c'
+ '''
+ pathname = ''
+
+ if type(path_elements) in StringTypes:
+ path_elements = [path_elements]
+
+ # Concatenate elements and seperate with /
+ for i in path_elements:
+ pathname += i + '/'
+
+ # Replace multiple consecutive slashes
+ pathname = re.compile('/+').sub('/', pathname)
+
+ # Remove the final / if there is one
+ if pathname and pathname[-1] == '/':
+ pathname = pathname[:-1]
+
+ return pathname
+
+
+if __name__ == '__main__':
+ import doctest
+ doctest.testmod(sys.modules[__name__])