Coverage for PyFHD/source_modeling/vis_model_transfer.py: 69%
158 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
1import numpy as np
2from numpy.typing import NDArray
3from PyFHD.data_setup.uvfits import extract_visibilities, create_params, extract_header
4import logging
5from PyFHD.pyfhd_tools.pyfhd_utils import run_command
6from PyFHD.io.pyfhd_io import recarray_to_dict, save, load
7import importlib_resources
8import os
9import shutil
10import h5py
11from scipy.io import readsav
12from pathlib import Path
13import sys
16def vis_model_transfer(
17 pyfhd_config: dict, obs: dict, params: dict, logger: logging.Logger
18) -> tuple[NDArray[np.complex128], dict]:
19 """
20 Transfer in a simulated model of the visibilities from either a sav file or uvfits file.
22 Parameters
23 ----------
24 pyfhd_config : dict
25 PyFHD's configuration dictionary containing all the options for a PyFHD run
26 obs : dict
27 The Observation Metadata dictionary
28 params : dict
29 Visibility metadata dictionary
30 logger : logging.Logger
31 PyFHD's logger
33 Returns
34 -------
35 vis_model_arr : NDArray[np.complex128]
36 Simulated model for the visibilities
38 See Also
39 --------
40 PyFHD.source_modeling.vis_model_transfer.import_vis_model_from_sav : Import model from a sav file
41 PyFHD.source_modeling.vis_model_transfer.import_vis_model_from_uvfits : Import model from a uvfits file
42 """
43 if pyfhd_config["model_file_type"] == "sav":
44 vis_model, params_model = import_vis_model_from_sav(pyfhd_config, obs, logger)
45 elif pyfhd_config["model_file_type"] == "uvfits": 45 ↛ 49line 45 didn't jump to line 49 because the condition on line 45 was always true
46 vis_model, params_model = import_vis_model_from_uvfits(
47 pyfhd_config, obs, logger
48 )
49 elif pyfhd_config["model_file_type"] == "h5":
50 # Assume it's a PyFHD h5 file
51 model = load(pyfhd_config["model_file_path"], logger=logger)
52 vis_model = model["vis_model_arr"]
53 params_model = model["params"]
54 else:
55 logger.error("You chose a file type PyFHD can't import, exiting")
56 raise ValueError(
57 f"File type {pyfhd_config['model_file_type']} not supported. Please use 'sav' or 'uvfits'."
58 )
60 if pyfhd_config["flag_model"]:
61 vis_model = flag_model_visibilities(
62 vis_model, params, params_model, obs, pyfhd_config, logger
63 )
64 else:
65 logger.warning(
66 "You have chosen not to flag the model visibilities, so PyFHD will not account for difference in time steps between the data and the model "
67 "or any flagged tiles. This may lead to incorrect calibration results if the model visibilities are not compatible with the data visibilities."
68 )
70 if pyfhd_config["save_model"]: 70 ↛ 71line 70 didn't jump to line 71 because the condition on line 70 was never true
71 model_dir = Path(pyfhd_config["output_dir"], "model")
72 model_dir.mkdir(parents=True, exist_ok=True)
73 model_file = Path(model_dir, f"{pyfhd_config['obs_id']}_vis_model.h5")
74 model = {
75 "vis_model_arr": vis_model,
76 "params": params_model,
77 }
78 save(model_file, model, "vis_model", logger=logger)
80 return vis_model
83def import_vis_model_from_sav(
84 pyfhd_config: dict, obs: dict, logger: logging.Logger
85) -> tuple[NDArray[np.complex128], dict]:
86 """
87 Read a model visibility array in from multiple IDL sav files which are in a directory
88 given by pyfhd_config['model-file-path']. The data is assumed to be in the format of
89 <obs_id>_params.sav, <obs_id>_vis_model_<pol_name>.sav. The pol_name follows the pol_names
90 in the obs dictionary which are ['XX','YY','XY','YX','I','Q','U','V'].
92 Parameters
93 ----------
94 pyfhd_config : dict
95 The PyFHD config dictionary
96 obs : dict
97 The dictionary containing observation data and metadata
98 logger : logging.Logger
99 PyFHD's logger
101 Returns
102 -------
103 vis_model_arr : NDArray[np.complex128]
104 Simulated model for the visibilities
105 params_model : dict
106 The parameters for said model used for flagging
107 """
108 try:
109 path = Path(
110 pyfhd_config["model_file_path"], f"{pyfhd_config['obs_id']}_params.sav"
111 )
112 params_model = readsav(path)
113 params_model = recarray_to_dict(params_model.params)
114 # Read in the first polarization from pol_names
115 pol_i = 0
116 path = Path(
117 pyfhd_config["model_file_path"],
118 f"{pyfhd_config['obs_id']}_vis_model_{obs['pol_names'][pol_i]}.sav",
119 )
120 curr_vis_model = readsav(path)
121 if isinstance(curr_vis_model, dict): 121 ↛ 123line 121 didn't jump to line 123 because the condition on line 121 was always true
122 curr_vis_model = curr_vis_model["vis_model_ptr"]
123 elif isinstance(curr_vis_model, np.recarray):
124 curr_vis_model = curr_vis_model.vis_model_ptr
125 # Sometimes arrays a packed in via a pointer, if so extract it
126 if curr_vis_model.size == 1: 126 ↛ 127line 126 didn't jump to line 127 because the condition on line 126 was never true
127 curr_vis_model = curr_vis_model[0]
128 curr_vis_model = curr_vis_model.transpose().astype(np.complex128)
129 # The shape should be n_pol, n_freq, n_time * n_baselines
130 vis_model_shape = [obs["n_pol"]] + list(curr_vis_model.shape)
131 vis_model_arr = np.empty(vis_model_shape, dtype=np.complex128)
132 vis_model_arr[pol_i] = curr_vis_model
133 for pol_i in range(1, obs["n_pol"]):
134 path = Path(
135 pyfhd_config["model_file_path"],
136 f"{pyfhd_config['obs_id']}_vis_model_{obs['pol_names'][pol_i]}.sav",
137 )
138 curr_vis_model = readsav(path)
139 if isinstance(curr_vis_model, dict): 139 ↛ 141line 139 didn't jump to line 141 because the condition on line 139 was always true
140 curr_vis_model = curr_vis_model["vis_model_ptr"]
141 elif isinstance(curr_vis_model, np.recarray):
142 # Should be a rec array containing one item vis_model_ptr
143 curr_vis_model = curr_vis_model.vis_model_ptr
144 # Sometimes arrays a packed in via a pointer, if so extract it
145 if curr_vis_model.size == 1: 145 ↛ 146line 145 didn't jump to line 146 because the condition on line 145 was never true
146 curr_vis_model = curr_vis_model[0]
147 vis_model_arr[pol_i] = curr_vis_model.transpose().astype(np.complex128)
148 except FileNotFoundError as e:
149 logger.error(
150 f"PyFHD failed to load in the model visibilities while trying to transfer in file: {path} as the file wasn't found. PyFHD is exiting execution"
151 )
152 exit()
153 return vis_model_arr, params_model
156def import_vis_model_from_uvfits(
157 pyfhd_config: dict, obs: dict, logger: logging.Logger
158) -> tuple[NDArray[np.complex128], dict]:
159 """Read a model visibility array in from a `uvfits` with filepath given
160 by pyfhd_config['model_file_path']. Reads data in via
161 `PyFHD.data_setup.uvfits import extract_visibilities`.
163 Parameters
164 ----------
165 pyfhd_config : dict
166 The options from argparse in a dictionary, that have been verified using
167 `PyFHD.pyfhd_tools.pyfhd_setup.pyfhd_setup`.
168 obs : dict
169 The observation dictionary as populated by `PyFHD.data_setup.obs.create_obs`
171 Returns
172 -------
173 vis_model_arr : NDArray[np.complex128]
174 Simulated model for the visibilities
175 params_model : dict
176 The parameters for said model used for flagging
177 """
179 header_model, params_data_model, _, _ = extract_header(
180 pyfhd_config, logger, model_uvfits=True
181 )
183 if header_model["n_freq"] != obs["n_freq"]: 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true
184 model_path = pyfhd_config["model_file_path"]
185 logger.error(
186 f"The obs was expecting {obs['n_freq']} frequencies, "
187 f"but the model visibilities read in from {model_path} "
188 f"contain {header_model['n_freq']} freqs. Please supply "
189 "model visibilities that match the data. Exiting now."
190 )
191 exit()
193 params_model = create_params(header_model, params_data_model, logger)
195 vis_model_arr, _ = extract_visibilities(
196 header_model, params_data_model, pyfhd_config, logger
197 )
199 return vis_model_arr, params_model
202class _FlaggingInfoCounter(object):
203 """Something to count and hold numbers to do with baselines"""
205 def __init__(self, params: dict):
206 """
207 Given a populated `params` dict (as populated by
208 `PyFHD.data_setup.uvfits.create_params`), calculate many useful quantities
209 to do with antenna (tile) names and numbers, expected number of cross and
210 auto correlations etc.
211 """
213 self.unique_times = np.unique(params["time"])
214 self.num_times = len(self.unique_times)
216 ant_names1 = np.unique(params["antenna1"])
217 ant_names2 = np.unique(params["antenna2"])
219 self.num_visis = len(params["antenna1"])
221 # If there are no auto-correlations, you don't get every unique tile
222 # in either antenna1 or antenna2, so do a unique on both of them to be sure
223 self.ant_names = np.unique(np.append(ant_names1, ant_names2))
224 self.num_ants = len(self.ant_names)
226 # indexes of the auto-correlations
227 self.auto_locs = params["antenna1"] == params["antenna2"]
228 self.num_autos = np.count_nonzero(self.auto_locs)
230 if self.num_autos == 0:
231 self.num_autos_per_time = 0
232 else:
233 self.num_autos_per_time = self.num_ants
235 # indexes of the cross-correlations
236 self.cross_locs = params["antenna1"] != params["antenna2"]
238 # how many cross-correlations there should be per time step
239 self.num_cross_per_time = int((self.num_ants * (self.num_ants - 1)) / 2)
241 # number of visibilities per time step
242 self.num_visi_per_time_step = self.num_cross_per_time + self.num_autos_per_time
244 # within a single time step, where the cross-correlations are indexed
245 # can use this while iterating over time to select the crosses only
246 self.cross_locs_per_time = np.where(
247 self.cross_locs[: self.num_visi_per_time_step]
248 )[0]
250 # within a single time step, where the cross-correlations are indexed
251 # can use this while iterating over time to select the crosses only
252 self.auto_locs_per_time = np.where(
253 self.auto_locs[: self.num_visi_per_time_step]
254 )[0]
256 self.ant1_single_time = params["antenna1"][: self.num_visi_per_time_step]
257 self.ant2_single_time = params["antenna2"][: self.num_visi_per_time_step]
260def flag_model_visibilities(
261 vis_model_arr: NDArray[np.complex128],
262 params_data: dict,
263 params_model: dict,
264 obs: dict,
265 pyfhd_config: dict,
266 logger: logging.Logger,
267) -> NDArray[np.complex128]:
268 """
269 Account for time offset and tile flags, and check that the uvfits
270 and compatible. Needs to check if auto-correlations are present
272 Parameters
273 ----------
274 vis_model_arr : NDArray[np.complex128]
275 The model visibility array from the model uvfits file
276 params_data : dict
277 The params data from the observation uvfits
278 params_model : dict
279 The params data from the model uvfits
280 obs : dict
281 The observaton dictionary containing all the data from the observation uvfits file
282 pyfhd_config : dict
283 The PyFHD configuration dictionary
284 logger : logging.Logger
285 The PyFHD logger
287 Returns
288 -------
289 vis_model_arr_flagged: NDArray[np.complex128]
290 The flagged model visibility array
291 """
293 # Calculate a number of things we'll need to compare the data to the model
294 flaginfo_data = _FlaggingInfoCounter(params_data)
295 flaginfo_model = _FlaggingInfoCounter(params_model)
297 # For all time steps in both data and model, check the minimum time offset
298 # between the two. If there is a constant minimum > 0, the times might
299 # be consistently offset because of QUACK time
300 time_offsets = []
301 for time in flaginfo_data.unique_times:
302 # convert from julian to seconds via 24*60*60
303 time_offsets.append(
304 np.min(np.abs(flaginfo_model.unique_times - time)) * (24.0 * 60 * 60)
305 )
307 # The highest time resolution data we'll be dealing with. If the offset is
308 # less than half of this, could be error in calculations rather than an
309 # actual time offset
310 min_integration = 0.5
312 # Try to ascertain is the model is offset in time from the data
313 # Only believe an offset to an accuracy of 0.25, so round to two decimal
314 # places. Here we are essentially trying to divine the QUACK time
315 rounded_offset = np.round(np.mean(time_offsets), 2)
317 # If offset is big enough to be real, but smaller than half an integration
318 # that add the `rounded_offset` (our calculated QUACK time) to the model
319 # to work out which time steps are closest between model and data
320 if ( 320 ↛ 329line 320 didn't jump to line 329 because the condition on line 320 was always true
321 rounded_offset >= min_integration / 2.0
322 and rounded_offset <= obs["time_res"] / 2.0
323 ):
324 flaginfo_model.unique_times += rounded_offset / (24.0 * 60 * 60)
325 logger.warning(
326 f"Model time stamps are offset from data by an average of {rounded_offset}. Accounting for this to match model time steps to data"
327 )
329 model_times_to_use = []
330 # For each time step in the data, find the closest time step in the model
331 for time in flaginfo_data.unique_times:
332 t_ind = np.argmin(np.abs(flaginfo_model.unique_times - time))
333 model_times_to_use.append(t_ind)
335 model_times_to_use = np.array(model_times_to_use)
337 # This means one or more time steps in the model match best to the same
338 # time step in the data. This shouldn't happen if the data and model
339 # have the same time resolution so something bad has happened
340 if flaginfo_data.num_times != len(np.unique(model_times_to_use)): 340 ↛ 342line 340 didn't jump to line 342 because the condition on line 340 was never true
342 data_path = pyfhd_config["input_path"], pyfhd_config["obs_id"] + ".uvfits"
343 model_path = (
344 str(pyfhd_config["model_file_path"]) + pyfhd_config["model_file_type"]
345 )
346 raise ValueError(
347 f"Could not match the time steps in the data uvfits: {data_path}"
348 f" and model uvfits in {model_path}. Please check the model "
349 "and try again. Exiting now."
350 )
352 # Now to flag the model - some models have no flagged tiles (antennas),
353 # whereas the data might have flagged tiles (and so missing baselines).
354 # This means we need to flag the missing tiles out of the data and
355 # reshape the model
357 # If less antennas in the model than the data, we can't calibrate the whole
358 # dataset so just error for now
359 if flaginfo_model.num_ants < flaginfo_data.num_ants: 359 ↛ 360line 359 didn't jump to line 360 because the condition on line 359 was never true
360 model_path = pyfhd_config["model_file_path"] + pyfhd_config["model_file_type"]
361 raise ValueError(
362 f"There are less antennas (tiles) in the model "
363 f"{model_path} than in the data, so cannot calibrate the "
364 "whole dataset. Please check the model "
365 "and try again. Exiting now."
366 )
368 # Test to see if there are auto-correlations in data
369 if flaginfo_model.num_autos == 0:
370 logger.warning("There are no auto-correlations present in model.")
372 # This is where the fun begins - pyuvdata uses the tile number as written
373 # in the 'TILE' column in the metafits file to encode baselines (and so
374 # populate params[antenna1] and params[antenna2]). The numbers are MWA
375 # assigned, and have nothing to do with antenna index. Birli and WODEN use
376 # the tile index (one-indexed as the BASELINE encoding is one-indexed).
377 # So to match a flagged tile from the data to the model, need to work out
378 # the index of any flagged tiles in the data. All tile names are read in
379 # from the metafits
381 flag_indexes = []
383 # if the data just have tile indexes, the maximum should be the number
384 # of tiles - use that to work out if we have Birli of pyuvdata input
386 # we should have a pyuvdata input in this case as a tile name is greater
387 # than the number of tiles
388 if np.max(flaginfo_data.ant_names) > len(obs["baseline_info"]["tile_names"]): 388 ↛ 392line 388 didn't jump to line 392 because the condition on line 388 was never true
390 # Loop over all possible antenna (tile) names, and if they're not in the
391 # list of antennas in this data set, append to flag_indexes
392 for ant_ind, ant_name in enumerate(obs["baseline_info"]["tile_names"]):
393 if ant_name not in flaginfo_data.ant_names:
394 flag_indexes.append(ant_ind + 1)
396 # tiles are named by their index (1 indexed as per uvfits standard)
397 else:
398 for ant_name in range(1, len(obs["baseline_info"]["tile_names"]) + 1):
399 if ant_name not in flaginfo_data.ant_names:
400 flag_indexes.append(ant_name)
402 if len(flag_indexes) > 0: 402 ↛ 409line 402 didn't jump to line 409 because the condition on line 402 was always true
403 logger.info(
404 f"Found flagged tiles {flag_indexes} in the data, flagging from the model"
405 )
407 # This gives us true/false if the visibilities should be included
408 # for a single time step based on antenna1 and antenna2
409 include_per_time_ant1 = np.isin(
410 flaginfo_model.ant1_single_time, flag_indexes, invert=True
411 )
412 include_per_time_ant2 = np.isin(
413 flaginfo_model.ant2_single_time, flag_indexes, invert=True
414 )
416 # Doing a logic union combines info from ant1 and ant2
417 model_include_per_time = np.nonzero(include_per_time_ant1 & include_per_time_ant2)[
418 0
419 ]
421 # Check if the model has auto-correlations
422 if (flaginfo_model.num_autos) > 0:
423 data_include_per_time = np.sort(
424 np.concat(
425 [flaginfo_data.cross_locs_per_time, flaginfo_data.auto_locs_per_time]
426 )
427 )
428 else:
429 # If no auto-correlations, just use the cross-correlations, Keep the autos as zeroes
430 data_include_per_time = flaginfo_data.cross_locs_per_time
431 if flaginfo_data.num_autos > 0: 431 ↛ 438line 431 didn't jump to line 438 because the condition on line 431 was always true
432 logger.warning(
433 "The data has auto-correlations, but the model does not. "
434 "Setting the auto correlations locations to zero in the model."
435 )
437 # empty holder for the flagged model - this should be the same shape
438 vis_model_arr_flagged = np.zeros(
439 (obs["n_pol"], obs["n_freq"], flaginfo_data.num_visis), dtype=np.complex128
440 )
442 # For each time step that matches the data, copy across any visibilities
443 # that aren't to be flagged
444 for t_data_ind, t_model_ind in enumerate(model_times_to_use):
446 # Subset of cross-corrs from flagged model to select for this time step
447 t_flag_inds = (
448 t_data_ind * flaginfo_data.num_visi_per_time_step + data_include_per_time
449 )
450 # Subset of cross-corrs from full model to select for this time step
451 t_model_inds = (
452 t_model_ind * flaginfo_model.num_visi_per_time_step + model_include_per_time
453 )
454 # Stick it in the flagged model
455 vis_model_arr_flagged[:, :, t_flag_inds] = vis_model_arr[:, :, t_model_inds]
457 return vis_model_arr_flagged
460def convert_vis_model_arr_to_sav(
461 vis_model_arr: NDArray[np.complex128],
462 pyfhd_config: dict,
463 logger: logging.Logger,
464 model_vis_dir: str,
465 n_pol: int,
466):
467 """
468 Converts the contents of `vis_model_arr` into an FHD .sav file format
469 so we can import into existing IDL code with ease. First writes data to
470 `hdf5` format, then uses IDL code template to convert to IDL `.sav` format
471 compatible with FHD. Sticks the outputs into `model_vis_dir`.
473 Parameters
474 ----------
475 vis_model_arr : NDArray[np.complex128]
476 Complex array hold the model visibilities
477 pyfhd_config : dict
478 The options from argparse in a dictionary, that have been verified using
479 `PyFHD.pyfhd_tools.pyfhd_setup.pyfhd_setup`.
480 logger : logging.Logger
481 PyFHD logger to feed information to
482 model_vis_dir : str
483 Directory location to write the output files to
484 n_pol : int
485 Number of polarisations to write out (each is written to an individual)
486 `.sav` file
487 """
489 pol_names = ["XX", "YY", "XY", "YX"]
491 logger.info(
492 f"vis_model_transfer: saving {model_vis_dir}/{pyfhd_config['obs_id']}_vis_model.h5"
493 )
495 with h5py.File(f"{model_vis_dir}/{pyfhd_config['obs_id']}_vis_model.h5", "w") as hf:
497 for pol, pol_name in enumerate(pol_names[:n_pol]):
499 hf.create_dataset(
500 f"{pyfhd_config['obs_id']}_vis_model_{pol_name}",
501 data=vis_model_arr[pol].transpose(),
502 )
504 hf.close()
506 # Grab the template IDL code and transfer so people can see what code was used
507 # and modify if they want
508 model_arr_convert_pro = importlib_resources.files("PyFHD.templates").joinpath(
509 "convert_model_arr_to_sav.pro"
510 )
511 shutil.copy(model_arr_convert_pro, model_vis_dir)
513 # Move into the output directory so IDL can see all the .pro files
514 os.chdir(model_vis_dir)
516 # Run the IDL code
517 logger.info(
518 f"vis_model_transfer: converting {model_vis_dir}/{pyfhd_config['obs_id']}_vis_model.h5 to .sav format"
519 )
521 idl_command = f"idl -IDL_DEVICE ps -e convert_model_arr_to_sav -args {model_vis_dir} {pyfhd_config['obs_id']} {n_pol}"
523 run_command(idl_command, False)