Source code for sphot.run_sphot

#!python

description = '''run_sphot.py
Y.S.Murakami 2024 @ JHU
Run the basic sphot pipeline on a single galaxy.
This script automatically detects the running environment (e.g., Slurm array job or local machine) and switches the logging output accordingly.

Usage:
    run_sphot data_file.h5  [--out_folder=foldername] # run sphot on a new data file (option 1)
    run_sphot sphot_file.h5 --rerun_all      # rerun both basefit and scalefit (option 2)
    run_sphot sphot_file.h5 --continue       # continue scalefit on existing sphot file if necessary (option 3)
    run_sphot sphot_file.h5 --rerun_basefit  # rerun basefit on existing sphot file (option 4)
    run_sphot sphot_file.h5 --rerun_scalefit # rerun all scalefit on existing sphot file (option 5)
    run_sphot sphot_file.h5 --rerun_scalefit --filter=F555W # rerun specific filter 
    run_sphot sphot_file.h5 --photometry # run photometry on existing sphot file

Requirements:
    - for running the initial fit, a file PSFdata.h5 is required in the working directory. See documentation for the format.
    - fit parameters can be changed by placing sphot_config.toml in the working directory. If the file is not found, the default settings are used. See documentation for the format of the config file.
    
'''

# minimum import in case --help is called
import sys
if '--help' in sys.argv:
    print(description)
    sys.exit()
        
# import everything else needed to actually run sphot
import os
import numpy as np
from sphot.utils import load_and_crop
from sphot.core import run_basefit, run_aperphot, logger
from sphot.data import read_sphot_h5
from sphot.parallel import parallel_scalefit
from .config import config
global config
    
def main():
    '''Command-line entry point of run_sphot.

    This is the function invoked by the ``run_sphot`` console command
    (installed via project.scripts in pyproject.toml). It parses
    ``sys.argv``, picks the console wrapper matching the running
    environment, and runs ``run_sphot`` once per data file.
    '''
    # turn the command line into a list of data files plus run options
    input_files, run_options = argv_to_kwargs(sys.argv)

    # plotting is always disabled when running from the command line
    run_options['plot'] = False

    # decide where the output goes based on how this script is running
    wrapper = prep_console_wrapper(
        force_standard_output=run_options['force_standard_output'],
    )

    # process the data files one at a time
    for path in input_files:
        wrapper(run_sphot, datafile=path, **run_options)
if __name__ == '__main__':
    # Default each thread-count variable to "1" without overriding a value
    # the user has already exported (presumably to keep the numerical
    # libraries single-threaded -- confirm against the parallel module).
    # NOTE(review): this guard sits above the definitions of
    # argv_to_kwargs / run_sphot / prep_console_wrapper, so when the module
    # itself is executed as __main__, main() runs before those names are
    # bound. Consider moving the guard to the end of the file.
    for _thread_var in (
        "OMP_NUM_THREADS",
        "MKL_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
    ):
        os.environ.setdefault(_thread_var, "1")
    main()

# -----------------------------------------------------
# run_sphot main modules
#   - argv_to_kwargs: parse command line arguments and convert them to keyword arguments
#   - run_sphot: the main function to run sphot
#   - prep_console_wrapper: a wrapper function to switch the logging output based on the running environment
# -----------------------------------------------------
def argv_to_kwargs(args):
    '''Translate a ``sys.argv``-style list into data files and run options.

    Parameters
    ----------
    args : list of str
        Full argument vector. ``args[0]`` (the program name) is ignored.

    Returns
    -------
    datafiles : list of str
        Every non-option argument containing ``.h5``, in the order given,
        except an argument equal to ``config['prep']['PSF_file']``.
    kwargs : dict
        Options for ``run_sphot`` / ``prep_console_wrapper`` with the keys
        ``initial_run``, ``continue_scalefit``, ``rerun_basefit``,
        ``rerun_scalefit``, ``photometry``, ``out_folder``,
        ``filters_to_fit`` and ``force_standard_output``.

    Raises
    ------
    ValueError
        If ``--out_folder`` or ``--filter`` is given without ``=value``.

    Notes
    -----
    Options are applied in the order they appear, so a later option can
    override an earlier one (e.g. ``--photometry`` clears the rerun flags).
    Unrecognized arguments are logged and otherwise ignored.
    '''
    def _option_value(option):
        # partition keeps everything after the first '=', so values that
        # themselves contain '=' are not truncated
        _, sep, value = option.partition('=')
        if not (sep and value):
            raise ValueError(
                f'option requires a value in the form --name=value: {option}')
        return value

    # default options: a plain data file triggers a fresh run of everything
    initial_run = True
    rerun_basefit = False
    continue_scalefit = False
    rerun_scalefit = False
    photometry = False
    force_standard_output = False
    filters_to_fit = config['prep']['filters'].copy()
    out_folder = './'

    # parse command line arguments
    datafiles = []
    for arg in args[1:]:
        # options are excluded here so that an option whose value happens to
        # contain '.h5' (e.g. --out_folder=run.h5_out) is not taken as data
        if '.h5' in arg and not arg.startswith('--'):
            if arg == config['prep']['PSF_file']:
                continue
            datafiles.append(arg)
        elif arg == '--rerun_all':
            logger.info('rerun all option detected')
            rerun_basefit = True
            rerun_scalefit = True
            initial_run = False
        elif arg == '--rerun_basefit':
            logger.info('basefit rerun option detected')
            rerun_basefit = True
            initial_run = False
        elif arg == '--continue':
            logger.info('continue option detected. Running scalefit if necessary.')
            continue_scalefit = True
            initial_run = False
        elif arg == '--rerun_scalefit':
            logger.info('rerun option detected')
            rerun_scalefit = True
            initial_run = False
        elif arg == '--photometry':
            logger.info('run photometry option detected')
            initial_run = False
            rerun_basefit = False
            rerun_scalefit = False
            photometry = True
        elif arg.startswith('--out_folder'):
            out_folder = _option_value(arg)
            logger.info(f'output folder specified: {out_folder}')
        elif arg.startswith('--filter'):
            # comma-separated list of filter names
            filters_to_fit = np.atleast_1d(_option_value(arg).split(','))
            logger.info(f'filters to fit: {filters_to_fit}')
        elif arg.startswith('--standard_output'):
            force_standard_output = True
            logger.info('All outputs will be printed to the standard output')
        else:
            logger.info(f'unknown options: {arg}')

    kwargs = dict(initial_run=initial_run,
                  continue_scalefit=continue_scalefit,
                  rerun_basefit=rerun_basefit,
                  rerun_scalefit=rerun_scalefit,
                  photometry=photometry,
                  out_folder=out_folder,
                  filters_to_fit=filters_to_fit,
                  force_standard_output=force_standard_output)
    return datafiles, kwargs
