Coverage for PyFHD/beam_setup/beam.py: 0%

120 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-01 10:58 +0800

1import numpy as np 

2from astropy.constants import c 

3from logging import Logger 

4from scipy.interpolate import interp1d 

5from scipy.io import readsav 

6from PyFHD.beam_setup.antenna import init_beam 

7from PyFHD.beam_setup.beam_utils import beam_power 

8from PyFHD.io.pyfhd_io import recarray_to_dict 

9from pathlib import Path 

10from PyFHD.io.pyfhd_io import save, load 

11from h5py import File 

12import sys 

13 

14from PyFHD.pyfhd_tools.pyfhd_utils import histogram, rebin, weight_invert 

15 

16 

17def create_psf(obs: dict, pyfhd_config: dict, logger: Logger) -> dict | File: 

18 """ 

19 Creates the psf dictionary, which includes the hyperresolved uv-kernel used for 

20 gridding, by building the image-space response of a set of antennas and transforming 

21 it to uv-space, or by loading in a `sav` or `HDF5` file from a FHD run. The 

22 base response of an antenna is retreived from a pyuvdata class/object. 

23 In the future it would be nice to have the ability to read in a beam fits file. 

24 Do note, that PyFHD was made with the assumption that the beam is the MWA beam, 

25 and assumes the beam does not differ on a per baseline basis. If you wish to use 

26 separate baselines you'll need to add that functionality yourself. 

27 

28 Parameters 

29 ---------- 

30 obs : dict 

31 The observation metadata dictionary 

32 pyfhd_config : dict 

33 The PyFHD configuration dictionary 

34 logger : Logger 

35 PyFHD's logger 

36 

37 Raises 

38 ------ 

39 ValueError 

40 If the beam file type is not recognized or if the beam file path is not set correctly. 

41 

42 Returns 

43 ------- 

44 dict | h5py.File 

45 Return the psf dictionary containing the hyperresolved uv-beam and other metadata, 

46 or a h5py File object if lazy loading is enabled. 

47 """ 

48 

49 if pyfhd_config["beam_file_path"] is None: 

50 # Form the beam from scratch using pyuvdata for the Jones Matrix 

51 # and translations from FHD for the antenna response. 

52 logger.info( 

53 "PyFHD will do the beam forming from scratch using pyuvdata and the antenna response from FHD." 

54 "Please note, gaussian decomp for MWA is not implemented yet." 

55 ) 

56 antenna, psf, beam = init_beam(obs, pyfhd_config, logger) 

57 # TODO: we'll see if the +1 is necessary, IDL indexing thing 

58 n_freq_bin = np.max(obs["baseline_info"]["fbin_i"]) + 1 

59 # TODO: Double check the shape 

60 beam_arr = np.zeros( 

61 [ 

62 obs["n_pol"], 

63 n_freq_bin, 

64 psf["resolution"] + 1, 

65 psf["resolution"] + 1, 

66 psf["dim"] ** 2, 

67 ], 

68 dtype=np.complex128, 

69 ) 

70 xvals_i, yvals_i = np.meshgrid( 

71 np.arange(psf["resolution"]), np.arange(psf["resolution"]), indexing="ij" 

72 ) 

73 xvals_i *= psf["resolution"] 

74 yvals_i *= psf["resolution"] 

75 xvals_i = xvals_i.flatten() 

76 yvals_i = yvals_i.flatten() 

77 xvals_psf_dim, yvals_psf_dim = np.meshgrid( 

78 np.arange(psf["dim"]), np.arange(psf["dim"]), indexing="ij" 

79 ) 

80 psf["xvals"] = np.zeros( 

81 [psf["resolution"], psf["resolution"], psf["dim"], psf["dim"]] 

82 ) 

83 psf["yvals"] = np.zeros( 

84 [psf["resolution"], psf["resolution"], psf["dim"], psf["dim"]] 

85 ) 

86 for i in range(psf["resolution"]): 

87 for j in range(psf["resolution"]): 

