Source code for general_python.common.hdf5man

"""
A collection of functions to read, write, and process HDF5 files.

This module provides utilities for:
- Reading data from HDF5 files with flexible options for key selection and error handling.
- Concatenating datasets across multiple files with shape handling and cleaning options.
- Saving and appending data to HDF5 files with robust logic for different input types.
- Managing file lists and streaming data from multiple files or directories with conditions.

----------------------------------
file        : general_python/common/hdf5man.py

----------------------------------

"""
from __future__ import annotations

import  os
import  h5py
import  logging
import  numpy as np
from    typing import List, Dict, Any, Optional, Union, Generator, Tuple, Callable

try:
    from ..common.directories   import Directories
except ImportError as e:
    raise ImportError("Required modules from 'common' package are missing.") from e

# --------------------------------
try:
    from .flog import get_global_logger
    _logger = get_global_logger()
except Exception:
    _logger = logging.getLogger(__name__)

# --------------------------------

#! HDF5Manager
class HDF5Manager:
    """
    Namespace of static helpers for reading, writing, and processing HDF5 files.

    Methods include:
    - load_file_data: Read data from a single HDF5 file.
    - stream_key_from_loaded_files: Generator to yield specific dataset from loaded data.
    """

    # ---------------------------------
    #! Data Processing Methods

    @staticmethod
    def _get_all_dataset_paths(h5_group: h5py.Group) -> List[str]:
        """
        Collect the paths of every dataset reachable from `h5_group`.

        Groups are descended recursively; for each dataset its `.name`
        attribute is recorded.
        """
        found: List[str] = []

        def _walk(node):
            if isinstance(node, h5py.Dataset):
                found.append(node.name)
                return
            if isinstance(node, h5py.Group):
                for _, child in node.items():
                    _walk(child)

        _walk(h5_group)
        return found

    @staticmethod
    def _validate_file(file_path, verbose: bool = False):
        """
        Return True when `file_path` exists and carries an HDF5 extension
        ('.h5', '.hdf5' or '.hdf', case-insensitive); otherwise False.

        With `verbose=True` the reason for rejection is logged as an error.
        """
        if not os.path.exists(file_path):
            if verbose:
                logging.error(f"File does not exist: {file_path}")
            return False

        looks_like_hdf5 = str(file_path).lower().endswith(('.h5', '.hdf5', '.hdf'))
        if looks_like_hdf5:
            return True

        if verbose:
            logging.error(f"File is not an HDF5 file (based on extension): {file_path}")
        return False

    @staticmethod
    def _read_data_key(hf: h5py.File, key: str) -> Optional[np.ndarray]:
        """
        Read the full content of dataset `key` from the open file `hf`.

        Returns None (after logging an error) when the key is absent or the
        read fails for any other reason.
        """
        try:
            dataset = hf[key]
            return dataset[()]
        except KeyError:
            logging.error(f"Dataset key '{key}' not found in HDF5 file.")
            return None
        except Exception as e:
            logging.error(f"Error reading dataset '{key}': {e}")
            return None

    # ---------------------------------
    #! Loading and Concatenation Methods
    # ---------------------------------
