Coverage for PyFHD/beam_setup/beam_utils.py: 60%

166 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-01 10:58 +0800

1import numpy as np 

2from astropy.constants import c 

3from pyuvdata import BeamInterface, UVBeam 

4from pyuvdata.analytic_beam import AnalyticBeam 

5from PyFHD.pyfhd_tools.pyfhd_utils import histogram, region_grow 

6import h5py 

7from numpy.typing import NDArray 

8from scipy.ndimage import map_coordinates 

9 

10 

11def gaussian_decomp( 

12 x: np.ndarray, 

13 y: np.ndarray, 

14 p: np.ndarray, 

15 ftransform: bool = False, 

16 model_npix: float | None = None, 

17 model_res: float | None = None, 

18) -> tuple[np.ndarray, float, float]: 

19 """ 

20 Create an analytically built Gaussian decomposition of the beam on 

21 the supplied x-y grid given the input Gaussian parameters. If 

22 ftransform is True, then the analytic Fourier Transform of the input 

23 gaussians on the supplied x-y grid is returned. Any number of Gaussians 

24 can be supplied in the p vector. To transfer Gaussian parameters from 

25 a different grid to the current x-y grid, the model_npix and model_res 

26 parameters can be supplied. 

27 

28 Parameters 

29 ---------- 

30 x : np.ndarray 

31 X-axis vector of pixel numbers 

32 y : np.ndarray 

33 Y-axis vector of pixel numbers 

34 p : np.ndarray 

35 Gaussian parameter vector, ordered as amp, offset x, sigma x, offset y, 

36 sigma y per lobe 

37 ftransform : bool, optional 

38 Return the analytic Fourier Transform of the input gaussians on the supplied 

39 x-y grid, by default False 

40 model_npix : float | None, optional 

41 The number of pixels on an axis used to derive the input parameters to 

42 convert to the current x-y grid, by default None 

43 model_res : float | None, optional 

44 The grid resolution used to derive the input parameters to convert to 

45 the current grid resolution, by default None 

46 

47 Returns 

48 ------- 

49 tuple[np.ndarray, float, float] 

50 The Gaussian decomposition of the beam on the supplied x-y grid given the input 

51 Gaussian parameters, along with the analytic volume and squared volume of the 

52 Gaussian decomposition. 

53 """ 

54 decomp_beam = np.zeros([x.size, y.size]) 

55 i = 1j 

56 # Expand the p vector into readable names 

