import os
import numpy as np
import h5py
from logging import Logger
from pathlib import Path
from typing import Any
from numpy.typing import NDArray, DTypeLike
from scipy.io import readsav
[docs]
def dtype_picker(dtype: DTypeLike) -> type:
"""
Picks the double precision type for the given dtype for saving the hdf5 file to ensure everything
is saved without losing information.
Parameters
----------
dtype : type
The numpy dtype of an array
Returns
-------
type
The corresponding double precision type
"""
if np.issubdtype(dtype, np.integer):
return np.int64
elif np.issubdtype(dtype, np.floating):
return np.float64
elif np.issubdtype(dtype, np.complexfloating):
return np.complex128
else:
# Should never get here, this should throw an error
return None
@np.vectorize
def _is_complex(value: Any) -> bool:
"""
Finds if a value is complex, this works regardless of the array type
unlike np.iscomplex or np.iscomplexobj which can't handle object arrays.
This being vectorized also allows us to check this for any complex type,
whether it be the python complex type or a numpy complex type
Parameters
----------
value : Any
The value to check in a NumPy array
Returns
-------
bool
True if value is a complex, False otherwise
"""
return np.iscomplexobj(value)
@np.vectorize
def _is_string(value: Any) -> bool:
"""
Finds if a value is a string or not, works regardless of the array type.
There is no string check available for object arrays
Parameters
----------
value : Any
A value to check
Returns
-------
bool
True if value is a str, False otherwise
"""
return isinstance(value, str)
@np.vectorize
def _is_none(value: Any) -> bool:
"""
Checks for a none object and is vectorized to work across any numpy array
even if it's an object array.
Parameters
----------
value : Any
A value to be checked if None
Returns
-------
bool
True if value is None, otherwise False
"""
return value is None
@np.vectorize
def _decode_byte_arr(value: NDArray[np.byte]) -> str:
"""
Decodes a byte string into a string
Parameters
----------
value : NDArray[np.byte]
Value to decode
Returns
-------
str
The decoded value
"""
return value.decode()
[docs]
def save_dataset(
h5py_obj: h5py.File | h5py.Group,
key: str,
value: Any,
to_chunk: dict[str, dict],
variable_lengths: [str, DTypeLike],
logger: Logger | None,
) -> bool:
"""
A general function for saving a dataset inside a HDF5 File or Group. It's used exclusively for saving
a dictionary into a HDF5 file, hence why we take a `key` and `value` pair. The `to_chunk` parameter is
explained in the `save` function, please look there for explanation. In the case of finding a None object
an Empty Dataset is saved and the is_none is returned as True, so the attribute associated with the key
can also be set to True to indicate to PyFHD later that the value is meant to be None when reading in the
dataset again.
Parameters
----------
h5py_obj : h5py.File | h5py.Group
A h5py object that has access to the `create_dataset` and `create_group` methods
key : str
The key from the dictionary we're saving
value : Any
The value from the dictionary
to_chunk : dict[str, dict]
A dictionary where each key-value pair represents a key in the to_save dictionary, and the value is a dictionary
which should contain two key-value pairs, `shape` which should be the `shape` of the array and `chunk` which tells
hdf5 how to chunk the dataset when it's being read/written. If you're not sure how to `chunk` the dataset, set `chunk`
to True which enables h5py to guess the chunk size for you. By default {}
variable_lengths : dict[str, DTypeLike]
A dictionary where each key-value pair represents a key in the to_save dictionary, and the value is a dtype. This is
for special cases where you must save an array of variable length arrays. H5Py does support variable length arrays, but
you must use a special type, using the `h5py.vlen_dtype()` you can create a dtype which accepts object arrays of variable
lengths. For example if you wish to have variable integer array called `ija`, you would use `h5py.vlen_dtype(np.int64)`,
and save use it in the variable_lengths dictionary like so, `{'ija': h5py.vlen_dtype(np.int64)}`, which will set the dtype appropriately
during a `create_dataset` call. By default {}
logger : Logger | None
PyFHD's Logger
Returns
-------
is_none : bool
True if the value is None, False otherwise
See Also
--------
PyFHD.io.pyfhd_io.save : Save a HDF5 file
PyFHD.io.pyfhd_io.dict_to_group : Converts a dictionary to a h5py Group Object
"""
is_none = False
# Match the type
match value:
case dict():
group = h5py_obj.create_group(key)
# dict_to_group will be recursively called if there is another dict
# in this dict
dict_to_group(group, value, to_chunk, variable_lengths, logger)
case np.ndarray():
if key not in variable_lengths:
# Find and replace all None objects
value = format_array(value)
value_dtype = dtype_picker(value.dtype)
else:
# Since we're dealing with variable length arrays, we need to use a special dtype
# and process each array individually
for i, arr in enumerate(value):
value[i] = format_array(arr)
value_dtype = variable_lengths[key]
# If we want it to be chunked do that, always compress it
if key in to_chunk:
h5py_obj.create_dataset(
key,
shape=to_chunk[key]["shape"],
data=value,
dtype=value_dtype,
chunks=to_chunk[key]["chunk"],
compression="gzip",
)
else:
h5py_obj.create_dataset(
key,
shape=value.shape,
data=value,
dtype=value_dtype,
compression="gzip",
)
case list():
# Was easier to convert to a NumPy array to get vectorization
# Given that H5Py converts it into a NumPy array anyway, we can
# at least control the conversion (if we need to)
if key in variable_lengths:
value = np.array(value, dtype=object)
for i, arr in enumerate(value):
value[i] = format_array(arr)
data_dtype = variable_lengths[key]
else:
try:
value = np.array(value)
value = format_array(value)
data_dtype = dtype_picker(value.dtype)
except ValueError as e:
if "inhomogeneous" in str(e):
logger.warning(
f"Failed to save {key} as an array as the list couldn't turn into a NumPy array, trying to save as a variable length array. Please add {key} to the variable_lengths dictionary in the save function in future."
)
value = np.array(value, dtype=object)
for i, arr in enumerate(value):
value[i] = format_array(arr)
data_dtype = h5py.vlen_dtype(dtype_picker(value[0].dtype))
else:
logger.info(
f"You received an error not related to the array being inhomogeneous, Here's the error: {e}"
)
h5py_obj.create_dataset(
key, data=value, dtype=data_dtype, compression="gzip"
)
case Path():
# If we find a Path object, convert it to a string
value = str(value)
h5py_obj.create_dataset(key, data=value)
case None:
is_none = True
# In the case we get something that is none, create empty dataset
h5py_obj.create_dataset(key, dtype="b")
case _:
try:
# Store the value in a single size dataset, used for ints, floats, strings etc
h5py_obj.create_dataset(key, data=value)
except ValueError:
if logger is not None:
logger.error(
f"Failed to save {key}, the type of key was {type(value)}"
)
return is_none
[docs]
def dict_to_group(
group: h5py.Group,
to_convert: dict,
to_chunk: dict[str, dict],
variable_lengths: dict[str, DTypeLike],
logger: Logger | None,
) -> None:
"""
Converts a dictionary to a HDF5 group. This is called in the event a dictionary is found inside
a dictionary that is being saved in a HDF5 file. Creates a subgroup for the hdf5 file with everything
turning into individual datasets.
Parameters
----------
group : h5py.Group
The created group to save the dictionary in
to_convert : dict
The dictionary to save into the group
to_chunk : dict[str, dict]
The chunking dictionary, see `save` for more information
variable_lengths : dict[str, DTypeLike]
The variable length dictionary, see `save` for more information
logger : Logger
PyFHD's Logger
See Also
--------
PyFHD.io.pyfhd_io.save : Save a HDF5 file
"""
for key in to_convert:
group.attrs[key] = save_dataset(
group, key, to_convert[key], to_chunk, variable_lengths, logger
)
[docs]
def save(
file_name: Path,
to_save: NDArray[Any] | dict,
dataset_name: str,
logger: Logger | None = None,
to_chunk: dict[str, dict] = {},
variable_lengths: dict[str, DTypeLike] = {},
) -> None:
"""
Saves a numpy array or dictionary into a hdf5 file using h5py, with compression applied to all arrays/datasets.
An array will be saved as a single dataset, while a dictionary will be saved where each key will be a dataset
unless the key points a dictionary in which case a group will be created and `dict_to_group` called to turn each
key in that sub dict into a dataset (or another group if it's another sub dictionary). This function should be
kept as general as possible, if something needs formatting for saving, format it before calling this function.
If you are converting a sav file to hdf5 with this function, use `recarray_to_dict` which converts the sav output
from readsav into a proper python dictionary (rather than recarrays or weird array shapes, objects arrays etc.)
Parameters
----------
file_name : Path
The file to save as hdf5 should be /path/to/file_name.h5 (or .hdf5)
to_save : NDArray[Any] | dict
The dictionary or numpy array to save into the hdf5 file
dataset_name : str
Used in the case that the to_save variable is an array, this name will
be used as the key for the dataset in the hdf5 file.
logger : Logger, optional
PyFHD's Logger, by default None (in case you don't want to use the logger for testing)
to_chunk : dict[str, dict], optional
A dictionary where each key-value pair represents a key in the to_save dictionary, and the value is a dictionary
which should contain two key-value pairs, `shape` which should be the `shape` of the array and `chunk` which tells
hdf5 how to chunk the dataset when it's being read/written. If you're not sure how to `chunk` the dataset, set `chunk`
to True which enables h5py to guess the chunk size for you. By default {}
variable_lengths : dict[str, DTypeLike], optional
A dictionary where each key-value pair represents a key in the to_save dictionary, and the value is a dtype. This is
for special cases where you must save an array of variable length arrays. H5Py does support variable length arrays, but
you must use a special type, using the `h5py.vlen_dtype()` you can create a dtype which accepts object arrays of variable
lengths. For example if you wish to have variable integer array called `ija`, you would use `h5py.vlen_dtype(np.int64)`,
and save use it in the variable_lengths dictionary like so, `{'ija': h5py.vlen_dtype(np.int64)}`, which will set the dtype appropriately
during a `create_dataset` call. By default {}
See Also
--------
PyFHD.io.pyfhd_io.load : Load a HDF5 file
PyFHD.io.pyfhd_io.dict_to_group : Converts a dictionary to a h5py Group Object
PyFHD.io.pyfhd_io.recarray_to_dict : Turns any record arrays into dicts, also formats object arrays into the correct dtype array
PyFHD.io.pyfhd_io.save_dataset : Saves a single dataset based off a dictionary key-value pair
PyFHD.io.pyfhd_io.format_array : Finds any None is an array and replaces them appropriately
"""
# Create a custom vectorized function to check for complex numbers
# is_complex_vectorized = np.vectorize(is_complex)
with h5py.File(file_name, "w") as h5_file:
match to_save:
case np.ndarray():
if logger:
logger.info(f"Writing the {dataset_name} array to {file_name}")
h5_file.attrs[dataset_name] = save_dataset(
h5_file, dataset_name, to_save, to_chunk, variable_lengths, logger
)
case dict():
if logger:
logger.info(
f"Writing the {dataset_name} dict to {file_name}, each key will be a dataset, if the key contains a dict then it will be a group."
)
for key in to_save:
# We're using the attributes as a mask, where if True then we know
# the dataset is representing a None object.
h5_file.attrs[key] = save_dataset(
h5_file, key, to_save[key], to_chunk, variable_lengths, logger
)
case _:
h5_file.attrs[dataset_name] = save_dataset(
h5_file, dataset_name, to_save, to_chunk, variable_lengths, logger
)
if logger:
logger.warning(
"Not a dict or numpy array, PyFHD won't write other types at this time, refer to PyFHD.io.pyfhd_io.save to see what is supported"
)
[docs]
def load_dataset(
h5py_obj: h5py.File | h5py.Group, key: str, dataset: h5py.Dataset
) -> Any:
"""
Loads a single dataset from a HDF5 File or Group, the key here is the dataset name from the
file or group and is only used to check the attributes of said file or group. If the attribute
associated with the key is True, then we assume the value saved is an empty dataset and we should
return None. If this is False, load the value and check if this value should be a single value. There
are special checks for byte arrays, if there is byte arrays, PyFHD assumes these are meant to be strings.
Parameters
----------
h5py_obj : h5py.File | h5py.Group
A HDF5 file or group
key : str
The dataset name
dataset : h5py.Dataset
The dataset we are loading
Returns
-------
Any
The value stored in the HDF5 Dataset
See Also
--------
PyFHD.io.pyfhd_io.load : Load a HDF5 file
"""
# If the corresponding attribute is True set the current
# key to None as its an empty dataset
if h5py_obj.attrs[key]:
return None
else:
if dataset.shape == ():
value = dataset[()]
else:
value = dataset[:]
if isinstance(value, np.ndarray) and value.dtype.kind == "S":
value = _decode_byte_arr(value)
if isinstance(value, bytes):
value = value.decode()
return value
[docs]
def group_to_dict(group: h5py.Group) -> dict:
"""
When loading a hdf5 file into a dictionary, this turns a group into a dictionary,
and then returns the dictionary.
Parameters
----------
group : h5py.Group
A h5py group to turn into a dictionary
Returns
-------
return_dict: dict
The group turned into a dictionary
"""
return_dict = {}
for key in group:
match group[key]:
case h5py.Dataset():
return_dict[key] = load_dataset(group, key, group[key])
case h5py.Group():
return_dict[key] = group_to_dict(group[key])
return return_dict
[docs]
def load(
file_name: Path, logger: Logger | None = None, lazy_load: bool = False
) -> dict[str, object] | NDArray[Any] | h5py.File:
"""
Loads a HDF5 file into PyFHD, it reads the HDF5 into an array if the
HDF5 file contains a single dataset, while a HDF5 which contains multiple
datasets will load them into a dictionary. Any groups will be convered to
sub dictionaries using `group_to_dict`
Parameters
----------
file_name : Path
The /path/to/the/hdf5.h5
logger : Logger
PyFHD's Logger
lazy_load : bool, optional
Set to true if you wish to lazy load the file, currently the only file that will be
supported to do this in PyFHD will be the beam/psf file, but support for other files can
be done easily enough, by default False
Returns
-------
return_dict | array | h5_file: dict[str, object] | NDArray[Any] | h5py.File
Returns a dict in the case the HDF5 file contains multple datasets,
An array if the HDF5 contains one dataset or h5py File object if the
file is lazy loaded to conserve memory.
See Also
--------
PyFHD.io.pyfhd_io.save : Save a HDF5 file
PyFHD.io.pyfhd_io.group_to_dict : Converts a h5py Group object to a dictionary
"""
h5_file = h5py.File(file_name, "r")
if lazy_load:
return h5_file
try:
if len(h5_file.keys()) == 1:
# Assume that it contains only one numpy array, in which case read the array
key = list(h5_file.keys())[0]
if logger:
logger.info(f"Loading {key} from {file_name} into an array")
array = load_dataset(h5_file, key, h5_file[key])
return array
else:
return_dict = {}
if logger:
logger.info(f"Loading {file_name} into a dictionary")
for key in h5_file:
match h5_file[key]:
case h5py.Dataset():
return_dict[key] = load_dataset(h5_file, key, h5_file[key])
case h5py.Group():
return_dict[key] = group_to_dict(h5_file[key])
return return_dict
finally:
if not lazy_load:
h5_file.close()
[docs]
def recarray_to_dict(data: np.recarray | dict) -> dict:
"""
Turns a record array into a dict, but does it as a deep convert. This was needed due to scipy's readsav
returning an inception like experience of record arrays. This would mean to access values from something
like the obs structure for a test, the code had to be obs[0]['baseline_info'][0]['tile_a'], which was became
untenable as the full python translation won't require these leaving us two codebases for IDL compatible and
Python compatible. Instead, this function turns all record arrays into dictionaries, which are easier to understand
and are faster.
This was made specifically to work with the readsav function, to get compatibility with general recarrays remove the
zero index, as readsav for some reason adds a single dimension to all recarrays.
This was updated later to also take a dictionary which may contain record arrays too.
This was also updated later to turn object arrays into multidimensional arrays if they can be one. In the
case the object array couldn't be turned into a multidimensional array it was turned into a list
Parameters
----------
data : np.recarray or dict
A record array or dictionary maybe containing nested record arrays
Returns
-------
data: dict
A potentially nested dictionaries of dictionaries
"""
# Convert the original record array into a dictionary
if type(data) == np.recarray:
data = {name.lower(): data[name] for name in data.dtype.names}
# For every key, if it's a record array, recursively call the function
for key in data:
# Every now and then you do get object arrays that contain only one element or arrays that contain only one element
# These are not useful so I will extract the element out
if type(data[key]) == np.ndarray and data[key].size == 1:
data[key] = data[key][0]
# Sometimes the recarray is in a standard numpy object array and other times its not for some reason...
if type(data[key]) == np.recarray:
data[key] = recarray_to_dict(data[key])
elif type(data[key]) == np.ndarray and type(data[key][0]) == np.recarray:
data[key] = recarray_to_dict(data[key][0])
# We found a single array with only None
elif type(data[key]) == np.ndarray and isinstance(data[key][0], type(None)):
# Get all the None values and turn them into NaNs
none_values = np.where(data[key] == None)
if np.size(none_values) > 0:
data[key][none_values] = np.nan
# If all of the values were None, then set the array dtype to float64
# (as we don't know what dtype it actually was), probably only relevant for testing
if np.size(none_values) == np.size(data[key]):
data[key] = data[key].astype(np.float64)
# Assume we found a string array since it's bytes, convert to a string list
elif type(data[key]) == np.ndarray and isinstance(data[key].flat[0], bytes):
data[key] = [x.decode().strip() for x in data[key]]
# Found only bytes, assume it's a string, convert the string
elif isinstance(data[key], bytes):
data[key] = data[key].decode()
# You can also get object arrays which themselves contain numpy arrays, it's best to turn these
# into multidimensional arrays. If they can't turn into multidimensional arrays due to them
# being different types or not of the same size then it will convert the numpy object array
# into a list of objects instead.
elif (
type(data[key]) == np.ndarray
and data[key].dtype == object
and type(data[key][0]) == np.ndarray
):
try:
# Get all the None values and turn them into NaNs
none_values = np.nonzero(_is_none(data[key]))
if np.size(none_values) > 0:
data[key][none_values] = np.nan
# If all of the values were None, then set the array dtype to float64
# (as we don't know what dtype it actually was), probably only relevant for testing
if (np.size(none_values) // len(data[key].shape)) == np.size(data[key]):
data[key] = data[key].astype(np.float64)
# If it's not an object array, numpy will stack the axes, which isn't desired here
# as we want to maintain the multidimensional nature of the data. So we'll create an
# array of the desired size using the shape of the first element.
elif data[key][0].dtype != object:
new_array = np.empty(
[data[key].size, *data[key][0].shape], dtype=data[key][0].dtype
)
for idx in range(new_array.shape[0]):
new_array[idx] = data[key][idx]
data[key] = new_array
else:
# For an object array you can flatten it, and stack all inner arrays together until it's not an object array
# Crucially this assumes the array not as an object array can fit in memory! If you're doing the beam_ptr
# conversion take this into consideration
while data[key].dtype == object:
data[key] = np.vstack(data[key].flatten()).reshape(
list(data[key].shape) + list(data[key].flat[0].shape)
)
except ValueError:
data[key] = list(x for x in data[key])
return data
[docs]
def convert_sav_to_dict(sav_path: str, logger: Logger, tmp_dir="temp_pyfhd"):
"""
Given a path to an IDL style .sav file, load into a python dictionary
using scipy.io.readsav.
If the file was saved with the IDL /compress option, the readsav function
has to save a decompressed version of the file. By default this uses
the tempfile module to find a location, but this usually finds a bad
location with little storage when called on a super cluster. So explicitly
make our own temp dir `tmp_pyfhd` where the code is being called. It is
assumed many files are to be converted, so `tmp_pyfhd` should be deleted
after all calls.
Mostly used just for testing, if you;re not a developer you can safely ignore this function
Parameters
----------
sav_path : str
Filepath for an IDL .sav file
logger : Logger
The logger to output any error messages to
tmp_dir : str
Dir to place temporary files, creates the directory if doesn't exist.
Default: "tmp_pyfhd".
Returns
--------
sav_dict : dict
Dictionary containing whatever was in the .sav file
"""
if os.path.isfile(sav_path):
# logger.info(f"{sav_path} found, converting now.")
# Ensure the tmp dir exists, create if not
os.makedirs(tmp_dir, exist_ok=True)
# Strip off any leading path to leave just the file name
temp_name = f"{tmp_dir}/{sav_path.split('/')[-1]}"
# Load into a dictionary, decompressed and saving a temporary file if need
# be
sav_dict = readsav(sav_path, python_dict=True, uncompressed_file_name=temp_name)
return sav_dict
else:
# sys.exit(f"{sav_path} does not exist. Cannot grid so exiting")
logger.error(f"{sav_path} doesn't exist, please check your input path")
for handler in logger.handlers:
handler.close()
exit()