Coverage for PyFHD/beam_setup/antenna.py: 0%

133 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-01 10:58 +0800

1import importlib_resources 

2import numpy as np 

3from numpy.typing import NDArray 

4from logging import Logger 

5from astropy.constants import c 

6from astropy.coordinates import SkyCoord, EarthLocation 

7from astropy import units 

8from scipy.interpolate import interp1d 

9from PyFHD.beam_setup.mwa import dipole_mutual_coupling 

10from PyFHD.pyfhd_tools.unit_conv import pixel_to_radec, radec_to_altaz 

11from pyuvdata import ShortDipoleBeam, BeamInterface, UVBeam 

12from pyuvdata.telescopes import known_telescope_location 

13from pyuvdata.analytic_beam import AnalyticBeam 

14from typing import Literal 

15from astropy.time import Time 

16 

17 

18def init_beam(obs: dict, pyfhd_config: dict, logger: Logger) -> dict: 

19 """ 

20 Build an antenna-specific metadata dictionary and a full station/tile power beam model 

21 and dictionary. Currently, the jones matrix of the antenna is acquired through pyuvdata. 

22 

23 The antenna dictionary contains antenna parameters as well as response and jones matrices 

24 that can be used to build a beam power response. The psf dictionary contains parameters 

25 and options required to build a uv-response from that beam power with reduced aliasing 

26 contamination. 

27 

28 Parameters 

29 ---------- 

30 obs : dict 

31 Observation metadata dictionary. 

32 pyfhd_config : dict 

33 PyFHD's configuration dictionary containing all the options for a run 

34 logger : Logger 

35 PyFHD's logger. 

36 

37 Returns 

38 ------- 

39 antenna : dict 

40 Antenna metadata dictionary, including jones and response matrices. 

41 psf : dict 

42 Beam metadata dictionary for the station/tile. 

43 beam : UVBeam or AnalyticBeam 

44 A pyuvdata beam, can be a UVBeam, and AnalyticBeam subclass, or a 

45 BeamInterface object. 

46 

47 Raises 

48 ------ 

49 FileNotFoundError 

50 If the MWA beam file does not exist and needs to be downloaded. 

51 ValueError 

52 If the instrument is not supported or if the antenna configuration is invalid. 

53 """ 

54 

55 # Setup the constants and variables 

56 # Almost all instruments have two instrumental polarizations (either linear or circular) 

57 n_ant_pol = 2 

58 frequency_array = obs["baseline_info"]["freq"] 

59 freq_bin_i = obs["baseline_info"]["fbin_i"] 

60 nfreq_bin = int(np.max(freq_bin_i)) + 1 

61 tile_a = obs["baseline_info"]["tile_a"] 

62 tile_b = obs["baseline_info"]["tile_b"] 

63 ant_names = np.unique(tile_a[: obs["n_baselines"]]) 

64 if pyfhd_config["beam_offset_time"] is not None: 

65 jdate_use = obs["jd0"] + pyfhd_config["beam_offset_time"] / 24 / 3600 

66 else: 

67 jdate_use = obs["jd0"] 

68 

69 freq_center = np.zeros(nfreq_bin) 

70 interp_func = interp1d(freq_bin_i, frequency_array) 

71 for fi in range(nfreq_bin): 

72 fi_i = np.where(freq_bin_i == fi)[0] 

73 if fi_i.size == 0: 

74 freq_center[fi] = interp_func(fi) 

75 else: 

76 freq_center[fi] = np.median(frequency_array[fi_i]) 

77 

78 antenna_size = { 

79 "mwa": 5, 

80 "hera": 14, 

81 } 

82 

83 if pyfhd_config["instrument"] == "mwa": 

84 # Get the antenna coordinates 

85 n_dipoles = 16 

86 antenna_spacing = 1.1 

87 xc_arr, yc_arr = np.meshgrid(np.arange(4), np.arange(4)) 

88 xc_arr = xc_arr.flatten() * antenna_spacing 

89 yc_arr = np.flipud(yc_arr).flatten() * antenna_spacing 

90 zc_arr = np.zeros(n_dipoles) 

91 coords = np.array([xc_arr, yc_arr, zc_arr]) 

92 # Get the delays 

93 delays = obs["delays"] * 4.35e-10 

94 if pyfhd_config["dipole_mutual_coupling_factor"]: 

95 coupling = dipole_mutual_coupling(freq_center) 

96 else: 

97 coupling = np.tile( 

98 np.identity(n_dipoles), (n_ant_pol, freq_center.size, 1, 1) 

99 ) 

