From d37e18693bd08978a485dffa947c3ae001ff9c89 Mon Sep 17 00:00:00 2001
From: Robert Izzard <r.izzard@surrey.ac.uk>
Date: Sun, 14 Nov 2021 15:13:05 +0000
Subject: [PATCH] add condor support

---
 binarycpython/utils/condor.py                | 201 ++++++++++---------
 binarycpython/utils/grid.py                  |  15 +-
 binarycpython/utils/grid_options_defaults.py |  15 +-
 binarycpython/utils/slurm.py                 |   7 +-
 4 files changed, 134 insertions(+), 104 deletions(-)

diff --git a/binarycpython/utils/condor.py b/binarycpython/utils/condor.py
index cd4c2d69b..c8bf626ad 100644
--- a/binarycpython/utils/condor.py
+++ b/binarycpython/utils/condor.py
@@ -2,22 +2,34 @@
     Binary_c-python's condor functions
 """
 
+import os
+import datasize
+import lib_programname
+import multiprocessing
+import os
+import pathlib
+import signal
+import stat
+import subprocess
+import sys
+import time
+
 class condor():
 
     def __init__(self, **kwargs):
         # don't do anything: we just inherit from this class
         return
 
-    def condorID(self,clusterID=None,ProcID=None):
+    def condorID(self,ClusterID=None,Process=None):
         """
-        Function to return a Condor job ID. The ClusterID and ProcID passed in are used if given, otherwise we default to the condor_ClusterID and condor_ProcID in grid_options.
+        Function to return a Condor job ID. The ClusterID and Process passed in are used if given, otherwise we default to the condor_ClusterID and condor_Process in grid_options.
         """
         if ClusterID is None:
             ClusterID=  self.grid_options['condor_ClusterID']
-        if ProcID is None:
-            ProcID = self.grid_options['condor_ProcID']
-        return "{ClusterID}.{ProcID}".format(ClusterID=ClusterID,
-                                             ProcID=ProcID)
+        if Process is None:
+            Process = self.grid_options['condor_Process']
+        return "{ClusterID}.{Process}".format(ClusterID=ClusterID,
+                                             Process=Process)
 
     def condorpath(self,path):
         """
@@ -27,12 +39,12 @@ class condor():
 
     def condor_status_file(self,
                            ClusterID=None,
-                           ProcID=None):
+                           Process=None):
         """
-        Return the condor status file corresponding to the ClusterID and ProcID, which default to grid_options condor_ClusterID and condor_ProcID, respectively.
+        Return the condor status file corresponding to the ClusterID and Process, which default to grid_options condor_ClusterID and condor_Process, respectively.
         """
         return os.path.join(self.condorpath('status'),
-                            self.condorID(ClusterID,ProcID))
+                            self.condorID(ClusterID,Process))
 
     def condor_check_requirements(self):
         """
@@ -56,7 +68,7 @@ class condor():
 
     def set_condor_status(self,string):
         """
-        Set the condor status corresponing to the self object, which should have condor_ClusterID and condor_ProcID set.
+        Set the condor status corresponding to the self object, which should have condor_ClusterID and condor_Process set.
         """
         # save condor ClusterID to file
         idfile = os.path.join(self.grid_options["condor_dir"],
@@ -76,21 +88,21 @@ class condor():
 
     def get_condor_status(self,
                          ClusterID=None,
-                         ProcID=None):
+                         Process=None):
         """
