Coverage for PyFHD/calibration/vis_calibrate_subroutine.py: 89%

234 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-01 10:58 +0800

1import numpy as np 

2from numpy.typing import NDArray 

3import warnings 

4from logging import Logger 

5from PyFHD.calibration.calibration_utils import calculate_adaptive_gain 

6from PyFHD.pyfhd_tools.pyfhd_utils import weight_invert, histogram, idl_median 

7 

8 

9def vis_calibrate_subroutine( 

10 vis_arr: NDArray[np.complex128], 

11 vis_model_ptr: NDArray[np.complex128], 

12 vis_weight_ptr: NDArray[np.float64], 

13 obs: dict, 

14 cal: dict, 

15 params: dict, 

16 pyfhd_config: dict, 

17 logger: Logger, 

18 calibration_weights=False, 

19 no_ref_tile=False, 

20): 

21 """ 

22 Perform a linear least-squares regression between the data visilbities and the simulated model 

23 visiblities to solve for an amplitude and phase for each tile for each frequency channel. 

24 

25 Parameters 

26 ---------- 

27 vis_arr : NDArray[np.complex128] 

28 Uncalibrated data visiblities 

29 vis_model_ptr : NDArray[np.complex128] 

30 Simulated model visibilites 

31 vis_weight_ptr : NDArray[np.float64] 

32 Weights (flags) of the visibilities 

33 obs : dict 

34 Observation metadata dictionary 

35 cal : dict 

36 Calibration dictionary 

37 pyfhd_config : dict 

38 PyFHD's configuration dictionary containing all the options set for a PyFHD run 

39 calibration_weights : bool, optional 

40 Weight the visibilities at the minimum and maximum baseline by a soft taper for 

41 calibration purposes only, by default False 

42 no_ref_tile : bool, optional 

43 Do not reference calibration phases at this stage, by default False 

44 

45 Returns 

46 ------- 

47 cal : dict 

48 Calibration dictionary 

49 

50 Raises 

51 ------ 

52 ValueError 

53 Should almost never happen, only gets raised for max_cal_iter has been set to less than 5, however 

54 since it's currently hardcoded in here it's more of an exception for any developers of PyFHD 

55 """ 

56 # Retrieve values from data structures 

57 # There is a few hardcoded values in here that were previously hardcoded in fhd_struct_init_cal 

58 # If you wish to change them, add them to the pyfhd_config through pyfhd_setup and the config in pyfhd.yaml 

59 reference_tile = 1 

60 min_baseline = obs["min_baseline"] 

61 max_baseline = obs["max_baseline"] 

62 dimension = obs["dimension"] 

63 elements = obs["elements"] 

64 min_cal_baseline = ( 

65 max(pyfhd_config["min_cal_baseline"], obs["min_baseline"]) 

66 if pyfhd_config["min_cal_baseline"] 

67 else obs["min_baseline"] 

68 ) 

69 max_cal_baseline = ( 

70 min(pyfhd_config["max_cal_baseline"], obs["max_baseline"]) 

71 if pyfhd_config["max_cal_baseline"] 

72 else obs["max_baseline"] 

73 ) 

74 # minimum number of calibration equations needed to solve for the gain of one baseline 

75 min_cal_solutions = 5 

76 # For record keeping 

77 cal["min_cal_solutions"] = min_cal_solutions 

78 # average the visibilities across time steps before solving for the gains 

79 time_average = pyfhd_config["cal_time_average"] 

80 # maximum iterations to perform for the linear least-squares solver 

81 max_cal_iter = pyfhd_config["max_cal_iter"] 

82 # Leave a warning if its less than 5 iterations, or an Error if its less than 1 

83 if max_cal_iter < 5: 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true

84 warnings.warn( 

85 "At Least 5 calibrations iterations is recommended.\nYou're currently using {} iterations".format( 

86 int(max_cal_iter) 

87 ) 

88 ) 

89 elif max_cal_iter < 1: 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true

90 raise ValueError( 

91 "max_cal_iter should be 1 or more. A max_cal_iter of 5 or more is recommended" 

92 ) 

93 conv_thresh = pyfhd_config["cal_convergence_threshold"] 

