Coverage for PyFHD/calibration/calibrate.py: 25%

107 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-01 10:58 +0800

1import numpy as np 

2from numpy.typing import NDArray 

3from copy import deepcopy 

4from logging import Logger 

5from PyFHD.calibration.calibration_utils import ( 

6 vis_extract_autocorr, 

7 vis_cal_auto_init, 

8 vis_calibration_flag, 

9 vis_cal_bandpass, 

10 vis_cal_polyfit, 

11 vis_cal_auto_fit, 

12 vis_calibration_apply, 

13 vis_baseline_hist, 

14 cal_auto_ratio_divide, 

15 cal_auto_ratio_remultiply, 

16) 

17from PyFHD.calibration.vis_calibrate_subroutine import vis_calibrate_subroutine 

18from PyFHD.pyfhd_tools.pyfhd_utils import resistant_mean, reshape_and_average_in_time 

19from PyFHD.plotting.calibration import plot_cals 

20 

21 

22def calibrate( 

23 obs: dict, 

24 params: dict, 

25 vis_arr: NDArray[np.complex128], 

26 vis_weights: NDArray[np.float64], 

27 vis_model_arr: NDArray[np.complex128], 

28 pyfhd_config: dict, 

29 logger: Logger, 

30) -> tuple[NDArray[np.complex128], dict, dict]: 

31 """ 

32 Solve for the amplitude and phase of the electronic response of each tile or station, and apply these 

33 calibration solutions to the raw, data visiblities. Various options for initial estimates, time/tile averaging, 

34 and polynomial/cable reflections fitting are available. 

35 

36 Parameters 

37 ---------- 

38 obs : dict 

39 Observation metadata dictionary 

40 params : dict 

41 Visibility metadata dictionary 

42 vis_arr : NDArray[np.complex128] 

43 Uncalibrated data visiblities 

44 vis_weights : NDArray[np.float64] 

45 Weights (flags) of the visibilities 

46 vis_model_arr : NDArray[np.complex128] 

47 Simulated model visibilites 

48 pyfhd_config : dict 

49 PyFHD's configuration dictionary containing all the options set for a PyFHD run 

50 logger : Logger 

51 PyFHD's logger for displaying errors and info to the log files 

52 

53 Returns 

54 ------- 

55 vis_cal : tuple[NDArray[np.complex128] 

56 The calibrated data visibilities 

57 cal : dict 

58 The updated calibration dictionary 

59 obs : dict 

60 Updated observation metadata dictionary 

61 pyfhd_config : dict 

62 Updated PyFHD configuration dictionary (possibly - check for any warnings in the log) 

63 """ 

64 # Initialize cal dict 

65 cal = {} 

66 # Add any default values to calibration here 

67 cal["n_pol"] = min(obs["n_pol"], 2) 

68 cal["conv_thresh"] = pyfhd_config["cal_convergence_threshold"] 

69 cal["ref_antenna"] = 1 

70 cal["ref_antenna_name"] = obs["baseline_info"]["tile_names"][cal["ref_antenna"]] 

71 

72 # Calculate auto-correlation visibilities, optionally use them for initial calibration estimates 

73 vis_auto, auto_tile_i = vis_extract_autocorr(obs, vis_arr, pyfhd_config) 

74 # Calculate auto-correlation visibilities 

75 vis_auto_model, auto_tile_i = vis_extract_autocorr(obs, vis_model_arr, pyfhd_config) 

76 # Initalize the gain 

77 if pyfhd_config["calibration_auto_initialize"]: 

78 cal["gain"] = vis_cal_auto_init( 

79 obs, cal, vis_arr, vis_model_arr, vis_auto, vis_auto_model, auto_tile_i 

80 ) 

81 else: 

82 cal["gain"] = np.full( 

83 (cal["n_pol"], obs["n_freq"], obs["n_tile"]), 

84 pyfhd_config["cal_gain_init"], 

85 dtype=np.complex128, 

86 ) 

87 

88 # Do the calibration with vis_calibrate_subroutine 

89 logger.info("Gain initialized beginning vis_calibrate subroutine") 

90 cal = vis_calibrate_subroutine( 

91 vis_arr, vis_model_arr, vis_weights, obs, cal, params, pyfhd_config, logger 

92 ) 

93 logger.info("Function vis_calibrate_subroutine has completed.") 

94 if pyfhd_config["flag_calibration"]: 

95 logger.info( 

96 "Flagging Calibration has been activated and calibration will now be flagged" 

97 ) 

98 obs = vis_calibration_flag(obs, cal, pyfhd_config, logger) 

99 

100 # Copy the cal structure with per-frequency gain solutions for future comparisons 

101 cal_base = deepcopy(cal) 

102 

103 # Perform bandpass (amp + phase per fine freq) and polynomial fitting (low order amp + phase fit plus cable reflection fit) 

