from __future__ import division
from copy import deepcopy
import numpy as np
import scipy.optimize
from .parameter import ParameterSet
from .expr import Expr
from time import time

class Curve(object):
    def __init__(self, name, model, data, weights=None, **user_args):
        A curve to be fit against.

        :param name: A friendly name for the curve.
        :type model: :class:`Expr`
        :param model: The model to be fitted.
        :type weights: array of shape `(nsamples,)`
        :param data: The values of the dependent variable to fit against.
        :param weights: Weighing factors.
        self._name = name
        self._model = model
        self._data = data
        self._weights = weights
        self._user_args = user_args

    def name(self):
        """ The friendly name of the curve """
        return self._name

    def model(self):
        """ The model to be fitted """
        return self._model

    def data(self):
        """ The values of the dependent variable to be fitted against """
        return self._data

    def weights(self):
        """ The fitting weights """
        return self._weights

    def user_args(self):
        """ User arguments """
        return self._user_args

    def eval_packed(self, params, **user_args):
        """ Evaluate the model """
        args = self.user_args.copy()
        return self.model.evaluate(params, **args)

    def residuals_packed(self, params, weighted=True, **user_args):
        Compute the residuals between the curve and its model (:math:`model - data`).

        :type params: array of shape (nparams,)
        :param params: Packed array of parameter values.
        :type weighted: bool
        :param weighted: If False weights are ignored.
        :rtype: array of shape (npoints,)
        residuals = self.eval_packed(params, **user_args) -
        if weighted and self.weights is not None:
            residuals *= self.weights
        return residuals

