# This file is part of the fitter software
#
# Copyright (c) 2014-2022
#
# File author(s): Thomas Cokelaer <cokelaer@gmail.com>
#
# Distributed under the GPLv3 License.
# See accompanying file LICENSE.txt or copy at
# http://www.gnu.org/licenses/gpl-3.0.html
#
# source: https://github.com/cokelaer/fitter
# Documentation: http://packages.python.org/fitter
# Package: http://pypi.python.org/fitter
#
##############################################################################
"""Main module of the fitter package.
This module provides the Fitter class for fitting multiple probability distributions
to data samples and comparing their goodness of fit using various metrics.
.. sectionauthor:: Thomas Cokelaer, Aug 2014-2020
"""
from __future__ import annotations
import contextlib
import multiprocessing
from typing import Any
import joblib
import numpy as np
import pandas as pd
import scipy.stats
from joblib.parallel import Parallel, delayed
from loguru import logger
from matplotlib import pyplot as plt
from scipy.integrate import IntegrationWarning
from scipy.stats import entropy as kl_div
from scipy.stats import kstest
from tqdm import tqdm
__all__ = ["Fitter", "get_common_distributions", "get_distributions"]
# A solution to wrap joblib parallel call in tqdm from
# https://stackoverflow.com/questions/24983493/tracking-progress-of-joblib-parallel-execution/58936697#58936697
# and https://github.com/louisabraham/tqdm_joblib
@contextlib.contextmanager
def tqdm_joblib(*args: Any, **kwargs: Any) -> Any:
"""Context manager to patch joblib to report into tqdm progress bar.
Args:
*args: Positional arguments passed to tqdm.
**kwargs: Keyword arguments passed to tqdm.
Yields:
tqdm object for progress tracking.
"""
tqdm_object = tqdm(*args, **kwargs)
class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def __call__(self, *args, **kwargs):
tqdm_object.update(n=self.batch_size)
return super().__call__(*args, **kwargs)
old_batch_callback = joblib.parallel.BatchCompletionCallBack
joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
try:
yield tqdm_object
finally:
joblib.parallel.BatchCompletionCallBack = old_batch_callback
tqdm_object.close()
[docs]
def get_distributions() -> list[str]:
"""Get all scipy.stats distributions that have a fit method.
Returns:
List of distribution names as strings.
"""
# BUGFIX: Replace eval() with getattr() - safer and faster
distributions = [name for name in dir(scipy.stats) if hasattr(getattr(scipy.stats, name, None), "fit")]
return distributions
[docs]
def get_common_distributions() -> list[str]:
"""Get commonly used distributions that are available in scipy.stats.
Returns:
List of common distribution names that have fit methods.
Note:
Filters based on scipy version to avoid errors with missing distributions.
"""
distributions = get_distributions()
# Convert to set for O(1) lookup (much faster than list membership)
dist_set = set(distributions)
# Common distributions to avoid error due to changes in scipy
common = [
"cauchy",
"chi2",
"expon",
"exponpow",
"gamma",
"lognorm",
"norm",
"powerlaw",
"rayleigh",
"uniform",
]
# Single-pass filter with set lookup (O(n) instead of O(n²))
return [x for x in common if x in dist_set]
[docs]
class Fitter:
"""Fit a data sample to known distributions
A naive approach often performed to figure out the undelying distribution that
could have generated a data set, is to compare the histogram of the data with
a PDF (probability distribution function) of a known distribution (e.g., normal).
Yet, the parameters of the distribution are not known and there are lots of
distributions. Therefore, an automatic way to fit many distributions to the data
would be useful, which is what is implemented here.
Given a data sample, we use the `fit` method of SciPy to extract the parameters
of that distribution that best fit the data. We repeat this for all available distributions.
Finally, we provide a summary so that one can see the quality of the fit for those distributions
Here is an example where we generate a sample from a gamma distribution.
::
>>> # First, we create a data sample following a Gamma distribution
>>> from scipy import stats
>>> data = stats.gamma.rvs(2, loc=1.5, scale=2, size=20000)
>>> # We then create the Fitter object
>>> import fitter
>>> f = fitter.Fitter(data)
>>> # just a trick to use only 10 distributions instead of 80 to speed up the fitting
>>> f.distributions = f.distributions[0:10] + ['gamma']
>>> # fit and plot
>>> f.fit()
>>> f.summary()
sumsquare_error aic bic kl_div ks_statistic ks_pvalue
loggamma 0.001176 995.866732 -159536.164644 inf 0.008459 0.469031
gennorm 0.001181 993.145832 -159489.437372 inf 0.006833 0.736164
norm 0.001189 992.975187 -159427.247523 inf 0.007138 0.685416
truncnorm 0.001189 996.975182 -159408.826771 inf 0.007138 0.685416
crystalball 0.001189 996.975078 -159408.821960 inf 0.007138 0.685434
Once the data has been fitted, the :meth:`summary` method returns a sorted dataframe where
the index represents the distribution names.
The AIC is computed using :math:`aic = 2 * k - 2 * log(Lik)`,
and the BIC is computed as :math:`k * log(n) - 2 * log(Lik)`.
where Lik is the maximized value of the likelihood function of the model,
n the number of data point and k the number of parameter.
Looping over the 80 distributions in SciPy could takes some times so you can overwrite the
:attr:`distributions` with a subset if you want. In order to reload all distributions,
call :meth:`load_all_distributions`.
Some distributions do not converge when fitting. There is a timeout of 30 seconds after which
the fitting procedure is cancelled. You can change this :attr:`timeout` attribute if needed.
If the histogram of the data has outlier of very long tails, you may want to increase the
:attr:`bins` binning or to ignore data below or above a certain range. This can be achieved
by setting the :attr:`xmin` and :attr:`xmax` attributes. If you set xmin, you can come back to
the original data by setting xmin to None (same for xmax) or just recreate an instance.
"""
def __init__(
self,
data: np.ndarray | list[float],
xmin: float | None = None,
xmax: float | None = None,
bins: int = 100,
distributions: list[str] | str | None = None,
timeout: int = 30,
density: bool = True,
verbose: bool = True,
) -> None:
""".. rubric:: Constructor
:param list data: a numpy array or a list
:param float xmin: if None, use the data minimum value, otherwise histogram and
fits will be cut
:param float xmax: if None, use the data maximum value, otherwise histogram and
fits will be cut
:param int bins: numbers of bins to be used for the cumulative histogram. This has
an impact on the quality of the fit.
:param list distributions: give a list of distributions to look at. If none, use
all scipy distributions that have a fit method. If you want to use
only one distribution and know its name, you may provide a string (e.g.
'gamma'). Finally, you may set to 'common' to include only common
distributions, which are: cauchy, chi2, expon, exponpow, gamma,
lognorm, norm, powerlaw, irayleigh, uniform.
:param timeout: max time for a given distribution. If timeout is
reached, the distribution is skipped.
:param bool verbose: if True (default), log fitting progress messages. Set to
False to suppress all informational output.
.. versionchanged:: 1.3.0 re-add verbose argument to allow suppressing log output.
.. versionchanged:: 1.0.8 increase timeout from 10 to 30 seconds.
"""
self.verbose = verbose
self.timeout = timeout
# USER input
self._data = None
# Issue https://github.com/cokelaer/fitter/issues/22 asked for setting
# the density to False in the fitting and plotting. I first tought it
# would be possible, but the fitting is performed using the PDF of scipy
# so one would still need to normalise the data so that it is
# comparable. Therefore I do not see anyway to do it without using
# density set to True for now.
self._density = True
#: list of distributions to test
self.distributions: list[str]
if distributions is None:
self._load_all_distributions()
elif distributions == "common":
self.distributions = get_common_distributions()
elif isinstance(distributions, str):
self.distributions = [distributions]
else:
self.distributions = distributions
self.bins = bins
self._alldata: np.ndarray = np.asarray(data)
# Use ternary for cleaner code
self._xmin: float = self._alldata.min() if xmin is None else xmin
self._xmax: float = self._alldata.max() if xmax is None else xmax
self._trim_data()
self._update_data_pdf()
# Other attributes
self._init()
def _init(self) -> None:
"""Initialize result storage dictionaries."""
self.fitted_param: dict[str, tuple] = {}
self.fitted_pdf: dict[str, np.ndarray] = {}
self._fitted_errors: dict[str, float] = {}
self._aic: dict[str, float] = {}
self._bic: dict[str, float] = {}
self._kldiv: dict[str, float] = {}
self._ks_stat: dict[str, float] = {}
self._ks_pval: dict[str, float] = {}
self._fit_i: int = 0 # fit progress
def _update_data_pdf(self) -> None:
"""Compute histogram of data and convert bin edges to bin centers.
Note:
np.histogram returns N+1 bin edges for N bins. We convert to N bin centers.
"""
self.y: np.ndarray
self.x: np.ndarray
self.y, bin_edges = np.histogram(self._data, bins=self.bins, density=self._density)
# OPTIMIZATION: Vectorized bin center calculation (much faster than list comprehension)
self.x = (bin_edges[:-1] + bin_edges[1:]) / 2.0
def _trim_data(self) -> None:
"""Filter data to be within [xmin, xmax] range."""
# Vectorized boolean indexing (efficient)
self._data: np.ndarray = self._alldata[(self._alldata >= self._xmin) & (self._alldata <= self._xmax)]
def _get_xmin(self) -> float:
"""Get the minimum x value for data filtering."""
return self._xmin
def _set_xmin(self, value: float | None) -> None:
"""Set the minimum x value for data filtering."""
if value is None or value < self._alldata.min():
value = self._alldata.min()
self._xmin = value
self._trim_data()
self._update_data_pdf()
xmin = property(_get_xmin, _set_xmin, doc="consider only data above xmin. reset if None")
def _get_xmax(self) -> float:
"""Get the maximum x value for data filtering."""
return self._xmax
def _set_xmax(self, value: float | None) -> None:
"""Set the maximum x value for data filtering."""
if value is None or value > self._alldata.max():
value = self._alldata.max()
self._xmax = value
self._trim_data()
self._update_data_pdf()
xmax = property(_get_xmax, _set_xmax, doc="consider only data below xmax. reset if None")
def _load_all_distributions(self) -> None:
"""Replace the :attr:`distributions` attribute with all scipy distributions."""
self.distributions = get_distributions()
[docs]
def hist(self) -> None:
"""Draw normalized histogram of the data using :attr:`bins`.
Examples:
>>> from scipy import stats
>>> data = stats.gamma.rvs(2, loc=1.5, scale=2, size=20000)
>>> import fitter
>>> fitter.Fitter(data).hist()
"""
plt.hist(self._data, bins=self.bins, density=self._density)
plt.grid(True)
@staticmethod
def _fit_single_distribution(
distribution: str,
data: np.ndarray,
x: np.ndarray,
y: np.ndarray,
timeout: int,
verbose: bool = True,
) -> tuple[str, tuple | None]:
"""Fit a single distribution to data and compute goodness-of-fit metrics.
Args:
distribution: Name of the scipy.stats distribution to fit.
data: Raw data array to fit.
x: Bin centers for histogram comparison.
y: Histogram density values.
timeout: Maximum time allowed for fitting (seconds).
verbose: If True, log fitting progress messages.
Returns:
Tuple of (distribution_name, results_tuple) where results_tuple contains
(params, pdf_fitted, sq_error, aic, bic, kl_div, ks_stat, ks_pval)
or None if fitting failed.
"""
import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning)
warnings.filterwarnings("ignore", category=IntegrationWarning)
try:
# BUGFIX: Replace eval() with getattr() - safer and faster
dist = getattr(scipy.stats, distribution)
param = Fitter._with_timeout(dist.fit, args=(data,), timeout=timeout)
# Compute PDF at bin centers for visualization
pdf_fitted = dist.pdf(x, *param)
# Calculate sum of squared errors between fitted PDF and histogram
sq_error = np.sum((pdf_fitted - y) ** 2)
# CRITICAL BUGFIX: logLik should be computed on DATA, not bin centers
# Original used x (bins) which gives wrong likelihood
logLik = np.sum(dist.logpdf(data, *param))
k = len(param) # Number of parameters
n = len(data) # Number of data points
# Akaike Information Criterion: AIC = 2k - 2*ln(L)
aic = 2 * k - 2 * logLik
# Bayesian Information Criterion: BIC = k*ln(n) - 2*ln(L)
bic = k * np.log(n) - 2 * logLik
# Calculate Kullback-Leibler divergence (requires positive values)
# Add small epsilon to avoid log(0) issues
eps = 1e-10
kullback_leibler = kl_div(pdf_fitted + eps, y + eps)
# Create frozen distribution for efficient CDF evaluation
dist_fitted = dist(*param)
# Validate that the CDF is bounded within [0, 1] over the data range.
# Some distributions (e.g. geninvgauss) can return CDF values slightly
# above 1 due to numerical issues, which indicates an invalid fit.
cdf_values = dist_fitted.cdf(x)
if np.any(cdf_values > 1) or np.any(cdf_values < 0):
if verbose:
logger.warning(
f"SKIPPED {distribution}: CDF values outside [0, 1] "
f"(min={cdf_values.min():.6g}, max={cdf_values.max():.6g})"
)
return distribution, None
# Calculate Kolmogorov-Smirnov goodness-of-fit statistic
ks_stat, ks_pval = kstest(data, dist_fitted.cdf)
if verbose:
logger.info(f"Fitted {distribution}: error={sq_error:.6f}, " f"AIC={aic:.2f}, KS={ks_stat:.4f}")
return distribution, (
param,
pdf_fitted,
sq_error,
aic,
bic,
kullback_leibler,
ks_stat,
ks_pval,
)
except Exception as e: # pragma: no cover
if verbose:
logger.warning(f"SKIPPED {distribution}: {type(e).__name__} " f"(timeout={timeout}s or fitting failed)")
return distribution, None
[docs]
def fit(
self,
progress: bool = False,
n_jobs: int = -1,
max_workers: int = -1,
prefer: str = "processes",
) -> None:
r"""Fit all distributions to the data and compute goodness-of-fit metrics.
Loops over all distributions in parallel and finds the best parameters to fit
the data. Populates the following attributes:
- :attr:`df_errors`: DataFrame with sum of squared errors and information criteria
- :attr:`fitted_param`: Parameters that best fit the data for each distribution
- :attr:`fitted_pdf`: PDF values generated with the fitted parameters
Args:
progress: If True, display progress bar during fitting.
n_jobs: Number of jobs for parallel processing (deprecated, use max_workers).
max_workers: Number of parallel workers (-1 for all CPUs).
prefer: Joblib parallelization method ('processes' or 'threads').
Note:
The fitting uses parallel processing for speed. Distributions that fail
or timeout are assigned infinite error values.
"""
n_dists = len(self.distributions)
with tqdm_joblib(
desc=f"Fitting {n_dists} distributions",
total=n_dists,
disable=not progress,
) as progress_bar:
results = Parallel(n_jobs=max_workers, prefer=prefer)(
delayed(Fitter._fit_single_distribution)(dist, self._data, self.x, self.y, self.timeout, self.verbose)
for dist in self.distributions
)
# Process results and populate dictionaries
for distribution, values in results:
if values is not None:
(
param,
pdf_fitted,
sq_error,
aic,
bic,
kullback_leibler,
ks_stat,
ks_pval,
) = values
self.fitted_param[distribution] = param
self.fitted_pdf[distribution] = pdf_fitted
self._fitted_errors[distribution] = sq_error
self._aic[distribution] = aic
self._bic[distribution] = bic
self._kldiv[distribution] = kullback_leibler
self._ks_stat[distribution] = ks_stat
self._ks_pval[distribution] = ks_pval
else:
# Assign infinity for failed fits
self._fitted_errors[distribution] = np.inf
self._aic[distribution] = np.inf
self._bic[distribution] = np.inf
self._kldiv[distribution] = np.inf
self._ks_stat[distribution] = np.inf
self._ks_pval[distribution] = 0.0
# Create results DataFrame
self.df_errors: pd.DataFrame = pd.DataFrame(
{
"sumsquare_error": self._fitted_errors,
"aic": self._aic,
"bic": self._bic,
"kl_div": self._kldiv,
"ks_statistic": self._ks_stat,
"ks_pvalue": self._ks_pval,
}
)
self.df_errors.sort_index(inplace=True)
[docs]
def plot_pdf(
self,
names: str | list[str] | None = None,
Nbest: int = 5,
lw: float = 2,
method: str = "sumsquare_error",
) -> None:
"""Plot probability density functions of fitted distributions.
Args:
names: Distribution name(s) to plot. If None, plots the Nbest distributions.
Can be a single string or list of strings.
Nbest: Number of best-fitting distributions to plot (when names is None).
lw: Line width for the plots.
method: Metric to use for ranking distributions ('sumsquare_error', 'aic', 'bic', etc.).
"""
assert Nbest > 0, "Nbest must be positive"
Nbest = min(Nbest, len(self.distributions))
if isinstance(names, list):
for name in names:
if name in self.fitted_pdf:
plt.plot(self.x, self.fitted_pdf[name], lw=lw, label=name)
else:
logger.warning(f"{name} was not fitted successfully")
elif names:
if names in self.fitted_pdf:
plt.plot(self.x, self.fitted_pdf[names], lw=lw, label=names)
else:
logger.warning(f"{names} was not fitted successfully")
else:
# Get best N distributions by specified method
try:
best_names = self.df_errors.sort_values(by=method).index[:Nbest]
except Exception:
# Fallback for older pandas versions
best_names = self.df_errors.sort_values(method).index[:Nbest]
for name in best_names:
if name in self.fitted_pdf:
plt.plot(self.x, self.fitted_pdf[name], lw=lw, label=name)
else: # pragma: no cover
logger.warning(f"{name} was not fitted. No parameters available")
plt.grid(True)
plt.legend()
[docs]
def get_best(self, method: str = "sumsquare_error") -> dict[str, dict[str, float]]:
"""Return the best fitted distribution and its parameters.
Args:
method: Metric to use for ranking ('sumsquare_error', 'aic', 'bic', etc.).
Returns:
Dictionary with distribution name as key and parameter dictionary as value.
Example: {'gamma': {'a': 2.0, 'loc': 1.5, 'scale': 2.0}}
"""
# Get best distribution (lowest error/AIC/BIC)
best_name = self.df_errors.sort_values(method).iloc[0].name
params = self.fitted_param[best_name]
distribution = getattr(scipy.stats, best_name)
# Extract parameter names from distribution
if distribution.shapes:
param_names = (distribution.shapes + ", loc, scale").split(", ")
else:
param_names = ["loc", "scale"]
# Create parameter dictionary using dict comprehension (faster)
param_dict = dict(zip(param_names, params))
return {best_name: param_dict}
[docs]
def summary(
self,
Nbest: int = 5,
lw: float = 2,
plot: bool = True,
method: str = "sumsquare_error",
clf: bool = True,
) -> pd.DataFrame:
"""Display summary of best fitting distributions.
Args:
Nbest: Number of best distributions to include in summary.
lw: Line width for plots.
plot: If True, create histogram and PDF overlay plot.
method: Metric to use for ranking distributions.
clf: If True, clear figure before plotting.
Returns:
DataFrame with fitting results for the Nbest distributions.
"""
if plot:
if clf:
plt.clf()
self.hist()
self.plot_pdf(Nbest=Nbest, lw=lw, method=method)
plt.grid(True)
Nbest = min(Nbest, len(self.distributions))
try:
best_names = self.df_errors.sort_values(by=method).index[:Nbest]
except Exception: # pragma: no cover
# Fallback for older pandas versions
best_names = self.df_errors.sort_values(method).index[:Nbest]
return self.df_errors.loc[best_names]
@staticmethod
def _with_timeout(
func: Any,
args: tuple = (),
kwargs: dict[str, Any] | None = None,
timeout: int = 30,
) -> Any:
"""Execute a function with a timeout limit.
Args:
func: Function to execute.
args: Positional arguments for the function.
kwargs: Keyword arguments for the function.
timeout: Maximum execution time in seconds.
Returns:
Result of the function call.
Raises:
TimeoutError: If function execution exceeds timeout.
"""
if kwargs is None:
kwargs = {}
with multiprocessing.pool.ThreadPool(1) as pool:
async_result = pool.apply_async(func, args, kwargs)
return async_result.get(timeout=timeout)
""" For book-keeping
Another way to prevent a statement to run for a long time and to
stop it is to use the signal module but did not work with scipy presumably
because a try/except inside the distribution function interferes
def handler(signum, frame):
raise Exception("end of time")
import signal
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout)
try:
param = dist.fit(data)
except Exception as err:
print(err.message)
"""