-        Get and return the condor status corresponing to the self object, or ClusterID.ProcID if they are passed in. If no status is found, returns an empty string..
+        Get and return the condor status corresponding to the self object, or ClusterID.Process if they are passed in. If no status is found, returns an empty string.
         """
         if ClusterID is None:
             ClusterID = self.grid_options['condor_ClusterID']
-        if ProcID is None:
-            ProcID = self.grid_options['condor_ProcID']
+        if Process is None:
+            Process = self.grid_options['condor_Process']
 
-        if ClusterID is None or ProcID is None :
+        if ClusterID is None or Process is None :
             return None
 
         try:
             path = pathlib.Path(self.condor_status_file(ClusterID=ClusterID,
-                                                        ProcID=ProcID))
+                                                        Process=Process))
             if path:
                 return path.read_text().strip()
             else:
@@ -127,7 +139,7 @@ class condor():
         for dir in dirs:
             try:
                 pathlib.Path(self.condorpath(dir)).mkdir(exist_ok=False,
-                                                        parents=True)
+                                                         parents=True)
             except:
                 print("Tried to make the directory {dir} but it already exists. When you launch a set of binary_c jobs on Condor, you need to set your condor_dir to be a fresh directory with no contents.".format(dir=dir))
                 self.exit(code=1)
@@ -153,29 +165,29 @@ class condor():
         """
         function to be called when running grids when grid_options['condor']>=1
 
-        if grid_options['condor']==1, we set up the condor script and launch the jobs, then exit.
-        if grid_options['condor']==2, we are being called from the jobs to run the grids
+        if grid_options['condor']==1, we set up the condor script and launch the jobs, then return True to exit.
+        if grid_options['condor']==2, we run the stars, which means we return False to continue.
+        if grid_options['condor']==3, we are being called from the jobs to run the grids, return False to continue.
         """
 
-        # if condor=1,  we should have no evolution type, we just
-        # set up a load of condor scripts and get them evolving
-        # in condor array
-        if self.grid_options['condor'] == 1:
-            self.grid_options['evolution_type'] = None
-
-        if self.grid_options['evolution_type'] == 'grid':
+        if self.grid_options['condor'] == 2:
             # run a grid of stars only, leaving the results
-            # in a file
-
-            # set output file
-            self.grid_options['save_population_object'] = condor_outfile()
+            # in the appropriate outfile
+            return False
+        
+        elif self.grid_options['condor'] == 3:
+            # joining : set the evolution type to "join" and return
+            # False to continue
+            self.grid_options['evolution_type'] = 'join'
+            return False
+
+        elif self.grid_options['condor'] == 1:
+            # if condor=1,  we should have no evolution type, we
+            # set up the Condor scripts and get them evolving
+            # in a Condor queue 
+            self.grid_options['evolution_type'] = None
 
-            return self.evolve()
-        elif self.grid_options['evolution_type'] == 'join':
-            # should not happen!
-            return
-        else:
-            # setup and launch condor jobs
+            # make dirs
             self.make_condor_dirs()
 
             # check we're not using too much RAM
@@ -185,33 +197,30 @@ class condor():
                     self.grid_options['condor_warn_max_memory']))
                 self.exit(code=1)
 
-            # set up condor_array
-            if not self.grid_options['condor_array_max_jobs']:
-                self.grid_options['condor_array_max_jobs'] = self.grid_options['condor_njobs']
-                condor_array = self.grid_options['condor_array'] or "1-{njobs}\%{max_jobs}".format(
-                    njobs=self.grid_options['condor_njobs'],
-                    max_jobs=self.grid_options['condor_array_max_jobs'])
-
             # get job id (might be passed in)
-            ClusterID = self.grid_options['condor_ClusterID'] if self.grid_options['condor_ClusterID'] != "" else '$CONDOR_ARRAY_JOB_ID'
+            ClusterID = self.grid_options['condor_ClusterID'] if self.grid_options['condor_ClusterID'] != "" else '$ClusterID'
 
             # get job array index
-            ProcID = self.grid_options['condor_ProcID']
-            if ProcID is None:
-                ProcID = '$CONDOR_ARRAY_TASK_ID'
-
+            Process = self.grid_options['condor_Process'] if self.grid_options['condor_Process'] != "" else '$Process'
+            
             if self.grid_options['condor_njobs'] == 0:
                 print("binary_c-python Condor : You must set grid_option condor_njobs to be non-zero")
                 self.exit(code=1)
 
+            # find the path to the Python script that we are running
+            pyscriptpath = str(lib_programname.get_path_executed_script())
+            
+            # set the condor initial dir to be our current working directory
+            if not self.grid_options['condor_initial_dir']:
+                self.grid_options['condor_initial_dir'] = os.getcwd()
+                
             # build the grid command
             grid_command = [
                 os.path.join("/usr","bin","env"),
                 sys.executable,
-                str(lib_programname.get_path_executed_script()),
+                pyscriptpath,
             ] + sys.argv[1:] + [
-                'condor=2',
-                'start_at=' + str(ProcID) + '-1', # do we need the -1?
+                'start_at=$Process', # Process is 0,1,2... which is what we want
                 'modulo=' + str(self.grid_options['condor_njobs']),
                 'condor_njobs=' + str(self.grid_options['condor_njobs']),
                 'condor_dir=' + self.grid_options['condor_dir'],
@@ -239,48 +248,46 @@ class condor():
                     path=job_script_path,
                     condor_dir = self.grid_options['condor_dir']))
 
-
-
             ############################################################
             # The condor job script calls your binary_c-pthyon script
             ############################################################
             condor_job_script = """#!{bash}
