From c689e3ac972d35c4650fe083df31e4477138734c Mon Sep 17 00:00:00 2001
From: Izzard <ri0005@orca.eps.surrey.ac.uk>
Date: Sat, 27 Nov 2021 08:19:08 +0000
Subject: [PATCH] sync latest changes to HPC/condor

---
 binarycpython/utils/HPC.py                   | 130 +++++++++++++++++--
 binarycpython/utils/condor.py                |  46 ++++++-
 binarycpython/utils/dataIO.py                |  49 +++++--
 binarycpython/utils/grid.py                  |  16 ++-
 binarycpython/utils/grid_options_defaults.py |   2 +
 binarycpython/utils/slurm.py                 |   7 +
 binarycpython/utils/spacing_functions.py     |   7 +-
 7 files changed, 225 insertions(+), 32 deletions(-)

diff --git a/binarycpython/utils/HPC.py b/binarycpython/utils/HPC.py
index 821450bf8..81e87e933 100644
--- a/binarycpython/utils/HPC.py
+++ b/binarycpython/utils/HPC.py
@@ -11,6 +11,7 @@
 import datetime
 import flufl.lock
 import glob
+import json
 import os
 import pathlib
 import time
@@ -102,7 +103,7 @@ class HPC(condor,slurm):
                 print("Making joiningfile list range (0,{}) at {}".format(n,file))                
                 f.write(string)
                 f.flush()
-                os.fsync(f)
+                os.fsync(f.fileno())
                 x = True
                 self.locked_close(f,lock)
                 os.sync()
@@ -307,20 +308,31 @@ class HPC(condor,slurm):
             status = None
         return status
 
-    def HPC_get_status(self,dir=None):
+    def HPC_get_status(self,
+                       job_id=None,
+                       job_index=None,
+                       dir=None):
         """
-        Get and return the appropriate HPC job (Condor or Slurm) status string for this job.
+        Get and return the appropriate HPC job (Condor or Slurm) status string for this job (or, if given, the job at id.index)
 
         Args:
             dir : optional HPC run directory. If not set, the default (e.g. slurm_dir or condor_dir)
                   is used.
+            job_id,job_index : the id and index of the job to be queried
         """
+        #print("HPC get status job_id={} job_index={} dir={}".format(
+        #      job_id,job_index,dir))
         if self.grid_options['slurm'] > 0:
-            status = self.get_slurm_status(dir=dir)
+            status = self.get_slurm_status(jobid=job_id,
+                                           jobarrayindex=job_index,
+                                           dir=dir)
         elif self.grid_options['condor'] > 0:
-            status = self.get_condor_status(dir=dir)
+            status = self.get_condor_status(ClusterID=job_id,
+                                            Process=job_index,
+                                            dir=dir)
         else:
             status = None
+        #print("return ",status)
         return status
 
     def HPC_dirs(self):
