"""
Binary_c-python's condor functions
"""

import multiprocessing
import os
import pathlib
import signal
import stat
import subprocess
import sys
import time

import datasize
import lib_programname


class condor():
    """
    Mix-in providing HTCondor support.

    The methods read ``self.grid_options`` and call ``self.exit()`` and
    ``self.ANSI_colours``; none of these are defined here, so they must be
    supplied by the class that inherits from this one.
    """

    def __init__(self, **kwargs):
        # don't do anything: we just inherit from this class
        return

    def condorID(self, ClusterID=None, Process=None):
        """
        Function to return a Condor job ID as the string "ClusterID.Process".

        The ClusterID and Process passed in are used if given, otherwise we
        default to the condor_ClusterID and condor_Process in grid_options.
        """
        if ClusterID is None:
            ClusterID = self.grid_options['condor_ClusterID']
        if Process is None:
            Process = self.grid_options['condor_Process']
        return "{ClusterID}.{Process}".format(ClusterID=ClusterID,
                                              Process=Process)

    def condorpath(self, path):
        """
        Function to return the absolute path of `path` inside the condor
        directory (grid_options['condor_dir']).
        """
        return os.path.abspath(
            os.path.join(self.grid_options['condor_dir'], path))

    def condor_status_file(self, ClusterID=None, Process=None):
        """
        Return the condor status file corresponding to the ClusterID and
        Process, which default to grid_options condor_ClusterID and
        condor_Process, respectively.
        """
        return os.path.join(self.condorpath('status'),
                            self.condorID(ClusterID, Process))

    def condor_check_requirements(self):
        """
        Function to check whether the condor parameters in grid_options
        have been set appropriately.

        Returns a tuple (ok, message): (True, "") when the options are
        usable, otherwise (False, <explanation for the user>).
        """
        if self.grid_options['condor'] > 0 and \
           self.grid_options['condor_dir'] is None:
            return (False,
                    "You have set condor={condor} but not set condor_dir (which is {condor_dir}). Please set it and try again.".format(
                        condor=self.grid_options['condor'],
                        condor_dir=self.grid_options['condor_dir']
                    ))
        return (True, "")

    def condor_dirs(self):
        """
        Directories associated specifically with this condor job: returns
        a list of the grid_options keys that hold them.
        """
        return ['condor_dir']

    def set_condor_status(self, string):
        """
        Set the condor status corresponding to the self object, which
        should have condor_ClusterID and condor_Process set.

        Also saves the ClusterID in <condor_dir>/ClusterID if that file
        does not exist yet.
        """
        # save condor ClusterID to file
        idfile = os.path.join(self.grid_options["condor_dir"], "ClusterID")
        if not os.path.exists(idfile):
            with open(idfile, "w", encoding='utf-8') as fClusterID:
                fClusterID.write("{ClusterID}\n".format(
                    ClusterID=self.grid_options['condor_ClusterID']))

        # save condor status
        status_file = self.condor_status_file()
        if status_file:
            with open(status_file, 'w', encoding='utf-8') as f:
                f.write(string)
        return

    def get_condor_status(self, ClusterID=None, Process=None):
        """
        Get and return the condor status corresponding to the self object,
        or ClusterID.Process if they are passed in.

        Returns None if the ClusterID or Process is still None after
        falling back to grid_options. If the status file cannot be read,
        returns an empty string.
        """
        if ClusterID is None:
            ClusterID = self.grid_options['condor_ClusterID']
        if Process is None:
            Process = self.grid_options['condor_Process']

        if ClusterID is None or Process is None:
            return None

        try:
            path = pathlib.Path(self.condor_status_file(ClusterID=ClusterID,
                                                        Process=Process))
            return path.read_text().strip()
        except Exception:
            # deliberately best-effort: a missing or unreadable status
            # file just means "no status yet"
            return ""

    def condor_outfile(self):
        """
        return a standard filename for the condor chunk files
        """
        filename = "{id}.gz".format(id=self.condorID())
        return os.path.abspath(os.path.join(self.grid_options['condor_dir'],
                                            'results',
                                            filename))

    def make_condor_dirs(self):
        """
        Make the condor directories (stdout, stderr, log, results, status
        and snapshots) inside grid_options['condor_dir'], then wait until
        they are all visible on the filesystem.

        Calls self.exit(code=1) if condor_dir is not set, or if any of
        the directories cannot be made (e.g. because it already exists).
        """
        if not self.grid_options['condor_dir']:
            print("You must set self.grid_options['condor_dir'] to a directory which we can use to set up binary_c-python's Condor files. This should be unique to your set of grids.")
            self.exit(code=1)

        # make a list of directories, these contain the various condor
        # output, status files, etc.
        dirs = [self.condorpath(subdir)
                for subdir in ['stdout', 'stderr', 'log',
                               'results', 'status', 'snapshots']]

        # make the directories: we do not allow these to already exist
        # as the condor directory should be a fresh location for each set of jobs
        for path in dirs:
            try:
                pathlib.Path(path).mkdir(exist_ok=False, parents=True)
            except FileExistsError:
                print("Tried to make the directory {dir} but it already exists. When you launch a set of binary_c jobs on Condor, you need to set your condor_dir to be a fresh directory with no contents.".format(dir=path))
                self.exit(code=1)
            except OSError as err:
                print("Tried to make the directory {dir} but failed: {err}. Please check that condor_dir is a location you can write to.".format(dir=path, err=err))
                self.exit(code=1)

        # check that they have been made and exist: we need this
        # because on network mounts (NFS) there's often a delay between the mkdir
        # above and the actual directory being made. This shouldn't be too long...
        fail = True
        count = 0
        count_warn = 10
        while fail is True:
            fail = False
            count += 1
            if count > count_warn:
                print("Warning: Have been waiting about {} seconds for Condor directories to be made, there seems to be significant delay...".format(count))
            for path in dirs:
                if os.path.isdir(path) is False:
                    fail = True
                    time.sleep(1)
                    break  # break the "for path in dirs:"

    def _condor_job_script(self, grid_command):
        """
        Return the text of the bash job script that each Condor job runs:
        it sets the job status, runs grid_command with condor=2 and then,
        unless condor_postpone_join is set, tries to join with condor=3.
        """
        condor_dir = self.grid_options['condor_dir']

        # the status file must be named $ClusterID.$Process so that it
        # matches condor_status_file()
        job_script = """#!{bash}
echo "Condor Job Args: $@"

# first two arguments are ClusterID and Process
export ClusterID=$1
export Process=$2
shift 2

echo "Job ClusterID $ClusterID Process $Process"

# Set binary_c startup conditions
export BINARY_C_PYTHON_ORIGINAL_CMD_LINE={cmdline}
export BINARY_C_PYTHON_ORIGINAL_WD=`{pwd}`
export BINARY_C_PYTHON_ORIGINAL_SUBMISSION_TIME=`{date}`

# set status to "running"
echo "running" > {condor_dir}/status/$ClusterID.$Process

# make list of files which is checked for joining
echo {condor_dir}/results/$ClusterID.$Process.gz >> {condor_dir}/results/$ClusterID.all

# run grid of stars and, if this returns 0, set status to finished
{grid_command} condor=2 evolution_type=grid condor_ClusterID=$ClusterID condor_Process=$Process save_population_object={condor_dir}/results/$ClusterID.$Process.gz && echo -n "finished" > {condor_dir}/status/$ClusterID.$Process && echo """.format(
            bash=self.grid_options['condor_bash'],
            date=self.grid_options['condor_date'],
            pwd=self.grid_options['condor_pwd'],
            cmdline=repr(self.grid_options['command_line']),
            grid_command=grid_command,
            condor_dir=condor_dir,
        )

        if not self.grid_options['condor_postpone_join']:
            # chained onto the command above with &&, so the join is only
            # attempted when the grid finished successfully
            joinfile = "{condor_dir}/results/$ClusterID.all".format(
                condor_dir=condor_dir
            )
            job_script += """&& echo "Checking if we can join..." && echo && {grid_command} condor=3 evolution_type=join joinlist={joinfile} condor_ClusterID=$ClusterID condor_Process=$Process
""".format(
                grid_command=grid_command,
                joinfile=joinfile,
            )

        return job_script

    def _condor_submit_script(self, job_script_path):
        """
        Return the text of the submit description file that is passed to
        the condor_submit command; it runs job_script_path once per job.
        """
        extra = self.grid_options['condor_extra_settings']
        extra_settings = ""
        if extra:
            extra_settings = "".join(
                "{key} = {value}\n".format(key=key, value=extra[key])
                for key in extra
            )

        # $(Cluster) and $(Process) are left for Condor to fill in; they
        # identify each job
        jobid = '$(Cluster).$(Process)'
        condor_dir = self.grid_options['condor_dir']

        return """
executable = {usr_bin_env}
arguments = {bash} {job_script_path} $(Cluster) $(Process)
universe = {universe}
getenv = {getenv}
initial_dir = {initial_dir}
output = {outfile}
error = {errfile}
log = {logfile}
stream_output = {stream_output}
stream_error = {stream_error}
request_memory = {request_memory}
request_cpus = {request_cpus}
should_transfer_files = {should_transfer_files}
when_to_transfer_output = {when_to_transfer_output}
requirements = {requirements}
JobBatchName = {batchname}
{extra_settings}
queue {njobs}
""".format(
            usr_bin_env=self.grid_options['condor_usr_bin_env'],
            bash=self.grid_options['condor_bash'],
            job_script_path=job_script_path,
            universe=self.grid_options['condor_universe'],
            getenv=self.grid_options['condor_getenv'],
            initial_dir=self.grid_options['condor_initial_dir'],
            outfile=os.path.abspath(os.path.join(condor_dir, 'stdout', jobid)),
            errfile=os.path.abspath(os.path.join(condor_dir, 'stderr', jobid)),
            logfile=os.path.abspath(os.path.join(condor_dir, 'log', jobid)),
            stream_output=self.grid_options['condor_stream_output'],
            stream_error=self.grid_options['condor_stream_error'],
            request_memory=self.grid_options['condor_memory'],
            request_cpus=self.grid_options['num_processes'],
            should_transfer_files=self.grid_options['condor_should_transfer_files'],
            # NOTE(review): this is the only option key here without the
            # "condor_" prefix - confirm against the grid_options defaults
            when_to_transfer_output=self.grid_options['when_to_transfer_output'],
            requirements=self.grid_options['condor_requirements'],
            batchname=self.grid_options['condor_batchname'],
            extra_settings=extra_settings,
            njobs=self.grid_options['condor_njobs'],
        )

    def _write_condor_script(self, path, contents):
        """
        Write contents to path and make the file readable and executable
        by all (so the condor user can pick it up). Calls
        self.exit(code=1) if the file cannot be written.
        """
        try:
            with open(path, 'w', encoding='utf-8') as script:
                script.write(contents)
        except IOError:
            print("Could not open Condor script at {path} for writing: please check you have set condor_dir correctly (it is currently {condor_dir}) and can write to this directory.".format(
                path=path,
                condor_dir=self.grid_options['condor_dir']))
            self.exit(code=1)

        os.chmod(path,
                 stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC |
                 stat.S_IRGRP | stat.S_IXGRP |
                 stat.S_IROTH | stat.S_IXOTH)

    def _condor_submit(self, submit_script_path):
        """
        Run the condor_submit command on submit_script_path and echo its
        output. Raises Exception if the command returns non-zero.
        """
        cmd = [self.grid_options['condor_submit'], submit_script_path]
        pipes = subprocess.Popen(cmd,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        std_out, std_err = pipes.communicate()

        # decode before formatting so messages do not contain b'...'
        out_text = std_out.strip().decode('utf-8', errors='replace')
        err_text = std_err.strip().decode('utf-8', errors='replace')

        red = self.ANSI_colours["red"]
        yellow = self.ANSI_colours["yellow"]
        reset = self.ANSI_colours["reset"]

        if pipes.returncode != 0:
            # an error happened!
            err_msg = "{red}{err}\nReturn Code: {code}{reset}".format(
                err=err_text,
                code=pipes.returncode,
                red=red,
                reset=reset)
            raise Exception(err_msg)
        elif len(std_err):
            print("{red}{err}{reset}".format(red=red,
                                             reset=reset,
                                             err=err_text))

        print("{yellow}{out}{reset}".format(yellow=yellow,
                                            reset=reset,
                                            out=out_text))

    def condor_grid(self):
        """
        function to be called when running grids when grid_options['condor']>=1

        if grid_options['condor']==1, we set up the condor script and launch the jobs, then return True to exit.
        if grid_options['condor']==2, we run the stars, which means we return False to continue.
        if grid_options['condor']==3, we are being called from the jobs to run the grids, return False to continue.

        For any other value nothing is done and None is returned.
        """
        condor_mode = self.grid_options['condor']

        if condor_mode == 2:
            # run a grid of stars only, leaving the results
            # in the appropriate outfile
            return False

        if condor_mode == 3:
            # joining : set the evolution type to "join" and return
            # False to continue
            self.grid_options['evolution_type'] = 'join'
            return False

        if condor_mode != 1:
            return None

        # if condor=1, we should have no evolution type, we
        # set up the Condor scripts and get them evolving
        # in a Condor queue
        self.grid_options['evolution_type'] = None

        # make dirs
        self.make_condor_dirs()

        # check we're not using too much RAM
        if datasize.DataSize(self.grid_options['condor_memory']) > datasize.DataSize(self.grid_options['condor_warn_max_memory']):
            print("WARNING: you want to use {} MB of RAM : this is unlikely to be correct. If you believe it is, set condor_warn_max_memory to something very large (it is currently {} MB)\n".format(
                self.grid_options['condor_memory'],
                self.grid_options['condor_warn_max_memory']))
            self.exit(code=1)

        if self.grid_options['condor_njobs'] == 0:
            print("binary_c-python Condor : You must set grid_option condor_njobs to be non-zero")
            self.exit(code=1)

        # find the path to the Python script that we are running
        pyscriptpath = str(lib_programname.get_path_executed_script())

        # set the condor initial dir to be our current working directory
        if not self.grid_options['condor_initial_dir']:
            self.grid_options['condor_initial_dir'] = os.getcwd()

        # build the grid command
        grid_command = ' '.join([
            os.path.join("/usr", "bin", "env"),
            sys.executable,
            pyscriptpath,
        ] + sys.argv[1:] + [
            'start_at=$Process',  # Process is 0,1,2... which is what we want
            'modulo=' + str(self.grid_options['condor_njobs']),
            'condor_njobs=' + str(self.grid_options['condor_njobs']),
            'condor_dir=' + self.grid_options['condor_dir'],
            'verbosity=' + str(self.grid_options['verbosity']),
            'num_cores=' + str(self.grid_options['num_processes'])
        ])

        # make condor script paths
        submit_script_path = self.condorpath('condor_submit_script')
        job_script_path = self.condorpath('condor_job_script')

        ############################################################
        # The condor job script calls your binary_c-python script
        ############################################################
        condor_job_script = self._condor_job_script(grid_command)

        ############################################################
        # The Condor submit script is sent to condor_submit
        # In here we know $(Cluster) and $(Process) which identify
        # each job
        ############################################################
        condor_submit_script = self._condor_submit_script(job_script_path)

        # write the scripts and make them executable by
        # all (so the condor user can pick it up)
        self._write_condor_script(submit_script_path, condor_submit_script)
        self._write_condor_script(job_script_path, condor_job_script)

        if not self.grid_options['condor_postpone_submit']:
            # call condor_submit to launch the condor jobs
            self._condor_submit(submit_script_path)
        else:
            # just say we would have (use this for testing)
            print("Condor script is at {path} but has not been launched".format(path=submit_script_path))

        # some messages to the user, then return
        if self.grid_options['condor_postpone_submit'] == 1:
            print("Condor script written, to {path}, but launching the jobs with condor_submit was postponed.".format(path=submit_script_path))
        else:
            print("Condor jobs launched.")
            print("All done in condor_grid().")

        # return True so we exit immediately
        return True