88 psf["xvals"][i, j, :, :] = ( 

89 xvals_psf_dim - psf["dim"] / 2 + i / psf["resolution"] 

90 ) 

91 psf["yvals"][i, j, :, :] = ( 

92 yvals_psf_dim - psf["dim"] / 2 + j / psf["resolution"] 

93 ) 

94 

95 zen_int_x = (obs["zenx"] - obs["obsx"]) / psf["scale"] + psf["image_dim"] / 2 

96 zen_int_y = (obs["zeny"] - obs["obsy"]) / psf["scale"] + psf["image_dim"] / 2 

97 # Calculate the hyperresolved uv-vals of the beam kernel at highest precision prior to cast to 

98 # be accurate yet small 

99 res_super = 1 / (psf["resolution"] / psf["intermediate_res"]) 

100 

101 xvals_uv_superres, yvals_uv_superres = np.meshgrid( 

102 np.arange(psf["superres_dim"]), 

103 np.arange(psf["superres_dim"]), 

104 ) 

105 xvals_uv_superres = ( 

106 xvals_uv_superres * res_super 

107 - np.floor(psf["dim"] / 2) * psf["intermediate_res"] 

108 + np.floor(psf["dim"] / 2) 

109 ) 

110 yvals_uv_superres = ( 

111 yvals_uv_superres * res_super 

112 - np.floor(psf["dim"] / 2) * psf["intermediate_res"] 

113 + np.floor(psf["dim"] / 2) 

114 ) 

115 

116 freq_center = antenna["freq"][0] 

117 primary_beam_area = np.zeros([obs["n_pol"], obs["n_freq"]], dtype=np.float64) 

118 primary_beam_sq_area = np.zeros([obs["n_pol"], obs["n_freq"]], dtype=np.float64) 

119 ant_a_list = obs["baseline_info"]["tile_A"][0 : obs["n_baselines"]] 

120 ant_b_list = obs["baseline_info"]["tile_B"][0 : obs["n_baselines"]] 

121 baseline_mod = np.max( 

122 [ 

123 2 

124 ** ( 

125 np.ceil( 

126 np.log(np.sqrt(obs["n_baselines"] - obs["n_tile"])) / np.log(2) 

127 ) 

128 ), 

129 np.max(ant_a_list), 

130 np.max(ant_b_list), 

131 256, 

132 2 * obs["n_tile"], 

133 ] 

134 ) 

135 bi_list = ant_b_list + ant_a_list * baseline_mod 

136 bi_hist0, _, _ = histogram(bi_list, min=0, bin_size=1) 

137 bi_max = np.max(bi_list) 

138 pol_arr = np.array([[0, 0], [1, 1], [0, 1], [1, 0]], dtype=np.int8) 

139 for pol_i in range(obs["n_pol"]): 

140 ant_pol1 = pol_arr[0, pol_i] 

141 ant_pol2 = pol_arr[1, pol_i] 

142 

143 # Group IDs label unique beams across the array (should be all 0s as theres only one group) 

144 group1 = antenna["group_id"][ant_pol1] 

145 group2 = antenna["group_id"][ant_pol2] 

146 

147 # Histogram group IDs, get reverse indicies, and calculate number of unique beams (again that should be 1) 

148 hgroup1, _, gri1 = histogram(group1, min=0) 

149 hgroup2, _, gri2 = histogram(group2, min=0) 

150 # Histogram matrix between all separate groups of different beams 

151 group_matrix = np.outer(hgroup2, hgroup1) 

152 for freq_i in range(n_freq_bin): 

153 beam_int = 0 

154 beam_int_2 = 0 

155 baseline_group_n = group_matrix[0, 0] 

156 # Get antenna indices which use this group's unique beam (probably all of them...) 

157 ant_1_arr = gri1[gri1[0] : gri1[1]] 

158 ant_2_arr = gri2[gri2[0] : gri2[1]] 

159 # Get the number of baselines in each group 

160 ant_1_n = hgroup1[0] 

161 ant_2_n = hgroup2[0] 

