Coverage for PyFHD/source_modeling/vis_model_transfer.py: 69%

158 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-01 10:58 +0800

1import numpy as np 

2from numpy.typing import NDArray 

3from PyFHD.data_setup.uvfits import extract_visibilities, create_params, extract_header 

4import logging 

5from PyFHD.pyfhd_tools.pyfhd_utils import run_command 

6from PyFHD.io.pyfhd_io import recarray_to_dict, save, load 

7import importlib_resources 

8import os 

9import shutil 

10import h5py 

11from scipy.io import readsav 

12from pathlib import Path 

13import sys 

14 

15 

16def vis_model_transfer( 

17 pyfhd_config: dict, obs: dict, params: dict, logger: logging.Logger 

18) -> tuple[NDArray[np.complex128], dict]: 

19 """ 

20 Transfer in a simulated model of the visibilities from either a sav file or uvfits file. 

21 

22 Parameters 

23 ---------- 

24 pyfhd_config : dict 

25 PyFHD's configuration dictionary containing all the options for a PyFHD run 

26 obs : dict 

27 The Observation Metadata dictionary 

28 params : dict 

29 Visibility metadata dictionary 

30 logger : logging.Logger 

31 PyFHD's logger 

32 

33 Returns 

34 ------- 

35 vis_model_arr : NDArray[np.complex128] 

36 Simulated model for the visibilities 

37 

38 See Also 

39 -------- 

40 PyFHD.source_modeling.vis_model_transfer.import_vis_model_from_sav : Import model from a sav file 

41 PyFHD.source_modeling.vis_model_transfer.import_vis_model_from_uvfits : Import model from a uvfits file 

42 """ 

43 if pyfhd_config["model_file_type"] == "sav": 

44 vis_model, params_model = import_vis_model_from_sav(pyfhd_config, obs, logger) 

45 elif pyfhd_config["model_file_type"] == "uvfits": 45 ↛ 49line 45 didn't jump to line 49 because the condition on line 45 was always true

46 vis_model, params_model = import_vis_model_from_uvfits( 

47 pyfhd_config, obs, logger 

48 ) 

49 elif pyfhd_config["model_file_type"] == "h5": 

50 # Assume it's a PyFHD h5 file 

51 model = load(pyfhd_config["model_file_path"], logger=logger) 

52 vis_model = model["vis_model_arr"] 

53 params_model = model["params"] 

54 else: 

55 logger.error("You chose a file type PyFHD can't import, exiting") 

56 raise ValueError( 

57 f"File type {pyfhd_config['model_file_type']} not supported. Please use 'sav' or 'uvfits'." 

58 ) 

59 

60 if pyfhd_config["flag_model"]: 

61 vis_model = flag_model_visibilities( 

62 vis_model, params, params_model, obs, pyfhd_config, logger 

63 ) 

64 else: 

65 logger.warning( 

66 "You have chosen not to flag the model visibilities, so PyFHD will not account for difference in time steps between the data and the model " 

67 "or any flagged tiles. This may lead to incorrect calibration results if the model visibilities are not compatible with the data visibilities." 

68 ) 

69 

70 if pyfhd_config["save_model"]: 70 ↛ 71line 70 didn't jump to line 71 because the condition on line 70 was never true

71 model_dir = Path(pyfhd_config["output_dir"], "model") 

72 model_dir.mkdir(parents=True, exist_ok=True) 

73 model_file = Path(model_dir, f"{pyfhd_config['obs_id']}_vis_model.h5") 

74 model = { 

75 "vis_model_arr": vis_model, 

76 "params": params_model, 

77 } 

78 save(model_file, model, "vis_model", logger=logger) 

79 

80 return vis_model 

81 

82 

83def import_vis_model_from_sav( 

84 pyfhd_config: dict, obs: dict, logger: logging.Logger 

85) -> tuple[NDArray[np.complex128], dict]: 