@@ -346,6 +358,9 @@ class HPC(condor,slurm):
         """
         id = self.HPC_jobID_tuple()[0]
 
+        # give some current status about the HPC run
+        self.HPC_dump_status("HPC grid before")
+        
         if makejoiningfile and \
            self.HPC_job_task() == 2 and \
            id is not None:
@@ -358,6 +373,9 @@ class HPC(condor,slurm):
         else:
             x = None # should not happen
 
+        # give some current status about the HPC run
+        self.HPC_dump_status("HPC grid after")
+            
         return x
 
     def HPC_check_requirements(self):
@@ -474,14 +492,25 @@ class HPC(condor,slurm):
             print("This job did not finish (status is {status}) : cannot join".format(status=status))
         else:
             # our job has finished
+            HPC_status = self.HPC_status()
+
+            #HPC_queue_stats = self.HPC_queue_stats()
+            
+            if HPC_status['status']['finished'] != HPC_status['njobs']:
+                print("HPC_status reports {} finished jobs out of {}. We cannot join because not all the jobs are finished. Exiting.".format(
+                    HPC_status['status']['finished'],
+                    HPC_status['njobs']
+                ))
+                self.exit(1)
+
             joinfiles = self.HPC_load_joinfiles_list()
             joiningfile = self.HPC_path('joining')
             print("Joinfile list n={n} (should be {m}".format(
                 n=len(joinfiles),
                 m=self.HPC_njobs()
             ))
-            print("Joingingfile is at : ",joiningfile)
-
+            print("Joiningfile path : ",joiningfile)
+                
             if len(joinfiles) != self.HPC_njobs():
                 print("Number of joinfiles != njobs : this is wrong, exiting.")
                 self.exit(1)
@@ -493,7 +522,7 @@ class HPC(condor,slurm):
                     # touch joiningfile
                     if self.grid_options['HPC_force_join'] == 0:
                         print("Making joiningfile at {}".format(joiningfile))
-                        pathlib.Path(joiningfile).touch(exist_ok=False)
+                        self.HPC_touch(joiningfile)
                     try:
                         print("Calling HPC_join_from_files()")
                         self.HPC_join_from_files(self,joinfiles)
@@ -546,3 +575,88 @@ class HPC(condor,slurm):
         else:
             d = None
         return d
+
+    def HPC_touch(self,filename,string=None):
+        """
+        Function to touch the file at filename, put into it the job number
+        and (if given) the string passed in.
+        """
+        try:
+            f = open(filename,'w',encoding='utf-8')
+            if f:
+                job = self.HPC_jobID()
+                jobtype = self.HPC_job_type()
+                if job:
+                    s = str(job)
+                    if jobtype:
+                        s += ' ' + str(jobtype)
+                    f.write(s + "\n")
+                if string:
+                    f.write(string)
+                f.flush()
+                f.close()
+            self.NFS_flush_hack(filename)
+        except:
+            pass
+
+    def HPC_status(self):
+        """
+        Return a dict of useful information about the current status
+        of this HPC run.
+        """
+        d = {} # returned
+        _id,_index = self.HPC_jobID_tuple()
+        d['job_id'] = _id
+        d['job_index'] = _index
+        if _id and _index:
+            n = self.HPC_njobs()
+            d['njobs'] = n
+            d['job_task'] = self.HPC_job_task()
+            d['job_type'] = self.HPC_job_type()
+            d['job_status'] = self.HPC_get_status()
+            d['dir'] = self.HPC_dir()
+            d['dirs'] = self.HPC_dirs()
+
+            # get fellow jobs' status
+            d["status"] = {}
+            d["joblist"] = {}
+
+            # default types
+            for x in ["running","starting","finishing","finished","killed"]:
+                d["status"][x] = 0
+                d["joblist"][x] = []
+            
+            for i in range(0,n):
+                s = self.HPC_get_status(job_id=_id,
+                                        job_index=i)
+                if s is None:
+                    s = "unknown"
+                if not s in d["status"]:
+                    d["status"][s] = 1
+                else:
+                    d["status"][s] += 1
+                if not s in d["joblist"]:
+                    d["joblist"][s] = [str(_id) + '.' + str(i)]
+                else:
+                    d["joblist"][s] += [str(_id) + '.' + str(i)]
+                
+        return d
+        
+        
+    def HPC_dump_status(self,string=None):
+        d = self.HPC_status()
+        print("############################################################")
+        if not string:
+            string = ""
+        print("HPC job status " + string)
+        print(json.dumps(d,indent=4))
+        print("############################################################")
+
+    def HPC_queue_stats(self):
+        if self.grid_options['slurm'] > 0:
+            x = self.slurm_queue_stats()
+        elif self.grid_options['condor'] > 0:
+            x = self.condor_queue_stats()
+        else:
+            x = None
+        return x
diff --git a/binarycpython/utils/condor.py b/binarycpython/utils/condor.py
index c711f944c..de7d4983b 100644
--- a/binarycpython/utils/condor.py
+++ b/binarycpython/utils/condor.py
@@ -9,6 +9,7 @@ import multiprocessing
 import os
 import pathlib
 import signal
+import re
 import stat
 import subprocess
 import sys
@@ -25,7 +26,7 @@ class condor():
         Function to return a Condor job ID. The ClusterID and Process passed in are used if given, otherwise we default to the condor_ClusterID and condor_Process in grid_options.
         """
         if ClusterID is None:
