aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'gkeys/gkeys/lib.py')
-rw-r--r--gkeys/gkeys/lib.py345
1 files changed, 345 insertions, 0 deletions
diff --git a/gkeys/gkeys/lib.py b/gkeys/gkeys/lib.py
new file mode 100644
index 0000000..50ed63e
--- /dev/null
+++ b/gkeys/gkeys/lib.py
@@ -0,0 +1,345 @@
+#
+#-*- coding:utf-8 -*-
+
+'''Gentoo-keys - lib.py
+This is gentoo-keys superclass which wraps the pyGPG lib
+with gentoo-keys specific convienience functions.
+
+ Distributed under the terms of the GNU General Public License v2
+
+ Copyright:
+ (c) 2011 Brian Dolbec
+ Distributed under the terms of the GNU General Public License v2
+
+ Author(s):
+ Brian Dolbec <dolsen@gentoo.org>
+
+'''
+
+# for py 2.6 compatibility
+from __future__ import print_function
+
+
+from os.path import abspath, pardir
+from os.path import join as pjoin
+
+from pyGPG.gpg import GPG
+from gkeys.checks import KeyChecks
+from gkeys.fileops import ensure_dirs
+from gkeys.log import logger
+from gkeys.seed import Seeds
+
+class GkeysGPG(GPG):
+ '''Gentoo-keys primary gpg class'''
+
+
+ def __init__(self, config, basedir):
+ '''class init function
+
+ @param config: GKeysConfig config instance to use
+ @param keydir: string, the path to the keydir to be used
+ for all operations.
+ '''
+ GPG.__init__(self, config, logger)
+ self.config = config
+ self.basedir = basedir
+ self.keydir = None
+ self.server = None
+
+
+ def set_keyserver(self, server=None):
+ '''Set the keyserver and add the --keyserver option to the gpg defaults
+ '''
+ if self.server and not server:
+ return
+ self.server = server or self.config['keyserver']
+ self.config.options['gpg_defaults'] = self.config.defaults['gpg_defaults'][:]
+ logger.debug("keyserver: %s" % (self.server))
+ server_value = ['--keyserver', self.server]
+ self.config.options['gpg_defaults'].extend(server_value)
+ logger.debug("self.config.options['gpg_defaults']: %s"
+ % (self.config.options['gpg_defaults']))
+ return
+
+
+ def set_keyring(self, keyring, task, importkey=False, reset=True):
+ '''Sets the keyring to use as well as related task options
+ '''
+ logger.debug("keydir: %s, keyring: %s" % (self.keydir, keyring))
+ if reset:
+ self.config.options['tasks'][task] = self.config.defaults['tasks'][task][:]
+ # --keyring file | Note that this adds a keyring to the current list.
+ # If the intent is to use the specified keyring alone,
+ # use --keyring along with --no-default-keyring.
+ if importkey:
+ task_value = ['--import-options', 'import-clean']
+ self.config.options['tasks'][task].extend(task_value)
+ parent_dir = abspath(pjoin(keyring, pardir))
+ ensure_dirs(parent_dir,
+ mode=int(self.config.get_key('permissions', 'directories'),0))
+ task_value = ['--no-default-keyring', '--keyring', keyring]
+ self.config.options['tasks'][task].extend(task_value)
+ logger.debug("set_keyring: New task options: %s" %str(self.config.options['tasks'][task]))
+ return
+
+
+ def set_keydir(self, keydir, task, fingerprint=True, reset=True):
+ logger.debug("basedir: %s, keydir: %s" % (self.basedir, keydir))
+ self.keydir = pjoin(self.basedir, keydir)
+ self.task = task
+ if reset:
+ self.config.options['tasks'][task] = self.config.defaults['tasks'][task][:]
+ task_value = []
+ if fingerprint:
+ task_value.append('--fingerprint')
+ task_value.extend(['--homedir', self.keydir])
+ self.config.options['tasks'][task].extend(task_value)
+ logger.debug("set_keydir: New task options: %s" %str(self.config.options['tasks'][task]))
+ return
+
+
+ def add_to_keyring(self, gkey, keydir, keyring):
+ '''Add the specified key to the specified keyring
+
+ @param gkey: GKEY namedtuple with
+ (name, keyid/longkeyid, keydir, fingerprint)
+ @param keydir: path with the specified keydir
+ @param keyring: string with the specified keyring
+ '''
+ self.set_keydir(keydir, 'import', reset=True)
+ self.set_keyring(keyring, 'import', importkey=True, reset=False)
+ results = []
+ logger.debug("LIB: import_to_keyring; name: " + gkey.name)
+ logger.debug("** Calling runGPG with Running: gpg %s --import' for: %s"
+ % (' '.join(self.config.get_key('tasks', 'import')),
+ gkey.name))
+ pubring_path = pjoin(self.keydir, gkey.keydir, 'pubring.gpg')
+ result = self.runGPG(task='import', inputfile=pubring_path)
+ logger.info('GPG return code: ' + str(result.returncode))
+ results.append(result)
+ print(result.stderr_out)
+ return results
+
+
+ def add_key(self, gkey):
+ '''Add the specified key to the specified keydir
+
+ @param gkey: GKEY namedtuple with
+ (name, nick, keydir, fingerprint)
+ '''
+ self.config.defaults['gpg_defaults'].append('--no-permission-warning')
+ self.set_keyserver()
+ self.set_keydir(gkey.keydir, 'recv-keys', reset=True)
+ self.set_keyring('pubring.gpg', 'recv-keys', reset=False)
+ logger.debug("LIB: add_key; ensure dirs: " + self.keydir)
+ ensure_dirs(str(self.keydir))
+ self.set_keyseedfile(trap_errors=False)
+ results = []
+ for fingerprint in gkey.fingerprint:
+ logger.debug("LIB: add_key; adding fingerprint " + fingerprint)
+ logger.debug("** Calling runGPG with Running 'gpg %s --recv-keys %s' for: %s"
+ % (' '.join(self.config.get_key('tasks', 'recv-keys')),
+ fingerprint, gkey.name))
+ result = self.runGPG(task='recv-keys', inputfile=fingerprint)
+ logger.info('GPG return code: ' + str(result.returncode))
+ if result.fingerprint in gkey.fingerprint:
+ result.failed = False
+ message = "Fingerprints match... Import successful: "
+ message += "%s, fingerprint: %s" % (gkey.nick, fingerprint)
+ message += "\n result len: %s, %s" % (len(result.fingerprint), result.fingerprint)
+ message += "\n gkey len: %s, %s" % (len(gkey.fingerprint[0]), gkey.fingerprint[0])
+ logger.info(message)
+ else:
+ result.failed = True
+ message = "Fingerprints do not match... Import failed for "
+ message += "%s, fingerprint: %s" % (gkey.nick, fingerprint)
+ message += "\n result: %s" % (result.fingerprint)
+ message += "\n gkey..: %s" % (str(gkey.fingerprint))
+ logger.error(message)
+ # Save the gkey seed to the installed db
+ self.seedfile.update(gkey)
+ if not self.seedfile.save():
+ logger.error("GkeysGPG.add_key(); failed to save seed: " + gkey.nick)
+ return []
+ results.append(result)
+ return results
+
+
+ def del_key(self, gkey, keydir):
+ '''Delete the specified key in the specified keydir
+
+ @param gkey: GKEY namedtuple with (name, nick, keydir, fingerprint)
+ '''
+ return []
+
+
+ def del_keydir(self, keydir):
+ '''Delete the specified keydir
+ '''
+ return []
+
+
+ def refresh_key(self, gkey):
+ '''Refresh the specified key in the specified keydir
+
+ @param key: tuple of (name, nick, keydir, fingerprint)
+ @param keydir: the keydir to add the key to
+ '''
+ self.config.defaults['gpg_defaults'].append('--no-permission-warning')
+ self.set_keyserver()
+ self.set_keydir(gkey.keydir, 'refresh-keys', reset=True)
+ self.set_keyring('pubring.gpg', 'refresh-keys', reset=False)
+ logger.debug("LIB: refresh_key, gkey: %s" % str(gkey))
+ logger.debug("** Calling runGPG with Running 'gpg %s --refresh-keys' for: %s"
+ % (' '.join(self.config.get_key('tasks', 'refresh-keys')), str(gkey)))
+ result = self.runGPG(task='refresh-keys', inputfile='')
+ logger.info('GPG return code: ' + str(result.returncode))
+ return result
+
+
+ def update_key(self, gkey, keydir):
+ '''Update the specified key in the specified keydir
+
+ @param key: tuple of (name, nick, keydir, fingerprint)
+ @param keydir: the keydir to add the key to
+ '''
+ return []
+
+
+ def list_keys(self, keydir, fingerprint=None, colons=False):
+ '''List all keys in the specified keydir or
+ all keys in all keydir if keydir=None
+
+ @param keydir: the keydir to list the keys for
+ @param colons: bool to enable colon listing
+ '''
+ if not keydir:
+ logger.debug("LIB: list_keys(), invalid keydir parameter: %s"
+ % str(keydir))
+ return []
+ if fingerprint:
+ task = 'list-key'
+ target = fingerprint
+ else:
+ task = 'list-keys'
+ target = keydir
+ self.set_keydir(keydir, task, fingerprint=True)
+ self.config.options['tasks'][task].extend(['--keyid-format', 'long', '--with-fingerprint'])
+ if colons:
+ task_value = ['--with-colons']
+ self.config.options['tasks'][task].extend(task_value)
+ logger.debug("** Calling runGPG with Running 'gpg %s --%s %s'"
+ % (' '.join(self.config['tasks'][task]), task, keydir)
+ )
+ result = self.runGPG(task=task, inputfile=target)
+ logger.info('GPG return code: ' + str(result.returncode))
+ return result
+
+
+ def check_keys(self, keydir, keyid, result=None):
+ '''Check specified or all keys based on the seed type
+
+ @param keydir: the keydir to list the keys for
+ @param keyid: the keyid to check
+ @param result: optional pyGPG.output.GPGResult object
+ @returns: GKEY_CHECK instance
+ '''
+ if not result:
+ result = self.list_keys(keydir, fingerprint=keyid, colons=True)
+ checker = KeyChecks(logger, qualified_id_check=True)
+ return checker.validity_checks(keydir, keyid, result)
+
+
+ def speccheck(self, keydir, keyid, result=None):
+ '''Check specified or all keys based on the seed type
+ specifications are met.
+
+ @param keydir: the keydir to list the keys for
+ @param keyid: the keyid to check
+ @param result: optional pyGPG.output.GPGResult object
+ @returns: SpecCheck instance
+ '''
+ if not result:
+ result = self.list_keys(keydir, fingerprint=keyid, colons=True)
+ checker = KeyChecks(logger, qualified_id_check=True)
+ specchecks = checker.spec_check(keydir, keyid, result)
+ return specchecks
+
+
+ def list_keydirs(self):
+ '''List all available keydirs
+ '''
+ return []
+
+
+ def verify_key(self, gkey):
+ '''Verify the specified key from the specified keydir
+
+ @param gkey: GKEY namedtuple with (name, keyid/longkeyid, fingerprint)
+ '''
+ pass
+
+
+ def verify_text(self, text):
+ '''Verify a text block in memory
+ '''
+ pass
+
+
+ def verify_file(self, gkey, signature, filepath):
+ '''Verify the file specified at filepath or url
+
+ @param gkey: GKEY instance of the gpg key used to verify it
+ @param signature: string with the signature file
+ @param filepath: string with the path or url of the signed file
+ '''
+ if signature:
+ self.set_keydir(gkey.keydir, 'verify', reset=True)
+ logger.debug("** Calling runGPG with Running 'gpg %s --verify %s and %s'"
+ % (' '.join(self.config['tasks']['verify']), signature, filepath))
+ results = self.runGPG(task='verify', inputfile=[signature,filepath])
+ else:
+ self.set_keydir(gkey.keydir, 'decrypt', reset=True)
+ logger.debug("** Calling runGPG with Running 'gpg %s --decrypt %s and %s'"
+ % (' '.join(self.config['tasks']['decrypt']), filepath))
+ results = self.runGPG(task='decrypt', inputfile=filepath)
+ keyid = gkey.keyid[0]
+ if results.verified[0]:
+ logger.info("GPG verification succeeded. Name: %s / Key: %s" % (gkey.name, keyid))
+ logger.info("\tSignature result:" + str(results.verified))
+ else:
+ logger.debug("GPG verification failed. Name: %s / Key: %s" % (gkey.name, keyid))
+ logger.debug("\t Signature result:"+ str(results.verified))
+ logger.debug("LIB: verify; stderr_out:" + str(results.stderr_out))
+ return results
+
+
+ def set_keyseedfile(self, trap_errors):
+ if not self.keydir:
+ logger.debug("GkeysGPG.set_keyseedfile(); self.keydir error")
+ self.seedfile = Seeds(pjoin(self.keydir, 'gkey.seeds'), self.config)
+ self.seedfile.load(trap_errors=trap_errors)
+
+
+ def sign_file(self, gkey, mode, fingerprint, filepath):
+ '''Verify the file specified at filepath or url
+
+ @param gkey: GKEY instance
+ @param mode: string of the signing task to use
+ @param fingerprint: string of the fingerprint to sign with
+ @param filepath: string with the path of the file to sign
+ '''
+ keyid = gkey.keyid[0]
+ self.set_keydir(gkey.keydir, mode, reset=True)
+ logger.debug("** Calling runGPG with Running 'gpg %s --%s %s %s'"
+ % (' '.join(self.config['tasks'][mode]), mode, fingerprint, filepath))
+ results = self.runGPG(task=mode, inputfile=filepath)
+
+ if results.verified[0]:
+ logger.info("GPG signing succeeded. Name: %s / Key: %s" % (str(gkey.name), str(keyid)))
+ logger.info("\tSignature result:" + str(results.verified))
+ else:
+ logger.debug("GPG signing failed. Name: %s / Key: %s" % (str(gkey.name), str(keyid)))
+ logger.debug("\t Signature result:"+ str(results.verified))
+ logger.debug("LIB: sign; stderr_out:" + str(results.stderr_out))
+ return results