# -*- coding: utf-8 -*-
from __future__ import division, print_function
import numpy as np
from itertools import chain
from collections import OrderedDict
__all__ = ["Model", "ModelSet", "ConstantModel"]
class Model(object):
"""
An abstract class implementing the skeleton of the modeling protocol
Initial parameter values can either be provided as arguments in the same
order as ``parameter_names`` or by name as keyword arguments.
A minimal subclass of this would have the form:
.. code-block:: python
class CustomModel(Model):
parameter_names = ("parameter_1", "parameter_2")
def get_value(self, x):
return self.parameter_1 + self.parameter_2 + x
model = CustomModel(parameter_1=1.0, parameter_2=2.0)
# or...
model = CustomModel(1.0, 2.0)
Args:
bounds (Optional[list or dict]): Bounds can be given for each
parameter setting their minimum and maximum allowed values.
This parameter can either be a ``list`` (with length
``full_size``) or a ``dict`` with named parameters. Any parameters
that are omitted from the ``dict`` will be assumed to have no
bounds. These bounds can be retrieved later using the
:func:`celerite.Model.get_parameter_bounds` method and, by
default, they are used in the :func:`celerite.Model.log_prior`
method.
"""
parameter_names = tuple()
def __init__(self, *args, **kwargs):
self.unfrozen_mask = np.ones(self.full_size, dtype=bool)
self.dirty = True
# Deal with bounds
self.parameter_bounds = []
bounds = kwargs.pop("bounds", dict())
try:
# Try to treat 'bounds' as a dictionary
for name in self.parameter_names:
self.parameter_bounds.append(bounds.get(name, (None, None)))
except AttributeError:
# 'bounds' isn't a dictionary - it had better be a list
self.parameter_bounds = list(bounds)
if len(self.parameter_bounds) != self.full_size:
raise ValueError("the number of bounds must equal the number of "
"parameters")
if any(len(b) != 2 for b in self.parameter_bounds):
raise ValueError("the bounds for each parameter must have the "
"format: '(min, max)'")
# Parameter values can be specified as arguments or keywords
if len(args):
if len(args) != self.full_size:
raise ValueError("expected {0} arguments but got {1}"
.format(self.full_size, len(args)))
if len(kwargs):
raise ValueError("parameters must be fully specified by "
"arguments or keyword arguments, not both")
self.parameter_vector = args
else:
# Loop over the kwargs and set the parameter values
params = []
for k in self.parameter_names:
v = kwargs.pop(k, None)
if v is None:
raise ValueError("missing parameter '{0}'".format(k))
params.append(v)
self.parameter_vector = params
if len(kwargs):
raise ValueError("unrecognized parameter(s) '{0}'"
.format(list(kwargs.keys())))
# Check the initial prior value
quiet = kwargs.get("quiet", False)
if not quiet and not np.isfinite(self.log_prior()):
raise ValueError("non-finite log prior value")
def get_value(self, *args, **kwargs):
"""
Compute the "value" of the model for the current parameters
This method should be overloaded by subclasses to implement the actual
functionality of the model.
"""
raise NotImplementedError("overloaded by subclasses")
def compute_gradient(self, *args, **kwargs):
"""
Compute the "gradient" of the model for the current parameters
This method should be overloaded by subclasses to implement the actual
functionality of the model. The output of this function should be an
array where the first dimension is ``full_size``.
"""
raise NotImplementedError("overloaded by subclasses")
def get_gradient(self, *args, **kwargs):
include_frozen = kwargs.pop("include_frozen", False)
g = self.compute_gradient(*args, **kwargs)
if include_frozen:
return g
return g[self.unfrozen_mask]
def __len__(self):
return self.vector_size
def _get_name(self, name_or_index):
try:
int(name_or_index)
except TypeError:
return name_or_index
return self.get_parameter_names()[int(name_or_index)]
def __getitem__(self, name_or_index):
return self.get_parameter(self._get_name(name_or_index))
def __setitem__(self, name_or_index, value):
return self.set_parameter(self._get_name(name_or_index), value)
@property
def full_size(self):
"""The total number of parameters (including frozen parameters)"""
return len(self.parameter_names)
@property
def vector_size(self):
"""The number of active (or unfrozen) parameters"""
return self.unfrozen_mask.sum()
@property
def parameter_vector(self):
"""An array of all parameters (including frozen parameters)"""
return np.array([getattr(self, k) for k in self.parameter_names])
@parameter_vector.setter
def parameter_vector(self, v):
if len(v) != self.full_size:
raise ValueError("dimension mismatch")
for k, val in zip(self.parameter_names, v):
setattr(self, k, float(val))
self.dirty = True
def get_parameter_dict(self, include_frozen=False):
"""
Get an ordered dictionary of the parameters
Args:
include_frozen (Optional[bool]): Should the frozen parameters be
included in the returned value? (default: ``False``)
"""
return OrderedDict(zip(
self.get_parameter_names(include_frozen=include_frozen),
self.get_parameter_vector(include_frozen=include_frozen),
))
def get_parameter_names(self, include_frozen=False):
"""
Get a list of the parameter names
Args:
include_frozen (Optional[bool]): Should the frozen parameters be
included in the returned value? (default: ``False``)
"""
if include_frozen:
return self.parameter_names
return tuple(p
for p, f in zip(self.parameter_names, self.unfrozen_mask)
if f)
def get_parameter_bounds(self, include_frozen=False):
"""
Get a list of the parameter bounds
Args:
include_frozen (Optional[bool]): Should the frozen parameters be
included in the returned value? (default: ``False``)
"""
if include_frozen:
return self.parameter_bounds
return list(p
for p, f in zip(self.parameter_bounds, self.unfrozen_mask)
if f)
def get_parameter_vector(self, include_frozen=False):
"""
Get an array of the parameter values in the correct order
Args:
include_frozen (Optional[bool]): Should the frozen parameters be
included in the returned value? (default: ``False``)
"""
if include_frozen:
return self.parameter_vector
return self.parameter_vector[self.unfrozen_mask]
def set_parameter_vector(self, vector, include_frozen=False):
"""
Set the parameter values to the given vector
Args:
vector (array[vector_size] or array[full_size]): The target
parameter vector. This must be in the same order as
``parameter_names`` and it should only include frozen
parameters if ``include_frozen`` is ``True``.
include_frozen (Optional[bool]): Should the frozen parameters be
included in the returned value? (default: ``False``)
"""
v = self.parameter_vector
if include_frozen:
v[:] = vector
else:
v[self.unfrozen_mask] = vector
self.parameter_vector = v
self.dirty = True
def freeze_parameter(self, name):
"""
Freeze a parameter by name
Args:
name: The name of the parameter
"""
i = self.get_parameter_names(include_frozen=True).index(name)
self.unfrozen_mask[i] = False
def thaw_parameter(self, name):
"""
Thaw a parameter by name
Args:
name: The name of the parameter
"""
i = self.get_parameter_names(include_frozen=True).index(name)
self.unfrozen_mask[i] = True
def freeze_all_parameters(self):
"""Freeze all parameters of the model"""
self.unfrozen_mask[:] = False
def thaw_all_parameters(self):
"""Thaw all parameters of the model"""
self.unfrozen_mask[:] = True
def get_parameter(self, name):
"""
Get a parameter value by name
Args:
name: The name of the parameter
"""
i = self.get_parameter_names(include_frozen=True).index(name)
return self.get_parameter_vector(include_frozen=True)[i]
def set_parameter(self, name, value):
"""
Set a parameter value by name
Args:
name: The name of the parameter
value (float): The new value for the parameter
"""
i = self.get_parameter_names(include_frozen=True).index(name)
v = self.get_parameter_vector(include_frozen=True)
v[i] = value
self.set_parameter_vector(v, include_frozen=True)
def log_prior(self):
"""Compute the log prior probability of the current parameters"""
for p, b in zip(self.parameter_vector, self.parameter_bounds):
if b[0] is not None and p < b[0]:
return -np.inf
if b[1] is not None and p > b[1]:
return -np.inf
return 0.0
class ModelSet(Model):
"""
An abstract wrapper for combining named :class:`Model` objects
The parameter names of a composite model are prepended by the submodel
name. For example:
.. code-block:: python
model = ModelSet([
("model1", Model(par1=1.0, par2=2.0)),
("model2", Model(par3=3.0, par4=4.0)),
])
print(model.get_parameter_names())
will print
.. code-block:: python
["model1:par1", "model1:par2", "model2:par3", "model2:par4"]
Args:
models: This should be a list of the form: ``[("model1", Model(...)),
("model2", Model(...)), ...]``.
"""
def __init__(self, models):
self.models = OrderedDict()
for name, model in models:
self.models[name] = model
def __getattr__(self, name):
if "models" in self.__dict__ and name in self.models:
return self.models[name]
raise AttributeError(name)
@property
def dirty(self):
return any(m.dirty for m in self.models.values())
@dirty.setter
def dirty(self, value):
for m in self.models.values():
m.dirty = value
@property
def full_size(self):
return sum(m.full_size for m in self.models.values())
@property
def vector_size(self):
return sum(m.vector_size for m in self.models.values())
@property
def unfrozen_mask(self):
return np.concatenate([
m.unfrozen_mask for m in self.models.values()
])
@property
def parameter_vector(self):
return np.concatenate([
m.parameter_vector for m in self.models.values()
])
@parameter_vector.setter
def parameter_vector(self, v):
i = 0
for m in self.models.values():
l = m.full_size
m.parameter_vector = v[i:i+l]
i += l
@property
def parameter_names(self):
return tuple(chain(*(
map("{0}:{{0}}".format(name).format, m.parameter_names)
for name, m in self.models.items()
)))
@property
def parameter_bounds(self):
return list(chain(*(
m.parameter_bounds for m in self.models.values()
)))
def _apply_to_parameter(self, func, name, *args):
comp = name.split(":")
if comp[0] not in self.models:
raise ValueError("unrecognized parameter '{0}'".format(name))
return getattr(self.models[comp[0]], func)(":".join(comp[1:]), *args)
def freeze_parameter(self, name):
self._apply_to_parameter("freeze_parameter", name)
def thaw_parameter(self, name):
self._apply_to_parameter("thaw_parameter", name)
def freeze_all_parameters(self):
for model in self.models.values():
model.freeze_all_parameters()
def thaw_all_parameters(self):
for model in self.models.values():
model.thaw_all_parameters()
def get_parameter(self, name):
return self._apply_to_parameter("get_parameter", name)
def set_parameter(self, name, value):
self.dirty = True
return self._apply_to_parameter("set_parameter", name, value)
def log_prior(self):
lp = 0.0
for model in self.models.values():
lp += model.log_prior()
if not np.isfinite(lp):
return -np.inf
return lp
[docs]class ConstantModel(Model):
"""
A simple concrete model with a single parameter ``value``
Args:
value (float): The value of the model.
"""
parameter_names = ("value", )
[docs] def get_value(self, x):
return self.value + np.zeros_like(x)
[docs] def compute_gradient(self, x):
return np.array([np.ones_like(x)])