57 var = np.reshape(p, [p.size // 5, 5]) 

58 amp = var[:, 0] 

59 offset_x = var[:, 1] 

60 sigma_x = var[:, 2] 

61 offset_y = var[:, 3] 

62 sigma_y = var[:, 4] 

63 n_lobes = var[:, 0].size 

64 

65 # If the parameters were built on a different grid, then put on new grid 

66 # Npix only affects the offset params 

67 if model_npix is not None: 67 ↛ 77line 67 didn't jump to line 77 because the condition on line 67 was always true

68 if model_npix < x.size: 68 ↛ 73line 68 didn't jump to line 73 because the condition on line 68 was always true

69 offset = np.abs(x.size / 2 - model_npix / 2) 

70 offset_x += offset 

71 offset_y += offset 

72 else: 

73 offset = np.abs(model_npix / 2 - x.size / 2) 

74 offset_x -= offset 

75 offset_y -= offset 

76 # Resolution affects gaussian sigma and offsets 

77 if model_res is not None: 77 ↛ 83line 77 didn't jump to line 83 because the condition on line 77 was always true

78 sigma_x *= model_res 

79 sigma_y *= model_res 

80 offset_x = ((offset_x - x.size / 2) * model_res) + x.size / 2 

81 offset_y = ((offset_y - y.size / 2) * model_res) + y.size / 2 

82 

83 if not ftransform: 83 ↛ 93line 83 didn't jump to line 93 because the condition on line 83 was always true

84 for lobe in range(n_lobes): 

85 decomp_beam += amp[lobe] * np.outer( 

86 np.exp(-((y - offset_y[lobe]) ** 2) / (2 * sigma_y[lobe] ** 2)), 

87 np.exp(-((x - offset_x[lobe]) ** 2) / (2 * sigma_x[lobe] ** 2)), 

88 ) 

89 volume_beam = 0 

90 sq_volume_beam = 0 

91 else: 

92 # Full uv model with all the gaussian components 

93 decomp_beam = decomp_beam.astype(np.complex128) 

94 volume_beam = np.sum(amp) 

95 sq_volume_beam = np.pi * np.sum(sigma_x * sigma_y * amp**2) / (x.size * y.size) 

96 

97 offset_x -= x.size / 2 

98 offset_y -= y.size / 2 

99 

100 kx = np.outer(np.arange(x.size) - x.size / 2, np.ones(y.size)) 

101 ky = np.outer(np.ones(x.size), np.arange(y.size) - y.size / 2) 

102 decomp_beam += ( 

103 amp**2 

104 * np.pi 

105 / (x.size * y.size) 

106 * sigma_x 

107 * sigma_y 

108 * np.exp( 

109 ( 

110 2 * np.pi**2 / (x.size * y.size) * sigma_x**2 * kx**2 

111 + sigma_y**2 * ky**2 

112 ) 

113 - (2 * np.pi / x.size * 1j * (offset_x * kx + offset_y * ky)) 

114 ) 

115 ) 

116 

117 return decomp_beam, volume_beam, sq_volume_beam 

118 

119 

120def beam_image( 

121 psf: dict | h5py.File, 

122 obs: dict, 

123 pol_i: int, 

124 freq_i: int | None = None, 

125 abs=False, 

126 square=False, 

127) -> np.ndarray: 

128 """ 

129 Generates the average beam in image space for one polarization over all 

130 frequencies, or optionally for one frequency. The UV->sky transformation 

131 uses the inverse FFT for the beam, but the forward FFT for the image. 

132 This convention ensures the correct orientation of the UV-space beam 

133 for gridding visibilities. If the psf dictionary has Gaussian parameters, 

134 then the Gaussian decomposition is used to generate the analytic beam image. 

135 

136 Parameters 

137 ---------- 

138 psf : dict 

139 Beam dictionary 

140 obs : dict 

141 Observation metadata dictionary 

142 pol_i : int 

143 Index of the polarization to use 

144 freq_i : int 

145 Index of the frequency to use, by default None 

146 abs : bool, optional 

147 Return the absolute value of the beam image, by default False 

148 square : bool, optional 

149 Return the square of the beam image, by default False 

150 

151 Returns 

152 ------- 

153 beam_base : np.ndarray 

154 The average beam in image space for the specified polarization 

155 and all frequencies, or for a specific frequency if freq_i is set. 

156 """ 

157 

158 psf_dim = psf["dim"] 

159 freq_norm = psf["fnorm"] 

160 pix_horizon = psf["pix_horizon"] 

161 group_id = psf["id"][pol_i, 0, :] 

162 if "beam_gaussian_params" in psf: 162 ↛ 165line 162 didn't jump to line 165 because the condition on line 162 was always true

163 beam_gaussian_params = psf["beam_gaussian_params"][:] 

164 else: 

165 beam_gaussian_params = None 

166 rbin = 0 

167 # If we lazy loaded psf, get actual numbers out of the datasets 

168 if isinstance(psf, h5py.File): 168 ↛ 172line 168 didn't jump to line 172 because the condition on line 168 was always true

169 psf_dim = psf_dim[0] 

170 freq_norm = freq_norm[:] 

171 pix_horizon = pix_horizon[0] 

172 dimension = elements = obs["dimension"] 

173 xl = dimension / 2 - psf_dim / 2 + 1 

174 xh = dimension / 2 - psf_dim / 2 + psf_dim 

175 yl = elements / 2 - psf_dim / 2 + 1 

176 yh = elements / 2 - psf_dim / 2 + psf_dim 

177 

178 group_n, _, ri_id = histogram(group_id, min=0) 

179 gi_use = np.nonzero(group_n) 

180 # Most likely going to be 1 as PyFHD does only one beam mostly 

181 n_groups = np.count_nonzero(group_n) 

182 

183 if beam_gaussian_params is not None: 183 ↛ 192line 183 didn't jump to line 192 because the condition on line 183 was always true

184 # 1.3 is the padding factor for the gaussian fitting procedure 

185 # (2.*obs.kpix) is the ratio of full sky (2 in l,m) to the analysis range (1/obs.kpix) 

186 # (2.*obs.kpix*dimension/psf.pix_horizon) is the scale factor between the psf pixels-to-horizon and the 

187 # analysis pixels-to-horizon 

188 # (0.5/obs.kpix) is the resolution scaling of what the beam model was made at and the current res 

189 model_npix = pix_horizon * 1.3 

190 model_res = (2 * obs["kpix"] * dimension) / pix_horizon * (0.5 / obs["kpix"]) 

191 

192 freq_bin_i = obs["baseline_info"]["fbin_i"] 

193 freq_i_use = np.nonzero(obs["baseline_info"]["freq_use"])[0] 

194 n_bin_use = 0 

195 # We assume freq_i is an int when provided (i.e. a single frequency index) 

196 if freq_i is not None: 

197 freq_i_use = freq_i 

198 

199 if square: 

200 # Do note freq_i_use could be an integer or an array if freq_i is supplied or not 

201 beam_base = np.zeros([dimension, elements]) 

202 freq_bin_use = freq_bin_i[freq_i_use] 

203 fbin_use = np.sort(np.unique(freq_bin_use)) 

204 nbin = fbin_use.size 

205 

206 if beam_gaussian_params is not None: 206 ↛ 209line 206 didn't jump to line 209 because the condition on line 206 was always true

207 beam_single = np.zeros([dimension, elements]) 

208 else: 

209 beam_single = np.zeros([psf_dim, psf_dim], dtype=np.complex128) 

210 for bin_i in range(nbin): 

211 fbin = fbin_use[bin_i] 

212 nf_bin = np.count_nonzero(freq_bin_use == fbin) 

213 if beam_gaussian_params is not None: 213 ↛ 231line 213 didn't jump to line 231 because the condition on line 213 was always true

214 for gi in range(n_groups): 

215 # beam_gaussian_params needs to be copied here rather than 

216 # a view as interestingly gaussian_decomp affects the values 

217 # of the array used with the calculations done to var and no copies 

218 # are made, only views are adjusted. so we explcitly call copy 

219 params = beam_gaussian_params[pol_i, fbin, :].copy() 

220 gaussian = gaussian_decomp( 

221 np.arange(dimension), 

222 np.arange(elements), 

223 params, 

224 model_npix=model_npix, 

225 model_res=model_res, 

226 )[0] 

227 beam_single += gaussian * group_n[gi_use[gi]] 

228 beam_single /= np.sum(group_n[gi_use]) 

229 beam_base += nf_bin * beam_single**2 

230 else: 

231 for gi in range(n_groups): 

232 beam_single += ( 

233 psf["beam_ptr"][0, fbin, rbin, rbin] * group_n[gi_use[gi]] 

234 ).reshape([psf_dim, psf_dim]) 

235 beam_single /= np.sum(group_n[gi_use]) 

236 if abs: 

237 beam_single = np.abs(beam_single) 

238 beam_base_uv1 = np.zeros([dimension, elements], np.complex128) 

239 beam_base_uv1[xl : xh + 1, yl : yh + 1] = beam_single 

240 beam_base_single = np.fft.fftshift( 

241 np.fft.ifftn(np.fft.fftshift(beam_base_uv1)) 

242 ) 

243 beam_base += ( 

244 nf_bin * (beam_base_single * np.conjugate(beam_base_single)).real 

245 ) 

246 n_bin_use += nf_bin * freq_norm[fbin] 

247 else: 

248 nf_use = freq_i_use.size 

249 if beam_gaussian_params is not None: 249 ↛ 253line 249 didn't jump to line 253 because the condition on line 249 was always true

250 beam_base_uv = np.zeros([dimension, elements]) 

251 beam_single = np.zeros([dimension, elements]) 

252 else: 

253 beam_base_uv = np.zeros([psf_dim, psf_dim], dtype=np.complex128) 

254 beam_single = np.zeros([psf_dim, psf_dim], dtype=np.complex128) 

255 for f_idx in range(nf_use): 

256 fi = freq_i_use[f_idx] 

257 if freq_i is not None: 257 ↛ 258line 257 didn't jump to line 258 because the condition on line 257 was never true

258 if freq_i != fi: 

259 continue 

260 fbin = freq_bin_i[fi] 

261 beam_single[:, :] = 0 

262 if beam_gaussian_params is not None: 262 ↛ 274line 262 didn't jump to line 274 because the condition on line 262 was always true

263 for gi in range(n_groups): 

264 params = beam_gaussian_params[pol_i, fbin, :].copy() 

265 gaussian = gaussian_decomp( 

266 np.arange(dimension), 

267 np.arange(elements), 

268 params, 

269 model_npix=model_npix, 

270 model_res=model_res, 

271 )[0] 

272 beam_single += gaussian * group_n[gi_use[gi]] 

273 else: 

274 for gi in range(n_groups): 

275 beam_single += ( 

276 psf["beam_ptr"][0, fbin, rbin, rbin] * group_n[gi_use[gi]] 

277 ).reshape([psf_dim, psf_dim]) 

278 beam_single /= np.sum(group_n[gi_use]) 

279 beam_base_uv += beam_single 

280 n_bin_use += freq_norm[fbin] 

281 

282 if beam_gaussian_params is None: 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true

283 beam_base_uv1 = np.zeros([dimension, elements], dtype=np.complex128) 

284 beam_base_uv1[xl : xh + 1, yl : yh + 1] = beam_base_uv 

285 beam_base = np.fft.fftshift(np.fft.ifftn(np.fft.fftshift(beam_base_uv1))) 

286 else: 

287 beam_base = beam_base_uv 

288 

289 beam_base /= n_bin_use 

290 return beam_base.real 

291 

292 

293def beam_image_hyperresolved( 

294 antenna: dict, 

295 beam: UVBeam | AnalyticBeam, 

296 ant_pol_1: int, 

297 ant_pol_2: int, 

298 freq_i: int, 

299 zen_int_x: float, 

300 zen_int_y: float, 

301 psf: dict, 

302) -> NDArray[np.complexfloating]: 

303 """ 

304 Build the hyperresolved image-space beam power for a station/tile. Currently, this 

305 function assumes that the amplitude of the jones matrix response between two antennas, 

306 multiplied by the station/tile response, is the image beam power. This calculation 

307 may be suject to change in the future. 

308 

309 Parameters 

310 ---------- 

311 antenna : dict 

312 Antenna metadata dictionary 

313 beam : UVBeam | AnalyticBeam 

314 UVBeam or AnalyticBeam object containing the beam model and metadata. 

315 ant_pol_1 : int 

316 Polarization index for the first antenna 

317 ant_pol_2 : int 

318 Polarization index for the second antenna 

319 freq_i : int 

320 Frequency index for current iteration 

321 zen_int_x : float 

322 x pixel index of the zenith for the beam power image 

323 zen_int_y : float 

324 y pixel index of the zenith for the beam power image 

325 psf : dict 

326 Beam metadata dictionary 

327 

328 Returns 

329 ------- 

330 NDArray[np.complexfloating] 

331 An estimation of the image-space beam power, normalized to the zenith power. 

332 """ 

333 # FHD was designed to account for multiple antennas but in most cases only one was ever used 

334 # So we will just use the first antenna twice as I PyFHD does not support multiple antennas at this time, 

335 # If you want to use multiple antennas, please open an issue on the PyFHD GitHub repository or do the translation and/or 

336 # adjustments yourself. 

337 beam_ant_1 = antenna["response"][ant_pol_1, freq_i] 

338 beam_ant_2 = np.conjugate(antenna["response"][ant_pol_2, freq_i]) 

339 

340 # Amplitude of the response from "ant1" (again FHD takes more than one antenna) 

341 # is Sqrt(|J1[0,pol1]|^2 + |J1[1,pol1]|^2) 

342 amp_1 = ( 

343 np.abs(antenna["jones"][0, ant_pol_1]) ** 2 

344 + np.abs(antenna["jones"][1, ant_pol_1]) ** 2 

345 ) 

346 # Amplitude of the response from "ant2" (again FHD takes more than one antenna) 

347 # is Sqrt(|J2[0,pol2]|^2 + |J2[1,pol2]|^2) 

348 amp_2 = ( 

349 np.abs(antenna["jones"][0, ant_pol_2]) ** 2 

350 + np.abs(antenna["jones"][1, ant_pol_2]) ** 2 

351 ) 

352 # Amplitude of the baseline response is the product of the "two" antenna responses 

353 power_zenith_beam = np.sqrt(amp_1 * amp_2) 

354 

355 # Create one full-scale array 

356 image_power_beam = np.zeros([psf["dim"], psf["dim"]], dtype=np.complex128) 

357 

358 # Co-opt the array to calculate the power at zenith 

359 image_power_beam[antenna["pix_use"]] = power_zenith_beam 

360 # TODO: Work out the interpolation of the zenith power, it uses cubic interpolation 

361 # But the IDL Interpolate function in IDL uses an interpolation paramter of -0.5, where 

362 # scipy, numpy with their B-Splines seem to use a parameter of 0 by default with no way 

363 # to change it. 

364 # The interp is a placeholder for now, but it should be replaced with a proper 

365 # interpolation function that matches the IDL Interpolate function. 

366 # TODO: Replace with UVBeam interface object as_power_beam, then 

367 # compute_response at za 0 az 0 

368 # Initial trying out of using pyuvdata, not close at all. This is interpolating 

369 # the zenith power using the x and y pixel coordinates, to use pyuvdata likely need to do 

370 # pixel to ra/dec then to za/az 

371 power_zenith = np.interp(zen_int_x, zen_int_y, image_power_beam) 

372 

373 # Normalize the image power beam to the zenith 

374 image_power_beam[antenna["pix_use"]] = ( 

375 power_zenith_beam * beam_ant_1 * beam_ant_2 

376 ) / power_zenith 

377 

378 return image_power_beam 

379 

380 

381def beam_power( 

382 antenna: dict, 

383 beam: UVBeam | AnalyticBeam, 

384 ant_pol_1: int, 

385 ant_pol_2: int, 

386 freq_i: int, 

387 psf: dict, 

388 zen_int_x: float, 

389 zen_int_y: float, 

390 xvals_uv_superres: np.ndarray, 

391 yvals_uv_superres: np.ndarray, 

392 pyfhd_config: dict, 

393) -> NDArray[np.complexfloating]: 

394 """ 

395 Generate the hyperresolved image-space beam power to reduce aliasing artifacts, and 

396 fourier transform it to a specific grid in complex uv-space. Reduce artifacts further 

397 by applying an extremely low-level contiguous mask to the uv-space beam power and 

398 renomalizing the beam power to a volume of 1. 

399 

400 Parameters 

401 ---------- 

402 antenna : dict 

403 Antenna metadata dictionary 

404 beam : UVBeam | AnalyticBeam 

405 UVBeam or AnalyticBeam object containing the beam model and metadata. 

406 ant_pol_1 : int 

407 Polarization index for the first antenna 

408 ant_pol_2 : int 

409 Polarization index for the second antenna 

410 freq_i : int 

411 Frequency index for current iteration 

412 psf : dict 

413 Beam metadata dictionary 

414 zen_int_x : np.ndarray 

415 x pixel index of the zenith for the beam power image 

416 zen_int_y : np.ndarray 

417 y pixel index of the zenith for the beam power image 

418 xvals_uv_superres : np.ndarray 

419 A grid of the hyperresolved indices in the u direction for the uv-space beam power image 

420 yvals_uv_superres : np.ndarray 

421 A grid of the hyperresolved indices in the v direction for the uv-space beam power image 

422 pyfhd_config : dict 

423 The PyFHD configuration dictionary 

424 

425 Returns 

426 ------- 

427 NDArray[np.complexfloating] 

428 Hyperresolved uv-space of the beam power image, normalized to a volume of 1 

429 """ 

430 # For now we will ignore beam_gaussian_decomp and much of the debug keywords 

431 image_power_beam = beam_image_hyperresolved( 

432 antenna, 

433 beam, 

434 ant_pol_1, 

435 ant_pol_2, 

436 freq_i, 

437 zen_int_x, 

438 zen_int_y, 

439 psf, 

440 pyfhd_config, 

441 ) 

442 if pyfhd_config["kernel_window"]: 

443 image_power_beam *= antenna["pix_window"] 

444 psf_base_single = np.fft.fftshift(np.fft.ifftn(np.fft.fftshift(image_power_beam))) 

445 # TODO: Same cubic problem as in beam_image_hyperresolved here 

446 # Map Coordinates isn't the same as IDL Interpolate 

447 # But its closish, more of a placeholder for now. 

448 psf_base_superres = map_coordinates( 

449 psf_base_single, (xvals_uv_superres, yvals_uv_superres) 

450 ) 

451 

452 # Build a mask to create a well-defined finite beam 

453 uv_mask_superres = np.zeros( 

454 [[psf_base_superres.shape[0], psf_base_superres.shape[1]]], dtype=np.float64 

455 ) 

456 psf_mask_threshold_use = ( 

457 np.max(np.abs(psf_base_superres)) / pyfhd_config["beam_mask_threshold"] 

458 ) 

459 beam_i = region_grow( 

460 np.abs(psf_base_superres), 

461 psf["superres_dim"] * (1 + psf["superres_dim"]) / 2, 

462 low=psf_mask_threshold_use, 

463 high=np.max(np.abs(psf_base_superres)), 

464 ) 

465 uv_mask_superres[beam_i] = 1 

466 

467 # FFT normalization correction in case this changes the total number of pixels 

468 psf_base_superres *= psf["intermediate_res"] ** 2 

469 

470 """ 

471 total of the gaussian decomposition can be calculated analytically, but is an over-estimate  

472 of the numerical representation and results in a beam norm of greater than one, 

473 thus the discrete total is used 

474 """ 

475 psf_val_ref = np.sum(psf_base_superres) 

476 

477 # If you wish to add interpolate_beam_threshold functionality then do so here 

478 psf_base_superres *= uv_mask_superres 

479 

480 if pyfhd_config["beam_clip_floor"]: 

481 i_use = np.where(np.abs(psf_base_superres)) 

482 psf_amp = np.abs(psf_base_superres) 

483 psf_phase = np.arctan(psf_base_superres.imag / psf_base_superres.real) 

484 psf_floor = psf_mask_threshold_use * (psf["intermediate_res"] ** 2) 

485 psf_amp[i_use] -= psf_floor 

486 psf_base_superres = psf_amp * np.cos(psf_phase) + 1j * psf_amp * np.sin( 

487 psf_phase 

488 ) 

489 

490 psf_base_superres *= psf_val_ref / np.sum(psf_base_superres) 

491 

492 return psf_base_superres