Source code for pyfhd.beam_setup.beam_utils

import numpy as np
from astropy.constants import c
from pyuvdata import BeamInterface, UVBeam
from pyuvdata.analytic_beam import AnalyticBeam
from pyfhd.pyfhd_tools.pyfhd_utils import histogram, region_grow
import h5py
from numpy.typing import NDArray
from scipy.ndimage import map_coordinates


[docs] def gaussian_decomp( x: np.ndarray, y: np.ndarray, p: np.ndarray, ftransform: bool = False, model_npix: float | None = None, model_res: float | None = None, ) -> tuple[np.ndarray, float, float]: """ Create an analytically built Gaussian decomposition of the beam on the supplied x-y grid given the input Gaussian parameters. If ftransform is True, then the analytic Fourier Transform of the input gaussians on the supplied x-y grid is returned. Any number of Gaussians can be supplied in the p vector. To transfer Gaussian parameters from a different grid to the current x-y grid, the model_npix and model_res parameters can be supplied. Parameters ---------- x : np.ndarray X-axis vector of pixel numbers y : np.ndarray Y-axis vector of pixel numbers p : np.ndarray Gaussian parameter vector, ordered as amp, offset x, sigma x, offset y, sigma y per lobe ftransform : bool, optional Return the analytic Fourier Transform of the input gaussians on the supplied x-y grid, by default False model_npix : float | None, optional The number of pixels on an axis used to derive the input parameters to convert to the current x-y grid, by default None model_res : float | None, optional The grid resolution used to derive the input parameters to convert to the current grid resolution, by default None Returns ------- tuple[np.ndarray, float, float] The Gaussian decomposition of the beam on the supplied x-y grid given the input Gaussian parameters, along with the analytic volume and squared volume of the Gaussian decomposition. """ decomp_beam = np.zeros([x.size, y.size]) i = 1j # Expand the p vector into readable names var = np.reshape(p, [p.size // 5, 5]) amp = var[:, 0] offset_x = var[:, 1] sigma_x = var[:, 2] offset_y = var[:, 3] sigma_y = var[:, 4] n_lobes = var[:, 0].size # If the parameters were built on a different grid, then put on new grid # Npix only affects the offset params if model_npix is not None: if model_npix < x.size: offset = np.abs(x.size / 2 - model_npix / 2) offset_x += offset offset_y += offset else: offset = np.abs(model_npix / 2 - x.size / 2) offset_x -= offset offset_y -= offset # Resolution affects gaussian sigma and offsets if model_res is not None: sigma_x *= model_res sigma_y *= model_res offset_x = ((offset_x - x.size / 2) * model_res) + x.size / 2 offset_y = ((offset_y - y.size / 2) * model_res) + y.size / 2 if not ftransform: for lobe in range(n_lobes): decomp_beam += amp[lobe] * np.outer( np.exp(-((y - offset_y[lobe]) ** 2) / (2 * sigma_y[lobe] ** 2)), np.exp(-((x - offset_x[lobe]) ** 2) / (2 * sigma_x[lobe] ** 2)), ) volume_beam = 0 sq_volume_beam = 0 else: # Full uv model with all the gaussian components decomp_beam = decomp_beam.astype(np.complex128) volume_beam = np.sum(amp) sq_volume_beam = np.pi * np.sum(sigma_x * sigma_y * amp**2) / (x.size * y.size) offset_x -= x.size / 2 offset_y -= y.size / 2 kx = np.outer(np.arange(x.size) - x.size / 2, np.ones(y.size)) ky = np.outer(np.ones(x.size), np.arange(y.size) - y.size / 2) decomp_beam += ( amp**2 * np.pi / (x.size * y.size) * sigma_x * sigma_y * np.exp( ( 2 * np.pi**2 / (x.size * y.size) * sigma_x**2 * kx**2 + sigma_y**2 * ky**2 ) - (2 * np.pi / x.size * 1j * (offset_x * kx + offset_y * ky)) ) ) return decomp_beam, volume_beam, sq_volume_beam
[docs] def beam_image( psf: dict | h5py.File, obs: dict, pol_i: int, freq_i: int | None = None, abs=False, square=False, ) -> np.ndarray: """ Generates the average beam in image space for one polarization over all frequencies, or optionally for one frequency. The UV->sky transformation uses the inverse FFT for the beam, but the forward FFT for the image. This convention ensures the correct orientation of the UV-space beam for gridding visibilities. If the psf dictionary has Gaussian parameters, then the Gaussian decomposition is used to generate the analytic beam image. Parameters ---------- psf : dict Beam dictionary obs : dict Observation metadata dictionary pol_i : int Index of the polarization to use freq_i : int Index of the frequency to use, by default None abs : bool, optional Return the absolute value of the beam image, by default False square : bool, optional Return the square of the beam image, by default False Returns ------- beam_base : np.ndarray The average beam in image space for the specified polarization and all frequencies, or for a specific frequency if freq_i is set. """ psf_dim = psf["dim"] if "freq_norm" in psf: freq_norm = psf["freq_norm"] elif "fnorm" in psf: # handling for older files or imports from IDL FHD freq_norm = psf["fnorm"] pix_horizon = psf["pix_horizon"] group_id = psf["id"][pol_i, 0, :] if "beam_gaussian_params" in psf: beam_gaussian_params = psf["beam_gaussian_params"][:] else: beam_gaussian_params = None rbin = 0 # If we lazy loaded psf, get actual numbers out of the datasets if isinstance(psf, h5py.File): psf_dim = psf_dim[0] freq_norm = freq_norm[:] pix_horizon = pix_horizon[0] dimension = elements = obs["dimension"] # these should all be integers b/c dimensions are usually even numbers. # but they have to be cast to ints to be used in slicing. xl = int(dimension / 2 - psf_dim / 2 + 1) xh = int(dimension / 2 - psf_dim / 2 + psf_dim) yl = int(elements / 2 - psf_dim / 2 + 1) yh = int(elements / 2 - psf_dim / 2 + psf_dim) group_n, _, ri_id = histogram(group_id, min=0) gi_use = np.nonzero(group_n) # Most likely going to be 1 as pyfhd does only one beam mostly n_groups = np.count_nonzero(group_n) if beam_gaussian_params is not None: # 1.3 is the padding factor for the gaussian fitting procedure # (2.*obs.kpix) is the ratio of full sky (2 in l,m) to the analysis range (1/obs.kpix) # (2.*obs.kpix*dimension/psf.pix_horizon) is the scale factor between the psf pixels-to-horizon and the # analysis pixels-to-horizon # (0.5/obs.kpix) is the resolution scaling of what the beam model was made at and the current res model_npix = pix_horizon * 1.3 model_res = (2 * obs["kpix"] * dimension) / pix_horizon * (0.5 / obs["kpix"]) freq_bin_i = obs["baseline_info"]["fbin_i"] freq_i_use = np.nonzero(obs["baseline_info"]["freq_use"])[0] n_bin_use = 0 # We assume freq_i is an int when provided (i.e. a single frequency index) if freq_i is not None: freq_i_use = freq_i if square: # Do note freq_i_use could be an integer or an array if freq_i is supplied or not beam_base = np.zeros([dimension, elements]) freq_bin_use = freq_bin_i[freq_i_use] fbin_use = np.sort(np.unique(freq_bin_use)) nbin = fbin_use.size if beam_gaussian_params is not None: beam_single = np.zeros([dimension, elements]) else: beam_single = np.zeros([psf_dim, psf_dim], dtype=np.complex128) for bin_i in range(nbin): fbin = fbin_use[bin_i] nf_bin = np.count_nonzero(freq_bin_use == fbin) if beam_gaussian_params is not None: for gi in range(n_groups): # beam_gaussian_params needs to be copied here rather than # a view as interestingly gaussian_decomp affects the values # of the array used with the calculations done to var and no copies # are made, only views are adjusted. so we explcitly call copy params = beam_gaussian_params[pol_i, fbin, :].copy() gaussian = gaussian_decomp( np.arange(dimension), np.arange(elements), params, model_npix=model_npix, model_res=model_res, )[0] beam_single += gaussian * group_n[gi_use[gi]] beam_single /= np.sum(group_n[gi_use]) beam_base += nf_bin * beam_single**2 else: for gi in range(n_groups): beam_single += ( psf["beam_ptr"][0, fbin, rbin, rbin] * group_n[gi_use[gi]] ).reshape([psf_dim, psf_dim]) beam_single /= np.sum(group_n[gi_use]) if abs: beam_single = np.abs(beam_single) beam_base_uv1 = np.zeros([dimension, elements], np.complex128) beam_base_uv1[xl : xh + 1, yl : yh + 1] = beam_single beam_base_single = np.fft.fftshift( np.fft.fftn(np.fft.fftshift(beam_base_uv1)) ) beam_base += ( nf_bin * (beam_base_single * np.conjugate(beam_base_single)).real ) n_bin_use += nf_bin * freq_norm[fbin] else: nf_use = freq_i_use.size if beam_gaussian_params is not None: beam_base_uv = np.zeros([dimension, elements]) beam_single = np.zeros([dimension, elements]) else: beam_base_uv = np.zeros([psf_dim, psf_dim], dtype=np.complex128) beam_single = np.zeros([psf_dim, psf_dim], dtype=np.complex128) for f_idx in range(nf_use): fi = freq_i_use[f_idx] if freq_i is not None: if freq_i != fi: continue fbin = freq_bin_i[fi] beam_single[:, :] = 0 if beam_gaussian_params is not None: for gi in range(n_groups): params = beam_gaussian_params[pol_i, fbin, :].copy() gaussian = gaussian_decomp( np.arange(dimension), np.arange(elements), params, model_npix=model_npix, model_res=model_res, )[0] beam_single += gaussian * group_n[gi_use[gi]] else: for gi in range(n_groups): beam_single += ( psf["beam_ptr"][0, fbin, rbin, rbin] * group_n[gi_use[gi]] ).reshape([psf_dim, psf_dim]) beam_single /= np.sum(group_n[gi_use]) beam_base_uv += beam_single n_bin_use += freq_norm[fbin] if beam_gaussian_params is None: beam_base_uv1 = np.zeros([dimension, elements], dtype=np.complex128) beam_base_uv1[xl : xh + 1, yl : yh + 1] = beam_base_uv beam_base = np.fft.fftshift(np.fft.fftn(np.fft.fftshift(beam_base_uv1))) else: beam_base = beam_base_uv beam_base /= n_bin_use return beam_base.real
[docs] def beam_image_hyperresolved( antenna: dict, beam: UVBeam | AnalyticBeam, ant_pol_1: int, ant_pol_2: int, freq_i: int, zen_int_x: float, zen_int_y: float, psf: dict, ) -> NDArray[np.complexfloating]: """ Build the hyperresolved image-space beam power for a station/tile. This is computed as the product of the F-matrices for the two antennas. The Jones matrices are decomposed into F and K, where F gives the complex sensitivity of each instrumental polarization to unpolarized emission (so it only has an instrumental polarization index, the phase is related to time delays which can vary spatially) and K is the projection from celestial polarization vector components (orthogonal on the sky) to instrumental polarization vector components (often non-orthogonal on the sky) Parameters ---------- antenna : dict Antenna metadata dictionary beam : UVBeam | AnalyticBeam UVBeam or AnalyticBeam object containing the beam model and metadata. ant_pol_1 : int Polarization index for the first antenna ant_pol_2 : int Polarization index for the second antenna freq_i : int Frequency index for current iteration zen_int_x : float x pixel index of the zenith for the beam power image zen_int_y : float y pixel index of the zenith for the beam power image psf : dict Beam metadata dictionary Returns ------- NDArray[np.complexfloating] An estimation of the image-space beam power, normalized to the zenith power. """ # Create one full-scale array image_power_beam = np.zeros( [psf["image_dim"], psf["image_dim"]], dtype=np.complex128 ) # FHD was designed to account for multiple antennas but in most cases only one was ever used # So we will just use the first antenna twice as I pyfhd does not support multiple antennas at this time, # If you want to use multiple antennas, please open an issue on the pyfhd GitHub repository or do the translation and/or # adjustments yourself. # baseline response (power beam) is product of the "two" antenna responses image_power_beam.flat[antenna["pix_use"]] = ( antenna["aligned_response"][ant_pol_1, freq_i] * np.conjugate(antenna["aligned_response"][ant_pol_2, freq_i]) ).flatten() # TODO: Work out the interpolation of the zenith power, it uses cubic interpolation # But the IDL Interpolate function in IDL uses an interpolation paramter of -0.5, where # scipy, numpy with their B-Splines seem to use a parameter of 0 by default with no way # to change it. # The interp is a placeholder for now, but it should be replaced with a proper # interpolation function that matches the IDL Interpolate function. # TODO: Replace with UVBeam interface object as_power_beam, then # compute_response at za 0 az 0 # Initial trying out of using pyuvdata, not close at all. This is interpolating # the zenith power using the x and y pixel coordinates, to use pyuvdata likely need to do # pixel to ra/dec then to za/az # Use order=1 for bilinear interpolation (matches IDL parameter -0.5) # mode='nearest' handles out-of-bounds by using nearest edge values # Interpolate the abs to get the abs power at zenith, which I think is what we want here. power_zenith = map_coordinates( np.abs(image_power_beam), [[zen_int_x], [zen_int_y]], order=1, mode="nearest" )[0] # Normalize the image power beam to the zenith image_power_beam.flat[antenna["pix_use"]] = ( image_power_beam.flat[antenna["pix_use"]] / power_zenith ) return image_power_beam
[docs] def beam_power( antenna: dict, beam: UVBeam | AnalyticBeam, ant_pol_1: int, ant_pol_2: int, freq_i: int, psf: dict, zen_int_x: float, zen_int_y: float, xvals_uv_superres: np.ndarray, yvals_uv_superres: np.ndarray, pyfhd_config: dict, ) -> NDArray[np.complexfloating]: """ Generate the hyperresolved image-space beam power to reduce aliasing artifacts, and fourier transform it to a specific grid in complex uv-space. Reduce artifacts further by applying an extremely low-level contiguous mask to the uv-space beam power and renomalizing the beam power to a volume of 1. Parameters ---------- antenna : dict Antenna metadata dictionary beam : UVBeam | AnalyticBeam UVBeam or AnalyticBeam object containing the beam model and metadata. ant_pol_1 : int Polarization index for the first antenna ant_pol_2 : int Polarization index for the second antenna freq_i : int Frequency index for current iteration psf : dict Beam metadata dictionary zen_int_x : np.ndarray x pixel index of the zenith for the beam power image zen_int_y : np.ndarray y pixel index of the zenith for the beam power image xvals_uv_superres : np.ndarray A grid of the hyperresolved indices in the u direction for the uv-space beam power image yvals_uv_superres : np.ndarray A grid of the hyperresolved indices in the v direction for the uv-space beam power image pyfhd_config : dict The pyfhd configuration dictionary Returns ------- NDArray[np.complexfloating] Hyperresolved uv-space of the beam power image, normalized to a volume of 1 """ # For now we will ignore beam_gaussian_decomp and much of the debug keywords image_power_beam = beam_image_hyperresolved( antenna, beam, ant_pol_1, ant_pol_2, freq_i, zen_int_x, zen_int_y, psf, ) if pyfhd_config.get("kernel_window", False): image_power_beam *= antenna["pix_window"] psf_base_single = np.fft.fftshift(np.fft.ifftn(np.fft.fftshift(image_power_beam))) # TODO: Same cubic problem as in beam_image_hyperresolved here # Map Coordinates isn't the same as IDL Interpolate # But its closish, more of a placeholder for now. psf_base_superres = map_coordinates( psf_base_single, (xvals_uv_superres, yvals_uv_superres) ) # Build a mask to create a well-defined finite beam uv_mask_superres = np.zeros(psf_base_superres.shape, dtype=np.float64) psf_mask_threshold_use = ( np.max(np.abs(psf_base_superres)) / pyfhd_config["beam_mask_threshold"] ) beam_i = region_grow( np.abs(psf_base_superres), int(psf["superres_dim"] * (1 + psf["superres_dim"]) / 2), low=psf_mask_threshold_use, high=np.max(np.abs(psf_base_superres)), ) uv_mask_superres.flat[beam_i] = 1 # FFT normalization correction in case this changes the total number of pixels psf_base_superres *= psf["intermediate_res"] ** 2 """ total of the gaussian decomposition can be calculated analytically, but is an over-estimate of the numerical representation and results in a beam norm of greater than one, thus the discrete total is used """ psf_val_ref = np.sum(psf_base_superres) # If you wish to add interpolate_beam_threshold functionality then do so here psf_base_superres *= uv_mask_superres if pyfhd_config["beam_clip_floor"]: i_use = np.where(np.abs(psf_base_superres)) psf_amp = np.abs(psf_base_superres) psf_phase = np.angle(psf_base_superres) psf_floor = psf_mask_threshold_use * (psf["intermediate_res"] ** 2) psf_amp[i_use] -= psf_floor psf_base_superres = psf_amp * np.cos(psf_phase) + 1j * psf_amp * np.sin( psf_phase ) psf_base_superres *= psf_val_ref / np.sum(psf_base_superres) return psf_base_superres