[docs]class Fit(object): """ This represents a fit configuration. """ def __init__(self): """ Create a new fit configuration. """ self._curves = [] self.param_set = ParameterSet()
[docs] def param(self, name=None, initial=None): """ Create a new parameter. :type name: str, optional :param name: The name of the new parameter. A name will be generated if this is omitted. :type initial: float, optional :param initial: The initial value of the parameter. If not provided then a value must be provided when :func:`fit` is called. :rtype: :class:`.parameter.FittedParam` """ return self.param_set.param(name, initial=initial)
[docs] def add_curve(self, name, model, data, weights=None, **user_args): """ Add a curve to the fit. :type name: str :param name: A unique name for the curve. :type model: :class:`Expr` :param model: The analytical model which will be fit to the curve. :type data: array, shape = [n_samples] :param data: An array of the dependent values for each sample. :type weights: array, shape = [n_samples], optional :param weights: An array of the weight of each sample. Often this is :math:`1 / \sigma` where `\sigma` is the standard deviation of the dependent value. If not given uniform weights are used. :type user_args: kwargs :param user_args: Keyword arguments passed to the model during evaluation. """ if not isinstance(model, Expr): raise ValueError('Given model (%s) must be instance of Expr' % model) curve = Curve(name, model, data, weights, **user_args) self._curves.append(curve)
[docs] def eval_packed(self, params, **user_args): """ Evaluate the model against packed parameters values :type params: array, shape = [n_params] :param params: A packed array of parameters. :type user_args: kwargs :param user_args: Keyword arguments passed to the model. :rtype: dict, curve_name -> array, shape = [n_samples] """ return { curve.eval_packed(params, **user_args) for curve in self._curves}
[docs] def residuals_packed(self, params, weighted=True, **user_args): """ Compute the weighed residuals against packed paramater values. See :func:`eval_packed` for parameter descriptions. """ return { curve.residuals_packed(params, weighted, **user_args) for curve in self._curves}
[docs] def eval(self, params, **user_args): """ Evaluate the model against a dictionary of parameters :type params: dict, param_name -> float :param params: A dictionary of parameters values. :type user_args: kwargs :param user_args: Keyword arguments passed to the model. :rtype: dict, curve_name -> array, shape = [n_samples] :returns: The value of the model evaluated with the given parameters. """ return self.eval_packed(self.param_set._pack(params), **user_args)
[docs] def residuals(self, params, weighted=True, **user_args): """ Evaluate the weighted model residuals against a dictionary of parameters values. See :func:`eval` for parameter descriptions. """ return self.residuals_packed(self.param_set._pack(params), weighted, **user_args)
[docs] def fit(self, params0=None, report_progress=None, **user_args): """ Carry out the fit. :type params0: dict, param_name -> float :param params0: The initial parameter values. :type report_progress: float or None :param report_progress: Period with which to report on fit progress (in seconds). :type user_args: kwargs :param user_args: Keyword arguments passed to the model. :rtype: A :class:`FitResults` object. """ unpacked = self.param_set.initial_params() if params0 is not None: unpacked.update(params0) packed0 = self.param_set._pack(unpacked) last_call = time() n_evals = 0 def fit_func(p): nonlocal n_evals, last_call res = self.residuals_packed(p, **user_args) res = np.hstack(res.values()) n_evals += 1 t = time() if report_progress is not None and last_call + report_progress < t: last_call = t print('%d evaluations, ChiĀ²=%s' % (n_evals, np.sum(res**2))) print('Parameters:') print('\n'.join(' %s = %1.4g' % (k,v) for k,v in sorted(self.param_set._unpack(p).items()))) return res packed, cov_x, info, mesg, ier = scipy.optimize.leastsq(fit_func, packed0, full_output=True) def unpack_covar(matrix): return {name: self.param_set._unpack(inner) for name, inner in self.param_set._unpack(matrix).items()} if cov_x is None: cov_p = None else: nparams = len(self.param_set.params) npts = sum(len( for curve in self._curves) red_chisq = np.sum(info['fvec']**2) / (npts - nparams) cov_p = unpack_covar(cov_x * red_chisq) params = self.param_set._unpack(packed) initial = self.param_set._unpack(packed0) fit0 = FitResult(deepcopy(self), initial) fit = FitResult(deepcopy(self), params, cov_p, initial_result=fit0) return fit
class BoundedFit(Fit): """ A fit configuration which supports optimization over a bounded parameter space. Internally this uses the more flexible :func:`scipy.optimize.minimize` to perform the optimzation, in contrast to :class:`Fit` which uses :func:`scipy.optimize.leastsq`. """ def fit(self, params0=None, bounds={}, method='L-BFGS-B', report_progress=None, **user_args): """ Carry out the fit :type params0: dict, param_name -> float :param params0: The initial parameter values. :type bounds: dict, param_name -> (min,max) :param bounds: Parameter minimum and maximum bounds. :type method: str :param method: Optimization method to use. See :func:`scipy.optimize.minimize` for options. :type report_progress: float or None :param report_progress: Period with which to report on fit progress (in seconds). :type user_args: kwargs :param user_args: Keyword arguments passed to the model. :rtype: A :class:`FitResults` object. """ unpacked = self.param_set.initial_params() if params0 is not None: unpacked.update(params0) packed0 = self.param_set._pack(unpacked) last_call = time() n_evals = 0 def fit_func(p): nonlocal n_evals, last_call res = self.residuals_packed(p, **user_args) res = np.hstack(res.values()) res2 = np.sum(res**2) n_evals += 1 t = time() if report_progress is not None and last_call + report_progress < t: last_call = t print('%d evaluations, ChiĀ²=%s' % (n_evals, res2)) print('Parameters:') print('\n'.join(' %s = %1.4g' % (k,v) for k,v in sorted(self.param_set._unpack(p).items()))) return res2 if len(bounds) == 0: bounds = None else: bounds = [bounds.get(p, (None, None)) for p in self.param_set.param_names()] result = scipy.optimize.minimize(fit_func, packed0, method=method, bounds=bounds) cov_p = None params = self.param_set._unpack(result.x) initial = self.param_set._unpack(packed0) fit0 = FitResult(deepcopy(self), initial) fit = FitResult(deepcopy(self), params, cov_p, initial_result=fit0) return fit
[docs]class CurveResult(object): """ This embodies a set of parameter values describing the goodness-of-fit with respect to a given curve. """ def __init__(self, fit_result, curve): params = fit_result.params self._fit_result = fit_result self._curve = curve self._fit = self.eval() self._residuals = self.curve.residuals_packed( self._chi_sqr = sum(self.residuals**2) @property def fit_result(self): """ The :class:`FitResult` this :class:`CurveResult` was derived from. :rtype: :class:`FitResult` """ return self._fit_result @property def curve(self): """ The curve described by this result :rtype: :class:`Curve` """ return self._curve @property def fit(self): """ The model evaluated with the parameter values. :rtype: ndarray """ return self._fit @property def degrees_of_freedom(self): """ The number of degrees-of-freedom of the fit. This is defined as the number of data points in the curve minus the number of free parameters in the fitting model. :rtype: int """ if self.curve.weights is not None: npoints = np.count_nonzero(self.curve.weights) else: npoints = len( return npoints - len(self.curve.model.parameters()) @property def residuals(self): """ The weighted residuals of the fit. :rtype: ndarray """ return self._residuals @property def chi_sqr(self): """ Chi-squared of the fit. :rtype: float """ return self._chi_sqr @property def reduced_chi_sqr(self): """ Reduced chi-squared of the fit. Namely, ``chi_sqr / degrees_of_freedom``. :rtype: float """ return self.chi_sqr / self.degrees_of_freedom
[docs] def eval(self, **user_args): """ Evaluate the curve's model with overridden user arguments. """ packed = return self.curve.eval_packed(packed, **user_args)
[docs]class FitResult(object): """ This embodies a set of parameter values, possibly originating from a fit. """ def __init__(self, fit, params, covar_p=None, initial_result=None): self._fit = fit self._initial = initial_result self._params = params # unpacked self._curves = { CurveResult(self, curve) for curve in fit._curves} self._covar_p = covar_p
[docs] def eval(self, expr): """ Evaluate the given :class:`Expr` against the fitted parameter values. :type expr: :class:`Expr` :param expr: The expression to evaluate. :returns: The evaluation of ``expr`` """ if isinstance(expr, Expr): packed = return expr.evaluate(packed) else: return expr
@property def fit(self): """ The :class:`Fit` to which these parameter apply. :rtype: :class:`Fit` """ return self._fit @property def initial(self): """ The :class:`FitResult` used as the initial parameters for the :class:`Fit` from which this result originated. :rtype: :class:`FitResult` """ return self._initial @property def params(self): """ The fitted parameter values :rtype: array, shape = [n_params] """ return self._params @property def curves(self): """ Results for particular curves. :rtype: dict, curve_name -> :class:`CurveResult` """ return self._curves @property def covar(self): """ The covariances between parameters, or ``None`` if not available (which may either be due to numerical trouble in calculation or simply not being provided when the :class:`FitResult` was created). :rtype: dict of dicts, param_name -> param_name -> float or ``None`` """ return self._covar_p @property def stderr(self): """ The standard error of the parameter estimate or ``None`` if not available. :rtype: dict, param_name -> float or ``None`` """ if self._covar_p is None: return None return {name: np.sqrt(self._covar_p[name][name]) for name in self.params} @property def correl(self): """ The correlation coefficient between parameters or ``None`` if not available. :rtype: dict, param_name -> param_name -> float or ``None`` """ if self._covar_p is None: return None stderr = self.stderr return {name: {name2: self._covar_p[name][name2] / stderr[name] / stderr[name2] for name2 in self.params if name != name2} for name in self.params} @property def total_chi_sqr(self): """ The sum of the individual curves' chi-squared values. This can be useful as a global goodness-of-fit metric. :rtype: float """ return sum(curve.chi_sqr for curve in self.curves.values()) def _repr_html_(self): from .pretty import ipynb_fit_result return ipynb_fit_result(self)