94 use_adaptive_gain = pyfhd_config["cal_adaptive_calibration_gain"] 

95 base_gain = pyfhd_config["cal_base_gain"] 

96 # halt if the strict convergence is worse than most of the last x iterations 

97 divergence_history = 3 

98 # halt if the convergence gets significantly worse by a factor of x in one iteration 

99 divergence_factor = 1.5 

100 # For record keeping 

101 cal["divergence_history"] = 3 

102 cal["divergence_factor"] = 1.5 

103 n_pol = cal["n_pol"] 

104 n_freq = obs["n_freq"] 

105 n_tile = obs["n_tile"] 

106 n_time = obs["n_time"] 

107 # weights WILL be over-written! (Only for NAN gain solutions) 

108 vis_weight_ptr_use = vis_weight_ptr 

109 # tile_a & tile_b contribution indexed from 0 

110 tile_A_i = obs["baseline_info"]["tile_a"] - 1 

111 tile_B_i = obs["baseline_info"]["tile_b"] - 1 

112 freq_arr = obs["baseline_info"]["freq"] 

113 n_baselines = obs["n_baselines"] 

114 if pyfhd_config["cal_phase_fit_iter"]: 114 ↛ 118line 114 didn't jump to line 118 because the condition on line 114 was always true

115 phase_fit_iter = pyfhd_config["cal_phase_fit_iter"] 

116 else: 

117 # This gets set multiple times in FHD, by default it's 4, and is set in fhd_struct_init_cal 

118 phase_fit_iter = np.min([np.floor(max_cal_iter / 4), 4]) 

119 kbinsize = obs["kpix"] 

120 

121 cal["convergence"] = np.zeros([n_pol, n_freq, n_tile]) 

122 cal["conv_iter"] = np.zeros([n_pol, n_freq, n_tile]) 

123 cal["n_converged"] = np.zeros(n_pol) 

124 cal["tile_flag"] = np.zeros([n_tile], dtype=np.bool) 

125 for pol_i in range(n_pol): 

126 logger.info( 

127 f"Beginning Calibration for polarization {pol_i} ({obs['pol_names'][pol_i]})" 

128 ) 

129 convergence = np.zeros((n_freq, n_tile)) 

130 conv_iter_arr = np.zeros((n_freq, n_tile)) 

131 # Want to ensure we're not affecting the current array till we overwrite it 

132 gain_arr = cal["gain"][pol_i].copy() 

133 

134 # Average the visibilities over the time steps before solving for the gains solutions 

135 # This is not recommended, as longer baselines will be downweighted artifically. 

136 if time_average: 

137 # The visibilities have dimension nfreq x (n_baselines x n_time), 

138 # which can be reformed to nfreq x n_baselines x n_time 

139 tile_A_i = tile_A_i[0:n_baselines] 

140 tile_B_i = tile_B_i[0:n_baselines] 

141 # So IDL does reforms as REFORM(x, cols, rows, num_of_col_row_arrays) 

142 # Python is row-major, so we need to flip that shape that is used in REFORM 

143 # Although, to get the same results in the shape we want we need to do a few 

144 # transposes 

145 shape = np.array([n_time, n_baselines, n_freq]) 

146 # Transpose here so it's in the same shape as IDL when reshaping, then transpose 

147 # back to get the shape we need to suit the shape of the rest of the arrays 

148 # I suspect this is because the reshape has no guarantee of the memory layout 

149 # at the end, which means we swap indexes of values that is not consistent with IDL 

150 vis_weight_use_reshaped = np.reshape(vis_weight_ptr_use[pol_i].T, shape).T 

151 vis_weight_use = np.maximum(vis_weight_use_reshaped, 0) 

152 vis_weight_use = np.minimum(vis_weight_use, 1) 

153 vis_model = np.reshape(vis_model_ptr[pol_i].T, shape).T 

154 vis_model = np.sum(vis_model * vis_weight_use, axis=2) 

155 vis_measured = np.reshape(vis_arr[pol_i].T, shape).T 

156 vis_avg = np.sum(vis_measured * vis_weight_use, axis=2) 

157 weight = np.sum(vis_weight_use, axis=2) 

158 

159 kx_arr = params["uu"][0:n_baselines] / kbinsize 

