Source code for fitter.fitter

#  This file is part of the fitter software
#
#  Copyright (c) 2014-2022
#
#  File author(s): Thomas Cokelaer <cokelaer@gmail.com>
#
#  Distributed under the GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      http://www.gnu.org/licenses/gpl-3.0.html
#
#  source: https://github.com/cokelaer/fitter
#  Documentation: http://packages.python.org/fitter
#  Package: http://pypi.python.org/fitter
#
##############################################################################
"""main module of the fitter package

.. sectionauthor:: Thomas Cokelaer, Aug 2014-2020

"""
import contextlib
import sys
import threading
from datetime import datetime

import joblib
import numpy as np
import pandas as pd
import pylab
import scipy.stats
from joblib.parallel import Parallel, delayed
from loguru import logger
from scipy.stats import entropy as kl_div
from scipy.stats import kstest
from tqdm import tqdm

__all__ = ["get_common_distributions", "get_distributions", "Fitter"]


# A solution to wrap joblib parallel call in tqdm from
# https://stackoverflow.com/questions/24983493/tracking-progress-of-joblib-parallel-execution/58936697#58936697
# and https://github.com/louisabraham/tqdm_joblib
@contextlib.contextmanager
def tqdm_joblib(*args, **kwargs):
    """Context manager to patch joblib to report into tqdm progress bar
    given as argument"""

    tqdm_object = tqdm(*args, **kwargs)

    class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)

        def __call__(self, *args, **kwargs):
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    old_batch_callback = joblib.parallel.BatchCompletionCallBack
    joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
    try:
        yield tqdm_object
    finally:
        joblib.parallel.BatchCompletionCallBack = old_batch_callback
        tqdm_object.close()


def get_distributions():
    distributions = []
    for this in dir(scipy.stats):
        if "fit" in eval("dir(scipy.stats." + this + ")"):
            distributions.append(this)
    return distributions


def get_common_distributions():
    distributions = get_distributions()
    # to avoid error due to changes in scipy
    common = [
        "cauchy",
        "chi2",
        "expon",
        "exponpow",
        "gamma",
        "lognorm",
        "norm",
        "powerlaw",
        "rayleigh",
        "uniform",
    ]
    common = [x for x in common if x in distributions]
    return common


