Coverage for PyFHD/healpix/export.py: 0%
86 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 10:58 +0800
1import numpy as np
2from numpy.typing import NDArray
3from logging import Logger
4import h5py
5from pathlib import Path
6from PyFHD.io.pyfhd_io import save
7from PyFHD.data_setup.obs import update_obs
8from PyFHD.healpix.healpix_utils import (
9 healpix_cnv_generate,
10 healpix_cnv_apply,
11 beam_image_cube,
12 vis_model_freq_split,
13)
14from PyFHD.flagging.flagging import vis_flag_tiles
15from PyFHD.pyfhd_tools.pyfhd_utils import (
16 vis_weights_update,
17 split_vis_weights,
18 vis_noise_calc,
19)
22def healpix_snapshot_cube_generate(
23 obs: dict,
24 psf: dict | h5py.File,
25 cal: dict,
26 params: dict,
27 vis_arr: NDArray[np.complex128],
28 vis_model_arr: NDArray[np.complex128],
29 vis_weights: NDArray[np.float64],
30 pyfhd_config: dict,
31 logger: Logger,
32) -> None:
33 """
34 Generate and save HEALPix images per polarization and per frequency channel for
35 each of the gridded visibility products, creating calibrated data, model
36 data, sampling map, and variance map cubes. General settings split up the
37 outputs into even and odd interleaved time samples. Interpolation is
38 performed from the orthoslant projection to the HEALPix grid.
40 Parameters
41 ----------
42 obs : dict
43 Observation metadata dictionary.
44 psf : dict | h5py.File
45 Beam dictionary
46 cal : dict
47 Calibration dictionary
48 params : dict
49 Visibility metadata dictionary
50 vis_arr : NDArray[np.complex128]
51 The visibility array
52 vis_model_arr : NDArray[np.complex128]
53 The model visibility array
54 vis_weights : NDArray[np.float64]
55 The visibility weights
56 pyfhd_config : dict
57 PyFHD configuration settings
58 logger : Logger
59 PyFHD's Logger
60 """
62 if pyfhd_config["split_ps_export"]:
63 cube_name = ["hpx_even", "hpx_odd"]
64 else:
65 cube_name = ["healpix_cube"]
67 if pyfhd_config["ps_kbinsize"] is not None:
68 kbinsize = pyfhd_config["ps_kbinsize"]
69 elif pyfhd_config["ps_fov"] is not None:
70 # Does this get used in conjunction with the fov_use down
71 # below, if so, we're converting from Radians to Degrees twice?
72 kbinsize = (180 / np.pi) / pyfhd_config["ps_fov"]
73 else:
74 kbinsize = obs["kpix"]
76 fov_use = (180 / np.pi) / kbinsize
78 if pyfhd_config["ps_kspan"] is not None:
79 dimension_use = pyfhd_config["ps_kspan"] / kbinsize
80 elif pyfhd_config["ps_dimension"] is not None:
81 dimension_use = pyfhd_config["ps_dimension"]
82 elif pyfhd_config["ps_degpix"]:
83 dimension_use = fov_use / pyfhd_config["ps_degpix"]
84 else:
85 dimension_use = fov_use / obs["degpix"]
87 dimension_use = int(dimension_use)
89 if pyfhd_config["ps_nfreq_avg"] is None:
90 fbin_i = psf["fbin_i"]
91 if isinstance(psf, h5py.File):
92 fbin_i = fbin_i[:]
93 ps_nfreq_avg = np.round(obs["n_freq"] / np.max(fbin_i + 1))
94 else:
95 ps_nfreq_avg = pyfhd_config["ps_nfreq_avg"]
97 degpix_use = fov_use / dimension_use
98 pix_sky = (4 * np.pi * (180 / np.pi) ** 2) / degpix_use**2
99 # Below should = 1024 for 0.1119 degrees/pixel
100 nside = 2 ** (np.ceil(np.log(np.sqrt(pix_sky / 12)) / np.log(2)))
101 n_freq_use = int(np.floor(obs["n_freq"] / pyfhd_config["n_avg"]))
103 obs_out = update_obs(
104 obs, dimension_use, kbinsize, beam_nfreq_avg=ps_nfreq_avg, fov=fov_use
105 )
106 # To have a psf that has reacted to the new beam_nfreq_avg you have set that isn't
107 # the default, tell PyFHD to re-create the psf here once beam_setup has been translated
109 beam_arr, beam_mask = beam_image_cube(
110 obs_out,
111 psf,
112 logger,
113 square=True,
114 beam_threshold=pyfhd_config["ps_beam_threshold"],
115 n_freq_bin=n_freq_use,
116 )
118 hpx_radius = fov_use / np.sqrt(2)
120 hpx_cnv, obs_out = healpix_cnv_generate(
121 obs_out, beam_mask, hpx_radius, pyfhd_config, logger, nside=nside
122 )
123 hpx_inds = hpx_cnv["inds"]
125 if len(pyfhd_config["ps_tile_flag_list"]) > 0:
126 vis_weights = vis_flag_tiles(
127 obs_out, vis_weights, pyfhd_config["ps_tile_flag_list"], logger
128 )
130 vis_weights, obs_out = vis_weights_update(vis_weights, obs_out, psf, params)
132 if pyfhd_config["split_ps_export"]:
133 n_iter = 2
134 vis_weights_use, bi_use = split_vis_weights(obs_out, vis_weights)
135 obs_out["vis_noise"] = vis_noise_calc(
136 obs_out, vis_arr, vis_weights, bi_use=bi_use
137 )
138 uvf_name = ["even", "odd"]
139 else:
140 n_iter = 1
141 uvf_name = [""]
142 obs_out["vis_noise"] = vis_noise_calc(obs_out, vis_arr, vis_weights)
143 bi_use = np.zeros(1, dtype=np.int64)
145 # Looks like this is set to False by default?
146 residual_flag = obs_out["residual"]
147 # Since the model is imported by default, dirty_flag is usually True
148 dirty_flag = not residual_flag and vis_model_arr is not None
149 # Create the healpix dir Path
150 healpix_dir = Path(pyfhd_config["output_dir"], "healpix")
151 healpix_dir.mkdir(exist_ok=True)
152 for iter in range(n_iter):
153 # To save on memory, PyFHD moved to saving the healpix cubes in a polarization loop
154 # rather than trying to store the entirety of the everything in memory consuming over
155 # 100GB of memory
156 for pol_i in range(obs["n_pol"]):
157 split = vis_model_freq_split(
158 obs_out,
159 psf,
160 params,
161 vis_weights_use,
162 vis_model_arr,
163 vis_arr,
164 pol_i,
165 pyfhd_config,
166 logger,
167 uvf_name=uvf_name[iter],
168 bi_use=bi_use[iter],
169 )
170 if dirty_flag:
171 residual_flag = False
172 else:
173 residual_flag = True
174 nf_vis = split["obs"]["nf_vis"]
175 nf_vis_use = np.zeros(n_freq_use)
176 for freq_i in range(n_freq_use):
177 nf_vis_use[freq_i] = np.sum(
178 nf_vis[
179 freq_i
180 * pyfhd_config["n_avg"] : (freq_i + 1)
181 * pyfhd_config["n_avg"]
182 ]
183 )
185 beam_squared_cube = np.zeros([n_freq_use, hpx_inds.size])
186 weights_cube = np.zeros([n_freq_use, hpx_inds.size])
187 variance_cube = np.zeros([n_freq_use, hpx_inds.size])
188 model_cube = np.zeros([n_freq_use, hpx_inds.size])
189 dirty_or_res_cube = np.zeros([n_freq_use, hpx_inds.size])
191 for freq_i in range(n_freq_use):
192 beam_squared_cube[freq_i, :] = healpix_cnv_apply(
193 beam_arr[pol_i, freq_i] * nf_vis_use[freq_i], hpx_cnv
194 )
195 weights_cube[freq_i, :] = healpix_cnv_apply(
196 split["weights_arr"][freq_i, :, :], hpx_cnv
197 )
198 variance_cube[freq_i, :] = healpix_cnv_apply(
199 split["variance_arr"][freq_i, :, :], hpx_cnv
200 )
201 model_cube[freq_i, :] = healpix_cnv_apply(
202 split["model_arr"][freq_i, :, :], hpx_cnv
203 )
204 dirty_or_res_cube[freq_i, :] = healpix_cnv_apply(
205 split["residual_arr"][freq_i, :, :], hpx_cnv
206 )
207 healpix_pol_dict = {
208 "obs": split["obs"],
209 "hpx_inds": hpx_inds,
210 "n_avg": pyfhd_config["n_avg"],
211 "beam_squared_cube": beam_squared_cube,
212 "weights_cube": weights_cube,
213 "variance_cube": variance_cube,
214 "model_cube": model_cube,
215 }
216 if residual_flag:
217 healpix_pol_dict["res_cube"] = dirty_or_res_cube
218 elif dirty_flag:
219 healpix_pol_dict["dirty_cube"] = dirty_or_res_cube
220 save(
221 healpix_dir
222 / f"{pyfhd_config['obs_id']}_{cube_name[iter]}_{obs['pol_names'][pol_i]}.h5",
223 healpix_pol_dict,
224 f"{pyfhd_config['obs_id']}_{cube_name[iter]}_{obs['pol_names'][pol_i]}",
225 logger=logger,
226 )