Coverage for PyFHD/beam_setup/beam_utils.py: 60%
166 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
1import numpy as np
2from astropy.constants import c
3from pyuvdata import BeamInterface, UVBeam
4from pyuvdata.analytic_beam import AnalyticBeam
5from PyFHD.pyfhd_tools.pyfhd_utils import histogram, region_grow
6import h5py
7from numpy.typing import NDArray
8from scipy.ndimage import map_coordinates
11def gaussian_decomp(
12 x: np.ndarray,
13 y: np.ndarray,
14 p: np.ndarray,
15 ftransform: bool = False,
16 model_npix: float | None = None,
17 model_res: float | None = None,
18) -> tuple[np.ndarray, float, float]:
19 """
20 Create an analytically built Gaussian decomposition of the beam on
21 the supplied x-y grid given the input Gaussian parameters. If
22 ftransform is True, then the analytic Fourier Transform of the input
23 gaussians on the supplied x-y grid is returned. Any number of Gaussians
24 can be supplied in the p vector. To transfer Gaussian parameters from
25 a different grid to the current x-y grid, the model_npix and model_res
26 parameters can be supplied.
28 Parameters
29 ----------
30 x : np.ndarray
31 X-axis vector of pixel numbers
32 y : np.ndarray
33 Y-axis vector of pixel numbers
34 p : np.ndarray
35 Gaussian parameter vector, ordered as amp, offset x, sigma x, offset y,
36 sigma y per lobe
37 ftransform : bool, optional
38 Return the analytic Fourier Transform of the input gaussians on the supplied
39 x-y grid, by default False
40 model_npix : float | None, optional
41 The number of pixels on an axis used to derive the input parameters to
42 convert to the current x-y grid, by default None
43 model_res : float | None, optional
44 The grid resolution used to derive the input parameters to convert to
45 the current grid resolution, by default None
47 Returns
48 -------
49 tuple[np.ndarray, float, float]
50 The Gaussian decomposition of the beam on the supplied x-y grid given the input
51 Gaussian parameters, along with the analytic volume and squared volume of the
52 Gaussian decomposition.
53 """
54 decomp_beam = np.zeros([x.size, y.size])
55 i = 1j
56 # Expand the p vector into readable names
57 var = np.reshape(p, [p.size // 5, 5])
58 amp = var[:, 0]
59 offset_x = var[:, 1]
60 sigma_x = var[:, 2]
61 offset_y = var[:, 3]
62 sigma_y = var[:, 4]
63 n_lobes = var[:, 0].size
65 # If the parameters were built on a different grid, then put on new grid
66 # Npix only affects the offset params
67 if model_npix is not None: 67 ↛ 77line 67 didn't jump to line 77 because the condition on line 67 was always true
68 if model_npix < x.size: 68 ↛ 73line 68 didn't jump to line 73 because the condition on line 68 was always true
69 offset = np.abs(x.size / 2 - model_npix / 2)
70 offset_x += offset
71 offset_y += offset
72 else:
73 offset = np.abs(model_npix / 2 - x.size / 2)
74 offset_x -= offset
75 offset_y -= offset
76 # Resolution affects gaussian sigma and offsets
77 if model_res is not None: 77 ↛ 83line 77 didn't jump to line 83 because the condition on line 77 was always true
78 sigma_x *= model_res
79 sigma_y *= model_res
80 offset_x = ((offset_x - x.size / 2) * model_res) + x.size / 2
81 offset_y = ((offset_y - y.size / 2) * model_res) + y.size / 2
83 if not ftransform: 83 ↛ 93line 83 didn't jump to line 93 because the condition on line 83 was always true
84 for lobe in range(n_lobes):
85 decomp_beam += amp[lobe] * np.outer(
86 np.exp(-((y - offset_y[lobe]) ** 2) / (2 * sigma_y[lobe] ** 2)),
87 np.exp(-((x - offset_x[lobe]) ** 2) / (2 * sigma_x[lobe] ** 2)),
88 )
89 volume_beam = 0
90 sq_volume_beam = 0
91 else:
92 # Full uv model with all the gaussian components
93 decomp_beam = decomp_beam.astype(np.complex128)
94 volume_beam = np.sum(amp)
95 sq_volume_beam = np.pi * np.sum(sigma_x * sigma_y * amp**2) / (x.size * y.size)
97 offset_x -= x.size / 2
98 offset_y -= y.size / 2
100 kx = np.outer(np.arange(x.size) - x.size / 2, np.ones(y.size))
101 ky = np.outer(np.ones(x.size), np.arange(y.size) - y.size / 2)
102 decomp_beam += (
103 amp**2
104 * np.pi
105 / (x.size * y.size)
106 * sigma_x
107 * sigma_y
108 * np.exp(
109 (
110 2 * np.pi**2 / (x.size * y.size) * sigma_x**2 * kx**2
111 + sigma_y**2 * ky**2
112 )
113 - (2 * np.pi / x.size * 1j * (offset_x * kx + offset_y * ky))
114 )
115 )
117 return decomp_beam, volume_beam, sq_volume_beam
120def beam_image(
121 psf: dict | h5py.File,
122 obs: dict,
123 pol_i: int,
124 freq_i: int | None = None,
125 abs=False,
126 square=False,
127) -> np.ndarray:
128 """
129 Generates the average beam in image space for one polarization over all
130 frequencies, or optionally for one frequency. The UV->sky transformation
131 uses the inverse FFT for the beam, but the forward FFT for the image.
132 This convention ensures the correct orientation of the UV-space beam
133 for gridding visibilities. If the psf dictionary has Gaussian parameters,
134 then the Gaussian decomposition is used to generate the analytic beam image.
136 Parameters
137 ----------
138 psf : dict
139 Beam dictionary
140 obs : dict
141 Observation metadata dictionary
142 pol_i : int
143 Index of the polarization to use
144 freq_i : int
145 Index of the frequency to use, by default None
146 abs : bool, optional
147 Return the absolute value of the beam image, by default False
148 square : bool, optional
149 Return the square of the beam image, by default False
151 Returns
152 -------
153 beam_base : np.ndarray
154 The average beam in image space for the specified polarization
155 and all frequencies, or for a specific frequency if freq_i is set.
156 """
158 psf_dim = psf["dim"]
159 freq_norm = psf["fnorm"]
160 pix_horizon = psf["pix_horizon"]
161 group_id = psf["id"][pol_i, 0, :]
162 if "beam_gaussian_params" in psf: 162 ↛ 165line 162 didn't jump to line 165 because the condition on line 162 was always true
163 beam_gaussian_params = psf["beam_gaussian_params"][:]
164 else:
165 beam_gaussian_params = None
166 rbin = 0
167 # If we lazy loaded psf, get actual numbers out of the datasets
168 if isinstance(psf, h5py.File): 168 ↛ 172line 168 didn't jump to line 172 because the condition on line 168 was always true
169 psf_dim = psf_dim[0]
170 freq_norm = freq_norm[:]
171 pix_horizon = pix_horizon[0]
172 dimension = elements = obs["dimension"]
173 xl = dimension / 2 - psf_dim / 2 + 1
174 xh = dimension / 2 - psf_dim / 2 + psf_dim
175 yl = elements / 2 - psf_dim / 2 + 1
176 yh = elements / 2 - psf_dim / 2 + psf_dim
178 group_n, _, ri_id = histogram(group_id, min=0)
179 gi_use = np.nonzero(group_n)
180 # Most likely going to be 1 as PyFHD does only one beam mostly
181 n_groups = np.count_nonzero(group_n)
183 if beam_gaussian_params is not None: 183 ↛ 192line 183 didn't jump to line 192 because the condition on line 183 was always true
184 # 1.3 is the padding factor for the gaussian fitting procedure
185 # (2.*obs.kpix) is the ratio of full sky (2 in l,m) to the analysis range (1/obs.kpix)
186 # (2.*obs.kpix*dimension/psf.pix_horizon) is the scale factor between the psf pixels-to-horizon and the
187 # analysis pixels-to-horizon
188 # (0.5/obs.kpix) is the resolution scaling of what the beam model was made at and the current res
189 model_npix = pix_horizon * 1.3
190 model_res = (2 * obs["kpix"] * dimension) / pix_horizon * (0.5 / obs["kpix"])
192 freq_bin_i = obs["baseline_info"]["fbin_i"]
193 freq_i_use = np.nonzero(obs["baseline_info"]["freq_use"])[0]
194 n_bin_use = 0
195 # We assume freq_i is an int when provided (i.e. a single frequency index)
196 if freq_i is not None:
197 freq_i_use = freq_i
199 if square:
200 # Do note freq_i_use could be an integer or an array if freq_i is supplied or not
201 beam_base = np.zeros([dimension, elements])
202 freq_bin_use = freq_bin_i[freq_i_use]
203 fbin_use = np.sort(np.unique(freq_bin_use))
204 nbin = fbin_use.size
206 if beam_gaussian_params is not None: 206 ↛ 209line 206 didn't jump to line 209 because the condition on line 206 was always true
207 beam_single = np.zeros([dimension, elements])
208 else:
209 beam_single = np.zeros([psf_dim, psf_dim], dtype=np.complex128)
210 for bin_i in range(nbin):
211 fbin = fbin_use[bin_i]
212 nf_bin = np.count_nonzero(freq_bin_use == fbin)
213 if beam_gaussian_params is not None: 213 ↛ 231line 213 didn't jump to line 231 because the condition on line 213 was always true
214 for gi in range(n_groups):
215 # beam_gaussian_params needs to be copied here rather than
216 # a view as interestingly gaussian_decomp affects the values
217 # of the array used with the calculations done to var and no copies
218 # are made, only views are adjusted. so we explcitly call copy
219 params = beam_gaussian_params[pol_i, fbin, :].copy()
220 gaussian = gaussian_decomp(
221 np.arange(dimension),
222 np.arange(elements),
223 params,
224 model_npix=model_npix,
225 model_res=model_res,
226 )[0]
227 beam_single += gaussian * group_n[gi_use[gi]]
228 beam_single /= np.sum(group_n[gi_use])
229 beam_base += nf_bin * beam_single**2
230 else:
231 for gi in range(n_groups):
232 beam_single += (
233 psf["beam_ptr"][0, fbin, rbin, rbin] * group_n[gi_use[gi]]
234 ).reshape([psf_dim, psf_dim])
235 beam_single /= np.sum(group_n[gi_use])
236 if abs:
237 beam_single = np.abs(beam_single)
238 beam_base_uv1 = np.zeros([dimension, elements], np.complex128)
239 beam_base_uv1[xl : xh + 1, yl : yh + 1] = beam_single
240 beam_base_single = np.fft.fftshift(
241 np.fft.ifftn(np.fft.fftshift(beam_base_uv1))
242 )
243 beam_base += (
244 nf_bin * (beam_base_single * np.conjugate(beam_base_single)).real
245 )
246 n_bin_use += nf_bin * freq_norm[fbin]
247 else:
248 nf_use = freq_i_use.size
249 if beam_gaussian_params is not None: 249 ↛ 253line 249 didn't jump to line 253 because the condition on line 249 was always true
250 beam_base_uv = np.zeros([dimension, elements])
251 beam_single = np.zeros([dimension, elements])
252 else:
253 beam_base_uv = np.zeros([psf_dim, psf_dim], dtype=np.complex128)
254 beam_single = np.zeros([psf_dim, psf_dim], dtype=np.complex128)
255 for f_idx in range(nf_use):
256 fi = freq_i_use[f_idx]
257 if freq_i is not None: 257 ↛ 258line 257 didn't jump to line 258 because the condition on line 257 was never true
258 if freq_i != fi:
259 continue
260 fbin = freq_bin_i[fi]
261 beam_single[:, :] = 0
262 if beam_gaussian_params is not None: 262 ↛ 274line 262 didn't jump to line 274 because the condition on line 262 was always true
263 for gi in range(n_groups):
264 params = beam_gaussian_params[pol_i, fbin, :].copy()
265 gaussian = gaussian_decomp(
266 np.arange(dimension),
267 np.arange(elements),
268 params,
269 model_npix=model_npix,
270 model_res=model_res,
271 )[0]
272 beam_single += gaussian * group_n[gi_use[gi]]
273 else:
274 for gi in range(n_groups):
275 beam_single += (
276 psf["beam_ptr"][0, fbin, rbin, rbin] * group_n[gi_use[gi]]
277 ).reshape([psf_dim, psf_dim])
278 beam_single /= np.sum(group_n[gi_use])
279 beam_base_uv += beam_single
280 n_bin_use += freq_norm[fbin]
282 if beam_gaussian_params is None: 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true
283 beam_base_uv1 = np.zeros([dimension, elements], dtype=np.complex128)
284 beam_base_uv1[xl : xh + 1, yl : yh + 1] = beam_base_uv
285 beam_base = np.fft.fftshift(np.fft.ifftn(np.fft.fftshift(beam_base_uv1)))
286 else:
287 beam_base = beam_base_uv
289 beam_base /= n_bin_use
290 return beam_base.real
293def beam_image_hyperresolved(
294 antenna: dict,
295 beam: UVBeam | AnalyticBeam,
296 ant_pol_1: int,
297 ant_pol_2: int,
298 freq_i: int,
299 zen_int_x: float,
300 zen_int_y: float,
301 psf: dict,
302) -> NDArray[np.complexfloating]:
303 """
304 Build the hyperresolved image-space beam power for a station/tile. Currently, this
305 function assumes that the amplitude of the jones matrix response between two antennas,
306 multiplied by the station/tile response, is the image beam power. This calculation
307 may be suject to change in the future.
309 Parameters
310 ----------
311 antenna : dict
312 Antenna metadata dictionary
313 beam : UVBeam | AnalyticBeam
314 UVBeam or AnalyticBeam object containing the beam model and metadata.
315 ant_pol_1 : int
316 Polarization index for the first antenna
317 ant_pol_2 : int
318 Polarization index for the second antenna
319 freq_i : int
320 Frequency index for current iteration
321 zen_int_x : float
322 x pixel index of the zenith for the beam power image
323 zen_int_y : float
324 y pixel index of the zenith for the beam power image
325 psf : dict
326 Beam metadata dictionary
328 Returns
329 -------
330 NDArray[np.complexfloating]
331 An estimation of the image-space beam power, normalized to the zenith power.
332 """
333 # FHD was designed to account for multiple antennas but in most cases only one was ever used
334 # So we will just use the first antenna twice as I PyFHD does not support multiple antennas at this time,
335 # If you want to use multiple antennas, please open an issue on the PyFHD GitHub repository or do the translation and/or
336 # adjustments yourself.
337 beam_ant_1 = antenna["response"][ant_pol_1, freq_i]
338 beam_ant_2 = np.conjugate(antenna["response"][ant_pol_2, freq_i])
340 # Amplitude of the response from "ant1" (again FHD takes more than one antenna)
341 # is Sqrt(|J1[0,pol1]|^2 + |J1[1,pol1]|^2)
342 amp_1 = (
343 np.abs(antenna["jones"][0, ant_pol_1]) ** 2
344 + np.abs(antenna["jones"][1, ant_pol_1]) ** 2
345 )
346 # Amplitude of the response from "ant2" (again FHD takes more than one antenna)
347 # is Sqrt(|J2[0,pol2]|^2 + |J2[1,pol2]|^2)
348 amp_2 = (
349 np.abs(antenna["jones"][0, ant_pol_2]) ** 2
350 + np.abs(antenna["jones"][1, ant_pol_2]) ** 2
351 )
352 # Amplitude of the baseline response is the product of the "two" antenna responses
353 power_zenith_beam = np.sqrt(amp_1 * amp_2)
355 # Create one full-scale array
356 image_power_beam = np.zeros([psf["dim"], psf["dim"]], dtype=np.complex128)
358 # Co-opt the array to calculate the power at zenith
359 image_power_beam[antenna["pix_use"]] = power_zenith_beam
360 # TODO: Work out the interpolation of the zenith power, it uses cubic interpolation
361 # But the IDL Interpolate function in IDL uses an interpolation paramter of -0.5, where
362 # scipy, numpy with their B-Splines seem to use a parameter of 0 by default with no way
363 # to change it.
364 # The interp is a placeholder for now, but it should be replaced with a proper
365 # interpolation function that matches the IDL Interpolate function.
366 # TODO: Replace with UVBeam interface object as_power_beam, then
367 # compute_response at za 0 az 0
368 # Initial trying out of using pyuvdata, not close at all. This is interpolating
369 # the zenith power using the x and y pixel coordinates, to use pyuvdata likely need to do
370 # pixel to ra/dec then to za/az
371 power_zenith = np.interp(zen_int_x, zen_int_y, image_power_beam)
373 # Normalize the image power beam to the zenith
374 image_power_beam[antenna["pix_use"]] = (
375 power_zenith_beam * beam_ant_1 * beam_ant_2
376 ) / power_zenith
378 return image_power_beam
381def beam_power(
382 antenna: dict,
383 beam: UVBeam | AnalyticBeam,
384 ant_pol_1: int,
385 ant_pol_2: int,
386 freq_i: int,
387 psf: dict,
388 zen_int_x: float,
389 zen_int_y: float,
390 xvals_uv_superres: np.ndarray,
391 yvals_uv_superres: np.ndarray,
392 pyfhd_config: dict,
393) -> NDArray[np.complexfloating]:
394 """
395 Generate the hyperresolved image-space beam power to reduce aliasing artifacts, and
396 fourier transform it to a specific grid in complex uv-space. Reduce artifacts further
397 by applying an extremely low-level contiguous mask to the uv-space beam power and
398 renomalizing the beam power to a volume of 1.
400 Parameters
401 ----------
402 antenna : dict
403 Antenna metadata dictionary
404 beam : UVBeam | AnalyticBeam
405 UVBeam or AnalyticBeam object containing the beam model and metadata.
406 ant_pol_1 : int
407 Polarization index for the first antenna
408 ant_pol_2 : int
409 Polarization index for the second antenna
410 freq_i : int
411 Frequency index for current iteration
412 psf : dict
413 Beam metadata dictionary
414 zen_int_x : np.ndarray
415 x pixel index of the zenith for the beam power image
416 zen_int_y : np.ndarray
417 y pixel index of the zenith for the beam power image
418 xvals_uv_superres : np.ndarray
419 A grid of the hyperresolved indices in the u direction for the uv-space beam power image
420 yvals_uv_superres : np.ndarray
421 A grid of the hyperresolved indices in the v direction for the uv-space beam power image
422 pyfhd_config : dict
423 The PyFHD configuration dictionary
425 Returns
426 -------
427 NDArray[np.complexfloating]
428 Hyperresolved uv-space of the beam power image, normalized to a volume of 1
429 """
430 # For now we will ignore beam_gaussian_decomp and much of the debug keywords
431 image_power_beam = beam_image_hyperresolved(
432 antenna,
433 beam,
434 ant_pol_1,
435 ant_pol_2,
436 freq_i,
437 zen_int_x,
438 zen_int_y,
439 psf,
440 pyfhd_config,
441 )
442 if pyfhd_config["kernel_window"]:
443 image_power_beam *= antenna["pix_window"]
444 psf_base_single = np.fft.fftshift(np.fft.ifftn(np.fft.fftshift(image_power_beam)))
445 # TODO: Same cubic problem as in beam_image_hyperresolved here
446 # Map Coordinates isn't the same as IDL Interpolate
447 # But its closish, more of a placeholder for now.
448 psf_base_superres = map_coordinates(
449 psf_base_single, (xvals_uv_superres, yvals_uv_superres)
450 )
452 # Build a mask to create a well-defined finite beam
453 uv_mask_superres = np.zeros(
454 [[psf_base_superres.shape[0], psf_base_superres.shape[1]]], dtype=np.float64
455 )
456 psf_mask_threshold_use = (
457 np.max(np.abs(psf_base_superres)) / pyfhd_config["beam_mask_threshold"]
458 )
459 beam_i = region_grow(
460 np.abs(psf_base_superres),
461 psf["superres_dim"] * (1 + psf["superres_dim"]) / 2,
462 low=psf_mask_threshold_use,
463 high=np.max(np.abs(psf_base_superres)),
464 )
465 uv_mask_superres[beam_i] = 1
467 # FFT normalization correction in case this changes the total number of pixels
468 psf_base_superres *= psf["intermediate_res"] ** 2
470 """
471 total of the gaussian decomposition can be calculated analytically, but is an over-estimate
472 of the numerical representation and results in a beam norm of greater than one,
473 thus the discrete total is used
474 """
475 psf_val_ref = np.sum(psf_base_superres)
477 # If you wish to add interpolate_beam_threshold functionality then do so here
478 psf_base_superres *= uv_mask_superres
480 if pyfhd_config["beam_clip_floor"]:
481 i_use = np.where(np.abs(psf_base_superres))
482 psf_amp = np.abs(psf_base_superres)
483 psf_phase = np.arctan(psf_base_superres.imag / psf_base_superres.real)
484 psf_floor = psf_mask_threshold_use * (psf["intermediate_res"] ** 2)
485 psf_amp[i_use] -= psf_floor
486 psf_base_superres = psf_amp * np.cos(psf_phase) + 1j * psf_amp * np.sin(
487 psf_phase
488 )
490 psf_base_superres *= psf_val_ref / np.sum(psf_base_superres)
492 return psf_base_superres