def run_sphot(datafile,
              initial_run=True, continue_scalefit=False,
              rerun_basefit=False, rerun_scalefit=False,
              photometry=False,
              out_folder='.', filters_to_fit=None, **kwargs):
    '''Run the requested sphot stages on one galaxy, saving after each stage.

    The main commands are put in this function so that the rich output can
    be forwarded to a log file when running in Slurm.

    Parameters
    ----------
    datafile : str
        Path of a new data file (when ``initial_run``) or of an existing
        sphot file (otherwise). In the latter case results are saved back
        to this same path.
    initial_run : bool
        Load and crop a new galaxy, save it as ``<galaxy.name>_sphot.h5``
        in ``out_folder``, and force both the base fit and the scale fit.
    continue_scalefit : bool
        Run the scale fit only for filters that do not yet have any finite
        ``psf_sub_data``.
    rerun_basefit : bool
        Run the base-filter fit.
    rerun_scalefit : bool
        Run the scale fit for every requested non-base filter.
    photometry : bool
        Run aperture photometry and store the aperture attributes.
    out_folder : str
        Folder for the new sphot file (initial run only) and for the
        ``parallel_logs`` directory passed to the scale fit.
    filters_to_fit : sequence of str, optional
        Filters considered by the scale fit; the base filter is skipped.
        ``None`` (the default) is treated as an empty list.
    **kwargs
        Forwarded unchanged to ``run_basefit`` and, merged with
        ``config['aperture']``, to ``run_aperphot``.
    '''
    # None replaces the former mutable [] default; behavior is the same
    if filters_to_fit is None:
        filters_to_fit = []

    base_filter = config['core']['base_filter']
    blur_psf = dict(config['prep'].get('blur_psf', {}))
    iter_basefit = config['core']['iter_basefit']
    iter_scalefit = config['core']['iter_scalefit']

    # 1. load data
    if initial_run:
        logger.info(f'Loading a new galaxy data: {[datafile]}')
        galaxy = load_and_crop(
            datafile,
            config['prep']['filters'],
            config['prep']['PSF_file'],
            base_filter=base_filter,
            plot=False,
            custom_initial_crop=config['prep']['custom_initial_crop'],
            sigma_guess=config['prep']['sigma_guess'],
            auto_crop=config['prep']['auto_crop'],
            auto_crop_factor=config['prep']['auto_crop_factor'],
        )
        # make sure a user-specified output folder exists before saving
        if out_folder:
            os.makedirs(out_folder, exist_ok=True)
        out_path = os.path.join(out_folder, f'{galaxy.name}_sphot.h5')
        logger.info(f'Galaxy data loaded: sphot file will be saved as {out_path}')
        # a fresh galaxy always needs both fits
        rerun_basefit = True
        rerun_scalefit = True
        galaxy.save(out_path)
    else:
        logger.info(f'Loading an existing sphot file: {[datafile]}')
        galaxy = read_sphot_h5(datafile)
        out_path = datafile

    # 2. fit Sersic model using the base filter
    if rerun_basefit:
        logger.info('----- Starting base fit -----')
        logger.info(f'Number of iterations: {iter_basefit+1}')
        run_basefit(galaxy,
                    base_filter=base_filter,
                    fit_complex_model=config['core']['fit_complex_model'],
                    blur_psf=blur_psf.get(base_filter),
                    N_mainloop_iter=iter_basefit,
                    **kwargs)
        galaxy.save(out_path)

    # 3. Scale Sersic model (parallel across non-base filters)
    if rerun_scalefit or continue_scalefit:
        logger.info('----- Starting Scale fit (parallel) -----')
        logger.info(f'Number of iterations: {iter_scalefit+1}')
        logger.info(f'Filters to fit: {filters_to_fit}')
        base_params = galaxy.images[base_filter].sersic_params

        # Decide which filters need scalefit. With --rerun_scalefit, redo
        # all of them; otherwise skip filters that already have a finite
        # psf_sub_data array.
        todo = []
        for filt in filters_to_fit:
            if filt == base_filter:
                continue
            cd = galaxy.images[filt]
            if hasattr(cd, 'psf_sub_data') and not rerun_scalefit:
                if np.any(np.isfinite(cd.psf_sub_data)):
                    logger.info(f'Filter {filt} already has PSF-subtracted data; skipping')
                    continue
                logger.info(f'Filter {filt} has all-NaN psf_sub_data; redoing')
            todo.append(filt)

        if todo:
            blur_for = {f: blur_psf.get(f) for f in todo}
            parallel_scalefit(
                galaxy, base_params, todo, blur_for,
                allow_refit=config['core']['allow_refit'],
                fit_complex_model=config['core']['fit_complex_model'],
                N_mainloop_iter=iter_scalefit,
                working_dir=os.getcwd(),
                log_dir=os.path.join(out_folder, 'parallel_logs'),
            )
            galaxy.save(out_path)

    # 4. run aperphot if necessary
    if photometry:
        logger.info('----- Starting aperphot -----')
        # config['aperture'] takes precedence over the forwarded kwargs
        aperphot_kwargs = kwargs.copy()
        aperphot_kwargs.update(config['aperture'])
        aper_sci = run_aperphot(galaxy=galaxy, **aperphot_kwargs)

        # save aperture info
        for key in ['positions', 'a', 'b', 'theta']:
            setattr(galaxy, 'aper_sci_'+key, getattr(aper_sci, key))
        # setattr(galaxy,'aper_sci_frac_enclosed',frac_enclosed)
        galaxy.save(out_path)

    logger.info('Completed Sphot')