86 """ 

87 Read a model visibility array in from multiple IDL sav files which are in a directory 

88 given by pyfhd_config['model-file-path']. The data is assumed to be in the format of 

89 <obs_id>_params.sav, <obs_id>_vis_model_<pol_name>.sav. The pol_name follows the pol_names 

90 in the obs dictionary which are ['XX','YY','XY','YX','I','Q','U','V']. 

91 

92 Parameters 

93 ---------- 

94 pyfhd_config : dict 

95 The PyFHD config dictionary 

96 obs : dict 

97 The dictionary containing observation data and metadata 

98 logger : logging.Logger 

99 PyFHD's logger 

100 

101 Returns 

102 ------- 

103 vis_model_arr : NDArray[np.complex128] 

104 Simulated model for the visibilities 

105 params_model : dict 

106 The parameters for said model used for flagging 

107 """ 

108 try: 

109 path = Path( 

110 pyfhd_config["model_file_path"], f"{pyfhd_config['obs_id']}_params.sav" 

111 ) 

112 params_model = readsav(path) 

113 params_model = recarray_to_dict(params_model.params) 

114 # Read in the first polarization from pol_names 

115 pol_i = 0 

116 path = Path( 

117 pyfhd_config["model_file_path"], 

118 f"{pyfhd_config['obs_id']}_vis_model_{obs['pol_names'][pol_i]}.sav", 

119 ) 

120 curr_vis_model = readsav(path) 

121 if isinstance(curr_vis_model, dict): 121 ↛ 123line 121 didn't jump to line 123 because the condition on line 121 was always true

122 curr_vis_model = curr_vis_model["vis_model_ptr"] 

123 elif isinstance(curr_vis_model, np.recarray): 

124 curr_vis_model = curr_vis_model.vis_model_ptr 

125 # Sometimes arrays a packed in via a pointer, if so extract it 

126 if curr_vis_model.size == 1: 126 ↛ 127line 126 didn't jump to line 127 because the condition on line 126 was never true

127 curr_vis_model = curr_vis_model[0] 

128 curr_vis_model = curr_vis_model.transpose().astype(np.complex128) 

129 # The shape should be n_pol, n_freq, n_time * n_baselines 

130 vis_model_shape = [obs["n_pol"]] + list(curr_vis_model.shape) 

131 vis_model_arr = np.empty(vis_model_shape, dtype=np.complex128) 

132 vis_model_arr[pol_i] = curr_vis_model 

133 for pol_i in range(1, obs["n_pol"]): 

134 path = Path( 

135 pyfhd_config["model_file_path"], 

136 f"{pyfhd_config['obs_id']}_vis_model_{obs['pol_names'][pol_i]}.sav", 

137 ) 

138 curr_vis_model = readsav(path) 

139 if isinstance(curr_vis_model, dict): 139 ↛ 141line 139 didn't jump to line 141 because the condition on line 139 was always true

140 curr_vis_model = curr_vis_model["vis_model_ptr"] 

141 elif isinstance(curr_vis_model, np.recarray): 

142 # Should be a rec array containing one item vis_model_ptr 

143 curr_vis_model = curr_vis_model.vis_model_ptr 

144 # Sometimes arrays a packed in via a pointer, if so extract it 

145 if curr_vis_model.size == 1: 145 ↛ 146line 145 didn't jump to line 146 because the condition on line 145 was never true

146 curr_vis_model = curr_vis_model[0] 

147 vis_model_arr[pol_i] = curr_vis_model.transpose().astype(np.complex128) 

148 except FileNotFoundError as e: 

149 logger.error( 

150 f"PyFHD failed to load in the model visibilities while trying to transfer in file: {path} as the file wasn't found. PyFHD is exiting execution" 

151 ) 

152 exit() 

153 return vis_model_arr, params_model 

154 

155 

156def import_vis_model_from_uvfits( 

157 pyfhd_config: dict, obs: dict, logger: logging.Logger 

158) -> tuple[NDArray[np.complex128], dict]: 

