import numpy as np
from astropy.constants import c
from pyuvdata import BeamInterface, UVBeam
from pyuvdata.analytic_beam import AnalyticBeam
from PyFHD.pyfhd_tools.pyfhd_utils import histogram, region_grow
import h5py
from numpy.typing import NDArray
from scipy.ndimage import map_coordinates
[docs]
def gaussian_decomp(
x: np.ndarray,
y: np.ndarray,
p: np.ndarray,
ftransform: bool = False,
model_npix: float | None = None,
model_res: float | None = None,
) -> tuple[np.ndarray, float, float]:
"""
Create an analytically built Gaussian decomposition of the beam on
the supplied x-y grid given the input Gaussian parameters. If
ftransform is True, then the analytic Fourier Transform of the input
gaussians on the supplied x-y grid is returned. Any number of Gaussians
can be supplied in the p vector. To transfer Gaussian parameters from
a different grid to the current x-y grid, the model_npix and model_res
parameters can be supplied.
Parameters
----------
x : np.ndarray
X-axis vector of pixel numbers
y : np.ndarray
Y-axis vector of pixel numbers
p : np.ndarray
Gaussian parameter vector, ordered as amp, offset x, sigma x, offset y,
sigma y per lobe
ftransform : bool, optional
Return the analytic Fourier Transform of the input gaussians on the supplied
x-y grid, by default False
model_npix : float | None, optional
The number of pixels on an axis used to derive the input parameters to
convert to the current x-y grid, by default None
model_res : float | None, optional
The grid resolution used to derive the input parameters to convert to
the current grid resolution, by default None
Returns
-------
tuple[np.ndarray, float, float]
The Gaussian decomposition of the beam on the supplied x-y grid given the input
Gaussian parameters, along with the analytic volume and squared volume of the
Gaussian decomposition.
"""
decomp_beam = np.zeros([x.size, y.size])
i = 1j
# Expand the p vector into readable names
var = np.reshape(p, [p.size // 5, 5])
amp = var[:, 0]
offset_x = var[:, 1]
sigma_x = var[:, 2]
offset_y = var[:, 3]
sigma_y = var[:, 4]
n_lobes = var[:, 0].size
# If the parameters were built on a different grid, then put on new grid
# Npix only affects the offset params
if model_npix is not None:
if model_npix < x.size:
offset = np.abs(x.size / 2 - model_npix / 2)
offset_x += offset
offset_y += offset
else:
offset = np.abs(model_npix / 2 - x.size / 2)
offset_x -= offset
offset_y -= offset
# Resolution affects gaussian sigma and offsets
if model_res is not None:
sigma_x *= model_res
sigma_y *= model_res
offset_x = ((offset_x - x.size / 2) * model_res) + x.size / 2
offset_y = ((offset_y - y.size / 2) * model_res) + y.size / 2
if not ftransform:
for lobe in range(n_lobes):
decomp_beam += amp[lobe] * np.outer(
np.exp(-((y - offset_y[lobe]) ** 2) / (2 * sigma_y[lobe] ** 2)),
np.exp(-((x - offset_x[lobe]) ** 2) / (2 * sigma_x[lobe] ** 2)),
)
volume_beam = 0
sq_volume_beam = 0
else:
# Full uv model with all the gaussian components
decomp_beam = decomp_beam.astype(np.complex128)
volume_beam = np.sum(amp)
sq_volume_beam = np.pi * np.sum(sigma_x * sigma_y * amp**2) / (x.size * y.size)
offset_x -= x.size / 2
offset_y -= y.size / 2
kx = np.outer(np.arange(x.size) - x.size / 2, np.ones(y.size))
ky = np.outer(np.ones(x.size), np.arange(y.size) - y.size / 2)
decomp_beam += (
amp**2
* np.pi
/ (x.size * y.size)
* sigma_x
* sigma_y
* np.exp(
(
2 * np.pi**2 / (x.size * y.size) * sigma_x**2 * kx**2
+ sigma_y**2 * ky**2
)
- (2 * np.pi / x.size * 1j * (offset_x * kx + offset_y * ky))
)
)
return decomp_beam, volume_beam, sq_volume_beam
[docs]
def beam_image(
psf: dict | h5py.File,
obs: dict,
pol_i: int,
freq_i: int | None = None,
abs=False,
square=False,
) -> np.ndarray:
"""
Generates the average beam in image space for one polarization over all
frequencies, or optionally for one frequency. The UV->sky transformation
uses the inverse FFT for the beam, but the forward FFT for the image.
This convention ensures the correct orientation of the UV-space beam
for gridding visibilities. If the psf dictionary has Gaussian parameters,
then the Gaussian decomposition is used to generate the analytic beam image.
Parameters
----------
psf : dict
Beam dictionary
obs : dict
Observation metadata dictionary
pol_i : int
Index of the polarization to use
freq_i : int
Index of the frequency to use, by default None
abs : bool, optional
Return the absolute value of the beam image, by default False
square : bool, optional
Return the square of the beam image, by default False
Returns
-------
beam_base : np.ndarray
The average beam in image space for the specified polarization
and all frequencies, or for a specific frequency if freq_i is set.
"""
psf_dim = psf["dim"]
freq_norm = psf["fnorm"]
pix_horizon = psf["pix_horizon"]
group_id = psf["id"][pol_i, 0, :]
if "beam_gaussian_params" in psf:
beam_gaussian_params = psf["beam_gaussian_params"][:]
else:
beam_gaussian_params = None
rbin = 0
# If we lazy loaded psf, get actual numbers out of the datasets
if isinstance(psf, h5py.File):
psf_dim = psf_dim[0]
freq_norm = freq_norm[:]
pix_horizon = pix_horizon[0]
dimension = elements = obs["dimension"]
xl = dimension / 2 - psf_dim / 2 + 1
xh = dimension / 2 - psf_dim / 2 + psf_dim
yl = elements / 2 - psf_dim / 2 + 1
yh = elements / 2 - psf_dim / 2 + psf_dim
group_n, _, ri_id = histogram(group_id, min=0)
gi_use = np.nonzero(group_n)
# Most likely going to be 1 as PyFHD does only one beam mostly
n_groups = np.count_nonzero(group_n)
if beam_gaussian_params is not None:
# 1.3 is the padding factor for the gaussian fitting procedure
# (2.*obs.kpix) is the ratio of full sky (2 in l,m) to the analysis range (1/obs.kpix)
# (2.*obs.kpix*dimension/psf.pix_horizon) is the scale factor between the psf pixels-to-horizon and the
# analysis pixels-to-horizon
# (0.5/obs.kpix) is the resolution scaling of what the beam model was made at and the current res
model_npix = pix_horizon * 1.3
model_res = (2 * obs["kpix"] * dimension) / pix_horizon * (0.5 / obs["kpix"])
freq_bin_i = obs["baseline_info"]["fbin_i"]
freq_i_use = np.nonzero(obs["baseline_info"]["freq_use"])[0]
n_bin_use = 0
# We assume freq_i is an int when provided (i.e. a single frequency index)
if freq_i is not None:
freq_i_use = freq_i
if square:
# Do note freq_i_use could be an integer or an array if freq_i is supplied or not
beam_base = np.zeros([dimension, elements])
freq_bin_use = freq_bin_i[freq_i_use]
fbin_use = np.sort(np.unique(freq_bin_use))
nbin = fbin_use.size
if beam_gaussian_params is not None:
beam_single = np.zeros([dimension, elements])
else:
beam_single = np.zeros([psf_dim, psf_dim], dtype=np.complex128)
for bin_i in range(nbin):
fbin = fbin_use[bin_i]
nf_bin = np.count_nonzero(freq_bin_use == fbin)
if beam_gaussian_params is not None:
for gi in range(n_groups):
# beam_gaussian_params needs to be copied here rather than
# a view as interestingly gaussian_decomp affects the values
# of the array used with the calculations done to var and no copies
# are made, only views are adjusted. so we explcitly call copy
params = beam_gaussian_params[pol_i, fbin, :].copy()
gaussian = gaussian_decomp(
np.arange(dimension),
np.arange(elements),
params,
model_npix=model_npix,
model_res=model_res,
)[0]
beam_single += gaussian * group_n[gi_use[gi]]
beam_single /= np.sum(group_n[gi_use])
beam_base += nf_bin * beam_single**2
else:
for gi in range(n_groups):
beam_single += (
psf["beam_ptr"][0, fbin, rbin, rbin] * group_n[gi_use[gi]]
).reshape([psf_dim, psf_dim])
beam_single /= np.sum(group_n[gi_use])
if abs:
beam_single = np.abs(beam_single)
beam_base_uv1 = np.zeros([dimension, elements], np.complex128)
beam_base_uv1[xl : xh + 1, yl : yh + 1] = beam_single
beam_base_single = np.fft.fftshift(
np.fft.ifftn(np.fft.fftshift(beam_base_uv1))
)
beam_base += (
nf_bin * (beam_base_single * np.conjugate(beam_base_single)).real
)
n_bin_use += nf_bin * freq_norm[fbin]
else:
nf_use = freq_i_use.size
if beam_gaussian_params is not None:
beam_base_uv = np.zeros([dimension, elements])
beam_single = np.zeros([dimension, elements])
else:
beam_base_uv = np.zeros([psf_dim, psf_dim], dtype=np.complex128)
beam_single = np.zeros([psf_dim, psf_dim], dtype=np.complex128)
for f_idx in range(nf_use):
fi = freq_i_use[f_idx]
if freq_i is not None:
if freq_i != fi:
continue
fbin = freq_bin_i[fi]
beam_single[:, :] = 0
if beam_gaussian_params is not None:
for gi in range(n_groups):
params = beam_gaussian_params[pol_i, fbin, :].copy()
gaussian = gaussian_decomp(
np.arange(dimension),
np.arange(elements),
params,
model_npix=model_npix,
model_res=model_res,
)[0]
beam_single += gaussian * group_n[gi_use[gi]]
else:
for gi in range(n_groups):
beam_single += (
psf["beam_ptr"][0, fbin, rbin, rbin] * group_n[gi_use[gi]]
).reshape([psf_dim, psf_dim])
beam_single /= np.sum(group_n[gi_use])
beam_base_uv += beam_single
n_bin_use += freq_norm[fbin]
if beam_gaussian_params is None:
beam_base_uv1 = np.zeros([dimension, elements], dtype=np.complex128)
beam_base_uv1[xl : xh + 1, yl : yh + 1] = beam_base_uv
beam_base = np.fft.fftshift(np.fft.ifftn(np.fft.fftshift(beam_base_uv1)))
else:
beam_base = beam_base_uv
beam_base /= n_bin_use
return beam_base.real
[docs]
def beam_image_hyperresolved(
antenna: dict,
beam: UVBeam | AnalyticBeam,
ant_pol_1: int,
ant_pol_2: int,
freq_i: int,
zen_int_x: float,
zen_int_y: float,
psf: dict,
) -> NDArray[np.complexfloating]:
"""
Build the hyperresolved image-space beam power for a station/tile. Currently, this
function assumes that the amplitude of the jones matrix response between two antennas,
multiplied by the station/tile response, is the image beam power. This calculation
may be suject to change in the future.
Parameters
----------
antenna : dict
Antenna metadata dictionary
beam : UVBeam | AnalyticBeam
UVBeam or AnalyticBeam object containing the beam model and metadata.
ant_pol_1 : int
Polarization index for the first antenna
ant_pol_2 : int
Polarization index for the second antenna
freq_i : int
Frequency index for current iteration
zen_int_x : float
x pixel index of the zenith for the beam power image
zen_int_y : float
y pixel index of the zenith for the beam power image
psf : dict
Beam metadata dictionary
Returns
-------
NDArray[np.complexfloating]
An estimation of the image-space beam power, normalized to the zenith power.
"""
# FHD was designed to account for multiple antennas but in most cases only one was ever used
# So we will just use the first antenna twice as I PyFHD does not support multiple antennas at this time,
# If you want to use multiple antennas, please open an issue on the PyFHD GitHub repository or do the translation and/or
# adjustments yourself.
beam_ant_1 = antenna["response"][ant_pol_1, freq_i]
beam_ant_2 = np.conjugate(antenna["response"][ant_pol_2, freq_i])
# Amplitude of the response from "ant1" (again FHD takes more than one antenna)
# is Sqrt(|J1[0,pol1]|^2 + |J1[1,pol1]|^2)
amp_1 = (
np.abs(antenna["jones"][0, ant_pol_1]) ** 2
+ np.abs(antenna["jones"][1, ant_pol_1]) ** 2
)
# Amplitude of the response from "ant2" (again FHD takes more than one antenna)
# is Sqrt(|J2[0,pol2]|^2 + |J2[1,pol2]|^2)
amp_2 = (
np.abs(antenna["jones"][0, ant_pol_2]) ** 2
+ np.abs(antenna["jones"][1, ant_pol_2]) ** 2
)
# Amplitude of the baseline response is the product of the "two" antenna responses
power_zenith_beam = np.sqrt(amp_1 * amp_2)
# Create one full-scale array
image_power_beam = np.zeros([psf["dim"], psf["dim"]], dtype=np.complex128)
# Co-opt the array to calculate the power at zenith
image_power_beam[antenna["pix_use"]] = power_zenith_beam
# TODO: Work out the interpolation of the zenith power, it uses cubic interpolation
# But the IDL Interpolate function in IDL uses an interpolation paramter of -0.5, where
# scipy, numpy with their B-Splines seem to use a parameter of 0 by default with no way
# to change it.
# The interp is a placeholder for now, but it should be replaced with a proper
# interpolation function that matches the IDL Interpolate function.
# TODO: Replace with UVBeam interface object as_power_beam, then
# compute_response at za 0 az 0
# Initial trying out of using pyuvdata, not close at all. This is interpolating
# the zenith power using the x and y pixel coordinates, to use pyuvdata likely need to do
# pixel to ra/dec then to za/az
power_zenith = np.interp(zen_int_x, zen_int_y, image_power_beam)
# Normalize the image power beam to the zenith
image_power_beam[antenna["pix_use"]] = (
power_zenith_beam * beam_ant_1 * beam_ant_2
) / power_zenith
return image_power_beam
[docs]
def beam_power(
antenna: dict,
beam: UVBeam | AnalyticBeam,
ant_pol_1: int,
ant_pol_2: int,
freq_i: int,
psf: dict,
zen_int_x: float,
zen_int_y: float,
xvals_uv_superres: np.ndarray,
yvals_uv_superres: np.ndarray,
pyfhd_config: dict,
) -> NDArray[np.complexfloating]:
"""
Generate the hyperresolved image-space beam power to reduce aliasing artifacts, and
fourier transform it to a specific grid in complex uv-space. Reduce artifacts further
by applying an extremely low-level contiguous mask to the uv-space beam power and
renomalizing the beam power to a volume of 1.
Parameters
----------
antenna : dict
Antenna metadata dictionary
beam : UVBeam | AnalyticBeam
UVBeam or AnalyticBeam object containing the beam model and metadata.
ant_pol_1 : int
Polarization index for the first antenna
ant_pol_2 : int
Polarization index for the second antenna
freq_i : int
Frequency index for current iteration
psf : dict
Beam metadata dictionary
zen_int_x : np.ndarray
x pixel index of the zenith for the beam power image
zen_int_y : np.ndarray
y pixel index of the zenith for the beam power image
xvals_uv_superres : np.ndarray
A grid of the hyperresolved indices in the u direction for the uv-space beam power image
yvals_uv_superres : np.ndarray
A grid of the hyperresolved indices in the v direction for the uv-space beam power image
pyfhd_config : dict
The PyFHD configuration dictionary
Returns
-------
NDArray[np.complexfloating]
Hyperresolved uv-space of the beam power image, normalized to a volume of 1
"""
# For now we will ignore beam_gaussian_decomp and much of the debug keywords
image_power_beam = beam_image_hyperresolved(
antenna,
beam,
ant_pol_1,
ant_pol_2,
freq_i,
zen_int_x,
zen_int_y,
psf,
pyfhd_config,
)
if pyfhd_config["kernel_window"]:
image_power_beam *= antenna["pix_window"]
psf_base_single = np.fft.fftshift(np.fft.ifftn(np.fft.fftshift(image_power_beam)))
# TODO: Same cubic problem as in beam_image_hyperresolved here
# Map Coordinates isn't the same as IDL Interpolate
# But its closish, more of a placeholder for now.
psf_base_superres = map_coordinates(
psf_base_single, (xvals_uv_superres, yvals_uv_superres)
)
# Build a mask to create a well-defined finite beam
uv_mask_superres = np.zeros(
[[psf_base_superres.shape[0], psf_base_superres.shape[1]]], dtype=np.float64
)
psf_mask_threshold_use = (
np.max(np.abs(psf_base_superres)) / pyfhd_config["beam_mask_threshold"]
)
beam_i = region_grow(
np.abs(psf_base_superres),
psf["superres_dim"] * (1 + psf["superres_dim"]) / 2,
low=psf_mask_threshold_use,
high=np.max(np.abs(psf_base_superres)),
)
uv_mask_superres[beam_i] = 1
# FFT normalization correction in case this changes the total number of pixels
psf_base_superres *= psf["intermediate_res"] ** 2
"""
total of the gaussian decomposition can be calculated analytically, but is an over-estimate
of the numerical representation and results in a beam norm of greater than one,
thus the discrete total is used
"""
psf_val_ref = np.sum(psf_base_superres)
# If you wish to add interpolate_beam_threshold functionality then do so here
psf_base_superres *= uv_mask_superres
if pyfhd_config["beam_clip_floor"]:
i_use = np.where(np.abs(psf_base_superres))
psf_amp = np.abs(psf_base_superres)
psf_phase = np.arctan(psf_base_superres.imag / psf_base_superres.real)
psf_floor = psf_mask_threshold_use * (psf["intermediate_res"] ** 2)
psf_amp[i_use] -= psf_floor
psf_base_superres = psf_amp * np.cos(psf_phase) + 1j * psf_amp * np.sin(
psf_phase
)
psf_base_superres *= psf_val_ref / np.sum(psf_base_superres)
return psf_base_superres