100 

101 else: 

102 n_dipoles = 1 

103 coords = np.zeros((3, n_dipoles)) 

104 delays = np.zeros(n_dipoles) 

105 coupling = np.tile(np.identity(n_dipoles), (n_ant_pol, freq_center.size, 1, 1)) 

106 

107 # Create basic antenna dictionary 

108 antenna = { 

109 "n_pol": n_ant_pol, 

110 "antenna_type": pyfhd_config["instrument"], 

111 "size_meters": ( 

112 antenna_size[pyfhd_config["instrument"]] 

113 if pyfhd_config["instrument"] in antenna_size 

114 else 10 

115 ), 

116 "names": ant_names, 

117 "beam_model_version": pyfhd_config["beam_model_version"], 

118 "freq": freq_center, 

119 "nfreq_bin": nfreq_bin, 

120 "n_ant_elements": 0, 

121 # Anything that was pointer arrays in IDL will be None until assigned in Python 

122 "jones": None, 

123 "coupling": coupling, 

124 "gain": np.ones([n_ant_pol, freq_center.size, n_dipoles], dtype=np.float64), 

125 "coords": coords, 

126 "delays": delays, 

127 "response": None, 

128 # PyFHD supports one instrument at a time, so we setup the group so they're all in the same group. 

129 "group_id": np.zeros([n_ant_pol, obs["n_tile"]], dtype=np.int8), 

130 "pix_window": None, 

131 "pix_use": None, 

132 } 

133 

134 # Create the initial psf dict 

135 psf = { 

136 "dim": ( 

137 pyfhd_config["psf_dim"] 

138 if pyfhd_config["psf_dim"] 

139 else np.ceil( 

140 ( 

141 antenna["size_meters"] 

142 * 2 

143 * np.max(obs["baseline_info"]["freq"]) 

144 / c.value 

145 ) 

146 / obs["kpix"] 

147 ) 

148 ), 

149 "resolution": pyfhd_config["psf_resolution"], 

150 # This is more of a placeholder, if we want PyFHD to support processing more than one instrument at a time we'll need to edit this to be calculated rather than hardcoded. 

151 "id": np.zeros( 

152 [obs["n_pol"], obs["n_freq"], obs["n_baselines"]], dtype=np.int64 

153 ), 

154 "beam_mask_threshold": pyfhd_config["beam_mask_threshold"], 

155 "freq_norm": np.ones(obs["n_freq"], dtype=np.int64), 

156 "image_resolution": 10, # Add psf_image_resolution to the config file 

157 "fbin_i": obs["baseline_info"]["fbin_i"], 

158 } 

159 # Set up coordinates to generate the high uv resolution model. 

160 # Remember that field of view = uv resolution, image pixel scale = uv span. 

161 # So, the cropped uv span (psf_dim) means we do not need to calculate at full image resolution, 

162 # while the increased uv resolution can correspond to super-horizon scales. We construct the beam model in 

163 # image space, and while we don't need the full image resolution we need to avoid quantization errors that 

164 # come in if we make too small an image and then take the FFT 

165 psf["intermediate_res"] = np.min( 

166 [np.ceil(np.sqrt(psf["resolution"]) / 2) * 2, psf["resolution"]] 

167 ) 

168 # use a larger box to build the model than will ultimately be used, to 

169 # allow higher resolution in the initial image space beam model 

170 psf["image_dim"] = int( 

171 psf["dim"] * psf["image_resolution"] * psf["intermediate_res"] 

172 ) 

173 psf["scale"] = obs["dimension"] * psf["intermediate_res"] / psf["image_dim"] 

174 psf["pix_horizon"] = obs["dimension"] / psf["scale"] 

175 psf["superres_dim"] = psf["dim"] * psf["resolution"] 

176 

177 location = EarthLocation.of_site(obs["instrument"]) 

178 

179 # Get the zenith angle and azimuth angle arrays 

180 xvals_celestial, yvals_celestial = np.meshgrid( 

181 np.arange(psf["image_dim"]), 

182 np.arange(psf["image_dim"]), 

183 ) 

184 xvals_celestial = ( 

185 xvals_celestial * psf["scale"] 

186 - psf["image_dim"] * psf["scale"] / 2 

187 + obs["obsx"] 

188 ) 

189 yvals_celestial = ( 

190 yvals_celestial * psf["scale"] 

191 - psf["image_dim"] * psf["scale"] / 2 

192 + obs["obsy"] 

193 ) 

