Coverage for PyFHD/beam_setup/antenna.py: 0%
133 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
1import importlib_resources
2import numpy as np
3from numpy.typing import NDArray
4from logging import Logger
5from astropy.constants import c
6from astropy.coordinates import SkyCoord, EarthLocation
7from astropy import units
8from scipy.interpolate import interp1d
9from PyFHD.beam_setup.mwa import dipole_mutual_coupling
10from PyFHD.pyfhd_tools.unit_conv import pixel_to_radec, radec_to_altaz
11from pyuvdata import ShortDipoleBeam, BeamInterface, UVBeam
12from pyuvdata.telescopes import known_telescope_location
13from pyuvdata.analytic_beam import AnalyticBeam
14from typing import Literal
15from astropy.time import Time
18def init_beam(obs: dict, pyfhd_config: dict, logger: Logger) -> dict:
19 """
20 Build an antenna-specific metadata dictionary and a full station/tile power beam model
21 and dictionary. Currently, the jones matrix of the antenna is acquired through pyuvdata.
23 The antenna dictionary contains antenna parameters as well as response and jones matrices
24 that can be used to build a beam power response. The psf dictionary contains parameters
25 and options required to build a uv-response from that beam power with reduced aliasing
26 contamination.
28 Parameters
29 ----------
30 obs : dict
31 Observation metadata dictionary.
32 pyfhd_config : dict
33 PyFHD's configuration dictionary containing all the options for a run
34 logger : Logger
35 PyFHD's logger.
37 Returns
38 -------
39 antenna : dict
40 Antenna metadata dictionary, including jones and response matrices.
41 psf : dict
42 Beam metadata dictionary for the station/tile.
43 beam : UVBeam or AnalyticBeam
44 A pyuvdata beam, can be a UVBeam, and AnalyticBeam subclass, or a
45 BeamInterface object.
47 Raises
48 ------
49 FileNotFoundError
50 If the MWA beam file does not exist and needs to be downloaded.
51 ValueError
52 If the instrument is not supported or if the antenna configuration is invalid.
53 """
55 # Setup the constants and variables
56 # Almost all instruments have two instrumental polarizations (either linear or circular)
57 n_ant_pol = 2
58 frequency_array = obs["baseline_info"]["freq"]
59 freq_bin_i = obs["baseline_info"]["fbin_i"]
60 nfreq_bin = int(np.max(freq_bin_i)) + 1
61 tile_a = obs["baseline_info"]["tile_a"]
62 tile_b = obs["baseline_info"]["tile_b"]
63 ant_names = np.unique(tile_a[: obs["n_baselines"]])
64 if pyfhd_config["beam_offset_time"] is not None:
65 jdate_use = obs["jd0"] + pyfhd_config["beam_offset_time"] / 24 / 3600
66 else:
67 jdate_use = obs["jd0"]
69 freq_center = np.zeros(nfreq_bin)
70 interp_func = interp1d(freq_bin_i, frequency_array)
71 for fi in range(nfreq_bin):
72 fi_i = np.where(freq_bin_i == fi)[0]
73 if fi_i.size == 0:
74 freq_center[fi] = interp_func(fi)
75 else:
76 freq_center[fi] = np.median(frequency_array[fi_i])
78 antenna_size = {
79 "mwa": 5,
80 "hera": 14,
81 }
83 if pyfhd_config["instrument"] == "mwa":
84 # Get the antenna coordinates
85 n_dipoles = 16
86 antenna_spacing = 1.1
87 xc_arr, yc_arr = np.meshgrid(np.arange(4), np.arange(4))
88 xc_arr = xc_arr.flatten() * antenna_spacing
89 yc_arr = np.flipud(yc_arr).flatten() * antenna_spacing
90 zc_arr = np.zeros(n_dipoles)
91 coords = np.array([xc_arr, yc_arr, zc_arr])
92 # Get the delays
93 delays = obs["delays"] * 4.35e-10
94 if pyfhd_config["dipole_mutual_coupling_factor"]:
95 coupling = dipole_mutual_coupling(freq_center)
96 else:
97 coupling = np.tile(
98 np.identity(n_dipoles), (n_ant_pol, freq_center.size, 1, 1)
99 )
101 else:
102 n_dipoles = 1
103 coords = np.zeros((3, n_dipoles))
104 delays = np.zeros(n_dipoles)
105 coupling = np.tile(np.identity(n_dipoles), (n_ant_pol, freq_center.size, 1, 1))
107 # Create basic antenna dictionary
108 antenna = {
109 "n_pol": n_ant_pol,
110 "antenna_type": pyfhd_config["instrument"],
111 "size_meters": (
112 antenna_size[pyfhd_config["instrument"]]
113 if pyfhd_config["instrument"] in antenna_size
114 else 10
115 ),
116 "names": ant_names,
117 "beam_model_version": pyfhd_config["beam_model_version"],
118 "freq": freq_center,
119 "nfreq_bin": nfreq_bin,
120 "n_ant_elements": 0,
121 # Anything that was pointer arrays in IDL will be None until assigned in Python
122 "jones": None,
123 "coupling": coupling,
124 "gain": np.ones([n_ant_pol, freq_center.size, n_dipoles], dtype=np.float64),
125 "coords": coords,
126 "delays": delays,
127 "response": None,
128 # PyFHD supports one instrument at a time, so we setup the group so they're all in the same group.
129 "group_id": np.zeros([n_ant_pol, obs["n_tile"]], dtype=np.int8),
130 "pix_window": None,
131 "pix_use": None,
132 }
134 # Create the initial psf dict
135 psf = {
136 "dim": (
137 pyfhd_config["psf_dim"]
138 if pyfhd_config["psf_dim"]
139 else np.ceil(
140 (
141 antenna["size_meters"]
142 * 2
143 * np.max(obs["baseline_info"]["freq"])
144 / c.value
145 )
146 / obs["kpix"]
147 )
148 ),
149 "resolution": pyfhd_config["psf_resolution"],
150 # This is more of a placeholder, if we want PyFHD to support processing more than one instrument at a time we'll need to edit this to be calculated rather than hardcoded.
151 "id": np.zeros(
152 [obs["n_pol"], obs["n_freq"], obs["n_baselines"]], dtype=np.int64
153 ),
154 "beam_mask_threshold": pyfhd_config["beam_mask_threshold"],
155 "freq_norm": np.ones(obs["n_freq"], dtype=np.int64),
156 "image_resolution": 10, # Add psf_image_resolution to the config file
157 "fbin_i": obs["baseline_info"]["fbin_i"],
158 }
159 # Set up coordinates to generate the high uv resolution model.
160 # Remember that field of view = uv resolution, image pixel scale = uv span.
161 # So, the cropped uv span (psf_dim) means we do not need to calculate at full image resolution,
162 # while the increased uv resolution can correspond to super-horizon scales. We construct the beam model in
163 # image space, and while we don't need the full image resolution we need to avoid quantization errors that
164 # come in if we make too small an image and then take the FFT
165 psf["intermediate_res"] = np.min(
166 [np.ceil(np.sqrt(psf["resolution"]) / 2) * 2, psf["resolution"]]
167 )
168 # use a larger box to build the model than will ultimately be used, to
169 # allow higher resolution in the initial image space beam model
170 psf["image_dim"] = int(
171 psf["dim"] * psf["image_resolution"] * psf["intermediate_res"]
172 )
173 psf["scale"] = obs["dimension"] * psf["intermediate_res"] / psf["image_dim"]
174 psf["pix_horizon"] = obs["dimension"] / psf["scale"]
175 psf["superres_dim"] = psf["dim"] * psf["resolution"]
177 location = EarthLocation.of_site(obs["instrument"])
179 # Get the zenith angle and azimuth angle arrays
180 xvals_celestial, yvals_celestial = np.meshgrid(
181 np.arange(psf["image_dim"]),
182 np.arange(psf["image_dim"]),
183 )
184 xvals_celestial = (
185 xvals_celestial * psf["scale"]
186 - psf["image_dim"] * psf["scale"] / 2
187 + obs["obsx"]
188 )
189 yvals_celestial = (
190 yvals_celestial * psf["scale"]
191 - psf["image_dim"] * psf["scale"] / 2
192 + obs["obsy"]
193 )
194 ra_arr, dec_arr = pixel_to_radec(xvals_celestial, yvals_celestial, obs["astr"])
195 del xvals_celestial, yvals_celestial
196 valid_i = np.where(np.isfinite(ra_arr))
197 ra_arr = ra_arr[valid_i]
198 dec_arr = dec_arr[valid_i]
199 alt_arr, az_arr = radec_to_altaz(
200 ra_arr.value,
201 dec_arr.value,
202 location.lat.value,
203 location.lon.value,
204 location.height.value,
205 jdate_use,
206 )
207 zenith_angle_arr = np.full([psf["image_dim"], psf["image_dim"]], 90)
208 zenith_angle_arr[valid_i] = 90 - alt_arr.value
209 # Initialize the azimuth angle array in degrees
210 azimuth_arr = np.zeros([psf["image_dim"], psf["image_dim"]])
211 azimuth_arr[valid_i] = az_arr.value
212 # Save some memory by deleting the unused arrays
213 del ra_arr, dec_arr, alt_arr, az_arr
215 if pyfhd_config["instrument"] == "mwa":
216 mwa_beam_file = importlib_resources.files(
217 "PyFHD.resources.instrument_config"
218 ).joinpath("mwa_full_embedded_element_pattern.h5")
219 if not mwa_beam_file.exists():
220 # Download the MWA beam file if it does not exist
221 raise FileNotFoundError(
222 f"MWA beam file {mwa_beam_file} does not exist. "
223 "Please download it from http://ws.mwatelescope.org/static/mwa_full_embedded_element_pattern.h5 into the."
224 f"directory {mwa_beam_file.parent}"
225 )
226 beam = UVBeam.from_file(mwa_beam_file, delays=obs["delays"])
227 # If you wish to add a different insturment, do it by adding a new elif here
228 else:
229 # Do an analytic beam as a placeholder
230 beam = ShortDipoleBeam()
232 # Get the jones matrix for the antenna
233 antenna["jones"] = general_jones_matrix(
234 beam,
235 za_array=zenith_angle_arr,
236 az_array=azimuth_arr,
237 freq_array=frequency_array,
238 telescope_location=location,
239 )
241 # Get the antenna response
242 antenna["response"] = general_antenna_response(
243 obs,
244 antenna,
245 za_arr=zenith_angle_arr,
246 az_arr=azimuth_arr,
247 )
249 return antenna, psf, beam
252def general_jones_matrix(
253 beam_obj: UVBeam | AnalyticBeam | BeamInterface,
254 za_array: np.ndarray[float] | None = None,
255 alt_array: np.ndarray[float] | None = None,
256 az_array: np.ndarray[float] | None = None,
257 ra_array: np.ndarray[float] | None = None,
258 dec_array: np.ndarray[float] | None = None,
259 az_convention: Literal["east of north", "north of east"] = "east of north",
260 frame: str = "icrs",
261 time: Time | None = None,
262 telescope_location: EarthLocation | None = None,
263 freq_array: np.ndarray[float] | None = None,
264 spline_opts: dict | None = None,
265 check_azza_domain: bool = True,
266) -> NDArray[np.complexfloating]:
267 """
268 Get beam values from a pyuvdata beam for a set of directions on the sky.
270 Accepts zenith angle and azimuth, altitude and aziumth or RA/Dec arrays
271 along with the associated frame and astropy Time and EarthLocation objects.
272 Azimuth convention is specified using the `az_convention` parameter,
273 options are "north of east" (the UVBeam convention) or "east of north"
274 (the astropy alt/az frame convention and the FHD convention).
276 Parameters
277 ----------
278 beam_obj : UVBeam or AnalyticBeam or BeamInterface
279 A pyuvdata beam, can be a UVBeam, and AnalyticBeam subclass, or a
280 BeamInterface object.
281 alt_array : np.ndarray[float]
282 Array of altitudes (also called elevations) in radians. Must be a 1D array.
283 za_array : np.ndarray[float]
284 Array of zenith angles (zenith is zero, horizon is 90 degrees). Must be
285 a 1D array.
286 az_array : np.ndarray[float]
287 Array of azimuths in radians. Defined according to the az_convention parameter.
288 Must be a 1D array.
289 ra_array : np.ndarray[float]
290 Array of right ascensions in radians. Must be a 1D array.
291 dec_array : np.ndarray[float]
292 Array of declinations in radians. Must be a 1D array.
293 az_convention : str
294 either "east of north" N=0, E=90 degrees or "north of east" E=0, N=90 degrees.
295 frame : str
296 The frame for RA and Dec, ignored if alt/az are provided. Must be a frame
297 known to astropy.
298 time : astropy.time.Time
299 Astropy Time object specifying the center of the observation time. Used
300 for converting RA/Dec to AltAz, ignored if alt/az are provided.
301 telescope_location : astropy.coordinates.EarthLocation
302 Astropy EarthLocation object specifying the telescope location. Used
303 for converting RA/Dec to AltAz, ignored if alt/az are provided.
304 freq_array : np.ndarray[float]
305 Frequencies to get the beam response for in Hz. Requried for analytic beams,
306 defaults to the frequencies defined on the beam object for UVBeams.
307 spline_opts : dict
308 Provide options to numpy.RectBivariateSpline. This includes spline
309 order parameters `kx` and `ky`, and smoothing parameter `s`. Only
310 applies if beam is a UVBeam.
311 check_azza_domain : bool
312 Whether to check the domain of az/za to ensure that they are covered by the
313 intrinsic data array. Checking them can be quite computationally expensive.
314 Conversely, if the passed az/za are outside of the domain, they will be
315 silently extrapolated and the behavior is not well-defined. Only
316 applies if beam is a UVBeam. Should be set to False if it is known that
317 the beam covers the whole sky.
319 Returns
320 -------
321 NDArray[np.complexfloating]
322 An array of computed values, shape (number of vector directions (usually 2),
323 number of feeds (usually 2), number of frequencies, number of directions).
324 The first axis indexes over the polarization vector components, generally
325 aligned with the azimuthal then zenith angle directions. The second axis
326 indexes over the feeds (order defined in the beam feed array).
327 """
328 alt_az_in = alt_array is not None and az_array is not None
329 za_az_in = za_array is not None and az_array is not None
330 ra_dec_in = np.all(
331 [var is not None for var in [ra_array, dec_array, time, telescope_location]]
332 )
334 if not alt_az_in and not za_az_in and not ra_dec_in:
335 raise ValueError(
336 "Either alt_array and az_array must be provided or ra_array, dec_array, "
337 "time and telescope_location must all be provided."
338 )
340 allowed_az_convention = ["east of north", "north of east"]
341 if (alt_az_in or za_az_in) and (az_convention not in allowed_az_convention):
342 raise ValueError(
343 f"az_convention must be one of {allowed_az_convention}. "
344 f"It was {az_convention}."
345 )
347 # FHD requires an Efield beam, so set it here to be explicit
348 beam = BeamInterface(beam_obj, beam_type="efield")
350 if ra_dec_in:
351 if ra_array.shape != dec_array.shape:
352 raise ValueError("ra_array and dec_array must have the same shape")
354 # convert to alt/az
355 skycoord = SkyCoord(
356 ra=ra_array * units.rad,
357 dec=dec_array * units.rad,
358 frame=frame,
359 obstime=time,
360 location=telescope_location,
361 ).transform_to("altaz")
363 alt_array = skycoord.alt.to("rad").value
364 az_array = skycoord.az.to("rad").value
365 az_convention = "east of north"
366 elif alt_az_in:
367 if alt_array.shape != az_array.shape:
368 raise ValueError("alt_array and az_array must have the same shape")
369 else:
370 if za_array.shape != az_array.shape:
371 raise ValueError("za_array and az_array must have the same shape")
373 if alt_az_in or ra_dec_in:
374 za_array = np.pi / 2 - alt_array
376 if az_convention == "east of north":
377 noe_az_array = np.pi / 2 - az_array
378 else:
379 noe_az_array = az_array
381 # Wrap the azimuth array to [0, 2pi] to match the extent of the UVBeam azimuth
382 where_neg_az = np.nonzero(noe_az_array < 0)
383 noe_az_array[where_neg_az] = noe_az_array[where_neg_az] + np.pi * 2.0
385 # use the faster interpolation method if appropriate
386 if beam._isuvbeam and beam.beam.pixel_coordinate_system == "az_za":
387 interpol_fn = "az_za_map_coordinates"
388 else:
389 interpol_fn = None
391 return beam.compute_response(
392 az_array=noe_az_array,
393 za_array=za_array,
394 freq_array=freq_array,
395 interpolation_function=interpol_fn,
396 spline_opts=spline_opts,
397 check_azza_domain=check_azza_domain,
398 )
401def general_antenna_response(
402 obs: dict,
403 antenna: dict,
404 za_arr: NDArray[np.floating],
405 az_arr: NDArray[np.floating],
406) -> NDArray[np.complexfloating]:
407 """
408 Calculate the response of a set of antennas for a given observation and antenna configuration,
409 including the electrical delays and coupling.
411 Parameters
412 ----------
413 obs : dict
414 Observation metadata dictionary
415 antenna : dict
416 Antenna metadata dictionary
417 za_arr : NDArray[np.floating]
418 Zenith angle array in radians
419 az_arr : NDArray[np.floating]
420 Azimuth angle array in radians
422 Returns
423 -------
424 response
425 The unpolarised response of a set of antennas
426 """
427 light_speed = c.value
429 """
430 Given that in FHD the antenna response is a pointer array of shape (antenna["n_pol", obs["n_tile"])
431 where each pointer is an array of pointers of shape (antenna["n_freq_bin"]). Each pointer in the array
432 of shape (antenna["n_freq_bin"]) points to a complex array of shape (antenna["pix_use"].size,).
434 Furthermore, when the antenna response is calculated, it looks like this is done on a per frequency bin
435 basis and each tile will point to the same antenna response for that frequency bin. This means we can ignore
436 the tile dimension and just calculate the antenna response for each frequency bin and polarization to save
437 memory in Python.
438 """
439 response = np.zeros(
440 [antenna["n_pol"], antenna["nfreq_bin"], antenna["pix_use"].size],
441 dtype=np.complex128,
442 )
444 # Calculate projections only at locations of non-zero pixels
445 proj_east_use = np.sin(za_arr[antenna["pix_use"]]) * np.sin(
446 az_arr[antenna["pix_use"]]
447 )
448 proj_north_use = np.sin(za_arr[antenna["pix_use"]]) * np.cos(
449 az_arr[antenna["pix_use"]]
450 )
451 proj_z_use = np.cos(za_arr[antenna["pix_use"]])
453 # FHD assumes you might be dealing with more than one antenna, hence the groupings it used.
454 # PyFHD currently only supports one antenna, so we can ignore the groupings.
455 for pol_i in range(antenna["n_pol"]):
456 # Phase of each dipole for the source (relative to the beamformer settings)
457 D_d = (
458 np.outer(antenna["coords"][0], proj_east_use)
459 + np.outer(antenna["coords"][1], proj_north_use)
460 + np.outer(antenna["coords"][2], proj_z_use)
461 )
463 for freq_i in range(antenna["nfreq_bin"]):
464 Kconv = 2 * np.pi * antenna["freq"][freq_i] / light_speed
465 voltage_delay = np.exp(
466 1j
467 * 2
468 * np.pi
469 * antenna["delays"]
470 * antenna["freq"][freq_i]
471 * antenna["gain"][pol_i, freq_i]
472 )
473 # TODO: Check if it's actually outer, although it does look like voltage_delay is likely 1D
474 measured_current = np.outer(
475 voltage_delay, antenna["coupling"][pol_i, freq_i]
476 )
477 zenith_norm = np.outer(
478 np.ones(antenna["n_ant_elements"]),
479 antenna["coupling"][pol_i, freq_i],
480 )
481 measured_current /= zenith_norm
483 # TODO: This loop can probably be vectorized
484 for ii in range(antenna["n_ant_elements"]):
485 # TODO: check the way D_d needs to be indexed
486 antenna_gain_arr = np.exp(-1j * Kconv * D_d[ii, :])
487 response[pol_i, freq_i] += (
488 antenna_gain_arr * measured_current[ii] / antenna["n_ant_elements"]
489 )
491 return response