[docs] @staticmethod def load_file_data( file_path : str, dataset_keys : Optional[List[str]] = None, verbose : bool = False, remove_corrupted_file : bool = False, strict_keys : bool = True, include_missing_keys : bool = False, missing_value : Any = np.nan) -> Dict[str, Any]: """ Reads data from an HDF5 file. - If `dataset_keys` is provided: * strict_keys=True -> skip file entirely if any key is missing * strict_keys=False -> load only available keys * include_missing_keys=True -> attach missing keys with `missing_value` """ data: Dict[str, Any] = {} if not HDF5Manager._validate_file(file_path, verbose): return data try: with h5py.File(file_path, "r") as hf: if dataset_keys: keys_to_read = dataset_keys else: # Only scan tree if we want EVERYTHING keys_to_read = HDF5Manager._get_all_dataset_paths(hf) # strict key check against the actual file content if dataset_keys and strict_keys: missing = [k for k in dataset_keys if k not in hf] if missing: if verbose: _logger.warning(f"Skipping file {file_path}: missing keys {missing}") return {} if verbose: _logger.info(f"Reading {len(keys_to_read)} datasets from {file_path}") for key in keys_to_read: if key in hf: data_in = HDF5Manager._read_data_key(hf, key) if data_in is not None: data[key] = data_in elif strict_keys: if verbose: _logger.warning(f"Missing strict key: {key}") return {} # Fail fast elif include_missing_keys: data[key] = missing_value elif verbose: _logger.warning(f"Missing key '{key}' in {file_path}; key omitted.") if data: # only attach filename if some data loaded data["filename"] = file_path return data except Exception as e: logging.error(f"Error opening or reading HDF5 file {file_path}: {e}") if remove_corrupted_file and ("truncated" in str(e).lower() or "doesn't exist" in str(e).lower()): _logger.warning(f"Attempting to remove corrupted file: {file_path}") try: os.remove(file_path) _logger.info(f"Successfully removed corrupted file: {file_path}") except OSError as oe: logging.error(f"Failed to remove 
corrupted file {file_path}: {oe}") return {}
[docs] @staticmethod def read_hdf5(file_path, keys=None, verbose=False, remove_bad=False): return HDF5Manager.load_file_data(file_path, dataset_keys=keys, verbose=verbose, remove_corrupted_file=remove_bad)
# ---------------------------------
[docs] @staticmethod def stream_key_from_loaded_files(loaded_hdf5_data_list : List[Dict[str, Any]], key : str) -> Generator[np.ndarray, None, None]: """ Yields data for a specific key from a list of already loaded HDF5 data dictionaries. Each dictionary in loaded_hdf5_data_list is expected to be the output of 'load_file_data'. Args: loaded_hdf5_data_list: List of dictionaries, where each dict contains data from an HDF5 file. key: The dataset key to extract from each dictionary. Yields: numpy.ndarray: The dataset corresponding to the key. """ def _is_missing_value(v: Any) -> bool: if v is None: return True try: arr = np.asarray(v) except Exception: return False if arr.size == 0: return True if np.issubdtype(arr.dtype, np.number): return np.all(np.isnan(arr)) return False for data_dict in loaded_hdf5_data_list: if key in data_dict: value = data_dict[key] if _is_missing_value(value): continue yield value else: _logger.warning(f"Key '{key}' not found in loaded data from file: {data_dict.get('filename', 'Unknown file')}")
[docs] @staticmethod def concatenate_key_from_loaded_files( loaded_hdf5_data_list : List[Dict[str, Any]], key : str, concat_axis : int = 0, target_shape_axis : Optional[int] = None, # The axis whose dimension should be consistent allow_padding : bool = False, is_vector : bool = False, clean_zeros_params : Optional[Dict[str, Any]] = None, # e.g., {'axis': 0, 'check_limit': 10} clean_threshold_params : Optional[Dict[str, Any]] = None, # e.g., {'axis': 0, 'threshold': -1e4} verbose : bool = False) -> np.ndarray: """ Concatenates a specific dataset key from a list of loaded HDF5 data. Handles shape mismatches by padding (if enabled) or skipping. Args: loaded_hdf5_data_list: List of dictionaries (from 'load_file_data'). key: Dataset key to extract and concatenate. concat_axis: Axis along which to concatenate arrays. target_shape_axis: If specified, datasets must have the same size along this axis as the first valid dataset found. Usually this is 'concat_axis'. allow_padding: If True, pads/truncates datasets to match the target shape on 'target_shape_axis'. is_vector: If True and data is 2D (1, N), flatten to 1D (N,). clean_zeros_params: Optional dict of parameters for 'clean_data_remove_zeros'. clean_threshold_params: Optional dict of parameters for 'clean_data_remove_thresholded'. verbose: If True, log detailed information. Returns: A concatenated numpy array. Returns an empty array if no data is found or processed. 
""" datasets_to_concat = [] reference_dim_size = None if target_shape_axis is None: target_shape_axis = concat_axis # Default to concat_axis if not specified for data_dict in loaded_hdf5_data_list: filename = data_dict.get('filename', 'Unknown file') if key not in data_dict: if verbose: _logger.warning(f"Key '{key}' not in {filename}, skipping.") continue d = data_dict[key] if not isinstance(d, np.ndarray): if verbose: _logger.warning(f"Data for key '{key}' in {filename} is not a numpy array, skipping.") continue if verbose: _logger.info(f"{filename}: Reading key '{key}' with shape {d.shape}") if d.ndim <= target_shape_axis: if verbose: logging.error(f"Dataset for key '{key}' in {filename} has insufficient dimensions ({d.ndim}) for target_shape_axis {target_shape_axis}, skipping.") continue current_dim_size = d.shape[target_shape_axis] if reference_dim_size is None: reference_dim_size = current_dim_size if current_dim_size != reference_dim_size: if allow_padding: padding_shape = list(d.shape) padding_shape[target_shape_axis] = reference_dim_size padded_d = np.zeros(tuple(padding_shape), dtype=d.dtype) slices = [slice(None)] * d.ndim slices[target_shape_axis] = slice(0, min(current_dim_size, reference_dim_size)) # Ensure compatible slicing for assignment source_slices = [slice(None)] * d.ndim source_slices[target_shape_axis] = slice(0, min(current_dim_size, reference_dim_size)) padded_d[tuple(slices)] = d[tuple(source_slices)] d = padded_d if verbose: _logger.info(f"Padded/truncated data for key '{key}' in {filename} to shape {d.shape} on axis {target_shape_axis}.") else: if verbose: _logger.warning(f"Shape mismatch for key '{key}' in {filename} on axis {target_shape_axis} " f"(expected {reference_dim_size}, got {current_dim_size}). 
Skipping file.") continue if is_vector and d.ndim == 2 and d.shape[0] == 1: d = d.ravel() datasets_to_concat.append(d) if not datasets_to_concat: if verbose: _logger.warning(f"No data found or processed for key '{key}'.") return np.array([]) try: concatenated_data = np.concatenate(datasets_to_concat, axis=concat_axis) except ValueError as e: logging.error(f"Error concatenating data for key '{key}': {e}. Check shapes: {[arr.shape for arr in datasets_to_concat]}") return np.array([]) if clean_zeros_params: concatenated_data = HDF5Manager.clean_data_remove_zeros(concatenated_data, **clean_zeros_params) if clean_threshold_params: concatenated_data = HDF5Manager.clean_data_remove_thresholded(concatenated_data, **clean_threshold_params) return concatenated_data
# --------------------------------- #! File Management Methods # ---------------------------------
[docs] @staticmethod def stream_data_from_multiple_files( file_paths : List[str], dataset_keys : Optional[List[str]] = None, sort_files : bool = True, verbose : bool = False, strict_keys : bool = True, # pass-through include_missing_keys : bool = True, missing_value : Any = np.nan, ) -> Generator[Dict[str, Any], None, None]: """ Streams data dictionary (from 'load_file_data') for each HDF5 file found in specified paths. Parameters ---------- file_paths: List of HDF5 file paths. dataset_keys: Specific dataset keys to load from each file. If None, loads all datasets. If a single key, it will be converted to a list. sort_files: Whether to sort the found files by name. verbose: If True, log detailed information. strict_keys: If True, skip files that are missing any of the specified keys. include_missing_keys: If True, include missing keys with a specified value instead of skipping. missing_value: The value to use for missing keys if include_missing_keys is True. Yields: dict: Data dictionary from each processed HDF5 file. """ if sort_files: file_paths.sort() if dataset_keys is not None and isinstance(dataset_keys, str): dataset_keys = [dataset_keys] for file_path in file_paths: if verbose: _logger.info(f"Processing file: {file_path}") data = HDF5Manager.load_file_data(file_path, dataset_keys, verbose=verbose, strict_keys=strict_keys, include_missing_keys=include_missing_keys, missing_value=missing_value) if data: # Only yield if data was successfully loaded yield data
[docs] @staticmethod def load_data_from_multiple_files( file_paths : List[str] | list[str], dataset_keys : Optional[List[str]] = None, sort_files : bool = True, verbose : bool = False, strict_keys : bool = True, include_missing_keys : bool = True, missing_value : Any = np.nan) -> List[Dict[str, Any]]: """ Loads data from multiple HDF5 files into a list of dictionaries. Eager evaluation. (This was 'read_multiple_hdf5' before) """ return list(HDF5Manager.stream_data_from_multiple_files(file_paths, dataset_keys, sort_files, verbose, strict_keys=strict_keys, include_missing_keys=include_missing_keys, missing_value=missing_value,))
@staticmethod
def stream_lazy_from_multiple_files(
        file_paths      : List[str] | list[str],
        dataset_keys    : Optional[List[str]] = None,
        sort_files      : bool = True,
        verbose         : bool = False,
        ) -> Generator[Any, None, None]:
    """
    Yield one `LazyHDF5Entry` per valid HDF5 file, optionally filtered by key presence.

    Notes
    -----
    - Files failing `_validate_file` are skipped.
    - If `dataset_keys` is provided (a list, or a single key string), an
      entry is yielded only when every listed key satisfies `key in entry`.
      If that membership test raises, the file is skipped as well.
    - The input sequence is copied before optional sorting.
    """
    from .lazy_entry import LazyHDF5Entry

    ordered = sorted(file_paths) if sort_files else list(file_paths)

    if dataset_keys is None:
        required = None
    elif isinstance(dataset_keys, str):
        required = [dataset_keys]
    else:
        required = list(dataset_keys)

    for file_path in ordered:
        if verbose:
            _logger.info(f"Processing lazy file: {file_path}")

        if not HDF5Manager._validate_file(file_path, verbose=verbose):
            continue

        entry = LazyHDF5Entry(str(file_path), params={})

        if required is not None:
            try:
                has_all_keys = all((k in entry) for k in required)
            except Exception:
                # Best effort: an entry whose keys cannot be inspected is dropped.
                continue
            if not has_all_keys:
                continue

        yield entry
@staticmethod
def load_lazy_from_multiple_files(
        file_paths      : List[str] | list[str],
        dataset_keys    : Optional[List[str]] = None,
        sort_files      : bool = True,
        verbose         : bool = False,
        ) -> List[Any]:
    """Drain 'stream_lazy_from_multiple_files' into a list of lazy HDF5 entries."""
    entries = HDF5Manager.stream_lazy_from_multiple_files(file_paths, dataset_keys, sort_files, verbose)
    return [entry for entry in entries]
# --------------------------------- #! Saving Methods # --------------------------------- @staticmethod def _generate_dataset_names( num_datasets : int, proposed_names : Optional[Union[List[str], str]] = None) -> List[str]: """ Generates dataset names for saving multiple datasets. """ if isinstance(proposed_names, str): return [f"{proposed_names}_{i}" for i in range(num_datasets)] if isinstance(proposed_names, list): if len(proposed_names) == num_datasets: if any(name == "" for name in proposed_names): raise ValueError("Dataset names must not be empty strings.") return proposed_names if len(proposed_names) == 1: prefix = proposed_names[0] if not prefix: raise ValueError("Dataset name prefix must not be an empty string.") return [f"{prefix}_{i}" for i in range(num_datasets)] # Default naming return [f"dataset_{i}" for i in range(num_datasets)]
@staticmethod
def save_data_to_file(
        directory           : Union[str, Directories],
        filename            : str,
        data_to_save        : Union[np.ndarray, List[np.ndarray], Dict[str, np.ndarray]],
        target_shape        : Optional[Tuple[int, ...]] = None,
        dataset_names_config: Optional[Union[List[str], str]] = None,  # Used if data_to_save is list/ndarray
        overwrite           : bool = True,
        *args,
        data                : Optional[Dict[str, Any]] = None
    ):
    """
    Saves data to an HDF5 file.

    Parameters
    ----------
    directory:
        Directory to save the file; created if it does not exist.
    filename:
        Name of the HDF5 file. Any extension other than .h5/.hdf5 is replaced by .h5.
    data_to_save:
        A dict {name: array}, a list of arrays, or a single np.ndarray.
        When None, `data` is used instead (or an empty dict if that is None too).
    target_shape:
        If truthy, every dataset is reshaped to this shape before saving.
    dataset_names_config:
        Names for datasets if 'data_to_save' is a list/ndarray.
        A string is used as a prefix; a list is used as names.
    overwrite:
        If True (default), the file is opened with mode 'w'; otherwise 'w-',
        which fails if the file exists.
    data:
        Keyword-only fallback used when `data_to_save` is None.

    Notes
    -----
    Every dataset is converted to complex128 (complex input) or float64
    (anything else) before writing. Errors are logged, not raised.
    """
    if not os.path.exists(directory):
        try:
            os.makedirs(directory, exist_ok=True)
        except OSError as e:
            logging.error(f"Could not create directory {directory}: {e}")
            return

    # If data_to_save is not provided, use 'data' if it's given, otherwise default to empty dict.
    if data_to_save is None:
        data_to_save = {} if data is None else data

    stem, suffix = os.path.splitext(filename)
    if suffix.lower() not in ['.h5', '.hdf5']:
        filename = stem + '.h5'
    file_path = os.path.join(directory, filename)

    if overwrite:
        mode = 'w'
    else:
        mode = 'w-'  # 'w-' fails if file exists

    def _write(handle, name, values):
        # Shared by the dict and the list/ndarray branches.
        dtype = np.complex128 if np.iscomplexobj(values) else np.float64
        array_to_write = np.array(values, dtype=dtype)
        if target_shape:
            array_to_write = array_to_write.reshape(target_shape)
        handle.create_dataset(name, data=array_to_write)

    try:
        with h5py.File(file_path, mode) as hf:
            if isinstance(data_to_save, dict):
                for name, dataset_array in data_to_save.items():
                    _write(hf, name, dataset_array)

            elif isinstance(data_to_save, (list, np.ndarray)):
                is_array = isinstance(data_to_save, np.ndarray)
                if is_array and data_to_save.ndim == 0:
                    # scalar: treat as list of one
                    datasets = [data_to_save]
                elif is_array and (not dataset_names_config
                                   or isinstance(dataset_names_config, str)
                                   or (isinstance(dataset_names_config, list) and len(dataset_names_config) == 1)):
                    # A single array with no names, a prefix, or exactly one
                    # name is stored as one dataset.
                    datasets = [data_to_save]
                else:
                    # A list, or an array paired with several names: one
                    # dataset per element along the first axis.
                    datasets = data_to_save

                names = HDF5Manager._generate_dataset_names(len(datasets), dataset_names_config)
                for name, dataset_array in zip(names, datasets):
                    _write(hf, name, dataset_array)

            else:
                logging.error(f"Unsupported data type for saving: {type(data_to_save)}. Must be dict, list, or ndarray.")

    except Exception as e:
        logging.error(f"Error saving HDF5 file {file_path}: {e}")
@staticmethod
def append_data_to_file(
        directory                   : str,
        filename                    : str,
        new_data                    : Union[np.ndarray, List[np.ndarray], Dict[str, np.ndarray]],
        dataset_names_config        : Optional[Union[List[str], str]] = None,  # Used if new_data is list/ndarray
        overwrite_existing_datasets : bool = True,  # If dataset exists, overwrite or append rows
        allow_dataset_creation      : bool = True,  # If dataset does not exist, create it,
        *,
        data                        : Optional[Dict[str, Any]] = None
    ):
    """
    Appends data to an existing HDF5 file or creates it if it doesn't exist.

    Parameters
    ----------
    directory:
        Directory of the HDF5 file.
    filename:
        Name of the HDF5 file. Any extension other than .h5/.hdf5 is replaced by .h5.
    new_data:
        Data to append: dict {name: array}, list of arrays, or a single
        ndarray (of any rank, including 0-d). When None, `data` is used.
    dataset_names_config:
        Names for datasets if 'new_data' is list/ndarray.
    overwrite_existing_datasets:
        - If True and dataset exists, it's deleted and recreated.
        - If False and dataset exists, data is appended (row-wise).
          Requires the dataset to be resizable along axis 0.
    allow_dataset_creation:
        If True, new datasets (and a missing file) are created.
    data:
        Keyword-only fallback used when `new_data` is None.

    Notes
    -----
    Datasets created here are resizable along axis 0, except scalar (0-d)
    data, which is stored as a plain scalar dataset because HDF5 scalars
    cannot carry a maxshape. Errors are logged, not raised.
    """
    base, ext = os.path.splitext(filename)
    if ext.lower() not in ['.h5', '.hdf5']:
        filename = base + '.h5'
    file_path = os.path.join(directory, filename)

    if new_data is None:
        if data is None:
            logging.error("No new data provided to append.")
            return
        new_data = data

    if not os.path.exists(file_path):
        if allow_dataset_creation:
            _logger.info(f"File {file_path} does not exist. Creating and saving new data.")
            HDF5Manager.save_data_to_file(directory, filename, new_data,
                                          dataset_names_config=dataset_names_config, overwrite=True)
            return
        logging.error(f"File {file_path} does not exist and dataset creation is not allowed.")
        return

    try:
        with h5py.File(file_path, 'a') as hf:
            data_items_to_process: Dict[str, np.ndarray] = {}
            if isinstance(new_data, dict):
                data_items_to_process = new_data
            elif isinstance(new_data, (list, np.ndarray)):
                # Any single ndarray (0-d included) is one dataset; len() of a
                # 0-d array would raise.
                datasets = [new_data] if isinstance(new_data, np.ndarray) else new_data
                names = HDF5Manager._generate_dataset_names(len(datasets), dataset_names_config)
                data_items_to_process = dict(zip(names, datasets))
            else:
                logging.error("Invalid data type for appending.")
                return

            for name, data_array in data_items_to_process.items():
                data_array_np = np.array(data_array)  # Ensure it's a numpy array
                # Unlimited first axis for future appends; scalars get no maxshape.
                growable = (None,) + data_array_np.shape[1:] if data_array_np.ndim > 0 else None

                if name in hf:
                    if overwrite_existing_datasets:
                        del hf[name]
                        hf.create_dataset(name, data=data_array_np, maxshape=growable)
                    else:  # Append rows
                        dset = hf[name]
                        maxshape = dset.maxshape
                        # Resizable only if the dataset has a first axis that is unlimited.
                        if maxshape and maxshape[0] is None:
                            rows = np.atleast_1d(data_array_np)  # a scalar counts as one row
                            n_old, n_new = dset.shape[0], rows.shape[0]
                            if n_new:
                                dset.resize(n_old + n_new, axis=0)
                                # Explicit start index: "[-n_new:]" would select
                                # the whole dataset when n_new == 0.
                                dset[n_old:] = rows
                        else:
                            logging.error(f"Dataset '{name}' in {file_path} is not resizable for appending. Maxshape: {maxshape}")
                elif allow_dataset_creation:
                    hf.create_dataset(name, data=data_array_np, maxshape=growable)
                else:
                    _logger.warning(f"Dataset '{name}' not found in {file_path} and creation is not allowed.")

    except Exception as e:
        logging.error(f"Error appending to HDF5 file {file_path}: {e}")
@staticmethod
def update_fields_in_file(
        directory               : Union[str, Directories],
        filename                : str,
        data_to_update          : Union[np.ndarray, List[np.ndarray], Dict[str, np.ndarray]],
        target_shape            : Optional[Tuple[int, ...]] = None,
        dataset_names_config    : Optional[Union[List[str], str]] = None,
        create_if_missing       : bool = True,
    ):
    """
    Updates only provided datasets in an HDF5 file and keeps all other datasets unchanged.

    Example:
        Existing file has '/a' and '/b'. Calling with data_to_update={'/a': new_data}
        updates only '/a' and leaves '/b' intact.

    Parameters
    ----------
    directory:
        Directory where the HDF5 file is located.
    filename:
        Name of the HDF5 file. Any extension other than .h5/.hdf5 is replaced by .h5.
    data_to_update:
        Data to update. Can be dict {name: array}, list of arrays, or a
        single ndarray (of any rank, including 0-d).
    target_shape:
        If truthy, each updated dataset is reshaped before writing.
    dataset_names_config:
        Names for datasets when data_to_update is list/ndarray.
    create_if_missing:
        If True, creates a missing file (via 'save_data_to_file') and/or
        missing datasets as needed. If False, only existing datasets in an
        existing file are updated.

    Notes
    -----
    Every dataset is converted to complex128 (complex input) or float64
    before writing; an existing dataset is deleted and recreated.
    Errors are logged, not raised.
    """
    base, ext = os.path.splitext(filename)
    if ext.lower() not in ['.h5', '.hdf5']:
        filename = base + '.h5'
    file_path = os.path.join(directory, filename)

    if not os.path.exists(file_path):
        if create_if_missing:
            HDF5Manager.save_data_to_file(directory,
                                          filename,
                                          data_to_update,
                                          target_shape          = target_shape,
                                          dataset_names_config  = dataset_names_config,
                                          overwrite             = True,
                                          )
            return
        logging.error(f"File {file_path} does not exist and create_if_missing=False.")
        return

    try:
        with h5py.File(file_path, 'a') as hf:
            data_items_to_process: Dict[str, np.ndarray] = {}
            if isinstance(data_to_update, dict):
                data_items_to_process = data_to_update
            elif isinstance(data_to_update, (list, np.ndarray)):
                # Any single ndarray (0-d included) is one dataset; len() of a
                # 0-d array would raise.
                datasets = [data_to_update] if isinstance(data_to_update, np.ndarray) else data_to_update
                names = HDF5Manager._generate_dataset_names(len(datasets), dataset_names_config)
                data_items_to_process = dict(zip(names, datasets))
            else:
                logging.error("Invalid data type for updating. Must be dict, list, or ndarray.")
                return

            for name, dataset_array in data_items_to_process.items():
                already_there = name in hf
                if not already_there and not create_if_missing:
                    _logger.warning(f"Dataset '{name}' not found in {file_path}; skipping update.")
                    continue

                # Convert before deleting, so a failed conversion leaves the
                # existing dataset untouched.
                dtype = np.complex128 if np.iscomplexobj(dataset_array) else np.float64
                array_to_write = np.array(dataset_array, dtype=dtype)
                if target_shape:
                    array_to_write = array_to_write.reshape(target_shape)

                if already_there:
                    del hf[name]
                hf.create_dataset(name, data=array_to_write)

    except Exception as e:
        logging.error(f"Error updating datasets in HDF5 file {file_path}: {e}")
# Short aliases bound to the save / append / update entry points.
save_hdf5 = save_data_to_file
append_hdf5 = append_data_to_file
update_hdf5 = update_fields_in_file

# ---------------------------------
#! Folders
# ---------------------------------
@staticmethod
def file_list_matching(directories          : Union[List, Directories, str],
                       *args,  # additional arguments to create the directories
                       conditions           : Optional[List[Callable]] = None,
                       check_hdf5_condition : bool = True,
                       as_string            : bool = True):
    """
    Returns a sorted list of files in the specified directories matching given conditions.

    Args:
        directories:
            A list of directory paths (str) or Directories objects, or a single one.
        *args:
            Additional arguments passed to the Directories constructor for every entry.
        conditions:
            A list or tuple of callables (or a single callable) that take a
            filename and return True if it matches. None, or any other
            value, means no extra conditions.
        check_hdf5_condition:
            If True (default), adds a condition to only include files whose
            name ends with .h5 or .hdf5 (case-sensitive).
        as_string:
            If True (default), each entry is converted with str(); otherwise
            entries are returned as produced by Directories.list_files.

    Returns:
        A sorted list of file paths matching the conditions.
    """
    if isinstance(directories, (str, Directories)):
        directories = [directories]

    # Normalise into a fresh list so neither a shared default nor the
    # caller's sequence is ever aliased.
    if isinstance(conditions, (list, tuple)):
        filters = list(conditions)
    elif callable(conditions):
        filters = [conditions]
    else:
        filters = []

    if check_hdf5_condition:
        filters.append(lambda x: str(x).endswith(('.h5', '.hdf5')))

    # get all directories
    directories_in = [Directories(d, *args) for d in directories]
    filelist = sorted(x for d in directories_in for x in d.list_files(filters=filters))

    if as_string:
        filelist = [str(x) for x in filelist]
    return filelist
[docs] @staticmethod def stream_data_from_multiple_folders( directory_paths : List[Directories], file_conditions : Optional[List[Any]] = None, # Conditions for Directories dataset_keys : Optional[List[str]] = None, sort_files : bool = True, verbose : bool = False) -> Generator[Dict[str, Any], None, None]: """ Streams data dictionary (from 'load_file_data') for each HDF5 file found in specified directories. Args: directory_paths: List of directories to search for HDF5 files. file_conditions: Conditions passed to 'Directories.listDirs' for filtering files. dataset_keys: Specific dataset keys to load from each file. sort_files: Whether to sort the found files by name. verbose: If True, log detailed information. Yields: dict: Data dictionary from each processed HDF5 file. """ # Use a placeholder for sortCondition if not specifically different sort_lambda = (lambda x: x) if sort_files else None try: # Assuming Directories.listDirs returns full paths if appendDir=True # and uses conditions to filter filenames. # For safety, always use appendDir=True here or handle path construction. file_paths = [d.list_files(filters=file_conditions, sort_key=sort_lambda) for d in directory_paths] except NameError: # Directories class might not be defined if import failed logging.error("'Directories' class is not available. Cannot list files.") return # yield nothing except Exception as e: logging.error(f"Error listing directories: {e}") return if not file_paths: _logger.warning(f"No files found in {directory_paths} matching conditions.") return for file_path in file_paths: if verbose: _logger.info(f"Processing file: {file_path}") data = HDF5Manager.load_file_data(file_path, dataset_keys, verbose=verbose) if data: # Only yield if data was successfully loaded yield data
@staticmethod
def load_data_from_multiple_folders(
        directory_paths : List[str],
        file_conditions : Optional[List[Any]] = None,
        dataset_keys    : Optional[List[str]] = None,
        sort_files      : bool = True,
        verbose         : bool = False) -> List[Dict[str, Any]]:
    """
    Loads data from the HDF5 files found in several directories into a list
    of dictionaries. Eager counterpart of 'stream_data_from_multiple_folders'.
    (This was 'read_multiple_hdf5l' before)

    Args:
        directory_paths: Directories to search.
        file_conditions: Conditions used to filter the files of each directory.
        dataset_keys: Specific dataset keys to load from each file.
        sort_files: Whether to sort the files found in each directory.
        verbose: If True, log detailed information.

    Returns:
        One data dictionary per successfully loaded file.
    """
    # Must go through the *folder* streamer: the file streamer has a
    # different positional signature (file_paths, dataset_keys, sort_files, ...).
    stream = HDF5Manager.stream_data_from_multiple_folders(
        directory_paths,
        file_conditions = file_conditions,
        dataset_keys    = dataset_keys,
        sort_files      = sort_files,
        verbose         = verbose,
    )
    return list(stream)
@staticmethod
def load_and_concatenate_key_from_folders(
        directory_paths         : List[str],
        key_to_extract          : str,
        file_conditions         : Optional[List[Any]] = None,
        concat_axis             : int = 0,
        target_shape_axis       : Optional[int] = None,
        allow_padding           : bool = False,
        is_vector               : bool = False,
        clean_zeros_params      : Optional[Dict[str, Any]] = None,
        clean_threshold_params  : Optional[Dict[str, Any]] = None,
        sort_files              : bool = True,
        verbose                 : bool = False) -> np.ndarray:
    """
    Reads a specific dataset key from the HDF5 files found in the given
    directories and concatenates the results.
    (This was 'read_hdf5_extract_and_concat' before)

    Files are loaded through 'stream_data_from_multiple_folders' (restricted
    to `key_to_extract` when it is truthy) and the collected dictionaries
    are handed to 'concatenate_key_from_loaded_files' together with the
    shape/cleaning options, which have the same meaning as there.

    Returns:
        The concatenated array, or an empty array when no file yielded data.
    """
    if target_shape_axis is None:
        target_shape_axis = concat_axis

    loaded_data_stream = HDF5Manager.stream_data_from_multiple_folders(
        directory_paths,
        file_conditions,
        dataset_keys    = [key_to_extract] if key_to_extract else None,  # Load only the required key
        sort_files      = sort_files,
        verbose         = verbose)

    # All dictionaries are collected first: the concatenation step needs the
    # first dataset to establish the reference shape.
    data_dicts_list = [loaded for loaded in loaded_data_stream if loaded]

    if not data_dicts_list:
        if verbose:
            _logger.warning(f"No files processed or data loaded for key '{key_to_extract}'.")
        return np.array([])

    return HDF5Manager.concatenate_key_from_loaded_files(
        data_dicts_list,
        key_to_extract,
        concat_axis             = concat_axis,
        target_shape_axis       = target_shape_axis,
        allow_padding           = allow_padding,
        is_vector               = is_vector,
        clean_zeros_params      = clean_zeros_params,
        clean_threshold_params  = clean_threshold_params,
        verbose                 = verbose)
[docs] @staticmethod def load_and_concatenate_key_per_directory( list_of_directory_paths : List[str], # Each item is a directory to process independently key_to_extract : str, file_conditions : Optional[List[Any]] = None, concat_axis : int = 0, target_shape_axis : Optional[int] = None, allow_padding : bool = False, is_vector : bool = False, clean_zeros_params : Optional[Dict[str, Any]] = None, clean_threshold_params : Optional[Dict[str, Any]] = None, verbose : bool = False) -> List[np.ndarray]: """ For each directory in 'list_of_directory_paths', loads and concatenates data for 'key_to_extract'. Returns a list of concatenated numpy arrays, one for each input directory. (This was 'read_hdf5_extract_and_concat_list' before) """ results = [] for dir_path in list_of_directory_paths: concatenated_data = HDF5Manager.load_and_concatenate_key_from_folders( directory_paths = [dir_path], # Process one directory at a time key_to_extract = key_to_extract, file_conditions = file_conditions, concat_axis = concat_axis, target_shape_axis = target_shape_axis, allow_padding = allow_padding, is_vector = is_vector, clean_zeros_params = clean_zeros_params, clean_threshold_params = clean_threshold_params, sort_files = True, # Default sort within directory verbose = verbose ) results.append(concatenated_data) return results
# --------------------------------- #! Data Cleaning Methods # --------------------------------- @staticmethod def _coerce_array(a: Any, axis_realization: int = 0) -> np.ndarray: """ Coerce HDF5 dataset/list/scalar to a numeric ndarray. """ if a is None: return np.array([], dtype=float) arr = np.asarray(a) if arr.dtype == object: # ragged: list of arrays/scalars parts = [] for e in arr: if e is None: continue e = np.asarray(e) if e.size == 0: continue parts.append(e) if not parts: return np.array([], dtype=float) # all 1D or all 2D? ndims = {p.ndim for p in parts} if len(ndims) != 1: raise ValueError("Mixed ranks in object array; cannot concatenate cleanly.") if parts[0].ndim == 1: return np.concatenate([p.reshape(-1) for p in parts], axis=axis_realization) elif parts[0].ndim == 2: dset = {p.shape[1] for p in parts} if len(dset) != 1: raise ValueError("Inconsistent second dimension in ragged 2D parts.") return np.concatenate(parts, axis=axis_realization) else: raise ValueError("Only 1D or 2D arrays are supported.") # scalar -> (1,) if arr.ndim == 0: arr = arr.reshape(1) return arr
[docs] @staticmethod def process_data( data, keys : str | list[str] | tuple[str, ...], throw_if_bad : bool = False, unpack : bool = True, expected_ndim : int | None = None, expected_dim0 : int | None = None, expected_dim1 : int | None = None, expected_first_val : Any = None, return_skipped : bool = False) -> np.ndarray | tuple[np.ndarray, list[str]]: """ Collects arrays from iterable of mappings and concatenates them robustly. Parameters ---------- data iterable of dict-like objects keys: key or list of possible keys to try (first available is used) throw_if_bad: whether to throw if no valid arrays are found unpack: whether to flatten nested arrays along first axis expected_ndim: enforce specific ndim (1 or 2). If None, auto-infer from first valid array. This is stricter than just checking consistency. expected_dim0: enforce first dimension length (skip mismatches) expected_dim1: enforce second dimension length (skip mismatches) expected_first_val: if not None, enforce that the first value of the array is close to this (skip mismatches) return_skipped: if True, return (array, skipped_filenames) Returns ------- - ndarray (default) - (ndarray, skipped_filenames) if return_skipped=True Example: >>> energies = HDF5Manager.process_data(data, "energies") >>> energies = HDF5Manager.process_data(data, ["energies", "E"], expected_ndim=1) >>> obs = HDF5Manager.process_data(data, "observables", expected_ndim=2, expected_dim1=4) """ if isinstance(keys, str): keys = [keys] arrays: list[np.ndarray] = [] target_dtype = None target_ndim = expected_ndim target_dim1 = expected_dim1 target_first_val = expected_first_val skipped: list[str] = [] for idx, x in enumerate(data): fname = x.get("filename", f"<item {idx}>") try: # find a matching key found_key = next((k for k in keys if k in x), None) if found_key is None: skipped.append(fname) continue val = x[found_key] if val is None: skipped.append(fname) continue arr = np.asarray(val) if arr.size == 0: skipped.append(fname) continue # 
Missing-key sentinels are represented as scalar/flat NaN values. # Skip those entries so downstream concatenation ignores only bad files. if arr.ndim == 0 and np.issubdtype(arr.dtype, np.number) and np.isnan(arr.item()): skipped.append(fname) continue if arr.ndim >= 1 and np.issubdtype(arr.dtype, np.number) and np.all(np.isnan(arr)): skipped.append(fname) continue if target_first_val is not None: if arr.ndim == 0: first_val = arr.item() elif arr.ndim >= 1 and arr.shape[0] > 0: first_val = arr.flat[0] else: skipped.append(fname) continue if not np.isclose(first_val, target_first_val): skipped.append(fname) continue # normalize rank if arr.ndim > 2: arr = arr.reshape(arr.shape[0], -1) # enforce ndim if expected_ndim is not None and arr.ndim != expected_ndim: skipped.append(fname) continue if target_ndim is None: target_ndim = arr.ndim if target_ndim == 2: target_dim1 = arr.shape[1] # consistency checks if target_ndim == 1: if arr.ndim == 2: if arr.shape[1] != 1: skipped.append(fname) continue arr = arr.reshape(-1) elif target_ndim == 2: if arr.ndim != 2: skipped.append(fname) continue if target_dim1 is not None and arr.shape[1] != target_dim1: skipped.append(fname) continue if expected_dim0 is not None and arr.shape[0] != expected_dim0: skipped.append(fname) continue if expected_dim1 is not None and arr.ndim > 1 and arr.shape[1] != expected_dim1: skipped.append(fname) continue if target_dtype is None: target_dtype = arr.dtype arrays.append(arr) except Exception as e: logging.error(f"Error processing {fname}: {e}") skipped.append(fname) if not arrays: if throw_if_bad: raise ValueError(f"No valid data found for keys {keys}") result = np.array([], dtype=float) else: if target_ndim == 1: if unpack: result = np.concatenate(arrays).astype(target_dtype, copy=False) else: result = np.array(arrays, dtype=target_dtype) else: # target_ndim == 2 if unpack: result = np.concatenate(arrays, axis=0) else: result = np.array(arrays, dtype=target_dtype) return (result, skipped) if 
return_skipped else result
# --------------------------------- #! Data Processing Methods # ---------------------------------
@staticmethod
def clean_data_remove_zeros(
        matrix: np.ndarray,
        axis: int = 0,
        tolerance: float = 1e-9,
        check_limit: Optional[int] = 10) -> np.ndarray:
    """
    Drop slices along ``axis`` whose leading elements are all close to zero.

    Args:
        matrix: Input numpy array.
        axis: Axis whose entries (rows for 0, columns for 1, ...) are tested
            and possibly removed. Must satisfy ``0 <= axis < matrix.ndim``.
        tolerance: Absolute tolerance for treating an element as zero.
        check_limit: For arrays with two or more dimensions, only the first
            ``check_limit`` entries along the first remaining dimension of
            each slice are inspected. If None, the whole slice is inspected.

    Returns:
        - Scalars and empty arrays: returned unchanged.
        - 1D input: an empty float array if the first ``check_limit`` elements
          and then all elements are close to zero; otherwise the input
          unchanged (no partial trimming is performed).
        - Invalid ``axis``: an error is logged and the input returned unchanged.
        - Otherwise: the array without the all-zero slices. If every slice is
          removed, an empty array whose length along ``axis`` is 0.
    """
    # Nothing to clean for scalars or empty arrays.
    if matrix.ndim == 0 or matrix.size == 0:
        return matrix

    def _near_zero(values):
        return np.isclose(values, 0.0, atol=tolerance)

    if matrix.ndim == 1:
        n_total = matrix.shape[0]
        head_len = n_total if check_limit is None else min(check_limit, n_total)
        # Cheap check on the head first; the full scan runs only if it passes.
        if np.all(_near_zero(matrix[:head_len])) and np.all(_near_zero(matrix)):
            return np.array([])
        return matrix

    if not (0 <= axis < matrix.ndim):
        logging.error(f"Invalid axis {axis} for matrix with {matrix.ndim} dimensions.")
        return matrix

    # Bring the tested axis to the front: slices_first[i] is the i-th slice.
    slices_first = np.moveaxis(matrix, axis, 0)

    # Restrict the inspected window along the first dimension of each slice.
    window = slices_first
    if check_limit is not None and slices_first.shape[1] > 0:
        window = slices_first[:, :min(check_limit, slices_first.shape[1])]

    # One boolean per slice: True when every inspected element is ~0.
    inner_axes = tuple(range(1, window.ndim))
    slice_is_zero = np.all(_near_zero(window), axis=inner_axes)
    kept = np.flatnonzero(~slice_is_zero)

    if kept.size == 0:
        # Every slice was rejected: keep the other dimensions, zero this one.
        empty_shape = list(matrix.shape)
        empty_shape[axis] = 0
        return np.empty(tuple(empty_shape), dtype=matrix.dtype)

    return np.moveaxis(slices_first[kept], 0, axis)
@staticmethod
def clean_data_remove_thresholded(
        matrix: np.ndarray,
        axis: int = 0,
        threshold: float = -1e4,
        check_limit: Optional[int] = None) -> np.ndarray:
    """
    Drop slices along ``axis`` whose inspected elements are all below ``threshold``.

    Args:
        matrix: Input numpy array.
        axis: Axis whose entries are tested and possibly removed. Must satisfy
            ``0 <= axis < matrix.ndim``.
        threshold: A slice is removed when every inspected element is
            strictly less than this value.
        check_limit: For arrays with two or more dimensions, only the first
            ``check_limit`` entries along the first remaining dimension of
            each slice are inspected. If None, the whole slice is inspected.

    Returns:
        - Scalars and empty arrays: returned unchanged.
        - Invalid ``axis``: an error is logged and the input returned unchanged.
        - Otherwise: the array without the rejected slices (for 1D input this
          removes the individual elements below ``threshold``). If every slice
          is removed, an empty array whose length along ``axis`` is 0.
    """
    if matrix.ndim == 0 or matrix.size == 0:
        return matrix

    if not (0 <= axis < matrix.ndim):
        logging.error(f"Invalid axis {axis} for matrix with {matrix.ndim} dimensions.")
        return matrix

    # Bring the tested axis to the front: slices_first[i] is the i-th slice.
    slices_first = np.moveaxis(matrix, axis, 0)

    # Optionally restrict the check to the leading part of each slice.
    window = slices_first
    if check_limit is not None and slices_first.ndim > 1 and slices_first.shape[1] > 0:
        window = slices_first[:, :min(check_limit, slices_first.shape[1])]

    # One boolean per slice: True when all inspected values are below threshold.
    below = window < threshold
    if below.ndim > 1:
        below = np.all(below, axis=tuple(range(1, below.ndim)))
    kept = np.flatnonzero(~below)

    if kept.size == 0:
        empty_shape = list(matrix.shape)
        empty_shape[axis] = 0
        return np.empty(tuple(empty_shape), dtype=matrix.dtype)

    return np.moveaxis(slices_first[kept], 0, axis)
# --------------------------------- #! Batch Processing # ---------------------------------
@staticmethod
def process_file_content(
        source_directory: str,
        source_filename: str,
        key_map: Optional[Dict[str, str]] = None,
        clean_zeros_axis: Optional[int] = None,
        clean_values_axis: Optional[int] = None,
        clean_check_limit: int = 10,
        output_directory: Optional[str] = None,
        verbose: bool = False):
    """
    Load one HDF5 file, optionally clean and rename its datasets, and save it.
    (This was 'change_h5_bad' before.)

    Args:
        source_directory: Directory of the source HDF5 file.
        source_filename: Filename of the source HDF5 file; also used as the
            output filename.
        key_map: Optional ``{old_key: new_key}`` renaming; keys not listed
            keep their name.
        clean_zeros_axis: If not None, each dataset is passed through
            ``clean_data_remove_zeros`` along this axis.
        clean_values_axis: If not None, each dataset is then passed through
            ``clean_data_remove_thresholded`` along this axis.
        clean_check_limit: ``check_limit`` handed to both cleaning functions.
        output_directory: Directory to save into. If None (or empty), the
            source directory is used, i.e. the original file is overwritten.
        verbose: If True, log detailed information.

    Returns:
        None. Nothing is saved when the file cannot be read, has no
        'filename' entry, or contains no datasets.
    """
    full_source_path = os.path.join(source_directory, source_filename)
    data = HDF5Manager.load_file_data(full_source_path, verbose=verbose)

    # An empty mapping (or one without 'filename') is treated as a failed load.
    if not data or 'filename' not in data:
        logging.error(f"Could not read or data is empty for {full_source_path}")
        return

    # 'filename' is bookkeeping, not a dataset: drop it before processing.
    data.pop('filename', None)
    renames = key_map or {}

    def _clean(values):
        # Zero-cleaning first, then threshold-cleaning, each only if requested.
        if clean_zeros_axis is not None:
            values = HDF5Manager.clean_data_remove_zeros(values, axis=clean_zeros_axis, check_limit=clean_check_limit)
        if clean_values_axis is not None:
            values = HDF5Manager.clean_data_remove_thresholded(values, axis=clean_values_axis, check_limit=clean_check_limit)
        return values

    processed_data = {}
    for old_key, values in data.items():
        processed_data[renames.get(old_key, old_key)] = _clean(values)

    if not processed_data:
        # Nothing to write: an empty HDF5 file is deliberately not created.
        if verbose:
            _logger.info(f"No data left after processing {full_source_path}")
        return

    target_dir = output_directory or source_directory
    target_filename = source_filename  # the filename is kept as-is

    if verbose:
        if target_dir == source_directory:
            action = "Overwriting"
        else:
            action = "Saving to"
        _logger.info(f"{action} {os.path.join(target_dir, target_filename)}")

    HDF5Manager.save_data_to_file(target_dir, target_filename, processed_data, overwrite=True)
@staticmethod
def batch_process_files_in_dirs(
        source_directories: List[str],
        file_conditions: Optional[List[Any]] = None,
        key_map: Optional[Dict[str, str]] = None,
        clean_zeros_axis: Optional[int] = None,
        clean_values_axis: Optional[int] = None,
        clean_check_limit: int = 10,
        output_directory_base: Optional[str] = None,
        is_test_run: bool = False,
        verbose: bool = False,
        exception_handler: Optional[Callable[[Exception, str], None]] = None):
    """
    Processes multiple HDF5 files across directories.
    (This was 'change_h5_bad_dirs' before.)

    Every file returned by ``Directories.listDirs`` for a source directory is
    passed to ``HDF5Manager.process_file_content``.

    Args:
        source_directories: Directories whose files are processed.
        file_conditions: Conditions forwarded to ``Directories.listDirs``
            (an empty list is used when None).
        key_map: Forwarded to ``process_file_content``.
        clean_zeros_axis: Forwarded to ``process_file_content``.
        clean_values_axis: Forwarded to ``process_file_content``.
        clean_check_limit: Forwarded to ``process_file_content``.
        output_directory_base: If set, files from each source directory are
            written to ``output_directory_base/<path of that directory
            relative to the common ancestor of all source directories>``.
        is_test_run: Only consulted when ``output_directory_base`` is not
            set; output then goes to a "testrun" subdirectory of each source
            directory. If neither is set, source files are overwritten.
        verbose: If True, log detailed information.
        exception_handler: Optional callback ``(exception, message)`` invoked
            after an error in a directory has been logged.

    Notes:
        An exception while handling one directory is logged and the loop
        continues with the next directory; remaining files of the failing
        directory are not processed.
    """
    # Base against which the source sub-structure is recreated. The common
    # ancestor guarantees relative paths never start with '..', so output
    # cannot escape output_directory_base when the sources are siblings.
    common_root = None
    if output_directory_base and source_directories:
        try:
            common_root = os.path.commonpath(
                [os.path.abspath(d) for d in source_directories]
            )
        except ValueError:
            # No common ancestor (e.g. different drives): fall back to the
            # directory's own name below.
            common_root = None

    for current_source_dir in source_directories:
        try:
            if verbose:
                _logger.info(f"Processing files in directory: {current_source_dir}")

            # Directories.listDirs with appendDir=False returns relative filenames
            relative_filenames = Directories.listDirs(
                [current_source_dir],
                conditions=file_conditions or [],
                appendDir=False
            )

            if not relative_filenames:
                if verbose:
                    _logger.info(f"No files matching conditions found in {current_source_dir}")
                continue

            # The output directory depends only on the source directory, so
            # it is resolved once per directory rather than once per file.
            target_output_dir = None
            if output_directory_base:
                if common_root is None:
                    relative_dir_path = os.path.basename(os.path.normpath(current_source_dir))
                else:
                    relative_dir_path = os.path.relpath(
                        os.path.abspath(current_source_dir), start=common_root
                    )
                    if relative_dir_path == '.':
                        relative_dir_path = ""
                target_output_dir = os.path.join(output_directory_base, relative_dir_path)
            elif is_test_run:
                target_output_dir = os.path.join(current_source_dir, "testrun")
            # else: target_output_dir stays None, so process_file_content overwrites the source.

            for rel_filename in relative_filenames:
                if verbose:
                    _logger.info(f"Processing {os.path.join(current_source_dir, rel_filename)} -> "
                                 f"Output dir: {target_output_dir if target_output_dir else current_source_dir}")

                HDF5Manager.process_file_content(
                    current_source_dir, rel_filename,
                    key_map=key_map,
                    clean_zeros_axis=clean_zeros_axis,
                    clean_values_axis=clean_values_axis,
                    clean_check_limit=clean_check_limit,
                    output_directory=target_output_dir,
                    verbose=verbose
                )
        except Exception as e:
            logging.error(f"Error processing directory {current_source_dir}: {e}")
            if exception_handler:
                exception_handler(e, f"Error processing directory {current_source_dir}")
# else: print("Exception:", e) # Original behavior # -------------------------------- #! Histogram / Data Series Combination Utilities # --------------------------------
@staticmethod
def average_histograms(
        y_arrays_list: List[np.ndarray],
        x_arrays_list: List[np.ndarray],
        filter_y_lt_one: bool = False,
        use_interpolation: bool = True) -> Tuple[np.ndarray, np.ndarray]:
    """
    Combines and averages y-values (e.g., histogram counts) across multiple series,
    aligning them by their x-values (e.g., bin centers).

    Args:
        y_arrays_list: List of Y-value arrays (array-likes are accepted).
        x_arrays_list: List of corresponding X-value arrays (bins). The bins of
            a series need not be sorted.
        filter_y_lt_one: If True, y-values < 1.0 (and corresponding x) are
            filtered out before averaging. (Original 'typical' parameter.)
        use_interpolation:
            - If True, interpolates Y-values onto the sorted union of all
              X-values. Outside a series' x-range the series contributes 0.
            - If False, averages only values that share exactly the same X.

    Returns:
        Tuple (y_combined_averaged, x_common_grid). With a single input series
        the (optionally filtered) series itself is returned.

    Raises:
        ValueError: If either list is empty or their lengths differ.

    Notes:
        In interpolation mode a series is counted at a grid point only when
        the point lies inside its x-range *and* its interpolated value is
        non-zero, so genuine zeros do not lower the average (approximation
        kept from the original implementation). Grid points without any
        contribution are reported as 0.
    """
    if not y_arrays_list or not x_arrays_list:
        raise ValueError("Input y_arrays_list and x_arrays_list cannot be empty.")
    if len(y_arrays_list) != len(x_arrays_list):
        raise ValueError("y_arrays_list and x_arrays_list must have the same length.")

    # Normalise to ndarrays (no copy for ndarray input) and apply the filter.
    series = []
    for y, x in zip(y_arrays_list, x_arrays_list):
        y = np.asarray(y)
        x = np.asarray(x)
        if filter_y_lt_one:
            keep = y >= 1.0
            y, x = y[keep], x[keep]
        series.append((y, x))

    if len(series) == 1:
        return series[0]

    if use_interpolation:
        # np.unique already returns the sorted union of all x-values.
        x_common_grid = np.unique(np.concatenate([x for _, x in series]))
        sum_y = np.zeros(x_common_grid.shape, dtype=float)
        n_contrib = np.zeros(x_common_grid.shape, dtype=int)

        for y_s, x_s in series:
            if x_s.size == 0:
                continue  # Skip empty series

            # np.interp requires increasing sample points; sort x and y together.
            order = np.argsort(x_s, kind="stable")
            x_sorted, y_sorted = x_s[order], y_s[order]

            y_interp = np.interp(x_common_grid, x_sorted, y_sorted, left=0, right=0)
            sum_y += y_interp

            in_range = (x_common_grid >= x_sorted[0]) & (x_common_grid <= x_sorted[-1])
            n_contrib += in_range & (y_interp != 0)  # Approximation

        # Avoid division by zero: untouched grid points stay at 0.
        y_combined_averaged = np.zeros(x_common_grid.shape, dtype=float)
        valid = n_contrib > 0
        y_combined_averaged[valid] = sum_y[valid] / n_contrib[valid]
        return y_combined_averaged, x_common_grid

    # Non-interpolation method: group by exact x and average within each group.
    all_y = np.concatenate([y for y, _ in series])
    all_x = np.concatenate([x for _, x in series])
    if all_x.size == 0:
        return np.array([]), np.array([])

    unique_x, inverse = np.unique(all_x, return_inverse=True)
    sum_y = np.bincount(inverse, weights=all_y)
    counts = np.bincount(inverse)
    return sum_y / counts, unique_x
@staticmethod
def align_and_fill_histograms(
        y_arrays_list: List[np.ndarray],
        x_arrays_list: List[np.ndarray],
        group_lengths: List[int],
        fill_value: float = np.nan) -> Tuple[List[np.ndarray], np.ndarray]:
    """
    Aligns multiple y-value series (histograms) to a common x-grid by interpolation,
    filling values for x-points outside an original series' x-range.

    Args:
        y_arrays_list: The y-series, in one of two layouts:
            - nested: one entry per x-array, each a list holding
              ``group_lengths[i]`` y-arrays (a bare 1D array is also accepted
              for a group of length 1).
              Example: [[y1_real1, y1_real2], [y2_real1]]
            - flat: all y-arrays in group order.
              Example: [y1_real1, y1_real2, y2_real1]
        x_arrays_list: List of x-value arrays (bins), one per group. The bins
            need not be sorted. Example: [x1_bins, x2_bins]
        group_lengths: Number of y-arrays that belong to each x-array.
            Example: [2, 1].
        fill_value: Value used for points of the common x-grid that lie outside
            a series' x-range (and for every point when the x-array is empty).

    Returns:
        Tuple (y_aligned_all, x_common_grid).
        y_aligned_all: List of 1D numpy arrays, one per y-series in group
        order, each interpolated onto x_common_grid (the sorted union of all
        x-values).

    Raises:
        ValueError: If an input list is empty, ``group_lengths`` does not
            match the number of x-arrays, or there are fewer y-arrays than
            ``group_lengths`` requires.
    """
    if not y_arrays_list or not x_arrays_list:
        raise ValueError("Input lists cannot be empty.")
    if len(group_lengths) != len(x_arrays_list):  # Each x_array defines a group
        raise ValueError("group_lengths must match the number of x_arrays.")

    # Resolve the layout ONCE for the whole input. Deciding per realization
    # mixes nested and flat indexing and picks the wrong series for a flat
    # list such as [ya, yb, yc] with group_lengths [2, 1].
    flat_y = None
    if len(y_arrays_list) == len(group_lengths):
        candidate = []
        for group, n_real in zip(y_arrays_list, group_lengths):
            if isinstance(group, list) and len(group) == n_real:
                candidate.extend(group)
            elif n_real == 1 and isinstance(group, np.ndarray) and group.ndim == 1:
                candidate.append(group)
            else:
                candidate = None  # not a consistent nested layout
                break
        flat_y = candidate
    if flat_y is None:
        # Flat layout: y-arrays are consumed sequentially, group by group.
        if len(y_arrays_list) < sum(group_lengths):
            raise ValueError("Mismatch between group_lengths and total number of y_arrays.")
        flat_y = list(y_arrays_list)

    # Common grid: np.unique returns the sorted union of all x-values.
    x_arrays = [np.asarray(x) for x in x_arrays_list]
    x_common_grid = np.unique(np.concatenate(x_arrays))

    y_aligned_all = []
    cursor = 0  # position of the current group's first series in flat_y
    for x_group, n_real in zip(x_arrays, group_lengths):
        group_series = flat_y[cursor:cursor + n_real]
        cursor += n_real

        if x_group.size == 0:
            # No sample points to interpolate from: everything is "outside".
            for _ in range(n_real):
                y_aligned_all.append(np.full(x_common_grid.shape, fill_value, dtype=float))
            continue

        # np.interp requires increasing sample points; sort x and y together.
        order = np.argsort(x_group, kind="stable")
        x_sorted = x_group[order]
        for y_series in group_series:
            y_sorted = np.asarray(y_series)[order]
            y_aligned_all.append(
                np.interp(x_common_grid, x_sorted, y_sorted,
                          left=fill_value, right=fill_value)
            )

    return y_aligned_all, x_common_grid
# ---------------------------------------- #! END OF HDF5Manager CLASS # ----------------------------------------