Coverage for PyFHD/io/pyfhd_quickview.py: 76%

177 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-01 10:58 +0800

1import numpy as np 

2from numpy.typing import NDArray 

3from PyFHD.io.pyfhd_io import save 

4from logging import Logger 

5from pathlib import Path 

6from astropy.io import fits 

7from datetime import datetime 

8from PyFHD.data_setup.obs import update_obs 

9from PyFHD.beam_setup.beam_utils import beam_image 

10from PyFHD.pyfhd_tools.unit_conv import pixel_to_radec 

11from PyFHD.pyfhd_tools.pyfhd_utils import ( 

12 meshgrid, 

13 rebin, 

14 weight_invert, 

15 region_grow, 

16 crosspol_split_real_imaginary, 

17) 

18from PyFHD.gridding.gridding_utils import dirty_image_generate 

19from PyFHD.plotting.image import plot_fits_image 

20 

21 

22def get_image_renormalization( 

23 obs: dict, 

24 weights: NDArray[np.float64], 

25 beam_base: NDArray[np.complex128], 

26 filter_arr: NDArray[np.float64], 

27 pyfhd_config: dict, 

28 logger: Logger, 

29) -> np.ndarray: 

30 """ 

31 Use the weights to renormalize the image for Jy/beam to Jy/sr. While 

32 Jy/beam is more common in radio astronomy, Jy/str is a physical 

33 brightness unit, rather than an instrumental brightness unit. 

34 

35 Parameters 

36 ---------- 

37 obs : dict 

38 Observation metadata dictionary. 

39 weights : NDArray[np.float64] 

40 Visibility weights array. 

41 beam_base : NDArray[np.complex128] 

42 Beam orthoslant image per polarization. 

43 filter_arr : NDArray[np.float64] 

44 u-v array of filter weights, relevant for 

45 a uniform filter. 

46 pyfhd_config : dict 

47 PyFHD configuration settings. 

48 logger : Logger 

49 PyFHD's Logger. 

50 

51 Returns 

52 ------- 

53 NDArray[np.float64] 

54 Conversion in image space from Jy/beam to Jy/sr per pixel. 

55 """ 

56 # Use the weights to renormalize the image to units of Jy/sr 

57 renorm_factor = np.empty(obs["n_pol"]) 

58 for pol_i in range(obs["n_pol"]): 

59 dirty_image, _, _ = dirty_image_generate( 

60 weights[pol_i], 

61 pyfhd_config, 

62 logger, 

63 weights=weights[pol_i], 

64 pad_uv_image=pyfhd_config["pad_uv_image"], 

65 filter=filter_arr[pol_i], 

66 beam_ptr=beam_base[pol_i], 

67 degpix=obs["degpix"], 

68 ) 

