Coverage for PyFHD/beam_setup/beam.py: 0%
120 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
1import numpy as np
2from astropy.constants import c
3from logging import Logger
4from scipy.interpolate import interp1d
5from scipy.io import readsav
6from PyFHD.beam_setup.antenna import init_beam
7from PyFHD.beam_setup.beam_utils import beam_power
8from PyFHD.io.pyfhd_io import recarray_to_dict
9from pathlib import Path
10from PyFHD.io.pyfhd_io import save, load
11from h5py import File
12import sys
14from PyFHD.pyfhd_tools.pyfhd_utils import histogram, rebin, weight_invert
17def create_psf(obs: dict, pyfhd_config: dict, logger: Logger) -> dict | File:
18 """
19 Creates the psf dictionary, which includes the hyperresolved uv-kernel used for
20 gridding, by building the image-space response of a set of antennas and transforming
21 it to uv-space, or by loading in a `sav` or `HDF5` file from a FHD run. The
22 base response of an antenna is retreived from a pyuvdata class/object.
23 In the future it would be nice to have the ability to read in a beam fits file.
24 Do note, that PyFHD was made with the assumption that the beam is the MWA beam,
25 and assumes the beam does not differ on a per baseline basis. If you wish to use
26 separate baselines you'll need to add that functionality yourself.
28 Parameters
29 ----------
30 obs : dict
31 The observation metadata dictionary
32 pyfhd_config : dict
33 The PyFHD configuration dictionary
34 logger : Logger
35 PyFHD's logger
37 Raises
38 ------
39 ValueError
40 If the beam file type is not recognized or if the beam file path is not set correctly.
42 Returns
43 -------
44 dict | h5py.File
45 Return the psf dictionary containing the hyperresolved uv-beam and other metadata,
46 or a h5py File object if lazy loading is enabled.
47 """
49 if pyfhd_config["beam_file_path"] is None:
50 # Form the beam from scratch using pyuvdata for the Jones Matrix
51 # and translations from FHD for the antenna response.
52 logger.info(
53 "PyFHD will do the beam forming from scratch using pyuvdata and the antenna response from FHD."
54 "Please note, gaussian decomp for MWA is not implemented yet."
55 )
56 antenna, psf, beam = init_beam(obs, pyfhd_config, logger)
57 # TODO: we'll see if the +1 is necessary, IDL indexing thing
58 n_freq_bin = np.max(obs["baseline_info"]["fbin_i"]) + 1
59 # TODO: Double check the shape
60 beam_arr = np.zeros(
61 [
62 obs["n_pol"],
63 n_freq_bin,
64 psf["resolution"] + 1,
65 psf["resolution"] + 1,
66 psf["dim"] ** 2,
67 ],
68 dtype=np.complex128,
69 )
70 xvals_i, yvals_i = np.meshgrid(
71 np.arange(psf["resolution"]), np.arange(psf["resolution"]), indexing="ij"
72 )
73 xvals_i *= psf["resolution"]
74 yvals_i *= psf["resolution"]
75 xvals_i = xvals_i.flatten()
76 yvals_i = yvals_i.flatten()
77 xvals_psf_dim, yvals_psf_dim = np.meshgrid(
78 np.arange(psf["dim"]), np.arange(psf["dim"]), indexing="ij"
79 )
80 psf["xvals"] = np.zeros(
81 [psf["resolution"], psf["resolution"], psf["dim"], psf["dim"]]
82 )
83 psf["yvals"] = np.zeros(
84 [psf["resolution"], psf["resolution"], psf["dim"], psf["dim"]]
85 )
86 for i in range(psf["resolution"]):
87 for j in range(psf["resolution"]):
88 psf["xvals"][i, j, :, :] = (
89 xvals_psf_dim - psf["dim"] / 2 + i / psf["resolution"]
90 )
91 psf["yvals"][i, j, :, :] = (
92 yvals_psf_dim - psf["dim"] / 2 + j / psf["resolution"]
93 )
95 zen_int_x = (obs["zenx"] - obs["obsx"]) / psf["scale"] + psf["image_dim"] / 2
96 zen_int_y = (obs["zeny"] - obs["obsy"]) / psf["scale"] + psf["image_dim"] / 2
97 # Calculate the hyperresolved uv-vals of the beam kernel at highest precision prior to cast to
98 # be accurate yet small
99 res_super = 1 / (psf["resolution"] / psf["intermediate_res"])
101 xvals_uv_superres, yvals_uv_superres = np.meshgrid(
102 np.arange(psf["superres_dim"]),
103 np.arange(psf["superres_dim"]),
104 )
105 xvals_uv_superres = (
106 xvals_uv_superres * res_super
107 - np.floor(psf["dim"] / 2) * psf["intermediate_res"]
108 + np.floor(psf["dim"] / 2)
109 )
110 yvals_uv_superres = (
111 yvals_uv_superres * res_super
112 - np.floor(psf["dim"] / 2) * psf["intermediate_res"]
113 + np.floor(psf["dim"] / 2)
114 )
116 freq_center = antenna["freq"][0]
117 primary_beam_area = np.zeros([obs["n_pol"], obs["n_freq"]], dtype=np.float64)
118 primary_beam_sq_area = np.zeros([obs["n_pol"], obs["n_freq"]], dtype=np.float64)
119 ant_a_list = obs["baseline_info"]["tile_A"][0 : obs["n_baselines"]]
120 ant_b_list = obs["baseline_info"]["tile_B"][0 : obs["n_baselines"]]
121 baseline_mod = np.max(
122 [
123 2
124 ** (
125 np.ceil(
126 np.log(np.sqrt(obs["n_baselines"] - obs["n_tile"])) / np.log(2)
127 )
128 ),
129 np.max(ant_a_list),
130 np.max(ant_b_list),
131 256,
132 2 * obs["n_tile"],
133 ]
134 )
135 bi_list = ant_b_list + ant_a_list * baseline_mod
136 bi_hist0, _, _ = histogram(bi_list, min=0, bin_size=1)
137 bi_max = np.max(bi_list)
138 pol_arr = np.array([[0, 0], [1, 1], [0, 1], [1, 0]], dtype=np.int8)
139 for pol_i in range(obs["n_pol"]):
140 ant_pol1 = pol_arr[0, pol_i]
141 ant_pol2 = pol_arr[1, pol_i]
143 # Group IDs label unique beams across the array (should be all 0s as theres only one group)
144 group1 = antenna["group_id"][ant_pol1]
145 group2 = antenna["group_id"][ant_pol2]
147 # Histogram group IDs, get reverse indicies, and calculate number of unique beams (again that should be 1)
148 hgroup1, _, gri1 = histogram(group1, min=0)
149 hgroup2, _, gri2 = histogram(group2, min=0)
150 # Histogram matrix between all separate groups of different beams
151 group_matrix = np.outer(hgroup2, hgroup1)
152 for freq_i in range(n_freq_bin):
153 beam_int = 0
154 beam_int_2 = 0
155 baseline_group_n = group_matrix[0, 0]
156 # Get antenna indices which use this group's unique beam (probably all of them...)
157 ant_1_arr = gri1[gri1[0] : gri1[1]]
158 ant_2_arr = gri2[gri2[0] : gri2[1]]
159 # Get the number of baselines in each group
160 ant_1_n = hgroup1[0]
161 ant_2_n = hgroup2[0]
162 bi_use = np.flatten(
163 rebin(ant_1_arr + 1, (ant_1_n, ant_2_n)) * baseline_mod
164 + rebin(ant_2_arr.T + 1, (ant_2_n, ant_1_n))
165 )
166 bi_use2 = np.flatten(
167 rebin(ant_1_arr + 1, (ant_1_n, ant_2_n))
168 + rebin(ant_2_arr.T + 1, (ant_2_n, ant_1_n)) * baseline_mod
169 )
170 bi_use = np.concatenate([bi_use, bi_use2])
171 if np.max(bi_use) > bi_max:
172 bi_use = bi_use[np.where(bi_use <= bi_max)]
173 bi_use_i = np.nonzero(bi_hist0[bi_use])
174 if bi_use_i.size > 0:
175 bi_use = bi_use[bi_use_i]
176 baseline_group_n = bi_use.size
177 # Calculate power beam from antenna beams
178 psf_base_superres = beam_power(
179 antenna,
180 beam,
181 ant_pol1,
182 ant_pol2,
183 freq_i,
184 psf,
185 zen_int_x,
186 zen_int_y,
187 xvals_uv_superres,
188 yvals_uv_superres,
189 pyfhd_config,
190 )
192 # divide by psf_resolution^2 since the FFT is done at
193 # a different resolution and requires a different normalization
194 beam_int += (
195 baseline_group_n
196 + np.sum(psf_base_superres) / psf["resolution"] ** 2
197 )
198 beam_int_2 += (
199 baseline_group_n
200 + np.sum(np.abs(psf_base_superres)) / psf["resolution"] ** 2
201 )
202 n_grp_use += baseline_group_n
203 psf_single = np.zeros(
204 [psf["resolution"] + 1, psf["resolution"] + 1],
205 psf["dim"] ** 2,
206 dtype=np.complex128,
207 )
209 for i in range(psf["resolution"]):
210 for j in range(psf["resolution"]):
211 psf_single[psf["resolution"] - i, psf["resolution"] - j] = (
212 psf_base_superres[xvals_i + i, yvals_i + j]
213 )
214 # TODO: check the rolling (shifting) and potential reshaping done here (should already be in)
215 for i in range(psf["resolution"]):
216 psf_single[psf["resolution"] - i, psf["resolution"]] = np.roll(
217 psf_base_superres[xvals_i + i, yvals_i + psf["resolution"]],
218 1,
219 0,
220 ).flatten()
221 for j in range(psf["resolution"]):
222 psf_single[psf["resolution"], psf["resolution"] - j] = np.roll(
223 psf_base_superres[xvals_i + psf["resolution"], yvals_i + j],
224 1,
225 1,
226 ).flatten()
227 psf_single[psf["resolution"], psf["resolution"]] = np.roll(
228 np.roll(
229 psf_base_superres[
230 xvals_i + psf["resolution"], yvals_i + psf["resolution"]
231 ],
232 1,
233 1,
234 ),
235 1,
236 0,
237 ).flatten()
238 # Update the beam arr
239 beam_arr[pol_i, freq_i, :, :, :] = psf_single
241 # Calculate the primary beam area and squared area
242 beam_int_2 *= weight_invert(n_grp_use) / obs["kpix"] ** 2
243 beam_int *= weight_invert(n_grp_use) / obs["kpix"] ** 2
244 fi_use = np.where(obs["baseline_info"]["fbin_i"] == freq_i)
245 primary_beam_area[pol_i, fi_use] = beam_int
246 primary_beam_sq_area[pol_i, fi_use] = beam_int_2
248 psf["beam_ptr"] = beam_arr
249 obs["primary_beam_area"] = primary_beam_area
250 obs["primary_beam_sq_area"] = primary_beam_sq_area
251 psf["group_arr"] = np.zeros(
252 [obs["n_pol"], obs["n_freq"], obs["n_baselines"]], dtype=np.int8
253 )
255 # Save the psf to a file
256 pyfhd_config["beams_dir"] = Path(pyfhd_config["output_dir"], "beams")
257 pyfhd_config["beams_dir"].mkdir(exist_ok=True)
258 save(
259 Path(pyfhd_config["beams_dir"], f"{pyfhd_config['obs_id']}_beam.h5"),
260 psf,
261 "psf",
262 logger=logger,
263 to_chunk={
264 "beam_ptr": {
265 "shape": psf["beam_ptr"].shape,
266 "chunk": tuple([1] * 2 + list(psf["beam_ptr"].shape)[2:]),
267 }
268 },
269 )
271 return psf
272 elif pyfhd_config["beam_file_path"].suffix == ".sav":
273 # Read in a sav file containing the psf structure as we expect from FHD
274 logger.info(
275 "Reading in a beam sav file probably will take a long time. You will require double the storage size of the sav file in RAM at least. Do some other work or maybe watch your favourite long movie, for example the extended edition of LOTR: Return of the King is 4 hours 10 minutes. Check back when the Battle of the Pelennor Fields has finished or roughly 3 hours in."
276 )
277 beam = readsav(pyfhd_config["beam_file_path"], python_dict=True)
278 psf = beam["psf"]
279 # Delete the read in sav file, now that we got the psf, at this point we will have the psf size twice!
280 del beam
281 psf["beam_ptr"][0] = psf["beam_ptr"][0].T
282 # Take only the first baseline (as it assumes every baseline points to the first i.e. the FFT is done per frequency)
283 # Has a bonus of reducing memory use, unless NumPy is really good at using representations, maybe use double memory
284 psf["beam_ptr"][0] = psf["beam_ptr"][0][:, :, 0]
285 # Transpose the group array
286 psf["id"][0] = psf["id"][0].T
287 # Recarray to dict completely unpack object arrays into the dict, although will require the beam_ptr in memory twice potentially temporarily
288 psf = recarray_to_dict(psf)
289 # The to_chunk is a dictionary of dictionaries which contain the information necessary to chunk the beam_ptr
290 to_chunk = {
291 "beam_ptr": {
292 "shape": psf["beam_ptr"].shape,
293 "chunk": tuple([1] * 2 + list(psf["beam_ptr"].shape)[2:]),
294 }
295 }
296 # By default save the file in the same place as the original beam
297 output_path = Path(
298 pyfhd_config["beam_file_path"].parent,
299 pyfhd_config["beam_file_path"].stem + ".h5",
300 )
301 save(output_path, psf, "psf", logger=logger, to_chunk=to_chunk)
302 # Since the psf is already in memory, return it
303 return psf
304 elif (
305 pyfhd_config["beam_file_path"].suffix == ".h5"
306 or pyfhd_config["beam_file_path"].suffix == ".hdf5"
307 ):
308 logger.info(f"Reading in the HDF5 file {pyfhd_config['beam_file_path']}")
309 # If you selected to lazy load the beam, then psf will be a h5py File Object
310 psf = load(
311 pyfhd_config["beam_file_path"],
312 logger=logger,
313 lazy_load=pyfhd_config["lazy_load_beam"],
314 )
315 return psf
316 elif pyfhd_config["beam_file_path"].suffix == ".fits":
317 # Read in a fits file, when you do I assume you probably will be translating
318 # FHD's beam setup while reading in a beam fits file.
319 logger.error("The ability to read in a beam fits hasn't been implemented yet")
320 sys.exit(1)
321 raise ValueError(
322 f"Unknown beam file type {pyfhd_config['beam_file_path'].suffix}. "
323 "Please use a .sav, .h5, .hdf5"
324 "If you meant for PyFHD to do the beam forming, please set the beam_file_path to None (~ in YAML)."
325 )