160 ky_arr = params["vv"][0:n_baselines] / kbinsize 

161 else: 

162 # In the case of not using a time_average do the following setup instead for weight and vis_avg 

163 vis_weight_use = np.maximum(0, vis_weight_ptr_use[pol_i]) 

164 vis_weight_use = np.minimum(vis_weight_use, 1) 

165 vis_model = vis_model_ptr[pol_i] * vis_weight_use 

166 vis_avg = vis_arr[pol_i] * vis_weight_use 

167 weight = vis_weight_use 

168 

169 kx_arr = params["uu"] / kbinsize 

170 ky_arr = params["vv"] / kbinsize 

171 # Now use the common code from the two possibilities in vis_calibrate_subroutine.pro 

172 kr_arr = np.sqrt(kx_arr**2 + ky_arr**2) 

173 # When IDL does a matrix multiply on two 1D vectors it does the outer product. 

174 dist_arr = np.outer(kr_arr, freq_arr).T * kbinsize 

175 xcen = np.outer(abs(kx_arr), freq_arr).T 

176 ycen = np.outer(abs(ky_arr), freq_arr).T 

177 if calibration_weights: 

178 flag_dist_cut = np.where( 

179 (dist_arr < min_baseline) 

180 | (xcen > (elements / 2)) 

181 | (ycen > (dimension / 2)) 

182 ) 

183 if min_cal_baseline > min_baseline: 183 ↛ 188line 183 didn't jump to line 188 because the condition on line 183 was always true

184 taper_min = np.maximum( 

185 (np.sqrt(2) * min_cal_baseline - dist_arr) / min_cal_baseline, 0 

186 ) 

187 else: 

188 taper_min = 0 

189 if max_cal_baseline < max_baseline: 189 ↛ 190line 189 didn't jump to line 190 because the condition on line 189 was never true

190 taper_max = np.maximum( 

191 (dist_arr - max_cal_baseline) / min_cal_baseline, 0 

192 ) 

193 else: 

194 taper_max = 0 

195 baseline_weights = np.maximum(1 - (taper_min + taper_max) ** 2, 0) 

196 else: 

197 flag_dist_cut = np.where( 

198 (dist_arr < min_cal_baseline) 

199 | (dist_arr > max_cal_baseline) 

200 | (xcen > (elements / 2)) 

201 | (ycen > (dimension / 2)) 

202 ) 

203 # Remove kx_arr, ky_arr and dist_arr from the namespace, allow garbage collector to do its work 

204 del (kx_arr, ky_arr, dist_arr) 

205 

206 if np.size(flag_dist_cut) > 0: 206 ↛ 208line 206 didn't jump to line 208 because the condition on line 206 was always true

207 weight[flag_dist_cut] = 0 

208 vis_avg *= weight_invert(weight) 

209 vis_model *= weight_invert(weight) 

210 

211 tile_use_flag = obs["baseline_info"]["tile_use"] 

212 freq_use_flag = obs["baseline_info"]["freq_use"] 

213 

214 freq_weight = np.sum(weight, axis=1) 

215 baseline_weight = np.sum(weight, axis=0) 

216 freq_use = np.where((freq_weight > 0) & (freq_use_flag > 0))[0] 

217 baseline_use = np.nonzero(baseline_weight) 

218 hist_tile_A, _, riA = histogram(tile_A_i[baseline_use], min=0, max=n_tile - 1) 

219 hist_tile_B, _, riB = histogram(tile_B_i[baseline_use], min=0, max=n_tile - 1) 

220 tile_use = np.where(((hist_tile_A + hist_tile_B) > 0) & (tile_use_flag > 0))[0] 

221 tile_flag = np.where(((hist_tile_A + hist_tile_B) == 0) & (tile_use_flag == 0))[ 

222 0 

223 ] 

224 

225 # Should be able to reduce precision if memory is a concern 

226 tile_A_i_use = np.zeros(np.size(baseline_use), dtype=np.int64) 

227 tile_B_i_use = np.zeros(np.size(baseline_use), dtype=np.int64) 

228 for tile_i in range(np.size(tile_use)): 

229 if hist_tile_A[tile_use[tile_i]] > 0: 