194 ra_arr, dec_arr = pixel_to_radec(xvals_celestial, yvals_celestial, obs["astr"]) 

195 del xvals_celestial, yvals_celestial 

196 valid_i = np.where(np.isfinite(ra_arr)) 

197 ra_arr = ra_arr[valid_i] 

198 dec_arr = dec_arr[valid_i] 

199 alt_arr, az_arr = radec_to_altaz( 

200 ra_arr.value, 

201 dec_arr.value, 

202 location.lat.value, 

203 location.lon.value, 

204 location.height.value, 

205 jdate_use, 

206 ) 

207 zenith_angle_arr = np.full([psf["image_dim"], psf["image_dim"]], 90) 

208 zenith_angle_arr[valid_i] = 90 - alt_arr.value 

209 # Initialize the azimuth angle array in degrees 

210 azimuth_arr = np.zeros([psf["image_dim"], psf["image_dim"]]) 

211 azimuth_arr[valid_i] = az_arr.value 

212 # Save some memory by deleting the unused arrays 

213 del ra_arr, dec_arr, alt_arr, az_arr 

214 

215 if pyfhd_config["instrument"] == "mwa": 

216 mwa_beam_file = importlib_resources.files( 

217 "PyFHD.resources.instrument_config" 

218 ).joinpath("mwa_full_embedded_element_pattern.h5") 

219 if not mwa_beam_file.exists(): 

220 # Download the MWA beam file if it does not exist 

221 raise FileNotFoundError( 

222 f"MWA beam file {mwa_beam_file} does not exist. " 

223 "Please download it from http://ws.mwatelescope.org/static/mwa_full_embedded_element_pattern.h5 into the." 

224 f"directory {mwa_beam_file.parent}" 

225 ) 

226 beam = UVBeam.from_file(mwa_beam_file, delays=obs["delays"]) 

227 # If you wish to add a different insturment, do it by adding a new elif here 

228 else: 

229 # Do an analytic beam as a placeholder 

230 beam = ShortDipoleBeam() 

231 

232 # Get the jones matrix for the antenna 

233 antenna["jones"] = general_jones_matrix( 

234 beam, 

235 za_array=zenith_angle_arr, 

236 az_array=azimuth_arr, 

237 freq_array=frequency_array, 

238 telescope_location=location, 

239 ) 

240 

241 # Get the antenna response 

242 antenna["response"] = general_antenna_response( 

243 obs, 

244 antenna, 

245 za_arr=zenith_angle_arr, 

246 az_arr=azimuth_arr, 

247 ) 

248 

249 return antenna, psf, beam 

250 

251 

252def general_jones_matrix( 

253 beam_obj: UVBeam | AnalyticBeam | BeamInterface, 

254 za_array: np.ndarray[float] | None = None, 

255 alt_array: np.ndarray[float] | None = None, 

256 az_array: np.ndarray[float] | None = None, 

257 ra_array: np.ndarray[float] | None = None, 

258 dec_array: np.ndarray[float] | None = None, 

259 az_convention: Literal["east of north", "north of east"] = "east of north", 

260 frame: str = "icrs", 

261 time: Time | None = None, 

262 telescope_location: EarthLocation | None = None, 

263 freq_array: np.ndarray[float] | None = None, 

264 spline_opts: dict | None = None, 

265 check_azza_domain: bool = True, 

266) -> NDArray[np.complexfloating]: 