-# first two arguments are ClusterID and ProcID
-ClusterID = $1
-shift
-ProcID = $1
-shift
+echo "Condor Job Args: $@"
+
+# first two arguments are ClusterID and Process
+export ClusterID=$1
+export Process=$2
+shift 2
+
+echo "Job ClusterID $ClusterID Process $Process"
 
 # Set binary_c startup conditions
 export BINARY_C_PYTHON_ORIGINAL_CMD_LINE={cmdline}
 export BINARY_C_PYTHON_ORIGINAL_WD=`{pwd}`
 export BINARY_C_PYTHON_ORIGINAL_SUBMISSION_TIME=`{date}`
 
-{grid_command}
-
 # set status to \"running\"
-echo \"running\" > {condor_dir}/status/{id}
+echo \"running\" > {condor_dir}/status/$ClusterID.$Process
 
 # make list of files which is checked for joining
-echo {condor_dir}/results/{id}.gz >> {condor_dir}/results/$ClusterID.all
+echo {condor_dir}/results/$ClusterID.$Process.gz >> {condor_dir}/results/$ClusterID.all
 
 # run grid of stars and, if this returns 0, set status to finished
-{grid_command} evolution_type=grid condor_ClusterID=$ClusterID condor_ProcID=$ProcID save_population_object={condor_dir}/results/$ProcID.gz && echo -n \"finished\" > {condor_dir}/status/{id} && echo """.format(
+{grid_command} condor=2 evolution_type=grid condor_ClusterID=$ClusterID condor_Process=$Process save_population_object={condor_dir}/results/$ClusterID.$Process.gz && echo -n \"finished\" > {condor_dir}/status/$ClusterID.$Process && echo """.format(
     bash=self.grid_options['condor_bash'],
     date=self.grid_options['condor_date'],
     pwd=self.grid_options['condor_pwd'],
     cmdline=repr(self.grid_options['command_line']),
     grid_command=grid_command,
     condor_dir=self.grid_options['condor_dir'],
-    id=self.condorID()
 )
 
             if not self.grid_options['condor_postpone_join']:
-                joinfile = "{condor_dir}/results/{ClusterID}.all".format(
+                joinfile = "{condor_dir}/results/$ClusterID.all".format(
                     condor_dir=self.grid_options['condor_dir'],
                     ClusterID=ClusterID
                 )
-                condor_job_script += """&& echo \"Checking if we can join...\" && echo && {grid_command} condor=2 evolution_type=join joinlist={joinfile} condor_ClusterID=$ClusterID condor_ProcID=$ProcID
+                condor_job_script += """&& echo \"Checking if we can join...\" && echo && {grid_command} condor=3 evolution_type=join joinlist={joinfile} condor_ClusterID=$ClusterID condor_Process=$Process
                 """.format(
                     bash = self.grid_options['condor_bash'],
                     grid_command=grid_command,
@@ -289,17 +296,27 @@ echo {condor_dir}/results/{id}.gz >> {condor_dir}/results/$ClusterID.all
 
                 ############################################################
                 # The Condor submit script is sent to condor_submit
-                # In here we know $(Cluster) and $(Item) which identify
+                # In here we know $(Cluster) and $(Process) which identify
                 # each job
                 ############################################################
+                extra_settings = ""
+                if self.grid_options['condor_extra_settings']:
+                    for key in self.grid_options['condor_extra_settings']:
+                        extra_settings += "{key} = {value}\n".format(
+                            key=key,
+                            value=self.grid_options['condor_extra_settings'][key]
+                        )
+                        
+                jobid = '$(Cluster).$(Process)'
                 condor_submit_script = """
-executable = {env}
-arguments = {bash} {scriptfile} $(Cluster) $(Item) {arguments}
+executable = {usr_bin_env}
+arguments = {bash} {job_script_path} $(Cluster) $(Process)
 universe = {universe}
+getenv = {getenv}
+initial_dir = {initial_dir}
 output = {outfile}
 error = {errfile}
 log = {logfile}
-initial_dir = {cwd}
 stream_output = {stream_output}
 stream_error = {stream_error}
 request_memory = {request_memory}
@@ -308,43 +325,37 @@ should_transfer_files = {should_transfer_files}
 when_to_transfer_output = {when_to_transfer_output}
 requirements = {requirements}
 {extra_settings}
-queue {njobs}
+queue {njobs} 
             """.format(
                 usr_bin_env = self.grid_options['condor_usr_bin_env'],
                 bash = self.grid_options['condor_bash'],
-                scriptfile = scriptfile,
-                arguments = ' '.join(arguments_list),
+                job_script_path = job_script_path,
                 universe = self.grid_options['condor_universe'],
+                getenv = self.grid_options['condor_getenv'],
+                initial_dir = self.grid_options['condor_initial_dir'],
                 outfile = os.path.abspath(os.path.join(self.grid_options['condor_dir'],
                                                        'stdout',
-                                                       '$(Cluster).$(Item)')),
+                                                       jobid)),
                 errfile = os.path.abspath(os.path.join(self.grid_options['condor_dir'],
                                                        'stderr',
-                                                       '$(Cluster).$(Item)')),
+                                                       jobid)),
                 logfile = os.path.abspath(os.path.join(self.grid_options['condor_dir'],
                                                        'log',
-                                                       '$(Cluster).$(Item)')),
-                initial_dir = os.getcwd(),
+                                                       jobid)),
                 stream_output = self.grid_options['condor_stream_output'],
                 stream_error = self.grid_options['condor_stream_error'],
-                request_memory = self.grid_options['stream_memory'],
+                request_memory = self.grid_options['condor_memory'],
                 request_cpus = self.grid_options['num_processes'],
                 should_transfer_files = self.grid_options['condor_should_transfer_files'],
                 when_to_transfer_output = self.grid_options['when_to_transfer_output'],
                 requirements = self.grid_options['condor_requirements'],
-                extra_settings = self.grid_options['condor_extra_settings'],
+                extra_settings = extra_settings,
                 njobs = self.grid_options['condor_njobs'],
             )
 
-            for key in self.grid_options['condor_extra_settings']:
-                condor_submit_script += "{key} = {value}\n".format(
-                    key=key,
-                    value=self.grid_options['condor_extra_settings'][key]
-                )
-
             # write the scripts, close them and make them executable by
             # all (so the condor user can pick it up)
-            for file,contents,path in [(submit_script,condor_submit_script),
+            for file,contents in [(submit_script,condor_submit_script),
                                        (job_script,condor_job_script)]:
                 path = file.name
                 file.writelines(contents)
@@ -354,9 +365,9 @@ queue {njobs}
                          stat.S_IRGRP | stat.S_IXGRP | \
                          stat.S_IROTH | stat.S_IXOTH)
 
-            if not self.grid_options['condor_postpone_sbatch']:
+            if not self.grid_options['condor_postpone_submit']:
                 # call sbatch to launch the condor jobs
-                cmd = [self.grid_options['condor_sbatch'], submit_script_path]
+                cmd = [self.grid_options['condor_submit'], submit_script_path]
                 pipes = subprocess.Popen(cmd,
                                          stdout = subprocess.PIPE,
                                          stderr = subprocess.PIPE)
@@ -383,12 +394,14 @@ queue {njobs}
 
 
         # some messages to the user, then return
-        if self.grid_options['condor_postpone_sbatch'] == 1:
+        if self.grid_options['condor_postpone_submit'] == 1:
             print("Condor script written, to {path}, but launching the jobs with sbatch was postponed.".format(path=submit_script_path))
         else:
             print("Condor jobs launched.")
             print("All done in condor_grid().")
-        return
+
+        # return True so we exit immediately        
+        return True
 
     def condor_ClusterID_from_dir(self,dir):
         """
@@ -503,14 +516,14 @@ queue {njobs}
     #             # command = "".join([
     #             #     "{}".python_details['executable'],
     #             #     "{}".scriptname,
-    #             #     "offset=$ProcID",
+    #             #     "offset=$Process",
     #             #     "modulo={}".format(self.grid_options['condor_njobs']),
     #             #     "vb={}".format(self.grid_options['verbosity'])
 
-    #             #      "results_hash_dumpfile=$self->{_grid_options}{condor_dir}/results/$ClusterID.$ProcID",
+    #             #      "results_hash_dumpfile=$self->{_grid_options}{condor_dir}/results/$ClusterID.$Process",
     #             #      'condor_ClusterID='.$ClusterID,
-    #             #      'condor_ProcID='.$ProcID,
-    #             #      'condor_jobname=binary_grid_'.$ClusterID.'.'.$ProcID,
+    #             #      'condor_Process='.$Process,
+    #             #      'condor_jobname=binary_grid_'.$ClusterID.'.'.$Process,
     #             #      "condor_njobs=$njobs",
     #             #      "condor_dir=$self->{_grid_options}{condor_dir}",
     #             # );
diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py
index bba6eefec..3c48ab8b9 100644
--- a/binarycpython/utils/grid.py
+++ b/binarycpython/utils/grid.py
@@ -392,9 +392,18 @@ class Population(analytics,
                                     self.grid_options["verbosity"],
                                     2,
                                 )
-                                value = type(old_value)(value)
-                                self.verbose_print("Success!", self.grid_options["verbosity"], 2)
-
+                                try:
+                                    value = type(old_value)(value)
+                                    self.verbose_print("Success!", self.grid_options["verbosity"], 2)
+                                except:
+                                    print("Failed to convert {param} value with type {type}: old_value is '{old}', new value is '{new}'".format(
+                                        param=parameter,
+                                        old=old_value,
+                                        type=type(old_value),
+                                        new=split[1]
+                                    ))
+                                    self.exit(code=1)
+                                    
                             except ValueError:
 
                                 # might be able to eval the parameter, e.g.
diff --git a/binarycpython/utils/grid_options_defaults.py b/binarycpython/utils/grid_options_defaults.py
index b7b102474..8ca4abc37 100644
--- a/binarycpython/utils/grid_options_defaults.py
+++ b/binarycpython/utils/grid_options_defaults.py
@@ -218,10 +218,12 @@ class grid_options_defaults():
             "condor_dir": "",  # working directory containing e.g. scripts, output, logs (e.g. should be NFS available to all)
             "condor_njobs": "",  # number of scripts/jobs that CONDOR will run in total
             "condor_ClusterID": None,  # condor cluster id, equivalent to Slurm's jobid
-            "condor_ProcID": None,  # condor process id, equivalent to Slurm's jobarrayindex
-            "condor_postpone_join": 0,  # if 1, data is not joined, e.g. if you want to do it off the condor grid (e.g. with more RAM)
+            "condor_Process": None,  # condor process, equivalent to Slurm's jobarrayindex
+            "condor_postpone_submit": 0,  # if 1, the condor script is not submitted (useful for debugging). Default 0.
+            "condor_postpone_join": 0,  # if 1, data is not joined, e.g. if you want to do it off the condor grid (e.g. with more RAM). Default 0.
             # "condor_join_machine": None, # if defined then this is the machine on which the join command should be launched (must be sshable and not postponed)
-            "condor_memory": 1024,  # in MB, the memory use (ImageSize) of the job
+            "condor_memory": 512,  # in MB, the memory use (ImageSize) of the job
+            "condor_warn_max_memory": 1024,  # in MB, the threshold used by the pre-submission "not using too much RAM" check in condor.py, which exits instead of submitting
             "condor_universe": "vanilla",  # usually vanilla universe
             "condor_extra_settings": {},  # Place to put extra configuration for the CONDOR submit file. The key and value of the dict will become the key and value of the line in te slurm batch file. Will be put in after all the other settings (and before the command). Take care not to overwrite something without really meaning to do so.
             # snapshots and checkpoints
@@ -232,7 +234,7 @@ class grid_options_defaults():
             # (warning: lots of files!), otherwise just store the lates
             'condor_stream_output':True, # stream stdout
             'condor_stream_error':True, # stream stderr
-            'condor_should_transfer_files' : 'IF_NEEDED',
+            'condor_should_transfer_files' : 'YES',
             'when_to_transfer_output' : 'ON_EXIT_OR_EVICT',
             'condor_save_joined_file':0, # if 1 then results/joined contains the results
             # (useful for debugging, otherwise a lot of work)
@@ -241,7 +243,10 @@ class grid_options_defaults():
             'condor_bash' : '/bin/bash', # bash executable location
             "condor_pwd" : "/usr/bin/pwd", # pwd command location for Condor
             "condor_date" : "/usr/bin/date", # bash location for Condor
-
+            "condor_array_max_jobs" : None, # override for the max number of concurrent array jobs
+            "condor_initial_dir" : None, # directory from which condor is run; if None, the current working directory at submission time is used
+            "condor_submit" : "condor_submit", # the condor_submit command
+            "condor_getenv" : True, # if True condor takes the environment at submission and copies it to the jobs. You almost certainly want this.
             ##########################
             # Unordered. Need to go through this. Copied from the perl implementation.
             ##########################
diff --git a/binarycpython/utils/slurm.py b/binarycpython/utils/slurm.py
index c4114f2e3..4b942d994 100644
--- a/binarycpython/utils/slurm.py
+++ b/binarycpython/utils/slurm.py
@@ -175,17 +175,20 @@ class slurm():
             # run a grid of stars only, leaving the results
             # in the appropriate outfile
             return False
+        
         elif self.grid_options['slurm'] == 3:
             # joining : set the evolution type to "join" and return
             # False to continue
             self.grid_options['evolution_type'] = 'join'
             return False
+        
         elif self.grid_options['slurm'] == 1:
             # if slurm=1,  we should have no evolution type, we
             # set up the Slurm scripts and get them evolving
-            # in slurm array
+            # in a Slurm array
             self.grid_options['evolution_type'] = None
-            # setup and launch slurm jobs
+            
+            # make dirs
             self.make_slurm_dirs()
 
             # check we're not using too much RAM
-- 
GitLab