159 """Read a model visibility array in from a `uvfits` with filepath given 

160 by pyfhd_config['model_file_path']. Reads data in via 

161 `PyFHD.data_setup.uvfits import extract_visibilities`. 

162 

163 Parameters 

164 ---------- 

165 pyfhd_config : dict 

166 The options from argparse in a dictionary, that have been verified using 

167 `PyFHD.pyfhd_tools.pyfhd_setup.pyfhd_setup`. 

168 obs : dict 

169 The observation dictionary as populated by `PyFHD.data_setup.obs.create_obs` 

170 

171 Returns 

172 ------- 

173 vis_model_arr : NDArray[np.complex128] 

174 Simulated model for the visibilities 

175 params_model : dict 

176 The parameters for said model used for flagging 

177 """ 

178 

179 header_model, params_data_model, _, _ = extract_header( 

180 pyfhd_config, logger, model_uvfits=True 

181 ) 

182 

183 if header_model["n_freq"] != obs["n_freq"]: 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true

184 model_path = pyfhd_config["model_file_path"] 

185 logger.error( 

186 f"The obs was expecting {obs['n_freq']} frequencies, " 

187 f"but the model visibilities read in from {model_path} " 

188 f"contain {header_model['n_freq']} freqs. Please supply " 

189 "model visibilities that match the data. Exiting now." 

190 ) 

191 exit() 

192 

193 params_model = create_params(header_model, params_data_model, logger) 

194 

195 vis_model_arr, _ = extract_visibilities( 

196 header_model, params_data_model, pyfhd_config, logger 

197 ) 

198 

199 return vis_model_arr, params_model 

200 

201 

202class _FlaggingInfoCounter(object): 

203 """Something to count and hold numbers to do with baselines""" 

204 

205 def __init__(self, params: dict): 

206 """ 

207 Given a populated `params` dict (as populated by 

208 `PyFHD.data_setup.uvfits.create_params`), calculate many useful quantities 

209 to do with antenna (tile) names and numbers, expected number of cross and 

210 auto correlations etc. 

211 """ 

212 

213 self.unique_times = np.unique(params["time"]) 

214 self.num_times = len(self.unique_times) 

215 

216 ant_names1 = np.unique(params["antenna1"]) 

217 ant_names2 = np.unique(params["antenna2"]) 

218 

219 self.num_visis = len(params["antenna1"]) 

220 

221 # If there are no auto-correlations, you don't get every unique tile 

222 # in either antenna1 or antenna2, so do a unique on both of them to be sure 

223 self.ant_names = np.unique(np.append(ant_names1, ant_names2)) 

224 self.num_ants = len(self.ant_names) 

225 

226 # indexes of the auto-correlations 

227 self.auto_locs = params["antenna1"] == params["antenna2"] 

228 self.num_autos = np.count_nonzero(self.auto_locs) 

229 

230 if self.num_autos == 0: 

231 self.num_autos_per_time = 0 

232 else: 

233 self.num_autos_per_time = self.num_ants 

234 

235 # indexes of the cross-correlations 

236 self.cross_locs = params["antenna1"] != params["antenna2"] 

237 

238 # how many cross-correlations there should be per time step 

239 self.num_cross_per_time = int((self.num_ants * (self.num_ants - 1)) / 2) 

240 

241 # number of visibilities per time step 

242 self.num_visi_per_time_step = self.num_cross_per_time + self.num_autos_per_time 

243 

244 # within a single time step, where the cross-correlations are indexed 

245 # can use this while iterating over time to select the crosses only 

246 self.cross_locs_per_time = np.where( 

247 self.cross_locs[: self.num_visi_per_time_step] 

248 )[0] 

249 

250 # within a single time step, where the cross-correlations are indexed 

251 # can use this while iterating over time to select the crosses only 

252 self.auto_locs_per_time = np.where( 

253 self.auto_locs[: self.num_visi_per_time_step] 

254 )[0] 

255 

256 self.ant1_single_time = params["antenna1"][: self.num_visi_per_time_step] 

257 self.ant2_single_time = params["antenna2"][: self.num_visi_per_time_step] 

258 

259 

