Coverage for PyFHD/calibration/calibrate.py: 25%
107 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
1import numpy as np
2from numpy.typing import NDArray
3from copy import deepcopy
4from logging import Logger
5from PyFHD.calibration.calibration_utils import (
6 vis_extract_autocorr,
7 vis_cal_auto_init,
8 vis_calibration_flag,
9 vis_cal_bandpass,
10 vis_cal_polyfit,
11 vis_cal_auto_fit,
12 vis_calibration_apply,
13 vis_baseline_hist,
14 cal_auto_ratio_divide,
15 cal_auto_ratio_remultiply,
16)
17from PyFHD.calibration.vis_calibrate_subroutine import vis_calibrate_subroutine
18from PyFHD.pyfhd_tools.pyfhd_utils import resistant_mean, reshape_and_average_in_time
19from PyFHD.plotting.calibration import plot_cals
22def calibrate(
23 obs: dict,
24 params: dict,
25 vis_arr: NDArray[np.complex128],
26 vis_weights: NDArray[np.float64],
27 vis_model_arr: NDArray[np.complex128],
28 pyfhd_config: dict,
29 logger: Logger,
30) -> tuple[NDArray[np.complex128], dict, dict]:
31 """
32 Solve for the amplitude and phase of the electronic response of each tile or station, and apply these
33 calibration solutions to the raw, data visiblities. Various options for initial estimates, time/tile averaging,
34 and polynomial/cable reflections fitting are available.
36 Parameters
37 ----------
38 obs : dict
39 Observation metadata dictionary
40 params : dict
41 Visibility metadata dictionary
42 vis_arr : NDArray[np.complex128]
43 Uncalibrated data visiblities
44 vis_weights : NDArray[np.float64]
45 Weights (flags) of the visibilities
46 vis_model_arr : NDArray[np.complex128]
47 Simulated model visibilites
48 pyfhd_config : dict
49 PyFHD's configuration dictionary containing all the options set for a PyFHD run
50 logger : Logger
51 PyFHD's logger for displaying errors and info to the log files
53 Returns
54 -------
55 vis_cal : tuple[NDArray[np.complex128]
56 The calibrated data visibilities
57 cal : dict
58 The updated calibration dictionary
59 obs : dict
60 Updated observation metadata dictionary
61 pyfhd_config : dict
62 Updated PyFHD configuration dictionary (possibly - check for any warnings in the log)
63 """
64 # Initialize cal dict
65 cal = {}
66 # Add any default values to calibration here
67 cal["n_pol"] = min(obs["n_pol"], 2)
68 cal["conv_thresh"] = pyfhd_config["cal_convergence_threshold"]
69 cal["ref_antenna"] = 1
70 cal["ref_antenna_name"] = obs["baseline_info"]["tile_names"][cal["ref_antenna"]]
72 # Calculate auto-correlation visibilities, optionally use them for initial calibration estimates
73 vis_auto, auto_tile_i = vis_extract_autocorr(obs, vis_arr, pyfhd_config)
74 # Calculate auto-correlation visibilities
75 vis_auto_model, auto_tile_i = vis_extract_autocorr(obs, vis_model_arr, pyfhd_config)
76 # Initalize the gain
77 if pyfhd_config["calibration_auto_initialize"]:
78 cal["gain"] = vis_cal_auto_init(
79 obs, cal, vis_arr, vis_model_arr, vis_auto, vis_auto_model, auto_tile_i
80 )
81 else:
82 cal["gain"] = np.full(
83 (cal["n_pol"], obs["n_freq"], obs["n_tile"]),
84 pyfhd_config["cal_gain_init"],
85 dtype=np.complex128,
86 )
88 # Do the calibration with vis_calibrate_subroutine
89 logger.info("Gain initialized beginning vis_calibrate subroutine")
90 cal = vis_calibrate_subroutine(
91 vis_arr, vis_model_arr, vis_weights, obs, cal, params, pyfhd_config, logger
92 )
93 logger.info("Function vis_calibrate_subroutine has completed.")
94 if pyfhd_config["flag_calibration"]:
95 logger.info(
96 "Flagging Calibration has been activated and calibration will now be flagged"
97 )
98 obs = vis_calibration_flag(obs, cal, pyfhd_config, logger)
100 # Copy the cal structure with per-frequency gain solutions for future comparisons
101 cal_base = deepcopy(cal)
103 # Perform bandpass (amp + phase per fine freq) and polynomial fitting (low order amp + phase fit plus cable reflection fit)
104 if pyfhd_config["bandpass_calibrate"]:
105 logger.info("You have chosen to perform a bandpass calculation and calibration")
106 if pyfhd_config["auto_ratio_calibration"]:
107 cal, auto_ratio = cal_auto_ratio_divide(obs, cal, vis_auto, auto_tile_i)
108 else:
109 auto_ratio = None
110 cal_bandpass, cal_remainder = vis_cal_bandpass(obs, cal, pyfhd_config, logger)
112 if pyfhd_config["calibration_polyfit"]:
113 logger.info(
114 "You have selected to perform polynomial fits over the frequency band"
115 )
116 cal_polyfit, pyfhd_config = vis_cal_polyfit(
117 obs, cal_remainder, auto_ratio, pyfhd_config, logger
118 )
119 # Replace vis_cal_combine with this line as the gain is the same size for polyfit and bandpass
120 cal["gain"] = cal_polyfit["gain"] * cal_bandpass["gain"]
121 for key in cal_polyfit:
122 if key not in cal:
123 cal[key] = cal_polyfit[key]
124 else:
125 cal = cal_bandpass
127 if pyfhd_config["auto_ratio_calibration"]:
128 cal = cal_auto_ratio_remultiply(cal, auto_ratio, auto_tile_i)
129 elif pyfhd_config["calibration_polyfit"]:
130 cal, pyfhd_config = vis_cal_polyfit(cal, obs, None, pyfhd_config, logger)
132 # Get the gain residuals
133 if pyfhd_config["calibration_auto_fit"]:
134 # Get amp from auto-correlation visibilities for plotting (or optionally for the calibration solution itself)
135 cal_auto = vis_cal_auto_fit(obs, cal, vis_auto, vis_auto_model, auto_tile_i)
136 # These subtractions replace vis_cal_subtract
137 cal_res_gain = cal_base["gain"] - cal_auto["gain"]
138 else:
139 # These subtractions replace vis_cal_subtract
140 cal_res_gain = cal_base["gain"] - cal["gain"]
142 # If calibration_auto_fit was set then replace cal with cal_auto, usually for diagnostic purposes
143 if pyfhd_config["calibration_auto_fit"]:
144 cal = cal_auto
145 # Apply Calibration
146 logger.info("Applying the calibration")
147 vis_cal, cal = vis_calibration_apply(
148 vis_arr, obs, cal, vis_model_arr, vis_weights, logger
149 )
150 cal["gain_residual"] = cal_res_gain
152 # Save the ratio and sigma average variance related to vis_cal
153 logger.info("Saving the ratio and sigma average variance")
154 if pyfhd_config["vis_baseline_hist"]:
155 cal["vis_baseline_hist"] = vis_baseline_hist(
156 obs, params, vis_cal, vis_model_arr
157 )
159 # Calculate statistics to put into the calibration dictionary for output purposes
160 logger.info("Calculating statistics from calibration")
161 nc_pol = min(obs["n_pol"], 2)
162 cal_gain_avg = np.zeros(nc_pol)
163 cal_res_avg = np.zeros(nc_pol)
164 cal_res_restrict = np.zeros(nc_pol)
165 cal_res_stddev = np.zeros(nc_pol)
166 for pol_i in range(nc_pol):
167 tile_use_i = np.where(obs["baseline_info"]["tile_use"])[0]
168 freq_use_i = np.where(obs["baseline_info"]["freq_use"])[0]
169 if tile_use_i.size == 0 or freq_use_i.size == 0:
170 continue
171 # Replaced extract_subarray with just the proper indexing
172 gain_ref = cal["gain"][pol_i, freq_use_i, :][:, tile_use_i]
173 gain_res = cal_res_gain[pol_i, freq_use_i][:, tile_use_i]
174 cal_gain_avg[pol_i] = np.mean(np.abs(gain_ref))
175 cal_res_avg[pol_i] = np.mean(np.abs(gain_res))
176 res_mean = resistant_mean(np.abs(gain_res), 2)
177 cal_res_restrict[pol_i] = res_mean
178 cal_res_stddev[pol_i] = np.std(np.abs(gain_res))
180 cal["mean_gain"] = cal_gain_avg
181 cal["mean_gain_residual"] = cal_res_avg
182 cal["mean_gain_restrict"] = cal_res_restrict
183 cal["stddev_gain_residual"] = cal_res_stddev
185 if pyfhd_config["calibration_plots"]:
186 logger.info(
187 f"Plotting the calibration solutions into {pyfhd_config['output_dir']/'plots'/'calibration'}"
188 )
189 plot_cals(obs, cal, pyfhd_config)
191 # Return the calibrated visibility array
192 return vis_cal, cal, obs, pyfhd_config
195def calibrate_qu_mixing(
196 vis_arr: NDArray[np.complex128],
197 vis_model_arr: NDArray[np.complex128],
198 vis_weights: NDArray[np.float64],
199 obs: dict,
200) -> float:
201 """
202 Solve for the degenerate phase between pseudo Q (YY - XX) and pseudo U (YX + XY) for the calibrated data and
203 the simulated model separately, and return their difference. This difference represents the excess mixing
204 angle between Q and U due to the instrumental beam not captured in a typical polarization-independent
205 calibration.
207 Parameters
208 ----------
209 vis_arr : np.ndarray
210 Uncalibrated data visiblities
211 vis_model_arr : np.ndarray
212 Simulated model visibilites
213 vis_weights : np.ndarray
214 Weights (flags) of the visibilities
215 obs : dict
216 Observation metadata dictionary
218 Returns
219 -------
220 stokes_mix_phase : float
221 The excess mixing angle between Q and U from instrumental effects
222 """
224 n_freq = obs["n_freq"]
225 # n_tile = obs['n_tile']
226 n_time = obs["n_time"]
227 # This should be number of baselines for one time step
228 n_baselines = obs["n_baselines"]
230 # reshape from (n_freq, n_baselines*n_times) to (n_freq, n_times, n_baselines). Turns out due to the row major vs col major difference
231 # between IDL and python, this shape also changes
232 new_shape = (n_freq, n_time, n_baselines)
234 # Use the xx weightss (yy should be identical at this point)
235 weights_use = np.reshape(vis_weights[0, :, :], new_shape)
236 # carried over from FHD code - not sure this is necessary (maybe avoids NaNs?)
237 weights_use = np.maximum(weights_use, np.zeros_like(weights_use))
238 weights_use = np.minimum(weights_use, np.ones_like(weights_use))
240 # Q = YY - XX for data
241 pseudo_q = reshape_and_average_in_time(
242 vis_arr[1, :, :] - vis_arr[0, :, :], n_freq, n_time, n_baselines, weights_use
243 )
245 # U = YX + XY for data
246 pseudo_u = reshape_and_average_in_time(
247 vis_arr[3, :, :] + vis_arr[2, :, :], n_freq, n_time, n_baselines, weights_use
248 )
250 # Q = YY - XX for model
251 pseudo_q_model = reshape_and_average_in_time(
252 vis_model_arr[1, :, :] - vis_model_arr[0, :, :],
253 n_freq,
254 n_time,
255 n_baselines,
256 weights_use,
257 )
259 # U = YX + XY for model
260 pseudo_u_model = reshape_and_average_in_time(
261 vis_model_arr[3, :, :] + vis_model_arr[2, :, :],
262 n_freq,
263 n_time,
264 n_baselines,
265 weights_use,
266 )
268 weights_t_avg = np.sum(weights_use, axis=1)
270 i_use = np.nonzero(weights_t_avg)
271 pseudo_u = np.squeeze(pseudo_u[i_use]).flatten()
272 pseudo_q = np.squeeze(pseudo_q[i_use]).flatten()
273 pseudo_q_model = np.squeeze(pseudo_q_model[i_use]).flatten()
274 pseudo_u_model = np.squeeze(pseudo_u_model[i_use]).flatten()
276 # LA_LEAST_SQUARES does not use double precision by default as such you will see differences.
277 # Also LA_LEAST_SQUARES uses different method to numpy, generally LA_LEAST_SQUARES assumes the first matrix
278 # has full rank, while numpy does not assume this.
280 x = np.vstack([pseudo_u, np.ones(pseudo_u.size)]).T
281 u_q_mix = np.linalg.lstsq(x, pseudo_q, rcond=None)[0]
282 u_q_phase = np.arctan2(u_q_mix[0].imag, u_q_mix[0].real)
284 x = np.vstack([pseudo_u_model, np.ones(pseudo_u_model.size)]).T
285 u_q_mix_model = np.linalg.lstsq(x, pseudo_q_model, rcond=None)[0]
286 u_q_phase_model = np.arctan2(u_q_mix_model[0].imag, u_q_mix_model[0].real)
288 return u_q_phase_model - u_q_phase