162 bi_use = np.flatten( 

163 rebin(ant_1_arr + 1, (ant_1_n, ant_2_n)) * baseline_mod 

164 + rebin(ant_2_arr.T + 1, (ant_2_n, ant_1_n)) 

165 ) 

166 bi_use2 = np.flatten( 

167 rebin(ant_1_arr + 1, (ant_1_n, ant_2_n)) 

168 + rebin(ant_2_arr.T + 1, (ant_2_n, ant_1_n)) * baseline_mod 

169 ) 

170 bi_use = np.concatenate([bi_use, bi_use2]) 

171 if np.max(bi_use) > bi_max: 

172 bi_use = bi_use[np.where(bi_use <= bi_max)] 

173 bi_use_i = np.nonzero(bi_hist0[bi_use]) 

174 if bi_use_i.size > 0: 

175 bi_use = bi_use[bi_use_i] 

176 baseline_group_n = bi_use.size 

177 # Calculate power beam from antenna beams 

178 psf_base_superres = beam_power( 

179 antenna, 

180 beam, 

181 ant_pol1, 

182 ant_pol2, 

183 freq_i, 

184 psf, 

185 zen_int_x, 

186 zen_int_y, 

187 xvals_uv_superres, 

188 yvals_uv_superres, 

189 pyfhd_config, 

190 ) 

191 

192 # divide by psf_resolution^2 since the FFT is done at 

193 # a different resolution and requires a different normalization 

194 beam_int += ( 

195 baseline_group_n 

196 + np.sum(psf_base_superres) / psf["resolution"] ** 2 

197 ) 

198 beam_int_2 += ( 

199 baseline_group_n 

200 + np.sum(np.abs(psf_base_superres)) / psf["resolution"] ** 2 

201 ) 

202 n_grp_use += baseline_group_n 

203 psf_single = np.zeros( 

204 [psf["resolution"] + 1, psf["resolution"] + 1], 

205 psf["dim"] ** 2, 

206 dtype=np.complex128, 

207 ) 

208 

209 for i in range(psf["resolution"]): 

210 for j in range(psf["resolution"]): 

211 psf_single[psf["resolution"] - i, psf["resolution"] - j] = ( 

212 psf_base_superres[xvals_i + i, yvals_i + j] 

213 ) 

214 # TODO: check the rolling (shifting) and potential reshaping done here (should already be in) 

215 for i in range(psf["resolution"]): 

216 psf_single[psf["resolution"] - i, psf["resolution"]] = np.roll( 

217 psf_base_superres[xvals_i + i, yvals_i + psf["resolution"]], 

218 1, 

219 0, 

220 ).flatten() 

221 for j in range(psf["resolution"]): 

222 psf_single[psf["resolution"], psf["resolution"] - j] = np.roll( 

223 psf_base_superres[xvals_i + psf["resolution"], yvals_i + j], 

224 1, 

225 1, 

226 ).flatten() 

227 psf_single[psf["resolution"], psf["resolution"]] = np.roll( 

228 np.roll( 

229 psf_base_superres[ 

230 xvals_i + psf["resolution"], yvals_i + psf["resolution"] 

231 ], 

232 1, 

233 1, 

234 ), 

235 1, 

236 0, 

237 ).flatten() 

238 # Update the beam arr 

239 beam_arr[pol_i, freq_i, :, :, :] = psf_single 

240 

241 # Calculate the primary beam area and squared area 

242 beam_int_2 *= weight_invert(n_grp_use) / obs["kpix"] ** 2 

243 beam_int *= weight_invert(n_grp_use) / obs["kpix"] ** 2 

244 fi_use = np.where(obs["baseline_info"]["fbin_i"] == freq_i) 

245 primary_beam_area[pol_i, fi_use] = beam_int 

246 primary_beam_sq_area[pol_i, fi_use] = beam_int_2 

247 

248 psf["beam_ptr"] = beam_arr 

249 obs["primary_beam_area"] = primary_beam_area 

250 obs["primary_beam_sq_area"] = primary_beam_sq_area 

251 psf["group_arr"] = np.zeros( 

252 [obs["n_pol"], obs["n_freq"], obs["n_baselines"]], dtype=np.int8 

253 ) 