260def flag_model_visibilities( 

261 vis_model_arr: NDArray[np.complex128], 

262 params_data: dict, 

263 params_model: dict, 

264 obs: dict, 

265 pyfhd_config: dict, 

266 logger: logging.Logger, 

267) -> NDArray[np.complex128]: 

268 """ 

269 Account for time offset and tile flags, and check that the uvfits 

270 and compatible. Needs to check if auto-correlations are present 

271 

272 Parameters 

273 ---------- 

274 vis_model_arr : NDArray[np.complex128] 

275 The model visibility array from the model uvfits file 

276 params_data : dict 

277 The params data from the observation uvfits 

278 params_model : dict 

279 The params data from the model uvfits 

280 obs : dict 

281 The observaton dictionary containing all the data from the observation uvfits file 

282 pyfhd_config : dict 

283 The PyFHD configuration dictionary 

284 logger : logging.Logger 

285 The PyFHD logger 

286 

287 Returns 

288 ------- 

289 vis_model_arr_flagged: NDArray[np.complex128] 

290 The flagged model visibility array 

291 """ 

292 

293 # Calculate a number of things we'll need to compare the data to the model 

294 flaginfo_data = _FlaggingInfoCounter(params_data) 

295 flaginfo_model = _FlaggingInfoCounter(params_model) 

296 

297 # For all time steps in both data and model, check the minimum time offset 

298 # between the two. If there is a constant minimum > 0, the times might 

299 # be consistently offset because of QUACK time 

300 time_offsets = [] 

301 for time in flaginfo_data.unique_times: 

302 # convert from julian to seconds via 24*60*60 

303 time_offsets.append( 

304 np.min(np.abs(flaginfo_model.unique_times - time)) * (24.0 * 60 * 60) 

305 ) 

306 

307 # The highest time resolution data we'll be dealing with. If the offset is 

308 # less than half of this, could be error in calculations rather than an 

309 # actual time offset 

310 min_integration = 0.5 

311 

312 # Try to ascertain is the model is offset in time from the data 

313 # Only believe an offset to an accuracy of 0.25, so round to two decimal 

314 # places. Here we are essentially trying to divine the QUACK time 

315 rounded_offset = np.round(np.mean(time_offsets), 2) 

316 

317 # If offset is big enough to be real, but smaller than half an integration 

318 # that add the `rounded_offset` (our calculated QUACK time) to the model 

319 # to work out which time steps are closest between model and data 

320 if ( 320 ↛ 329line 320 didn't jump to line 329 because the condition on line 320 was always true

321 rounded_offset >= min_integration / 2.0 

322 and rounded_offset <= obs["time_res"] / 2.0 

323 ): 

324 flaginfo_model.unique_times += rounded_offset / (24.0 * 60 * 60) 

325 logger.warning( 

326 f"Model time stamps are offset from data by an average of {rounded_offset}. Accounting for this to match model time steps to data" 

327 ) 

328 

329 model_times_to_use = [] 

330 # For each time step in the data, find the closest time step in the model 

331 for time in flaginfo_data.unique_times: 

332 t_ind = np.argmin(np.abs(flaginfo_model.unique_times - time)) 

333 model_times_to_use.append(t_ind) 

334 

335 model_times_to_use = np.array(model_times_to_use) 

336 

337 # This means one or more time steps in the model match best to the same 

338 # time step in the data. This shouldn't happen if the data and model 

339 # have the same time resolution so something bad has happened 

340 if flaginfo_data.num_times != len(np.unique(model_times_to_use)): 340 ↛ 342line 340 didn't jump to line 342 because the condition on line 340 was never true

341 

342 data_path = pyfhd_config["input_path"], pyfhd_config["obs_id"] + ".uvfits" 

343 model_path = ( 

344 str(pyfhd_config["model_file_path"]) + pyfhd_config["model_file_type"] 

345 ) 

346 raise ValueError( 

347 f"Could not match the time steps in the data uvfits: {data_path}" 

348 f" and model uvfits in {model_path}. Please check the model " 

349 "and try again. Exiting now." 

350 ) 

351 

352 # Now to flag the model - some models have no flagged tiles (antennas), 

