Coverage for PyFHD/calibration/vis_calibrate_subroutine.py: 89%
234 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
1import numpy as np
2from numpy.typing import NDArray
3import warnings
4from logging import Logger
5from PyFHD.calibration.calibration_utils import calculate_adaptive_gain
6from PyFHD.pyfhd_tools.pyfhd_utils import weight_invert, histogram, idl_median
9def vis_calibrate_subroutine(
10 vis_arr: NDArray[np.complex128],
11 vis_model_ptr: NDArray[np.complex128],
12 vis_weight_ptr: NDArray[np.float64],
13 obs: dict,
14 cal: dict,
15 params: dict,
16 pyfhd_config: dict,
17 logger: Logger,
18 calibration_weights=False,
19 no_ref_tile=False,
20):
21 """
22 Perform a linear least-squares regression between the data visilbities and the simulated model
23 visiblities to solve for an amplitude and phase for each tile for each frequency channel.
25 Parameters
26 ----------
27 vis_arr : NDArray[np.complex128]
28 Uncalibrated data visiblities
29 vis_model_ptr : NDArray[np.complex128]
30 Simulated model visibilites
31 vis_weight_ptr : NDArray[np.float64]
32 Weights (flags) of the visibilities
33 obs : dict
34 Observation metadata dictionary
35 cal : dict
36 Calibration dictionary
37 pyfhd_config : dict
38 PyFHD's configuration dictionary containing all the options set for a PyFHD run
39 calibration_weights : bool, optional
40 Weight the visibilities at the minimum and maximum baseline by a soft taper for
41 calibration purposes only, by default False
42 no_ref_tile : bool, optional
43 Do not reference calibration phases at this stage, by default False
45 Returns
46 -------
47 cal : dict
48 Calibration dictionary
50 Raises
51 ------
52 ValueError
53 Should almost never happen, only gets raised for max_cal_iter has been set to less than 5, however
54 since it's currently hardcoded in here it's more of an exception for any developers of PyFHD
55 """
56 # Retrieve values from data structures
57 # There is a few hardcoded values in here that were previously hardcoded in fhd_struct_init_cal
58 # If you wish to change them, add them to the pyfhd_config through pyfhd_setup and the config in pyfhd.yaml
59 reference_tile = 1
60 min_baseline = obs["min_baseline"]
61 max_baseline = obs["max_baseline"]
62 dimension = obs["dimension"]
63 elements = obs["elements"]
64 min_cal_baseline = (
65 max(pyfhd_config["min_cal_baseline"], obs["min_baseline"])
66 if pyfhd_config["min_cal_baseline"]
67 else obs["min_baseline"]
68 )
69 max_cal_baseline = (
70 min(pyfhd_config["max_cal_baseline"], obs["max_baseline"])
71 if pyfhd_config["max_cal_baseline"]
72 else obs["max_baseline"]
73 )
74 # minimum number of calibration equations needed to solve for the gain of one baseline
75 min_cal_solutions = 5
76 # For record keeping
77 cal["min_cal_solutions"] = min_cal_solutions
78 # average the visibilities across time steps before solving for the gains
79 time_average = pyfhd_config["cal_time_average"]
80 # maximum iterations to perform for the linear least-squares solver
81 max_cal_iter = pyfhd_config["max_cal_iter"]
82 # Leave a warning if its less than 5 iterations, or an Error if its less than 1
83 if max_cal_iter < 5: 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true
84 warnings.warn(
85 "At Least 5 calibrations iterations is recommended.\nYou're currently using {} iterations".format(
86 int(max_cal_iter)
87 )
88 )
89 elif max_cal_iter < 1: 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true
90 raise ValueError(
91 "max_cal_iter should be 1 or more. A max_cal_iter of 5 or more is recommended"
92 )
93 conv_thresh = pyfhd_config["cal_convergence_threshold"]
94 use_adaptive_gain = pyfhd_config["cal_adaptive_calibration_gain"]
95 base_gain = pyfhd_config["cal_base_gain"]
96 # halt if the strict convergence is worse than most of the last x iterations
97 divergence_history = 3
98 # halt if the convergence gets significantly worse by a factor of x in one iteration
99 divergence_factor = 1.5
100 # For record keeping
101 cal["divergence_history"] = 3
102 cal["divergence_factor"] = 1.5
103 n_pol = cal["n_pol"]
104 n_freq = obs["n_freq"]
105 n_tile = obs["n_tile"]
106 n_time = obs["n_time"]
107 # weights WILL be over-written! (Only for NAN gain solutions)
108 vis_weight_ptr_use = vis_weight_ptr
109 # tile_a & tile_b contribution indexed from 0
110 tile_A_i = obs["baseline_info"]["tile_a"] - 1
111 tile_B_i = obs["baseline_info"]["tile_b"] - 1
112 freq_arr = obs["baseline_info"]["freq"]
113 n_baselines = obs["n_baselines"]
114 if pyfhd_config["cal_phase_fit_iter"]: 114 ↛ 118line 114 didn't jump to line 118 because the condition on line 114 was always true
115 phase_fit_iter = pyfhd_config["cal_phase_fit_iter"]
116 else:
117 # This gets set multiple times in FHD, by default it's 4, and is set in fhd_struct_init_cal
118 phase_fit_iter = np.min([np.floor(max_cal_iter / 4), 4])
119 kbinsize = obs["kpix"]
121 cal["convergence"] = np.zeros([n_pol, n_freq, n_tile])
122 cal["conv_iter"] = np.zeros([n_pol, n_freq, n_tile])
123 cal["n_converged"] = np.zeros(n_pol)
124 cal["tile_flag"] = np.zeros([n_tile], dtype=np.bool)
125 for pol_i in range(n_pol):
126 logger.info(
127 f"Beginning Calibration for polarization {pol_i} ({obs['pol_names'][pol_i]})"
128 )
129 convergence = np.zeros((n_freq, n_tile))
130 conv_iter_arr = np.zeros((n_freq, n_tile))
131 # Want to ensure we're not affecting the current array till we overwrite it
132 gain_arr = cal["gain"][pol_i].copy()
134 # Average the visibilities over the time steps before solving for the gains solutions
135 # This is not recommended, as longer baselines will be downweighted artifically.
136 if time_average:
137 # The visibilities have dimension nfreq x (n_baselines x n_time),
138 # which can be reformed to nfreq x n_baselines x n_time
139 tile_A_i = tile_A_i[0:n_baselines]
140 tile_B_i = tile_B_i[0:n_baselines]
141 # So IDL does reforms as REFORM(x, cols, rows, num_of_col_row_arrays)
142 # Python is row-major, so we need to flip that shape that is used in REFORM
143 # Although, to get the same results in the shape we want we need to do a few
144 # transposes
145 shape = np.array([n_time, n_baselines, n_freq])
146 # Transpose here so it's in the same shape as IDL when reshaping, then transpose
147 # back to get the shape we need to suit the shape of the rest of the arrays
148 # I suspect this is because the reshape has no guarantee of the memory layout
149 # at the end, which means we swap indexes of values that is not consistent with IDL
150 vis_weight_use_reshaped = np.reshape(vis_weight_ptr_use[pol_i].T, shape).T
151 vis_weight_use = np.maximum(vis_weight_use_reshaped, 0)
152 vis_weight_use = np.minimum(vis_weight_use, 1)
153 vis_model = np.reshape(vis_model_ptr[pol_i].T, shape).T
154 vis_model = np.sum(vis_model * vis_weight_use, axis=2)
155 vis_measured = np.reshape(vis_arr[pol_i].T, shape).T
156 vis_avg = np.sum(vis_measured * vis_weight_use, axis=2)
157 weight = np.sum(vis_weight_use, axis=2)
159 kx_arr = params["uu"][0:n_baselines] / kbinsize
160 ky_arr = params["vv"][0:n_baselines] / kbinsize
161 else:
162 # In the case of not using a time_average do the following setup instead for weight and vis_avg
163 vis_weight_use = np.maximum(0, vis_weight_ptr_use[pol_i])
164 vis_weight_use = np.minimum(vis_weight_use, 1)
165 vis_model = vis_model_ptr[pol_i] * vis_weight_use
166 vis_avg = vis_arr[pol_i] * vis_weight_use
167 weight = vis_weight_use
169 kx_arr = params["uu"] / kbinsize
170 ky_arr = params["vv"] / kbinsize
171 # Now use the common code from the two possibilities in vis_calibrate_subroutine.pro
172 kr_arr = np.sqrt(kx_arr**2 + ky_arr**2)
173 # When IDL does a matrix multiply on two 1D vectors it does the outer product.
174 dist_arr = np.outer(kr_arr, freq_arr).T * kbinsize
175 xcen = np.outer(abs(kx_arr), freq_arr).T
176 ycen = np.outer(abs(ky_arr), freq_arr).T
177 if calibration_weights:
178 flag_dist_cut = np.where(
179 (dist_arr < min_baseline)
180 | (xcen > (elements / 2))
181 | (ycen > (dimension / 2))
182 )
183 if min_cal_baseline > min_baseline: 183 ↛ 188line 183 didn't jump to line 188 because the condition on line 183 was always true
184 taper_min = np.maximum(
185 (np.sqrt(2) * min_cal_baseline - dist_arr) / min_cal_baseline, 0
186 )
187 else:
188 taper_min = 0
189 if max_cal_baseline < max_baseline: 189 ↛ 190line 189 didn't jump to line 190 because the condition on line 189 was never true
190 taper_max = np.maximum(
191 (dist_arr - max_cal_baseline) / min_cal_baseline, 0
192 )
193 else:
194 taper_max = 0
195 baseline_weights = np.maximum(1 - (taper_min + taper_max) ** 2, 0)
196 else:
197 flag_dist_cut = np.where(
198 (dist_arr < min_cal_baseline)
199 | (dist_arr > max_cal_baseline)
200 | (xcen > (elements / 2))
201 | (ycen > (dimension / 2))
202 )
203 # Remove kx_arr, ky_arr and dist_arr from the namespace, allow garbage collector to do its work
204 del (kx_arr, ky_arr, dist_arr)
206 if np.size(flag_dist_cut) > 0: 206 ↛ 208line 206 didn't jump to line 208 because the condition on line 206 was always true
207 weight[flag_dist_cut] = 0
208 vis_avg *= weight_invert(weight)
209 vis_model *= weight_invert(weight)
211 tile_use_flag = obs["baseline_info"]["tile_use"]
212 freq_use_flag = obs["baseline_info"]["freq_use"]
214 freq_weight = np.sum(weight, axis=1)
215 baseline_weight = np.sum(weight, axis=0)
216 freq_use = np.where((freq_weight > 0) & (freq_use_flag > 0))[0]
217 baseline_use = np.nonzero(baseline_weight)
218 hist_tile_A, _, riA = histogram(tile_A_i[baseline_use], min=0, max=n_tile - 1)
219 hist_tile_B, _, riB = histogram(tile_B_i[baseline_use], min=0, max=n_tile - 1)
220 tile_use = np.where(((hist_tile_A + hist_tile_B) > 0) & (tile_use_flag > 0))[0]
221 tile_flag = np.where(((hist_tile_A + hist_tile_B) == 0) & (tile_use_flag == 0))[
222 0
223 ]
225 # Should be able to reduce precision if memory is a concern
226 tile_A_i_use = np.zeros(np.size(baseline_use), dtype=np.int64)
227 tile_B_i_use = np.zeros(np.size(baseline_use), dtype=np.int64)
228 for tile_i in range(np.size(tile_use)):
229 if hist_tile_A[tile_use[tile_i]] > 0:
230 # Calculate tile contributions for each non-flagged baseline
231 tile_A_i_use[riA[riA[tile_use[tile_i]] : riA[tile_use[tile_i] + 1]]] = (
232 tile_i
233 )
234 if hist_tile_B[tile_use[tile_i]] > 0:
235 # Calculate tile contributions for each non-flagged baseline
236 tile_B_i_use[riB[riB[tile_use[tile_i]] : riB[tile_use[tile_i] + 1]]] = (
237 tile_i
238 )
240 ref_tile_use = np.where(reference_tile == tile_use)
241 if ref_tile_use[0].size == 0: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true
242 ref_tile_use = 0
243 cal["ref_antenna"] = tile_use[ref_tile_use]
244 cal["ref_antenna_name"] = obs["baseline_info"]["tile_names"][
245 cal["ref_antenna"]
246 ]
247 else:
248 # Extract out value to avoid any weird stuff happening
249 ref_tile_use = ref_tile_use[0][0]
250 # Replace all NaNs with 0's
251 vis_model[np.isnan(vis_model)] = 0
253 conv_test = np.zeros((freq_use.size, max_cal_iter))
254 n_converged = 0
255 for fii in range(freq_use.size):
256 fi = freq_use[fii]
257 gain_curr = gain_arr[fi, tile_use]
258 # Set up data and model arrays of the original and conjugated versions. This
259 # provides twice as many equations into the linear least-squares solver.
260 vis_data2 = np.squeeze(vis_avg[fi, baseline_use])
261 vis_data2 = np.hstack([vis_data2, np.conj(vis_data2)])
262 vis_model2 = np.squeeze(vis_model[fi, baseline_use])
263 vis_model2 = np.hstack([vis_model2, np.conj(vis_model2)])
264 weight2 = np.squeeze(weight[fi, baseline_use])
265 weight2 = np.hstack([weight2, weight2])
266 if calibration_weights:
267 baseline_wts2 = np.squeeze(baseline_weights[fi, baseline_use])
268 # Concatenate two of the same array, why?
269 baseline_wts2 = np.hstack([baseline_wts2, baseline_wts2])
271 b_i_use = np.where(weight2 > 0)
272 weight2 = weight2[b_i_use]
273 vis_data2 = vis_data2[b_i_use]
274 vis_model2 = vis_model2[b_i_use]
276 A_ind = np.hstack([tile_A_i_use, tile_B_i_use])
277 A_ind = A_ind[b_i_use]
278 B_ind = np.hstack([tile_B_i_use, tile_A_i_use])
279 B_ind = B_ind[b_i_use]
281 A_ind_arr = []
282 n_arr = np.zeros(tile_use.size)
283 for tile_i in range(tile_use.size):
284 inds = np.where(A_ind == tile_i)[0]
285 if inds.size > 1: 285 ↛ 288line 285 didn't jump to line 288 because the condition on line 285 was always true
286 A_ind_arr.append(np.reshape(inds, (inds.size, 1)))
287 else:
288 A_ind_arr.append(-1)
289 # NEED SOMETHING MORE IN CASE INDIVIDUAL TILES ARE FLAGGED FOR ONLY A FEW FREQUENCIES!!
290 n_arr[tile_i] = inds.size
291 # I suspect a list of lists maybe faster than the object array, check during optimization
292 # Although I doubt it will make a huge difference.
293 A_ind_arr = np.array(A_ind_arr, dtype=object)
294 # For tiles which don't satisfy the minimum number of solutions, pre-emptively set them to 0
295 # in order to prevent certain failure in meeting strict convergence threshold
296 inds_min_cal = np.where(n_arr < min_cal_solutions)[0]
297 if inds_min_cal.size > 0: 297 ↛ 298line 297 didn't jump to line 298 because the condition on line 297 was never true
298 gain_curr[inds_min_cal] = 0
299 gain_new = np.zeros(tile_use.size, dtype=np.complex128)
300 convergence_list = np.zeros(max_cal_iter)
301 conv_gain_list = np.zeros(max_cal_iter)
302 convergence_loose = 0
303 for i in range(max_cal_iter):
304 convergence_loose_prev = convergence_loose
305 divergence_flag = 0
306 vis_use = vis_data2
308 vis_model_matrix = vis_model2 * np.conj(gain_curr[B_ind])
309 for tile_i in range(tile_use.size):
310 if n_arr[tile_i] >= min_cal_solutions: 310 ↛ 309line 310 didn't jump to line 309 because the condition on line 310 was always true
311 if calibration_weights:
312 xmat = vis_model_matrix[A_ind_arr[tile_i]]
313 # For some reason IDL multiplcation just allows two arrays of
314 # very dissimilar sizes to be multiplied by just ignoring everything
315 # after the index of the smallest one!?! Could be worth checking this is
316 # what this line was meant to do.
317 xmat = xmat.flatten()
318 xmat_dag = np.conj(xmat) * baseline_wts2[0 : xmat.size]
319 gain_new[tile_i] = (
320 1
321 / np.dot(xmat, xmat_dag)
322 * np.dot(
323 np.transpose(vis_use[A_ind_arr[tile_i]]), xmat_dag
324 )
325 )[0]
326 else:
327 gain_new[tile_i] = np.linalg.lstsq(
328 vis_model_matrix[A_ind_arr[tile_i]],
329 vis_use[A_ind_arr[tile_i]],
330 rcond=None,
331 )[0][0][0]
333 gain_old = gain_curr.copy()
334 if np.sum(np.abs(gain_new)) == 0: 334 ↛ 335line 334 didn't jump to line 335 because the condition on line 334 was never true
335 gain_curr = gain_new.copy()
336 # Break the loop
337 break
338 if phase_fit_iter - i > 0:
339 # fit only phase at first
340 gain_new *= np.abs(gain_old) * weight_invert(np.abs(gain_new))
341 if use_adaptive_gain:
342 conv_gain = calculate_adaptive_gain(
343 conv_gain_list,
344 convergence_list,
345 i,
346 base_gain,
347 final_convergence_estimate=0,
348 )
349 else:
350 conv_gain = base_gain
351 gain_curr = (gain_new * conv_gain + gain_old * base_gain) / (
352 base_gain + conv_gain
353 )
354 dgain = np.abs(gain_curr) * weight_invert(np.abs(gain_old))
355 diverge_i = np.where(dgain < np.abs(gain_old) / 2)[0]
356 if diverge_i.size > 0:
357 gain_curr[diverge_i] = (
358 gain_new[diverge_i] + gain_old[diverge_i] * 2
359 ) / 3
360 if np.size(np.where(np.isnan(gain_curr))) > 0: 360 ↛ 361line 360 didn't jump to line 361 because the condition on line 360 was never true
361 gain_curr[np.where(np.isnan(gain_curr))] = gain_old[
362 np.where(np.isnan(gain_curr))
363 ]
364 if not no_ref_tile: 364 ↛ 368line 364 didn't jump to line 368 because the condition on line 364 was always true
365 gain_curr *= np.conj(gain_curr[ref_tile_use]) / np.abs(
366 gain_curr[ref_tile_use]
367 )
368 convergence_strict = np.max(
369 np.abs(gain_curr - gain_old) * weight_invert(np.abs(gain_old))
370 )
371 convergence_loose = np.mean(
372 np.abs(gain_curr - gain_old) * weight_invert(np.abs(gain_old))
373 )
374 convergence_list[i] = convergence_strict
375 conv_test[fii, i] = convergence_strict
376 if i > phase_fit_iter + divergence_history:
377 if convergence_strict <= conv_thresh:
378 # Stop if the solution has converged to the specified threshold
379 n_converged += 1
380 break
381 if convergence_loose >= convergence_loose_prev:
382 # Stop if the solutions are no longer converging
383 if (convergence_loose <= conv_thresh) or (
384 convergence_loose_prev <= conv_thresh
385 ):
386 n_converged += 1
387 if convergence_loose <= conv_thresh:
388 # If the previous solution met the threshold, but the current one did not, then
389 # back up one iteration and use the previous solution
390 gain_curr = gain_old.copy()
391 convergence[fi, tile_use] = conv_test[fii, i - 1]
392 conv_iter_arr[fi, tile_use] = i - 1
393 break
394 else:
395 # Halt if the strict convergence is worse than most of the recent iterations
396 # Needed to use IDL_MEDIAN here as the numpy median was producing values different
397 # from IDL as the even keyword was not used. This need to may change over time as
398 # the calibration gets adjusted for double precision
399 divergence_test_1 = convergence_strict >= idl_median(
400 conv_test[fii, i - divergence_history - 1 : i]
401 )
402 # Also halt if the convergence gets significantly worse in one iteration
403 divergence_test_2 = (
404 convergence_strict
405 >= np.min(conv_test[fii, 0:i]) * divergence_factor
406 )
407 # possible bug fix; should we really test for divergence when only
408 # fitting the phase? Fix below doesn't use the phase-only portion
409 # of fitting when checking for divergence
410 # divergence_test_2 = convergence_strict >= np.min(conv_test[int(phase_fit_iter): i, fii]) * divergence_factor
411 if divergence_test_1 or divergence_test_2:
412 # If both measures of convergence are getting worse, we need to stop.
413 logger.info(
414 f"Calibration diverged at iteration: {i}, for pol_i: {pol_i}, freq_i: {fi}. Convergence was: {conv_test[fii, i - 1]} and the threshold was: {conv_thresh}"
415 )
416 divergence_flag = True
417 break
418 if divergence_flag:
419 # If the solution diverged, back up one iteration and use the previous solution
420 gain_curr = gain_old.copy()
421 convergence[fi, tile_use] = conv_test[fii, i - 1]
422 conv_iter_arr[fi, tile_use] = i - 1
423 else:
424 convergence[fi, tile_use] = np.abs(
425 gain_curr - gain_old
426 ) * weight_invert(np.abs(gain_old))
427 conv_iter_arr[fi, tile_use] = i
428 if i == max_cal_iter: 428 ↛ 429line 428 didn't jump to line 429 because the condition on line 428 was never true
429 logger.info(
430 f"Calibration reach max iterations before converging for pol_i: {pol_i} and freq_i: {fi}. Convergence was: {conv_test[fii, i - 1]} and the threshold was: {conv_thresh}"
431 )
432 del A_ind_arr
433 logger.info(
434 f"Convergence was reached for polarization: {obs['pol_names'][pol_i]} ({pol_i}) and frequency: {fi}, with a convergence of: {conv_test[fii, i]} and the threshold was: {conv_thresh}"
435 )
436 gain_arr[fi, tile_use] = gain_curr
437 nan_i = np.where(np.isnan(gain_curr))[0]
438 if nan_i.size > 0: 438 ↛ 440line 438 didn't jump to line 440 because the condition on line 438 was never true
439 # any gains with NANs -> all tiles for that freq will have NANs
440 freq_nan_i = nan_i % n_freq
441 freq_nan_i = freq_nan_i[np.unique(freq_nan_i)]
442 vis_weight_ptr_use[pol_i][:, freq_nan_i] = 0
443 weight[:, freq_nan_i] = 0
444 gain_arr[nan_i] = 0
445 if tile_flag.size > 0:
446 # set flagged tiles to NAN to remove from calculations
447 gain_arr[:, tile_flag] = np.nan
449 cal["tile_flag"][tile_flag] = True
450 cal["gain"][pol_i] = gain_arr
451 cal["convergence"][pol_i] = convergence
452 cal["n_converged"][pol_i] = n_converged
453 cal["conv_iter"][pol_i] = conv_iter_arr
455 n_vis_cal = np.size(np.nonzero(weight)[0])
456 cal["n_vis_cal"] = n_vis_cal
457 return cal