#!/usr/bin/python2.4
"""Scire client: fetch jobs from a Scire server over XML-RPC/SSL and run them.

The client registers itself with the server (adding itself as a pending
client on first contact), asks for the jobs assigned to it, writes each job's
script to a temporary file, executes it and reports the result back.

The code targets the Python 2.4 interpreter named in the shebang, so it avoids
``with``, ``except ... as`` and conditional expressions.  ``print`` is always
called with a single parenthesised argument so that the statements behave the
same on Python 2 and still parse on newer interpreters.
"""

from OpenSSL import SSL
from OpenSSL import crypto
import certgen
import sys
import os
import socket
import getopt
import commands
import time
import stat
import tempfile
#import fcntl, select
import logging
import syslog
from SecureXMLRPCClient import SecureXMLRPCClient
import xmlrpclib
import traceback
from pprint import pprint

# Runtime configuration; the command-line options parsed in main() override
# these module-level defaults.
bind_address = "localhost"
bind_port = 9876
config_dir = "/etc/scire"
sim = False
# NOTE: debugging is on by default, so -d currently only adds verbosity.
debug = True
verbose = False
daemon = False
poll_int = 10   # seconds between polls in daemon mode

# MATT:  When job is received, store it in a spool directory (as XML?)
#        Implemented in scired.py (stub).
# MATT:  When the client starts up, it needs to read in any jobs from
#        the spool.  (Potential for indexing here?)
#        Implemented in scired.py (stub).
# MATT:  When the job is finished executing, remove it from the spool.
#        Implemented in scired.py (stub).
# MATT:  Populate some kind of summary vector to send to the server so
#        the server won't resend jobs.  The vector should consist of
#        hashes.
# MATT:  What is necessary to make these actions atomic?  Is that even
#        possible?


def display_traceback():
    """Print the exception currently being handled, one stripped line each."""
    etype, value, tb = sys.exc_info()
    for line in traceback.format_exception(etype, value, tb):
        print(line.strip())


def _write_script(job):
    """Write the job's script to a private executable temp file; return its path.

    mkstemp() creates the file atomically under an unpredictable name, which
    avoids the symlink/race problems of a fixed /tmp/<pid>.<jobid> path.  The
    file is not deleted automatically; the caller removes it once the job has
    run.
    """
    fd, scriptfile = tempfile.mkstemp(prefix='scire-%d-' % os.getpid())
    try:
        scriptfd = os.fdopen(fd, 'w')
        try:
            scriptfd.writelines(job['script']['script_data'])
        finally:
            scriptfd.close()
        # Only the owner (the user running this client) needs to execute it.
        os.chmod(scriptfile, stat.S_IRWXU)
    except:
        # Do not leave a half-written script behind; re-raise the error.
        os.remove(scriptfile)
        raise
    return scriptfile


def _report_result(client, job, rcode, output):
    """Translate run_job()'s return code into a job_return() call."""
    jobid = job['jobid']
    script = job['script']
    if rcode == 'ScireDepErr':
        client.job_return(jobid, 'Failed', output)
    elif rcode == 'Aborted':
        client.job_return(jobid, 'Aborted')
    elif int(rcode) == int(script['success_code']):
        # FIXME don't hardcode hacks like this. fix the DB/UI
        script['return_output'] = 1
        if script['return_output'] and (script['return_output'] == 1):
            client.job_return(jobid, 'Succeeded', output)
        else:
            client.job_return(jobid, 'Succeeded')
        # unspool_job should probably take jobid, not job
        #client.unspool_job(job)
    else:
        client.job_return(jobid, 'Failed', output)


def run_jobs(client, jobs):
    """Fetch, execute and report every job id in *jobs*.

    Jobs that were handled are removed from *jobs* in place; the (same) list,
    now holding only the ids that were not processed, is returned.

    In simulation mode the script is written and shown but not executed, and
    nothing is reported to the server.
    """
    # Per-job result codes.  They are collected here but not yet returned:
    #   0 = Successful
    #   1 = Job execution failed
    #   2 = Sanity check failed
    rcodes = {}

    # Iterate over a snapshot: removing from the list being iterated would
    # silently skip the job that follows each removed one.
    for jobid in list(jobs):
        job = client.get_job(jobid, True)
        if sim or debug or verbose:
            print("Job %s:" % job['jobid'])
            print(job)

        if not sanity_check(job):
            rcodes[jobid] = 2
            continue

        # Here is where we run the job.
        # Do we want to allow binary executables?  If so, this might need
        # to be done a bit differently.
        # The file needs to stick around long enough for us to run it, but
        # we also have to be confident that the file is finished.
        scriptfile = _write_script(job)
        try:
            if debug:
                print('scriptfile: %s' % scriptfile)
            if sim:
                print('Command not executed (simulation mode):\n%s'
                      % job['script']['script_data'])
            else:
                rcode, output = run_job(client, job, scriptfile)
                print('rcode: %s' % str(rcode))
                print('success code is %s' % str(job['script']['success_code']))
                _report_result(client, job, rcode, repr(output))
        finally:
            # Always clean up, even when running or reporting the job fails.
            os.remove(scriptfile)
        jobs.remove(jobid)
    return jobs