267 """ 

268 Get beam values from a pyuvdata beam for a set of directions on the sky. 

269 

270 Accepts zenith angle and azimuth, altitude and aziumth or RA/Dec arrays 

271 along with the associated frame and astropy Time and EarthLocation objects. 

272 Azimuth convention is specified using the `az_convention` parameter, 

273 options are "north of east" (the UVBeam convention) or "east of north" 

274 (the astropy alt/az frame convention and the FHD convention). 

275 

276 Parameters 

277 ---------- 

278 beam_obj : UVBeam or AnalyticBeam or BeamInterface 

279 A pyuvdata beam, can be a UVBeam, and AnalyticBeam subclass, or a 

280 BeamInterface object. 

281 alt_array : np.ndarray[float] 

282 Array of altitudes (also called elevations) in radians. Must be a 1D array. 

283 za_array : np.ndarray[float] 

284 Array of zenith angles (zenith is zero, horizon is 90 degrees). Must be 

285 a 1D array. 

286 az_array : np.ndarray[float] 

287 Array of azimuths in radians. Defined according to the az_convention parameter. 

288 Must be a 1D array. 

289 ra_array : np.ndarray[float] 

290 Array of right ascensions in radians. Must be a 1D array. 

291 dec_array : np.ndarray[float] 

292 Array of declinations in radians. Must be a 1D array. 

293 az_convention : str 

294 either "east of north" N=0, E=90 degrees or "north of east" E=0, N=90 degrees. 

295 frame : str 

296 The frame for RA and Dec, ignored if alt/az are provided. Must be a frame 

297 known to astropy. 

298 time : astropy.time.Time 

299 Astropy Time object specifying the center of the observation time. Used 

300 for converting RA/Dec to AltAz, ignored if alt/az are provided. 

301 telescope_location : astropy.coordinates.EarthLocation 

302 Astropy EarthLocation object specifying the telescope location. Used 

303 for converting RA/Dec to AltAz, ignored if alt/az are provided. 

304 freq_array : np.ndarray[float] 

305 Frequencies to get the beam response for in Hz. Requried for analytic beams, 

306 defaults to the frequencies defined on the beam object for UVBeams. 

307 spline_opts : dict 

308 Provide options to numpy.RectBivariateSpline. This includes spline 

309 order parameters `kx` and `ky`, and smoothing parameter `s`. Only 

310 applies if beam is a UVBeam. 

311 check_azza_domain : bool 

312 Whether to check the domain of az/za to ensure that they are covered by the 

313 intrinsic data array. Checking them can be quite computationally expensive. 

314 Conversely, if the passed az/za are outside of the domain, they will be 

315 silently extrapolated and the behavior is not well-defined. Only 

316 applies if beam is a UVBeam. Should be set to False if it is known that 

317 the beam covers the whole sky. 

318 

319 Returns 

320 ------- 

321 NDArray[np.complexfloating] 

322 An array of computed values, shape (number of vector directions (usually 2), 

323 number of feeds (usually 2), number of frequencies, number of directions). 

324 The first axis indexes over the polarization vector components, generally 

325 aligned with the azimuthal then zenith angle directions. The second axis 

326 indexes over the feeds (order defined in the beam feed array). 

327 """ 

328 alt_az_in = alt_array is not None and az_array is not None 

329 za_az_in = za_array is not None and az_array is not None 

330 ra_dec_in = np.all( 

331 [var is not None for var in [ra_array, dec_array, time, telescope_location]] 

332 ) 

333 

334 if not alt_az_in and not za_az_in and not ra_dec_in: 

335 raise ValueError( 

336 "Either alt_array and az_array must be provided or ra_array, dec_array, " 

337 "time and telescope_location must all be provided." 

338 ) 

339 

340 allowed_az_convention = ["east of north", "north of east"] 

341 if (alt_az_in or za_az_in) and (az_convention not in allowed_az_convention): 

342 raise ValueError( 

343 f"az_convention must be one of {allowed_az_convention}. " 

344 f"It was {az_convention}." 

345 ) 

346 

347 # FHD requires an Efield beam, so set it here to be explicit 

348 beam = BeamInterface(beam_obj, beam_type="efield") 

349 

350 if ra_dec_in: 

351 if ra_array.shape != dec_array.shape: 

352 raise ValueError("ra_array and dec_array must have the same shape") 

353 

354 # convert to alt/az 

355 skycoord = SkyCoord( 

356 ra=ra_array * units.rad, 

357 dec=dec_array * units.rad, 

358 frame=frame, 

359 obstime=time, 

360 location=telescope_location, 

361 ).transform_to("altaz") 

362 

363 alt_array = skycoord.alt.to("rad").value 

364 az_array = skycoord.az.to("rad").value 

365 az_convention = "east of north" 

366 elif alt_az_in: 

367 if alt_array.shape != az_array.shape: 

368 raise ValueError("alt_array and az_array must have the same shape") 

369 else: 

370 if za_array.shape != az_array.shape: 

371 raise ValueError("za_array and az_array must have the same shape") 

372 

373 if alt_az_in or ra_dec_in: 

374 za_array = np.pi / 2 - alt_array 

375 

376 if az_convention == "east of north": 

377 noe_az_array = np.pi / 2 - az_array 

378 else: 

379 noe_az_array = az_array 

380 

381 # Wrap the azimuth array to [0, 2pi] to match the extent of the UVBeam azimuth 

382 where_neg_az = np.nonzero(noe_az_array < 0) 

383 noe_az_array[where_neg_az] = noe_az_array[where_neg_az] + np.pi * 2.0 

384 

385 # use the faster interpolation method if appropriate 