[docs] class Fitter(object): """Fit a data sample to known distributions A naive approach often performed to figure out the undelying distribution that could have generated a data set, is to compare the histogram of the data with a PDF (probability distribution function) of a known distribution (e.g., normal). Yet, the parameters of the distribution are not known and there are lots of distributions. Therefore, an automatic way to fit many distributions to the data would be useful, which is what is implemented here. Given a data sample, we use the `fit` method of SciPy to extract the parameters of that distribution that best fit the data. We repeat this for all available distributions. Finally, we provide a summary so that one can see the quality of the fit for those distributions Here is an example where we generate a sample from a gamma distribution. :: >>> # First, we create a data sample following a Gamma distribution >>> from scipy import stats >>> data = stats.gamma.rvs(2, loc=1.5, scale=2, size=20000) >>> # We then create the Fitter object >>> import fitter >>> f = fitter.Fitter(data) >>> # just a trick to use only 10 distributions instead of 80 to speed up the fitting >>> f.distributions = f.distributions[0:10] + ['gamma'] >>> # fit and plot >>> f.fit() >>> f.summary() sumsquare_error aic bic kl_div ks_statistic ks_pvalue loggamma 0.001176 995.866732 -159536.164644 inf 0.008459 0.469031 gennorm 0.001181 993.145832 -159489.437372 inf 0.006833 0.736164 norm 0.001189 992.975187 -159427.247523 inf 0.007138 0.685416 truncnorm 0.001189 996.975182 -159408.826771 inf 0.007138 0.685416 crystalball 0.001189 996.975078 -159408.821960 inf 0.007138 0.685434 Once the data has been fitted, the :meth:`summary` method returns a sorted dataframe where the index represents the distribution names. The AIC is computed using :math:`aic = 2 * k - 2 * log(Lik)`, and the BIC is computed as :math:`k * log(n) - 2 * log(Lik)`. where Lik is the maximized value of the likelihood function of the model, n the number of data point and k the number of parameter. Looping over the 80 distributions in SciPy could takes some times so you can overwrite the :attr:`distributions` with a subset if you want. In order to reload all distributions, call :meth:`load_all_distributions`. Some distributions do not converge when fitting. There is a timeout of 30 seconds after which the fitting procedure is cancelled. You can change this :attr:`timeout` attribute if needed. If the histogram of the data has outlier of very long tails, you may want to increase the :attr:`bins` binning or to ignore data below or above a certain range. This can be achieved by setting the :attr:`xmin` and :attr:`xmax` attributes. If you set xmin, you can come back to the original data by setting xmin to None (same for xmax) or just recreate an instance. """ def __init__( self, data, xmin=None, xmax=None, bins=100, distributions=None, timeout=30, density=True, ): """.. rubric:: Constructor :param list data: a numpy array or a list :param float xmin: if None, use the data minimum value, otherwise histogram and fits will be cut :param float xmax: if None, use the data maximum value, otherwise histogram and fits will be cut :param int bins: numbers of bins to be used for the cumulative histogram. This has an impact on the quality of the fit. :param list distributions: give a list of distributions to look at. If none, use all scipy distributions that have a fit method. If you want to use only one distribution and know its name, you may provide a string (e.g. 'gamma'). Finally, you may set to 'common' to include only common distributions, which are: cauchy, chi2, expon, exponpow, gamma, lognorm, norm, powerlaw, irayleigh, uniform. :param timeout: max time for a given distribution. If timeout is reached, the distribution is skipped. .. versionchanged:: 1.2.1 remove verbose argument, replacedb by logging module. .. versionchanged:: 1.0.8 increase timeout from 10 to 30 seconds. """ self.timeout = timeout # USER input self._data = None # Issue https://github.com/cokelaer/fitter/issues/22 asked for setting # the density to False in the fitting and plotting. I first tought it # would be possible, but the fitting is performed using the PDF of scipy # so one would still need to normalise the data so that it is # comparable. Therefore I do not see anyway to do it without using # density set to True for now. self._density = True #: list of distributions to test self.distributions = distributions if self.distributions == None: self._load_all_distributions() elif self.distributions == "common": self.distributions = get_common_distributions() elif isinstance(distributions, str): self.distributions = [distributions] self.bins = bins self._alldata = np.array(data) if xmin == None: self._xmin = self._alldata.min() else: self._xmin = xmin if xmax == None: self._xmax = self._alldata.max() else: self._xmax = xmax self._trim_data() self._update_data_pdf() # Other attributes self._init() def _init(self): self.fitted_param = {} self.fitted_pdf = {} self._fitted_errors = {} self._aic = {} self._bic = {} self._kldiv = {} self._ks_stat = {} self._ks_pval = {} self._fit_i = 0 # fit progress # self.pb = None def _update_data_pdf(self): # histogram retuns X with N+1 values. So, we rearrange the X output into only N self.y, self.x = np.histogram(self._data, bins=self.bins, density=self._density) self.x = [(this + self.x[i + 1]) / 2.0 for i, this in enumerate(self.x[0:-1])] def _trim_data(self): self._data = self._alldata[np.logical_and(self._alldata >= self._xmin, self._alldata <= self._xmax)] def _get_xmin(self): return self._xmin def _set_xmin(self, value): if value == None: value = self._alldata.min() elif value < self._alldata.min(): value = self._alldata.min() self._xmin = value self._trim_data() self._update_data_pdf() xmin = property(_get_xmin, _set_xmin, doc="consider only data above xmin. reset if None") def _get_xmax(self): return self._xmax def _set_xmax(self, value): if value == None: value = self._alldata.max() elif value > self._alldata.max(): value = self._alldata.max() self._xmax = value self._trim_data() self._update_data_pdf() xmax = property(_get_xmax, _set_xmax, doc="consider only data below xmax. reset if None ") def _load_all_distributions(self): """Replace the :attr:`distributions` attribute with all scipy distributions""" self.distributions = get_distributions()
[docs] def hist(self): """Draw normed histogram of the data using :attr:`bins` .. plot:: >>> from scipy import stats >>> data = stats.gamma.rvs(2, loc=1.5, scale=2, size=20000) >>> # We then create the Fitter object >>> import fitter >>> fitter.Fitter(data).hist() """ _ = pylab.hist(self._data, bins=self.bins, density=self._density) pylab.grid(True)
def _fit_single_distribution(self, distribution): try: # need a subprocess to check time it takes. If too long, skip it dist = eval("scipy.stats." + distribution) # TODO here, dist.fit may take a while or just hang forever # with some distributions. So, I thought to use signal module # to catch the error when signal takes too long. It did not work # presumably because another try/exception is inside the # fit function, so I used threading with a recipe from stackoverflow # See timed_run function above param = self._timed_run(dist.fit, distribution, args=self._data) # with signal, does not work. maybe because another expection is caught # hoping the order returned by fit is the same as in pdf pdf_fitted = dist.pdf(self.x, *param) self.fitted_param[distribution] = param[:] self.fitted_pdf[distribution] = pdf_fitted # calculate error sq_error = pylab.sum((self.fitted_pdf[distribution] - self.y) ** 2) # calculate information criteria logLik = np.sum(dist.logpdf(self.x, *param)) k = len(param[:]) n = len(self._data) aic = 2 * k - 2 * logLik # special case of gaussian distribution # bic = n * np.log(sq_error / n) + k * np.log(n) # general case: bic = k * pylab.log(n) - 2 * logLik # calculate kullback leibler divergence kullback_leibler = kl_div(self.fitted_pdf[distribution], self.y) # calculate goodness-of-fit statistic dist_fitted = dist(*param) ks_stat, ks_pval = kstest(self._data, dist_fitted.cdf) logger.info("Fitted {} distribution with error={})".format(distribution, round(sq_error, 6))) # compute some errors now self._fitted_errors[distribution] = sq_error self._aic[distribution] = aic self._bic[distribution] = bic self._kldiv[distribution] = kullback_leibler self._ks_stat[distribution] = ks_stat self._ks_pval[distribution] = ks_pval except Exception: # pragma: no cover logger.warning("SKIPPED {} distribution (taking more than {} seconds)".format(distribution, self.timeout)) # if we cannot compute the error, set it to large values self._fitted_errors[distribution] = np.inf self._aic[distribution] = np.inf self._bic[distribution] = np.inf self._kldiv[distribution] = np.inf
[docs] def fit(self, progress=False, n_jobs=-1, max_workers=-1): r"""Loop over distributions and find best parameter to fit the data for each When a distribution is fitted onto the data, we populate a set of dataframes: - :attr:`df_errors` :sum of the square errors between the data and the fitted distribution i.e., :math:`\sum_i \left( Y_i - pdf(X_i) \right)^2` - :attr:`fitted_param` : the parameters that best fit the data - :attr:`fitted_pdf` : the PDF generated with the parameters that best fit the data Indices of the dataframes contains the name of the distribution. """ import warnings warnings.filterwarnings("ignore", category=RuntimeWarning) N = len(self.distributions) with tqdm_joblib(desc=f"Fitting {N} distributions", total=N, disable=not progress) as progress_bar: Parallel(n_jobs=max_workers, backend="threading")( delayed(self._fit_single_distribution)(dist) for dist in self.distributions ) self.df_errors = pd.DataFrame( { "sumsquare_error": self._fitted_errors, "aic": self._aic, "bic": self._bic, "kl_div": self._kldiv, "ks_statistic": self._ks_stat, "ks_pvalue": self._ks_pval, } )
[docs] def plot_pdf(self, names=None, Nbest=5, lw=2, method="sumsquare_error"): """Plots Probability density functions of the distributions :param str,list names: names can be a single distribution name, or a list of distribution names, or kept as None, in which case, the first Nbest distribution will be taken (default to best 5) """ assert Nbest > 0 if Nbest > len(self.distributions): Nbest = len(self.distributions) if isinstance(names, list): for name in names: pylab.plot(self.x, self.fitted_pdf[name], lw=lw, label=name) elif names: pylab.plot(self.x, self.fitted_pdf[names], lw=lw, label=names) else: try: names = self.df_errors.sort_values(by=method).index[0:Nbest] except Exception: names = self.df_errors.sort(method).index[0:Nbest] for name in names: if name in self.fitted_pdf.keys(): pylab.plot(self.x, self.fitted_pdf[name], lw=lw, label=name) else: # pragma: no cover logger.warning("%s was not fitted. no parameters available" % name) pylab.grid(True) pylab.legend()
[docs] def get_best(self, method="sumsquare_error"): """Return best fitted distribution and its parameters a dictionary with one key (the distribution name) and its parameters """ # self.df should be sorted, so then us take the first one as the best name = self.df_errors.sort_values(method).iloc[0].name params = self.fitted_param[name] distribution = getattr(scipy.stats, name) param_names = (distribution.shapes + ", loc, scale").split(", ") if distribution.shapes else ["loc", "scale"] param_dict = {} for d_key, d_val in zip(param_names, params): param_dict[d_key] = d_val return {name: param_dict}
[docs] def summary(self, Nbest=5, lw=2, plot=True, method="sumsquare_error", clf=True): """Plots the distribution of the data and N best distributions""" if plot: if clf: pylab.clf() self.hist() self.plot_pdf(Nbest=Nbest, lw=lw, method=method) pylab.grid(True) Nbest = min(Nbest, len(self.distributions)) try: names = self.df_errors.sort_values(by=method).index[0:Nbest] except: # pragma: no cover names = self.df_errors.sort(method).index[0:Nbest] return self.df_errors.loc[names]
def _timed_run(self, func, distribution, args=(), kwargs={}, default=None): """This function will spawn a thread and run the given function using the args, kwargs and return the given default value if the timeout is exceeded. http://stackoverflow.com/questions/492519/timeout-on-a-python-function-call """ class InterruptableThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.result = default self.exc_info = (None, None, None) def run(self): try: self.result = func(args, **kwargs) except Exception as err: # pragma: no cover self.exc_info = sys.exc_info() def suicide(self): # pragma: no cover raise RuntimeError("Stop has been called") it = InterruptableThread() it.start() started_at = datetime.now() it.join(self.timeout) ended_at = datetime.now() diff = ended_at - started_at if it.exc_info[0] is not None: # pragma: no cover ; if there were any exceptions a, b, c = it.exc_info raise Exception(a, b, c) # communicate that to caller if it.is_alive(): # pragma: no cover it.suicide() raise RuntimeError else: return it.result
""" For book-keeping Another way to prevent a statement to run for a long time and to stop it is to use the signal module but did not work with scipy presumably because a try/except inside the distribution function interferes def handler(signum, frame): raise Exception("end of time") import signal signal.signal(signal.SIGALRM, handler) signal.alarm(timeout) try: param = dist.fit(data) except Exception as err: print(err.message) """