Coverage for PyFHD/healpix/export.py: 0%

86 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-01 10:58 +0800

1import numpy as np 

2from numpy.typing import NDArray 

3from logging import Logger 

4import h5py 

5from pathlib import Path 

6from PyFHD.io.pyfhd_io import save 

7from PyFHD.data_setup.obs import update_obs 

8from PyFHD.healpix.healpix_utils import ( 

9 healpix_cnv_generate, 

10 healpix_cnv_apply, 

11 beam_image_cube, 

12 vis_model_freq_split, 

13) 

14from PyFHD.flagging.flagging import vis_flag_tiles 

15from PyFHD.pyfhd_tools.pyfhd_utils import ( 

16 vis_weights_update, 

17 split_vis_weights, 

18 vis_noise_calc, 

19) 

20 

21 

22def healpix_snapshot_cube_generate( 

23 obs: dict, 

24 psf: dict | h5py.File, 

25 cal: dict, 

26 params: dict, 

27 vis_arr: NDArray[np.complex128], 

28 vis_model_arr: NDArray[np.complex128], 

29 vis_weights: NDArray[np.float64], 

30 pyfhd_config: dict, 

31 logger: Logger, 

32) -> None: 

33 """ 

34 Generate and save HEALPix images per polarization and per frequency channel for 

35 each of the gridded visibility products, creating calibrated data, model 

36 data, sampling map, and variance map cubes. General settings split up the 

37 outputs into even and odd interleaved time samples. Interpolation is 

38 performed from the orthoslant projection to the HEALPix grid. 

39 

40 Parameters 

41 ---------- 

42 obs : dict 

43 Observation metadata dictionary. 

44 psf : dict | h5py.File 

45 Beam dictionary 

46 cal : dict 

47 Calibration dictionary 

48 params : dict 

49 Visibility metadata dictionary 

50 vis_arr : NDArray[np.complex128] 

51 The visibility array 

52 vis_model_arr : NDArray[np.complex128] 

53 The model visibility array 

54 vis_weights : NDArray[np.float64] 

55 The visibility weights 

56 pyfhd_config : dict 

57 PyFHD configuration settings 

58 logger : Logger 

59 PyFHD's Logger 

60 """ 

61 

62 if pyfhd_config["split_ps_export"]: 

63 cube_name = ["hpx_even", "hpx_odd"] 

64 else: 

65 cube_name = ["healpix_cube"] 

66 

67 if pyfhd_config["ps_kbinsize"] is not None: 

68 kbinsize = pyfhd_config["ps_kbinsize"] 

69 elif pyfhd_config["ps_fov"] is not None: 

70 # Does this get used in conjunction with the fov_use down 

71 # below, if so, we're converting from Radians to Degrees twice? 

72 kbinsize = (180 / np.pi) / pyfhd_config["ps_fov"] 

73 else: 

74 kbinsize = obs["kpix"] 

75 

76 fov_use = (180 / np.pi) / kbinsize 

77 

78 if pyfhd_config["ps_kspan"] is not None: 

79 dimension_use = pyfhd_config["ps_kspan"] / kbinsize 

80 elif pyfhd_config["ps_dimension"] is not None: 

81 dimension_use = pyfhd_config["ps_dimension"] 

82 elif pyfhd_config["ps_degpix"]: 

83 dimension_use = fov_use / pyfhd_config["ps_degpix"] 

84 else: 

85 dimension_use = fov_use / obs["degpix"] 

86 

87 dimension_use = int(dimension_use) 

88 

89 if pyfhd_config["ps_nfreq_avg"] is None: 

90 fbin_i = psf["fbin_i"] 

91 if isinstance(psf, h5py.File): 

92 fbin_i = fbin_i[:] 

93 ps_nfreq_avg = np.round(obs["n_freq"] / np.max(fbin_i + 1)) 

94 else: 

95 ps_nfreq_avg = pyfhd_config["ps_nfreq_avg"] 

96 

97 degpix_use = fov_use / dimension_use 

98 pix_sky = (4 * np.pi * (180 / np.pi) ** 2) / degpix_use**2 

99 # Below should = 1024 for 0.1119 degrees/pixel 

100 nside = 2 ** (np.ceil(np.log(np.sqrt(pix_sky / 12)) / np.log(2))) 

101 n_freq_use = int(np.floor(obs["n_freq"] / pyfhd_config["n_avg"])) 

102 

103 obs_out = update_obs( 

104 obs, dimension_use, kbinsize, beam_nfreq_avg=ps_nfreq_avg, fov=fov_use 

105 ) 

106 # To have a psf that has reacted to the new beam_nfreq_avg you have set that isn't 

107 # the default, tell PyFHD to re-create the psf here once beam_setup has been translated 

108 

109 beam_arr, beam_mask = beam_image_cube( 

110 obs_out, 

111 psf, 

112 logger, 

113 square=True, 

114 beam_threshold=pyfhd_config["ps_beam_threshold"], 

115 n_freq_bin=n_freq_use, 

116 ) 

117 

118 hpx_radius = fov_use / np.sqrt(2) 

119 