353 # whereas the data might have flagged tiles (and so missing baselines). 

354 # This means we need to flag the missing tiles out of the data and 

355 # reshape the model 

356 

357 # If less antennas in the model than the data, we can't calibrate the whole 

358 # dataset so just error for now 

359 if flaginfo_model.num_ants < flaginfo_data.num_ants: 359 ↛ 360line 359 didn't jump to line 360 because the condition on line 359 was never true

360 model_path = pyfhd_config["model_file_path"] + pyfhd_config["model_file_type"] 

361 raise ValueError( 

362 f"There are less antennas (tiles) in the model " 

363 f"{model_path} than in the data, so cannot calibrate the " 

364 "whole dataset. Please check the model " 

365 "and try again. Exiting now." 

366 ) 

367 

368 # Test to see if there are auto-correlations in data 

369 if flaginfo_model.num_autos == 0: 

370 logger.warning("There are no auto-correlations present in model.") 

371 

372 # This is where the fun begins - pyuvdata uses the tile number as written 

373 # in the 'TILE' column in the metafits file to encode baselines (and so 

374 # populate params[antenna1] and params[antenna2]). The numbers are MWA 

375 # assigned, and have nothing to do with antenna index. Birli and WODEN use 

376 # the tile index (one-indexed as the BASELINE encoding is one-indexed). 

377 # So to match a flagged tile from the data to the model, need to work out 

378 # the index of any flagged tiles in the data. All tile names are read in 

379 # from the metafits 

380 

381 flag_indexes = [] 

382 

383 # if the data just have tile indexes, the maximum should be the number 

384 # of tiles - use that to work out if we have Birli of pyuvdata input 

385 

386 # we should have a pyuvdata input in this case as a tile name is greater 

387 # than the number of tiles 

388 if np.max(flaginfo_data.ant_names) > len(obs["baseline_info"]["tile_names"]): 388 ↛ 392line 388 didn't jump to line 392 because the condition on line 388 was never true

389 

390 # Loop over all possible antenna (tile) names, and if they're not in the 

391 # list of antennas in this data set, append to flag_indexes 

392 for ant_ind, ant_name in enumerate(obs["baseline_info"]["tile_names"]): 

393 if ant_name not in flaginfo_data.ant_names: 

394 flag_indexes.append(ant_ind + 1) 

395 

396 # tiles are named by their index (1 indexed as per uvfits standard) 

397 else: 

398 for ant_name in range(1, len(obs["baseline_info"]["tile_names"]) + 1): 

399 if ant_name not in flaginfo_data.ant_names: 

400 flag_indexes.append(ant_name) 

401 

402 if len(flag_indexes) > 0: 402 ↛ 409line 402 didn't jump to line 409 because the condition on line 402 was always true

403 logger.info( 

404 f"Found flagged tiles {flag_indexes} in the data, flagging from the model" 

405 ) 

406 

407 # This gives us true/false if the visibilities should be included 

408 # for a single time step based on antenna1 and antenna2 

409 include_per_time_ant1 = np.isin( 

410 flaginfo_model.ant1_single_time, flag_indexes, invert=True 

411 ) 

412 include_per_time_ant2 = np.isin( 

413 flaginfo_model.ant2_single_time, flag_indexes, invert=True 

414 ) 

415 

416 # Doing a logic union combines info from ant1 and ant2 

417 model_include_per_time = np.nonzero(include_per_time_ant1 & include_per_time_ant2)[ 

418 0 

419 ] 

420 

421 # Check if the model has auto-correlations 

422 if (flaginfo_model.num_autos) > 0: 

423 data_include_per_time = np.sort( 

424 np.concat( 

425 [flaginfo_data.cross_locs_per_time, flaginfo_data.auto_locs_per_time] 

426 ) 

427 ) 

428 else: 

429 # If no auto-correlations, just use the cross-correlations, Keep the autos as zeroes 

430 data_include_per_time = flaginfo_data.cross_locs_per_time 

431 if flaginfo_data.num_autos > 0: 431 ↛ 438line 431 didn't jump to line 438 because the condition on line 431 was always true