104 if pyfhd_config["bandpass_calibrate"]: 

105 logger.info("You have chosen to perform a bandpass calculation and calibration") 

106 if pyfhd_config["auto_ratio_calibration"]: 

107 cal, auto_ratio = cal_auto_ratio_divide(obs, cal, vis_auto, auto_tile_i) 

108 else: 

109 auto_ratio = None 

110 cal_bandpass, cal_remainder = vis_cal_bandpass(obs, cal, pyfhd_config, logger) 

111 

112 if pyfhd_config["calibration_polyfit"]: 

113 logger.info( 

114 "You have selected to perform polynomial fits over the frequency band" 

115 ) 

116 cal_polyfit, pyfhd_config = vis_cal_polyfit( 

117 obs, cal_remainder, auto_ratio, pyfhd_config, logger 

118 ) 

119 # Replace vis_cal_combine with this line as the gain is the same size for polyfit and bandpass 

120 cal["gain"] = cal_polyfit["gain"] * cal_bandpass["gain"] 

121 for key in cal_polyfit: 

122 if key not in cal: 

123 cal[key] = cal_polyfit[key] 

124 else: 

125 cal = cal_bandpass 

126 

127 if pyfhd_config["auto_ratio_calibration"]: 

128 cal = cal_auto_ratio_remultiply(cal, auto_ratio, auto_tile_i) 

129 elif pyfhd_config["calibration_polyfit"]: 

130 cal, pyfhd_config = vis_cal_polyfit(cal, obs, None, pyfhd_config, logger) 

131 

132 # Get the gain residuals 

133 if pyfhd_config["calibration_auto_fit"]: 

134 # Get amp from auto-correlation visibilities for plotting (or optionally for the calibration solution itself) 

135 cal_auto = vis_cal_auto_fit(obs, cal, vis_auto, vis_auto_model, auto_tile_i) 

136 # These subtractions replace vis_cal_subtract 

137 cal_res_gain = cal_base["gain"] - cal_auto["gain"] 

138 else: 

139 # These subtractions replace vis_cal_subtract 

140 cal_res_gain = cal_base["gain"] - cal["gain"] 

141 

142 # If calibration_auto_fit was set then replace cal with cal_auto, usually for diagnostic purposes 

143 if pyfhd_config["calibration_auto_fit"]: 

144 cal = cal_auto 

145 # Apply Calibration 

146 logger.info("Applying the calibration") 

147 vis_cal, cal = vis_calibration_apply( 

148 vis_arr, obs, cal, vis_model_arr, vis_weights, logger 

149 ) 

150 cal["gain_residual"] = cal_res_gain 

151 

152 # Save the ratio and sigma average variance related to vis_cal 

153 logger.info("Saving the ratio and sigma average variance") 

154 if pyfhd_config["vis_baseline_hist"]: 

155 cal["vis_baseline_hist"] = vis_baseline_hist( 

156 obs, params, vis_cal, vis_model_arr 

157 ) 

158 

159 # Calculate statistics to put into the calibration dictionary for output purposes 

160 logger.info("Calculating statistics from calibration") 

161 nc_pol = min(obs["n_pol"], 2) 

162 cal_gain_avg = np.zeros(nc_pol) 

163 cal_res_avg = np.zeros(nc_pol) 

164 cal_res_restrict = np.zeros(nc_pol) 

165 cal_res_stddev = np.zeros(nc_pol) 

166 for pol_i in range(nc_pol): 

167 tile_use_i = np.where(obs["baseline_info"]["tile_use"])[0] 

168 freq_use_i = np.where(obs["baseline_info"]["freq_use"])[0] 

169 if tile_use_i.size == 0 or freq_use_i.size == 0: 

170 continue 

171 # Replaced extract_subarray with just the proper indexing 

172 gain_ref = cal["gain"][pol_i, freq_use_i, :][:, tile_use_i] 

173 gain_res = cal_res_gain[pol_i, freq_use_i][:, tile_use_i] 

174 cal_gain_avg[pol_i] = np.mean(np.abs(gain_ref)) 

175 cal_res_avg[pol_i] = np.mean(np.abs(gain_res)) 

176 res_mean = resistant_mean(np.abs(gain_res), 2) 

177 cal_res_restrict[pol_i] = res_mean 

178 cal_res_stddev[pol_i] = np.std(np.abs(gain_res)) 

179 

180 cal["mean_gain"] = cal_gain_avg 

181 cal["mean_gain_residual"] = cal_res_avg 

182 cal["mean_gain_restrict"] = cal_res_restrict 

183 cal["stddev_gain_residual"] = cal_res_stddev 

184 

185 if pyfhd_config["calibration_plots"]: 

