import numpy as np
from numpy.typing import NDArray
from logging import Logger
from PyFHD.pyfhd_tools.pyfhd_utils import (
resistant_mean,
weight_invert,
rebin,
histogram,
)
from copy import deepcopy
from astropy.io import fits
from astropy.constants import c
from pathlib import Path
from scipy.ndimage import uniform_filter
import importlib_resources
[docs]
def vis_cal_auto_init(
obs: dict,
cal: dict,
vis_arr: NDArray[np.complex128],
vis_model_arr: NDArray[np.complex128],
vis_auto: NDArray[np.float64],
vis_auto_model: NDArray[np.float64],
auto_tile_i: NDArray[np.integer],
) -> NDArray[np.complex128]:
"""
Initialize the calibration solutions using the autocorrelations prior to the linear least squares fit
for faster convergence. The auto-correlations and cross-correlations have separate formulisms for the
initialization.
Autos -- Square root of "Ratio of data autos to model autos" x "overall mean for both crosses and
autos of ratio of data to model" / "overall mean of ratio of data autos to model autos, with 1's
for crosses", results in a per-frequency, per-baseline gain scaling.
Crosses -- Square root of "overall mean for both crosses and autos of ratio of data to model" /
"overall mean of ratio of data autos to model autos, with 1's for crosses", results in a single
number gain scaling.
Parameters
----------
obs : dict
Observation metadata dictionary
cal : dict
Calibration dictionary
vis_arr : NDArray[np.complex128]
Uncalibrated data visiblities
vis_model_arr : NDArray[np.complex128]
Simulated model visibilites
vis_auto : NDArray[np.float64]
Data auto-correlations
vis_auto_model : NDArray[np.float64]
Simulated model auto-correlations
auto_tile_i : NDArray[np.integer]
Index array for auto-correlation visibilities
Returns
-------
auto_gain : NDArray[np.complex128]
Initialization gain array
"""
# Set out the
auto_scale = np.zeros((cal["n_pol"], obs["n_freq"], obs["n_tile"]))
auto_gain = np.ones(
(cal["n_pol"], obs["n_freq"], obs["n_tile"]), dtype=np.complex128
)
freq_i_use = np.where(obs["baseline_info"]["freq_use"])[0]
for pol_i in range(cal["n_pol"]):
res_mean_data = resistant_mean(np.abs(vis_arr[pol_i, freq_i_use, :]), 2)
res_mean_model = resistant_mean(np.abs(vis_model_arr[pol_i, freq_i_use, :]), 2)
auto_scale[pol_i] = np.sqrt(res_mean_data / res_mean_model)
auto_gain = np.ones(
(cal["n_pol"], obs["n_freq"], obs["n_tile"]), dtype=np.complex128
)
# Would love to get rid of the loop altogether, haven't figurted it out yet
for pol_i in range(cal["n_pol"]):
auto_gain[pol_i, :, :][:, auto_tile_i] = np.sqrt(
vis_auto[pol_i, :, :] * weight_invert(vis_auto_model[pol_i, :, :])
)
auto_gain[pol_i, :, :] *= auto_scale[pol_i] * weight_invert(
np.mean(auto_gain[pol_i, :, :])
)
auto_gain[pol_i, :, :][np.isnan(auto_gain[pol_i, :, :])] = 1
auto_gain[pol_i, :, :][auto_gain[pol_i, :, :] <= 0] = 1
return auto_gain
[docs]
def vis_calibration_flag(
obs: dict, cal: dict, pyfhd_config: dict, logger: Logger
) -> dict:
"""
Flag tile and frequency outliers based on the calibration solutions. First, iteratively flag a maximum of three
times on amplitude with three tests: 1) flag frequencies above 5 sigma, 2) flag tiles above 5 sigma, and 3) flag
tiles either 2x lower or 2x higher than average. Second, iteratively flag a maximum of three times on phase with
two tests: 1) flag tiles with slopes above 5 sigma, and 2) flag tiles with per-frequency deviations from their slope
above 5 sigma.
Parameters
----------
obs : dict
Observation metadata dictionary
cal : dict
Calibration dictionary
pyfhd_config : dict
PyFHD's configuration dictionary containing all the options set for a PyFHD run
logger : Logger
PyFHD's logger for displaying errors and info to the log files
Returns
-------
obs: dict
Observation metadata dictionary
"""
amp_sigma_threshold = 5
amp_threshold = 2
phase_sigma_threshold = 5
freq_use = obs["baseline_info"]["freq_use"].copy()
for pol_i in range(cal["n_pol"]):
tile_use_i = np.nonzero(obs["baseline_info"]["tile_use"])[0]
freq_use_i = np.nonzero(freq_use)[0]
gain = cal["gain"][pol_i]
phase = np.arctan2(gain.imag, gain.real)
amp = np.abs(gain)
# first flag based on overall amplitude
# extract_subarray is not being used as it was FHD's way of taking the fact that
# IDL's indexing can be weird and won't allow to index the result
# use np.where on gain, which will be multidimensional as well
amp_sub = amp[:, tile_use_i][freq_use_i, :]
gain_freq_fom = np.std(amp_sub, axis=1)
# Calculate the y values from a polynomial fit and keep the standard deviation of the error in gain_tile_fom
amp_sub_fit = np.zeros_like(amp_sub)
# Polynomial polyval can only take 1d arrays and doesn't vectorize across a 2d array.
for tile_i in range(tile_use_i.size):
amp_sub_fit[:, tile_i] = np.polynomial.polynomial.polyval(
freq_use_i,
np.polynomial.Polynomial.fit(
freq_use_i,
amp_sub[:, tile_i],
deg=pyfhd_config["cal_amp_degree_fit"],
)
.convert()
.coef,
)
# Get standard deviation of the residuals on a per tile basis
gain_tile_fom = np.std(amp_sub - amp_sub_fit, axis=0)
# Get the median of every tile, why is it call avg?
gain_tile_avg = np.median(amp_sub, axis=0)
gain_freq_fom[np.isnan(gain_freq_fom)] = 0
gain_tile_fom[np.isnan(gain_tile_fom)] = 0
freq_cut_i = np.where(gain_freq_fom == 0)[0]
freq_uncut_i = np.nonzero(gain_freq_fom)[0]
if freq_cut_i.size > 0:
freq_use[freq_use_i][freq_cut_i] = 0
tile_cut_i = np.where(gain_tile_fom == 0)[0]
tile_uncut_i = np.nonzero(gain_tile_fom)[0]
if tile_cut_i.size > 0:
obs["baseline_info"]["tile_use"][tile_use_i][tile_cut_i] = 0
if freq_uncut_i.size == 0 or tile_uncut_i.size == 0:
logger.error(
"The frequency and tile flagging inside calibration found some values not detected in previous flagging or calibration"
)
n_addl_cut = max(freq_cut_i.size + tile_cut_i.size, 1)
n_cut = freq_cut_i.size + tile_cut_i.size
iter = 0
while n_addl_cut > 0 and iter < 3:
gain_freq_sigma = np.std(gain_freq_fom[freq_uncut_i])
gain_tile_sigma = np.std(gain_tile_fom[tile_uncut_i])
freq_cut_i = np.where(
(
gain_freq_fom
- np.median(gain_freq_fom[freq_uncut_i])
- amp_sigma_threshold * gain_freq_sigma
)
> 0
)[0]
# Update the complement of freq_cut_i (the NOT) i.e. freq_uncut_i
freq_uncut_i = freq_uncut_i[~np.isin(freq_uncut_i, freq_cut_i)]
tile_cut_test1 = (
gain_tile_fom
- np.median(gain_tile_fom[tile_uncut_i])
- amp_sigma_threshold * gain_tile_sigma
) > 0
tile_cut_test2 = (
gain_tile_avg < np.median(gain_tile_avg) / amp_threshold
) | (gain_tile_avg > np.median(gain_tile_avg) * amp_threshold)
tile_cut_i = np.where(tile_cut_test1 | tile_cut_test2)[0]
# Update the complement of tile_cut_i (the NOT) i.e. tile_uncut_i
tile_uncut_i = tile_uncut_i[~np.isin(tile_uncut_i, tile_cut_i)]
n_addl_cut = (freq_cut_i.size + tile_cut_i.size) - n_cut
n_cut = freq_cut_i.size + tile_cut_i.size
iter += 1
if (freq_cut_i.size) > 0:
freq_use[freq_use_i[freq_cut_i]] = 0
if (tile_cut_i.size) > 0:
obs["baseline_info"]["tile_use"][tile_use_i[tile_cut_i]] = 0
# Reset freq_use_i and tile_use_i for flagging based on phase
tile_use_i = np.nonzero(obs["baseline_info"]["tile_use"])[0]
freq_use_i = np.nonzero(freq_use)[0]
# Start flagging based on phase
phase_sub = phase[:, tile_use_i][freq_use_i, :]
phase_slope_arr = np.empty(tile_use_i.size)
phase_sigma_arr = np.empty(tile_use_i.size)
for tile_i in range(tile_use_i.size):
phase_use = np.unwrap(phase_sub[:, tile_i])
phase_params = np.polynomial.polynomial.Polynomial.fit(
freq_use_i, phase_use, deg=pyfhd_config["cal_phase_degree_fit"]
)
phase_params = phase_params.convert().coef
phase_fit = np.polynomial.polynomial.polyval(freq_use_i, phase_params)
phase_sigma2 = np.std(phase_use - phase_fit)
# In an unusual scenario sometimes you'll get an all 0 array to fit on, which gives only a zero back for the fit
phase_slope_arr[tile_i] = phase_params[1] if phase_params.size > 1 else 0
phase_sigma_arr[tile_i] = phase_sigma2
iter = 0
n_addl_cut = 1
n_cut = 0
while n_addl_cut > 0 and iter < 3:
slope_sigma = np.nanstd(phase_slope_arr)
tile_cut_test1 = (
np.abs(phase_slope_arr) - np.median(np.abs(phase_slope_arr))
) > phase_sigma_threshold * slope_sigma
tile_cut_test2 = (phase_sigma_arr - np.median(phase_sigma_arr)) > (
phase_sigma_threshold * np.nanstd(phase_sigma_arr)
)
tile_cut_i = np.where(tile_cut_test1 | tile_cut_test2)[0]
n_addl_cut = tile_cut_i.size - n_cut
n_cut = tile_cut_i.size
iter += 1
if tile_cut_i.size > 0:
obs["baseline_info"]["tile_use"][tile_use_i[tile_cut_i]] = 0
# If we decide not to flag the frequencies, ignore any frequency flagging
if pyfhd_config["flag_calibration_frequencies"]:
obs["baseline_info"]["freq_use"] = freq_use
# Return the obs with an updated baseline_info on the use of tiles and frequency
return obs
[docs]
def transfer_bandpass(
obs: dict, cal: dict, pyfhd_config: dict, logger: Logger
) -> tuple[dict, dict]:
"""
Apply a previously saved bandpass via a calfits file (github:pyuvdata). Check adherance to standards,
and match the polarizations, frequencies, timing, pointings, and tiles.
Parameters
----------
obs : dict
Observation metadata dictionary
cal : dict
Calibration dictionary
pyfhd_config : dict
PyFHD's configuration dictionary containing all the options set for a PyFHD run
logger : Logger
PyFHD's logger for displaying errors and info to the log files
Returns
-------
cal_bandpass : dict
Calibration dictionary with bandpass gains
cal_remainder : dict
calibration dictionary with residuals after removing the bandpass gains
"""
cal_bandpass = {}
try:
calfits = fits.open(
Path(pyfhd_config["input_path"], pyfhd_config["cal_bp_transfer"])
)
# Get the data
data_array = calfits[0].data
# Read in the header
naxis = calfits[0].header["naxis"]
n_jones = calfits[0].header["njones"]
if "delay" in calfits[0].header["caltype"]:
raise RuntimeWarning(
"Input Delay calibration not supported at this time, skipping calibration bandpass transfer."
)
time_integration = calfits[0].header["inttime"]
freq_channel_width = calfits[0].header["chwidth"]
x_orient = calfits[0].header["xorient"].strip()
data_dims = np.empty(naxis, dtype=np.integer)
data_types = naxis * [""]
for naxis_i in range(naxis):
# Get the dimensions of the data
data_dims[naxis_i] = calfits[0].header[f"naxis{naxis_i + 1}"]
# Get the ctypes
data_types[naxis_i] = (
calfits[0].header[f"ctype{naxis_i + 1}"].lower().strip()
)
data_types = np.array(data_types)
# Get the indexes for the FITS standard checks
data_index = np.nonzero("narrays" == data_types)[0][0]
ant_index = np.nonzero("antaxis" == data_types)[0][0]
freq_index = np.nonzero("freqs" == data_types)[0][0]
time_index = np.nonzero("time" == data_types)[0][0]
jones_index = np.nonzero("jones" == data_types)[0][0]
# Deal with spec_wind_index separately as default highband file doesn't have this in
# May cause issues with other fits files, adjust the code then.
spec_wind_index = np.nonzero("if" == data_types)[0]
if spec_wind_index.size == 0:
spec_wind_index = -1
# Check the indexes given to see if they match standards, if not raise exception
if (
data_index != 0
or ant_index != 4
or freq_index != 3
or time_index != 2
or jones_index != 1
):
if (
data_index == 0
and ant_index == 5
and freq_index == 3
and time_index == 2
and jones_index == 1
and spec_wind_index == 4
):
logger.info("Calfits adheres to the Fall 2018 pyuvdata convention")
if calfits[0].header["naxis5"] != 1:
raise RuntimeWarning(
"Calfits file includes more than one spectral window. Note that this feature is not yet supported in PyFHD."
)
# Remove spectral window dimension for compatibility
data_array = np.mean(data_array, axis=0)
else:
raise RuntimeWarning(
"Calfits file does not appear to adhere to standard. Please see github:pyuvdata/docs/references"
)
freq_start = calfits[0].header[f"crval{freq_index + 1}"]
time_start = calfits[0].header[f"crval{time_index + 1}"]
time_delt = calfits[0].header[f"cdelt{time_index + 1}"]
jones_start = calfits[0].header[f"crval{jones_index + 1}"]
jones_delt = calfits[0].header[f"cdelt{jones_index + 1}"]
n_ant_data = data_array.shape[0]
n_freq = data_array.shape[1]
n_time = data_array.shape[2]
# Check whether the number of polarizations specified matches the observation analysis run
jones_type_matrix = np.zeros(data_dims[1])
for jones_i in range(1, data_dims[1]):
jones_type_matrix[jones_i - 1] = jones_start + (jones_delt * jones_i)
if data_dims[1] > obs["n_pol"]:
logger.warning(
"More polarizations in calibration fits file than in observation analysis. Reducing calibration to match obs."
)
data_dims[1] = obs["n_pol"]
jones_type_matrix = jones_type_matrix[0 : obs["n_pol"]]
data_array = data_array[:, :, :, 0 : obs["n_pol"], :]
elif data_dims[1] < obs["n_pol"]:
raise RuntimeWarning(
"Not enough polarizations defined in the calibration fits file."
)
# Switch the pol convention to FHD standard if necessary
if x_orient == "north" or x_orient == "south":
data_array[:, :, :, 0, :], data_array[:, :, :, 1, :] = (
data_array[:, :, :, 1, :],
data_array[:, :, :, 0, :],
)
if obs["n_pol"] > 2:
data_array[:, :, :, 2, :], data_array[:, :, :, 3, :] = (
data_array[:, :, :, 3, :],
data_array[:, :, :, 2, :],
)
# Check to see if the calibraton and observation frequency resolution match
if freq_channel_width != obs["freq_res"]:
freq_factor = obs["freq_res"] / freq_channel_width
if freq_factor >= 1:
logic_test = freq_factor - np.floor(freq_factor)
else:
logic_test = 1 / freq_factor - np.floor(1 / freq_factor)
if logic_test != 0:
raise RuntimeWarning(
f"Calfits input freq channel width is not easily castable to the observation, different by a factor of {freq_factor}"
)
if freq_start != obs["baseline_info"]["freq"][0]:
raise RuntimeWarning(
"Calfits input freq start is not equal to observation freq start"
)
# Downselect the data array
if freq_factor > 1:
logger.warning(
f"Calfits input freq channel width is different by a factor of {freq_factor}. Avergaing Down."
)
# Set flagged indices to NAN to remove them from mean calculation
flag_inds = np.where(np.abs(np.squeeze(data_array[:, :, :, :, 2])) == 1)
data_array[flag_inds][0] = np.nan
data_array[flag_inds][1] = np.nan
for channel_i in range(obs["n_freq"]):
data_array[:, channel_i, :, :, :] = np.nanmean(
data_array[
:,
max(
channel_i * freq_factor - np.floor(freq_factor / 2), 0
) : channel_i
* freq_factor
+ np.floor(freq_factor / 2.0),
:,
:,
:,
]
)
data_array = data_array[:, 0 : obs["n_freq"], :, :, :]
elif freq_factor < 1:
logger.warning(
f"Calfits input freq channel width is different by a factor of {freq_factor}. Using linear interpolation"
)
# The IDL code has 5 nested loops, and I can't think of the vectorization right now in a reasonable ampount of time
# Someone please vectorize laster if you can
data_array_temp = np.zeros(
(data_dims[4], obs["n_freq"], n_time, n_jones, 2)
)
for data_i in range(2):
for jones_i in range(n_jones):
for times_i in range(n_time):
for tile_i in range(data_dims[4]):
for channel_i in range(obs["n_freq"] - 1):
start_idx = int(channel_i * (1.0 / freq_factor))
end_idx = int((channel_i + 1) * (1.0 / freq_factor))
data_array_temp[
tile_i,
start_idx:end_idx,
times_i,
jones_i,
data_i,
] = np.interp(
np.arange(start_idx, end_idx, 1.0),
np.arange(channel_i, channel_i + 1, 1.0),
data_array[
data_i,
jones_i,
times_i,
channel_i : channel_i + 1,
tile_i,
],
)
data_array_temp[:, obs["n_freq"] - 1, :, :, :] = data_array[
:, n_freq - 1, :, :, :
]
data_array = np.copy(data_array_temp)
# Check to see what time range this needs to be applied to, and if pointings are necessary
if n_time != 1:
sec_upperlimit = 2000
sec_lowerlimit = 1600
if (
time_integration < sec_upperlimit and time_integration > sec_lowerlimit
) or (time_delt < sec_upperlimit and time_delt > sec_lowerlimit):
# Calibration fits are per pointing
# Keep all the delay patterns in a dictionary
delay_patterns = {
(-5): [0, 5, 10, 15, 1, 6, 11, 16, 2, 7, 12, 17, 3, 8, 13, 18],
(-4): [0, 4, 8, 12, 1, 5, 9, 13, 2, 6, 10, 14, 3, 7, 11, 15],
(-3): [0, 3, 6, 9, 0, 3, 6, 9, 0, 3, 6, 9, 0, 3, 6, 9],
(-2): [0, 2, 4, 6, 0, 2, 4, 6, 0, 2, 4, 6, 0, 2, 4, 6],
(-1): [0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3],
(0): [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
(5): [15, 10, 5, 0, 16, 11, 6, 1, 17, 12, 7, 2, 18, 13, 8, 3],
(4): [12, 8, 4, 0, 13, 9, 5, 1, 14, 10, 6, 2, 15, 11, 7, 3],
(3): list(
reversed([0, 3, 6, 9, 0, 3, 6, 9, 0, 3, 6, 9, 0, 3, 6, 9])
),
(2): list(
reversed([0, 2, 4, 6, 0, 2, 4, 6, 0, 2, 4, 6, 0, 2, 4, 6])
),
(1): list(
reversed([0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3])
),
}
obs_pointing = None
for pointing, pattern in delay_patterns.items():
if np.array_equal(obs["delays"], pattern):
obs_pointing = pointing
if obs_pointing is None:
raise RuntimeWarning(
"Pointing number not within 5 pointings around zenith."
)
obs_julian_date = obs["astr"]["mjdobs"] + 2400000.5
# Find corresponding in the calfits
# Number of days since Auguest 23 2013
days_since_ref = np.floor(obs_julian_date) - np.floor(time_start)
# pointing start shift amount depending on how many days since ref
obs_pointing_shift_since_ref = ((24 - 23.9344699) / 24) * days_since_ref
# pointing start time for HH:MM:SS on Aug23 (in JD) plus comparible pointing start time for reference, using calculated shift
pointing_jdhms_ref = (
np.array(
[
0.13611,
0.15157,
0.17565,
0.19963,
0.22222,
0.24342593,
0.26453705,
0.28574074,
0.30694,
0.33657407,
]
)
+ obs_pointing_shift_since_ref
)
pointing_num_ref = [-5, -4, -3, -2, -1, 0, 1, 2, 3, 4]
# pointing start time for HH:MM:SS for calfits day (in JD)
pointing_jdhms_calfits = time_start - np.floor(time_start)
# find the closest index between reference and calfits
pointing_calfits_index = np.argmin(
np.abs(pointing_jdhms_ref - pointing_jdhms_calfits)
)
# make sure the index is greater than the pointing start time
if pointing_jdhms_calfits < pointing_jdhms_ref[pointing_calfits_index]:
pointing_calfits_index -= 1
if (
pointing_jdhms_calfits
> (
np.max(pointing_jdhms_ref)
+ (pointing_jdhms_ref[1] - pointing_jdhms_ref[0])
)
) or (
pointing_jdhms_calfits
< (
np.min(pointing_jdhms_ref)
- (pointing_jdhms_ref[1] - pointing_jdhms_ref[0])
)
):
raise RuntimeWarning(
"Calfits does not start between five pointings before zenith and four pointings after zenith. Not suitable for pointing cal at this time."
)
# find which pointing is the start of the calfits data
pointing_calfits_start = pointing_num_ref[pointing_calfits_index]
if obs_pointing < pointing_calfits_start:
raise RuntimeWarning(
"Calfits file does not contain pointing of observation"
)
# select pointing index in calfits that matches observation
obs_pointing_index = abs(pointing_calfits_start - obs_pointing)
# choose the corresponding pointing from the calfits data array
data_array = data_array[:, :, obs_pointing_index, :, :]
elif np.floor(time_delt) == np.floor(obs["time_res"]):
# Calibration fits are per-timeres
logger.info(
"Averaging calfits to observation length, an FHD requirement at this time."
)
data_array_temp = np.zeros(
[data_dims[4], obs.n_freq, 1, data_dims[1], data_dims[0]]
)
data_array_temp[:, :, 0, :, :] = np.mean(data_array, axis=2)
data_array = np.copy(data_array_temp)
else:
# Calibration fits are for a random set of times
logger.info(
"Finding closest match in time between calfits and obs. Obs metadata assumed to report start time, calfits metadata assumed to report center time."
)
time_delta = time_integration / (60 * 60 * 24)
time_array = np.full(n_time, time_start + time_delta)
obs_julian_date = (
obs["astr"]["mjdobs"]
+ 2400000.5
+ ((obs["n_time"] * obs["time_res"]) / (2 * 60 * 60 * 24))
)
if (obs_julian_date < time_array[0] - 2 * time_delta) or (
obs_julian_date > time_array[-1] + 2 * time_delta
):
raise RuntimeWarning(
"Observation does not seem to fit within the time frame of the calfits"
)
# find the closest index between calfits and observation
time_index = np.argmin(np.abs(obs_julian_date - time_array))
data_array = data_array[:, :, time_index, :, :]
# Check number of tiles
if n_ant_data != obs["n_tile"]:
raise RuntimeWarning(
"Number of antennas in calfits file does match observation antenna number"
)
# Now that the checks are done, return the cal structure
cal_bandpass["n_pol"] = min(obs["n_pol"], 2)
cal_bandpass["conv_thresh"] = 1e-7
cal_bandpass["gain"] = np.full(
(cal_bandpass["n_pol"], obs["n_tile"], obs["n_freq"]),
pyfhd_config["cal_gain_init"],
)
cal_bandpass["gain"][0 : obs["n_pol"], :, :] = np.squeeze(
data_array[:, :, 0, 0 : obs["n_pol"], 0]
) + 1j * np.squeeze(data_array[:, :, 0, 0 : obs["n_pol"], 1])
logger.info("Calfits File has been read and cal_bandpass has been created")
except FileNotFoundError as e:
logger.error(
f"{pyfhd_config['cal_bp_transfer']} file wasn't found, skipping calibration bandpass transfer"
)
return {}, {}
except RuntimeWarning as e:
logger.error(e)
return {}, {}
cal_remainder = deepcopy(cal)
cal_remainder["gain"][0 : cal["n_pol"], :, :] = (
cal["gain"][0 : cal["n_pol"], :, :]
/ cal_bandpass["gain"][0 : cal["n_pol"], :, :]
)
return cal_bandpass, cal_remainder
[docs]
def vis_cal_bandpass(
obs: dict, cal: dict, pyfhd_config: dict, logger: Logger
) -> tuple[dict, dict]:
"""
Reduce the degrees of freedom on the per-frequency calibration amplitudes by averaging solutions
together. Options include averaging over tiles which use a particular beamformer-to-receiver cable
lengths/types, or averaging over all tiles for a global bandpass.
Parameters
----------
obs : dict
Observation metadata dictionary
cal : dict
Calibration dictionary
pyfhd_config : dict
PyFHD's configuration dictionary containing all the options set for a PyFHD run
logger : Logger
PyFHD's logger for displaying errors and info to the log files
Returns
-------
cal_bandpass : dict
Calibration dictionary with bandpass gains
cal_remainder : dict
calibration dictionary with residuals after removing the bandpass gains
"""
freq_use = np.nonzero(obs["baseline_info"]["freq_use"])[0]
tile_use = np.nonzero(obs["baseline_info"]["tile_use"])[0]
n_pol = cal["gain"].shape[0]
# Set a flag for global bandpass, will turn true if too many tiles are flagged
global_bandpass = False
# Initialize cal_bandpass and cal_remainder and transfer them in, if a file has been set (fits only supported right now)
if pyfhd_config["cal_bp_transfer"] is not None:
cal_bandpass, cal_remainder = transfer_bandpass(obs, cal, pyfhd_config, logger)
if len(cal_bandpass.keys()) != 0 and len(cal_remainder.keys()) != 0:
logger.info(
f"Calibration Bandpass FITS file {pyfhd_config['cal_bp_transfer']} transferred in for cal_bandpass and cal_remainder"
)
return cal_bandpass, cal_remainder
cal_bandpass = deepcopy(cal)
cal_remainder = deepcopy(cal)
cal_bandpass_gain = np.empty(cal["gain"].shape, dtype=np.complex128)
cal_remainder_gain = np.empty(cal["gain"].shape, dtype=np.complex128)
if pyfhd_config["cable_bandpass_fit"]:
# Using preexisting file to extract information about which tiles have which cable length
# cable_len = np.loadtxt(Path(pyfhd_config["input"], pyfhd_config["cable-reflection-coefficients"]), skiprows=1)[:, 2].flatten()
cable_len_filepath = importlib_resources.files(
"PyFHD.resources.instrument_config"
).joinpath(f"{pyfhd_config['instrument']}_cable_reflection_coefficients.txt")
cable_len = np.loadtxt(cable_len_filepath, skiprows=1)[:, 2].flatten()
# Taking tile information and cross-matching it with the nonflagged tiles array, resulting in nonflagged tile arrays grouped by cable length
cable_length_ref = np.unique(cable_len)
tile_use_arr = [0] * cable_length_ref.size
for cable_i in range(cable_length_ref.size):
tile_use_arr[cable_i] = np.where(
(obs["baseline_info"]["tile_use"])
& (cable_len == cable_length_ref[cable_i])
)[0]
if tile_use_arr[cable_i].size == 0:
logger.warning(
"Too Many flagged tiles to implement bandpass cable averaging, using global bandpass."
)
global_bandpass = True
# n_freq x 13 array. columns are frequency, 90m xx, 90m yy, 150m xx, 150m yy, 230m xx, 230m yy, 320m xx, 320m yy, 400m xx, 400m yy, 524m xx, 524m yy
bandpass_arr = np.zeros(
(obs["n_freq"], cal["n_pol"] * cable_length_ref.size + 1)
)
bandpass_arr[:, 0] = obs["baseline_info"]["freq"]
bandpass_col_count = 1
if pyfhd_config["auto_ratio_calibration"]:
logger.info("auto_ratio_calibration is set, using global bandpass")
global_bandpass = True
for cable_i in range(cable_length_ref.size):
# This is an option to calibrate over all tiles to find the 'global' bandpass. It will be looped over by the number
# of cable lengths, and will redo the same calculation everytime. It is inefficient, but effective.
if global_bandpass:
tile_use_cable = tile_use
else:
tile_use_cable = tile_use_arr[cable_i]
for pol_i in range(cal["n_pol"]):
gain = cal["gain"][pol_i]
# gain2 is a temporary variable used in place of the gain array for an added layer of safety
if cable_i == 0 and pol_i == 0:
gain2 = np.zeros(cal["gain"].shape, dtype=np.complex128)
# Only use gains from unflagged tiles and frequencies, and calculate the amplitude and phase
gain_use = gain[freq_use, :][:, tile_use_cable]
amp = np.abs(gain_use)
# amp2 is a temporary variable used in place of the amp array for an added layer of safety
amp2 = np.zeros((freq_use.size, tile_use_cable.size))
# This is the normalization loop for each tile. If the mean of gain amplitudes over all frequencies is nonzero, then divide
# the gain amplitudes by that number, otherwise make the gain amplitudes zero.
for tile_i in range(tile_use_cable.size):
res_mean = resistant_mean(amp[:, tile_i], 2)
if res_mean != 0:
amp2[:, tile_i] = amp[:, tile_i] / res_mean
else:
amp2[:, tile_i] = 0
# This finds the normalized gain amplitude mean per frequency over all tiles, which is the final bandpass per cable group.
bandpass_single = np.empty(freq_use.size)
# If this is slow, resistant_mean can be vectorized
for f_i in range(freq_use.size):
bandpass_single[f_i] = resistant_mean(amp2[f_i, :], 2)
# Want iterative to start at 1 (to not overwrite freq) and store final bandpass per cable group.
bandpass_arr[freq_use, bandpass_col_count] = bandpass_single
bandpass_col_count += 1
# Fill temporary variable gain2, set equal to final bandpass per cable group for each tile that will use that bandpass.
# Vectorize later if possible
for tile_i in range(tile_use_cable.size):
gain2[pol_i, freq_use, tile_use_cable[tile_i]] = bandpass_single
# For the last bit at the end of the cable
if cable_i == cable_length_ref.size - 1:
# Set gain3 to the input gains
gain3 = cal["gain"][pol_i].copy()
# Set what will be passed back as the output gain as the final bandpass per cable type.
gain2_input = gain2[pol_i, :, :]
cal_bandpass_gain[pol_i] = gain2_input
# Set what will be passed back as the residual as the input gain divided by the final bandpass per cable type.
gain3[freq_use, :] /= gain2_input[freq_use, :]
cal_remainder_gain[pol_i] = gain3
# Add Levine Memo bandpass to the gain solutions here if you wish
else:
bandpass_arr = np.zeros((obs["n_freq"], cal["n_pol"] + 1))
bandpass_arr[:, 0] = obs["baseline_info"]["freq"]
for pol_i in range(cal["n_pol"]):
gain = cal["gain"][pol_i]
gain_use = gain[freq_use, :][:, tile_use]
amp = np.abs(gain_use)
amp2 = np.zeros((freq_use.size, tile_use.size))
for tile_i in range(tile_use.size):
res_mean = resistant_mean(amp[:, tile_i], 2)
if res_mean != 0:
amp2[:, tile_i] = amp[:, tile_i] / res_mean
else:
amp2[:, tile_i] = 0
bandpass_single = np.empty(freq_use.size)
# If this is slow, resistant_mean can be vectorized
for f_i in range(freq_use.size):
bandpass_single[f_i] = resistant_mean(amp2[f_i, :], 2)
bandpass_arr[freq_use, pol_i + 1] = bandpass_single
# Work out the gain for the bandpass
gain2 = np.zeros(gain.shape, dtype=np.complex128)
gain3 = deepcopy(gain)
# Vectorize later if possible
for tile_i in range(obs["n_tile"]):
gain2[freq_use, tile_i] = bandpass_single
gain3[freq_use, tile_i] /= bandpass_single
cal_bandpass_gain[pol_i, :, :] = gain2
cal_remainder_gain[pol_i, :, :] = gain3
cal_bandpass["gain"] = cal_bandpass_gain
cal_remainder["gain"] = cal_remainder_gain
return cal_bandpass, cal_remainder
[docs]
def vis_cal_polyfit(
obs: dict,
cal: dict,
auto_ratio: NDArray[np.float64] | None,
pyfhd_config: dict,
logger: Logger,
) -> dict:
"""
Reduce the degrees of freedom on the per-frequency calibration amplitudes and phases by fitting
the full frequency band with polynomials of a specified degree, with options for split polynomials
over certain regions.
In addition, fit cable reflections with a text file of fits, theoretical fits given cable length/type,
or the finding the maximum in delay space. Any of these can then be used as an initial estimate in a
hyperresolved FFT of the residual calibration solutions to fit the final mode, phase, and amplitude.
Residual calibration solutions can optionally be made by using an incoherent mean, or a mean over
all tiles which *do not* have the same cable length/type, to reduce bias in the residual, and
the cable reflections can also be fit using just the phases to reduce dependency on the polyphase
filter bank shape.
Parameters
----------
obs : dict
Observation metadata dictionary
cal : dict
Calibration dictionary
auto_ratio: NDArray[np.float64] | None
The estimation of the antenna-dependent bias through the square root of the autocorrelation
visibilities normalized via a reference tile
pyfhd_config : dict
PyFHD's configuration dictionary containing all the options set for a PyFHD run
logger : Logger
PyFHD's logger for displaying errors and info to the log files
Returns
-------
cal : dict
Calibration dictionary with polynomial gain fits
pyfhd_config : dict
PyFHD's configuration dictionary containing all the options set for a PyFHD run
Is returned in case the digital_gain_jump_polyfit was set here.
"""
# Keep the og_gain_arr for calculations later
og_gain_arr = np.copy(cal["gain"])
if (
pyfhd_config["cal_reflection_mode_theory"]
or pyfhd_config["cal_reflection_mode_file"]
or pyfhd_config["cal_reflection_mode_delay"]
or pyfhd_config["cal_reflection_hyperresolve"]
):
if (
pyfhd_config["cal_reflection_mode_theory"]
and abs(pyfhd_config["cal_reflection_mode_theory"]) > 1
):
cal_mode_fit = pyfhd_config["cal_reflection_mode_theory"]
else:
cal_mode_fit = True
else:
cal_mode_fit = False
freq_use = np.where(obs["baseline_info"]["freq_use"])[0]
tile_use = np.where(obs["baseline_info"]["tile_use"])[0]
pre_dig_inds = np.where(obs["baseline_info"]["freq"][freq_use] < 187.515e6)
if pre_dig_inds[0].size == 0:
logger.warning(
"No frequencies below 187.515MHz, using full band polyfit, digital gain jump polyfit disabled."
)
pyfhd_config["digital_gain_jump_polyfit"] = False
else:
f_d = np.max(pre_dig_inds[0])
f_end = freq_use.size
if ((f_d + 1) == f_end) and pyfhd_config["digital_gain_jump_polyfit"]:
logger.warning(
"No frequency found above 187.515MHz, using full band polyfit, digital gain jump polyfit disabled."
)
pyfhd_config["digital_gain_jump_polyfit"] = False
# If the observation date is beyond the date where digital gain jump polyfit is required, disable it
# Date is 2014-07-23 (23rd July 2014) or Julian Day 2456861.5000000
if obs["jd0"] > 2456861.5000000 and pyfhd_config["digital_gain_jump_polyfit"]:
logger.warning(
"Observation date is beyond the date where digital gain jump polyfit is required, using full band polyfit, digital gain jump polyfit disabled."
)
pyfhd_config["digital_gain_jump_polyfit"] = False
# If the amp_degree or phase_degree weren't used, then apply the defaults
if not pyfhd_config["cal_amp_degree_fit"]:
pyfhd_config["cal_amp_degree_fit"] = 2
if not pyfhd_config["cal_phase_degree_fit"]:
pyfhd_config["cal_phase_degree_fit"] = 1
# If you wish to find any steps that are outliers beyond 5sigma, and remove them add that code here.
# The cal_step_fit option isn't in the eor_defaults_wrapper or defined in the FHD dictionary.
# Polynomial fitting over the frequency band
gain_residual = np.empty((cal["n_pol"], obs["n_freq"], obs["n_tile"]))
# create amp_params with the shape of (n_pol, n_tile, amp_degree + 1)
# If we're using the digital_gain_jump_polyfit add extra dimension to amp_params
if pyfhd_config["digital_gain_jump_polyfit"]:
cal["amp_params"] = np.empty(
(
cal["n_pol"],
obs["n_tile"],
pyfhd_config["cal_amp_degree_fit"],
pyfhd_config["cal_amp_degree_fit"],
)
)
else:
cal["amp_params"] = np.empty(
(cal["n_pol"], obs["n_tile"], pyfhd_config["cal_amp_degree_fit"] + 1)
)
cal["phase_params"] = np.empty(
(cal["n_pol"], obs["n_tile"], pyfhd_config["cal_phase_degree_fit"] + 1)
)
for pol_i in range(cal["n_pol"]):
gain_arr = np.copy(cal["gain"][pol_i])
gain_amp = np.abs(gain_arr)
gain_phase = np.arctan2(gain_arr.imag, gain_arr.real)
for tile_i in range(obs["n_tile"]):
gain = np.squeeze(gain_amp[freq_use, tile_i])
gain_fit = np.zeros(obs["n_freq"])
# Pre and post digital gain jump separately for highband MWA data
if pyfhd_config["digital_gain_jump_polyfit"]:
if pre_dig_inds[0].size > 0:
fit_params1 = (
np.polynomial.Polynomial.fit(
freq_use[0 : f_d + 1],
gain[0 : f_d + 1],
deg=pyfhd_config["cal_amp_degree_fit"] - 1,
)
.convert()
.coef
)
fit_params2 = (
np.polynomial.Polynomial.fit(
freq_use[f_d + 1 : f_end],
gain[f_d + 1 : f_end],
deg=pyfhd_config["cal_amp_degree_fit"] - 1,
)
.convert()
.coef
)
for di in range(pyfhd_config["cal_amp_degree_fit"]):
gain_fit[freq_use[0] : freq_use[f_d] + 1] += fit_params1[di] * (
# Had to use the size as use freq_use[f_d] could cause
# off by one indexing errors (due to freq_use[0] potentially being
# 1 or 0 which throws off the np.arange indexing)
np.arange(gain_fit[freq_use[0] : freq_use[f_d] + 1].size)
** di
)
gain_fit[freq_use[f_d + 1] : freq_use[f_end - 1] + 1] += (
fit_params2[di]
* (
np.arange(freq_use[f_end - 1] - freq_use[f_d + 1] + 1)
+ freq_use[f_d + 1]
)
** di
)
# Do notice this is saving the coefficients on a per row basis, so fit_params1 a,b will be [pol_i, tile_i, 0, :]
# While fit_params2 a, b coefficients will be in [pol_i, tile_i, 1, :]
fit_params = np.vstack([fit_params1, fit_params2])
cal["amp_params"][pol_i, tile_i] = fit_params
else:
logger.warning(
"digital_gain_jump_polyfit only works with highband mwa data. Full band polyfit applied instead."
)
# Fit for amplitude
else:
fit_params = (
np.polynomial.Polynomial.fit(
freq_use, gain, deg=pyfhd_config["cal_amp_degree_fit"]
)
.convert()
.coef
)
cal["amp_params"][pol_i, tile_i, :] = fit_params
for di in range(pyfhd_config["cal_amp_degree_fit"]):
gain_fit += fit_params[di] * np.arange(obs["n_freq"]) ** di
gain_residual[pol_i, :, tile_i] = gain_amp[:, tile_i] - gain_fit
# Fit for phase
phase_use = np.unwrap(np.squeeze(gain_phase[freq_use, tile_i]))
phase_params = (
np.polynomial.Polynomial.fit(
freq_use, phase_use, pyfhd_config["cal_phase_degree_fit"]
)
.convert()
.coef
)
cal["phase_params"][pol_i, tile_i, :] = phase_params
phase_fit = np.zeros(obs["n_freq"])
for di in range(phase_params.size):
phase_fit += phase_params[di] * np.arange(obs["n_freq"]) ** di
gain_arr[:, tile_i] = gain_fit * np.exp(1j * phase_fit)
cal["gain"][pol_i] = gain_arr
# Cable Reflection Fitting
if cal_mode_fit:
if pyfhd_config["cal_reflection_mode_file"]:
logger.info(
"Using mwa calibration reflections fits from instrument_config/mwa_cable_reflection_coefficients.txt."
)
cable_len_filepath = importlib_resources.files(
"PyFHD.resources.instrument_config"
).joinpath(
f"{pyfhd_config['instrument']}_cable_reflection_coefficients.txt"
)
cable_reflections = np.loadtxt(cable_len_filepath, skiprows=1).transpose()
cable_length = cable_reflections[2]
tile_ref_flag = np.minimum(
np.maximum(np.zeros_like(cable_reflections[4]), cable_reflections[4]),
np.ones_like(cable_reflections[4]),
)
tile_mode_X = cable_reflections[5]
tile_amp_X = cable_reflections[6]
tile_phase_X = cable_reflections[7]
tile_mode_Y = cable_reflections[8]
tile_amp_Y = cable_reflections[9]
tile_phase_Y = cable_reflections[10]
# Modes in fourier transform units
mode_i_arr = np.zeros((cal["n_pol"], obs["n_tile"]))
mode_i_arr[0, :] = tile_mode_X * tile_ref_flag
mode_i_arr[1, :] = tile_mode_Y * tile_ref_flag
amp_arr = np.vstack([tile_amp_X, tile_amp_Y])
phase_arr = np.vstack[[tile_phase_X, tile_phase_Y]]
elif pyfhd_config["cal_reflection_mode_theory"]:
logger.info(
"Using theory calculation in nominal reflection mode calibration."
)
# Get the nominal tile lengths and velocity factors
cable_len_filepath = importlib_resources.files(
"PyFHD.resources.instrument_config"
).joinpath(f"{pyfhd_config['instrument']}_cable_length.txt")
cable_length_data = np.loadtxt(cable_len_filepath, skiprows=1).transpose()
cable_length = cable_length_data[2]
cable_vf = cable_length_data[3]
tile_ref_flag = np.minimum(np.maximum(0, cable_length_data[4]), 1)
# Nominal Reflect Time
reflect_time = (2 * cable_length) / (c.value * cable_vf)
bandwidth = (
(
np.max(obs["baseline_info"]["freq"])
- np.min(obs["baseline_info"]["freq"])
)
* obs["n_freq"]
) / (obs["n_freq"] - 1)
# Modes in fourier transform units
mode_i_arr = np.tile(
bandwidth * reflect_time * tile_ref_flag, [cal["n_pol"], 1]
)
elif pyfhd_config["cal_reflection_mode_delay"]:
logger.info(
"Using calibration delay spectrum to calculate nominal reflection modes."
)
spec_mask = np.zeros(obs["n_freq"])
spec_mask[freq_use] = 1
freq_cut = np.where(spec_mask == 0)
# IDL uses forward FFT by default
spec_psf = np.abs(np.fft.fftn(spec_mask, norm="forward"))
spec_inds = np.arange(obs["n_freq"] // 2)
spec_psf = spec_psf[spec_inds]
mode_test = np.zeros(obs["n_freq"] // 2)
for pol_i in range(cal["n_pol"]):
for ti in range(tile_use.size):
tile_i = tile_use[ti]
spec0 = np.abs(np.fft.fftn(gain_residual[pol_i, tile_i]))
mode_test += spec0[spec_inds]
psf_mask = np.zeros(obs["n_freq"] // 2)
if freq_cut[0].size > 0:
psf_mask[np.where(spec_psf > (np.max(spec_psf) / 1000))] = 1
# Replaces IDL smooth with edge_truncate
psf_mask = uniform_filter(psf_mask, size=5, mode="nearest")
mask_i = np.nonzero(psf_mask)
if mask_i[0].size > 0:
mode_test[mask_i] = 0
mode_i_arr = np.zeros((cal["n_pol"], obs["n_tile"])) + np.argmax(mode_test)
# Fit only certain cable lengths
# Positive length indicates fit mode, negative length indicates exclude mode
# This is currently assuming cal_mode_fit is an integer or number, not an array!
# If you need an array to fit or exclude cable lengths, then create another option for it
# in the config and adjust the code accordingly. Ensure every config option only has one purpose.
if auto_ratio is None and cal_mode_fit != 1:
tile_ref_logic = np.zeros(obs["n_tile"])
if cal_mode_fit > 0:
cable_cut_i = np.where(cable_length != cal_mode_fit)
if cable_cut_i[0].size > 0:
tile_ref_logic[cable_cut_i] = 1
else:
cable_cut_i = np.where(cable_length == abs(cal_mode_fit))
if cable_cut_i[0].size > 0:
tile_ref_logic[cable_cut_i] = 1
cable_cut = np.nonzero(tile_ref_logic)
tile_ref_flag = np.ones(obs["n_tile"])
if cable_cut[0].size > 0:
tile_ref_flag[cable_cut] = 0
mode_i_arr *= tile_ref_flag
cal["mode_params"] = np.empty([cal["n_pol"], obs["n_tile"], 3])
for pol_i in range(cal["n_pol"]):
# Divide the polyfit to reveal the residual cable reflections better
gain_arr = og_gain_arr[pol_i] / cal["gain"][pol_i]
for ti in range(tile_use.size):
tile_i = tile_use[ti]
mode_i = mode_i_arr[pol_i, tile_i]
if mode_i == 0:
continue
else:
# Options to hyperresolve or fit the reflection modes/amp/phase given the nominal calculations
if pyfhd_config["cal_reflection_hyperresolve"]:
# start with nominal cable length
mode0 = mode_i
# overresolve the FT used for the fit (normal resolution would be dmode=1)
dmode = 0.05
# range around the central mode to test
nmodes = 101
# array of modes to try
modes = (np.arange(nmodes) - nmodes // 2) * dmode + mode0
# reshape for ease of computing
modes = rebin(modes, (freq_use.size, nmodes)).T
if auto_ratio is not None:
# Find tiles which will *not* be accidently coherent in their cable reflection in order to reduce bias
inds = np.where(
(obs["baseline_info"]["tile_use"])
& (mode_i_arr[pol_i, :] > 0)
& ((np.abs(mode_i_arr[pol_i, :] - mode_i)) > 0.01)
)
# mean over frequency for each tile
freq_mean = np.nanmean(auto_ratio[pol_i], axis=0)
norm_autos = auto_ratio[pol_i] / rebin(
freq_mean, (obs["n_freq"], obs["n_tile"])
)
# mean over all tiles which *are not* accidently coherent as a func of freq
incoherent_mean = np.nanmean(norm_autos[:, inds[0]], axis=1)
# Residual and normalized (using incoherent mean) auto-correlation
resautos = (
norm_autos[:, tile_i] / incoherent_mean
) - np.nanmean(norm_autos[:, tile_i] / incoherent_mean)
gain_temp = rebin(
resautos[freq_use], (nmodes, freq_use.size)
)
else:
# dimension manipulation, add dim for mode fitting
# Subtract the mean so aliasing is reduced in the dft cable fitting
gain_temp = rebin(
gain_arr[freq_use, tile_i]
- np.mean(gain_arr[freq_use, tile_i]),
(nmodes, freq_use.size),
)
# freq_use matrix to multiply/collapse in fit
freq_mat = rebin(freq_use, (nmodes, freq_use.size))
# Perform DFT of gains to test modes
test_fits = np.sum(
np.exp(1j * 2 * np.pi / obs["n_freq"] * modes * freq_mat)
* gain_temp,
axis=1,
)
# Pick out highest amplitude fit (mode_ind gives the index of the mode)
amp_use = np.max(np.abs(test_fits)) / freq_use.size
mode_ind = np.argmax(np.abs(test_fits))
# Phase of said fit
phase_use = np.arctan2(
test_fits[mode_ind].imag, test_fits[mode_ind].real
)
mode_i = modes[mode_ind, 0]
# Using the mode selected from the gains, optionally use the phase to find the amp and phase
if auto_ratio is not None:
# Find tiles which will not be accidently coherent in their cable reflection in order to reduce bias
inds = np.where(
(obs["baseline_info"]["tile_use"])
& (mode_i_arr[pol_i, :] > 0)
& (np.abs(mode_i_arr[pol_i, :] - mode_i) > 0.01)
)
residual_phase = np.arctan2(
gain_arr[freq_use, :].imag, gain_arr[freq_use, :].real
)
incoherent_residual_phase = residual_phase[
:, tile_i
] - np.nanmean(residual_phase[:, inds[0]], axis=1)
test_fits = np.sum(
np.exp(
1j * 2 * np.pi / obs["n_freq"] * mode_i * freq_use
)
* incoherent_residual_phase
)
# Factor of 2 from fitting just the phase
amp_use = 2 * np.abs(test_fits) / freq_use.size
# Factor of pi/2 from just fitting the phase
phase_use = (
np.arctan2(test_fits.imag, test_fits.real) + np.pi / 2
)
elif pyfhd_config["cal_reflection_mode_file"]:
# Use predetermined fits
amp_use = amp_arr[pol_i, tile_i]
phase_use = phase_arr[pol_i, tile_i]
else:
# Use nominal delay mode, but fit amplitude and phase of reflections
mode_fit = np.sum(
np.exp(1j * 2 * np.pi / obs["n_freq"] * mode_i * freq_use)
* gain_arr[freq_use, tile_i]
)
amp_use = np.abs(mode_fit) / freq_use[0].size
phase_use = np.arctan2(mode_fit.imag, mode_fit.real)
gain_mode_fit = amp_use * np.exp(
-1j
* 2
* np.pi
* (mode_i * np.arange(obs["n_freq"]) / obs["n_freq"])
+ 1j * phase_use
)
if auto_ratio is not None:
# Only fit for the cable reflection in the phases
cal["gain"][pol_i, :, tile_i] *= np.exp(1j * gain_mode_fit.imag)
else:
cal["gain"][pol_i, :, tile_i] *= 1 + gain_mode_fit
cal["mode_params"][pol_i, tile_i] = np.array(
[mode_i, amp_use, phase_use]
)
return cal, pyfhd_config
[docs]
def vis_cal_auto_fit(
obs: dict,
cal: dict,
vis_auto: NDArray[np.float64],
vis_auto_model: NDArray[np.float64],
auto_tile_i: NDArray[np.integer],
) -> dict:
"""
Solve for each tile's calibration amplitude via the square root of the ratio of the data autocorrelation
to the model autocorrelation using the definition of a gain. Then, remove dependence on the correlated
noise term in the autocorrelations by scaling this amplitude down to the crosscorrelations gain via a
simple, linear fit. Build a full, scaled autocorrelation gain solution by adding in the phase term via
the crosscorrelation gains.
Parameters
----------
obs : dict
Observation metadata dictionary
cal : dict
Calibration dictionary
vis_auto : NDArray[np.float64]
Data autocorrelations
vis_auto_model : NDArray[np.float64]
Simulated model autocorrelations
auto_tile_i : NDArray[np.integer]
Index array of the tile array that have defined autocorrelations
Returns
-------
cal: dict
Calibration dictionary with scaled autocorrelation gains
"""
freq_i_use = np.nonzero(obs["baseline_info"]["freq_use"])
freq_i_flag = np.where(obs["baseline_info"]["freq_use"] == 0)[0]
# If the number of frequencies not being used is above 0, then ignore the frequencies surrounding them.
if freq_i_flag.size > 0:
freq_flag = np.zeros(obs["n_freq"])
freq_flag[freq_i_use] = 1
for freq_i in range(freq_i_flag.size):
minimum = max(0, freq_i_flag[freq_i] - 1)
maximum = min(obs["n_freq"], freq_i_flag[freq_i] + 2)
freq_flag[minimum:maximum] = 0
freq_i_use = np.nonzero(freq_flag)
# Vectorized loop for via_cal_auto_fit lines 45-55 in IDL
# However the logic still indexes the full 128 tiles, so need to shove
# outputs into an empty array of correct size
# We're not using the cross polarizations if they are present
auto_gain = np.empty((cal["n_pol"], obs["n_freq"], obs["n_tile"]))
auto_gain[:, :, auto_tile_i] = np.sqrt(
vis_auto[: cal["n_pol"]] * weight_invert(vis_auto_model[: cal["n_pol"]])
)
gain_cross = cal["gain"]
fit_slope = np.empty((cal["n_pol"], obs["n_tile"]))
fit_offset = np.empty_like(fit_slope)
# Didn't vectorize as the polyfit won't be vectorized
for pol_i in range(cal["n_pol"]):
for tile in range(auto_tile_i.size):
tile_idx = auto_tile_i[tile]
phase_cross_single = np.arctan2(
gain_cross[pol_i, :, tile_idx].imag, gain_cross[pol_i, :, tile_idx].real
)
gain_auto_single = np.abs(auto_gain[pol_i, :, tile_idx])
gain_cross_single = np.abs(gain_cross[pol_i, :, tile_idx])
# mask out any NaN values; numpy doesn't like them,
# I assume the IDL equiv function just masks them?
# or maybe we need to do a catch for NaNs here, and abandon all
# hope for a fit if there are NaNs?
notnan = np.where(
(np.isnan(gain_auto_single[freq_i_use]) != True)
& (np.isnan(gain_cross_single[freq_i_use]) != True)
)
gain_auto_single_fit = gain_auto_single[freq_i_use][notnan]
gain_cross_single_fit = gain_cross_single[freq_i_use][notnan]
# linfit from IDL uses chi-square error calculations to do the linear fit, instead of least squares.
# The polynomial fit uses least square method
x = np.vstack([gain_auto_single_fit, np.ones(gain_auto_single_fit.size)]).T
fit_single = np.linalg.lstsq(x, gain_cross_single_fit, rcond=None)[0]
# IDL gives the solution in terms of [A, B] while Python does [B, A] assuming we're
# solving the equation y = A + Bx
cal["gain"][pol_i, :, tile_idx] = (
gain_auto_single * fit_single[0] + fit_single[1]
) * np.exp(1j * phase_cross_single)
fit_slope[pol_i, tile_idx] = fit_single[0]
fit_offset[pol_i, tile_idx] = fit_single[1]
cal["auto_scale"] = np.sum(fit_slope, axis=1) / auto_tile_i.size
cal["auto_params"] = np.empty([cal["n_pol"], cal["n_pol"], obs["n_tile"]])
cal["auto_params"][0, :, :] = fit_offset
cal["auto_params"][1, :, :] = fit_slope
return cal
[docs]
def vis_calibration_apply(
vis_arr: NDArray[np.complex128],
obs: dict,
cal: dict,
vis_model_arr: NDArray[np.complex128],
vis_weights: NDArray[np.float64],
logger: Logger,
) -> tuple[NDArray[np.complex128], dict]:
"""
Apply the calibration solutions to the input, data visibilities to create calibrated, data visibilities using
the definition of the gains.
Definition of the gain:
(visibility for baseline of tile i and tile j) = (gain of tile i) (gain of tile j) (model visibility for baseline of tile i and tile j)
If only two othogonal polarizations were used to calibrate, calculate the phase offset between the two orthogonal dipoles to
solve for a degeneracy in the cross polarizations. The formula can be derived by multiplying the gains by an equal and opposite
phase in the linear least square solver and solving for the phase when the partial derivative w.r.t the offset is 0.
Parameters
----------
vis_arr : NDArray[np.complex128]
Uncalibrated data visiblities
obs : dict
Observation metadata dictionary
cal : dict
Calibration dictionary
vis_model_arr : NDArray[np.complex128]
Simulated model visibilites
vis_weights : NDArray[np.float64]
Weights (flags) of the visibilities
logger : Logger
PyFHD's logger for displaying errors and info to the log files
Returns
-------
vis_arr : NDArray[np.complex128]
Calibrated visibilities
cal : dict
Calibration dictionary
"""
# tile numbering starts at 1
tile_a_i = obs["baseline_info"]["tile_a"] - 1
tile_b_i = obs["baseline_info"]["tile_b"] - 1
n_pol_vis = vis_arr.shape[0]
gain_pol_arr1 = [0, 1, 0, 1]
gain_pol_arr2 = [0, 1, 1, 0]
# OK, it really makes sense to use native python functionality here
# We're just trying to match up the frequency-dependent gains to the
# correct baselines, and apply them. Can use `meshgrid` here instead of
# `rebin`, which will make 2D indexing arrays, so we can directly leave
# the gain arrays in the correct shape and index the directly. Using `rebin`
# means we have to flatten them
(
inds_a_baseline,
inds_a_freqs,
) = np.meshgrid(tile_a_i, np.arange(obs["n_freq"]))
inds_b_baseline, inds_b_freqs = np.meshgrid(tile_b_i, np.arange(obs["n_freq"]))
for pol_i in range(n_pol_vis):
gain_arr1 = cal["gain"][gain_pol_arr1[pol_i], :, :]
gain_arr2 = cal["gain"][gain_pol_arr2[pol_i], :, :]
vis_gain = gain_arr1[inds_a_freqs, inds_a_baseline] * np.conjugate(
gain_arr2[inds_b_freqs, inds_b_baseline]
)
vis_arr[pol_i, :, :] *= weight_invert(vis_gain, use_abs=False)
# We haven't run FHD in a way that uses 4 pols yet so this is all
# untested
if n_pol_vis == 4:
if type(vis_model_arr) == np.ndarray and type(vis_weights) == np.ndarray:
# This if statement replaces vis_calibrate_crosspol_phase
# as this was the only place where the function was used
# Note inside vis_calibrate_crosspol_phase there is a
# if n_pol_vis == 4 check hence only run if n_pol_vis == 4
# This code should calculate the phase fit between the X and Y
# antenna polarizations.
# Use the xx flags (yy should be identitical at this point)
# this is num baselines per time step
n_baselines = int(len(tile_a_i) / obs["n_times"])
# reshape from (n_freq, n_baselines*n_times) to (n_freq, n_times, n_baselines). Turns out due to the row major vs col major difference
# between IDL and python, this shape also changes
new_shape = (obs["n_freq"], obs["n_times"], n_baselines)
weights_use = np.reshape(vis_weights[0, :, :], new_shape)
# carried over from FHD code
weights_use = np.maximum(weights_use, np.zeros_like(weights_use))
weights_use = np.minimum(weights_use, np.ones_like(weights_use))
# Average the visbilities in time
axis_avg = 1
vis_xy = np.reshape(vis_arr[2, :, :], new_shape)
vis_xy = np.sum(vis_xy * weights_use, axis=axis_avg)
vis_yx = np.reshape(vis_arr[3, :, :], new_shape)
vis_yx = np.sum(vis_yx * weights_use, axis=axis_avg)
model_xy = np.reshape(vis_model_arr[2, :, :], new_shape)
model_xy = np.sum(model_xy * weights_use, axis=axis_avg)
model_yx = np.reshape(vis_model_arr[3, :, :], new_shape)
model_yx = np.sum(model_yx * weights_use, axis=axis_avg)
# Remove Zeros
weight = np.sum(weights_use, axis=axis_avg)
i_use = np.nonzero(weight)
vis_xy = np.squeeze(vis_xy[i_use])
vis_yx = np.squeeze(vis_yx[i_use])
model_xy = np.squeeze(model_xy[i_use])
model_yx = np.squeeze(model_yx[i_use])
vis_sum = np.sum(np.conjugate(vis_xy) * model_xy) + np.sum(
vis_yx * np.conjugate(model_yx)
)
cross_phase = np.arctan2(vis_sum.imag, vis_sum.real)
logger.info(
f"Phase fit between X and Y antenna polarizations: {cross_phase}"
)
cal["cross_phase"] = cross_phase
vis_arr[2, :, :] *= np.exp(1j * 0)
vis_arr[3, :, :] *= np.exp(-1j * 0)
return vis_arr, cal
[docs]
def vis_baseline_hist(
obs: dict,
params: dict,
vis_cal: NDArray[np.complex128],
vis_model_arr: NDArray[np.complex128],
) -> dict:
"""
Create diagnostic histograms of both the mean and sigma for the percent difference between calibrated
data and simulated model visibilities as a function of baseline length.
Parameters
----------
obs : dict
Observation metadata dictionary
params : dict
Visibility metadata dictionary
vis_cal : NDArray[np.complex128]
Calibrated data visibilities
vis_model_arr : NDArray[np.complex128]
Simulated model visibilites
Returns
-------
vis_baseline_hist : dict
Dictionary of the mean and sigma histograms for the percent difference between calibrated data
and simulated model visibilities as a function of baseline length.
"""
kx_arr = params["uu"] / obs["kpix"]
ky_arr = params["vv"] / obs["kpix"]
kr_arr = np.sqrt(kx_arr**2 + ky_arr**2)
# take the transpose of this, given our `vis_cal` and `vis_mode_arr` are the
# transpose of the original FHD code
dist_arr = np.outer(kr_arr, obs["baseline_info"]["freq"]).transpose() * obs["kpix"]
dist_hist, bins, dist_ri = histogram(
dist_arr, min=obs["min_baseline"], max=obs["max_baseline"], bin_size=5.0
)
vis_res_ratio_mean = np.zeros([obs["n_pol"], bins.size])
vis_res_sigma = np.zeros([obs["n_pol"], bins.size])
for pol_i in range(obs["n_pol"]):
for bin_i in range(bins.size):
if dist_hist[bin_i] > 0:
inds = dist_ri[dist_ri[bin_i] : dist_ri[bin_i + 1]]
model_vals = (vis_model_arr[pol_i]).flatten()[inds]
wh_noflag = np.where(np.abs(model_vals) > 0)[0]
if wh_noflag.size > 0:
inds = inds[wh_noflag]
else:
continue
# if Keyword_Set(calibration_visibilities_subtract) THEN BEGIN
# but calibration_visibilities_subtract isn't a function keyword
# so we'll only translate the False of that statement
# vis_cal_use = (vis_cal[pol_i].transpose()).flatten()[inds]
vis_cal_use = (vis_cal[pol_i]).flatten()[inds]
vis_res_ratio_mean[pol_i, bin_i] = np.mean(
np.abs(vis_cal_use - model_vals)
) / np.mean(np.abs(model_vals))
vis_res_sigma[pol_i, bin_i] = np.sqrt(
np.var(np.abs(vis_cal_use - model_vals))
) / np.mean(np.abs(model_vals))
else:
continue
return {
"baseline_length": bins.size,
"vis_res_ratio_mean": vis_res_ratio_mean,
"vis_res_sigma": vis_res_sigma,
}
[docs]
def cal_auto_ratio_divide(
obs: dict,
cal: dict,
vis_auto: NDArray[np.float64],
auto_tile_i: NDArray[np.integer],
) -> tuple[dict, NDArray[np.float64]]:
"""
Remove antenna-dependent parameters (i.e. cable reflections) from the calculated gains
to reduce the bias on individual tile variation before the creation of averaged quantities
like the global bandpass. Antenna-dependent parameters are estimated from the square root of the
autocorrelation visibilities normalized via a reference tile.
Parameters
----------
obs : dict
Observation metadata dictionary
cal : dict
Calibration dictionary
vis_auto : NDArray[np.float64]
Data autocorrelations
auto_tile_i : NDArray[np.integer]
Index array of the tile array that have defined autocorrelations
Returns
-------
cal : dict
Calibration dictionary with gains that have reduced antenna-dependent bias
auto_ratio : NDArray[np.float64]
The estimation of the antenna-dependent bias through the square root of the autocorrelation
visibilities normalized via a reference tile.
"""
auto_ratio = np.empty([cal["n_pol"], obs["n_freq"], obs["n_tile"]])
# This should be possible to Vectorize if it's slow
for pol_i in range(cal["n_pol"]):
# fhd_struct_init_cal puts the ref_antenna as 1 if it's not set, which is never appears to be
v0 = vis_auto[pol_i, :, auto_tile_i[cal["ref_antenna"]]]
auto_ratio[pol_i, :, auto_tile_i] = np.sqrt(
vis_auto[pol_i, :, np.arange(auto_tile_i.size)] * weight_invert(v0)
)
cal["gain"][pol_i] = cal["gain"][pol_i] * weight_invert(auto_ratio[pol_i])
return cal, auto_ratio
[docs]
def cal_auto_ratio_remultiply(
cal: dict, auto_ratio: NDArray[np.float64], auto_tile_i: NDArray[np.integer]
) -> dict:
"""
Return antenna-dependent parameters to the calculated gains after averaged quantities
have been calculated. The antenna-dependent parameters are captured via the square root of
the referenced autocorrelation visiblities calculated in cal_auto_ratio_divide.
Parameters
----------
obs : dict
Observation metadata dictionary
cal : dict
Calibration dictionary
auto_ratio : NDArray[np.float64]
Square root of the autocorrelation visibilities normalized via a reference tile
auto_tile_i : NDArray[np.integer]
Index array of the tile array that have defined autocorrelations
Returns
-------
cal: dict
Calibration dictonary containing the reformed gain
"""
# Replaced for loop in remultiply, this should remultiply by the auto_ratios
cal["gain"][: cal["n_pol"], :, auto_tile_i] = cal["gain"][
: cal["n_pol"], :, auto_tile_i
] * np.abs(auto_ratio[: cal["n_pol"], :, auto_tile_i])
return cal
[docs]
def calculate_adaptive_gain(
gain_list: NDArray[np.float64],
convergence_list: NDArray[np.float64],
iter: int,
base_gain: int | float,
final_convergence_estimate: float | None = None,
):
"""
Perform a Kalman filter to calculate the best weighting to use in the next iteration of the
linear least squares fitting between the data and simulated model, which reduces the number
of total iterations till convergence.
The Kalman filter takes previous convergence values, or the percent change in calibration solution from
the previous iteration, to calculate the relative weight to give to the next calculated iteration versus
the previous iteration. For example, a gain (relative weight in this context) of 1 would give the old
solution and the new solution the same weighting, and thus the next guess in the linear least square solver
is the average of the new and old solution. A gain (relative weight) of 2 would give the new solution twice
as much weight at the old solution, and thus the next guess in the linear least square solver is the sum of
the old solution and twice the new solution, all divided by three.
Parameters
----------
gain_list : NDArray[np.float64]
Relative weighting between the previous iteration and new iteration in the linear least squares
solver for calibration solutions
convergence_list : NDArray[np.float64]
An array of the percent change in the calibration solutions between one iteration and the next
iter : int
The current iteration in the linear least squares solver
base_gain : int|float
The initial relative weighting between the previous iteration and new iteration in the linear
least squares solver for calibration solutions
final_convergence_estimate : float, optional
The prediction of the percent change of the previous iteration and the forward model estimate from
the Kalman filter, by default None
Returns
-------
gain : float
Relative weighting between the previous iteration and the new iteration in the calibration
linear least squares solver
"""
if iter > 2:
# To calculate the best gain to use, compare the past gains that have been used
# with the resulting convergences to estimate the best gain to use.
# Algorithmically, this is a Kalman filter.
# If forward modeling proceeds perfectly, the convergence metric should
# asymptotically approach a final value.
# We can estimate that value from the measured changes in convergence
# weighted by the gains used in each previous iteration.
# For some applications such as calibration this may be known in advance.
# In calibration, it is expressed as the change in a
# value, in which case the final value should be zero.
if final_convergence_estimate is None:
est_final_conv = np.zeros(iter - 1)
for i in range(iter - 1):
final_convergence_test = (
(1 + gain_list[i]) * convergence_list[i + 1] - convergence_list[i]
) / gain_list[i]
# The convergence metric is strictly positive, so if the estimated final convergence is
# less than zero, force it to zero.
est_final_conv[i] = np.max((0, final_convergence_test))
# Because the estimate may slowly change over time, only use the most recent measurements.
final_convergence_estimate = np.median(est_final_conv[max(iter - 5, 0) :])
last_gain = gain_list[iter - 1]
last_conv = convergence_list[iter - 2]
new_conv = convergence_list[iter - 1]
# The predicted convergence is the value we would get if the new model calculated
# in the previous iteration was perfect. Recall that the updated model that is
# actually used is the gain-weighted average of the new and old model,
# so the convergence would be similarly weighted.
predicted_conv = (final_convergence_estimate * last_gain + last_conv) / (
base_gain + last_gain
)
# If the measured and predicted convergence are very close, that indicates
# that our forward model is accurate and we can use a more aggressive gain
# If the measured convergence is significantly worse (or better!) than predicted,
# that indicates that the model is not converging as expected and
# we should use a more conservative gain.
delta = (predicted_conv - new_conv) / (
(last_conv - final_convergence_estimate) / (base_gain + last_gain)
)
new_gain = 1 - abs(delta)
# Average the gains to prevent oscillating solutions.
new_gain = (new_gain + last_gain) / 2
# For some reason base_gain can be a numpy float in testing so putting in a tuple solves this.
gain = np.max((base_gain / 2, new_gain))
else:
gain = base_gain
gain_list[iter] = gain
return gain