230 # Calculate tile contributions for each non-flagged baseline 

231 tile_A_i_use[riA[riA[tile_use[tile_i]] : riA[tile_use[tile_i] + 1]]] = ( 

232 tile_i 

233 ) 

234 if hist_tile_B[tile_use[tile_i]] > 0: 

235 # Calculate tile contributions for each non-flagged baseline 

236 tile_B_i_use[riB[riB[tile_use[tile_i]] : riB[tile_use[tile_i] + 1]]] = ( 

237 tile_i 

238 ) 

239 

240 ref_tile_use = np.where(reference_tile == tile_use) 

241 if ref_tile_use[0].size == 0: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true

242 ref_tile_use = 0 

243 cal["ref_antenna"] = tile_use[ref_tile_use] 

244 cal["ref_antenna_name"] = obs["baseline_info"]["tile_names"][ 

245 cal["ref_antenna"] 

246 ] 

247 else: 

248 # Extract out value to avoid any weird stuff happening 

249 ref_tile_use = ref_tile_use[0][0] 

250 # Replace all NaNs with 0's 

251 vis_model[np.isnan(vis_model)] = 0 

252 

253 conv_test = np.zeros((freq_use.size, max_cal_iter)) 

254 n_converged = 0 

255 for fii in range(freq_use.size): 

256 fi = freq_use[fii] 

257 gain_curr = gain_arr[fi, tile_use] 

258 # Set up data and model arrays of the original and conjugated versions. This 

259 # provides twice as many equations into the linear least-squares solver. 

260 vis_data2 = np.squeeze(vis_avg[fi, baseline_use]) 

261 vis_data2 = np.hstack([vis_data2, np.conj(vis_data2)]) 

262 vis_model2 = np.squeeze(vis_model[fi, baseline_use]) 

263 vis_model2 = np.hstack([vis_model2, np.conj(vis_model2)]) 

264 weight2 = np.squeeze(weight[fi, baseline_use]) 

265 weight2 = np.hstack([weight2, weight2]) 

266 if calibration_weights: 

267 baseline_wts2 = np.squeeze(baseline_weights[fi, baseline_use]) 

268 # Concatenate two of the same array, why? 

269 baseline_wts2 = np.hstack([baseline_wts2, baseline_wts2]) 

270 

271 b_i_use = np.where(weight2 > 0) 

272 weight2 = weight2[b_i_use] 

273 vis_data2 = vis_data2[b_i_use] 

274 vis_model2 = vis_model2[b_i_use] 

275 

276 A_ind = np.hstack([tile_A_i_use, tile_B_i_use]) 

277 A_ind = A_ind[b_i_use] 

278 B_ind = np.hstack([tile_B_i_use, tile_A_i_use]) 

279 B_ind = B_ind[b_i_use] 

280 

281 A_ind_arr = [] 

282 n_arr = np.zeros(tile_use.size) 

283 for tile_i in range(tile_use.size): 

284 inds = np.where(A_ind == tile_i)[0] 

285 if inds.size > 1: 285 ↛ 288line 285 didn't jump to line 288 because the condition on line 285 was always true

286 A_ind_arr.append(np.reshape(inds, (inds.size, 1))) 

287 else: 

288 A_ind_arr.append(-1) 

289 # NEED SOMETHING MORE IN CASE INDIVIDUAL TILES ARE FLAGGED FOR ONLY A FEW FREQUENCIES!! 

290 n_arr[tile_i] = inds.size 

291 # I suspect a list of lists maybe faster than the object array, check during optimization 

292 # Although I doubt it will make a huge difference. 

293 A_ind_arr = np.array(A_ind_arr, dtype=object) 

294 # For tiles which don't satisfy the minimum number of solutions, pre-emptively set them to 0 

295 # in order to prevent certain failure in meeting strict convergence threshold 

296 inds_min_cal = np.where(n_arr < min_cal_solutions)[0] 

297 if inds_min_cal.size > 0: 297 ↛ 298line 297 didn't jump to line 298 because the condition on line 297 was never true

298 gain_curr[inds_min_cal] = 0 

299 gain_new = np.zeros(tile_use.size, dtype=np.complex128) 

300 convergence_list = np.zeros(max_cal_iter) 

