diff --git a/binarycpython/utils/condor.py b/binarycpython/utils/condor.py index cd4c2d69bd8888a4420bd4bd27e8e402db888cbe..c8bf626ad5ac864fd2dc3e235289d2da063ebbf3 100644 --- a/binarycpython/utils/condor.py +++ b/binarycpython/utils/condor.py @@ -2,22 +2,34 @@ Binary_c-python's condor functions """ +import os +import datasize +import lib_programname +import multiprocessing +import os +import pathlib +import signal +import stat +import subprocess +import sys +import time + class condor(): def __init__(self, **kwargs): # don't do anything: we just inherit from this class return - def condorID(self,clusterID=None,ProcID=None): + def condorID(self,ClusterID=None,Process=None): """ - Function to return a Condor job ID. The ClusterID and ProcID passed in are used if given, otherwise we default to the condor_ClusterID and condor_ProcID in grid_options. + Function to return a Condor job ID. The ClusterID and Process passed in are used if given, otherwise we default to the condor_ClusterID and condor_Process in grid_options. """ if ClusterID is None: ClusterID= self.grid_options['condor_ClusterID'] - if ProcID is None: - ProcID = self.grid_options['condor_ProcID'] - return "{ClusterID}.{ProcID}".format(ClusterID=ClusterID, - ProcID=ProcID) + if Process is None: + Process = self.grid_options['condor_Process'] + return "{ClusterID}.{Process}".format(ClusterID=ClusterID, + Process=Process) def condorpath(self,path): """ @@ -27,12 +39,12 @@ class condor(): def condor_status_file(self, ClusterID=None, - ProcID=None): + Process=None): """ - Return the condor status file corresponding to the ClusterID and ProcID, which default to grid_options condor_ClusterID and condor_ProcID, respectively. + Return the condor status file corresponding to the ClusterID and Process, which default to grid_options condor_ClusterID and condor_Process, respectively. """ return os.path.join(self.condorpath('status'), - self.condorID(ClusterID,ProcID)) + self.condorID(ClusterID,Process)) def condor_check_requirements(self): """ @@ -56,7 +68,7 @@ class condor(): def set_condor_status(self,string): """ - Set the condor status corresponing to the self object, which should have condor_ClusterID and condor_ProcID set. + Set the condor status corresponding to the self object, which should have condor_ClusterID and condor_Process set. """ # save condor ClusterID to file idfile = os.path.join(self.grid_options["condor_dir"], @@ -76,21 +88,21 @@ class condor(): def get_condor_status(self, ClusterID=None, - ProcID=None): + Process=None): """ - Get and return the condor status corresponing to the self object, or ClusterID.ProcID if they are passed in. If no status is found, returns an empty string.. + Get and return the condor status corresponding to the self object, or ClusterID.Process if they are passed in. If no status is found, returns an empty string. """ if ClusterID is None: ClusterID = self.grid_options['condor_ClusterID'] - if ProcID is None: - ProcID = self.grid_options['condor_ProcID'] + if Process is None: + Process = self.grid_options['condor_Process'] - if ClusterID is None or ProcID is None : + if ClusterID is None or Process is None : return None try: path = pathlib.Path(self.condor_status_file(ClusterID=ClusterID, - ProcID=ProcID)) + Process=Process)) if path: return path.read_text().strip() else: @@ -127,7 +139,7 @@ class condor(): for dir in dirs: try: pathlib.Path(self.condorpath(dir)).mkdir(exist_ok=False, - parents=True) + parents=True) except: print("Tried to make the directory {dir} but it already exists. 
When you launch a set of binary_c jobs on Condor, you need to set your condor_dir to be a fresh directory with no contents.".format(dir=dir)) self.exit(code=1) @@ -153,29 +165,29 @@ class condor(): """ function to be called when running grids when grid_options['condor']>=1 - if grid_options['condor']==1, we set up the condor script and launch the jobs, then exit. - if grid_options['condor']==2, we are being called from the jobs to run the grids + if grid_options['condor']==1, we set up the condor script and launch the jobs, then return True to exit. + if grid_options['condor']==2, we run the stars, which means we return False to continue. + if grid_options['condor']==3, we are being called from the jobs to run the grids, return False to continue. """ - # if condor=1, we should have no evolution type, we just - # set up a load of condor scripts and get them evolving - # in condor array - if self.grid_options['condor'] == 1: - self.grid_options['evolution_type'] = None - - if self.grid_options['evolution_type'] == 'grid': + if self.grid_options['condor'] == 2: # run a grid of stars only, leaving the results - # in a file - - # set output file - self.grid_options['save_population_object'] = condor_outfile() + # in the appropriate outfile + return False + + elif self.grid_options['condor'] == 3: + # joining : set the evolution type to "join" and return + # False to continue + self.grid_options['evolution_type'] = 'join' + return False + + elif self.grid_options['condor'] == 1: + # if condor=1, we should have no evolution type, we + # set up the Condor scripts and get them evolving + # in a Condor queue + self.grid_options['evolution_type'] = None - return self.evolve() - elif self.grid_options['evolution_type'] == 'join': - # should not happen! - return - else: - # setup and launch condor jobs + # make dirs self.make_condor_dirs() # check we're not using too much RAM @@ -185,33 +197,30 @@ class condor(): self.grid_options['condor_warn_max_memory'])) self.exit(code=1) - # set up condor_array - if not self.grid_options['condor_array_max_jobs']: - self.grid_options['condor_array_max_jobs'] = self.grid_options['condor_njobs'] - condor_array = self.grid_options['condor_array'] or "1-{njobs}\%{max_jobs}".format( - njobs=self.grid_options['condor_njobs'], - max_jobs=self.grid_options['condor_array_max_jobs']) - # get job id (might be passed in) - ClusterID = self.grid_options['condor_ClusterID'] if self.grid_options['condor_ClusterID'] != "" else '$CONDOR_ARRAY_JOB_ID' + ClusterID = self.grid_options['condor_ClusterID'] if self.grid_options['condor_ClusterID'] != "" else '$ClusterID' # get job array index - ProcID = self.grid_options['condor_ProcID'] - if ProcID is None: - ProcID = '$CONDOR_ARRAY_TASK_ID' - + Process = self.grid_options['condor_Process'] if self.grid_options['condor_Process'] != "" else '$Process' + if self.grid_options['condor_njobs'] == 0: print("binary_c-python Condor : You must set grid_option condor_njobs to be non-zero") self.exit(code=1) + # find the path to the Python script that we are running + pyscriptpath = str(lib_programname.get_path_executed_script()) + + # set the condor initial dir to be our current working directory + if not self.grid_options['condor_initial_dir']: + self.grid_options['condor_initial_dir'] = os.getcwd() + # build the grid command grid_command = [ os.path.join("/usr","bin","env"), sys.executable, - str(lib_programname.get_path_executed_script()), + pyscriptpath, ] + sys.argv[1:] + [ - 'condor=2', - 'start_at=' + str(ProcID) + '-1', # do we need the 
-1? + 'start_at=$Process', # Process is 0,1,2... which is what we want 'modulo=' + str(self.grid_options['condor_njobs']), 'condor_njobs=' + str(self.grid_options['condor_njobs']), 'condor_dir=' + self.grid_options['condor_dir'], @@ -239,48 +248,46 @@ path=job_script_path, condor_dir = self.grid_options['condor_dir'])) - - ############################################################ # The condor job script calls your binary_c-pthyon script ############################################################ condor_job_script = """#!{bash} -# first two arguments are ClusterID and ProcID -ClusterID = $1 -shift -ProcID = $1 -shift +echo "Condor Job Args: $@" + +# first two arguments are ClusterID and Process +export ClusterID=$1 +export Process=$2 +shift 2 + +echo "Job ClusterID $ClusterID Process $Process" # Set binary_c startup conditions export BINARY_C_PYTHON_ORIGINAL_CMD_LINE={cmdline} export BINARY_C_PYTHON_ORIGINAL_WD=`{pwd}` export BINARY_C_PYTHON_ORIGINAL_SUBMISSION_TIME=`{date}` -{grid_command} - # set status to \"running\" -echo \"running\" > {condor_dir}/status/{id} +echo \"running\" > {condor_dir}/status/$ClusterID.$Process # make list of files which is checked for joining -echo {condor_dir}/results/{id}.gz >> {condor_dir}/results/$ClusterID.all +echo {condor_dir}/results/$ClusterID.$Process.gz >> {condor_dir}/results/$ClusterID.all # run grid of stars and, if this returns 0, set status to finished -{grid_command} evolution_type=grid condor_ClusterID=$ClusterID condor_ProcID=$ProcID save_population_object={condor_dir}/results/$ProcID.gz && echo -n \"finished\" > {condor_dir}/status/{id} && echo """.format( +{grid_command} condor=2 evolution_type=grid condor_ClusterID=$ClusterID condor_Process=$Process save_population_object={condor_dir}/results/$ClusterID.$Process.gz && echo -n \"finished\" > {condor_dir}/status/$ClusterID.$Process && echo """.format( bash=self.grid_options['condor_bash'], date=self.grid_options['condor_date'], pwd=self.grid_options['condor_pwd'], cmdline=repr(self.grid_options['command_line']), grid_command=grid_command, condor_dir=self.grid_options['condor_dir'], - id=self.condorID() ) if not self.grid_options['condor_postpone_join']: - joinfile = "{condor_dir}/results/{ClusterID}.all".format( + joinfile = "{condor_dir}/results/$ClusterID.all".format( condor_dir=self.grid_options['condor_dir'], ClusterID=ClusterID ) - condor_job_script += """&& echo \"Checking if we can join...\" && echo && {grid_command} condor=2 evolution_type=join joinlist={joinfile} condor_ClusterID=$ClusterID condor_ProcID=$ProcID + condor_job_script += """&& echo \"Checking if we can join...\" && echo && {grid_command} condor=3 evolution_type=join joinlist={joinfile} condor_ClusterID=$ClusterID condor_Process=$Process """.format( bash = self.grid_options['condor_bash'], grid_command=grid_command, @@ -289,17 +296,27 @@ echo {condor_dir}/results/{id}.gz >> {condor_dir}/results/$ClusterID.all ############################################################ # The Condor submit script is sent to condor_submit - # In here we know $(Cluster) and $(Item) which identify + # In here we know $(Cluster) and $(Process) which identify # each job ############################################################ + extra_settings = "" + if self.grid_options['condor_extra_settings']: + for key in self.grid_options['condor_extra_settings']: + extra_settings += "{key} = {value}\n".format( + key=key, + value=self.grid_options['condor_extra_settings'][key] + ) + + jobid = '$(Cluster).$(Process)' 
condor_submit_script = """ -executable = {env} -arguments = {bash} {scriptfile} $(Cluster) $(Item) {arguments} +executable = {usr_bin_env} +arguments = {bash} {job_script_path} $(Cluster) $(Process) universe = {universe} +getenv = {getenv} +initial_dir = {initial_dir} output = {outfile} error = {errfile} log = {logfile} -initial_dir = {cwd} stream_output = {stream_output} stream_error = {stream_error} request_memory = {request_memory} @@ -308,43 +325,37 @@ should_transfer_files = {should_transfer_files} when_to_transfer_output = {when_to_transfer_output} requirements = {requirements} {extra_settings} -queue {njobs} +queue {njobs} """.format( usr_bin_env = self.grid_options['condor_usr_bin_env'], bash = self.grid_options['condor_bash'], - scriptfile = scriptfile, - arguments = ' '.join(arguments_list), + job_script_path = job_script_path, universe = self.grid_options['condor_universe'], + getenv = self.grid_options['condor_getenv'], + initial_dir = self.grid_options['condor_initial_dir'], outfile = os.path.abspath(os.path.join(self.grid_options['condor_dir'], 'stdout', - '$(Cluster).$(Item)')), + jobid)), errfile = os.path.abspath(os.path.join(self.grid_options['condor_dir'], 'stderr', - '$(Cluster).$(Item)')), + jobid)), logfile = os.path.abspath(os.path.join(self.grid_options['condor_dir'], 'log', - '$(Cluster).$(Item)')), - initial_dir = os.getcwd(), + jobid)), stream_output = self.grid_options['condor_stream_output'], stream_error = self.grid_options['condor_stream_error'], - request_memory = self.grid_options['stream_memory'], + request_memory = self.grid_options['condor_memory'], request_cpus = self.grid_options['num_processes'], should_transfer_files = self.grid_options['condor_should_transfer_files'], when_to_transfer_output = self.grid_options['when_to_transfer_output'], requirements = self.grid_options['condor_requirements'], - extra_settings = self.grid_options['condor_extra_settings'], + extra_settings = extra_settings, njobs = self.grid_options['condor_njobs'], ) - for key in self.grid_options['condor_extra_settings']: - condor_submit_script += "{key} = {value}\n".format( - key=key, - value=self.grid_options['condor_extra_settings'][key] - ) - # write the scripts, close them and make them executable by # all (so the condor user can pick it up) - for file,contents,path in [(submit_script,condor_submit_script), + for file,contents in [(submit_script,condor_submit_script), (job_script,condor_job_script)]: path = file.name file.writelines(contents) @@ -354,9 +365,9 @@ queue {njobs} stat.S_IRGRP | stat.S_IXGRP | \ stat.S_IROTH | stat.S_IXOTH) - if not self.grid_options['condor_postpone_sbatch']: + if not self.grid_options['condor_postpone_submit']: # call sbatch to launch the condor jobs - cmd = [self.grid_options['condor_sbatch'], submit_script_path] + cmd = [self.grid_options['condor_submit'], submit_script_path] pipes = subprocess.Popen(cmd, stdout = subprocess.PIPE, stderr = subprocess.PIPE) @@ -383,12 +394,14 @@ queue {njobs} # some messages to the user, then return - if self.grid_options['condor_postpone_sbatch'] == 1: + if self.grid_options['condor_postpone_submit'] == 1: print("Condor script written, to {path}, but launching the jobs with sbatch was postponed.".format(path=submit_script_path)) else: print("Condor jobs launched.") print("All done in condor_grid().") - return + + # return True so we exit immediately + return True def condor_ClusterID_from_dir(self,dir): """ @@ -503,14 +516,14 @@ queue {njobs} # # command = "".join([ # # "{}".python_details['executable'], # # 
"{}".scriptname, - # # "offset=$ProcID", + # # "offset=$Process", # # "modulo={}".format(self.grid_options['condor_njobs']), # # "vb={}".format(self.grid_options['verbosity']) - # # "results_hash_dumpfile=$self->{_grid_options}{condor_dir}/results/$ClusterID.$ProcID", + # # "results_hash_dumpfile=$self->{_grid_options}{condor_dir}/results/$ClusterID.$Process", # # 'condor_ClusterID='.$ClusterID, - # # 'condor_ProcID='.$ProcID, - # # 'condor_jobname=binary_grid_'.$ClusterID.'.'.$ProcID, + # # 'condor_Process='.$Process, + # # 'condor_jobname=binary_grid_'.$ClusterID.'.'.$Process, # # "condor_njobs=$njobs", # # "condor_dir=$self->{_grid_options}{condor_dir}", # # ); diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py index bba6eefec5d23d4634a4ec2ecbb0456bb77a9701..3c48ab8b933236b41aa4c93abb00f18af1130007 100644 --- a/binarycpython/utils/grid.py +++ b/binarycpython/utils/grid.py @@ -392,9 +392,18 @@ class Population(analytics, self.grid_options["verbosity"], 2, ) - value = type(old_value)(value) - self.verbose_print("Success!", self.grid_options["verbosity"], 2) - + try: + value = type(old_value)(value) + self.verbose_print("Success!", self.grid_options["verbosity"], 2) + except: + print("Failed to convert {param} value with type {type}: old_value is '{old}', new value is '{new}'".format( + param=parameter, + old=old_value, + type=type(old_value), + new=split[1] + )) + self.exit(code=1) + except ValueError: # might be able to eval the parameter, e.g. diff --git a/binarycpython/utils/grid_options_defaults.py b/binarycpython/utils/grid_options_defaults.py index b7b102474d0e8762f515a90c1f16c170e6ea973c..8ca4abc37366d036211bc1bb1e417faae8d6d5c7 100644 --- a/binarycpython/utils/grid_options_defaults.py +++ b/binarycpython/utils/grid_options_defaults.py @@ -218,10 +218,12 @@ class grid_options_defaults(): "condor_dir": "", # working directory containing e.g. scripts, output, logs (e.g. should be NFS available to all) "condor_njobs": "", # number of scripts/jobs that CONDOR will run in total "condor_ClusterID": None, # condor cluster id, equivalent to Slurm's jobid - "condor_ProcID": None, # condor process id, equivalent to Slurm's jobarrayindex - "condor_postpone_join": 0, # if 1, data is not joined, e.g. if you want to do it off the condor grid (e.g. with more RAM) + "condor_Process": None, # condor process, equivalent to Slurm's jobarrayindex + "condor_postpone_submit": 0, # if 1, the condor script is not submitted (useful for debugging). Default 0. + "condor_postpone_join": 0, # if 1, data is not joined, e.g. if you want to do it off the condor grid (e.g. with more RAM). Default 0. # "condor_join_machine": None, # if defined then this is the machine on which the join command should be launched (must be sshable and not postponed) - "condor_memory": 1024, # in MB, the memory use (ImageSize) of the job + "condor_memory": 512, # in MB, the memory use (ImageSize) of the job + "condor_warn_max_memory": 1024, # in MB, the memory use (ImageSize) of the job "condor_universe": "vanilla", # usually vanilla universe "condor_extra_settings": {}, # Place to put extra configuration for the CONDOR submit file. The key and value of the dict will become the key and value of the line in te slurm batch file. Will be put in after all the other settings (and before the command). Take care not to overwrite something without really meaning to do so. 
# snapshots and checkpoints @@ -232,7 +234,7 @@ class grid_options_defaults(): # (warning: lots of files!), otherwise just store the lates 'condor_stream_output':True, # stream stdout 'condor_stream_error':True, # stream stderr - 'condor_should_transfer_files' : 'IF_NEEDED', + 'condor_should_transfer_files' : 'YES', 'when_to_transfer_output' : 'ON_EXIT_OR_EVICT', 'condor_save_joined_file':0, # if 1 then results/joined contains the results # (useful for debugging, otherwise a lot of work) @@ -241,7 +243,10 @@ 'condor_bash' : '/bin/bash', # bash executable location "condor_pwd" : "/usr/bin/pwd", # pwd command location for Condor "condor_date" : "/usr/bin/date", # bash location for Condor - + "condor_array_max_jobs" : None, # override for the max number of concurrent array jobs + "condor_initial_dir" : None, # directory from which the condor jobs are run; if None, defaults to the current working directory of your script + "condor_submit" : "condor_submit", # the condor_submit command + "condor_getenv" : True, # if True condor takes the environment at submission and copies it to the jobs. You almost certainly want this. ########################## # Unordered. Need to go through this. Copied from the perl implementation. ########################## diff --git a/binarycpython/utils/slurm.py index c4114f2e38ecec4bfa53f5b224cfc45bf0ecad1e..4b942d99441a39cb955a058a3da2f1ecc901c17d 100644 --- a/binarycpython/utils/slurm.py +++ b/binarycpython/utils/slurm.py @@ -175,17 +175,20 @@ class slurm(): # run a grid of stars only, leaving the results # in the appropriate outfile return False + elif self.grid_options['slurm'] == 3: # joining : set the evolution type to "join" and return # False to continue self.grid_options['evolution_type'] = 'join' return False + elif self.grid_options['slurm'] == 1: # if slurm=1, we should have no evolution type, we # set up the Slurm scripts and get them evolving - # in slurm array + # in a Slurm array self.grid_options['evolution_type'] = None - # setup and launch slurm jobs + + # make dirs self.make_slurm_dirs() # check we're not using too much RAM
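
Usage sketch: a minimal example of how the Condor options introduced above might be driven from a user script. It assumes the usual binary_c-python Population().set()/evolve() entry points; only the option names (condor, condor_dir, condor_njobs, condor_memory, num_processes, condor_postpone_submit) come from this patch, while the directory, job count and other values are illustrative placeholders, not a recommended configuration.

from binarycpython.utils.grid import Population

pop = Population()
pop.set(
    # grid option names as defined in grid_options_defaults.py above;
    # the values are placeholders for a real run
    condor=1,                                # 1: write the Condor scripts, submit them, then exit
    condor_dir="/nfs/scratch/condor_run1",   # hypothetical NFS-visible, empty working directory
    condor_njobs=10,                         # split the population over 10 Condor jobs
    condor_memory=512,                       # request_memory per job, in MB
    num_processes=4,                         # request_cpus per job
    condor_postpone_submit=0,                # set to 1 to write the scripts without calling condor_submit
)
# With condor=1, condor_grid() returns True once the jobs are submitted, so this
# call returns immediately; each Condor job re-invokes the same script with
# condor=2 to evolve its share of the grid, and condor=3 performs the join.
pop.evolve()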