186 logger.info( 

187 f"Plotting the calibration solutions into {pyfhd_config['output_dir']/'plots'/'calibration'}" 

188 ) 

189 plot_cals(obs, cal, pyfhd_config) 

190 

191 # Return the calibrated visibility array 

192 return vis_cal, cal, obs, pyfhd_config 

193 

194 

195def calibrate_qu_mixing( 

196 vis_arr: NDArray[np.complex128], 

197 vis_model_arr: NDArray[np.complex128], 

198 vis_weights: NDArray[np.float64], 

199 obs: dict, 

200) -> float: 

201 """ 

202 Solve for the degenerate phase between pseudo Q (YY - XX) and pseudo U (YX + XY) for the calibrated data and 

203 the simulated model separately, and return their difference. This difference represents the excess mixing 

204 angle between Q and U due to the instrumental beam not captured in a typical polarization-independent 

205 calibration. 

206 

207 Parameters 

208 ---------- 

209 vis_arr : np.ndarray 

210 Uncalibrated data visiblities 

211 vis_model_arr : np.ndarray 

212 Simulated model visibilites 

213 vis_weights : np.ndarray 

214 Weights (flags) of the visibilities 

215 obs : dict 

216 Observation metadata dictionary 

217 

218 Returns 

219 ------- 

220 stokes_mix_phase : float 

221 The excess mixing angle between Q and U from instrumental effects 

222 """ 

223 

224 n_freq = obs["n_freq"] 

225 # n_tile = obs['n_tile'] 

226 n_time = obs["n_time"] 

227 # This should be number of baselines for one time step 

228 n_baselines = obs["n_baselines"] 

229 

230 # reshape from (n_freq, n_baselines*n_times) to (n_freq, n_times, n_baselines). Turns out due to the row major vs col major difference 

231 # between IDL and python, this shape also changes 

232 new_shape = (n_freq, n_time, n_baselines) 

233 

234 # Use the xx weightss (yy should be identical at this point) 

235 weights_use = np.reshape(vis_weights[0, :, :], new_shape) 

236 # carried over from FHD code - not sure this is necessary (maybe avoids NaNs?) 

237 weights_use = np.maximum(weights_use, np.zeros_like(weights_use)) 

238 weights_use = np.minimum(weights_use, np.ones_like(weights_use)) 

239 

240 # Q = YY - XX for data 

241 pseudo_q = reshape_and_average_in_time( 

242 vis_arr[1, :, :] - vis_arr[0, :, :], n_freq, n_time, n_baselines, weights_use 

243 ) 

244 

245 # U = YX + XY for data 

246 pseudo_u = reshape_and_average_in_time( 

247 vis_arr[3, :, :] + vis_arr[2, :, :], n_freq, n_time, n_baselines, weights_use 

248 ) 

249 

250 # Q = YY - XX for model 

251 pseudo_q_model = reshape_and_average_in_time( 

252 vis_model_arr[1, :, :] - vis_model_arr[0, :, :], 

253 n_freq, 

254 n_time, 

255 n_baselines, 

256 weights_use, 

257 ) 

258 

259 # U = YX + XY for model 

260 pseudo_u_model = reshape_and_average_in_time( 

261 vis_model_arr[3, :, :] + vis_model_arr[2, :, :], 

262 n_freq, 

263 n_time, 

264 n_baselines, 

265 weights_use, 

266 ) 

267 

268 weights_t_avg = np.sum(weights_use, axis=1) 

269 

270 i_use = np.nonzero(weights_t_avg) 

271 pseudo_u = np.squeeze(pseudo_u[i_use]).flatten() 

272 pseudo_q = np.squeeze(pseudo_q[i_use]).flatten() 

273 pseudo_q_model = np.squeeze(pseudo_q_model[i_use]).flatten() 

274 pseudo_u_model = np.squeeze(pseudo_u_model[i_use]).flatten() 

275 

276 # LA_LEAST_SQUARES does not use double precision by default as such you will see differences. 

277 # Also LA_LEAST_SQUARES uses different method to numpy, generally LA_LEAST_SQUARES assumes the first matrix 

278 # has full rank, while numpy does not assume this. 

279 

280 x = np.vstack([pseudo_u, np.ones(pseudo_u.size)]).T 

281 u_q_mix = np.linalg.lstsq(x, pseudo_q, rcond=None)[0] 

282 u_q_phase = np.arctan2(u_q_mix[0].imag, u_q_mix[0].real) 

283 

284 x = np.vstack([pseudo_u_model, np.ones(pseudo_u_model.size)]).T 

285 u_q_mix_model = np.linalg.lstsq(x, pseudo_q_model, rcond=None)[0] 

286 u_q_phase_model = np.arctan2(u_q_mix_model[0].imag, u_q_mix_model[0].real) 

287 

288 return u_q_phase_model - u_q_phase