'''
Math utilities for various mathematical operations, fittings, and distributions.
It includes functions for finding maxima, nearest values, and fitting data to various models.
Imports:
- scipy.optimize.curve_fit
- scipy.interpolate.splrep
- scipy.interpolate.BSpline
Functions:
- Fitter
- FitterParams
File : general_python/maths/math_utils.py
Version : 0.1.0
Author : Maksymilian Kliczkowski
License : MIT
'''
# fitting and distributions
from scipy.optimize import curve_fit as fit
from scipy.interpolate import splrep, BSpline
from scipy import interpolate
from scipy.stats import poisson, norm, expon, gaussian_kde
from scipy.special import gamma
# fit the functions
try:
import pandas as pd
except ImportError:
pd = None
import math
import numpy as np
# Optional JAX support without import-time side effects
try:
import jax.numpy as jnp # type: ignore
_JAX_AVAILABLE = True
except Exception: # pragma: no cover - optional dependency
jnp = None # type: ignore
_JAX_AVAILABLE = False
TWOPI = math.pi * 2
PI = math.pi
PIHALF = math.pi / 2
#################################### FINDERS ####################################
[docs]
def find_maximum_idx(x):
'''
Find maximum index in a DataFrame, numpy array, or JAX array
- x : DataFrame, numpy array, or JAX array
'''
if pd is not None and isinstance(x, pd.DataFrame):
return x.idxmax(axis=1)
elif isinstance(x, np.ndarray):
return np.argmax(x, axis=1)
elif _JAX_AVAILABLE:
try:
return jnp.argmax(x, axis=1)
except Exception:
raise TypeError("Unsupported JAX array type or shape for argmax")
else:
raise TypeError("Input must be a DataFrame, numpy array, or JAX array")
[docs]
def find_nearest_val(x, val, col):
'''
Find the nearest value to the value given
- x : a DataFrame or numpy array
- val : a scalar
- col : a string on which column to find the nearest
'''
if pd is not None and isinstance(x, pd.DataFrame):
idx = (x[col]-val).abs().idxmin()
return x.at[idx, col]
elif isinstance(x, np.ndarray):
idx = (np.abs(x - val)).argmin()
return x.flat[idx]
elif _JAX_AVAILABLE:
try:
idx = (jnp.abs(x - val)).argmin()
return x.ravel()[idx]
except Exception:
raise TypeError("Unsupported JAX array type for nearest value computation")
else:
raise TypeError("Input must be a DataFrame, numpy array, or JAX array")
[docs]
def find_nearest_idx(x, val : float, **kwargs):
'''
Find the nearest idx to the value given
- x : a DataFrame or numpy array
- val : a scalar
- col : a string on which column to find the nearest
Returns the index of the nearest value to the given value
'''
if pd is not None and isinstance(x, pd.DataFrame):
col = kwargs.get('col', None)
if col is None:
raise ValueError("Column name must be provided for DataFrame.")
return (x[col]-val).abs().idxmin()
elif isinstance(x, np.ndarray):
return (np.abs(x - val)).argmin()
elif _JAX_AVAILABLE:
try:
return jnp.array((jnp.abs(x - val)).argmin())
except Exception:
raise TypeError("Unsupported JAX array type for nearest index computation")
else:
raise TypeError("Input must be a DataFrame, numpy array, or JAX array")
###################################### FITS ######################################
[docs]
class FitterParams(object):
'''
Class that stores only the parameters of the fit function
- popt : parameters of the fit
- pcov : covariance matrix of the fit
- funct : function of the fit
'''
[docs]
def __init__(self, funct, popt, pcov):
'''
Initialize the class
- funct : function of the fit
- popt : parameters of the fit
- pcov : covariance matrix of the fit
'''
self._popt = popt
self._pcov = pcov
self._funct = funct
[docs]
def get_popt(self):
"""Optimized fit parameters."""
return self._popt
[docs]
def get_pcov(self):
"""Estimated covariance matrix of the fitted parameters."""
return self._pcov
[docs]
def get_fun(self):
"""Fitted callable."""
return self._funct
@property
def popt(self):
"""Optimized fit parameters."""
return self._popt
@property
def pcov(self):
"""Estimated covariance matrix of the fitted parameters."""
return self._pcov
@property
def funct(self):
"""Fitted callable."""
return self._funct
def __call__(self, x):
return self._funct(x)
def __str__(self):
return 'FitterParams: ' + str(self._popt) + ' ' + str(self._pcov)
# -----------------------------------------------------------------------------
[docs]
class Fitter:
'''
Class that contains the fit functions and their general usage.
- x : arguments
- y : values
- fitter: FitterParams object
'''
###################################################
[docs]
def __init__(self, x : np.ndarray, y : np.ndarray):
'''
Initialize the class
- x : arguments
- y : values
'''
self._x = x
self._y = y
self._fitter = FitterParams(None, None, None)
###################################################
[docs]
def apply(self, x : np.ndarray):
"""Evaluate the current fitted function at ``x``."""
return self._fitter.funct(x)
###################################################
[docs]
@staticmethod
def skip(x, y, skipF = 0, skipL = 0):
'''
Skips a certain part of the values for the fit
- x : arguments to trim
- y : values to trim
- skipF : number of first elements to skip
- skipL : number of last elements to skip
'''
xfit = x[skipF if skipF!= 0 else None : -skipL if skipL!= 0 else None]
yfit = y[skipF if skipF!= 0 else None : -skipL if skipL!= 0 else None]
return xfit, yfit
# ------------------------------------------------------------------
# Robust helpers for scaling analysis
# ------------------------------------------------------------------
_MEAN_ALIASES = {
'arithmetic' : {'arithmetic', 'arith', 'avg', 'average', 'mean'},
'typical' : {'typical', 'typ', 'geometric', 'geo', 'log-mean', 'logmean'},
'median' : {'median', 'med'},
'harmonic' : {'harmonic', 'harm'},
}
@staticmethod
def _canonical_mean(mean_type: str) -> str:
key = str(mean_type).strip().lower()
for canon, aliases in Fitter._MEAN_ALIASES.items():
if key in aliases:
return canon
raise ValueError(f"Unknown mean_type '{mean_type}'. Use one of: arithmetic/avg/mean, typical/typ/geometric, median, harmonic.")
@staticmethod
def _as_1d(a, dtype=float):
return np.asarray(a, dtype=dtype).ravel()
@staticmethod
def _prepare_xy(x, y, *, require_positive_x: bool = False, require_positive_y: bool = False, eps: float = 1e-300, dtype=float):
"""Prepare x and y arrays for fitting by filtering out non-finite values and optionally enforcing positivity. Returns the filtered x and y arrays."""
x_arr = Fitter._as_1d(x, dtype=dtype)
y_arr = Fitter._as_1d(y, dtype=dtype)
if x_arr.size != y_arr.size:
raise ValueError(f"x and y must have same length, got {x_arr.size} and {y_arr.size}.")
mask = np.isfinite(x_arr) & np.isfinite(y_arr)
if require_positive_x and x_arr.dtype in [np.float32, np.float64]:
mask &= x_arr > 0.0
if require_positive_y and y_arr.dtype in [np.float32, np.float64]:
mask &= y_arr > eps
x_ok = x_arr[mask]
y_ok = y_arr[mask]
if x_ok.size < 2:
raise ValueError("Need at least 2 valid points after filtering.")
return x_ok, y_ok
[docs]
@staticmethod
def aggregate(values, *, mean_type: str = 'arithmetic', weights=None, trim_fraction: float = 0.0, eps: float = 1e-300):
"""
Aggregate samples with configurable averaging convention.
Parameters
----------
values : array-like
Input samples.
mean_type : str
One of arithmetic/avg/mean, typical/typ/geometric, median, harmonic.
weights : array-like, optional
Optional weights for arithmetic averaging only.
trim_fraction : float, default=0.0
Fraction trimmed from each tail for arithmetic mean (0 <= trim < 0.5).
eps : float, default=1e-300
Positivity cutoff for logarithmic/harmonic aggregations.
"""
arr = Fitter._as_1d(values, dtype=float)
arr = arr[np.isfinite(arr)]
if arr.size == 0:
return np.nan
mode = Fitter._canonical_mean(mean_type)
if mode == 'median':
return float(np.median(arr))
if mode == 'typical':
pos = arr[arr > eps]
if pos.size == 0:
return np.nan
return float(np.exp(np.mean(np.log(pos))))
if mode == 'harmonic':
pos = arr[arr > eps]
if pos.size == 0:
return np.nan
return float(pos.size / np.sum(1.0 / pos))
# arithmetic
trim = float(trim_fraction)
if trim < 0.0 or trim >= 0.5:
raise ValueError("trim_fraction must satisfy 0 <= trim_fraction < 0.5.")
if trim > 0.0 and arr.size >= 3:
arr = np.sort(arr)
k = int(trim * arr.size)
if 2 * k < arr.size:
arr = arr[k: arr.size - k]
if weights is None:
return float(np.mean(arr))
w = Fitter._as_1d(weights, dtype=float)
if w.size != Fitter._as_1d(values, dtype=float).size:
raise ValueError("weights must have the same length as values.")
# apply mask to both values and weights
mask = np.isfinite(Fitter._as_1d(values, dtype=float)) & np.isfinite(w)
v = Fitter._as_1d(values, dtype=float)[mask]
wv = w[mask]
s = np.sum(wv)
if s == 0.0:
return np.nan
return float(np.dot(v, wv) / s)
[docs]
@staticmethod
def fit_loglog_linear(x, y):
"""Fit ``log(y) = a + b log(x)`` and return ``(a, b, r2)``."""
x_ok, y_ok = Fitter._prepare_xy(x, y, require_positive_x=True, require_positive_y=True)
lx = np.log(x_ok)
ly = np.log(y_ok)
b, a = np.polyfit(lx, ly, 1)
yhat = a + b * lx
ss_res = float(np.sum((ly - yhat) ** 2))
ss_tot = float(np.sum((ly - np.mean(ly)) ** 2))
r2 = 1.0 - ss_res / ss_tot if ss_tot > 0.0 else 1.0
return float(a), float(b), float(r2)
[docs]
@staticmethod
def fit_power_scaling(x, y):
"""Fit ``y = A x^beta`` and return ``(A, beta, r2_log)``."""
a, b, r2 = Fitter.fit_loglog_linear(x, y)
return float(np.exp(a)), float(b), float(r2)
[docs]
@staticmethod
def fit_inverse_power_scaling(x, y):
"""Fit ``y = A x^{-alpha}`` and return ``(A, alpha, r2_log)``."""
A, beta, r2 = Fitter.fit_power_scaling(x, y)
return float(A), float(-beta), float(r2)
[docs]
@staticmethod
def fit_ipr_scaling(dimensions, iprs, q: float = 2.0):
r"""
Fit ``IPR(N) = A N^{-alpha}`` and infer ``D_q = alpha/(q-1)``.
Returns
-------
tuple
``(A, alpha, D_q, r2_log)``.
"""
A, alpha, r2 = Fitter.fit_inverse_power_scaling(dimensions, iprs)
Dq = alpha / (q - 1.0) if q != 1.0 else np.inf
return float(A), float(alpha), float(Dq), float(r2)
#################### F I T S ! ####################
[docs]
def fit_linear(self, skipF = 0, skipL = 0):
'''
Fits a linear function.
- skipF : skip first arguments
- skipL : skip last arguments
'''
xfit, yfit = Fitter.skip(self._x, self._y, skipF, skipL)
a, b = np.polyfit(xfit, yfit, 1)
self._fitter = FitterParams(lambda x: a * x + b, [a, b], [])
[docs]
@staticmethod
def fitLinear( x, y,
skipF = 0,
skipL = 0):
'''
Fits a linear function.
- x : arguments
- y : values
- skipF : skip first arguments
- skipL : skip last arguments
'''
xfit, yfit = Fitter.skip(x, y, skipF, skipL)
a, b = np.polyfit(xfit, yfit, 1)
return FitterParams(lambda x: a * x + b, [a, b], [])
#############
[docs]
def fit_exp(self,
skipF = 0,
skipL = 0):
'''
Fits [a * exp(-b * x)]
- skipF : skip first arguments
- skipL : skip last arguments
'''
xfit, yfit = Fitter.skip(self._x, self._y, skipF, skipL)
funct = lambda x, a, b: a * np.exp(-b * x)
popt, pcov = fit(funct, xfit, yfit)
self._fitter = FitterParams(lambda x: popt[0] * np.exp(-popt[1] * x), popt, pcov)
[docs]
@staticmethod
def fitExp( x,
y,
skipF = 0,
skipL = 0):
'''
Fits [a * exp(-b * x)]
- x : arguments
- y : values
- skipF : skip first arguments
- skipL : skip last arguments
'''
xfit, yfit = Fitter.skip(x, y, skipF, skipL)
funct = lambda x, a, b: a * np.exp(-b * x)
popt, pcov = fit(funct, xfit, yfit)
return FitterParams(lambda x: popt[0] * np.exp(-popt[1] * x), popt, pcov)
#############
[docs]
def fit_x_plus_x2( self,
skipF = 0,
skipL = 0):
'''
Fits [a * x + b * x ** 2]
- skipF : number of elements to skip on the left
- skipR : number of elements to skip on the right
'''
xfit, yfit = Fitter.skip(self._x, self._y, skipF, skipL)
funct = lambda x, a, b: (a * x) + (b * x ** 2)
popt, pcov = fit(funct, xfit, yfit)
self._fitter = FitterParams(lambda x: popt[0] * x + popt[1] * x ** 2, popt, pcov)
[docs]
@staticmethod
def fitXPlusX2( x,
y,
skipF = 0,
skipL = 0):
'''
Fits [a * x + b * x ** 2]
- x : arguments to the fit
- y : values to fit
- skipF : number of elements to skip on the left
- skipR : number of elements to skip on the right
'''
xfit, yfit = Fitter.skip(x, y, skipF, skipL)
funct = lambda x, a, b: (a * x) + (b * x ** 2)
popt, pcov = fit(funct, xfit, yfit)
return FitterParams(lambda x: popt[0] * x + popt[1] * x ** 2, popt, pcov)
#############
[docs]
def fit_power(self,
skipF = 0,
skipL = 0):
'''
Fits function [a*x**b]
- x : arguments to the fit
- y : values to the fit
- skipF : number of elements to skip on the left
- skipR : number of elements to skip on the right
'''
xfit, yfit = Fitter.skip(self._x, self._y, skipF, skipL)
funct = lambda x, a, b: a * x ** b
popt, pcov = fit(funct, xfit, yfit)
self._fitter = FitterParams(funct, popt, pcov)
[docs]
@staticmethod
def fitPower( x,
y,
skipF = 0,
skipL = 0):
'''
Fits function [a*x**b]
- x : arguments to the fit
- y : values to the fit
- skipF : number of elements to skip on the left
- skipR : number of elements to skip on the right
'''
xfit, yfit = Fitter.skip(x, y, skipF, skipL)
funct = lambda x, a, b: a * x ** b
popt, pcov = fit(funct, xfit, yfit)
return FitterParams(lambda x: popt[0] * (x ** popt[1]), popt, pcov)
#################### G E N R L ####################
[docs]
def fit_any(self,
funct,
skipF = 0,
skipL = 0):
'''
Fits function [any]
- funct : function to fit to
- skipF : number of elements to skip on the left
- skipR : number of elements to skip on the right
'''
xfit, yfit = Fitter.skip(self._x, self._y, skipF, skipL)
popt, pcov = fit(funct, xfit, yfit)
self._fitter = FitterParams(funct, popt, pcov)
[docs]
@staticmethod
def fitAny( x,
y,
funct,
skipF = 0,
skipL = 0,
bounds = []):
'''
Fits function [any]
- x : arguments to fit
- y : values to fit
- funct : function to fit to
- skipF : number of elements to skip on the left
- skipR : number of elements to skip on the right
'''
xfit, yfit = Fitter.skip(x, y, skipF, skipL)
if bounds == []:
popt, pcov = fit(funct, xfit, yfit)
return FitterParams(lambda x: funct(x, *popt), popt, pcov)
else:
popt, pcov = fit(funct, xfit, yfit, bounds = bounds)
return FitterParams(lambda x: funct(x, *popt), popt, pcov)
#################### H I S T O ####################
[docs]
@staticmethod
def gen_cauchy(x, v = 1.0, gamma = 1.0, alpha = 1.0, beta = 1.0):
r'''
Generalized Cauchy distribution
- v is the normalization factor
- \alpha is the stability parameter, often referred to as the shape parameter,
- \beta is the scale parameter,
- \gamma is a scale parameter related to the width of the distribution.
'''
return v * gamma * (1 + (x * gamma / beta)**2) ** (-(alpha + 1) / 2)
[docs]
@staticmethod
def cauchy(x, x0 = 0., gamma = 1.0, v = 1.0):
'''
Cauchy distribution
- x : arguments
- x0 : x0 parameter
- gamma : gamma parameter
'''
y = v / ((x - x0)**2 + gamma**2)
return y
[docs]
@staticmethod
def pareto(x, v = 1.0, alpha = 1.0, xm = 1.0, mu = 0.0):
'''
Pareto distribution
- x : arguments
- alpha : alpha parameter
- xm : xm parameter
'''
return v * np.power(1.0 + alpha * (np.abs(x - mu)), -1.0 / xm - 1.0)
[docs]
@staticmethod
def poisson(x, lambd = 1.0, v = 1.0):
'''
Poisson distribution
- k : arguments
- lamb : lambda parameter
'''
return v * np.exp(-lambd * (np.abs(x)))
[docs]
@staticmethod
def chi2(x, k = 1.0, v = 1.0, z = 1.0):
'''
Chi2 distribution
- x : arguments
- k : k parameter
'''
return v * np.exp(-np.abs(z * x) / 2) * (np.abs(x) ** (k / 2 - 1)) / (2 ** (k/2) * gamma(k/2))
[docs]
@staticmethod
def gaussian(x, mu = 0.0, sigma = 1.0):
'''
Gaussian distribution
- x : arguments
- mu : mean
- sigma : standard deviation
'''
return norm.pdf(x, mu, sigma)
[docs]
@staticmethod
def laplace(x, lambd = 1.0, v = 1.0, mu = 0.0):
'''
Laplace distribution
- x : arguments
- mu : mean
- b : scale parameter
'''
return (0.5 + 0.5 * np.sign(x-mu) * (1.0 - np.exp(-np.abs(x - mu) / lambd))) / (v)
[docs]
@staticmethod
def exponential(x, lambd, sigma):
'''
Exponential distribution
- x : arguments
- lambd : lambda parameter
'''
return sigma * np.exp(-lambd * np.abs(x))
[docs]
@staticmethod
def lorentzian(x, v = 1.0, g = 1.0):
'''
Lorentzian distribution
- x : arguments
- v : multiplication constant
- g : gamma parameter
'''
return np.abs(v) * g / ((x)**2 + g**2)
[docs]
@staticmethod
def two_lorentzian(x, v = 1.0, g1 = 1.0, g2 = 1.0, v2 = 1.0):
'''
Two Lorentzian distribution
- x : arguments
- x0 : x0 parameter
- gamma : gamma parameter
'''
if g1 < 0 or g2 < 0:
return 1.0E10
if v < 0 or v2 < 0:
return 1.0E10
# return v * (g1 / ((x)**2 + g1**2) + g2 / ((x)**2 + g2**2))
return Fitter.lorentzian(x, v, g1) + Fitter.lorentzian(x, v2, g2)
# parametrized
[docs]
@staticmethod
def lorentzian_system_size(param):
"""Return a Lorentzian model whose width is scaled by system size."""
def lorentzian(x, g = 1.0, v = 1.0):
return v * (g / param[0]) / ( (x)**2 + (g / param[0])**2 )
return lorentzian
#################### H I S T O ####################
[docs]
@staticmethod
def fit_histogram(edges,
counts,
typek = 'gaussian',
skipF = 0,
skipL = 0,
centers = [],
params = [],
bounds = None):
"""Fit a parametric distribution to histogram bin edges and counts."""
if len(centers) == 0:
if len(edges) <= 1:
raise ValueError('Edges must have at least 2 elements')
centers = ((edges[:-1] + edges[1:]) / 2)
match typek:
case 'gaussian':
funct = Fitter.gaussian
case 'poisson':
funct = Fitter.poisson
case 'laplace':
funct = Fitter.laplace
case 'exponential':
funct = Fitter.exponential
case 'pareto':
funct = Fitter.pareto
case 'cauchy':
funct = Fitter.cauchy
case 'gen_cauchy':
funct = Fitter.gen_cauchy
case 'chi2':
funct = Fitter.chi2
case 'lorentzian':
funct = Fitter.lorentzian
case 'two_lorentzian':
funct = Fitter.two_lorentzian
case 'lorentzian_system_size':
funct = Fitter.lorentzian_system_size(params)
case _:
raise ValueError('Type not recognized: ' + typek)
centers, counts = Fitter.skip(centers, counts, skipF, skipL)
popt, pcov = fit(funct, centers, counts)
return FitterParams(lambda x: funct(x, *popt), popt, pcov)
[docs]
@staticmethod
def get_histogram(typek = 'gaussian'):
"""
Get the histogram function based on the type of distribution
"""
if typek == 'gaussian':
return Fitter.gaussian
elif typek == 'poisson':
return Fitter.poisson
elif typek == 'laplace':
return Fitter.laplace
elif typek == 'exponential':
return Fitter.exponential
elif typek == 'pareto':
return Fitter.pareto
elif typek == 'cauchy':
return Fitter.cauchy
else:
raise ValueError('Type not recognized: ' + typek)
##############
[docs]
def next_power(x : float, base : int = 2):
'''
Get the next power of a number (base) that is greater than x\
- x : number to get the next power
- base : base of the power (default 2 for binary, can be 10 for decimal)
'''
return base ** math.ceil(math.log(x) / math.log(base))
[docs]
def prev_power(x : float, base : int = 2):
'''
Get the previous power of a number (base) that is smaller than x\
- x : number to get the next power
- base : base of the power (default 2 for binary, can be 10 for decimal)
'''
return base ** math.floor(math.log(x) / math.log(base))
#################################################################################
[docs]
def mod_euc(a: int, b: int) -> int:
'''
Compute the modified Euclidean remainder of a divided by b.
This function ensures that the result has the same sign as b.
- a : integer dividend
- b : integer divisor
'''
if b == 0:
raise ValueError("Divisor 'b' cannot be zero.")
m = a % b
if m < 0:
m = m - b if b < 0 else m + b
return m
[docs]
def mod_floor(a: int, b: int) -> int:
'''
Compute the modified floor division of a divided by b.
This function ensures that the result has the same sign as b.
- a : integer dividend
- b : integer divisor
'''
if b == 0:
raise ValueError("Divisor 'b' cannot be zero.")
m = a // b
if (a < 0) != (b < 0) and a % b != 0:
m -= 1
return m
[docs]
def mod_ceil(a: int, b: int) -> int:
'''
Compute the modified ceiling division of a divided by b.
This function ensures that the result has the same sign as b.
- a : integer dividend
- b : integer divisor
'''
if b == 0:
raise ValueError("Divisor 'b' cannot be zero.")
m = a // b
if (a < 0) == (b < 0) and a % b != 0:
m += 1
return m
[docs]
def mod_trunc(a: int, b: int) -> int:
'''
Compute the modified truncation division of a divided by b.
This function ensures that the result has the same sign as a.
- a : integer dividend
- b : integer divisor
'''
if b == 0:
raise ValueError("Divisor 'b' cannot be zero.")
return a // b
[docs]
def mod_round(a: int, b: int) -> int:
'''
Compute the modified rounding division of a divided by b.
This function ensures that the result has the same sign as a.
- a : integer dividend
- b : integer divisor
'''
if b == 0:
raise ValueError("Divisor 'b' cannot be zero.")
m = a / b
if m < 0:
m = m - 1 if a % b < 0 else m + 1
return int(m)
#################################################################################