-            ClusterID=  self.grid_options['condor_ClusterID']
+            ClusterID =  self.grid_options['condor_ClusterID']
         if Process is None:
             Process = self.grid_options['condor_Process']
         return "{ClusterID}.{Process}".format(ClusterID=ClusterID,
@@ -86,13 +87,15 @@ class condor():
             with open(idfile,"w",encoding='utf-8') as fClusterID:
                 fClusterID.write("{ClusterID}\n".format(ClusterID=self.grid_options['condor_ClusterID']))
                 fClusterID.close()
-
+                self.NFS_flush_hack(idfile)
+                
         # save condor status
         file = self.condor_status_file(dir=dir)
         if file:
             with open(file,'w',encoding='utf-8') as f:
                 f.write(string)
                 f.close()
+                self.NFS_flush_hack(file)
         return
 
     def get_condor_status(self,
@@ -108,15 +111,22 @@ class condor():
             Process = self.grid_options['condor_Process']
         if ClusterID is None or Process is None :
             return None
+        #print("get_condor_status {}.{}".format(ClusterID,Process))
         try:
             path = pathlib.Path(self.condor_status_file(dir=dir,
                                                         ClusterID=ClusterID,
                                                         Process=Process))
+            #print("path={}".format(path))
+            #print("size={}".format(path.stat().st_size))
             if path:
-                return path.read_text().strip()
+                s = path.read_text().strip()
+                #print("contains {}".format(s))
+                return s
             else:
+                #print("path empty")
                 return ""
         except:
+            #print("read failed")
             return ""
 
     def condor_outfile(self,dir=None):
@@ -423,3 +433,33 @@ queue {njobs}
 
         # return True so we exit immediately
         return True
+
+    def condor_queue_stats(self):
+        """
+        Return condor queue statistics for this job
+        """
+        _id = self.grid_options['condor_ClusterID']
+        if not _id:
+            return None
+        cmd = "{} {} 2>&1".format(
+            '/usr/bin/condor_q',#self.grid_options["condor_q"],
+            _id
+        ) 
+        print("Q cmd",cmd)
+        result = subprocess.Popen(cmd,
+                                  shell=True,
+                                  stdout=subprocess.PIPE).stdout.read().decode('utf-8')
+        print("Q result ",result)
+        if not result:
+            return None
+        d = {}
+        for x in ["jobs","completed","removed","idle","running","held","suspended"]:
+            print("Q x ",x)
+            m = re.search(r"(\d+)\s+{}".format(x),
+                          result)
+            print("Q m ",m)
+            if m:
+                d[x] = m.group(1)
+
+        print("Q d ",d)
+        return d
diff --git a/binarycpython/utils/dataIO.py b/binarycpython/utils/dataIO.py
index 099b8ef98..6d625b99b 100644
--- a/binarycpython/utils/dataIO.py
+++ b/binarycpython/utils/dataIO.py
@@ -118,15 +118,18 @@ class dataIO():
             object.grid_options['store_memaddr'] = _store_memaddr
             object.persistent_data_memory_dict = persistent_data_memory_dict
 
+            self.NFS_flush_hack(filename)
+            
             # touch 'saved' file
-            pathlib.Path(filename + '.saved').touch(exist_ok=True)
-
+            saved = filename + '.saved'
+            self.HPC_touch(saved)
         return
 
     def load_population_object(self,filename):
         """
         returns the Population object loaded from filename
         """
+        self.NFS_flush_hack(filename)
         if filename is None:
             obj = None
         else:
@@ -216,8 +219,30 @@ class dataIO():
             The file should be saved using save_population_object()
         """
 
+
+        mtime = time.localtime(os.path.getmtime(filename))
+        modtime = time.strftime("%a, %d %b %Y %H:%M:%S",mtime)
+        print("Load data from {filename} : size {size}, modtime {modtime}".format(
+            filename=filename,
+            size=os.path.getsize(filename),
+            modtime=modtime,
+        ))
+        
         newpop = self.load_population_object(filename)
 
+        try:
+            n = newpop.grid_options['total_count']
+        except:
+            try:
+                n = newpop.grid_options['_count']
+            except:
+                try:
+                    n = newpop.grid_ensemble_results["metadata"]["_count"]
+                except:
+                    n = -1
+        print("Loaded data from {n} stars".format(
+            n=n
+        ))
 
         # merge with refpop
         try:
@@ -475,17 +500,19 @@ class dataIO():
             ID = self.process_ID
 
         if self.grid_options['status_dir']:
+            path = os.path.join(
+                self.grid_options["status_dir"],
+                format_statment.format(ID),
+            )
             with open(
-                    os.path.join(
-                        self.grid_options["status_dir"],
-                        format_statment.format(ID),
-                    ),
+                    path,
                     "w",
                     encoding='utf-8'
             ) as f:
                 f.write(string)
                 f.close()
-
+                self.NFS_flush_hack(path)
+                           
         # custom logging functions for HPC jobs
         if self.HPC_job():
             self.HPC_set_status(string)
@@ -498,16 +525,10 @@ class dataIO():
 
         Closes and unlocks the file
         """
-        #print("locked_close:\nFile now = {}".format(file))
-        #print("Lock now = {}".format(lock))
         if file:
-            #print("Close file {}".format(file))
             file.close()
         if lock:
-            #print("Unlock {}".format(lock))
             lock.unlock()
-        #print("File now = {}".format(file))
-        #print("Lock now = {}".format(lock))
         if file:
             self.NFS_flush_hack(file.name)
 
@@ -653,5 +674,7 @@ class dataIO():
         Use opendir()/closedir() to flush NFS access to a file.
         Note: this may or may not work!
         """
+        os.sync()
         dir = os.path.dirname(filename)
         os.scandir(dir)
+        
diff --git a/binarycpython/utils/grid.py b/binarycpython/utils/grid.py
index d2ebfea79..c604de20f 100644
--- a/binarycpython/utils/grid.py
+++ b/binarycpython/utils/grid.py
@@ -400,7 +400,7 @@ class Population(analytics,
                                         parameter, type(value), type(old_value)
                                     ),
                                     self.grid_options["verbosity"],
-                                    1,
+                                    3,
                                 )
                                 try:
                                     if isinstance(old_value, bool):
@@ -442,7 +442,7 @@ class Population(analytics,
                             parameter, value
                         ),
                         self.grid_options["verbosity"],
-                        1,
+                        3,
                     )
                     cmdline_dict[parameter] = value
 
@@ -833,6 +833,9 @@ class Population(analytics,
         # make analytics information
         analytics_dict = self.make_analytics_dict()
 
+        if self.HPC_job():
+            self.HPC_dump_status("HPC grid after analytics")
+        
         if self.custom_options['save_snapshot']:
             # we must save a snapshot, not the population object
             # ... also save the new starting point: this has to take into
@@ -847,10 +850,10 @@ class Population(analytics,
         # Save object to a pickle file
         elif self.grid_options['save_population_object']:
             self.save_population_object()
-
+            
         # if we're running an HPC grid, exit here
         # unless we're joining
-        if self.HPC_grid() and \
+        if self.HPC_job() and \
            self.grid_options['evolution_type'] != 'join':
             self.exit()
 
@@ -2285,7 +2288,7 @@ class Population(analytics,
                 3,
             )
 
-    def now(self,style=None,specifier=None):
+    def now(self,now=None,style=None,specifier=None):
         """
         convenience function to return a string of the current time,
         using the format "%m/%d/%Y %H:%M:%S"
@@ -2296,7 +2299,8 @@ class Population(analytics,
 
             specifier: if set, uses this as a specifier rather than whatever is set by default or in the style variable
         """
-        now = datetime.datetime.now()
+        if not now:
+            now = datetime.datetime.now()
         if not specifier:
             if style == 'nospace':
                 # special case
diff --git a/binarycpython/utils/grid_options_defaults.py b/binarycpython/utils/grid_options_defaults.py
index 33dee5f0f..cb903637e 100644
--- a/binarycpython/utils/grid_options_defaults.py
+++ b/binarycpython/utils/grid_options_defaults.py
@@ -252,6 +252,7 @@ class grid_options_defaults():
             "condor_date" : shutil.which("date"), # bash location for Condor
             "condor_initial_dir" : None, # directory from which condor is run, if None is the directory in which your script is run
             "condor_submit" : shutil.which("condor_submit"), # the condor_submit command
+            "condor_q" : shutil.which("condor_q"), # the condor_q command
             "condor_getenv" : True, # if True condor takes the environment at submission and copies it to the jobs. You almost certainly want this.
             "condor_batchname" : "binary_c-condor", # Condor batchname option
             "condor_kill_sig" : "SIGINT", # signal Condor should use to stop a process : note that grid.py expects this to be "SIGINT"
@@ -331,6 +332,7 @@ class grid_options_defaults():
 
             "condor_initial_dir" : "String. Directory from which condor scripts are run. If set to the default, None, this is the directory from which your script is run.",
             "condor_submit" : "String. The Condor_submit command, usually \"/usr/bin/condor_submit\" but will depend on your HTCondor installation.",
+            "condor_q" : "String. The Condor_q command, usually \"/usr/bin/condor_q\" but will depend on your HTCondor installation.",
             "condor_getenv" : "Boolean. If True, the default, condor takes the environment at submission and copies it to the jobs. You almost certainly want this to be True.",
             "condor_batchname" : "String. Condor batchname option: this is what appears in condor_q. Defaults to \"binary_c-condor\"",
             "condor_kill_sig" : "String. Signal Condor should use to stop a process. Note that grid.py expects this to be \"SIGINT\" which is the default.",
diff --git a/binarycpython/utils/slurm.py b/binarycpython/utils/slurm.py
index 8f2068393..5a6a15201 100644
--- a/binarycpython/utils/slurm.py
+++ b/binarycpython/utils/slurm.py
@@ -86,6 +86,7 @@ class slurm():
             with open(idfile,"w",encoding='utf-8') as fjobid:
                 fjobid.write("{jobid}\n".format(jobid=self.grid_options['slurm_jobid']))
                 fjobid.close()
+                self.NFS_flush_hack(idfile)
 
         # save slurm status
         file = self.slurm_status_file(dir=dir)
@@ -93,6 +94,7 @@ class slurm():
             with open(file,'w',encoding='utf-8') as f:
                 f.write(string)
                 f.close()
+                self.NFS_flush_hack(file)
         return
 
     def get_slurm_status(self,
@@ -109,10 +111,12 @@ class slurm():
         if jobid is None or jobarrayindex is None :
             return None
         try:
+            
             path = pathlib.Path(self.slurm_status_file(dir=dir,
                                                        jobid=jobid,
                                                        jobarrayindex=jobarrayindex))
             if path:
+                self.NFS_flush_hack(path)
                 return path.read_text().strip()
             else:
                 return ""
@@ -365,3 +369,6 @@ echo \"running\" > {slurm_dir}/status/$SLURM_ARRAY_JOB_ID.$SLURM_ARRAY_TASK_ID
 
         # return True so we exit immediately
         return True
+
+    def slurm_queue_stats(self):
+        return None
diff --git a/binarycpython/utils/spacing_functions.py b/binarycpython/utils/spacing_functions.py
index 4b7fe6465..d18f25253 100644
--- a/binarycpython/utils/spacing_functions.py
+++ b/binarycpython/utils/spacing_functions.py
@@ -257,8 +257,11 @@ class spacing_functions():
             # _const_dt
             bse_stripped = bse_options.copy()
             for x in ['multiplicity']:
-                del bse_stripped[x]
-
+                try:
+                    del bse_stripped[x]
+                except:
+                    pass
+                
             # make a JSON string of the options (this can be
             # used to check the cache)
             bse_options_json = json.dumps(bse_stripped,
-- 
GitLab