254 

255 # Save the psf to a file 

256 pyfhd_config["beams_dir"] = Path(pyfhd_config["output_dir"], "beams") 

257 pyfhd_config["beams_dir"].mkdir(exist_ok=True) 

258 save( 

259 Path(pyfhd_config["beams_dir"], f"{pyfhd_config['obs_id']}_beam.h5"), 

260 psf, 

261 "psf", 

262 logger=logger, 

263 to_chunk={ 

264 "beam_ptr": { 

265 "shape": psf["beam_ptr"].shape, 

266 "chunk": tuple([1] * 2 + list(psf["beam_ptr"].shape)[2:]), 

267 } 

268 }, 

269 ) 

270 

271 return psf 

272 elif pyfhd_config["beam_file_path"].suffix == ".sav": 

273 # Read in a sav file containing the psf structure as we expect from FHD 

274 logger.info( 

275 "Reading in a beam sav file probably will take a long time. You will require double the storage size of the sav file in RAM at least. Do some other work or maybe watch your favourite long movie, for example the extended edition of LOTR: Return of the King is 4 hours 10 minutes. Check back when the Battle of the Pelennor Fields has finished or roughly 3 hours in." 

276 ) 

277 beam = readsav(pyfhd_config["beam_file_path"], python_dict=True) 

278 psf = beam["psf"] 

279 # Delete the read in sav file, now that we got the psf, at this point we will have the psf size twice! 

280 del beam 

281 psf["beam_ptr"][0] = psf["beam_ptr"][0].T 

282 # Take only the first baseline (as it assumes every baseline points to the first i.e. the FFT is done per frequency) 

283 # Has a bonus of reducing memory use, unless NumPy is really good at using representations, maybe use double memory 

284 psf["beam_ptr"][0] = psf["beam_ptr"][0][:, :, 0] 

285 # Transpose the group array 

286 psf["id"][0] = psf["id"][0].T 

287 # Recarray to dict completely unpack object arrays into the dict, although will require the beam_ptr in memory twice potentially temporarily 

288 psf = recarray_to_dict(psf) 

289 # The to_chunk is a dictionary of dictionaries which contain the information necessary to chunk the beam_ptr 

290 to_chunk = { 

291 "beam_ptr": { 

292 "shape": psf["beam_ptr"].shape, 

293 "chunk": tuple([1] * 2 + list(psf["beam_ptr"].shape)[2:]), 

294 } 

295 } 

296 # By default save the file in the same place as the original beam 

297 output_path = Path( 

298 pyfhd_config["beam_file_path"].parent, 

299 pyfhd_config["beam_file_path"].stem + ".h5", 

300 ) 

301 save(output_path, psf, "psf", logger=logger, to_chunk=to_chunk) 

302 # Since the psf is already in memory, return it 

303 return psf 

304 elif ( 

305 pyfhd_config["beam_file_path"].suffix == ".h5" 

306 or pyfhd_config["beam_file_path"].suffix == ".hdf5" 

307 ): 

308 logger.info(f"Reading in the HDF5 file {pyfhd_config['beam_file_path']}") 

309 # If you selected to lazy load the beam, then psf will be a h5py File Object 

310 psf = load( 

311 pyfhd_config["beam_file_path"], 

312 logger=logger, 

313 lazy_load=pyfhd_config["lazy_load_beam"], 

314 ) 

315 return psf 

316 elif pyfhd_config["beam_file_path"].suffix == ".fits": 

317 # Read in a fits file, when you do I assume you probably will be translating 

318 # FHD's beam setup while reading in a beam fits file. 

319 logger.error("The ability to read in a beam fits hasn't been implemented yet") 

320 sys.exit(1) 

321 raise ValueError( 

322 f"Unknown beam file type {pyfhd_config['beam_file_path'].suffix}. " 

323 "Please use a .sav, .h5, .hdf5" 

324 "If you meant for PyFHD to do the beam forming, please set the beam_file_path to None (~ in YAML)." 

325 )