301 conv_gain_list = np.zeros(max_cal_iter) 

302 convergence_loose = 0 

303 for i in range(max_cal_iter): 

304 convergence_loose_prev = convergence_loose 

305 divergence_flag = 0 

306 vis_use = vis_data2 

307 

308 vis_model_matrix = vis_model2 * np.conj(gain_curr[B_ind]) 

309 for tile_i in range(tile_use.size): 

310 if n_arr[tile_i] >= min_cal_solutions: 310 ↛ 309line 310 didn't jump to line 309 because the condition on line 310 was always true

311 if calibration_weights: 

312 xmat = vis_model_matrix[A_ind_arr[tile_i]] 

313 # For some reason IDL multiplcation just allows two arrays of 

314 # very dissimilar sizes to be multiplied by just ignoring everything 

315 # after the index of the smallest one!?! Could be worth checking this is 

316 # what this line was meant to do. 

317 xmat = xmat.flatten() 

318 xmat_dag = np.conj(xmat) * baseline_wts2[0 : xmat.size] 

319 gain_new[tile_i] = ( 

320 1 

321 / np.dot(xmat, xmat_dag) 

322 * np.dot( 

323 np.transpose(vis_use[A_ind_arr[tile_i]]), xmat_dag 

324 ) 

325 )[0] 

326 else: 

327 gain_new[tile_i] = np.linalg.lstsq( 

328 vis_model_matrix[A_ind_arr[tile_i]], 

329 vis_use[A_ind_arr[tile_i]], 

330 rcond=None, 

331 )[0][0][0] 

332 

333 gain_old = gain_curr.copy() 

334 if np.sum(np.abs(gain_new)) == 0: 334 ↛ 335line 334 didn't jump to line 335 because the condition on line 334 was never true

335 gain_curr = gain_new.copy() 

336 # Break the loop 

337 break 

338 if phase_fit_iter - i > 0: 

339 # fit only phase at first 

340 gain_new *= np.abs(gain_old) * weight_invert(np.abs(gain_new)) 

341 if use_adaptive_gain: 

342 conv_gain = calculate_adaptive_gain( 

343 conv_gain_list, 

344 convergence_list, 

345 i, 

346 base_gain, 

347 final_convergence_estimate=0, 

348 ) 

349 else: 

350 conv_gain = base_gain 

351 gain_curr = (gain_new * conv_gain + gain_old * base_gain) / ( 

352 base_gain + conv_gain 

353 ) 

354 dgain = np.abs(gain_curr) * weight_invert(np.abs(gain_old)) 

355 diverge_i = np.where(dgain < np.abs(gain_old) / 2)[0] 

356 if diverge_i.size > 0: 

357 gain_curr[diverge_i] = ( 

358 gain_new[diverge_i] + gain_old[diverge_i] * 2 

359 ) / 3 

360 if np.size(np.where(np.isnan(gain_curr))) > 0: 360 ↛ 361line 360 didn't jump to line 361 because the condition on line 360 was never true

361 gain_curr[np.where(np.isnan(gain_curr))] = gain_old[ 

362 np.where(np.isnan(gain_curr)) 

363 ] 

364 if not no_ref_tile: 364 ↛ 368line 364 didn't jump to line 368 because the condition on line 364 was always true

365 gain_curr *= np.conj(gain_curr[ref_tile_use]) / np.abs( 

366 gain_curr[ref_tile_use] 

367 ) 

368 convergence_strict = np.max( 

369 np.abs(gain_curr - gain_old) * weight_invert(np.abs(gain_old)) 

370 ) 

371 convergence_loose = np.mean( 

372 np.abs(gain_curr - gain_old) * weight_invert(np.abs(gain_old)) 

373 ) 

374 convergence_list[i] = convergence_strict 

375 conv_test[fii, i] = convergence_strict 

376 if i > phase_fit_iter + divergence_history: 

377 if convergence_strict <= conv_thresh: 

378 # Stop if the solution has converged to the specified threshold 

379 n_converged += 1 

380 break 

381 if convergence_loose >= convergence_loose_prev: 

382 # Stop if the solutions are no longer converging 

383 if (convergence_loose <= conv_thresh) or ( 

384 convergence_loose_prev <= conv_thresh 

385 ): 

