Coverage for PyFHD/pyfhd.py: 0%
293 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
1import logging
2import sys
3import time
4from datetime import timedelta
5import h5py
6from h5py import File
7import numpy as np
8from pathlib import Path
9from PyFHD.beam_setup.beam import create_psf
10from PyFHD.calibration.calibrate import calibrate, calibrate_qu_mixing
11from PyFHD.data_setup.obs import create_obs
12from PyFHD.data_setup.uvfits import (
13 create_layout,
14 create_params,
15 extract_header,
16 extract_visibilities,
17)
18from PyFHD.flagging.flagging import vis_flag, vis_flag_basic
19from PyFHD.gridding.gridding_utils import crosspol_reformat
20from PyFHD.gridding.visibility_grid import visibility_grid
21from PyFHD.pyfhd_tools.pyfhd_setup import (
22 pyfhd_parser,
23 pyfhd_setup,
24 write_collated_yaml_config,
25)
26from PyFHD.pyfhd_tools.pyfhd_utils import (
27 simple_deproject_w_term,
28 vis_noise_calc,
29 vis_weights_update,
30)
31from PyFHD.source_modeling.vis_model_transfer import (
32 vis_model_transfer,
33)
34from PyFHD.io.pyfhd_io import save, load
35from PyFHD.io.pyfhd_quickview import quickview
36from PyFHD.healpix.export import healpix_snapshot_cube_generate
37from PyFHD.plotting.gridding import plot_gridding
38import importlib_resources
39import shutil
42def _print_time_diff(
43 start: float, end: float, description: str, logger: logging.Logger
44):
45 """
46 Print the time difference in a nice format between start and end time
48 Parameters
49 ----------
50 start : float
51 Start time in seconds since epoch
52 end : float
53 End time in seconds since epoch
54 """
55 runtime = end - start
56 if runtime > 60:
57 runtime = timedelta(seconds=end - start)
58 logger.info(f"{description} completed in: {runtime}")
59 elif runtime < 1:
60 logger.info(
61 f"{description} completed in: {round(runtime * 1000,5)} milliseconds"
62 )
63 else:
64 logger.info(f"{description} completed in: {round(runtime,5)} seconds")
67def finish_pyfhd(
68 pyfhd_start: float, logger: logging.Logger, psf: dict | File, pyfhd_config: dict
69):
70 pyfhd_end = time.time()
71 runtime = timedelta(seconds=pyfhd_end - pyfhd_start)
72 # Close all open h5 files
73 if isinstance(psf, h5py.File):
74 psf.close()
75 if not pyfhd_config["get_sample_data"]:
76 # Write a final collated yaml for the final pyfhd_config
77 write_collated_yaml_config(
78 pyfhd_config, Path(pyfhd_config["output_dir"], "config"), "-final"
79 )
80 # Save the config in a HDF5 file for ease of reading in previous parameters from previous runs
81 save(
82 Path(pyfhd_config["output_dir"], "config", "pyfhd_config.h5"),
83 pyfhd_config,
84 "pyfhd_config",
85 logger=logger,
86 )
87 logger.info(
88 f'PyFHD Run Completed for {pyfhd_config["obs_id"]}\nTotal Runtime (Days:Hours:Minutes:Seconds.Millseconds): {runtime}'
89 )
90 # Close the handlers in the log
91 for handler in logger.handlers:
92 handler.close()
94 return
97def main():
98 pyfhd_start = time.time()
99 configargparser = pyfhd_parser()
100 options = configargparser.parse_args()
102 if options.get_sample_data:
103 options.input_path = importlib_resources.files("PyFHD").joinpath(
104 "resources/1088285600_example"
105 )
106 options.output_path = Path.cwd() / "input" / "1088285600_example"
108 # Validate options and Create the Logger
109 pyfhd_config, logger = pyfhd_setup(options)
111 if pyfhd_config["get_sample_data"]:
112 sample_path = Path(importlib_resources.files("PyFHD")).joinpath(
113 "resources/1088285600_example"
114 )
115 for file in sample_path.iterdir():
116 if file.is_file():
117 dest_file = pyfhd_config["output_path"] / file.name
118 if file.suffix == ".yaml":
119 config = file.read_text()
120 # Replace the input directory in the config with the current working directory
121 config = config.replace(
122 "./PyFHD/resources/1088285600_example",
123 str(pyfhd_config["output_path"]),
124 )
125 dest_file.write_text(config)
126 logger.info(
127 f"Wrote the sample config file to {dest_file} with updated paths to your machine"
128 )
129 else:
130 shutil.copyfile(file, dest_file)
131 logger.info(f"Copied sample data file: {file.name} to {dest_file}")
132 finish_pyfhd(pyfhd_start, logger, None, pyfhd_config)
133 return
135 pyfhd_successful = False
136 try:
137 checkpoint_name = pyfhd_config["description"]
138 if pyfhd_config["description"] is None or pyfhd_config["description"] == "":
139 checkpoint_name = pyfhd_config["obs_id"]
141 if pyfhd_config["save_checkpoints"]:
142 pyfhd_config["checkpoint_dir"] = Path(
143 pyfhd_config["output_dir"], "checkpoints"
144 )
145 pyfhd_config["checkpoint_dir"].mkdir(exist_ok=True)
147 if (
148 pyfhd_config["obs_checkpoint"] is None
149 and pyfhd_config["calibrate_checkpoint"] is None
150 ):
151 header_start = time.time()
152 # Get the header
153 pyfhd_header, params_data, antenna_header, antenna_data = extract_header(
154 pyfhd_config, logger
155 )
156 header_end = time.time()
157 _print_time_diff(header_start, header_end, "PyFHD Header Created", logger)
159 params_start = time.time()
160 # Get params
161 params = create_params(pyfhd_header, params_data, logger)
162 params_end = time.time()
163 _print_time_diff(params_start, params_end, "Params Created", logger)
165 visibility_start = time.time()
166 vis_arr, vis_weights = extract_visibilities(
167 pyfhd_header, params_data, pyfhd_config, logger
168 )
169 visibility_end = time.time()
170 _print_time_diff(
171 visibility_start, visibility_end, "Visibilities Extracted", logger
172 )
174 # Save the raw visibilities and weights if the option is set
175 pyfhd_config["visibilities_path"] = Path(
176 pyfhd_config["output_dir"], "visibilities"
177 )
178 if pyfhd_config["save_visibilities"]:
179 pyfhd_config["visibilities_path"].mkdir(exist_ok=True)
180 raw_vis_arr_path = Path(
181 pyfhd_config["visibilities_path"],
182 f"{pyfhd_config['obs_id']}_raw_vis_arr.h5",
183 )
184 save(raw_vis_arr_path, vis_arr, "visibilities", logger=logger)
186 if pyfhd_config["save_weights"]:
187 pyfhd_config["visibilities_path"].mkdir(exist_ok=True)
188 weights_path = Path(
189 pyfhd_config["visibilities_path"],
190 f"{pyfhd_config['obs_id']}_raw_vis_weights.h5",
191 )
192 save(weights_path, vis_weights, "weights", logger=logger)
194 # If you wish to reorder your visibilities, insert your function to do that here.
195 # If you wish to average your fits data by time or frequency, insert your functions to do that here
197 layout_start = time.time()
198 layout = create_layout(antenna_header, antenna_data, pyfhd_config, logger)
199 layout_end = time.time()
200 _print_time_diff(
201 layout_start, layout_end, "Layout Dictionary Extracted", logger
202 )
204 # Get obs
205 obs_start = time.time()
206 obs = create_obs(pyfhd_header, params, layout, pyfhd_config, logger)
207 obs_end = time.time()
208 _print_time_diff(obs_start, obs_end, "Obs Dictionary Created", logger)
210 # If you decide to use the PyFHD checkpoint system, save the uncalibrated visibility &
211 # observation data and metadata now
212 if pyfhd_config["save_checkpoints"]:
213 checkpoint = {
214 "obs": obs,
215 "params": params,
216 "vis_arr": vis_arr,
217 "vis_weights": vis_weights,
218 }
219 save(
220 Path(
221 pyfhd_config["checkpoint_dir"],
222 f"{checkpoint_name}_obs_checkpoint.h5",
223 ),
224 checkpoint,
225 "obs_checkpoint",
226 logger=logger,
227 )
228 del checkpoint
229 logger.info(
230 f"Checkpoint Saved: Uncalibrated visibility parameters, array and weights and the observation metadata dictionary saved into {Path(pyfhd_config['output_dir'], 'obs_checkpoint.h5')}"
231 )
232 else:
233 # Load the checkpoint and initialize the required variables
234 if (
235 pyfhd_config["obs_checkpoint"]
236 and Path(pyfhd_config["obs_checkpoint"]).exists()
237 ):
238 obs_checkpoint = load(pyfhd_config["obs_checkpoint"], logger=logger)
239 obs = obs_checkpoint["obs"]
240 params = obs_checkpoint["params"]
241 vis_arr = obs_checkpoint["vis_arr"]
242 vis_weights = obs_checkpoint["vis_weights"]
243 del obs_checkpoint
244 logger.info(
245 f"Checkpoint Loaded: Uncalibrated visibility parameters, array and weights and the observation metadata dictionary loaded from {Path(pyfhd_config['output_dir'], 'obs_checkpoint.h5')}"
246 )
248 # If the calibration checkpoint exists, load it now before loading in the beam
249 # to get the observation metadata and visibility parameters
250 if (
251 pyfhd_config["calibrate_checkpoint"] is not None
252 and Path(pyfhd_config["calibrate_checkpoint"]).exists()
253 ):
254 cal_checkpoint = load(pyfhd_config["calibrate_checkpoint"], logger=logger)
255 obs = cal_checkpoint["obs"]
256 params = cal_checkpoint["params"]
257 vis_arr = cal_checkpoint["vis_arr"]
258 vis_model_arr = cal_checkpoint["vis_model_arr"]
259 vis_weights = cal_checkpoint["vis_weights"]
260 cal = cal_checkpoint["cal"]
261 del cal_checkpoint
262 logger.info(
263 f"Checkpoint Loaded: Calibrated and Flagged visibility parameters, array and weights, the flagged observation metadata dictionary and the calibration dictionary loaded from {Path(pyfhd_config['output_dir'], 'calibrate_checkpoint.h5')}"
264 )
266 # Read in the beam from a file returning a psf dictionary
267 psf_start = time.time()
268 psf = create_psf(obs, pyfhd_config, logger)
269 psf_end = time.time()
270 _print_time_diff(
271 psf_start, psf_end, "Beam and PSF dictionary imported.", logger
272 )
274 # Check if the calibrate checkpoint has been used, if not run the calibration steps
275 if pyfhd_config["calibrate_checkpoint"] is None:
276 if pyfhd_config["deproject_w_term"] is not None:
277 w_term_start = time.time()
278 vis_arr = simple_deproject_w_term(
279 obs, params, vis_arr, pyfhd_config["deproject_w_term"], logger
280 )
281 w_term_end = time.time()
282 _print_time_diff(
283 w_term_start,
284 w_term_end,
285 "Simple W-Term Deprojection Applied",
286 logger,
287 )
289 # Peform basic flagging
290 if pyfhd_config["flag_basic"]:
291 basic_flag_start = time.time()
292 vis_weights, obs = vis_flag_basic(
293 vis_weights, vis_arr, obs, pyfhd_config, logger
294 )
295 basic_flag_end = time.time()
296 _print_time_diff(
297 basic_flag_start, basic_flag_end, "Basic Flagging Completed", logger
298 )
300 # Update the visibility weights
301 weight_start = time.time()
302 vis_weights, obs = vis_weights_update(vis_weights, obs, psf, params)
303 weight_end = time.time()
304 _print_time_diff(
305 weight_start,
306 weight_end,
307 "Visibilities Weights Updated After Basic Flagging",
308 logger,
309 )
311 # Get the vis_model_arr from a UVFITS file or SAV files and flag any issues
312 vis_model_arr_start = time.time()
313 vis_model_arr = vis_model_transfer(pyfhd_config, obs, params, logger)
314 vis_model_arr_end = time.time()
315 _print_time_diff(
316 vis_model_arr_start,
317 vis_model_arr_end,
318 "Model Imported and Flagged From UVFITS",
319 logger,
320 )
322 # Skipped initializing the cal structure as it mostly just copies values from the obs, params, config and the skymodel from FHD
323 # However, there is resulting cal structure for logging and output purposes to store the resulting gain and any other associated
324 # arrays
325 if pyfhd_config["calibrate_visibilities"]:
326 logger.info("Beginning Calibration")
327 cal_start = time.time()
328 vis_arr, cal, obs, pyfhd_config = calibrate(
329 obs,
330 params,
331 vis_arr,
332 vis_weights,
333 vis_model_arr,
334 pyfhd_config,
335 logger,
336 )
337 cal_end = time.time()
338 _print_time_diff(
339 cal_start,
340 cal_end,
341 "Visibilities calibrated and cal dictionary with gains created",
342 logger,
343 )
345 if obs["n_pol"] >= 4:
346 qu_mixing_start = time.time()
347 cal["stokes_mix_phase"] = calibrate_qu_mixing(
348 vis_arr, vis_model_arr, vis_weights, obs
349 )
350 qu_mixing_end = time.time()
351 _print_time_diff(
352 qu_mixing_start,
353 qu_mixing_end,
354 'Calibrate QU-Mixing has finished, result in cal["stokes_mix_phase"]',
355 logger,
356 )
358 weight_start = time.time()
359 vis_weights, obs = vis_weights_update(vis_weights, obs, psf, params)
360 weight_end = time.time()
361 _print_time_diff(
362 weight_start,
363 weight_end,
364 "Visibilities Weights Updated After Calibration",
365 logger,
366 )
368 if pyfhd_config["flag_visibilities"]:
369 flag_start = time.time()
370 vis_weights, obs = vis_flag(vis_arr, vis_weights, obs, params)
371 flag_end = time.time()
372 _print_time_diff(
373 flag_start, flag_end, "Visibilities Flagged", logger
374 )
375 if np.max(vis_weights) == 0:
376 raise ValueError(
377 "All visibilities were flagged during the flagging step, exiting PyFHD."
378 )
380 noise_start = time.time()
381 obs["vis_noise"] = vis_noise_calc(obs, vis_arr, vis_weights)
382 noise_end = time.time()
383 _print_time_diff(
384 noise_start, noise_end, "Noise Calculated and added to obs", logger
385 )
387 if pyfhd_config["save_checkpoints"]:
388 checkpoint = {
389 "obs": obs,
390 "params": params,
391 "vis_arr": vis_arr,
392 "vis_model_arr": vis_model_arr,
393 "vis_weights": vis_weights,
394 "cal": cal,
395 }
396 save(
397 Path(
398 pyfhd_config["checkpoint_dir"],
399 f"{checkpoint_name}_calibrate_checkpoint.h5",
400 ),
401 checkpoint,
402 "calibrate_checkpoint",
403 logger=logger,
404 )
405 del checkpoint
406 logger.info(
407 f"Checkpoint Saved: Calibrated and Flagged visibility parameters, array and weights, the flagged observation metadata dictionary and the calibration dictionary saved into {Path(pyfhd_config['output_dir'], 'calibrate_checkpoint.h5')}"
408 )
410 if pyfhd_config["cal_stop"]:
411 logger.info(
412 "The cal_stop option was used, calibration was finished, saving calibration files then exiting PyFHD"
413 )
414 pyfhd_config["metadata_dir"] = Path(pyfhd_config["output_dir"], "metadata")
415 pyfhd_config["visibilities_path"] = Path(
416 pyfhd_config["output_dir"], "visibilities"
417 )
418 pyfhd_config["metadata_dir"].mkdir(exist_ok=True)
419 pyfhd_config["visibilities_path"].mkdir(exist_ok=True)
421 if pyfhd_config["save_obs"]:
422 obs_path = Path(
423 pyfhd_config["metadata_dir"], f"{pyfhd_config['obs_id']}_obs.h5"
424 )
425 logger.info(f"Saving the obs dictionary to {obs_path}")
426 save(obs_path, obs, "obs", logger=logger)
428 if pyfhd_config["save_params"]:
429 params_path = Path(
430 pyfhd_config["metadata_dir"], f"{pyfhd_config['obs_id']}_params.h5"
431 )
432 logger.info(f"Saving params dictionary to {params_path}")
433 save(params_path, params, "params", logger=logger)
435 if pyfhd_config["save_cal"] and pyfhd_config["calibrate_visibilities"]:
436 cal_path = Path(pyfhd_config["output_dir"], "calibration")
437 cal_path.mkdir(exist_ok=True)
438 cal_path = Path(cal_path, f"{pyfhd_config['obs_id']}_cal.h5")
439 logger.info(f"Saving the calibration dictionary to {cal_path}")
440 save(cal_path, cal, "cal", logger=logger)
442 if pyfhd_config["save_weights"]:
443 weights_path = Path(
444 pyfhd_config["visibilities_path"],
445 f"{pyfhd_config['obs_id']}_calibrated_vis_weights.h5",
446 )
447 logger.info(f"Saving the calibrated weights to {weights_path}")
448 save(weights_path, vis_weights, "weights", logger=logger)
450 if pyfhd_config["save_visibilities"]:
451 cal_vis_arr_path = Path(
452 pyfhd_config["visibilities_path"],
453 f"{pyfhd_config['obs_id']}_calibrated_vis_arr.h5",
454 )
455 logger.info(f"Saving the calibrated visibilities to {cal_vis_arr_path}")
456 save(cal_vis_arr_path, vis_arr, "visibilities", logger=logger)
457 logger.info(
458 "The cal_stop option was used, calibration was finished, exiting PyFHD"
459 )
460 finish_pyfhd(pyfhd_start, logger, psf, pyfhd_config)
461 pyfhd_successful = True
462 sys.exit(0)
464 if (
465 psf["image_info"]["image_power_beam_arr"] is not None
466 and psf["image_info"]["image_power_beam_arr"].shape == 1
467 ):
468 # Turn off beam_per_baseline if image_power_beam_arr is
469 # only one value
470 pyfhd_config["beam_per_baseline"] = False
472 if (
473 pyfhd_config["recalculate_grid"]
474 and pyfhd_config["gridding_checkpoint"] is None
475 ):
476 grid_start = time.time()
477 image_uv = np.empty(
478 (obs["n_pol"], obs["elements"], obs["dimension"]), dtype=np.complex128
479 )
480 weights_uv = np.empty(
481 (obs["n_pol"], obs["elements"], obs["dimension"]), dtype=np.complex128
482 )
483 variance_uv = np.empty((obs["n_pol"], obs["elements"], obs["dimension"]))
484 uniform_filter_uv = np.empty((obs["elements"], obs["dimension"]))
485 if vis_model_arr is not None:
486 model_uv = np.empty(
487 (obs["n_pol"], obs["elements"], obs["dimension"]),
488 dtype=np.complex128,
489 )
490 # Since it's done per polarization, we can do multi-processing if it's not fast enough
491 for pol_i in range(obs["n_pol"]):
492 logger.info(
493 f"Gridding has begun for polarization {obs['pol_names'][pol_i]}"
494 )
495 if pol_i == 0:
496 uniform_flag = True
497 else:
498 uniform_flag = False
499 if pol_i > 1:
500 no_conjugate = True
501 else:
502 no_conjugate = False
503 gridding_dict = visibility_grid(
504 vis_arr[pol_i],
505 vis_weights[pol_i],
506 obs,
507 psf,
508 params,
509 pol_i,
510 pyfhd_config,
511 logger,
512 uniform_flag=uniform_flag,
513 no_conjugate=no_conjugate,
514 model=vis_model_arr[pol_i],
515 )
516 if len(gridding_dict.keys()) != 0:
517 image_uv[pol_i] = gridding_dict["image_uv"]
518 weights_uv[pol_i] = gridding_dict["weights"]
519 variance_uv[pol_i] = gridding_dict["variance"]
520 if uniform_flag:
521 uniform_filter_uv = gridding_dict["uniform_filter"]
522 obs["nf_vis"] = gridding_dict["obs"]["nf_vis"]
523 if vis_model_arr is not None:
524 model_uv[pol_i] = gridding_dict["model_return"]
525 logger.info(
526 f"Gridding has finished for polarization {obs['pol_names'][pol_i]}"
527 )
528 else:
529 logger.error("All data was flagged during gridding, exiting")
530 sys.exit(1)
531 if obs["n_pol"] == 4:
532 logger.info("Performing Crosspol reformatting")
533 image_uv = crosspol_reformat(image_uv)
534 weights_uv = crosspol_reformat(weights_uv)
535 if vis_model_arr is not None:
536 model_uv = crosspol_reformat(model_uv)
537 if pyfhd_config["gridding_plots"]:
538 logger.info(
539 f"Plotting the continuum gridding outputs into {pyfhd_config['output_dir']/'plots'/'gridding'}"
540 )
541 plot_gridding(
542 obs,
543 image_uv,
544 weights_uv,
545 variance_uv,
546 pyfhd_config,
547 model_uv=model_uv,
548 logger=logger,
549 )
550 if pyfhd_config["save_checkpoints"]:
551 checkpoint = {
552 "image_uv": image_uv,
553 "weights_uv": weights_uv,
554 "variance_uv": variance_uv,
555 "uniform_filter_uv": uniform_filter_uv,
556 }
557 if vis_model_arr is not None:
558 checkpoint["model_uv"] = model_uv
559 save(
560 Path(
561 pyfhd_config["checkpoint_dir"],
562 f"{checkpoint_name}_gridding_checkpoint.h5",
563 ),
564 checkpoint,
565 "gridding_checkpoint",
566 logger=logger,
567 )
568 del checkpoint
569 logger.info(
570 f"Checkpoint Saved: The Gridded UV Planes saved into {Path(pyfhd_config['output_dir'], 'gridding_checkpoint.h5')}"
571 )
572 grid_end = time.time()
573 _print_time_diff(grid_start, grid_end, "Visibilities gridded", logger)
574 else:
575 if pyfhd_config["gridding_checkpoint"]:
576 grid_checkpoint = load(
577 pyfhd_config["gridding_checkpoint"], logger=logger
578 )
579 image_uv = grid_checkpoint["image_uv"]
580 weights_uv = grid_checkpoint["weights_uv"]
581 variance_uv = grid_checkpoint["variance_uv"]
582 uniform_filter_uv = grid_checkpoint["uniform_filter_uv"]
583 if "model_uv" in grid_checkpoint:
584 model_uv = grid_checkpoint["model_uv"]
585 del grid_checkpoint
586 logger.info(
587 f"Checkpoint Loaded: The Gridded UV Planes loaded from {Path(pyfhd_config['output_dir'], 'gridding_checkpoint.h5')}"
588 )
590 # Call quickview to save the all the variables if set in the config. Also create dirty images and save
591 # FITS files with the dirty images on a per polarization basis
592 if pyfhd_config["export_images"]:
593 quickview(
594 obs,
595 psf,
596 params,
597 cal,
598 vis_arr,
599 vis_weights,
600 image_uv,
601 weights_uv,
602 variance_uv,
603 uniform_filter_uv,
604 model_uv,
605 pyfhd_config,
606 logger,
607 )
609 # Create the healpix HDF5 cubes and save them to disk
610 if pyfhd_config["snapshot_healpix_export"]:
611 healpix_snapshot_cube_generate(
612 obs,
613 psf,
614 cal,
615 params,
616 vis_arr,
617 vis_model_arr,
618 vis_weights,
619 pyfhd_config,
620 logger,
621 )
622 pyfhd_successful = True
623 finish_pyfhd(pyfhd_start, logger, psf, pyfhd_config)
624 except Exception as e:
625 logger.exception(
626 f"An error occurred in PyFHD: {e}\n\tExiting PyFHD.",
627 exc_info=True,
628 )
629 pyfhd_successful = False
630 finally:
631 if pyfhd_successful:
632 return
633 else:
634 pyfhd_end = time.time()
635 runtime = timedelta(seconds=pyfhd_end - pyfhd_start)
636 # Close all open h5 files
637 if "psf" in locals() and isinstance(psf, h5py.File):
638 psf.close()
639 logger.info(
640 f'PyFHD Run Unsuccessful for {pyfhd_config["obs_id"]}\nTotal Runtime (Days:Hours:Minutes:Seconds.Millseconds): {runtime}'
641 )
642 # Close the handlers in the log
643 for handler in logger.handlers:
644 handler.close()
645 sys.exit(1)
648if __name__ == "__main__":
649 main()