Coverage for PyFHD/io/pyfhd_quickview.py: 76%
177 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
1import numpy as np
2from numpy.typing import NDArray
3from PyFHD.io.pyfhd_io import save
4from logging import Logger
5from pathlib import Path
6from astropy.io import fits
7from datetime import datetime
8from PyFHD.data_setup.obs import update_obs
9from PyFHD.beam_setup.beam_utils import beam_image
10from PyFHD.pyfhd_tools.unit_conv import pixel_to_radec
11from PyFHD.pyfhd_tools.pyfhd_utils import (
12 meshgrid,
13 rebin,
14 weight_invert,
15 region_grow,
16 crosspol_split_real_imaginary,
17)
18from PyFHD.gridding.gridding_utils import dirty_image_generate
19from PyFHD.plotting.image import plot_fits_image
22def get_image_renormalization(
23 obs: dict,
24 weights: NDArray[np.float64],
25 beam_base: NDArray[np.complex128],
26 filter_arr: NDArray[np.float64],
27 pyfhd_config: dict,
28 logger: Logger,
29) -> np.ndarray:
30 """
31 Use the weights to renormalize the image for Jy/beam to Jy/sr. While
32 Jy/beam is more common in radio astronomy, Jy/str is a physical
33 brightness unit, rather than an instrumental brightness unit.
35 Parameters
36 ----------
37 obs : dict
38 Observation metadata dictionary.
39 weights : NDArray[np.float64]
40 Visibility weights array.
41 beam_base : NDArray[np.complex128]
42 Beam orthoslant image per polarization.
43 filter_arr : NDArray[np.float64]
44 u-v array of filter weights, relevant for
45 a uniform filter.
46 pyfhd_config : dict
47 PyFHD configuration settings.
48 logger : Logger
49 PyFHD's Logger.
51 Returns
52 -------
53 NDArray[np.float64]
54 Conversion in image space from Jy/beam to Jy/sr per pixel.
55 """
56 # Use the weights to renormalize the image to units of Jy/sr
57 renorm_factor = np.empty(obs["n_pol"])
58 for pol_i in range(obs["n_pol"]):
59 dirty_image, _, _ = dirty_image_generate(
60 weights[pol_i],
61 pyfhd_config,
62 logger,
63 weights=weights[pol_i],
64 pad_uv_image=pyfhd_config["pad_uv_image"],
65 filter=filter_arr[pol_i],
66 beam_ptr=beam_base[pol_i],
67 degpix=obs["degpix"],
68 )
69 dirty_num = dirty_image[obs["dimension"] // 2, obs["elements"] // 2]
70 renorm_factor[pol_i] = 1 / dirty_num
71 renorm_factor[pol_i] *= (
72 beam_base[pol_i, int(obs["obsx"]), int(obs["obsy"])] ** 2
73 )
74 renorm_factor[pol_i] /= (obs["degpix"] * (np.pi / 180)) ** 2
75 return renorm_factor
78def quickview(
79 obs: dict,
80 psf: dict,
81 params: dict,
82 cal: dict,
83 vis_arr: NDArray[np.complex128],
84 vis_weights: NDArray[np.float64],
85 image_uv: NDArray[np.complex128],
86 weights_uv: NDArray[np.complex128],
87 variance_uv: NDArray[np.float64],
88 uniform_filter_uv: NDArray[np.float64],
89 model_uv: NDArray[np.complex128],
90 pyfhd_config: dict,
91 logger: Logger,
92) -> None:
93 """
94 Generate continuum images from all gridded u-v planes, and save as
95 FITS files and optionally PNG files.
97 Parameters
98 ----------
99 obs : dict
100 Observation metadata dictionary.
101 psf : dict
102 Beam dictionary.
103 params : dict
104 Visibility metadata dictionary.
105 cal : dict
106 Calibration dictionary.
107 vis_arr : NDArray[np.complex128]
108 Calibrated visibilities array.
109 vis_weights : NDArray[np.float64]
110 Visibility weights array.
111 image_uv : NDArray[np.complex128]
112 Continuum uv-plane of the calibrated data.
113 weights_uv : NDArray[np.complex128]
114 Continuum uv-plane of the weights (the sampling map).
115 variance_uv : NDArray[np.float64]
116 Continuum uv-plane of the variance (the variance map).
117 uniform_filter_uv : NDArray[np.float64]
118 Continuum uv-plane of the uniform filter (if used).
119 model_uv : NDArray[np.complex128]
120 Continuum uv-plane of the model data.
121 pyfhd_config : dict
122 PyFHD configuration settings.
123 logger : Logger
124 PyFHD's Logger.
125 """
126 # Save all the things into the output directory
127 pyfhd_config["metadata_dir"] = Path(pyfhd_config["output_dir"], "metadata")
128 pyfhd_config["visibilities_path"] = Path(pyfhd_config["output_dir"], "visibilities")
129 pyfhd_config["metadata_dir"].mkdir(exist_ok=True)
130 pyfhd_config["visibilities_path"].mkdir(exist_ok=True)
131 if pyfhd_config["save_obs"]: 131 ↛ 132line 131 didn't jump to line 132 because the condition on line 131 was never true
132 obs_path = Path(
133 pyfhd_config["metadata_dir"], f"{pyfhd_config['obs_id']}_obs.h5"
134 )
135 logger.info(f"Saving the obs dictionary to {obs_path}")
136 save(obs_path, obs, "obs", logger=logger)
137 if pyfhd_config["save_params"]: 137 ↛ 138line 137 didn't jump to line 138 because the condition on line 137 was never true
138 params_path = Path(
139 pyfhd_config["metadata_dir"], f"{pyfhd_config['obs_id']}_params.h5"
140 )
141 logger.info(f"Saving params dictionary to {params_path}")
142 save(params_path, params, "params", logger=logger)
143 if pyfhd_config["save_visibilities"]: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true
144 if pyfhd_config["recalculate_grid"]:
145 gridding_path = Path(pyfhd_config["output_dir"], "gridding")
146 gridding_path.mkdir(exist_ok=True)
147 logger.info(f"Saving the gridded uv planes to {gridding_path}")
148 save(
149 Path(gridding_path, f"{pyfhd_config['obs_id']}_image_uv.h5"),
150 image_uv,
151 "image_uv",
152 logger=logger,
153 )
154 save(
155 Path(gridding_path, f"{pyfhd_config['obs_id']}_weights_uv.h5"),
156 weights_uv,
157 "weights_uv",
158 logger=logger,
159 )
160 save(
161 Path(gridding_path, f"{pyfhd_config['obs_id']}_variance_uv.h5"),
162 variance_uv,
163 "variance_uv",
164 logger=logger,
165 )
166 save(
167 Path(gridding_path, f"{pyfhd_config['obs_id']}_uniform_filter_uv.h5"),
168 uniform_filter_uv,
169 "uniform_filter_uv",
170 logger=logger,
171 )
172 save(
173 Path(gridding_path, f"{pyfhd_config['obs_id']}_model_uv.h5"),
174 model_uv,
175 "model_uv",
176 logger=logger,
177 )
178 cal_vis_arr_path = Path(
179 pyfhd_config["visibilities_path"],
180 f"{pyfhd_config['obs_id']}_calibrated_vis_arr.h5",
181 )
182 logger.info(f"Saving the calibrated visibilities to {cal_vis_arr_path}")
183 save(cal_vis_arr_path, vis_arr, "visibilities", logger=logger)
184 if pyfhd_config["save_cal"] and pyfhd_config["calibrate_visibilities"]: 184 ↛ 185line 184 didn't jump to line 185 because the condition on line 184 was never true
185 cal_path = Path(pyfhd_config["output_dir"], "calibration")
186 cal_path.mkdir(exist_ok=True)
187 cal_path = Path(cal_path, f"{pyfhd_config['obs_id']}_cal.h5")
188 logger.info(f"Saving the calibration dictionary to {cal_path}")
189 save(cal_path, cal, "cal", logger=logger)
190 if pyfhd_config["save_weights"]: 190 ↛ 191line 190 didn't jump to line 191 because the condition on line 190 was never true
191 weights_path = Path(
192 pyfhd_config["visibilities_path"],
193 f"{pyfhd_config['obs_id']}_calibrated_vis_weights.h5",
194 )
195 logger.info(f"Saving the calibrated weights to {weights_path}")
196 save(weights_path, vis_weights, "weights", logger=logger)
198 obs_out = update_obs(
199 obs, int(obs["dimension"] * pyfhd_config["pad_uv_image"]), obs["kpix"]
200 )
201 # In case pad_uv_image was a float can get a float out rather than int
202 obs_out["dimension"] = int(obs_out["dimension"])
203 obs_out["elements"] = int(obs_out["elements"])
204 horizon_mask = np.ones([obs_out["dimension"], obs_out["elements"]])
205 ra, _ = pixel_to_radec(
206 meshgrid(obs_out["dimension"], obs_out["elements"], 1),
207 meshgrid(obs_out["dimension"], obs_out["elements"], 2),
208 obs_out["astr"],
209 )
210 horizon_test = np.isnan(ra)
211 horizon_mask[horizon_test] = 0
213 # Calculate the beam mask and beam indexes associated with that mask
214 beam_mask = np.ones([obs_out["dimension"], obs_out["elements"]])
215 beam_avg = np.zeros([obs_out["dimension"], obs_out["elements"]])
216 beam_base_out = np.empty(
217 [obs_out["n_pol"], obs_out["dimension"], obs_out["elements"]]
218 )
219 beam_correction_out = np.empty_like(beam_base_out)
220 for pol_i in range(obs["n_pol"]):
221 beam_image_pol = beam_image(psf, obs, pol_i, square=False)
222 # Interpolate the beam_image_pol to the new dimensions with padding
223 beam_base_out[pol_i] = (
224 rebin(beam_image_pol, [obs_out["dimension"], obs_out["elements"]])
225 * horizon_mask
226 )
227 beam_correction_out[pol_i] = weight_invert(beam_base_out[pol_i], 1e-3)
228 if pol_i == 0:
229 beam_mask_test = beam_base_out[pol_i]
230 if pyfhd_config["allow_sidelobe_model_sources"]: 230 ↛ 235line 230 didn't jump to line 235 because the condition on line 230 was always true
231 beam_i = np.where(
232 beam_mask_test.flat >= 0.025
233 ) # This is beam_threshold/2 in FHD
234 else:
235 beam_i = region_grow(
236 beam_mask_test,
237 np.array(
238 [
239 obs_out["dimension"] / 2
240 + obs_out["dimension"] * obs_out["elements"] / 2
241 ],
242 dtype=np.int64,
243 ),
244 low=0.05 / 2, # This is beam_threshold/2 in FHD
245 high=np.max(beam_mask_test),
246 )
247 beam_mask0 = np.zeros([obs_out["dimension"], obs_out["elements"]])
248 beam_mask0.flat[beam_i] = 1
249 beam_avg += beam_base_out[pol_i] ** 2
250 beam_mask *= beam_mask0
251 beam_avg /= min(obs["n_pol"], 2)
252 beam_avg = np.sqrt(np.maximum(beam_avg, 0)) * beam_mask
253 beam_i = np.nonzero(beam_mask)
255 # Generate our dirty images of the uv planes
256 instr_dirty_arr = np.empty([obs["n_pol"], obs["dimension"], obs["elements"]])
257 instr_model_arr = np.empty([obs["n_pol"], obs["dimension"], obs["elements"]])
258 filter_arr = np.zeros([obs["n_pol"], obs["dimension"], obs["elements"]])
259 for pol_i in range(obs["n_pol"]):
260 complex_flag = pol_i > 1
261 filter = np.empty(0)
262 # Get the dirty image of the uv plane and the filter from filter_uv_uniform
263 instr_dirty_arr[pol_i], filter, _ = dirty_image_generate(
264 image_uv[pol_i],
265 pyfhd_config,
266 logger,
267 uniform_filter_uv=uniform_filter_uv,
268 degpix=obs_out["degpix"],
269 weights=weights_uv[pol_i],
270 pad_uv_image=pyfhd_config["pad_uv_image"],
271 filter=filter,
272 not_real=complex_flag,
273 beam_ptr=beam_base_out[pol_i],
274 )
275 filter_arr[pol_i] = filter
276 instr_model_arr[pol_i], filter, _ = dirty_image_generate(
277 model_uv[pol_i],
278 pyfhd_config,
279 logger,
280 uniform_filter_uv=uniform_filter_uv,
281 degpix=obs_out["degpix"],
282 weights=weights_uv[pol_i],
283 pad_uv_image=pyfhd_config["pad_uv_image"],
284 filter=filter,
285 not_real=complex_flag,
286 beam_ptr=beam_base_out[pol_i],
287 )
288 renorm_factor = get_image_renormalization(
289 obs_out, weights_uv, beam_base_out, filter_arr, pyfhd_config, logger
290 )
291 # Reshape renorm factor to multiply per polarization without loop to [obs["n_pol"], 1, 1]
292 renorm_factor = np.expand_dims(renorm_factor.reshape([obs_out["n_pol"], 1]), -1)
293 instr_dirty_arr *= renorm_factor
294 instr_model_arr *= renorm_factor
295 instr_residual_arr = instr_dirty_arr - instr_model_arr
296 # Get the pol_names
297 pol_names = obs["pol_names"]
298 # The cross-polarization XY and YX images are both complex, but are conjugate mirrors of each other
299 # To make images of these, we simply take the real and imaginary parts separately
300 if obs_out["n_pol"] >= 4: 300 ↛ 301line 300 didn't jump to line 301 because the condition on line 300 was never true
301 logger.info("Peforming Cross Polarization splits for real and imaginary")
302 instr_dirty_arr, pol_names = crosspol_split_real_imaginary(
303 instr_dirty_arr, pol_names=pol_names
304 )
305 instr_model_arr, _ = crosspol_split_real_imaginary(instr_model_arr)
306 instr_residual_arr, _ = crosspol_split_real_imaginary(instr_residual_arr)
307 # The weights should have been saved at this point and we only need them like this from here
308 weights_uv, _ = crosspol_split_real_imaginary(weights_uv)
310 # Build a fits header
311 logger.info("Building the FITS Header for all the FITS files")
312 fits_file = fits.PrimaryHDU(instr_dirty_arr[0])
313 # Write in the WCS into the header from the astr dictionary in obs_out
314 # You may want to change this later to make it more compatible with Astropy directly
315 # when eppsilon is switched to python.
316 fits_file.header.set("ctype1", str(obs_out["astr"]["ctype"][0]), "Coordinate Type")
317 fits_file.header.set("ctype2", str(obs_out["astr"]["ctype"][1]), "Coordinate Type")
318 fits_file.header.set(
319 "equinox", obs_out["astr"]["equinox"], "Equinox of Ref. Coord."
320 )
321 cd = obs_out["astr"]["cd"]
322 cd[0, :] = cd[0, :] * obs_out["astr"]["cdelt"][0]
323 cd[1, :] = cd[1, :] * obs_out["astr"]["cdelt"][1]
324 fits_file.header.set("cd1_1", cd[0, 0], "Degrees / Pixel")
325 fits_file.header.set("cd1_2", cd[0, 1], "Degrees / Pixel")
326 fits_file.header.set("cd2_1", cd[1, 0], "Degrees / Pixel")
327 fits_file.header.set("cd2_2", cd[1, 1], "Degrees / Pixel")
328 fits_file.header.set(
329 "crpix1", int(obs_out["astr"]["crpix"][0]), "Reference Pixel in X"
330 )
331 fits_file.header.set(
332 "crpix2", int(obs_out["astr"]["crpix"][1]), "Reference Pixel in Y"
333 )
334 fits_file.header.set(
335 "crval1", obs_out["astr"]["crval"][0], "R.A. (degrees) of reference pixel"
336 )
337 fits_file.header.set(
338 "crval2", obs_out["astr"]["crval"][1], "Declination of reference pixel"
339 )
340 fits_file.header.set("pv2_1", obs_out["astr"]["pv2"][0], "Projection parameter 1")
341 fits_file.header.set("pv2_2", obs_out["astr"]["pv2"][1], "Projection parameter 2")
342 for i in range(obs_out["astr"]["pv1"].size):
343 fits_file.header.set(
344 f"pv1_{i}", obs_out["astr"]["pv1"][i], "Projection parameters"
345 )
346 fits_file.header.set(
347 "mjd-obs", obs_out["astr"]["mjdobs"], "Modified Julian day of observations"
348 )
349 fits_file.header.set("date-obs", obs_out["astr"]["dateobs"], "Date of observations")
350 fits_file.header.set("radecsys", obs_out["astr"]["radecsys"], "Reference Frame")
351 fits_file.header.set(
352 "history",
353 f"World Coordinate System parameters written by PyFHD: {datetime.now().strftime('%b %d %Y %H:%M:%S')}",
354 )
355 # Create the fits header to store the dirty images
356 fits_file_apparent = fits_file.copy()
357 fits_file_apparent.header.set("bunit", "Jy/sr (apparent)")
359 # Create the fits header for the weights
360 fits_file_uv = fits.PrimaryHDU(np.abs(weights_uv[0]))
361 fits_file_uv.header.set("CD1_1", obs["kpix"], "Wavelengths / Pixel")
362 fits_file_uv.header.set("CD2_1", 0.0, "Wavelengths / Pixel")
363 fits_file_uv.header.set("CD1_2", 0.0, "Wavelengths / Pixel")
364 fits_file_uv.header.set("CD2_2", obs["kpix"], "Wavelengths / Pixel")
365 fits_file_uv.header.set(
366 "CRPIX1", obs_out["dimension"] / 2 + 1, "Reference Pixel in X"
367 )
368 fits_file_uv.header.set(
369 "CRPIX2", obs_out["elements"] / 2 + 1, "Reference Pixel in Y"
370 )
371 fits_file_uv.header.set("CRVAL1", 0.0, "Wavelengths (u)")
372 fits_file_uv.header.set("CRVAL2", 0.0, "Wavelengths (v)")
373 fits_file_uv.header.set(
374 "MJD-OBS", obs_out["astr"]["mjdobs"], "Modified Julian day of observation"
375 )
376 fits_file_uv.header.set(
377 "DATE-OBS", obs_out["astr"]["dateobs"], "Date of observation"
378 )
380 # If you need the beam_contour arrays add them here, lines 369-378 in fhd_quickview.pro
382 filter_name = pyfhd_config["image_filter"].split("_")[-1]
383 fits_output: Path = pyfhd_config["output_dir"] / "fits"
384 fits_output.mkdir(exist_ok=True)
385 if pyfhd_config["image_plots"]: 385 ↛ 386line 385 didn't jump to line 386 because the condition on line 385 was never true
386 png_output: Path = pyfhd_config["output_dir"] / "plots" / "images"
387 png_output.mkdir(exist_ok=True)
388 for pol_i in range(obs["n_pol"]):
389 logger.info(f"Saving the FITS files for polarization {pol_names[pol_i]}")
390 instr_residual = instr_residual_arr[pol_i] * beam_correction_out[pol_i]
391 instr_dirty = instr_dirty_arr[pol_i] * beam_correction_out[pol_i]
392 instr_model = instr_model_arr[pol_i] * beam_correction_out[pol_i]
393 beam_use = beam_base_out[pol_i]
395 # Write the fits files for the dirty images
396 fits_file_apparent.data = instr_dirty
397 instr_dirty_name = (
398 f"{pyfhd_config['obs_id']}_{filter_name}_dirty_{pol_names[pol_i]}"
399 )
400 fits_file_apparent.writeto(
401 Path(
402 fits_output,
403 f"{instr_dirty_name}.fits",
404 ),
405 overwrite=True,
406 )
407 fits_file_apparent.data = instr_model
408 instr_model_name = (
409 f"{pyfhd_config['obs_id']}_{filter_name}_model_{pol_names[pol_i]}"
410 )
411 fits_file_apparent.writeto(
412 Path(
413 fits_output,
414 f"{instr_model_name}.fits",
415 ),
416 overwrite=True,
417 )
418 fits_file_apparent.data = instr_residual
419 instr_residual_name = (
420 f"{pyfhd_config['obs_id']}_{filter_name}_residual_{pol_names[pol_i]}"
421 )
422 fits_file_apparent.writeto(
423 Path(
424 fits_output,
425 f"{instr_residual_name}.fits",
426 ),
427 overwrite=True,
428 )
429 fits_file.data = beam_use
430 beam_name = f"{pyfhd_config['obs_id']}_beam_{pol_names[pol_i]}"
431 fits_file.writeto(
432 Path(fits_output, f"{beam_name}.fits"),
433 overwrite=True,
434 )
435 fits_file_uv.data = np.abs(weights_uv) * obs["n_vis"]
436 weights_name = f"{pyfhd_config['obs_id']}_uv_weights_{pol_names[pol_i]}"
437 fits_file_uv.writeto(
438 Path(
439 fits_output,
440 f"{weights_name}.fits",
441 ),
442 overwrite=True,
443 )
445 if pyfhd_config["image_plots"]: 445 ↛ 446line 445 didn't jump to line 446 because the condition on line 445 was never true
446 logger.info(
447 f"Plotting the continuum images for polarization {pol_names[pol_i]} into {pyfhd_config['output_dir']/'plots'/'images'}"
448 )
449 plot_fits_image(
450 Path(fits_output, f"{instr_dirty_name}.fits"),
451 Path(png_output, f"{instr_dirty_name}.png"),
452 title=f"Dirty Image {pol_names[pol_i]}",
453 logger=logger,
454 )
455 plot_fits_image(
456 Path(fits_output, f"{instr_model_name}.fits"),
457 Path(png_output, f"{instr_model_name}.png"),
458 title=f"Model Image {pol_names[pol_i]}",
459 logger=logger,
460 )
461 plot_fits_image(
462 Path(fits_output, f"{instr_residual_name}.fits"),
463 Path(png_output, f"{instr_residual_name}.png"),
464 title=f"Residual Image {pol_names[pol_i]}",
465 logger=logger,
466 )
467 plot_fits_image(
468 Path(fits_output, f"{beam_name}.fits"),
469 Path(png_output, f"{beam_name}.png"),
470 title=f"Beam Image {pol_names[pol_i]}",
471 logger=logger,
472 )
473 plot_fits_image(
474 Path(fits_output, f"{weights_name}.fits"),
475 Path(png_output, f"{weights_name}.png"),
476 title=f"Weight Image {pol_names[pol_i]}",
477 logger=logger,
478 )