386 n_converged += 1 

387 if convergence_loose <= conv_thresh: 

388 # If the previous solution met the threshold, but the current one did not, then 

389 # back up one iteration and use the previous solution 

390 gain_curr = gain_old.copy() 

391 convergence[fi, tile_use] = conv_test[fii, i - 1] 

392 conv_iter_arr[fi, tile_use] = i - 1 

393 break 

394 else: 

395 # Halt if the strict convergence is worse than most of the recent iterations 

396 # Needed to use IDL_MEDIAN here as the numpy median was producing values different 

397 # from IDL as the even keyword was not used. This need to may change over time as 

398 # the calibration gets adjusted for double precision 

399 divergence_test_1 = convergence_strict >= idl_median( 

400 conv_test[fii, i - divergence_history - 1 : i] 

401 ) 

402 # Also halt if the convergence gets significantly worse in one iteration 

403 divergence_test_2 = ( 

404 convergence_strict 

405 >= np.min(conv_test[fii, 0:i]) * divergence_factor 

406 ) 

407 # possible bug fix; should we really test for divergence when only 

408 # fitting the phase? Fix below doesn't use the phase-only portion 

409 # of fitting when checking for divergence 

410 # divergence_test_2 = convergence_strict >= np.min(conv_test[int(phase_fit_iter): i, fii]) * divergence_factor 

411 if divergence_test_1 or divergence_test_2: 

412 # If both measures of convergence are getting worse, we need to stop. 

413 logger.info( 

414 f"Calibration diverged at iteration: {i}, for pol_i: {pol_i}, freq_i: {fi}. Convergence was: {conv_test[fii, i - 1]} and the threshold was: {conv_thresh}" 

415 ) 

416 divergence_flag = True 

417 break 

418 if divergence_flag: 

419 # If the solution diverged, back up one iteration and use the previous solution 

420 gain_curr = gain_old.copy() 

421 convergence[fi, tile_use] = conv_test[fii, i - 1] 

422 conv_iter_arr[fi, tile_use] = i - 1 

423 else: 

424 convergence[fi, tile_use] = np.abs( 

425 gain_curr - gain_old 

426 ) * weight_invert(np.abs(gain_old)) 

427 conv_iter_arr[fi, tile_use] = i 

428 if i == max_cal_iter: 428 ↛ 429line 428 didn't jump to line 429 because the condition on line 428 was never true

429 logger.info( 

430 f"Calibration reach max iterations before converging for pol_i: {pol_i} and freq_i: {fi}. Convergence was: {conv_test[fii, i - 1]} and the threshold was: {conv_thresh}" 

431 ) 

432 del A_ind_arr 

433 logger.info( 

434 f"Convergence was reached for polarization: {obs['pol_names'][pol_i]} ({pol_i}) and frequency: {fi}, with a convergence of: {conv_test[fii, i]} and the threshold was: {conv_thresh}" 

435 ) 

436 gain_arr[fi, tile_use] = gain_curr 

437 nan_i = np.where(np.isnan(gain_curr))[0] 

438 if nan_i.size > 0: 438 ↛ 440line 438 didn't jump to line 440 because the condition on line 438 was never true

439 # any gains with NANs -> all tiles for that freq will have NANs 

440 freq_nan_i = nan_i % n_freq 

441 freq_nan_i = freq_nan_i[np.unique(freq_nan_i)] 

442 vis_weight_ptr_use[pol_i][:, freq_nan_i] = 0 

443 weight[:, freq_nan_i] = 0 

444 gain_arr[nan_i] = 0 

445 if tile_flag.size > 0: 

446 # set flagged tiles to NAN to remove from calculations 

447 gain_arr[:, tile_flag] = np.nan 

448 

449 cal["tile_flag"][tile_flag] = True 

450 cal["gain"][pol_i] = gain_arr 

451 cal["convergence"][pol_i] = convergence 

452 cal["n_converged"][pol_i] = n_converged 

453 cal["conv_iter"][pol_i] = conv_iter_arr 

454 

455 n_vis_cal = np.size(np.nonzero(weight)[0]) 

456 cal["n_vis_cal"] = n_vis_cal 

457 return cal