120 hpx_cnv, obs_out = healpix_cnv_generate( 

121 obs_out, beam_mask, hpx_radius, pyfhd_config, logger, nside=nside 

122 ) 

123 hpx_inds = hpx_cnv["inds"] 

124 

125 if len(pyfhd_config["ps_tile_flag_list"]) > 0: 

126 vis_weights = vis_flag_tiles( 

127 obs_out, vis_weights, pyfhd_config["ps_tile_flag_list"], logger 

128 ) 

129 

130 vis_weights, obs_out = vis_weights_update(vis_weights, obs_out, psf, params) 

131 

132 if pyfhd_config["split_ps_export"]: 

133 n_iter = 2 

134 vis_weights_use, bi_use = split_vis_weights(obs_out, vis_weights) 

135 obs_out["vis_noise"] = vis_noise_calc( 

136 obs_out, vis_arr, vis_weights, bi_use=bi_use 

137 ) 

138 uvf_name = ["even", "odd"] 

139 else: 

140 n_iter = 1 

141 uvf_name = [""] 

142 obs_out["vis_noise"] = vis_noise_calc(obs_out, vis_arr, vis_weights) 

143 bi_use = np.zeros(1, dtype=np.int64) 

144 

145 # Looks like this is set to False by default? 

146 residual_flag = obs_out["residual"] 

147 # Since the model is imported by default, dirty_flag is usually True 

148 dirty_flag = not residual_flag and vis_model_arr is not None 

149 # Create the healpix dir Path 

150 healpix_dir = Path(pyfhd_config["output_dir"], "healpix") 

151 healpix_dir.mkdir(exist_ok=True) 

152 for iter in range(n_iter): 

153 # To save on memory, PyFHD moved to saving the healpix cubes in a polarization loop 

154 # rather than trying to store the entirety of the everything in memory consuming over 

155 # 100GB of memory 

156 for pol_i in range(obs["n_pol"]): 

157 split = vis_model_freq_split( 

158 obs_out, 

159 psf, 

160 params, 

161 vis_weights_use, 

162 vis_model_arr, 

163 vis_arr, 

164 pol_i, 

165 pyfhd_config, 

166 logger, 

167 uvf_name=uvf_name[iter], 

168 bi_use=bi_use[iter], 

169 ) 

170 if dirty_flag: 

171 residual_flag = False 

172 else: 

173 residual_flag = True 

174 nf_vis = split["obs"]["nf_vis"] 

175 nf_vis_use = np.zeros(n_freq_use) 

176 for freq_i in range(n_freq_use): 

177 nf_vis_use[freq_i] = np.sum( 

178 nf_vis[ 

179 freq_i 

180 * pyfhd_config["n_avg"] : (freq_i + 1) 

181 * pyfhd_config["n_avg"] 

182 ] 

183 ) 

184 

185 beam_squared_cube = np.zeros([n_freq_use, hpx_inds.size]) 

186 weights_cube = np.zeros([n_freq_use, hpx_inds.size]) 

187 variance_cube = np.zeros([n_freq_use, hpx_inds.size]) 

188 model_cube = np.zeros([n_freq_use, hpx_inds.size]) 

189 dirty_or_res_cube = np.zeros([n_freq_use, hpx_inds.size]) 

190 

191 for freq_i in range(n_freq_use): 

192 beam_squared_cube[freq_i, :] = healpix_cnv_apply( 

193 beam_arr[pol_i, freq_i] * nf_vis_use[freq_i], hpx_cnv 

194 ) 

195 weights_cube[freq_i, :] = healpix_cnv_apply( 

196 split["weights_arr"][freq_i, :, :], hpx_cnv 

197 ) 

198 variance_cube[freq_i, :] = healpix_cnv_apply( 

199 split["variance_arr"][freq_i, :, :], hpx_cnv 

200 ) 

201 model_cube[freq_i, :] = healpix_cnv_apply( 

202 split["model_arr"][freq_i, :, :], hpx_cnv 

203 ) 

204 dirty_or_res_cube[freq_i, :] = healpix_cnv_apply( 

205 split["residual_arr"][freq_i, :, :], hpx_cnv 

206 ) 

207 healpix_pol_dict = { 

208 "obs": split["obs"], 

209 "hpx_inds": hpx_inds, 

210 "n_avg": pyfhd_config["n_avg"], 

211 "beam_squared_cube": beam_squared_cube, 

212 "weights_cube": weights_cube, 

213 "variance_cube": variance_cube, 

214 "model_cube": model_cube, 

215 } 

216 if residual_flag: 

217 healpix_pol_dict["res_cube"] = dirty_or_res_cube 

218 elif dirty_flag: 

219 healpix_pol_dict["dirty_cube"] = dirty_or_res_cube 

220 save( 

221 healpix_dir 

222 / f"{pyfhd_config['obs_id']}_{cube_name[iter]}_{obs['pol_names'][pol_i]}.h5", 

223 healpix_pol_dict, 

224 f"{pyfhd_config['obs_id']}_{cube_name[iter]}_{obs['pol_names'][pol_i]}", 

225 logger=logger, 

226 )