Source code for regressproxy.models_cel

# -*- coding: utf-8 -*-
# vim:fileencoding=utf-8
#
# Copyright (c) 2017-2022 Stefan Bender
#
# This module is part of pyregressproxy.
# pyregressproxy is free software: you can redistribute it or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, version 2.
# See accompanying LICENSE file or http://www.gnu.org/licenses/gpl-2.0.html.
"""Proxy classes for regression analysis (celerite version)

Proxy model classes for regression analysis using the
:mod:`celerite.modeling` modelling protocol [#]_.

.. [#] https://celerite.readthedocs.io/en/stable/python/modeling/
"""
from __future__ import absolute_import, division, print_function

import numpy as np

from celerite.modeling import Model, ModelSet, ConstantModel

__all__ = [
	"ConstantModel",
	"HarmonicModelCosineSine", "HarmonicModelAmpPhase",
	"ProxyModel", "ProxyModelSet",
	"proxy_model_set",
	"setup_proxy_model_with_bounds",
]


[docs]class HarmonicModelCosineSine(Model): """Model for harmonic terms Models a harmonic term using a cosine and sine part. The amplitude and phase returned are that for a sine function, i.e. amplitude * sin(t + phase). Parameters ---------- freq : float The frequency in years^-1 cos : float The amplitude of the cosine part sin : float The amplitude of the sine part See Also -------- HarmonicModelAmpPhase """ parameter_names = ("freq", "cos", "sin")
[docs] def get_value(self, t): t = np.atleast_1d(t) return (self.cos * np.cos(self.freq * 2 * np.pi * t) + self.sin * np.sin(self.freq * 2 * np.pi * t))
[docs] def get_amplitude(self): return np.sqrt(self.cos**2 + self.sin**2)
[docs] def get_phase(self): return np.arctan2(self.cos, self.sin)
[docs] def compute_gradient(self, t): t = np.atleast_1d(t) dcos = np.cos(self.freq * 2 * np.pi * t) dsin = np.sin(self.freq * 2 * np.pi * t) df = 2 * np.pi * t * (self.sin * dcos - self.cos * dsin) return np.array([df, dcos, dsin])
[docs]class HarmonicModelAmpPhase(Model): """Model for harmonic terms Models a harmonic term using amplitude and phase of a sine. It hase the same phase as returned by ``HarmonicModelCosineSine.get_phase()``. Parameters ---------- freq : float The frequency in years^-1 amp : float The amplitude of the harmonic term phase : float The phase of the harmonic part See Also -------- HarmonicModelCosineSine """ parameter_names = ("freq", "amp", "phase")
[docs] def get_value(self, t): t = np.atleast_1d(t) return self.amp * np.sin(self.freq * 2 * np.pi * t + self.phase)
[docs] def get_amplitude(self): return self.amp
[docs] def get_phase(self): return self.phase
[docs] def compute_gradient(self, t): t = np.atleast_1d(t) damp = np.sin(self.freq * 2 * np.pi * t + self.phase) dphi = self.amp * np.cos(self.freq * 2 * np.pi * t + self.phase) df = 2 * np.pi * t * dphi return np.array([df, damp, dphi])
[docs]class ProxyModel(Model): """Model for proxy terms Models proxy terms with a finite and (semi-)annually varying life time. Parameters ---------- proxy_times : (N,) array_like The times of the proxy values according to ``days_per_time_unit``. proxy_vals : (N,) array_like The proxy values at `proxy_times`. amp : float The amplitude of the proxy term. lag : float The lag of the proxy value (in days, see ``days_per_time_unit``). tau0 : float The base life time of the proxy (in days, see ``days_per_time_unit``). taucos1 : float The amplitude of the cosine part of the annual life time variation (in days, see ``days_per_time_unit``). tausin1 : float The amplitude of the sine part of the annual life time variation (in days, see ``days_per_time_unit``). taucos2 : float The amplitude of the cosine part of the semi-annual life time variation (in days, see ``days_per_time_unit``). tausin2 : float The amplitude of the sine part of the semi-annual life time variation (in days, see ``days_per_time_unit``). ltscan : float The number of days to sum the previous proxy values. If it is negative, the value will be set to three times the maximal lifetime. No lifetime adjustemets are calculated when set to zero. center : bool, optional Centers the proxy values by subtracting the overall mean. The mean is calculated from the whole `proxy_vals` array and is stored in the `mean` attribute. Default: False phi_intp : scipy.interpolate.interp1d() instance, optional When not `None`, the interpolated angle `phi` (e.g. SZA) and cos(phi) and sin(phi) are used to model the variation of the lifetime instead of the time. Semi-annual variations are not used in this case. Default: None fit_phase : bool, optional Fit the phase shift directly instead of using sine and cosine terms for the (semi-)annual lifetime variations. If True, the fitted cosine parameter is the amplitude and the sine parameter the phase. Default: False (= fit sine and cosine terms) lifetime_prior : str, optional The prior probability density for each coefficient of the lifetime. Possible types are "flat" or `None` for a flat prior, "exp" for an exponential density ~ :math:`\\text{exp}(-|\\tau| / \\text{metric})`, and "normal" for a normal distribution ~ :math:`\\text{exp}(-\\tau^2 / (2 * \\text{metric}^2))`. Default: None (= flat prior). lifetime_metric : float, optional The metric (scale) of the lifetime priors in days, see `prior`. Default 1. days_per_time_unit : float, optional The number of days per time unit, used to normalize the lifetime units. Use 365.25 if the times are in fractional years, or 1 if they are in days. Default: 365.25 """ parameter_names = ( "amp", "lag", "tau0", "taucos1", "tausin1", "taucos2", "tausin2", "ltscan", ) def __init__( self, proxy_times, proxy_vals, center=False, phi_intp=None, fit_phase=False, lifetime_prior=None, lifetime_metric=1., days_per_time_unit=365.25, *args, **kwargs ): self.mean = 0. if center: self.mean = np.nanmean(proxy_vals) self.times = proxy_times self.dt = 1. self.values = proxy_vals - self.mean self.phi_intp = phi_intp self.fit_phase = fit_phase self.days_per_time_unit = days_per_time_unit self.omega = 2 * np.pi * days_per_time_unit / 365.25 self.lifetime_prior = lifetime_prior self.lifetime_metric = lifetime_metric # Makes "(m)jd" and "jyear" compatible for the lifetime # seasonal variation. The julian epoch (the default) # is slightly offset with respect to (modified) julian days. self.t_adj = 0. if self.days_per_time_unit == 1: # discriminate between julian days and modified julian days, # 1.8e6 is year 216 in julian days and year 6787 in # modified julian days. It should be pretty safe to judge on # that for most use cases. if self.times[0] > 1.8e6: # julian days self.t_adj = 13. else: # modified julian days self.t_adj = -44.25 super(ProxyModel, self).__init__(*args, **kwargs) def _lt_corr(self, t, tau, tmax=60.): """Lifetime corrected values Corrects for a finite lifetime by summing over the last `tmax` days with an exponential decay given of lifetime(s) `taus`. """ bs = np.arange(self.dt, tmax + self.dt, self.dt) yp = np.zeros_like(t) tauexp = np.exp(-self.dt / tau) taufac = np.ones_like(tau) for b in bs: taufac *= tauexp yp += taufac * np.interp( t - (self.lag + b) / self.days_per_time_unit, self.times, self.values, left=0., right=0., ) return yp * self.dt def _lt_corr_grad(self, t, tau, tmax=60.): """Lifetime corrected gradient Corrects for a finite lifetime by summing over the last `tmax` days with an exponential decay given of lifetime(s) `taus`. """ bs = np.arange(self.dt, tmax + self.dt, self.dt) ypg = np.zeros_like(t) tauexp = np.exp(-self.dt / tau) taufac = np.ones_like(tau) for b in bs: taufac *= tauexp ypg += b * taufac * np.interp( t - (self.lag + b) / self.days_per_time_unit, self.times, self.values, left=0., right=0., ) return ypg * self.dt / tau**2
[docs] def get_value(self, t): t = np.atleast_1d(t) proxy_val = np.interp( t - self.lag / self.days_per_time_unit, self.times, self.values, left=0., right=0., ) if self.ltscan == 0: # no lifetime, nothing else to do return self.amp * proxy_val # annual variation of the proxy lifetime if self.phi_intp is not None: # using the angle tau_cs = ( self.taucos1 * np.cos(np.radians(self.phi_intp(t))) + self.tausin1 * np.sin(np.radians(self.phi_intp(t))) ) elif self.fit_phase: # using time (cos) and phase (sin) tau_cs = ( self.taucos1 * np.cos(1 * self.omega * (t + self.t_adj) + self.tausin1) + self.taucos2 * np.cos(2 * self.omega * (t + self.t_adj) + self.tausin2) ) else: # using time tau_cs = ( self.taucos1 * np.cos(1 * self.omega * (t + self.t_adj)) + self.tausin1 * np.sin(1 * self.omega * (t + self.t_adj)) + self.taucos2 * np.cos(2 * self.omega * (t + self.t_adj)) + self.tausin2 * np.sin(2 * self.omega * (t + self.t_adj)) ) tau_cs = np.maximum(0., tau_cs) # clip to zero tau = self.tau0 + tau_cs if self.ltscan > 0: _ltscn = int(np.floor(self.ltscan)) else: # infer the scan time from the maximal lifetime _ltscn = 3 * int( np.ceil(self.tau0 + np.sqrt(self.taucos1**2 + self.tausin1**2)) ) if np.all(tau > 0): proxy_val += self._lt_corr(t, tau, tmax=_ltscn) return self.amp * proxy_val
[docs] def compute_gradient(self, t): t = np.atleast_1d(t) proxy_val = np.interp( t - self.lag / self.days_per_time_unit, self.times, self.values, left=0., right=0., ) proxy_val_grad0 = proxy_val.copy() # annual variation of the proxy lifetime if self.phi_intp is not None: # using the solar zenith angle dtau_cos1 = np.cos(np.radians(self.phi_intp(t))) dtau_sin1 = np.sin(np.radians(self.phi_intp(t))) dtau_cos2 = np.zeros_like(t) dtau_sin2 = np.zeros_like(t) tau_cs = self.taucos1 * dtau_cos1 + self.tausin1 * dtau_sin1 elif self.fit_phase: # using time (cos) and phase (sin) dtau_cos1 = np.cos(1 * self.omega * (t + self.t_adj) + self.tausin1) dtau_sin1 = -self.taucos1 * np.sin(1 * self.omega * t + self.tausin1) dtau_cos2 = np.cos(2 * self.omega * (t + self.t_adj) + self.tausin2) dtau_sin2 = -self.taucos2 * np.sin(2 * self.omega * t + self.tausin2) tau_cs = self.taucos1 * dtau_cos1 + self.taucos2 * dtau_cos2 else: # using time dtau_cos1 = np.cos(1 * self.omega * (t + self.t_adj)) dtau_sin1 = np.sin(1 * self.omega * (t + self.t_adj)) dtau_cos2 = np.cos(2 * self.omega * (t + self.t_adj)) dtau_sin2 = np.sin(2 * self.omega * (t + self.t_adj)) tau_cs = ( self.taucos1 * dtau_cos1 + self.tausin1 * dtau_sin1 + self.taucos2 * dtau_cos2 + self.tausin2 * dtau_sin2 ) tau_cs = np.maximum(0., tau_cs) # clip to zero tau = self.tau0 + tau_cs if self.ltscan > 0: _ltscn = int(np.floor(self.ltscan)) else: # infer the scan time from the maximal lifetime _ltscn = 3 * int( np.ceil(self.tau0 + np.sqrt(self.taucos1**2 + self.tausin1**2)) ) if np.all(tau > 0): proxy_val += self._lt_corr(t, tau, tmax=_ltscn) proxy_val_grad0 += self._lt_corr_grad(t, tau, tmax=_ltscn) return np.array([proxy_val, # set the gradient wrt lag to zero for now np.zeros_like(t), self.amp * proxy_val_grad0, self.amp * proxy_val_grad0 * dtau_cos1, self.amp * proxy_val_grad0 * dtau_sin1, self.amp * proxy_val_grad0 * dtau_cos2, self.amp * proxy_val_grad0 * dtau_sin2, # set the gradient wrt lifetime scan to zero for now np.zeros_like(t)])
def _log_prior_normal(self): l_prior = super(ProxyModel, self).log_prior() if not np.isfinite(l_prior): return -np.inf for n, p in self.get_parameter_dict().items(): if n.startswith("tau"): # Gaussian prior for the lifetimes l_prior -= 0.5 * (p / self.lifetime_metric)**2 return l_prior def _log_prior_exp(self): l_prior = super(ProxyModel, self).log_prior() if not np.isfinite(l_prior): return -np.inf for n, p in self.get_parameter_dict().items(): if n.startswith("tau"): # exponential prior for the lifetimes l_prior -= np.abs(p / self.lifetime_metric) return l_prior
[docs] def log_prior(self): _priors = { "exp": self._log_prior_exp, "normal": self._log_prior_normal, } if self.lifetime_prior is None or self.lifetime_prior == "flat": return super(ProxyModel, self).log_prior() return _priors[self.lifetime_prior]()
[docs]class ProxyModelSet(ModelSet): """Combined model class for, e.g. trace gases (and probably other data) Inherited from :class:`celerite.ModelSet`, provides `get_value()` and `compute_gradient()` methods. """
[docs] def get_value(self, t): t = np.atleast_1d(t) v = np.zeros_like(t) for m in self.models.values(): v += m.get_value(t) return v
[docs] def compute_gradient(self, t): t = np.atleast_1d(t) grad = [] for m in self.models.values(): grad.extend(list(m.compute_gradient(t))) return np.array(grad)
[docs]def setup_proxy_model_with_bounds( times, values, max_amp=1e10, max_days=100, **kwargs ): # extract setup from `kwargs` center = kwargs.get("center", False) fit_phase = kwargs.get("fit_phase", False) lag = kwargs.get("lag", 0.) lt_metric = kwargs.get("lifetime_metric", 1) lt_prior = kwargs.get("lifetime_prior", "exp") lt_scan = kwargs.get("lifetime_scan", 60) positive = kwargs.get("positive", False) phi_intp = kwargs.get("phi_intp", None) time_format = kwargs.get("time_format", "jyear") days_per_time_unit = kwargs.get( "days_per_time_unit", 1. if time_format.endswith("d") else 365.25 ) return ProxyModel( times, values, center=center, phi_intp=phi_intp, fit_phase=fit_phase, lifetime_prior=lt_prior, lifetime_metric=lt_metric, days_per_time_unit=days_per_time_unit, amp=0., lag=lag, tau0=0, taucos1=0, tausin1=0, taucos2=0, tausin2=0, ltscan=lt_scan, bounds={ "amp": [0, max_amp] if positive else [-max_amp, max_amp], "lag": [0, max_days], "tau0": [0, max_days], "taucos1": [0, max_days] if fit_phase else [-max_days, max_days], "tausin1": [-np.pi, np.pi] if fit_phase else [-max_days, max_days], # semi-annual cycles for the life time "taucos2": [0, max_days] if fit_phase else [-max_days, max_days], "tausin2": [-np.pi, np.pi] if fit_phase else [-max_days, max_days], "ltscan": [0, 200], } )
[docs]def proxy_model_set(constant=True, freqs=None, proxy_config=None, **kwargs): """Model set including proxies and harmonics Sets up a proxy model for easy access. All parameters are optional, defaults to an offset, no harmonics, proxies uncentered and unscaled. Parameters ---------- constant : bool, optional Whether or not to include a constant (offset) term, default is True. freqs : list, optional Frequencies of the harmonic terms in 1 / a^-1 (inverse years). proxy_config : dict, optional Proxy configuration if different from the standard setup. **kwargs : optional Additional keyword arguments, all of them are also passed on to the proxy setup. For now, supported are the following which are also passed along to the proxy setup with `setup_proxy_model_with_bounds()`: * fit_phase : bool fit amplitude and phase instead of sine and cosine * scale : float the factor by which the data is scaled, used to constrain the maximum and minimum amplitudes to be fitted. * time_format : string The `astropy.time.Time` format string to setup the time axis. * days_per_time_unit : float The number of days per time unit, used to normalize the frequencies for the harmonic terms. Use 365.25 if the times are in fractional years, 1 if they are in days. Default: 365.25 * max_amp : float Maximum magnitude of the coefficients, used to constrain the parameter search. * max_days : float Maximum magnitude of the lifetimes, used to constrain the parameter search. Returns ------- model : :class:`ProxyModelSet` (extends :class:`celerite.ModelSet`) """ fit_phase = kwargs.get("fit_phase", False) scale = kwargs.get("scale", 1e-6) delta_t = kwargs.get("days_per_time_unit", 365.25) max_amp = kwargs.pop("max_amp", 1e10 * scale) max_days = kwargs.pop("max_days", 100) offset_model = [] if constant: offset_model = [ ( "offset", ConstantModel( value=0., bounds={"value": [-max_amp, max_amp]} ) ) ] freqs = freqs or [] harmonic_models = [] for freq in freqs: if not fit_phase: harm = HarmonicModelCosineSine( freq=freq * delta_t / 365.25, cos=0, sin=0, bounds={ "cos": [-max_amp, max_amp], "sin": [-max_amp, max_amp], } ) else: harm = HarmonicModelAmpPhase( freq=freq * delta_t / 365.25, amp=0, phase=0, bounds={ "amp": [0, max_amp], "phase": [-np.pi, np.pi], } ) harm.freeze_parameter("freq") harmonic_models.append(("f{0:.0f}".format(freq), harm)) proxy_config = proxy_config or {} proxy_models = [] for pn, conf in proxy_config.items(): if "max_amp" not in conf: conf.update(dict(max_amp=max_amp)) if "max_days" not in conf: conf.update(dict(max_days=max_days)) kw = kwargs.copy() # don't mess with the passed arguments kw.update(conf) proxy_models.append( (pn, setup_proxy_model_with_bounds(**kw)) ) return ProxyModelSet(offset_model + harmonic_models + proxy_models)