Source code for pyfhd.source_modeling.vis_model_transfer

import numpy as np
from numpy.typing import NDArray
from pyfhd.data_setup.uvfits import extract_visibilities, create_params, extract_header
import logging
from pyfhd.pyfhd_tools.pyfhd_utils import run_command
from pyfhd.io.pyfhd_io import recarray_to_dict, save, load
import importlib_resources
import os
import shutil
import h5py
from scipy.io import readsav
from pathlib import Path
import sys


[docs] def vis_model_transfer( pyfhd_config: dict, obs: dict, params: dict, logger: logging.Logger ) -> tuple[NDArray[np.complex128], dict]: """ Transfer in a simulated model of the visibilities from either a sav file or uvfits file. Parameters ---------- pyfhd_config : dict pyfhd's configuration dictionary containing all the options for a pyfhd run obs : dict The Observation Metadata dictionary params : dict Visibility metadata dictionary logger : logging.Logger pyfhd's logger Returns ------- vis_model_arr : NDArray[np.complex128] Simulated model for the visibilities See Also -------- pyfhd.source_modeling.vis_model_transfer.import_vis_model_from_sav : Import model from a sav file pyfhd.source_modeling.vis_model_transfer.import_vis_model_from_uvfits : Import model from a uvfits file """ if pyfhd_config["model_file_type"] == "sav": vis_model, params_model = import_vis_model_from_sav(pyfhd_config, obs, logger) elif pyfhd_config["model_file_type"] == "uvfits": vis_model, params_model = import_vis_model_from_uvfits( pyfhd_config, obs, logger ) elif pyfhd_config["model_file_type"] == "h5": # Assume it's a pyfhd h5 file model = load(pyfhd_config["model_file_path"], logger=logger) vis_model = model["vis_model_arr"] params_model = model["params"] else: logger.error("You chose a file type pyfhd can't import, exiting") raise ValueError( f"File type {pyfhd_config['model_file_type']} not supported. Please use 'sav' or 'uvfits'." ) if pyfhd_config["flag_model"]: vis_model = flag_model_visibilities( vis_model, params, params_model, obs, pyfhd_config, logger ) else: logger.warning( "You have chosen not to flag the model visibilities, so pyfhd will not account for difference in time steps between the data and the model " "or any flagged tiles. This may lead to incorrect calibration results if the model visibilities are not compatible with the data visibilities." ) if pyfhd_config["save_model"]: model_dir = Path(pyfhd_config["output_dir"], "model") model_dir.mkdir(parents=True, exist_ok=True) model_file = Path(model_dir, f"{pyfhd_config['obs_id']}_vis_model.h5") model = { "vis_model_arr": vis_model, "params": params_model, } save(model_file, model, "vis_model", logger=logger) return vis_model
[docs] def import_vis_model_from_sav( pyfhd_config: dict, obs: dict, logger: logging.Logger ) -> tuple[NDArray[np.complex128], dict]: """ Read a model visibility array in from multiple IDL sav files which are in a directory given by pyfhd_config['model-file-path']. The data is assumed to be in the format of <obs_id>_params.sav, <obs_id>_vis_model_<pol_name>.sav. The pol_name follows the pol_names in the obs dictionary which are ['XX','YY','XY','YX','I','Q','U','V']. Parameters ---------- pyfhd_config : dict The pyfhd config dictionary obs : dict The dictionary containing observation data and metadata logger : logging.Logger pyfhd's logger Returns ------- vis_model_arr : NDArray[np.complex128] Simulated model for the visibilities params_model : dict The parameters for said model used for flagging """ fhd_subdirs = { "params": "metadata", "vis": "vis_data", } try: path = Path( pyfhd_config["model_file_path"], f"{pyfhd_config['obs_id']}_params.sav" ) if not path.exists(): path = Path( pyfhd_config["model_file_path"], fhd_subdirs["params"], f"{pyfhd_config['obs_id']}_params.sav", ) params_model = readsav(path) params_model = recarray_to_dict(params_model.params) # Read in the first polarization from pol_names pol_i = 0 vis_path_parts = [pyfhd_config["model_file_path"]] path = Path( *vis_path_parts, f"{pyfhd_config['obs_id']}_vis_model_{obs['pol_names'][pol_i]}.sav", ) if not path.exists(): vis_path_parts = [pyfhd_config["model_file_path"], fhd_subdirs["vis"]] path = Path( *vis_path_parts, f"{pyfhd_config['obs_id']}_vis_model_{obs['pol_names'][pol_i]}.sav", ) curr_vis_model = readsav(path) if isinstance(curr_vis_model, dict): curr_vis_model = curr_vis_model["vis_model_ptr"] elif isinstance(curr_vis_model, np.recarray): curr_vis_model = curr_vis_model.vis_model_ptr # Sometimes arrays a packed in via a pointer, if so extract it if curr_vis_model.size == 1: curr_vis_model = curr_vis_model[0] curr_vis_model = curr_vis_model.transpose().astype(np.complex128) # The shape should be n_pol, n_freq, n_time * n_baselines vis_model_shape = [obs["n_pol"]] + list(curr_vis_model.shape) vis_model_arr = np.empty(vis_model_shape, dtype=np.complex128) vis_model_arr[pol_i] = curr_vis_model for pol_i in range(1, obs["n_pol"]): path = Path( *vis_path_parts, f"{pyfhd_config['obs_id']}_vis_model_{obs['pol_names'][pol_i]}.sav", ) curr_vis_model = readsav(path) if isinstance(curr_vis_model, dict): curr_vis_model = curr_vis_model["vis_model_ptr"] elif isinstance(curr_vis_model, np.recarray): # Should be a rec array containing one item vis_model_ptr curr_vis_model = curr_vis_model.vis_model_ptr # Sometimes arrays a packed in via a pointer, if so extract it if curr_vis_model.size == 1: curr_vis_model = curr_vis_model[0] vis_model_arr[pol_i] = curr_vis_model.transpose().astype(np.complex128) except FileNotFoundError as e: logger.error( f"pyfhd failed to load in the model visibilities while trying to transfer in file: {path} as the file wasn't found. pyfhd is exiting execution" ) exit() return vis_model_arr, params_model
[docs] def import_vis_model_from_uvfits( pyfhd_config: dict, obs: dict, logger: logging.Logger ) -> tuple[NDArray[np.complex128], dict]: """Read a model visibility array in from a `uvfits` with filepath given by pyfhd_config['model_file_path']. Reads data in via `pyfhd.data_setup.uvfits import extract_visibilities`. Parameters ---------- pyfhd_config : dict The options from argparse in a dictionary, that have been verified using `pyfhd.pyfhd_tools.pyfhd_setup.pyfhd_setup`. obs : dict The observation dictionary as populated by `pyfhd.data_setup.obs.create_obs` Returns ------- vis_model_arr : NDArray[np.complex128] Simulated model for the visibilities params_model : dict The parameters for said model used for flagging """ header_model, params_data_model, _, _ = extract_header( pyfhd_config, logger, model_uvfits=True ) if header_model["n_freq"] != obs["n_freq"]: model_path = pyfhd_config["model_file_path"] logger.error( f"The obs was expecting {obs['n_freq']} frequencies, " f"but the model visibilities read in from {model_path} " f"contain {header_model['n_freq']} freqs. Please supply " "model visibilities that match the data. Exiting now." ) exit() params_model = create_params(header_model, params_data_model, logger) vis_model_arr, _ = extract_visibilities( header_model, params_data_model, pyfhd_config, logger ) return vis_model_arr, params_model
class _FlaggingInfoCounter(object): """Something to count and hold numbers to do with baselines""" def __init__(self, params: dict): """ Given a populated `params` dict (as populated by `pyfhd.data_setup.uvfits.create_params`), calculate many useful quantities to do with antenna (tile) names and numbers, expected number of cross and auto correlations etc. """ self.unique_times = np.unique(params["time"]) self.num_times = len(self.unique_times) ant_names1 = np.unique(params["antenna1"]) ant_names2 = np.unique(params["antenna2"]) self.num_visis = len(params["antenna1"]) # If there are no auto-correlations, you don't get every unique tile # in either antenna1 or antenna2, so do a unique on both of them to be sure self.ant_names = np.unique(np.append(ant_names1, ant_names2)) self.num_ants = len(self.ant_names) # indexes of the auto-correlations self.auto_locs = params["antenna1"] == params["antenna2"] self.num_autos = np.count_nonzero(self.auto_locs) if self.num_autos == 0: self.num_autos_per_time = 0 else: self.num_autos_per_time = self.num_ants # indexes of the cross-correlations self.cross_locs = params["antenna1"] != params["antenna2"] # how many cross-correlations there should be per time step self.num_cross_per_time = int((self.num_ants * (self.num_ants - 1)) / 2) # number of visibilities per time step self.num_visi_per_time_step = self.num_cross_per_time + self.num_autos_per_time # within a single time step, where the cross-correlations are indexed # can use this while iterating over time to select the crosses only self.cross_locs_per_time = np.where( self.cross_locs[: self.num_visi_per_time_step] )[0] # within a single time step, where the cross-correlations are indexed # can use this while iterating over time to select the crosses only self.auto_locs_per_time = np.where( self.auto_locs[: self.num_visi_per_time_step] )[0] self.ant1_single_time = params["antenna1"][: self.num_visi_per_time_step] self.ant2_single_time = params["antenna2"][: self.num_visi_per_time_step]
[docs] def flag_model_visibilities( vis_model_arr: NDArray[np.complex128], params_data: dict, params_model: dict, obs: dict, pyfhd_config: dict, logger: logging.Logger, ) -> NDArray[np.complex128]: """ Account for time offset and tile flags, and check that the uvfits and compatible. Needs to check if auto-correlations are present Parameters ---------- vis_model_arr : NDArray[np.complex128] The model visibility array from the model uvfits file params_data : dict The params data from the observation uvfits params_model : dict The params data from the model uvfits obs : dict The observaton dictionary containing all the data from the observation uvfits file pyfhd_config : dict The pyfhd configuration dictionary logger : logging.Logger The pyfhd logger Returns ------- vis_model_arr_flagged: NDArray[np.complex128] The flagged model visibility array """ # Calculate a number of things we'll need to compare the data to the model flaginfo_data = _FlaggingInfoCounter(params_data) flaginfo_model = _FlaggingInfoCounter(params_model) # For all time steps in both data and model, check the minimum time offset # between the two. If there is a constant minimum > 0, the times might # be consistently offset because of QUACK time time_offsets = [] for time in flaginfo_data.unique_times: # convert from julian to seconds via 24*60*60 time_offsets.append( np.min(np.abs(flaginfo_model.unique_times - time)) * (24.0 * 60 * 60) ) # The highest time resolution data we'll be dealing with. If the offset is # less than half of this, could be error in calculations rather than an # actual time offset # TODO: shouldn't this be derived from the time resolution of the data or model? min_integration = 0.5 # Try to ascertain is the model is offset in time from the data # Only believe an offset to an accuracy of 0.25, so round to two decimal # places. Here we are essentially trying to divine the QUACK time rounded_offset = np.round(np.mean(time_offsets), 2) # If offset is big enough to be real, but smaller than half an integration # that add the `rounded_offset` (our calculated QUACK time) to the model # to work out which time steps are closest between model and data if ( rounded_offset >= min_integration / 2.0 and rounded_offset <= obs["time_res"] / 2.0 ): flaginfo_model.unique_times += rounded_offset / (24.0 * 60 * 60) logger.warning( f"Model time stamps are offset from data by an average of {rounded_offset}. Accounting for this to match model time steps to data" ) model_times_to_use = [] # For each time step in the data, find the closest time step in the model for time in flaginfo_data.unique_times: t_ind = np.argmin(np.abs(flaginfo_model.unique_times - time)) model_times_to_use.append(t_ind) model_times_to_use = np.array(model_times_to_use) # This means one or more time steps in the model match best to the same # time step in the data. This shouldn't happen if the data and model # have the same time resolution so something bad has happened if flaginfo_data.num_times != len(np.unique(model_times_to_use)): data_path = pyfhd_config["input_path"], pyfhd_config["obs_id"] + ".uvfits" model_path = ( str(pyfhd_config["model_file_path"]) + pyfhd_config["model_file_type"] ) raise ValueError( f"Could not match the time steps in the data uvfits: {data_path}" f" and model uvfits in {model_path}. Please check the model " "and try again. Exiting now." ) # Now to flag the model - some models have no flagged tiles (antennas), # whereas the data might have flagged tiles (and so missing baselines). # This means we need to flag the missing tiles out of the data and # reshape the model # If less antennas in the model than the data, we can't calibrate the whole # dataset so just error for now if flaginfo_model.num_ants < flaginfo_data.num_ants: model_path = pyfhd_config["model_file_path"] + pyfhd_config["model_file_type"] raise ValueError( f"There are less antennas (tiles) in the model " f"{model_path} than in the data, so cannot calibrate the " "whole dataset. Please check the model " "and try again. Exiting now." ) # Test to see if there are auto-correlations in data if flaginfo_model.num_autos == 0: logger.warning("There are no auto-correlations present in model.") # This is where the fun begins - pyuvdata uses the tile number as written # in the 'TILE' column in the metafits file to encode baselines (and so # populate params[antenna1] and params[antenna2]). The numbers are MWA # assigned, and have nothing to do with antenna index. Birli and WODEN use # the tile index (one-indexed as the BASELINE encoding is one-indexed). # So to match a flagged tile from the data to the model, need to work out # the index of any flagged tiles in the data. All tile names are read in # from the metafits flag_indexes = [] # if the data just have tile indexes, the maximum should be the number # of tiles - use that to work out if we have Birli of pyuvdata input # we should have a pyuvdata input in this case as a tile name is greater # than the number of tiles if np.max(flaginfo_data.ant_names) > len(obs["baseline_info"]["tile_names"]): # Loop over all possible antenna (tile) names, and if they're not in the # list of antennas in this data set, append to flag_indexes for ant_ind, ant_name in enumerate(obs["baseline_info"]["tile_names"]): if ant_name not in flaginfo_data.ant_names: flag_indexes.append(ant_ind + 1) # tiles are named by their index (1 indexed as per uvfits standard) else: for ant_name in range(1, len(obs["baseline_info"]["tile_names"]) + 1): if ant_name not in flaginfo_data.ant_names: flag_indexes.append(ant_name) if len(flag_indexes) > 0: logger.info( f"Found flagged tiles {flag_indexes} in the data, flagging from the model" ) # This gives us true/false if the visibilities should be included # for a single time step based on antenna1 and antenna2 include_per_time_ant1 = np.isin( flaginfo_model.ant1_single_time, flag_indexes, invert=True ) include_per_time_ant2 = np.isin( flaginfo_model.ant2_single_time, flag_indexes, invert=True ) # Doing a logic union combines info from ant1 and ant2 model_include_per_time = np.nonzero(include_per_time_ant1 & include_per_time_ant2)[ 0 ] # Check if the model has auto-correlations if (flaginfo_model.num_autos) > 0: data_include_per_time = np.sort( np.concat( [flaginfo_data.cross_locs_per_time, flaginfo_data.auto_locs_per_time] ) ) else: # If no auto-correlations, just use the cross-correlations, Keep the autos as zeroes data_include_per_time = flaginfo_data.cross_locs_per_time if flaginfo_data.num_autos > 0: logger.warning( "The data has auto-correlations, but the model does not. " "Setting the auto correlations locations to zero in the model." ) # empty holder for the flagged model - this should be the same shape vis_model_arr_flagged = np.zeros( (obs["n_pol"], obs["n_freq"], flaginfo_data.num_visis), dtype=np.complex128 ) # For each time step that matches the data, copy across any visibilities # that aren't to be flagged for t_data_ind, t_model_ind in enumerate(model_times_to_use): # Subset of cross-corrs from flagged model to select for this time step t_flag_inds = ( t_data_ind * flaginfo_data.num_visi_per_time_step + data_include_per_time ) # Subset of cross-corrs from full model to select for this time step t_model_inds = ( t_model_ind * flaginfo_model.num_visi_per_time_step + model_include_per_time ) # Stick it in the flagged model vis_model_arr_flagged[:, :, t_flag_inds] = vis_model_arr[:, :, t_model_inds] return vis_model_arr_flagged
[docs] def convert_vis_model_arr_to_sav( vis_model_arr: NDArray[np.complex128], pyfhd_config: dict, logger: logging.Logger, model_vis_dir: str, n_pol: int, ): """ Converts the contents of `vis_model_arr` into an FHD .sav file format so we can import into existing IDL code with ease. First writes data to `hdf5` format, then uses IDL code template to convert to IDL `.sav` format compatible with FHD. Sticks the outputs into `model_vis_dir`. Parameters ---------- vis_model_arr : NDArray[np.complex128] Complex array hold the model visibilities pyfhd_config : dict The options from argparse in a dictionary, that have been verified using `pyfhd.pyfhd_tools.pyfhd_setup.pyfhd_setup`. logger : logging.Logger pyfhd logger to feed information to model_vis_dir : str Directory location to write the output files to n_pol : int Number of polarisations to write out (each is written to an individual) `.sav` file """ pol_names = ["XX", "YY", "XY", "YX"] logger.info( f"vis_model_transfer: saving {model_vis_dir}/{pyfhd_config['obs_id']}_vis_model.h5" ) with h5py.File(f"{model_vis_dir}/{pyfhd_config['obs_id']}_vis_model.h5", "w") as hf: for pol, pol_name in enumerate(pol_names[:n_pol]): hf.create_dataset( f"{pyfhd_config['obs_id']}_vis_model_{pol_name}", data=vis_model_arr[pol].transpose(), ) hf.close() # Grab the template IDL code and transfer so people can see what code was used # and modify if they want model_arr_convert_pro = importlib_resources.files("pyfhd.templates").joinpath( "convert_model_arr_to_sav.pro" ) shutil.copy(model_arr_convert_pro, model_vis_dir) # Move into the output directory so IDL can see all the .pro files os.chdir(model_vis_dir) # Run the IDL code logger.info( f"vis_model_transfer: converting {model_vis_dir}/{pyfhd_config['obs_id']}_vis_model.h5 to .sav format" ) idl_command = f"idl -IDL_DEVICE ps -e convert_model_arr_to_sav -args {model_vis_dir} {pyfhd_config['obs_id']} {n_pol}" run_command(idl_command, False)