69 dirty_num = dirty_image[obs["dimension"] // 2, obs["elements"] // 2] 

70 renorm_factor[pol_i] = 1 / dirty_num 

71 renorm_factor[pol_i] *= ( 

72 beam_base[pol_i, int(obs["obsx"]), int(obs["obsy"])] ** 2 

73 ) 

74 renorm_factor[pol_i] /= (obs["degpix"] * (np.pi / 180)) ** 2 

75 return renorm_factor 

76 

77 

78def quickview( 

79 obs: dict, 

80 psf: dict, 

81 params: dict, 

82 cal: dict, 

83 vis_arr: NDArray[np.complex128], 

84 vis_weights: NDArray[np.float64], 

85 image_uv: NDArray[np.complex128], 

86 weights_uv: NDArray[np.complex128], 

87 variance_uv: NDArray[np.float64], 

88 uniform_filter_uv: NDArray[np.float64], 

89 model_uv: NDArray[np.complex128], 

90 pyfhd_config: dict, 

91 logger: Logger, 

92) -> None: 

93 """ 

94 Generate continuum images from all gridded u-v planes, and save as 

95 FITS files and optionally PNG files. 

96 

97 Parameters 

98 ---------- 

99 obs : dict 

100 Observation metadata dictionary. 

101 psf : dict 

102 Beam dictionary. 

103 params : dict 

104 Visibility metadata dictionary. 

105 cal : dict 

106 Calibration dictionary. 

107 vis_arr : NDArray[np.complex128] 

108 Calibrated visibilities array. 

109 vis_weights : NDArray[np.float64] 

110 Visibility weights array. 

111 image_uv : NDArray[np.complex128] 

112 Continuum uv-plane of the calibrated data. 

113 weights_uv : NDArray[np.complex128] 

114 Continuum uv-plane of the weights (the sampling map). 

115 variance_uv : NDArray[np.float64] 

116 Continuum uv-plane of the variance (the variance map). 

117 uniform_filter_uv : NDArray[np.float64] 

118 Continuum uv-plane of the uniform filter (if used). 

119 model_uv : NDArray[np.complex128] 

120 Continuum uv-plane of the model data. 

121 pyfhd_config : dict 

122 PyFHD configuration settings. 

123 logger : Logger 

124 PyFHD's Logger. 

125 """ 

126 # Save all the things into the output directory 

127 pyfhd_config["metadata_dir"] = Path(pyfhd_config["output_dir"], "metadata") 

128 pyfhd_config["visibilities_path"] = Path(pyfhd_config["output_dir"], "visibilities") 

129 pyfhd_config["metadata_dir"].mkdir(exist_ok=True) 

130 pyfhd_config["visibilities_path"].mkdir(exist_ok=True) 

131 if pyfhd_config["save_obs"]: 131 ↛ 132line 131 didn't jump to line 132 because the condition on line 131 was never true

132 obs_path = Path( 

133 pyfhd_config["metadata_dir"], f"{pyfhd_config['obs_id']}_obs.h5" 

134 ) 

135 logger.info(f"Saving the obs dictionary to {obs_path}") 

136 save(obs_path, obs, "obs", logger=logger) 

137 if pyfhd_config["save_params"]: 137 ↛ 138line 137 didn't jump to line 138 because the condition on line 137 was never true

138 params_path = Path( 

139 pyfhd_config["metadata_dir"], f"{pyfhd_config['obs_id']}_params.h5" 

140 ) 

141 logger.info(f"Saving params dictionary to {params_path}") 

142 save(params_path, params, "params", logger=logger) 

143 if pyfhd_config["save_visibilities"]: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true

144 if pyfhd_config["recalculate_grid"]: 

145 gridding_path = Path(pyfhd_config["output_dir"], "gridding") 

146 gridding_path.mkdir(exist_ok=True) 

147 logger.info(f"Saving the gridded uv planes to {gridding_path}") 

148 save( 

149 Path(gridding_path, f"{pyfhd_config['obs_id']}_image_uv.h5"), 

150 image_uv, 

151 "image_uv", 

152 logger=logger, 

153 ) 

154 save( 

155 Path(gridding_path, f"{pyfhd_config['obs_id']}_weights_uv.h5"), 

156 weights_uv, 

157 "weights_uv", 

158 logger=logger, 

159 ) 

160 save( 

161 Path(gridding_path, f"{pyfhd_config['obs_id']}_variance_uv.h5"), 

162 variance_uv, 

163 "variance_uv", 

164 logger=logger, 

165 ) 

166 save( 

167 Path(gridding_path, f"{pyfhd_config['obs_id']}_uniform_filter_uv.h5"), 

168 uniform_filter_uv, 

169 "uniform_filter_uv", 

170 logger=logger, 

171 ) 

172 save( 

173 Path(gridding_path, f"{pyfhd_config['obs_id']}_model_uv.h5"), 

174 model_uv, 

175 "model_uv", 

176 logger=logger, 

177 ) 

178 cal_vis_arr_path = Path( 

179 pyfhd_config["visibilities_path"], 

180 f"{pyfhd_config['obs_id']}_calibrated_vis_arr.h5", 

181 ) 

182 logger.info(f"Saving the calibrated visibilities to {cal_vis_arr_path}") 

183 save(cal_vis_arr_path, vis_arr, "visibilities", logger=logger) 

184 if pyfhd_config["save_cal"] and pyfhd_config["calibrate_visibilities"]: 184 ↛ 185line 184 didn't jump to line 185 because the condition on line 184 was never true

185 cal_path = Path(pyfhd_config["output_dir"], "calibration") 

186 cal_path.mkdir(exist_ok=True) 

187 cal_path = Path(cal_path, f"{pyfhd_config['obs_id']}_cal.h5") 

188 logger.info(f"Saving the calibration dictionary to {cal_path}") 

189 save(cal_path, cal, "cal", logger=logger) 

190 if pyfhd_config["save_weights"]: 190 ↛ 191line 190 didn't jump to line 191 because the condition on line 190 was never true

191 weights_path = Path( 

192 pyfhd_config["visibilities_path"], 

193 f"{pyfhd_config['obs_id']}_calibrated_vis_weights.h5", 

194 ) 

195 logger.info(f"Saving the calibrated weights to {weights_path}") 

196 save(weights_path, vis_weights, "weights", logger=logger) 

197 

198 obs_out = update_obs( 

199 obs, int(obs["dimension"] * pyfhd_config["pad_uv_image"]), obs["kpix"] 

200 ) 

201 # In case pad_uv_image was a float can get a float out rather than int 

202 obs_out["dimension"] = int(obs_out["dimension"]) 

203 obs_out["elements"] = int(obs_out["elements"]) 

204 horizon_mask = np.ones([obs_out["dimension"], obs_out["elements"]]) 

205 ra, _ = pixel_to_radec( 

206 meshgrid(obs_out["dimension"], obs_out["elements"], 1), 

207 meshgrid(obs_out["dimension"], obs_out["elements"], 2), 

208 obs_out["astr"], 

209 ) 

210 horizon_test = np.isnan(ra) 

211 horizon_mask[horizon_test] = 0 

212 

213 # Calculate the beam mask and beam indexes associated with that mask 

214 beam_mask = np.ones([obs_out["dimension"], obs_out["elements"]]) 

215 beam_avg = np.zeros([obs_out["dimension"], obs_out["elements"]]) 

216 beam_base_out = np.empty( 

217 [obs_out["n_pol"], obs_out["dimension"], obs_out["elements"]] 

218 ) 

219 beam_correction_out = np.empty_like(beam_base_out) 

220 for pol_i in range(obs["n_pol"]): 

221 beam_image_pol = beam_image(psf, obs, pol_i, square=False) 

222 # Interpolate the beam_image_pol to the new dimensions with padding 

223 beam_base_out[pol_i] = ( 

224 rebin(beam_image_pol, [obs_out["dimension"], obs_out["elements"]]) 

225 * horizon_mask 

226 ) 

227 beam_correction_out[pol_i] = weight_invert(beam_base_out[pol_i], 1e-3) 

228 if pol_i == 0: 

229 beam_mask_test = beam_base_out[pol_i] 

230 if pyfhd_config["allow_sidelobe_model_sources"]: 230 ↛ 235line 230 didn't jump to line 235 because the condition on line 230 was always true

231 beam_i = np.where( 

232 beam_mask_test.flat >= 0.025 

233 ) # This is beam_threshold/2 in FHD 

234 else: 

235 beam_i = region_grow( 

236 beam_mask_test, 

237 np.array( 

238 [ 

239 obs_out["dimension"] / 2 

240 + obs_out["dimension"] * obs_out["elements"] / 2 

241 ], 

242 dtype=np.int64, 

243 ), 

244 low=0.05 / 2, # This is beam_threshold/2 in FHD 

245 high=np.max(beam_mask_test), 

246 ) 

247 beam_mask0 = np.zeros([obs_out["dimension"], obs_out["elements"]]) 

248 beam_mask0.flat[beam_i] = 1 

249 beam_avg += beam_base_out[pol_i] ** 2 

250 beam_mask *= beam_mask0 

251 beam_avg /= min(obs["n_pol"], 2) 

252 beam_avg = np.sqrt(np.maximum(beam_avg, 0)) * beam_mask 

253 beam_i = np.nonzero(beam_mask) 

254 

255 # Generate our dirty images of the uv planes 

256 instr_dirty_arr = np.empty([obs["n_pol"], obs["dimension"], obs["elements"]]) 

257 instr_model_arr = np.empty([obs["n_pol"], obs["dimension"], obs["elements"]]) 

258 filter_arr = np.zeros([obs["n_pol"], obs["dimension"], obs["elements"]]) 

259 for pol_i in range(obs["n_pol"]): 

260 complex_flag = pol_i > 1 

261 filter = np.empty(0) 

262 # Get the dirty image of the uv plane and the filter from filter_uv_uniform 

263 instr_dirty_arr[pol_i], filter, _ = dirty_image_generate( 

264 image_uv[pol_i], 

265 pyfhd_config, 

266 logger, 

267 uniform_filter_uv=uniform_filter_uv, 

268 degpix=obs_out["degpix"], 

269 weights=weights_uv[pol_i], 

270 pad_uv_image=pyfhd_config["pad_uv_image"], 

271 filter=filter, 

272 not_real=complex_flag, 

273 beam_ptr=beam_base_out[pol_i], 

274 ) 

275 filter_arr[pol_i] = filter 

276 instr_model_arr[pol_i], filter, _ = dirty_image_generate( 

277 model_uv[pol_i], 

278 pyfhd_config, 

279 logger, 

280 uniform_filter_uv=uniform_filter_uv, 

281 degpix=obs_out["degpix"], 

282 weights=weights_uv[pol_i], 

283 pad_uv_image=pyfhd_config["pad_uv_image"], 

284 filter=filter, 

285 not_real=complex_flag, 

286 beam_ptr=beam_base_out[pol_i], 

287 ) 

288 renorm_factor = get_image_renormalization( 

289 obs_out, weights_uv, beam_base_out, filter_arr, pyfhd_config, logger 

290 ) 

291 # Reshape renorm factor to multiply per polarization without loop to [obs["n_pol"], 1, 1] 

292 renorm_factor = np.expand_dims(renorm_factor.reshape([obs_out["n_pol"], 1]), -1) 

293 instr_dirty_arr *= renorm_factor 

294 instr_model_arr *= renorm_factor 

295 instr_residual_arr = instr_dirty_arr - instr_model_arr 

296 # Get the pol_names 

297 pol_names = obs["pol_names"] 

298 # The cross-polarization XY and YX images are both complex, but are conjugate mirrors of each other 

299 # To make images of these, we simply take the real and imaginary parts separately 

300 if obs_out["n_pol"] >= 4: 300 ↛ 301line 300 didn't jump to line 301 because the condition on line 300 was never true

301 logger.info("Peforming Cross Polarization splits for real and imaginary") 

302 instr_dirty_arr, pol_names = crosspol_split_real_imaginary( 

303 instr_dirty_arr, pol_names=pol_names 

304 ) 

305 instr_model_arr, _ = crosspol_split_real_imaginary(instr_model_arr) 

306 instr_residual_arr, _ = crosspol_split_real_imaginary(instr_residual_arr) 

307 # The weights should have been saved at this point and we only need them like this from here 

308 weights_uv, _ = crosspol_split_real_imaginary(weights_uv) 

309 

310 # Build a fits header 

311 logger.info("Building the FITS Header for all the FITS files") 

312 fits_file = fits.PrimaryHDU(instr_dirty_arr[0]) 

313 # Write in the WCS into the header from the astr dictionary in obs_out 

314 # You may want to change this later to make it more compatible with Astropy directly 

315 # when eppsilon is switched to python. 

316 fits_file.header.set("ctype1", str(obs_out["astr"]["ctype"][0]), "Coordinate Type") 

317 fits_file.header.set("ctype2", str(obs_out["astr"]["ctype"][1]), "Coordinate Type") 

318 fits_file.header.set( 

319 "equinox", obs_out["astr"]["equinox"], "Equinox of Ref. Coord." 

320 ) 

321 cd = obs_out["astr"]["cd"] 

322 cd[0, :] = cd[0, :] * obs_out["astr"]["cdelt"][0] 

323 cd[1, :] = cd[1, :] * obs_out["astr"]["cdelt"][1] 

324 fits_file.header.set("cd1_1", cd[0, 0], "Degrees / Pixel") 

325 fits_file.header.set("cd1_2", cd[0, 1], "Degrees / Pixel") 

326 fits_file.header.set("cd2_1", cd[1, 0], "Degrees / Pixel") 

327 fits_file.header.set("cd2_2", cd[1, 1], "Degrees / Pixel") 

328 fits_file.header.set( 

329 "crpix1", int(obs_out["astr"]["crpix"][0]), "Reference Pixel in X" 

330 ) 

331 fits_file.header.set( 

332 "crpix2", int(obs_out["astr"]["crpix"][1]), "Reference Pixel in Y" 

333 ) 

334 fits_file.header.set( 

335 "crval1", obs_out["astr"]["crval"][0], "R.A. (degrees) of reference pixel" 

336 ) 

337 fits_file.header.set( 

338 "crval2", obs_out["astr"]["crval"][1], "Declination of reference pixel" 

339 ) 

340 fits_file.header.set("pv2_1", obs_out["astr"]["pv2"][0], "Projection parameter 1") 

341 fits_file.header.set("pv2_2", obs_out["astr"]["pv2"][1], "Projection parameter 2") 

342 for i in range(obs_out["astr"]["pv1"].size): 

343 fits_file.header.set( 

344 f"pv1_{i}", obs_out["astr"]["pv1"][i], "Projection parameters" 

345 ) 

346 fits_file.header.set( 

347 "mjd-obs", obs_out["astr"]["mjdobs"], "Modified Julian day of observations" 

348 ) 

349 fits_file.header.set("date-obs", obs_out["astr"]["dateobs"], "Date of observations") 

350 fits_file.header.set("radecsys", obs_out["astr"]["radecsys"], "Reference Frame") 

351 fits_file.header.set( 

352 "history", 

353 f"World Coordinate System parameters written by PyFHD: {datetime.now().strftime('%b %d %Y %H:%M:%S')}", 

354 ) 

355 # Create the fits header to store the dirty images 

356 fits_file_apparent = fits_file.copy() 

357 fits_file_apparent.header.set("bunit", "Jy/sr (apparent)") 

358 

359 # Create the fits header for the weights 

360 fits_file_uv = fits.PrimaryHDU(np.abs(weights_uv[0])) 

361 fits_file_uv.header.set("CD1_1", obs["kpix"], "Wavelengths / Pixel") 

362 fits_file_uv.header.set("CD2_1", 0.0, "Wavelengths / Pixel") 

363 fits_file_uv.header.set("CD1_2", 0.0, "Wavelengths / Pixel") 

364 fits_file_uv.header.set("CD2_2", obs["kpix"], "Wavelengths / Pixel") 

365 fits_file_uv.header.set( 

366 "CRPIX1", obs_out["dimension"] / 2 + 1, "Reference Pixel in X" 

367 ) 

368 fits_file_uv.header.set( 

369 "CRPIX2", obs_out["elements"] / 2 + 1, "Reference Pixel in Y" 

370 ) 

371 fits_file_uv.header.set("CRVAL1", 0.0, "Wavelengths (u)") 

372 fits_file_uv.header.set("CRVAL2", 0.0, "Wavelengths (v)") 

373 fits_file_uv.header.set( 

374 "MJD-OBS", obs_out["astr"]["mjdobs"], "Modified Julian day of observation" 

375 ) 

376 fits_file_uv.header.set( 

377 "DATE-OBS", obs_out["astr"]["dateobs"], "Date of observation" 

378 ) 

379 

380 # If you need the beam_contour arrays add them here, lines 369-378 in fhd_quickview.pro 

381 

382 filter_name = pyfhd_config["image_filter"].split("_")[-1] 

383 fits_output: Path = pyfhd_config["output_dir"] / "fits" 

384 fits_output.mkdir(exist_ok=True) 

385 if pyfhd_config["image_plots"]: 385 ↛ 386line 385 didn't jump to line 386 because the condition on line 385 was never true

386 png_output: Path = pyfhd_config["output_dir"] / "plots" / "images" 

387 png_output.mkdir(exist_ok=True) 

388 for pol_i in range(obs["n_pol"]): 

389 logger.info(f"Saving the FITS files for polarization {pol_names[pol_i]}") 

390 instr_residual = instr_residual_arr[pol_i] * beam_correction_out[pol_i] 

391 instr_dirty = instr_dirty_arr[pol_i] * beam_correction_out[pol_i] 

392 instr_model = instr_model_arr[pol_i] * beam_correction_out[pol_i] 

393 beam_use = beam_base_out[pol_i] 

394 

395 # Write the fits files for the dirty images 

396 fits_file_apparent.data = instr_dirty 

397 instr_dirty_name = ( 

398 f"{pyfhd_config['obs_id']}_{filter_name}_dirty_{pol_names[pol_i]}" 

399 ) 

400 fits_file_apparent.writeto( 

401 Path( 

402 fits_output, 

403 f"{instr_dirty_name}.fits", 

404 ), 

405 overwrite=True, 

406 ) 

407 fits_file_apparent.data = instr_model 

408 instr_model_name = ( 

409 f"{pyfhd_config['obs_id']}_{filter_name}_model_{pol_names[pol_i]}" 

410 ) 

411 fits_file_apparent.writeto( 

412 Path( 

413 fits_output, 

414 f"{instr_model_name}.fits", 

415 ), 

416 overwrite=True, 

417 ) 

418 fits_file_apparent.data = instr_residual 

419 instr_residual_name = ( 

420 f"{pyfhd_config['obs_id']}_{filter_name}_residual_{pol_names[pol_i]}" 

421 ) 

422 fits_file_apparent.writeto( 

423 Path( 

424 fits_output, 

425 f"{instr_residual_name}.fits", 

426 ), 

427 overwrite=True, 

428 ) 

429 fits_file.data = beam_use 

430 beam_name = f"{pyfhd_config['obs_id']}_beam_{pol_names[pol_i]}" 

431 fits_file.writeto( 

432 Path(fits_output, f"{beam_name}.fits"), 

433 overwrite=True, 

434 ) 

435 fits_file_uv.data = np.abs(weights_uv) * obs["n_vis"] 

436 weights_name = f"{pyfhd_config['obs_id']}_uv_weights_{pol_names[pol_i]}" 

437 fits_file_uv.writeto( 

438 Path( 

439 fits_output, 

440 f"{weights_name}.fits", 

441 ), 

442 overwrite=True, 

443 ) 

444 

445 if pyfhd_config["image_plots"]: 445 ↛ 446line 445 didn't jump to line 446 because the condition on line 445 was never true

446 logger.info( 

447 f"Plotting the continuum images for polarization {pol_names[pol_i]} into {pyfhd_config['output_dir']/'plots'/'images'}" 

448 ) 

449 plot_fits_image( 

450 Path(fits_output, f"{instr_dirty_name}.fits"), 

451 Path(png_output, f"{instr_dirty_name}.png"), 

452 title=f"Dirty Image {pol_names[pol_i]}", 

453 logger=logger, 

454 ) 

455 plot_fits_image( 

456 Path(fits_output, f"{instr_model_name}.fits"), 

457 Path(png_output, f"{instr_model_name}.png"), 

458 title=f"Model Image {pol_names[pol_i]}", 

459 logger=logger, 

460 ) 

461 plot_fits_image( 

462 Path(fits_output, f"{instr_residual_name}.fits"), 

463 Path(png_output, f"{instr_residual_name}.png"), 

464 title=f"Residual Image {pol_names[pol_i]}", 

465 logger=logger, 

466 ) 

467 plot_fits_image( 

468 Path(fits_output, f"{beam_name}.fits"), 

469 Path(png_output, f"{beam_name}.png"), 

470 title=f"Beam Image {pol_names[pol_i]}", 

471 logger=logger, 

472 ) 

473 plot_fits_image( 

474 Path(fits_output, f"{weights_name}.fits"), 

475 Path(png_output, f"{weights_name}.png"), 

476 title=f"Weight Image {pol_names[pol_i]}", 

477 logger=logger, 

478 )