432 logger.warning( 

433 "The data has auto-correlations, but the model does not. " 

434 "Setting the auto correlations locations to zero in the model." 

435 ) 

436 

437 # empty holder for the flagged model - this should be the same shape 

438 vis_model_arr_flagged = np.zeros( 

439 (obs["n_pol"], obs["n_freq"], flaginfo_data.num_visis), dtype=np.complex128 

440 ) 

441 

442 # For each time step that matches the data, copy across any visibilities 

443 # that aren't to be flagged 

444 for t_data_ind, t_model_ind in enumerate(model_times_to_use): 

445 

446 # Subset of cross-corrs from flagged model to select for this time step 

447 t_flag_inds = ( 

448 t_data_ind * flaginfo_data.num_visi_per_time_step + data_include_per_time 

449 ) 

450 # Subset of cross-corrs from full model to select for this time step 

451 t_model_inds = ( 

452 t_model_ind * flaginfo_model.num_visi_per_time_step + model_include_per_time 

453 ) 

454 # Stick it in the flagged model 

455 vis_model_arr_flagged[:, :, t_flag_inds] = vis_model_arr[:, :, t_model_inds] 

456 

457 return vis_model_arr_flagged 

458 

459 

460def convert_vis_model_arr_to_sav( 

461 vis_model_arr: NDArray[np.complex128], 

462 pyfhd_config: dict, 

463 logger: logging.Logger, 

464 model_vis_dir: str, 

465 n_pol: int, 

466): 

467 """ 

468 Converts the contents of `vis_model_arr` into an FHD .sav file format 

469 so we can import into existing IDL code with ease. First writes data to 

470 `hdf5` format, then uses IDL code template to convert to IDL `.sav` format 

471 compatible with FHD. Sticks the outputs into `model_vis_dir`. 

472 

473 Parameters 

474 ---------- 

475 vis_model_arr : NDArray[np.complex128] 

476 Complex array hold the model visibilities 

477 pyfhd_config : dict 

478 The options from argparse in a dictionary, that have been verified using 

479 `PyFHD.pyfhd_tools.pyfhd_setup.pyfhd_setup`. 

480 logger : logging.Logger 

481 PyFHD logger to feed information to 

482 model_vis_dir : str 

483 Directory location to write the output files to 

484 n_pol : int 

485 Number of polarisations to write out (each is written to an individual) 

486 `.sav` file 

487 """ 

488 

489 pol_names = ["XX", "YY", "XY", "YX"] 

490 

491 logger.info( 

492 f"vis_model_transfer: saving {model_vis_dir}/{pyfhd_config['obs_id']}_vis_model.h5" 

493 ) 

494 

495 with h5py.File(f"{model_vis_dir}/{pyfhd_config['obs_id']}_vis_model.h5", "w") as hf: 

496 

497 for pol, pol_name in enumerate(pol_names[:n_pol]): 

498 

499 hf.create_dataset( 

500 f"{pyfhd_config['obs_id']}_vis_model_{pol_name}", 

501 data=vis_model_arr[pol].transpose(), 

502 ) 

503 

504 hf.close() 

505 

506 # Grab the template IDL code and transfer so people can see what code was used 

507 # and modify if they want 

508 model_arr_convert_pro = importlib_resources.files("PyFHD.templates").joinpath( 

509 "convert_model_arr_to_sav.pro" 

510 ) 

511 shutil.copy(model_arr_convert_pro, model_vis_dir) 

512 

513 # Move into the output directory so IDL can see all the .pro files 

514 os.chdir(model_vis_dir) 

515 

516 # Run the IDL code 

517 logger.info( 

518 f"vis_model_transfer: converting {model_vis_dir}/{pyfhd_config['obs_id']}_vis_model.h5 to .sav format" 

519 ) 

520 

521 idl_command = f"idl -IDL_DEVICE ps -e convert_model_arr_to_sav -args {model_vis_dir} {pyfhd_config['obs_id']} {n_pol}" 

522 

523 run_command(idl_command, False)