import os
import re
import sys
from base64 import b64encode
from urllib.parse import urlencode

import requests

GLSAMAKER_URI = 'https://glsamaker.gentoo.org'
BGO_URI = 'https://bugs.gentoo.org'


class CVETool:
    """Command-line interface to GLSAMaker's CVETool.

    Constructing an instance immediately runs the sub-command named by
    ``command``; every sub-command prints its result and may terminate the
    process through ``sys.exit``.
    """

    # Summary stored for CVEs that are created before their details are public.
    CVEPlaceholderText = (
        "** RESERVED ** This candidate has been reserved by an "
        "organization or individual that will use it when announcing a "
        "new security problem. When the candidate has been publicized, "
        "the details for this candidate will be provided."
    )

    # Accepts "CVE-YYYY-NNNN" as well as the bare "YYYY-NNNN" form.
    # Raw string so that "\d" is a regex class, not a string escape.
    _CVE_REGEX = re.compile(r'^(CVE-)?\d{4}-\d{4,}$')

    class NotFoundError(RuntimeError):
        """Raised by request() when the server answers with HTTP 404."""

    def __init__(self, auth, command, args):
        """Run ``command`` with ``args`` using the credential ``auth``.

        :param auth: base64 encoded ``user:password`` string placed in the
            ``Authorization: Basic`` header (see the ``pw`` sub-command).
        :param command: name of the sub-command to execute.
        :param args: list of string arguments for the sub-command.

        Prints a usage message and exits with status 1 when the argument
        count is wrong, a CVE identifier is malformed, or the command is
        unknown.
        """
        self.auth = auth

        if command == 'info':
            if len(args) != 1:
                self._exit_with_usage(
                    'Usage: info ',
                    'Retrieves information about a CVE from database')
            self.info(self._parse_cve_or_exit(args[0]))
        elif command == 'assign':
            if len(args) < 2:
                self._exit_with_usage(
                    'Usage: assign [...]',
                    'Assigns a set of CVEs to a bug')
            self.assign(args[0],
                        [self._parse_cve_or_exit(cve) for cve in args[1:]])
        elif command == 'getcveidlist':
            if len(args) < 1:
                self._exit_with_usage(
                    'Usage: getcveidlist [...]',
                    'Returns a list of the real CVE IDs')
            self.getcveidlist([self._parse_cve_or_exit(cve) for cve in args])
        elif command == 'new':
            if len(args) != 1:
                self._exit_with_usage(
                    'Usage: new ',
                    'Adds a new CVE to database with placeholder text')
            self.new(self._parse_cve_or_exit(args[0]))
        elif command == 'nfu':
            if len(args) != 1:
                self._exit_with_usage(
                    'Usage: nfu ',
                    'Marks a CVE as not-for-us')
            self.nfu(self._parse_cve_or_exit(args[0]))
        elif command == 'pw':
            if len(args) != 2:
                self._exit_with_usage(
                    'Usage: pw ',
                    'Generates a base64-encoded credential for storing')
            self.pw(args[0], args[1])
        elif command == 'dobug':
            if len(args) != 1:
                self._exit_with_usage(
                    'Usage: dobug ',
                    "Adds and assigns a bug's CVEs")
            self.dobug(args[0])
        else:
            self.usage(sys.argv[0])
            sys.exit(1)

    @staticmethod
    def _exit_with_usage(*lines):
        """Print each usage line and terminate with exit status 1."""
        for line in lines:
            print(line)
        sys.exit(1)

    def _parse_cve_or_exit(self, string):
        """Return the normalized CVE id, or report it as invalid and exit(1).

        Only the parsing itself is guarded, so a ValueError raised later
        while executing the command is not misreported as a bad identifier.
        """
        try:
            return self.cleanup_cve(string)
        except ValueError:
            print('"{}" is not a valid CVE identifier!'.format(string))
            sys.exit(1)

    def info(self, cve):
        """Print the database record of ``cve``; exit(0) if it is unknown."""
        try:
            data = self.json_request('/cve/info/' + cve + '.json')
        except self.NotFoundError:
            print('{} not found in Gentoo\'s CVE database!'.format(cve))
            sys.exit(0)

        published = data['published_at']
        if published is None:
            published = "Not yet published"
        bugs = ' , '.join(BGO_URI + '/' + str(bug) for bug in data['bugs'])

        print(' CVE ID: ' + data['cve_id'] + ' (#' + str(data['id']) + ')')
        print(' Summary: ' + data['summary'])
        print(' Published: ' + published)
        print('-' * 80)
        print(' State: ' + data['state'])
        print(' Bugs: ' + bugs)

    def getcveidlist(self, cves):
        """Print the internal database ids of ``cves`` as a comma list."""
        cve_ids = [self.get_internal_cve_id(cve) for cve in cves]
        print('CVE IDs: cves=' + ','.join(str(c) for c in cve_ids))

    def assign(self, bug, cves):
        """Assign every CVE in ``cves`` to ``bug``; exit(1) on failure."""
        cve_ids = [self.get_internal_cve_id(cve) for cve in cves]
        response = self.request(
            '/cve/assign/?bug=' + str(bug)
            + '&cves=' + ','.join(str(c) for c in cve_ids))
        if response == 'ok':
            print('Assigned bug {} to {}'.format(str(bug), ', '.join(cves)))
        else:
            print('Assigning likely failed: ' + response)
            sys.exit(1)

    def new(self, cve):
        """Create ``cve`` in the database with the placeholder summary.

        Exits with status 0 when the CVE already exists and with status 1
        when the server answers with anything other than ``ok``.
        """
        query_string = urlencode({'cve_id': cve,
                                  'summary': self.CVEPlaceholderText})
        try:
            response = self.request('/cve/new/?' + str(query_string), 'POST')
        except RuntimeError:
            # The API does not say why creation failed, so look the CVE up:
            # if the lookup succeeds the failure was a duplicate.
            try:
                self.json_request('/cve/info/' + cve + '.json')
            except self.NotFoundError:
                print('Adding CVE "{}" to database failed for unknown reason:'
                      .format(cve))
                raise
            print('Adding CVE "{}" to database failed: CVE already exists!'
                  .format(cve))
            sys.exit(0)

        if response == 'ok':
            print('New CVE "{}" added to database'.format(cve))
        else:
            # Should never get here because HTTP API currently returns
            # HTTP code 500 which triggers a RuntimeError in request
            # function
            print('Adding CVE "{}" to database failed: '.format(cve)
                  + response)
            sys.exit(1)

    def nfu(self, cve):
        """Mark ``cve`` as not-for-us; exit(1) on failure."""
        cve_id = self.get_internal_cve_id(cve)
        response = self.request('/cve/nfu/?cves=' + str(cve_id) + '&reason=')
        if response == 'ok':
            print('Marked {} as NFU'.format(cve))
        else:
            print('Marking as NFU likely failed: ' + response)
            sys.exit(1)

    def usage(self, programname):
        """Print usage information."""
        print('Usage: {} [args]'.format(programname))
        print('CLI for CVETool.')

    def pw(self, user, password):
        """Print the base64 encoding of ``user:password``."""
        credential = (user + ':' + password).encode('utf-8')
        print(b64encode(credential).decode('ascii'))

    def dobug(self, bug):
        """Add the CVE aliases of ``bug`` to the database and assign them.

        Aliases that are not CVE identifiers are ignored. CVEs missing from
        the database are created first; all CVE aliases are then assigned
        to the bug in a single request.
        """
        cves = []
        data = self.request('/rest/bug/' + bug, jsondata=True, bgo=True)
        for alias in data['bugs'][0]['alias']:
            if not self.is_cve(alias):
                continue
            cve = self.cleanup_cve(alias)
            # new() cannot report a duplicate without a second request, so
            # check for existence up front; this avoids useless requests.
            if not self.cve_exists(cve):
                self.new(cve)
            cves.append(cve)

        # We can do assignment in one request, so do it
        if cves:
            self.assign(bug, cves)

    def cve_exists(self, cve):
        """Return True when a HEAD request for ``cve`` succeeds."""
        try:
            return self.request('/cve/info/' + cve, method='HEAD') == ''
        except RuntimeError:
            return False

    @staticmethod
    def is_cve(string):
        """Return a match object if ``string`` looks like a CVE id, else None.

        Both ``CVE-YYYY-NNNN`` and ``YYYY-NNNN`` are accepted.
        """
        return CVETool._CVE_REGEX.match(string)

    def get_internal_cve_id(self, cve):
        """Resolve a CVE id to the internal database ID."""
        return self.json_request('/cve/info/' + cve + '.json')['id']

    def json_request(self, uri, method='GET'):
        """Perform request() against GLSAMaker and return the decoded JSON."""
        return self.request(uri, method, jsondata=True)

    def cleanup_cve(self, string):
        """Return ``string`` with a leading ``CVE-`` prefix.

        :raises ValueError: if ``string`` is not a CVE identifier.
        """
        if not self.is_cve(string):
            raise ValueError('Cannot parse CVE: ' + string)
        if string.startswith('CVE-'):
            return string
        return 'CVE-' + string

    def request(self, uri, method='GET', jsondata=False, bgo=False):
        """Send an authenticated HTTP request and return the response body.

        :param uri: path (and query string) appended to the base URI.
        :param method: ``'GET'``, ``'POST'`` or ``'HEAD'``.
        :param jsondata: return the decoded JSON body instead of the text.
        :param bgo: address ``BGO_URI`` instead of ``GLSAMAKER_URI``.
        :raises ValueError: for an unsupported ``method``.
        :raises CVETool.NotFoundError: on HTTP 404.
        :raises RuntimeError: on any other unsuccessful status.
        """
        senders = {
            'GET': requests.get,
            'POST': requests.post,
            'HEAD': requests.head,
        }
        if method not in senders:
            raise ValueError('Unsupported HTTP method: ' + str(method))

        full_uri = (BGO_URI if bgo else GLSAMAKER_URI) + uri
        # NOTE(review): the same credential is sent to BGO_URI when bgo is
        # set -- confirm that this is intended.
        response = senders[method](
            full_uri, headers={'Authorization': 'Basic ' + self.auth})

        status = response.status_code
        if status == 404:
            raise self.NotFoundError(full_uri + ': ' + str(status))
        if not response.ok:
            raise RuntimeError(full_uri + ': ' + str(status))

        if jsondata:
            return response.json()
        return response.text


def cvetool():
    """Entry point: load the credential and run the requested sub-command.

    The credential is taken from the ``CVETOOL_AUTH`` environment variable
    or, failing that, from the first line of ``~/.config/cvetool_auth``.
    Only the ``pw`` sub-command may run without a credential.
    """
    if len(sys.argv) == 1:
        # No sub-command given: 'usage' is not a known command, so the
        # constructor prints the usage text and exits with status 1.
        CVETool(None, 'usage', sys.argv[2:])

    command = sys.argv[1]
    authpath = os.path.join(os.path.expanduser('~'), '.config',
                            'cvetool_auth')

    auth = os.environ.get('CVETOOL_AUTH')
    if auth is None and os.path.isfile(authpath):
        with open(authpath, 'r') as authfile:
            # Strip the trailing newline so it does not end up inside the
            # Authorization header; an empty file counts as no credential.
            auth = authfile.readline().strip() or None

    if auth is None and command != 'pw':
        print('CVETOOL_AUTH environment variable missing. '
              'Generate its contents with the pw subcommand.')
        sys.exit(1)

    CVETool(auth, command, sys.argv[2:])