def _log_output(job, output):
    """Record a job's output in its configured log file (if any) and syslog."""
    # MATT:  Right now this is just an arbitrary file.
    #        No reason not to use the value provided by the
    #        database except for the fact that we will need to
    #        change it to use syslog.  Here's an idea:  if
    #        the logfile is defined in the database, write to
    #        it, but also always write to syslog.
    #        Also, we should timestamp the log entries and
    #        deal with file existence.
    headline = 'Jobid %s: %s' % (job['jobid'], job['description'])
    logfile = job['script']['log_location']
    if logfile:
        print('Writing log to %s' % logfile)
        # logging.basicConfig() only has an effect the first time it is
        # called, so a dedicated handler is attached per call to make sure
        # every job ends up in its own log_location.
        handler = logging.FileHandler(logfile, 'a')
        handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
        logger = logging.getLogger('scirec.job')
        logger.setLevel(logging.INFO)
        logger.propagate = 0
        logger.addHandler(handler)
        try:
            logger.info(headline)
            logger.info(output)
        finally:
            logger.removeHandler(handler)
            handler.close()
    syslog.syslog(headline)
    syslog.syslog(output)


# Here we execute the command and return the output from the command.
# MATT:  We still need to implement failure detection/return codes.
def run_job(client, job, command):
    """Run *command* for *job* unless it was cancelled or a dependency failed.

    Returns a ``(code, output)`` pair where *code* is:
      * ``'Aborted'``      - the server reports the job as cancelled,
      * ``'ScireDepErr'``  - a job listed in ``job['job_dependency']`` does
                             not have status ``'Succeeded'``,
      * otherwise the exit status of the command (via os.WEXITSTATUS).
    """
    jobid = job['jobid']
    if client.job_cancelled(jobid):
        return 'Aborted', 'Job aborted'

    # Dependencies arrive as a comma-separated string; the literal string
    # 'None' (or an empty/None value) means there are none.
    deps = job['job_dependency']
    if deps and deps != 'None':
        for jobdep in deps.split(','):
            d_status = client.get_jstatus(jobdep)
            rmsg = "Dependency for jobid %s: %s. Status: %s" % (jobid, jobdep, d_status)
            print(rmsg)
            if d_status != 'Succeeded':
                return 'ScireDepErr', rmsg

    status, output = commands.getstatusoutput(''.join([command, ' 2>&1']))
    if debug:
        print('Command Output:\n %s' % output)
    if output:
        _log_output(job, output)
    return os.WEXITSTATUS(status), output


def sanity_check(job):
    """Placeholder validation hook; currently accepts every job."""
    return True


def sys_info():
    """Collect the MAC address, IP address and hostname of this machine.

    The values are scraped from /sbin/route and /sbin/ifconfig for the
    interface that carries the default route.  If the reverse lookup of the
    IP address fails, the IP address itself is used as the hostname.
    """
    client_info = {}
    route_out = commands.getoutput(r"/sbin/route | awk '/^default/ { print $8 }'")
    # With several default routes awk prints one interface per line; use the
    # first so the ifconfig command line below stays well-formed.
    interfaces = route_out.split()
    if interfaces:
        default_interface = interfaces[0]
    else:
        default_interface = ''
    ifconfig = "/sbin/ifconfig " + default_interface
    client_info['mac'] = commands.getoutput(
        ifconfig + r" | awk '/HWaddr/ { print $5 }'").strip()
    client_info['ip'] = commands.getoutput(
        ifconfig + r" | sed -ne '/^[[:space:]]*inet addr:/{s///;s/[[:space:]]\+.*$//p}'").strip()
    try:
        client_info['hostname'] = socket.gethostbyaddr(client_info['ip'])[0]
    except socket.error:
        client_info['hostname'] = client_info['ip']
    return client_info


