Source code for pyfhd.healpix.export

import numpy as np
from numpy.typing import NDArray
from logging import Logger
import h5py
from pathlib import Path
from pyfhd.io.pyfhd_io import save
from pyfhd.data_setup.obs import update_obs
from pyfhd.healpix.healpix_utils import (
    healpix_cnv_generate,
    healpix_cnv_apply,
    beam_image_cube,
    vis_model_freq_split,
)
from pyfhd.flagging.flagging import vis_flag_tiles
from pyfhd.pyfhd_tools.pyfhd_utils import (
    vis_weights_update,
    split_vis_weights,
    vis_noise_calc,
)


[docs] def healpix_snapshot_cube_generate( obs: dict, psf: dict | h5py.File, cal: dict, params: dict, vis_arr: NDArray[np.complex128], vis_model_arr: NDArray[np.complex128], vis_weights: NDArray[np.float64], pyfhd_config: dict, logger: Logger, ) -> None: """ Generate and save HEALPix images per polarization and per frequency channel for each of the gridded visibility products, creating calibrated data, model data, sampling map, and variance map cubes. General settings split up the outputs into even and odd interleaved time samples. Interpolation is performed from the orthoslant projection to the HEALPix grid. Parameters ---------- obs : dict Observation metadata dictionary. psf : dict | h5py.File Beam dictionary cal : dict Calibration dictionary params : dict Visibility metadata dictionary vis_arr : NDArray[np.complex128] The visibility array vis_model_arr : NDArray[np.complex128] The model visibility array vis_weights : NDArray[np.float64] The visibility weights pyfhd_config : dict pyfhd configuration settings logger : Logger pyfhd's Logger """ if pyfhd_config["split_ps_export"]: cube_name = ["hpx_even", "hpx_odd"] else: cube_name = ["healpix_cube"] if pyfhd_config["ps_kbinsize"] is not None: kbinsize = pyfhd_config["ps_kbinsize"] elif pyfhd_config["ps_fov"] is not None: # Does this get used in conjunction with the fov_use down # below, if so, we're converting from Radians to Degrees twice? kbinsize = (180 / np.pi) / pyfhd_config["ps_fov"] else: kbinsize = obs["kpix"] fov_use = (180 / np.pi) / kbinsize if pyfhd_config["ps_kspan"] is not None: dimension_use = pyfhd_config["ps_kspan"] / kbinsize elif pyfhd_config["ps_dimension"] is not None: dimension_use = pyfhd_config["ps_dimension"] elif pyfhd_config["ps_degpix"]: dimension_use = fov_use / pyfhd_config["ps_degpix"] else: dimension_use = fov_use / obs["degpix"] dimension_use = int(dimension_use) if pyfhd_config["ps_nfreq_avg"] is None: fbin_i = psf["fbin_i"] if isinstance(psf, h5py.File): fbin_i = fbin_i[:] ps_nfreq_avg = np.round(obs["n_freq"] / np.max(fbin_i + 1)) else: ps_nfreq_avg = pyfhd_config["ps_nfreq_avg"] degpix_use = fov_use / dimension_use pix_sky = (4 * np.pi * (180 / np.pi) ** 2) / degpix_use**2 # Below should = 1024 for 0.1119 degrees/pixel nside = 2 ** (np.ceil(np.log(np.sqrt(pix_sky / 12)) / np.log(2))) n_freq_use = int(np.floor(obs["n_freq"] / pyfhd_config["n_avg"])) obs_out = update_obs( obs, dimension_use, kbinsize, beam_nfreq_avg=ps_nfreq_avg, fov=fov_use ) # To have a psf that has reacted to the new beam_nfreq_avg you have set that isn't # the default, tell pyfhd to re-create the psf here once beam_setup has been translated beam_arr, beam_mask = beam_image_cube( obs_out, psf, logger, square=True, beam_threshold=pyfhd_config["ps_beam_threshold"], n_freq_bin=n_freq_use, ) hpx_radius = fov_use / np.sqrt(2) hpx_cnv, obs_out = healpix_cnv_generate( obs_out, beam_mask, hpx_radius, pyfhd_config, logger, nside=nside ) hpx_inds = hpx_cnv["inds"] if len(pyfhd_config["ps_tile_flag_list"]) > 0: vis_weights = vis_flag_tiles( obs_out, vis_weights, pyfhd_config["ps_tile_flag_list"], logger ) vis_weights, obs_out = vis_weights_update(vis_weights, obs_out, psf, params) if pyfhd_config["split_ps_export"]: n_iter = 2 vis_weights_use, bi_use = split_vis_weights(obs_out, vis_weights) obs_out["vis_noise"] = vis_noise_calc( obs_out, vis_arr, vis_weights, bi_use=bi_use ) uvf_name = ["even", "odd"] else: n_iter = 1 uvf_name = [""] obs_out["vis_noise"] = vis_noise_calc(obs_out, vis_arr, vis_weights) nb = vis_weights.shape[-1] bi_use = (np.arange(nb, dtype=np.int64),) vis_weights_use = vis_weights # Looks like this is set to False by default? residual_flag = obs_out["residual"] # Since the model is imported by default, dirty_flag is usually True dirty_flag = not residual_flag and vis_model_arr is not None # Create the healpix dir Path healpix_dir = Path(pyfhd_config["output_dir"], "healpix") healpix_dir.mkdir(exist_ok=True) for iter in range(n_iter): # To save on memory, pyfhd moved to saving the healpix cubes in a polarization loop # rather than trying to store the entirety of the everything in memory consuming over # 100GB of memory for pol_i in range(obs["n_pol"]): split = vis_model_freq_split( obs_out, psf, params, vis_weights_use, vis_model_arr, vis_arr, pol_i, pyfhd_config, logger, uvf_name=uvf_name[iter], bi_use=bi_use[iter], ) if dirty_flag: residual_flag = False else: residual_flag = True nf_vis = split["obs"]["nf_vis"] nf_vis_use = np.zeros(n_freq_use) for freq_i in range(n_freq_use): nf_vis_use[freq_i] = np.sum( nf_vis[ freq_i * pyfhd_config["n_avg"] : (freq_i + 1) * pyfhd_config["n_avg"] ] ) beam_squared_cube = np.zeros([n_freq_use, hpx_inds.size]) weights_cube = np.zeros([n_freq_use, hpx_inds.size]) variance_cube = np.zeros([n_freq_use, hpx_inds.size]) model_cube = np.zeros([n_freq_use, hpx_inds.size]) dirty_or_res_cube = np.zeros([n_freq_use, hpx_inds.size]) for freq_i in range(n_freq_use): beam_squared_cube[freq_i, :] = healpix_cnv_apply( beam_arr[pol_i, freq_i] * nf_vis_use[freq_i], hpx_cnv ) weights_cube[freq_i, :] = healpix_cnv_apply( split["weights_arr"][freq_i, :, :], hpx_cnv ) variance_cube[freq_i, :] = healpix_cnv_apply( split["variance_arr"][freq_i, :, :], hpx_cnv ) model_cube[freq_i, :] = healpix_cnv_apply( split["model_arr"][freq_i, :, :], hpx_cnv ) dirty_or_res_cube[freq_i, :] = healpix_cnv_apply( split["residual_arr"][freq_i, :, :], hpx_cnv ) healpix_pol_dict = { "obs": split["obs"], "hpx_inds": hpx_inds, "nside": nside, "n_avg": pyfhd_config["n_avg"], "beam_squared_cube": beam_squared_cube, "weights_cube": weights_cube, "variance_cube": variance_cube, "model_cube": model_cube, } if residual_flag: healpix_pol_dict["res_cube"] = dirty_or_res_cube elif dirty_flag: healpix_pol_dict["dirty_cube"] = dirty_or_res_cube save( healpix_dir / f"{pyfhd_config['obs_id']}_{cube_name[iter]}_{obs['pol_names'][pol_i]}.h5", healpix_pol_dict, f"{pyfhd_config['obs_id']}_{cube_name[iter]}_{obs['pol_names'][pol_i]}", logger=logger, )