386 if beam._isuvbeam and beam.beam.pixel_coordinate_system == "az_za": 

387 interpol_fn = "az_za_map_coordinates" 

388 else: 

389 interpol_fn = None 

390 

391 return beam.compute_response( 

392 az_array=noe_az_array, 

393 za_array=za_array, 

394 freq_array=freq_array, 

395 interpolation_function=interpol_fn, 

396 spline_opts=spline_opts, 

397 check_azza_domain=check_azza_domain, 

398 ) 

399 

400 

401def general_antenna_response( 

402 obs: dict, 

403 antenna: dict, 

404 za_arr: NDArray[np.floating], 

405 az_arr: NDArray[np.floating], 

406) -> NDArray[np.complexfloating]: 

407 """ 

408 Calculate the response of a set of antennas for a given observation and antenna configuration, 

409 including the electrical delays and coupling. 

410 

411 Parameters 

412 ---------- 

413 obs : dict 

414 Observation metadata dictionary 

415 antenna : dict 

416 Antenna metadata dictionary 

417 za_arr : NDArray[np.floating] 

418 Zenith angle array in radians 

419 az_arr : NDArray[np.floating] 

420 Azimuth angle array in radians 

421 

422 Returns 

423 ------- 

424 response 

425 The unpolarised response of a set of antennas 

426 """ 

427 light_speed = c.value 

428 

429 """ 

430 Given that in FHD the antenna response is a pointer array of shape (antenna["n_pol", obs["n_tile"]) 

431 where each pointer is an array of pointers of shape (antenna["n_freq_bin"]). Each pointer in the array 

432 of shape (antenna["n_freq_bin"]) points to a complex array of shape (antenna["pix_use"].size,). 

433 

434 Furthermore, when the antenna response is calculated, it looks like this is done on a per frequency bin 

435 basis and each tile will point to the same antenna response for that frequency bin. This means we can ignore 

436 the tile dimension and just calculate the antenna response for each frequency bin and polarization to save 

437 memory in Python. 

438 """ 

439 response = np.zeros( 

440 [antenna["n_pol"], antenna["nfreq_bin"], antenna["pix_use"].size], 

441 dtype=np.complex128, 

442 ) 

443 

444 # Calculate projections only at locations of non-zero pixels 

445 proj_east_use = np.sin(za_arr[antenna["pix_use"]]) * np.sin( 

446 az_arr[antenna["pix_use"]] 

447 ) 

448 proj_north_use = np.sin(za_arr[antenna["pix_use"]]) * np.cos( 

449 az_arr[antenna["pix_use"]] 

450 ) 

451 proj_z_use = np.cos(za_arr[antenna["pix_use"]]) 

452 

453 # FHD assumes you might be dealing with more than one antenna, hence the groupings it used. 

454 # PyFHD currently only supports one antenna, so we can ignore the groupings. 

455 for pol_i in range(antenna["n_pol"]): 

456 # Phase of each dipole for the source (relative to the beamformer settings) 

457 D_d = ( 

458 np.outer(antenna["coords"][0], proj_east_use) 

459 + np.outer(antenna["coords"][1], proj_north_use) 

460 + np.outer(antenna["coords"][2], proj_z_use) 

461 ) 

462 

463 for freq_i in range(antenna["nfreq_bin"]): 

464 Kconv = 2 * np.pi * antenna["freq"][freq_i] / light_speed 

465 voltage_delay = np.exp( 

466 1j 

467 * 2 

468 * np.pi 

469 * antenna["delays"] 

470 * antenna["freq"][freq_i] 

471 * antenna["gain"][pol_i, freq_i] 

472 ) 

473 # TODO: Check if it's actually outer, although it does look like voltage_delay is likely 1D 

474 measured_current = np.outer( 

475 voltage_delay, antenna["coupling"][pol_i, freq_i] 

476 ) 

477 zenith_norm = np.outer( 

478 np.ones(antenna["n_ant_elements"]), 

479 antenna["coupling"][pol_i, freq_i], 

480 ) 

481 measured_current /= zenith_norm 

482 

483 # TODO: This loop can probably be vectorized 

484 for ii in range(antenna["n_ant_elements"]): 

485 # TODO: check the way D_d needs to be indexed 

486 antenna_gain_arr = np.exp(-1j * Kconv * D_d[ii, :]) 

487 response[pol_i, freq_i] += ( 

488 antenna_gain_arr * measured_current[ii] / antenna["n_ant_elements"] 

489 ) 

490 

491 return response