def verify_server_cert(conn, cert, errnum, depth, ok):
    """Certificate verification callback using a known_servers digest file.

    If ``<config_dir>/known_servers`` does not exist yet, the SHA-1 digest of
    *cert* is recorded there and the certificate is accepted.  Otherwise the
    certificate is accepted only if its digest is listed in that file.
    """
    known_path = config_dir + "/known_servers"
    digest = cert.digest("sha1")

    if not os.path.isfile(known_path):
        print("Recording server's cert digest in known_servers")
        known_servers_file = open(known_path, "w")
        try:
            known_servers_file.write(digest + "\n")
        finally:
            known_servers_file.close()
        return True

    known_servers_file = open(known_path, "r")
    try:
        known_servers = [x.strip() for x in known_servers_file.readlines()]
    finally:
        known_servers_file.close()
    return digest in known_servers


def generate_cert_and_key(keytype="RSA", keylength=1024):
    """Create client.key / client.cert in config_dir, named after this host."""
    sys.stdout.write("Generating client certificate...")
    certgen.certgen(keytype, keylength, socket.gethostname(),
                    config_dir + '/client.key', config_dir + '/client.cert')
    print("done")


def scirec_main(client, known_jobs):
    """Run one poll cycle: ask the server for jobs and execute them.

    Returns the list of job ids left unprocessed (empty when the server had
    no jobs).  Exits the process with status 1 if the server returns a false
    value other than an empty list.
    """
    summary = client.gen_summary(known_jobs)
    print('Summary: ' + str(summary))
    if verbose:
        print("Acquiring jobs...")
    jobs = client.get_jobs(summary, known_jobs)
    if jobs == []:
        print('There are no jobs.')
        return []
    if not jobs:
        print('Problem acquiring jobs. Quitting.')
        sys.exit(1)
    print('Jobs: %s' % jobs)
    return run_jobs(client, jobs)


def main(argv=None):
    """Parse the command line, connect and register, then process jobs."""
    global sim, debug, verbose, daemon, bind_address, bind_port

    if argv is None:
        argv = sys.argv
    try:
        opts, args = getopt.getopt(argv[1:], 'sdDvp:h:')
    except getopt.error:
        print(sys.exc_info()[1])
        print("""usage: python %s [-s] [-d] [-v] [-D] [-h host] [-p port]
    [-s] = Simulation mode. Doesn't actually run scripts.
    [-d] = Turn on debugging
    [-v] = Turn on verboseness
    [-D] = Run in daemon mode
    [-h host] = Set the host name/ip of the scire server
    [-p port] = Set the port to look for the scire server
    """ % argv[0])
        sys.exit(2)

    for o, a in opts:
        if o == '-s':
            sim = True
        elif o == '-d':
            debug = True
            verbose = True
        elif o == '-D':
            daemon = True
        elif o == '-h':
            bind_address = a
        elif o == '-p':
            bind_port = int(a)
        elif o == '-v':
            verbose = True

    key_path = config_dir + "/client.key"
    cert_path = config_dir + "/client.cert"

    if verbose:
        sys.stdout.write("Starting up...\nChecking for the certificate... ")
    # Check for public/private keypair and generate if they don't exist
    if not (os.path.isfile(key_path) and os.path.isfile(cert_path)):
        if verbose:
            print("No cert found. Generating.\n")
        generate_cert_and_key()

    if verbose:
        print("Loading the certificate...\n")
    try:
        certfh = open(cert_path, "r")
    except IOError:
        print('Could not load certificate')
        sys.exit(1)
    try:
        certbuf = certfh.read()
    finally:
        certfh.close()
    client_cert = crypto.load_certificate(crypto.FILETYPE_PEM, certbuf)

    if verbose:
        print("Connecting to the server...\n")
    try:
        client = SecureXMLRPCClient(bind_address, bind_port, cert_path,
                                    key_path, verify_server_cert)
    except Exception:
        print("ERROR: Couldn't connect to server!\n")
        display_traceback()
        # Without a connection nothing below can work.
        sys.exit(1)

    if debug:
        print(client_cert.digest("sha1"))

    if verbose:
        print("Registering client... ")
    client_status = client.register_client()
    if not client_status:
        # Unknown to the server: submit our certificate and details, then
        # stop; the client stays pending until the server side accepts it.
        try:
            client.add_client(certbuf, sys_info())
        except Exception:
            print("ERROR: Could not add client\n")
            display_traceback()
            sys.exit(1)
        print("Client added successfully. Status is pending. Will exit now.\n")
        return

    print("Client Status is: %s \n" % client_status)

    known_jobs = []
    if daemon:
        if verbose:
            print("Running in daemon mode.\n")
        while True:
            known_jobs = scirec_main(client, known_jobs)
            time.sleep(poll_int)
    else:
        scirec_main(client, known_jobs)


if __name__ == "__main__":
    main()