def prep_console_wrapper(force_standard_output=False):
    '''Build the wrapper that decides where a run's console output goes.

    Parameters
    ----------
    force_standard_output : bool
        If True, always return the pass-through wrapper, even under Slurm.

    Returns
    -------
    console_wrapper : callable
        Called as ``console_wrapper(func, *args, **kwargs)``; returns
        whatever ``func`` returns.

        - Outside Slurm (``SLURM_JOB_ID`` not set), or when
          ``force_standard_output`` is True: calls ``func(*args, **kwargs)``
          unchanged.
        - Under Slurm: opens a log file in ``logs/`` and passes a
          ``rich.console.Console`` writing to it as the extra keyword
          argument ``console``.

    Notes
    -----
    The log file is ``logs/<array_job_id>_<array_task_id>.log`` for array
    jobs and ``logs/<job_id>.log`` for plain jobs. It is opened in append
    mode so that several wrapped calls in the same job (one per data file)
    do not overwrite each other's output.
    '''
    if not force_standard_output and "SLURM_JOB_ID" in os.environ:
        from rich.console import Console

        def console_wrapper(func, *args, **kwargs):
            # SLURM_ARRAY_* are only defined for array jobs; fall back to
            # the plain job id so the log is never named "None_None.log"
            slurm_jobid = os.environ.get("SLURM_ARRAY_JOB_ID",
                                         os.environ.get("SLURM_JOB_ID"))
            slurm_taskid = os.environ.get("SLURM_ARRAY_TASK_ID")
            if slurm_taskid is None:
                logfile = f'logs/{slurm_jobid}.log'
            else:
                logfile = f'logs/{slurm_jobid}_{slurm_taskid}.log'
            logger.info(f"Running in Slurm (jobid={slurm_jobid}, taskid={slurm_taskid})")
            logger.info(f'Saving the progress in the log file: {logfile}')
            print(f'Saving the progress in the log file: {logfile}', flush=True)

            # open() does not create missing directories
            os.makedirs(os.path.dirname(logfile), exist_ok=True)
            with open(logfile, 'a') as log_file:
                # Create a Console instance that writes to the log file
                console = Console(file=log_file,
                                  force_terminal=True,
                                  force_interactive=True)
                kwargs.update(dict(console=console))
                # func must run while the log file is still open
                return func(*args, **kwargs)
    else:
        def console_wrapper(func, *args, **kwargs):
            return func(*args, **kwargs)

    return console_wrapper