"""
This module provides a DataHandler class for handling and processing data arrays.
It includes methods for filtering, initializing, interpolating, aggregating,
concatenating, and averaging data arrays.
Classes:
DataHandler: A class containing static methods for data handling and processing.
Methods:
_filter_typical_values(current_x, current_y, typical, threshold=1.0) -> tuple:
_initialize_combined_arrays(y_list, x_list, typical, threshold=1.0) -> tuple:
_interpolate_and_update(x_combined, y_combined, current_x, current_y, divider) -> tuple:
_aggregate_and_update(x_combined, y_combined, current_x, current_y, divider) -> tuple:
Aggregates and updates combined x and y data arrays with current x and y data arrays.
concat_and_average(y_list, x_list, typical=False, use_interpolation=True, threshold=1.0) -> tuple:
concat_and_fill(y_list, x_list, lengths, missing_val=np.nan) -> tuple:
"""
####################################################################################################
import numpy as np
from typing import Union, Sequence, Optional, Iterable, Tuple
####################################################################################################
[docs]
class DataHandler:
"""
DataHandler class provides static methods for handling and processing data arrays, including filtering,
interpolating, aggregating, concatenating, and cutting matrices based on specific criteria.
Methods:
_filter_typical_values(current_x, current_y, typical, threshold=1.0) -> tuple:
_initialize_combined_arrays(y_list, x_list, typical, threshold=1.0) -> tuple:
Initializes and combines arrays from given lists. If the `typical` flag is set to True,
it filters the combined arrays to include only elements where the values in `y_combined`
are less than the threshold.
_interpolate_and_update(x_combined, y_combined, current_x, current_y, divider) -> tuple:
_aggregate_and_update(x_combined, y_combined, current_x, current_y, divider) -> tuple:
Aggregates and updates combined x and y data arrays with current x and y data arrays
by summing common bins and appending unique bins.
concat_and_average(y_list, x_list, typical=False, use_interpolation=True, threshold=1.0) -> tuple:
concat_and_fill(y_list, x_list, lengths, missing_val=np.nan) -> tuple:
cut_matrix_bad_vals_zero(M, axis=0, tol=1e-9, check_limit: float | None = 10) -> np.ndarray:
Cuts off the slices (along any specified axis) in matrix M where all elements are close to zero.
cut_matrix_bad_vals(M, axis=0, threshold=-1e4, check_limit=None) -> np.ndarray:
Cuts off the rows or columns in matrix M where the first `check_limit` elements are all below a threshold.
"""
@staticmethod
def _filter_typical_values(current_x, current_y, typical, threshold = 1.0) -> tuple:
"""
Filters y values less than the threshold.
"""
if typical:
indices = current_y < threshold
current_x = current_x[indices]
current_y = current_y[indices]
return current_x, current_y
[docs]
@staticmethod
def _initialize_combined_arrays(y_list, x_list, typical, threshold = 1.0) -> tuple:
"""
Initializes and combines arrays from given lists.
This method takes two lists of arrays, `y_list` and `x_list`, and combines their first elements.
If the `typical` flag is set to True, it filters the combined arrays to include only elements
where the values in `y_combined` are less than 1.0.
Args:
y_list (list of numpy.ndarray): List of arrays to be combined for the y-axis.
x_list (list of numpy.ndarray): List of arrays to be combined for the x-axis.
typical (bool): Flag to determine if filtering should be applied.
Returns:
tuple: A tuple containing the combined y-axis array and x-axis array.
"""
y_combined = y_list[0]
x_combined = x_list[0]
return DataHandler._filter_typical_values(x_combined, y_combined, typical, threshold)
[docs]
@staticmethod
def _interpolate_and_update(x_combined, y_combined, current_x, current_y, divider):
"""
Interpolates and updates combined x and y data arrays with current x and y data arrays.
This function takes in combined x and y data arrays, and current x and y data arrays,
interpolates them to a common set of x values, and updates the combined y data array
by adding the interpolated current y data array. It also updates the divider array
by interpolating it to the new x values and incrementing it by 1.
Parameters:
x_combined (np.ndarray): The combined x data array.
y_combined (np.ndarray): The combined y data array.
current_x (np.ndarray): The current x data array.
current_y (np.ndarray): The current y data array.
divider (np.ndarray): The divider array.
Returns:
tuple: A tuple containing:
- new_x_combined (np.ndarray): The new combined x data array after interpolation.
- y_combined (np.ndarray): The updated combined y data array.
- divider (np.ndarray): The updated divider array.
"""
new_x_combined = np.sort(np.unique(np.concatenate([x_combined, current_x]))) # Create a new combined x grid
y_interpolated_combined = np.interp(new_x_combined, x_combined, y_combined, left=0, right=0) # Interpolate previous y values onto the new grid
y_interpolated_current = np.interp(new_x_combined, current_x, current_y, left=0, right=0) # Interpolate current y onto the new grid
y_combined = y_interpolated_combined + y_interpolated_current # Update combined y values
divider = np.interp(new_x_combined, x_combined, divider, left=0, right=0) + 1 # Update divider - increment by 1
return new_x_combined, y_combined, divider
[docs]
@staticmethod
def _aggregate_and_update(x_combined, y_combined, current_x, current_y, divider):
"""
Aggregates and updates combined x and y data arrays with current x and y data arrays.
Args:
x_combined (np.ndarray): The combined x data array.
y_combined (np.ndarray): The combined y data array.
current_x (np.ndarray): The current x data array.
current_y (np.ndarray): The current y data array.
divider (np.ndarray): The divider array.
"""
# Find common bins and separate unique bins
common_bins = np.intersect1d(x_combined, current_x, assume_unique=True) # Common bins in combined and current
unique_x_combined = np.setdiff1d(x_combined, common_bins, assume_unique=True) # Unique bins in combined - previous x's
unique_x_current = np.setdiff1d(current_x, common_bins, assume_unique=True) # Unique bins in current - new x's
if common_bins.size > 0:
common_indices_combined = np.isin(x_combined, common_bins) # Indices of common bins in combined
common_indices_current = np.isin(current_x, common_bins) # Indices of common bins in current
y_combined[common_indices_combined] += current_y[common_indices_current] # Sum y values
divider[common_indices_combined] += 1 # Update divider - increment by 1
if unique_x_current.size > 0:
x_combined = np.concatenate([x_combined, unique_x_current]) # Append unique bins
y_combined = np.concatenate([y_combined, current_y[np.isin(current_x, unique_x_current)]])
divider = np.concatenate([divider, np.ones_like(unique_x_current)]) # Update divider - join the list of ones (first occurrence)
sort_indices = np.argsort(x_combined) # Sort combined arrays
x_combined = x_combined[sort_indices] # Sort x_combined
y_combined = y_combined[sort_indices] # Sort y_combined
divider = divider[sort_indices] # Sort divider
return x_combined, y_combined, divider
################################################################################################
[docs]
@staticmethod
def concat_and_average(y_list, x_list, typical = False, use_interpolation = True, threshold = 1.0):
"""
Concatenates and averages y values across multiple histograms.
:param y_list : List of y matrices (each one corresponding to a realization).
:param x_list : List of x vectors (each one corresponding to a realization).
:param typical : If True, filter y values less than 1.0.
:param use_interpolation: If True, interpolate y values for non-matching bins.
If False, aggregate only exact matches and append unique bins.
:param threshold : The threshold value for filtering y values (default: 1.0).
:returns : Combined y values and x bins after averaging.
"""
# check the instances
if not isinstance(y_list, (list, np.ndarray)) or not isinstance(x_list, (list, np.ndarray)) or not isinstance(typical, bool) or not isinstance(use_interpolation, bool):
raise ValueError("Input lists must be of type list or numpy.ndarray, and flags must be boolean.")
# check if the arrays are already one dimensional and return them
if len(y_list[0].shape) == 1:
return y_list, x_list
# check if the arrays are empty or have different lengths
if len(y_list) == 0 or len(x_list) == 0:
raise ValueError("Input lists cannot be empty.")
# check if the arrays have the same length - when they are multidimensional
if len(y_list) != len(x_list):
raise ValueError("Input lists must have the same length.")
# check if the arrays have only one element and return them
if len(x_list) == 1:
return y_list[0], x_list[0]
# first initialization
y_combined, x_combined = DataHandler._initialize_combined_arrays(y_list, x_list, typical)
divider = np.ones_like(y_combined)
# loop over the rest of the arrays
for i in range(1, len(x_list)):
current_x, current_y = DataHandler._filter_typical_values(x_list[i], y_list[i], typical, threshold)
if use_interpolation:
x_combined, y_combined, divider = DataHandler._interpolate_and_update(x_combined, y_combined, current_x, current_y, divider)
else:
x_combined, y_combined, divider = DataHandler._aggregate_and_update(x_combined, y_combined, current_x, current_y, divider)
return y_combined / divider, x_combined # Final averaging - it shall divide each element by the number of realizations [each element in the divider]
[docs]
@staticmethod
def concat_and_fill(y_list, x_list, lengths, missing_val = np.nan):
"""
Concatenates y values across multiple histograms, combines x vectors into a single sorted array,
and fills missing values.
:param y_list: List of y arrays (each one corresponding to a realization).
:param x_list: List of x arrays (each one corresponding to a realization group).
:param lengths: List indicating how many y arrays correspond to each x array.
:param missing_val: Value to fill for missing data points after interpolation (default: np.nan).
:returns: A 2D NumPy array of y values interpolated to a common x grid and the combined x bins.
"""
# check the instances
if not isinstance(y_list, list) or not isinstance(x_list, list) or not isinstance(lengths, list):
raise ValueError("Input lists must be of type list.")
# check if the arrays are already one dimensional and return them
if len(y_list[0].shape) == 1:
return y_list, x_list
if len(y_list) == 0 or len(x_list) == 0:
raise ValueError("Input lists cannot be empty.")
if len(lengths) != len(x_list):
raise ValueError("Lengths list must match the size of x_list.")
# Combine all x vectors into a single sorted, unique array
x_combined = np.sort(np.unique(np.concatenate(x_list)))
# Interpolate each realization onto the combined x grid
y_all = []
for il, length in enumerate(lengths):
num_realizations = length[0] if (isinstance(length, list) or isinstance(length, tuple)) else length
for ii in range(num_realizations):
y = y_list[il][ii]
# Interpolate current y onto x_combined grid
y_all.append(np.interp(x_combined, x_list[il], y, left = missing_val, right = missing_val))
# Return as 2D array and combined x bins
return y_all, x_combined
################################################################################################
[docs]
@staticmethod
def cut_matrix_bad_vals_zero(M,
axis = 0,
tol = 1e-9,
check_limit : float | None = 10):
"""
Cut off the slices (along any specified axis) in matrix M where all elements are close to zero.
If a 1D vector is provided, it returns the vector unless all elements are close to zero,
in which case it returns an empty array.
Parameters:
- M (numpy.ndarray) : The input matrix or vector.
- axis (int) : The axis along which to check for zero elements.
For example, 0 for rows, 1 for columns, etc.
Ignored if M is a 1D vector.
- tol (float) : The tolerance for considering elements as zero.
- check_limit (int) : The maximum number of elements along the axis to check for zeros.
Returns:
- numpy.ndarray: The resulting matrix after removing slices (along the specified axis)
that are close to zero, or the vector after removing if all elements are close to zero.
"""
# handle vector shape!
if M.ndim == 1:
if check_limit is not None:
check_limit = min(check_limit, M.shape[0])
mask = np.isclose(M[:check_limit], 0.0, atol=tol)
else:
mask = np.isclose(M, 0.0, atol=tol)
return M[~mask]
# handle matrix shape!
M_moved = np.moveaxis(M, axis, 0)
if check_limit is not None:
check_limit = min(check_limit, M_moved.shape[1])
mask = ~np.all(np.isclose(M_moved[:, :check_limit], 0.0, atol=tol), axis=1)
else:
mask = ~np.all(np.isclose(M_moved, 0.0, atol=tol), axis=1)
M_filtered = M_moved[mask] # Use the mask to filter the slices along the moved axis
return np.moveaxis(M_filtered, 0, axis) # Move the axis back to its original position
[docs]
@staticmethod
def cut_matrix_bad_vals(M,
axis = 0,
threshold = -1e4,
check_limit = None):
"""
Cut off the rows or columns in matrix M where the first `check_limit` elements are all below a threshold.
Parameters:
- M (numpy.ndarray): The input matrix.
- axis (int): The axis along which to check for elements below the threshold (0 for rows, 1 for columns).
- threshold (float): The threshold value.
- check_limit (int, optional): The number of elements to check from each row or column.
Returns:
- numpy.ndarray: The resulting matrix after removing rows or columns where the first `check_limit` elements are below the threshold.
"""
if axis == 0:
# Check rows
if check_limit is not None:
check_limit = min(check_limit, M.shape[1])
mask = ~np.all(M[:, :check_limit] < threshold, axis=1)
else:
mask = ~np.all(M < threshold, axis=1)
elif axis == 1:
# Check columns
if check_limit is not None:
check_limit = min(check_limit, M.shape[0])
mask = ~np.all(M[:check_limit, :] < threshold, axis=0)
else:
mask = ~np.all(M < threshold, axis=0)
else:
raise ValueError("Axis must be 0 (rows) or 1 (columns).") # Invalid axis
return M[mask] if axis == 0 else M[:, mask] # Return the matrix with the rows or columns removed
################################################################################################
####################################################################################################
#! EOF
####################################################################################################