# Copyright 2021 The Trieste Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module contains acquisition function builders, which build and define our acquisition
functions --- functions that estimate the utility of evaluating sets of candidate points.
"""
from __future__ import annotations
from typing import Callable, Mapping, Optional, cast
import tensorflow as tf
import tensorflow_probability as tfp
from check_shapes import check_shapes
from ...data import Dataset
from ...models import ProbabilisticModel, ReparametrizationSampler
from ...models.interfaces import (
HasReparamSampler,
SupportsGetObservationNoise,
SupportsReparamSamplerObservationNoise,
)
from ...space import SearchSpace
from ...types import Tag, TensorType
from ...utils import DEFAULTS
from ..interface import (
AcquisitionFunction,
AcquisitionFunctionBuilder,
AcquisitionFunctionClass,
ProbabilisticModelType,
SingleModelAcquisitionBuilder,
SingleModelVectorizedAcquisitionBuilder,
)
from .utils import MultivariateNormalCDF
[docs]
class ProbabilityOfImprovement(SingleModelAcquisitionBuilder[ProbabilisticModel]):
"""
Builder for the probability of improvement function, where the "best" value
is taken to be the minimum of the posterior mean at observed points.
"""
[docs]
def __repr__(self) -> str:
""""""
return "ProbabilityOfImprovement()"
[docs]
def prepare_acquisition_function(
self, model: ProbabilisticModel, dataset: Optional[Dataset] = None
) -> AcquisitionFunction:
"""
:param model: The model.
:param dataset: The data from the observer. Must be populated.
:return: The probability of improvement function. This function will raise
:exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size
greater than one.
:raise tf.errors.InvalidArgumentError: If ``dataset`` is empty.
"""
tf.debugging.Assert(dataset is not None, [])
dataset = cast(Dataset, dataset)
tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
mean, _ = model.predict(dataset.query_points)
eta = tf.reduce_min(mean, axis=0)[0]
return probability_below_threshold(model, eta)
[docs]
def update_acquisition_function(
self,
function: AcquisitionFunction,
model: ProbabilisticModel,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param function: The acquisition function to update.
:param model: The model.
:param dataset: The data from the observer. Must be populated.
"""
tf.debugging.Assert(dataset is not None, [])
dataset = cast(Dataset, dataset)
tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
tf.debugging.Assert(isinstance(function, probability_below_threshold), [tf.constant([])])
mean, _ = model.predict(dataset.query_points)
eta = tf.reduce_min(mean, axis=0)[0]
function.update(eta) # type: ignore
return function
[docs]
class ExpectedImprovement(SingleModelAcquisitionBuilder[ProbabilisticModel]):
"""
Builder for the expected improvement function where the "best" value is taken to be the minimum
of the posterior mean at observed points.
In the presence of constraints in the search_space the "best" value is computed only at the
feasible query points. If there are no feasible points, the "best" value is instead taken to be
the maximum of the posterior mean at all observed points.
"""
def __init__(self, search_space: Optional[SearchSpace] = None):
"""
:param search_space: The global search space over which the optimisation is defined. This is
only used to determine explicit constraints.
"""
self._search_space = search_space
[docs]
def __repr__(self) -> str:
""""""
return f"ExpectedImprovement({self._search_space!r})"
[docs]
def prepare_acquisition_function(
self,
model: ProbabilisticModel,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param model: The model.
:param dataset: The data from the observer. Must be populated.
:return: The expected improvement function. This function will raise
:exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size
greater than one.
:raise tf.errors.InvalidArgumentError: If ``dataset`` is empty.
"""
tf.debugging.Assert(dataset is not None, [tf.constant([])])
dataset = cast(Dataset, dataset)
tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
# Check feasibility against any explicit constraints in the search space.
if self._search_space is not None and self._search_space.has_constraints:
is_feasible = self._search_space.is_feasible(dataset.query_points)
if not tf.reduce_any(is_feasible):
query_points = dataset.query_points
else:
query_points = tf.boolean_mask(dataset.query_points, is_feasible)
else:
is_feasible = tf.constant([True], dtype=bool)
query_points = dataset.query_points
mean, _ = model.predict(query_points)
if not tf.reduce_any(is_feasible):
eta = tf.reduce_max(mean, axis=0)
else:
eta = tf.reduce_min(mean, axis=0)
return expected_improvement(model, eta)
[docs]
def update_acquisition_function(
self,
function: AcquisitionFunction,
model: ProbabilisticModel,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param function: The acquisition function to update.
:param model: The model.
:param dataset: The data from the observer. Must be populated.
"""
tf.debugging.Assert(dataset is not None, [tf.constant([])])
dataset = cast(Dataset, dataset)
tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
tf.debugging.Assert(isinstance(function, expected_improvement), [tf.constant([])])
# Check feasibility against any explicit constraints in the search space.
if self._search_space is not None and self._search_space.has_constraints:
is_feasible = self._search_space.is_feasible(dataset.query_points)
if not tf.reduce_any(is_feasible):
query_points = dataset.query_points
else:
query_points = tf.boolean_mask(dataset.query_points, is_feasible)
else:
is_feasible = tf.constant([True], dtype=bool)
query_points = dataset.query_points
mean, _ = model.predict(query_points)
if not tf.reduce_any(is_feasible):
eta = tf.reduce_max(mean, axis=0)
else:
eta = tf.reduce_min(mean, axis=0)
function.update(eta) # type: ignore
return function
[docs]
class expected_improvement(AcquisitionFunctionClass):
def __init__(self, model: ProbabilisticModel, eta: TensorType):
r"""
Return the Expected Improvement (EI) acquisition function for single-objective global
optimization. Improvement is with respect to the current "best" observation ``eta``, where
an improvement moves towards the objective function's minimum and the expectation is
calculated with respect to the ``model`` posterior. For model posterior :math:`f`, this is
.. math:: x \mapsto \mathbb E \left[ \max (\eta - f(x), 0) \right]
This function was introduced by Mockus et al, 1975. See :cite:`Jones:1998` for details.
:param model: The model of the objective function.
:param eta: The "best" observation.
:return: The expected improvement function. This function will raise
:exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size
greater than one.
"""
self._model = model
self._eta = tf.Variable(eta)
[docs]
def update(self, eta: TensorType) -> None:
"""Update the acquisition function with a new eta value."""
self._eta.assign(eta)
@tf.function
@check_shapes(
"x: [N..., 1, D] # This acquisition function only supports batch sizes of one",
"return: [N..., L]",
)
[docs]
def __call__(self, x: TensorType) -> TensorType:
mean, variance = self._model.predict(tf.squeeze(x, -2))
normal = tfp.distributions.Normal(mean, tf.sqrt(variance))
return (self._eta - mean) * normal.cdf(self._eta) + variance * normal.prob(self._eta)
[docs]
class AugmentedExpectedImprovement(SingleModelAcquisitionBuilder[SupportsGetObservationNoise]):
"""
Builder for the augmented expected improvement function for optimization single-objective
optimization problems with high levels of observation noise.
"""
[docs]
def __repr__(self) -> str:
""""""
return "AugmentedExpectedImprovement()"
[docs]
def prepare_acquisition_function(
self,
model: SupportsGetObservationNoise,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param model: The model.
:param dataset: The data from the observer. Must be populated.
:return: The expected improvement function. This function will raise
:exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size
greater than one.
:raise tf.errors.InvalidArgumentError: If ``dataset`` is empty.
"""
if not isinstance(model, SupportsGetObservationNoise):
raise NotImplementedError(
f"AugmentedExpectedImprovement only works with models that support "
f"get_observation_noise; received {model!r}"
)
tf.debugging.Assert(dataset is not None, [tf.constant([])])
dataset = cast(Dataset, dataset)
tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
mean, _ = model.predict(dataset.query_points)
eta = tf.reduce_min(mean, axis=0)
return augmented_expected_improvement(model, eta)
[docs]
def update_acquisition_function(
self,
function: AcquisitionFunction,
model: SupportsGetObservationNoise,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param function: The acquisition function to update.
:param model: The model.
:param dataset: The data from the observer. Must be populated.
"""
tf.debugging.Assert(dataset is not None, [tf.constant([])])
dataset = cast(Dataset, dataset)
tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
tf.debugging.Assert(isinstance(function, augmented_expected_improvement), [tf.constant([])])
mean, _ = model.predict(dataset.query_points)
eta = tf.reduce_min(mean, axis=0)
function.update(eta) # type: ignore
return function
[docs]
class augmented_expected_improvement(AcquisitionFunctionClass):
def __init__(self, model: SupportsGetObservationNoise, eta: TensorType):
r"""
Return the Augmented Expected Improvement (AEI) acquisition function for single-objective
global optimization under homoscedastic observation noise.
Improvement is with respect to the current "best" observation ``eta``, where an
improvement moves towards the objective function's minimum and the expectation is calculated
with respect to the ``model`` posterior. In contrast to standard EI, AEI has an additional
multiplicative factor that penalizes evaluations made in areas of the space with very small
posterior predictive variance. Thus, when applying standard EI to noisy optimisation
problems, AEI avoids getting trapped and repeatedly querying the same point.
For model posterior :math:`f`, this is
.. math:: x \mapsto EI(x) * \left(1 - frac{\tau^2}{\sqrt{s^2(x)+\tau^2}}\right),
where :math:`s^2(x)` is the predictive variance and :math:`\tau` is observation noise.
This function was introduced by Huang et al, 2006. See :cite:`Huang:2006` for details.
:param model: The model of the objective function.
:param eta: The "best" observation.
:return: The expected improvement function. This function will raise
:exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size
greater than one or a model without homoscedastic observation noise.
"""
self._model = model
self._eta = tf.Variable(eta)
self._noise_variance = tf.Variable(model.get_observation_noise())
[docs]
def update(self, eta: TensorType) -> None:
"""Update the acquisition function with a new eta value and noise variance."""
self._eta.assign(eta)
self._noise_variance.assign(self._model.get_observation_noise())
@tf.function
[docs]
def __call__(self, x: TensorType) -> TensorType:
tf.debugging.assert_shapes(
[(x, [..., 1, None])],
message="This acquisition function only supports batch sizes of one.",
)
mean, variance = self._model.predict(tf.squeeze(x, -2))
normal = tfp.distributions.Normal(mean, tf.sqrt(variance))
ei = (self._eta - mean) * normal.cdf(self._eta) + variance * normal.prob(self._eta)
augmentation = 1 - (tf.math.sqrt(self._noise_variance)) / (
tf.math.sqrt(self._noise_variance + variance)
)
return ei * augmentation
[docs]
class NegativeLowerConfidenceBound(SingleModelAcquisitionBuilder[ProbabilisticModel]):
"""
Builder for the negative of the lower confidence bound. The lower confidence bound is typically
minimised, so the negative is suitable for maximisation.
"""
def __init__(self, beta: float = 1.96):
"""
:param beta: Weighting given to the variance contribution to the lower confidence bound.
Must not be negative.
"""
self._beta = beta
[docs]
def __repr__(self) -> str:
""""""
return f"NegativeLowerConfidenceBound({self._beta!r})"
[docs]
def prepare_acquisition_function(
self,
model: ProbabilisticModel,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param model: The model.
:param dataset: Unused.
:return: The negative lower confidence bound function. This function will raise
:exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size
greater than one.
:raise ValueError: If ``beta`` is negative.
"""
lcb = lower_confidence_bound(model, self._beta)
return tf.function(lambda at: -lcb(at))
[docs]
def update_acquisition_function(
self,
function: AcquisitionFunction,
model: ProbabilisticModel,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param function: The acquisition function to update.
:param model: The model.
:param dataset: Unused.
"""
return function # no need to update anything
[docs]
class NegativePredictiveMean(NegativeLowerConfidenceBound):
"""
Builder for the negative of the predictive mean. The predictive mean is minimised on minimising
the objective function. The negative predictive mean is therefore maximised.
"""
def __init__(self) -> None:
super().__init__(beta=0.0)
def __repr__(self) -> str:
""""""
return "NegativePredictiveMean()"
[docs]
def lower_confidence_bound(model: ProbabilisticModel, beta: float) -> AcquisitionFunction:
r"""
The lower confidence bound (LCB) acquisition function for single-objective global optimization.
.. math:: x^* \mapsto \mathbb{E} [f(x^*)|x, y] - \beta \sqrt{ \mathrm{Var}[f(x^*)|x, y] }
See :cite:`Srinivas:2010` for details.
:param model: The model of the objective function.
:param beta: The weight to give to the standard deviation contribution of the LCB. Must not be
negative.
:return: The lower confidence bound function. This function will raise
:exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size
greater than one.
:raise tf.errors.InvalidArgumentError: If ``beta`` is negative.
"""
tf.debugging.assert_non_negative(
beta, message="Standard deviation scaling parameter beta must not be negative"
)
@tf.function
def acquisition(x: TensorType) -> TensorType:
tf.debugging.assert_shapes(
[(x, [..., 1, None])],
message="This acquisition function only supports batch sizes of one.",
)
mean, variance = model.predict(tf.squeeze(x, -2))
return mean - beta * tf.sqrt(variance)
return acquisition
[docs]
class ProbabilityOfFeasibility(SingleModelAcquisitionBuilder[ProbabilisticModel]):
r"""
Uses the :func:`probability_below_threshold` function to build a
probability of feasiblity acquisition function, defined in :cite:`gardner14` as
.. math::
\int_{-\infty}^{\tau} p(c(\mathbf{x}) | \mathbf{x}, \mathcal{D}) \mathrm{d} c(\mathbf{x})
\qquad ,
where :math:`\tau` is a threshold. Values below the threshold are considered feasible by the
constraint function. See also :cite:`schonlau1998global` for details.
"""
def __init__(self, threshold: float | TensorType):
"""
:param threshold: The (scalar) probability of feasibility threshold.
:raise ValueError (or InvalidArgumentError): If ``threshold`` is not a scalar.
"""
tf.debugging.assert_scalar(threshold)
self._threshold = threshold
[docs]
def __repr__(self) -> str:
""""""
return f"ProbabilityOfFeasibility({self._threshold!r})"
@property
[docs]
def threshold(self) -> float | TensorType:
"""The probability of feasibility threshold."""
return self._threshold
[docs]
def prepare_acquisition_function(
self,
model: ProbabilisticModel,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param model: The model.
:param dataset: Unused.
:return: The probability of feasibility function. This function will raise
:exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size
greater than one.
"""
return probability_below_threshold(model, self.threshold)
[docs]
def update_acquisition_function(
self,
function: AcquisitionFunction,
model: ProbabilisticModel,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param function: The acquisition function to update.
:param model: The model.
:param dataset: Unused.
"""
return function # no need to update anything
[docs]
class probability_below_threshold(AcquisitionFunctionClass):
def __init__(self, model: ProbabilisticModel, threshold: float | TensorType):
r"""
The probability of being below the threshold. This brings together commonality
between probability of improvement and probability of feasiblity.
Probability is is caculated with respect to the `model` posterior.
For model posterior :math:`f`, this is
.. math:: x \mapsto \mathbb P \left (f(x) < \eta)\right]
where :math:`\eta` is the threshold.
:param model: The model of the objective function.
:param threshold: The (scalar) probability of feasibility threshold.
:return: The probability of feasibility function. This function will raise
:exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size
greater than one.
:raise ValueError or tf.errors.InvalidArgumentError: If ``threshold`` is not a scalar.
"""
tf.debugging.assert_scalar(threshold)
self._model = model
self._threshold = tf.Variable(threshold)
@tf.function
[docs]
def __call__(self, x: TensorType) -> TensorType:
tf.debugging.assert_shapes(
[(x, [..., 1, None])],
message="This acquisition function only supports batch sizes of one.",
)
mean, var = self._model.predict(tf.squeeze(x, -2))
distr = tfp.distributions.Normal(mean, tf.sqrt(var))
return distr.cdf(tf.cast(self._threshold, x.dtype))
[docs]
def update(self, threshold: TensorType) -> None:
"""Update the acquisition function with a new threshold value."""
self._threshold.assign(threshold)
[docs]
class FastConstraintsFeasibility(SingleModelAcquisitionBuilder[ProbabilisticModel]):
"""
Builds a feasiblity acquisition function from the residuals of explicit constraints defined in
the search space.
"""
def __init__(
self,
search_space: SearchSpace,
smoothing_function: Optional[Callable[[TensorType], TensorType]] = None,
):
"""
:param search_space: The global search space over which the feasibility of the constraints
is defined.
:param smoothing_function: The smoothing function used for constraints residuals. The
default is CDF of the Normal distribution with a scale of `1e-3`.
:raise NotImplementedError: If the `search_space` does not have constraints.
"""
if not search_space.has_constraints:
raise NotImplementedError(
"FastConstraintsFeasibility requires constraints in the search space."
)
self._search_space = search_space
self._smoothing_function = smoothing_function
[docs]
def __repr__(self) -> str:
""""""
return f"FastConstraintsFeasibility({self._search_space!r}, {self._smoothing_function!r})"
[docs]
def prepare_acquisition_function(
self,
model: ProbabilisticModel,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param model: Unused.
:param dataset: Unused.
:return: The function for feasibility of constraints.
"""
return fast_constraints_feasibility(self._search_space, self._smoothing_function)
[docs]
def update_acquisition_function(
self,
function: AcquisitionFunction,
model: ProbabilisticModel,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param function: The acquisition function to update.
:param model: Unused.
:param dataset: Unused.
:return: The function for feasibility of constraints.
"""
return function # No need to update anything.
[docs]
def fast_constraints_feasibility(
search_space: SearchSpace,
smoothing_function: Optional[Callable[[TensorType], TensorType]] = None,
) -> AcquisitionFunction:
"""
Returns a feasiblity acquisition function from the residuals of explicit constraints defined in
the search space.
:param search_space: The global search space over which the feasibility of the constraints
is defined.
:param smoothing_function: The smoothing function used for constraints residuals. The
default is CDF of the Normal distribution with a scale of `1e-3`.
:return: The function for feasibility of constraints.
:raise NotImplementedError: If the `search_space` does not have constraints.
"""
if not search_space.has_constraints:
raise NotImplementedError(
"fast_constraints_feasibility requires constraints in the search space."
)
@tf.function
def acquisition(x: TensorType) -> TensorType:
if smoothing_function is None:
_smoothing_function = tfp.distributions.Normal(
tf.constant(0.0, x.dtype), tf.constant(1e-3, x.dtype)
).cdf
else:
_smoothing_function = smoothing_function
residuals = search_space.constraints_residuals(x)
return tf.math.reduce_prod(_smoothing_function(residuals), axis=-1)
return acquisition
[docs]
class ExpectedConstrainedImprovement(AcquisitionFunctionBuilder[ProbabilisticModelType]):
"""
Builder for the *expected constrained improvement* acquisition function defined in
:cite:`gardner14`. The acquisition function computes the expected improvement from the best
feasible point, where feasible points are those that (probably) satisfy some constraint. Where
there are no feasible points, this builder simply builds the constraint function.
"""
def __init__(
self,
objective_tag: Tag,
constraint_builder: AcquisitionFunctionBuilder[ProbabilisticModelType],
min_feasibility_probability: float | TensorType = 0.5,
search_space: Optional[SearchSpace] = None,
):
"""
:param objective_tag: The tag for the objective data and model.
:param constraint_builder: The builder for the constraint function.
:param min_feasibility_probability: The minimum probability of feasibility for a
"best point" to be considered feasible.
:param search_space: The global search space over which the optimisation is defined. This is
only used to determine explicit constraints.
:raise ValueError (or tf.errors.InvalidArgumentError): If ``min_feasibility_probability``
is not a scalar in the unit interval :math:`[0, 1]`.
"""
tf.debugging.assert_scalar(min_feasibility_probability)
if isinstance(min_feasibility_probability, (int, float)):
tf.debugging.assert_greater_equal(float(min_feasibility_probability), 0.0)
tf.debugging.assert_less_equal(float(min_feasibility_probability), 1.0)
else:
dtype = min_feasibility_probability.dtype
tf.debugging.assert_greater_equal(min_feasibility_probability, tf.constant(0, dtype))
tf.debugging.assert_less_equal(min_feasibility_probability, tf.constant(1, dtype))
self._objective_tag = objective_tag
self._constraint_builder = constraint_builder
self._search_space = search_space
self._min_feasibility_probability = min_feasibility_probability
self._constraint_fn: Optional[AcquisitionFunction] = None
self._expected_improvement_fn: Optional[AcquisitionFunction] = None
self._constrained_improvement_fn: Optional[AcquisitionFunction] = None
[docs]
def __repr__(self) -> str:
""""""
return (
f"ExpectedConstrainedImprovement({self._objective_tag!r}, {self._constraint_builder!r},"
f" {self._min_feasibility_probability!r}, {self._search_space!r})"
)
[docs]
def prepare_acquisition_function(
self,
models: Mapping[Tag, ProbabilisticModelType],
datasets: Optional[Mapping[Tag, Dataset]] = None,
) -> AcquisitionFunction:
"""
:param models: The models over each tag.
:param datasets: The data from the observer.
:return: The expected constrained improvement acquisition function. This function will raise
:exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size
greater than one.
:raise KeyError: If `objective_tag` is not found in ``datasets`` and ``models``.
:raise tf.errors.InvalidArgumentError: If the objective data is empty.
"""
tf.debugging.Assert(datasets is not None, [tf.constant([])])
datasets = cast(Mapping[Tag, Dataset], datasets)
objective_model = models[self._objective_tag]
objective_dataset = datasets[self._objective_tag]
tf.debugging.assert_positive(
len(objective_dataset),
message="Expected improvement is defined with respect to existing points in the"
" objective data, but the objective data is empty.",
)
self._constraint_fn = self._constraint_builder.prepare_acquisition_function(
models, datasets=datasets
)
pof = self._constraint_fn(objective_dataset.query_points[:, None, ...])
is_feasible = tf.squeeze(pof >= self._min_feasibility_probability, axis=-1)
# Check feasibility against any explicit constraints in the search space.
if self._search_space is not None and self._search_space.has_constraints:
ss_is_feasible = self._search_space.is_feasible(objective_dataset.query_points)
is_feasible = tf.logical_and(is_feasible, ss_is_feasible)
if not tf.reduce_any(is_feasible):
return self._constraint_fn
feasible_query_points = tf.boolean_mask(objective_dataset.query_points, is_feasible)
feasible_mean, _ = objective_model.predict(feasible_query_points)
self._update_expected_improvement_fn(objective_model, feasible_mean)
@tf.function
def constrained_function(x: TensorType) -> TensorType:
return cast(AcquisitionFunction, self._expected_improvement_fn)(x) * cast(
AcquisitionFunction, self._constraint_fn
)(x)
self._constrained_improvement_fn = constrained_function
return constrained_function
[docs]
def update_acquisition_function(
self,
function: AcquisitionFunction,
models: Mapping[Tag, ProbabilisticModelType],
datasets: Optional[Mapping[Tag, Dataset]] = None,
) -> AcquisitionFunction:
"""
:param function: The acquisition function to update.
:param models: The models for each tag.
:param datasets: The data from the observer.
"""
tf.debugging.Assert(datasets is not None, [tf.constant([])])
datasets = cast(Mapping[Tag, Dataset], datasets)
objective_model = models[self._objective_tag]
objective_dataset = datasets[self._objective_tag]
tf.debugging.assert_positive(
len(objective_dataset),
message="Expected improvement is defined with respect to existing points in the"
" objective data, but the objective data is empty.",
)
tf.debugging.Assert(self._constraint_fn is not None, [tf.constant([])])
constraint_fn = cast(AcquisitionFunction, self._constraint_fn)
self._constraint_builder.update_acquisition_function(
constraint_fn, models, datasets=datasets
)
pof = constraint_fn(objective_dataset.query_points[:, None, ...])
is_feasible = tf.squeeze(pof >= self._min_feasibility_probability, axis=-1)
# Check feasibility against any explicit constraints in the search space.
if self._search_space is not None and self._search_space.has_constraints:
ss_is_feasible = self._search_space.is_feasible(objective_dataset.query_points)
is_feasible = tf.logical_and(is_feasible, ss_is_feasible)
if not tf.reduce_any(is_feasible):
return constraint_fn
feasible_query_points = tf.boolean_mask(objective_dataset.query_points, is_feasible)
feasible_mean, _ = objective_model.predict(feasible_query_points)
self._update_expected_improvement_fn(objective_model, feasible_mean)
if self._constrained_improvement_fn is not None:
return self._constrained_improvement_fn
@tf.function
def constrained_function(x: TensorType) -> TensorType:
return cast(AcquisitionFunction, self._expected_improvement_fn)(x) * cast(
AcquisitionFunction, self._constraint_fn
)(x)
self._constrained_improvement_fn = constrained_function
return self._constrained_improvement_fn
[docs]
def _update_expected_improvement_fn(
self, objective_model: ProbabilisticModelType, feasible_mean: TensorType
) -> None:
"""
Set or update the unconstrained expected improvement function.
:param objective_model: The objective model.
:param feasible_mean: The mean of the feasible query points.
"""
eta = tf.reduce_min(feasible_mean, axis=0)
if self._expected_improvement_fn is None:
self._expected_improvement_fn = expected_improvement(objective_model, eta)
else:
tf.debugging.Assert(
isinstance(self._expected_improvement_fn, expected_improvement), [tf.constant([])]
)
self._expected_improvement_fn.update(eta) # type: ignore
[docs]
class MonteCarloExpectedImprovement(SingleModelAcquisitionBuilder[HasReparamSampler]):
"""
Builder for a Monte Carlo-based expected improvement function for use with a model without
analytical expected improvement (e.g. a deep GP). The "best" value is taken to be
the minimum of the posterior mean at observed points. See
:class:`monte_carlo_expected_improvement` for details.
"""
def __init__(self, sample_size: int, *, jitter: float = DEFAULTS.JITTER):
"""
:param sample_size: The number of samples for each batch of points.
:param jitter: The jitter for the reparametrization sampler.
:raise tf.errors.InvalidArgumentError: If ``sample_size`` is not positive, or ``jitter`` is
negative.
"""
tf.debugging.assert_positive(sample_size)
tf.debugging.assert_greater_equal(jitter, 0.0)
super().__init__()
self._sample_size = sample_size
self._jitter = jitter
[docs]
def __repr__(self) -> str:
""""""
return f"MonteCarloExpectedImprovement({self._sample_size!r}, jitter={self._jitter!r})"
[docs]
def prepare_acquisition_function(
self,
model: HasReparamSampler,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param model: The model over the specified ``dataset``. Must have output dimension [1].
:param dataset: The data from the observer. Cannot be empty.
:return: The estimated *expected improvement* acquisition function.
:raise ValueError (or InvalidArgumentError): If ``dataset`` is not populated, ``model``
does not have an output dimension of [1] or does not have a ``reparam_sample`` method.
"""
if not isinstance(model, HasReparamSampler):
raise ValueError(
f"MonteCarloExpectedImprovement only supports models with a reparam_sampler method;"
f"received {model!r}"
)
sampler = model.reparam_sampler(self._sample_size)
tf.debugging.Assert(dataset is not None, [tf.constant([])])
dataset = cast(Dataset, dataset)
tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
samples_at_query_points = sampler.sample(
dataset.query_points[..., None, :], jitter=self._jitter
)
mean = tf.reduce_mean(samples_at_query_points, axis=-3, keepdims=True) # [N, 1, 1, L]
tf.debugging.assert_shapes(
[(mean, [..., 1])], message="Expected model with output dimension [1]."
)
eta = tf.squeeze(tf.reduce_min(mean, axis=0))
return monte_carlo_expected_improvement(sampler, eta)
[docs]
def update_acquisition_function(
self,
function: AcquisitionFunction,
model: HasReparamSampler,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param function: The acquisition function to update.
:param model: The model. Must have output dimension [1]. Unused here.
:param dataset: The data from the observer. Cannot be empty
"""
tf.debugging.Assert(dataset is not None, [tf.constant([])])
dataset = cast(Dataset, dataset)
tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
tf.debugging.Assert(
isinstance(function, monte_carlo_expected_improvement), [tf.constant([])]
)
sampler = function._sampler # type: ignore
sampler.reset_sampler()
samples_at_query_points = sampler.sample(
dataset.query_points[..., None, :], jitter=self._jitter
)
mean = tf.reduce_mean(samples_at_query_points, axis=-3, keepdims=True)
tf.debugging.assert_shapes(
[(mean, [..., 1])], message="Expected model with output dimension [1]."
)
eta = tf.squeeze(tf.reduce_min(mean, axis=0))
function.update(eta) # type: ignore
return function
[docs]
class monte_carlo_expected_improvement(AcquisitionFunctionClass):
r"""
Return a Monte Carlo based Expected Improvement (EI) acquisition function for
single-objective global optimization. Improvement is with respect to the current "best"
observation ``eta``, where an improvement moves towards the objective function's minimum
and the expectation is calculated with respect to the ``model`` posterior. For model
posterior :math:`f`, this is
.. math:: x \mapsto \mathbb E \left[ \max (\eta - f(x), 0) \right].
For the Monte Carlo version, the expectation is calculated by samples that we save. See
:cite:`wilson2018maximizing` for details.
"""
def __init__(self, sampler: ReparametrizationSampler[HasReparamSampler], eta: TensorType):
r"""
:param sampler: The model sampler of the objective function.
:param eta: The "best" observation.
:return: The Monte Carlo expected improvement function. This function will raise
:exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size
greater than one.
"""
self._sampler = sampler
self._eta = tf.Variable(eta)
[docs]
def update(self, eta: TensorType) -> None:
"""Update the acquisition function with a new eta value."""
self._eta.assign(eta)
@tf.function
[docs]
def __call__(self, at: TensorType) -> TensorType:
tf.debugging.assert_shapes(
[(at, [..., 1, None])],
message="This acquisition function only supports batch sizes of one.",
)
samples = tf.squeeze(self._sampler.sample(at), axis=-1) # [..., S, 1]
improvement = tf.maximum(self._eta - samples, 0.0) # [..., S, 1]
return tf.reduce_mean(improvement, axis=-2) # [..., 1]
[docs]
class MonteCarloAugmentedExpectedImprovement(
SingleModelAcquisitionBuilder[SupportsReparamSamplerObservationNoise]
):
"""
Builder for a Monte Carlo-based augmented expected improvement function for use with a model
without analytical augmented expected improvement (e.g. a deep GP). The "best" value is taken to
be the minimum of the posterior mean at observed points. See
:class:`monte_carlo_augmented_expected_improvement` for details.
"""
def __init__(self, sample_size: int, *, jitter: float = DEFAULTS.JITTER):
"""
:param sample_size: The number of samples for each batch of points.
:param jitter: The jitter for the reparametrization sampler.
:raise tf.errors.InvalidArgumentError: If ``sample_size`` is not positive, or ``jitter`` is
negative.
"""
tf.debugging.assert_positive(sample_size)
tf.debugging.assert_greater_equal(jitter, 0.0)
super().__init__()
self._sample_size = sample_size
self._jitter = jitter
[docs]
def __repr__(self) -> str:
""""""
return (
f"MonteCarloAugmentedExpectedImprovement({self._sample_size!r}, "
f"jitter={self._jitter!r})"
)
[docs]
def prepare_acquisition_function(
self,
model: SupportsReparamSamplerObservationNoise,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param model: The model over the specified ``dataset``. Must have output dimension [1].
:param dataset: The data from the observer. Cannot be empty.
:return: The estimated *expected improvement* acquisition function.
:raise ValueError (or InvalidArgumentError): If ``dataset`` is not populated, ``model``
does not have an output dimension of [1], does not have a ``reparam_sample`` method, or
does not support observation noise.
"""
if not isinstance(model, SupportsReparamSamplerObservationNoise):
raise ValueError(
f"MonteCarloAugmentedExpectedImprovement only supports models with a "
f"reparam_sampler method and that support observation noise; received "
f"{model!r}."
)
sampler = model.reparam_sampler(self._sample_size)
tf.debugging.Assert(dataset is not None, [tf.constant([])])
dataset = cast(Dataset, dataset)
tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
samples_at_query_points = sampler.sample(
dataset.query_points[..., None, :], jitter=self._jitter
)
mean = tf.reduce_mean(samples_at_query_points, axis=-3, keepdims=True) # [N, 1, 1, L]
tf.debugging.assert_shapes(
[(mean, [..., 1])], message="Expected model with output dimension [1]."
)
eta = tf.squeeze(tf.reduce_min(mean, axis=0))
return monte_carlo_augmented_expected_improvement(model, sampler, eta)
[docs]
def update_acquisition_function(
self,
function: AcquisitionFunction,
model: SupportsReparamSamplerObservationNoise,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param function: The acquisition function to update.
:param model: The model. Must have output dimension [1]. Unused here
:param dataset: The data from the observer. Cannot be empty.
"""
tf.debugging.Assert(dataset is not None, [tf.constant([])])
dataset = cast(Dataset, dataset)
tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
tf.debugging.Assert(
isinstance(function, monte_carlo_augmented_expected_improvement), [tf.constant([])]
)
sampler = function._sampler # type: ignore
sampler.reset_sampler()
samples_at_query_points = sampler.sample(
dataset.query_points[..., None, :], jitter=self._jitter
)
mean = tf.reduce_mean(samples_at_query_points, axis=-3, keepdims=True) # [N, 1, 1, L]
tf.debugging.assert_shapes(
[(mean, [..., 1])], message="Expected model with output dimension [1]."
)
eta = tf.squeeze(tf.reduce_min(mean, axis=0))
function.update(eta) # type: ignore
return function
[docs]
class monte_carlo_augmented_expected_improvement(AcquisitionFunctionClass):
r"""
Return a Monte Carlo based Augmented Expected Improvement (AEI) acquisition function for
single-objective global optimization with high levels of observation noise. See
:cite:`wilson2018maximizing` for details on using the reparametrization trick for optimizing
acquisition functions and :cite:`Huang:2006`: for details of AEI.
"""
def __init__(
self,
model: SupportsReparamSamplerObservationNoise,
sampler: ReparametrizationSampler[SupportsReparamSamplerObservationNoise],
eta: TensorType,
):
r"""
:param model: The model of the objective function.
:param sampler: The model sampler of the objective function.
:param eta: The "best" observation.
:return: The Monte Carlo expected improvement function. This function will raise
:exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size
greater than one.
"""
self._model = model
self._sampler = sampler
self._eta = tf.Variable(eta)
self._noise_variance = tf.Variable(model.get_observation_noise())
[docs]
def update(self, eta: TensorType) -> None:
"""Update the acquisition function with a new eta and noise variance"""
self._eta.assign(eta)
self._noise_variance.assign(self._model.get_observation_noise())
@tf.function
[docs]
def __call__(self, at: TensorType) -> TensorType:
tf.debugging.assert_shapes(
[(at, [..., 1, None])],
message="This acquisition function only supports batch sizes of one.",
)
samples = tf.squeeze(self._sampler.sample(at), axis=-1) # [..., S, 1]
improvement = tf.maximum(self._eta - samples, 0.0) # [..., S, 1]
variance = tf.math.reduce_variance(samples, -2) # [..., 1]
augmentation = 1 - (
tf.math.sqrt(self._noise_variance) / tf.math.sqrt(self._noise_variance + variance)
)
return augmentation * tf.reduce_mean(improvement, axis=-2) # [..., 1]
[docs]
class BatchMonteCarloExpectedImprovement(SingleModelAcquisitionBuilder[HasReparamSampler]):
"""
Expected improvement for batches of points (or :math:`q`-EI), approximated using Monte Carlo
estimation with the reparametrization trick. See :cite:`Ginsbourger2010` for details.
Improvement is measured with respect to the minimum predictive mean at observed query points.
This is calculated in :class:`BatchMonteCarloExpectedImprovement` by assuming observations
at new points are independent from those at known query points. This is faster, but is an
approximation for noisy observers.
"""
def __init__(self, sample_size: int, *, jitter: float = DEFAULTS.JITTER):
"""
:param sample_size: The number of samples for each batch of points.
:param jitter: The size of the jitter to use when stabilising the Cholesky decomposition of
the covariance matrix.
:raise tf.errors.InvalidArgumentError: If ``sample_size`` is not positive, or ``jitter``
is negative.
"""
tf.debugging.assert_positive(sample_size)
tf.debugging.assert_greater_equal(jitter, 0.0)
self._sample_size = sample_size
self._jitter = jitter
[docs]
def __repr__(self) -> str:
""""""
return f"BatchMonteCarloExpectedImprovement({self._sample_size!r}, jitter={self._jitter!r})"
[docs]
def prepare_acquisition_function(
self,
model: HasReparamSampler,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param model: The model. Must have event shape [1].
:param dataset: The data from the observer. Must be populated.
:return: The batch *expected improvement* acquisition function.
:raise ValueError (or InvalidArgumentError): If ``dataset`` is not populated, or ``model``
does not have an event shape of [1].
"""
tf.debugging.Assert(dataset is not None, [tf.constant([])])
dataset = cast(Dataset, dataset)
tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
mean, _ = model.predict(dataset.query_points)
tf.debugging.assert_shapes(
[(mean, ["_", 1])], message="Expected model with event shape [1]."
)
eta = tf.reduce_min(mean, axis=0)
return batch_monte_carlo_expected_improvement(self._sample_size, model, eta, self._jitter)
[docs]
def update_acquisition_function(
self,
function: AcquisitionFunction,
model: HasReparamSampler,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param function: The acquisition function to update.
:param model: The model. Must have event shape [1].
:param dataset: The data from the observer. Must be populated.
"""
tf.debugging.Assert(dataset is not None, [tf.constant([])])
dataset = cast(Dataset, dataset)
tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
tf.debugging.Assert(
isinstance(function, batch_monte_carlo_expected_improvement), [tf.constant([])]
)
mean, _ = model.predict(dataset.query_points)
eta = tf.reduce_min(mean, axis=0)
function.update(eta) # type: ignore
return function
[docs]
class batch_monte_carlo_expected_improvement(AcquisitionFunctionClass):
def __init__(self, sample_size: int, model: HasReparamSampler, eta: TensorType, jitter: float):
"""
:param sample_size: The number of Monte-Carlo samples.
:param model: The model of the objective function.
:param eta: The "best" observation.
:param jitter: The size of the jitter to use when stabilising the Cholesky decomposition of
the covariance matrix.
:return: The expected improvement function. This function will raise
:exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size
greater than one.
"""
self._sample_size = sample_size
if not isinstance(model, HasReparamSampler):
raise ValueError(
f"The batch Monte-Carlo expected improvement acquisition function only supports "
f"models that implement a reparam_sampler method; received {model!r}"
)
sampler = model.reparam_sampler(self._sample_size)
self._sampler = sampler
self._eta = tf.Variable(eta)
self._jitter = jitter
[docs]
def update(self, eta: TensorType) -> None:
"""Update the acquisition function with a new eta value and reset the reparam sampler."""
self._eta.assign(eta)
self._sampler.reset_sampler()
@tf.function
[docs]
def __call__(self, x: TensorType) -> TensorType:
samples = tf.squeeze(self._sampler.sample(x, jitter=self._jitter), axis=-1) # [..., S, B]
min_sample_per_batch = tf.reduce_min(samples, axis=-1) # [..., S]
batch_improvement = tf.maximum(self._eta - min_sample_per_batch, 0.0) # [..., S]
return tf.reduce_mean(batch_improvement, axis=-1, keepdims=True) # [..., 1]
[docs]
class BatchExpectedImprovement(SingleModelAcquisitionBuilder[ProbabilisticModel]):
"""Accurate approximation of the batch expected improvement, using the
method of Chvallier and Ginsbourger :cite:`chevalier2013fast`.
Internally, this uses a highly accurate approximation of the cumulative
density function of the multivariate Gaussian, developed by Alan Genz
:cite:`genz2016numerical`.
"""
def __init__(
self,
sample_size: int,
*,
jitter: float = DEFAULTS.JITTER,
):
"""Initialise the BatchExpectedImprovement instance.
:param sample_size: int, number of Sobol samples to use.
:param jitter: float, amount of jitter for Cholesky factorisations.
"""
tf.debugging.assert_positive(sample_size)
tf.debugging.assert_greater_equal(jitter, 0.0)
self._sample_size = sample_size
self._jitter = jitter
[docs]
def __repr__(self) -> str:
""""""
return f"BatchExpectedImprovement({self._sample_size!r}, jitter={self._jitter!r})"
[docs]
def prepare_acquisition_function(
self,
model: ProbabilisticModel,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param model: The model. Must have event shape [1].
:param dataset: The data from the observer. Must be populated.
:return: The batch *expected improvement* acquisition function.
:raise ValueError (or InvalidArgumentError): If ``dataset`` is not populated, or ``model``
does not have an event shape of [1].
"""
tf.debugging.Assert(dataset is not None, [])
dataset = cast(Dataset, dataset)
tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
# Get mean and covariance
mean, _ = model.predict(dataset.query_points)
tf.debugging.assert_shapes(
[(mean, ["_", 1])],
message="Expected model with event shape [1].",
)
eta = tf.reduce_min(mean, axis=0)
acquisition_function = batch_expected_improvement(
self._sample_size,
model,
eta,
self._jitter,
)
return acquisition_function
[docs]
def update_acquisition_function(
self,
function: AcquisitionFunction,
model: ProbabilisticModel,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param function: The acquisition function to update.
:param model: The model. Must have event shape [1].
:param dataset: The data from the observer. Must be populated.
"""
tf.debugging.Assert(dataset is not None, [])
dataset = cast(Dataset, dataset)
tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
tf.debugging.Assert(isinstance(function, batch_expected_improvement), [])
# Get mean and covariance
mean, _ = model.predict(dataset.query_points)
eta = tf.reduce_min(mean, axis=0)
function.update(eta=eta) # type: ignore
return function
[docs]
class batch_expected_improvement(AcquisitionFunctionClass):
def __init__(
self,
sample_size: int,
model: ProbabilisticModel,
eta: TensorType,
jitter: float,
):
"""Initialise the batch_expected_improvement instance.
:param sample_size: int, number of samples to use.
:param model: Gaussian process regression model.
:param eta: Tensor of shape (,), expected improvement threshold. This
is the best value observed so far durin the BO loop.
:param jitter: float, amount of jitter for Cholesky factorisations.
"""
self._sample_size = sample_size
self._jitter = jitter
self._eta = tf.Variable(eta)
self._model = model
self._mvn_cdf_1: Optional[MultivariateNormalCDF] = None
self._mvn_cdf_2: Optional[MultivariateNormalCDF] = None
self._num_sobol_skip = int(tf.math.floor(10**9 * tf.random.uniform((), dtype=tf.float32)))
[docs]
def update(self, eta: TensorType) -> None:
"""Update the acquisition function with a new eta value and reset the
reparam sampler.
"""
self._eta.assign(eta)
self._num_sobol_skip = int(tf.math.floor(10**9 * tf.random.uniform((), dtype=tf.float32)))
[docs]
def _compute_bm(
self,
mean: tf.Tensor,
threshold: tf.Tensor,
) -> TensorType:
"""Helper function for the batch expected improvement, which computes
the tensors b and m as detailed in Chevalier and Ginsbourger
:cite:`chevalier2013fast`.
:param mean: Tensor of shape (B, Q)
:param threshold: Tensor of shape (B,)
:returns b: Tensor of shape (B, Q, Q)
:returns m: Tensor of shape (B, Q, Q)
"""
# Check shapes of input tensors
tf.debugging.assert_shapes(
[
(mean, ("B", "Q")),
(threshold, ("B",)),
]
)
# Unpack tensor shape and data type
B, Q = mean.shape
dtype = mean.dtype
# Compute b tensor
threshold = tf.tile(threshold[:, None], (1, Q))
threshold = tf.linalg.diag(threshold) # (B, Q, Q)
b = tf.zeros(shape=(B, Q, Q), dtype=dtype)
b = b - threshold
# Compute m tensor
m = mean[:, None, :] - mean[:, :, None] # (B, Q, Q)
m = m - tf.linalg.diag(mean) # (B, Q, Q)
return b, m
[docs]
def _delta(
self, idx: int, dim: int, B: int, transpose: bool, dtype: tf.DType
) -> TensorType: # pragma: no cover (tf.map_fn)
"""Helper function for the _compute_Sigma function, which computes a
*delta* tensor of shape (B, idx, idx) such that
delta[B, i, :] = 1 if i == idx
delta[B, i, :] = 0 otherwise.
If transpose == True, then the last two dimensions of the tensor are
transposed, in which case
delta[B, :, i] = 1 if i == idx
delta[B, :, i] = 0 otherwise.
:param idx: Index for entries equal to 1.
:param dim: Dimension of the last and second to last axes.
:param B: Leading dimension of tensor.
:param transpose: Whether to transpose the last two dimensions or not.
:param dtype: The dtype of the tensor, either tf.float32 or tf.float64.
"""
# Check input parameters
tf.debugging.assert_non_negative(idx)
tf.debugging.assert_non_negative(dim)
tf.debugging.assert_positive(B)
o1 = tf.ones(shape=(B, idx, dim), dtype=dtype)
z1 = tf.zeros(shape=(B, 1, dim), dtype=dtype)
o2 = tf.ones(shape=(B, dim - idx - 1, dim), dtype=dtype)
delta = tf.concat([o1, z1, o2], axis=1)
delta = tf.transpose(delta, perm=[0, 2, 1]) if transpose else delta
return delta
[docs]
def _compute_Sigma(
self,
covariance: tf.Tensor,
) -> TensorType:
"""Helper function for the batch expected improvement, which computes
the tensor Sigma, as detailed in Chevalier and Ginsbourger
:cite:`chevalier2013fast`.
:param covariance: Tensor of shape (B, Q, Q)
:returns Sigma: Tensor of shape (B, Q, Q, Q)
"""
# Check shapes of covariance tensor
tf.debugging.assert_shapes([(covariance, ("B", "Q", "Q"))])
# Unpack tensor shape and dtype
B, Q, _ = covariance.shape
dtype = covariance.dtype
Sigma = tf.zeros(shape=(B, Q, Q, Q))
def compute_single_slice(q: int) -> TensorType: # pragma: no cover (tf.map_fn)
diq = self._delta(q, Q, B, transpose=False, dtype=dtype)
dqj = self._delta(q, Q, B, transpose=True, dtype=dtype)
Sigma_ij = covariance[:, :, :]
Sigma_iq = covariance[:, :, q : q + 1]
Sigma_qj = covariance[:, q : q + 1, :]
Sigma_qq = covariance[:, q : q + 1, q : q + 1]
cov = Sigma_ij * diq * dqj - Sigma_iq * diq - Sigma_qj * dqj + Sigma_qq
return cov
Sigma = tf.map_fn(
compute_single_slice,
tf.range(Q),
fn_output_signature=dtype,
)
Sigma = tf.transpose(Sigma, perm=[1, 0, 2, 3])
return Sigma
[docs]
def _compute_p(
self,
m_reshaped: tf.Tensor,
b_reshaped: tf.Tensor,
Sigma_reshaped: tf.Tensor,
mvn_cdf: Callable[[TensorType, TensorType, TensorType, float], TensorType],
) -> TensorType:
"""Helper function for the batch expected improvement, which computes
the tensor p, as detailed in Chevalier and Ginsbourger
:cite:`chevalier2013fast`.
:param m_reshaped: Tensor of shape (BQ, Q)
:param b_reshaped: Tensor of shape (BQ, Q)
:param Sigma_reshaped: Tensor of shape (BQ, Q, Q)
:returns p: Tensor of shape (B, Q)
"""
# Check shapes of covariance tensor
tf.debugging.assert_shapes(
[
(m_reshaped, ("BQ", "Q")),
(b_reshaped, ("BQ", "Q")),
(Sigma_reshaped, ("BQ", "Q", "Q")),
]
)
# Unpack dtype and mean shape
dtype = m_reshaped.dtype
BQ, Q = m_reshaped.shape # (B*Q, Q)
if BQ % Q == 0:
B = BQ // Q
else:
raise ValueError(
f"Expected size of dimension 0 of m_reshaped tensor to be "
f"divisible by size of dimension 1, instead found "
f"{m_reshaped.shape[0]} and {m_reshaped.shape[1]}."
)
# Compute mean, covariance and x for p mvn normal cdf
p_cdf_mean = tf.zeros(shape=(BQ, Q), dtype=dtype) # (B*Q, Q)
p_cdf_cov = Sigma_reshaped # (B*Q, Q, Q)
p_cdf_x = b_reshaped - m_reshaped # (B*Q, Q)
p = mvn_cdf( # type: ignore
x=p_cdf_x,
mean=p_cdf_mean,
cov=p_cdf_cov,
) # (B*Q,)
p = tf.reshape(p, shape=(B, Q)) # (B, Q)
return p
[docs]
def _compute_c(
self,
m_reshaped: tf.Tensor,
b_reshaped: tf.Tensor,
Sigma_reshaped: tf.Tensor,
) -> TensorType:
"""Helper function for the batch expected improvement, which computes
the tensor c, which is the c^{(i)} tensor detailed in Chevalier and
Ginsbourger :cite:`chevalier2013fast`.
:param m_reshaped: Tensor of shape (BQ, Q)
:param b_reshaped: Tensor of shape (BQ, Q)
:param Sigma_reshaped: Tensor of shape (BQ, Q, Q)
:returns c: Tensor of shape (B, Q, Q-1)
"""
# Check shapes of covariance tensor
tf.debugging.assert_shapes(
[
(m_reshaped, ("BQ", "Q")),
(b_reshaped, ("BQ", "Q")),
(Sigma_reshaped, ("BQ", "Q", "Q")),
]
)
# Unpack tensor shape
BQ, Q = m_reshaped.shape
# Compute difference between b and m tensors
diff = b_reshaped - m_reshaped # (B*Q, Q)
# Compute c, including the ith entry, which we want to remove
cov_ratio = Sigma_reshaped / tf.linalg.diag_part(Sigma_reshaped)[:, :, None] # (B*Q, Q, Q)
c = diff[:, None, :] - diff[:, :, None] * cov_ratio # (B*Q, Q, Q)
# Remove the ith entry by masking c with a boolean mask with False across
# the diagonal and True in the off-diagonal terms
mask = tf.math.logical_not(tf.cast(tf.eye(Q, dtype=tf.int32), dtype=tf.bool))
mask = tf.tile(mask[None, :, :], (c.shape[0], 1, 1))
c = tf.ragged.boolean_mask(c, mask).to_tensor()
return c
[docs]
def _compute_R(
self,
Sigma_reshaped: tf.Tensor,
) -> TensorType:
"""Helper function for the batch expected improvement, which computes
the tensor R, which is the Sigma^{(i)} tensor detailed in Chevalier
and Ginsbourger :cite:`chevalier2013fast`.
:param Sigma_reshaped: Tensor of shape (BQ, Q, Q)
:returns R: Tensor of shape (B, Q-1, Q-1)
"""
# Check shapes of covariance tensor
tf.debugging.assert_shapes([(Sigma_reshaped, ("BQ", "Q", "Q"))])
# Unpack tensor shape
BQ, Q, _ = Sigma_reshaped.shape
Sigma_uv = tf.tile(Sigma_reshaped[:, None, :, :], (1, Q, 1, 1))
Sigma_iu = tf.tile(Sigma_reshaped[:, :, :, None], (1, 1, 1, Q))
Sigma_iv = tf.tile(Sigma_reshaped[:, :, None, :], (1, 1, Q, 1))
Sigma_ii = tf.linalg.diag_part(Sigma_reshaped)[:, :, None, None]
R_whole = Sigma_uv - Sigma_iu * Sigma_iv / Sigma_ii
def create_blocks(q: int) -> TensorType: # pragma: no cover (tf.map_fn)
block1 = tf.concat(
[
R_whole[:, q, :q, :q],
R_whole[:, q, q + 1 :, :q],
],
axis=1,
)
block2 = tf.concat(
[
R_whole[:, q, :q, q + 1 :],
R_whole[:, q, q + 1 :, q + 1 :],
],
axis=1,
)
R_block = tf.concat([block1, block2], axis=2)
return R_block
R = tf.map_fn(
create_blocks,
tf.range(Q),
fn_output_signature=R_whole.dtype,
)
R = tf.transpose(R, perm=[1, 0, 2, 3])
return R
[docs]
def _compute_Phi(
self,
c: tf.Tensor,
R: tf.Tensor,
mvn_cdf: Callable[[TensorType, TensorType, TensorType, float], TensorType],
) -> TensorType:
"""Helper function for the batch expected improvement, which computes
the tensor Phi, which is the tensor of multivariate Gaussian CDFs, in
the inner sum of the equation (3) in Chevalier and Ginsbourger
:cite:`chevalier2013fast`.
:param c: Tensor of shape (BQ, Q, Q-1).
:param R: Tensor of shape (BQ, Q, Q-1, Q-1).
:param mvn_cdf: Multivariate Gaussian CDF, made using MultivariateNormalCDF.
:returns Phi: Tensor of multivariate Gaussian CDFs.
"""
# Check shapes of covariance tensor
tf.debugging.assert_shapes(
[
(c, ("BQ", "Q", "Q_")),
(R, ("BQ", "Q", "Q_", "Q_")),
]
)
# Unpack tensor shape and data type
BQ, Q, _, Q_ = R.shape
dtype = R.dtype
try:
assert BQ % Q == 0
except AssertionError:
raise ValueError(
f"Expected size of dimension 0 of R tensor to be "
f"divisible by size of dimension 1, instead found "
f"{R.shape[0]} and {R.shape[1]}."
)
# Compute parallelisation dimension from batch size
B = BQ // Q
c_reshaped = tf.reshape(c, (BQ * Q, Q - 1))
R_reshaped = tf.reshape(R, (BQ * Q, Q - 1, Q - 1))
# Compute mean, covariance and x for Phi mvn normal cdf
Phi_cdf_x = c_reshaped # (B*Q, Q-1)
Phi_cdf_mean = tf.zeros(shape=(BQ * Q, Q - 1), dtype=dtype) # (B*Q*Q, Q)
Phi_cdf_cov = R_reshaped # (B*Q*Q, Q-1, Q-1)
# Compute multivariate cdfs
mvn_cdfs = mvn_cdf( # type: ignore
x=Phi_cdf_x,
mean=Phi_cdf_mean,
cov=Phi_cdf_cov,
)
mvn_cdfs = tf.reshape(mvn_cdfs, (B, Q, Q)) # (B, Q, Q)
return mvn_cdfs
[docs]
def _compute_batch_expected_improvement(
self,
mean: tf.Tensor,
covariance: tf.Tensor,
threshold: tf.Tensor,
mvn_cdf_1: Callable[[TensorType, TensorType, TensorType, float], TensorType],
mvn_cdf_2: Callable[[TensorType, TensorType, TensorType, float], TensorType],
) -> TensorType:
"""Accurate Monte Carlo approximation of the batch expected
improvement, using the method of Chevalier and Ginsbourger
:cite:`chevalier2013fast`.
:param mean: Tensor of shape (B, Q).
:param covariance: Tensor of shape (B, Q, Q).
:param threshold: Tensor of shape (B, Q).
:param mvn_cdf_1: Callable computing the multivariate CDF of a Q-dimensional Gaussian.
:param mvn_cdf_2: Callable computing the multivariate CDF of a (Q-1)-dimensional Gaussian.
:returns ei: Tensor of shape (B,), expected improvement.
"""
# Check shapes of covariance tensor
tf.debugging.assert_shapes(
[
(mean, ("B", "Q")),
(covariance, ("B", "Q", "Q")),
(threshold, ("B",)),
]
)
# Unpack and mean shape
B, Q = mean.shape
# Compute b and m tensors
b, m = self._compute_bm(
mean=mean,
threshold=threshold,
) # (B, Q, Q), (B, Q, Q)
# Compute Sigma
Sigma = self._compute_Sigma(covariance=covariance) # (B, Q, Q, Q)
# Reshape all tensors, for batching
b_reshaped = tf.reshape(b, (B * Q, Q))
m_reshaped = tf.reshape(m, (B * Q, Q))
Sigma_reshaped = tf.reshape(Sigma, (B * Q, Q, Q))
# Compute p tensor
p = self._compute_p(
m_reshaped=m_reshaped,
b_reshaped=b_reshaped,
Sigma_reshaped=Sigma_reshaped,
mvn_cdf=mvn_cdf_1,
)
# Compute c
c = self._compute_c(
m_reshaped=m_reshaped,
b_reshaped=b_reshaped,
Sigma_reshaped=Sigma_reshaped,
) # (B*Q, Q, Q-1)
# Compute Sigma_i
R = self._compute_R(
Sigma_reshaped=Sigma_reshaped,
) # (B*Q, Q, Q-1, Q-1)
# Compute Q-1 multivariate CDFs
Phi_mvn_cdfs = self._compute_Phi(
c=c,
R=R,
mvn_cdf=mvn_cdf_2,
)
# Compute univariate pdfs
S_diag = tf.linalg.diag_part(Sigma)
normal = tfp.distributions.Normal(loc=m, scale=S_diag**0.5)
uvn_pdfs = tf.math.exp(normal.log_prob(b)) # (B, Q, Q)
Sigma_diag = tf.linalg.diag_part(tf.transpose(Sigma, perm=[0, 2, 1, 3]))
Sigma_diag = tf.transpose(Sigma_diag, perm=[0, 2, 1])
T = tf.tile(threshold[:, None], (1, Q))
mean_T_term = (mean - T) * p
# Compute inner sum
sum_term = tf.reduce_sum(
Sigma_diag * uvn_pdfs * Phi_mvn_cdfs,
axis=2,
)
# Compute outer sum
ei = tf.reduce_sum(mean_T_term + sum_term, axis=1)
return ei
@tf.function
[docs]
def __call__(self, x: TensorType) -> TensorType:
"""Computes the accurate approximation of the multi-point expected
improvement.
:param x: Tensor of shape (B, Q, D).
:returns ei: Tensor of shape (B,), expected improvement.
"""
if self._mvn_cdf_1 is None:
self._mvn_cdf_1 = MultivariateNormalCDF(
sample_size=self._sample_size,
dim=x.shape[1],
dtype=x.dtype,
num_sobol_skip=self._num_sobol_skip,
)
if self._mvn_cdf_2 is None:
self._mvn_cdf_2 = MultivariateNormalCDF(
sample_size=self._sample_size,
dim=x.shape[1] - 1,
dtype=x.dtype,
num_sobol_skip=self._num_sobol_skip,
)
mean, covariance = self._model.predict_joint(x) # type: ignore
mean = mean[:, :, 0]
covariance = covariance[:, 0, :, :]
covariance = (
covariance
+ 1e-6
* tf.eye(
covariance.shape[-1],
dtype=covariance.dtype,
)[None, :, :]
)
threshold = tf.tile(self._eta, (mean.shape[0],))
# Check shapes of x, mean, covariance and threshold tensors
tf.debugging.assert_shapes(
[
(x, ("B", "Q", "D")),
(mean, ("B", "Q")),
(covariance, ("B", "Q", "Q")),
(threshold, ("B",)),
]
)
ei = self._compute_batch_expected_improvement(
mean=-mean,
covariance=covariance,
threshold=-threshold,
mvn_cdf_1=self._mvn_cdf_1,
mvn_cdf_2=self._mvn_cdf_2,
)[:, None]
return ei
[docs]
class MultipleOptimismNegativeLowerConfidenceBound(
SingleModelVectorizedAcquisitionBuilder[ProbabilisticModel]
):
"""
A simple parallelization of the lower confidence bound acquisition function that produces
a vectorized acquisition function which can efficiently optimized even for large batches.
See :cite:`torossian2020bayesian` for details.
"""
def __init__(self, search_space: SearchSpace):
"""
:param search_space: The global search space over which the optimisation is defined.
"""
self._search_space = search_space
[docs]
def __repr__(self) -> str:
""""""
return f"MultipleOptimismNegativeLowerConfidenceBound({self._search_space!r})"
[docs]
def prepare_acquisition_function(
self,
model: ProbabilisticModel,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param model: The model.
:param dataset: Unused.
:return: The multiple optimism negative lower confidence bound function.
"""
return multiple_optimism_lower_confidence_bound(model, self._search_space.dimension)
[docs]
def update_acquisition_function(
self,
function: AcquisitionFunction,
model: ProbabilisticModel,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param function: The acquisition function to update.
:param model: The model.
:param dataset: Unused.
"""
tf.debugging.Assert(
isinstance(function, multiple_optimism_lower_confidence_bound), [tf.constant([])]
)
return function # nothing to update
[docs]
class multiple_optimism_lower_confidence_bound(AcquisitionFunctionClass):
r"""
The multiple optimism lower confidence bound (MOLCB) acquisition function for single-objective
global optimization.
Each batch dimension of this acquisiton function correponds to a lower confidence bound
acquisition function with different beta values, i.e. each point in a batch chosen by this
acquisition function lies on a gradient of exploration/exploitation trade-offs.
We choose the different beta values following the cdf method of :cite:`torossian2020bayesian`.
See their paper for more details.
"""
def __init__(self, model: ProbabilisticModel, search_space_dim: int):
"""
:param model: The model of the objective function.
:param search_space_dim: The dimensions of the optimisation problem's search space.
:raise tf.errors.InvalidArgumentError: If ``search_space_dim`` is not postive.
"""
tf.debugging.assert_positive(search_space_dim)
self._search_space_dim = search_space_dim
self._model = model
self._initialized = tf.Variable(False) # Keep track of when we need to resample
self._betas = tf.Variable(tf.ones([0], dtype=tf.float64), shape=[None]) # [0] lazy init
@tf.function
[docs]
def __call__(self, x: TensorType) -> TensorType:
batch_size = tf.shape(x)[-2]
tf.debugging.assert_positive(batch_size)
if self._initialized: # check batch size hasnt changed during BO
tf.debugging.assert_equal(
batch_size,
tf.shape(self._betas)[0],
f"{type(self).__name__} requires a fixed batch size. Got batch size {batch_size}"
f" but previous batch size was {tf.shape(self._betas)[0]}.",
)
if not self._initialized:
normal = tfp.distributions.Normal(
tf.constant(0.0, dtype=x.dtype), tf.constant(1.0, dtype=x.dtype)
)
spread = 0.5 + 0.5 * tf.range(1, batch_size + 1, dtype=x.dtype) / (
tf.cast(batch_size, dtype=x.dtype) + 1.0
) # [B]
betas = normal.quantile(spread) # [B]
scaled_betas = 5.0 * tf.cast(self._search_space_dim, dtype=x.dtype) * betas # [B]
self._betas.assign(scaled_betas) # [B]
self._initialized.assign(True)
mean, variance = self._model.predict(x) # [..., B, 1]
mean, variance = tf.squeeze(mean, -1), tf.squeeze(variance, -1)
return -mean + tf.sqrt(variance) * self._betas # [..., B]
[docs]
class MakePositive(SingleModelAcquisitionBuilder[ProbabilisticModelType]):
r"""
Converts an acquisition function builder into one that only returns positive values, via
:math:`x \mapsto \log(1 + \exp(x))`.
This is sometimes a useful transformation: for example, converting non-batch acquisition
functions into batch acquisition functions with local penalization requires functions
that only return positive values.
"""
def __init__(
self,
base_acquisition_function_builder: SingleModelAcquisitionBuilder[ProbabilisticModelType],
) -> None:
"""
:param base_acquisition_function_builder: Base acquisition function to be made positive.
"""
self._base_builder = base_acquisition_function_builder
[docs]
def __repr__(self) -> str:
""""""
return f"MakePositive({self._base_builder})"
[docs]
def prepare_acquisition_function(
self,
model: ProbabilisticModelType,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param model: The model.
:param dataset: The data to use to build the acquisition function (optional).
:return: An acquisition function.
"""
self._base_function = self._base_builder.prepare_acquisition_function(model, dataset)
@tf.function
def acquisition(x: TensorType) -> TensorType:
return tf.math.log(1 + tf.math.exp(self._base_function(x)))
return acquisition
[docs]
def update_acquisition_function(
self,
function: AcquisitionFunction,
model: ProbabilisticModelType,
dataset: Optional[Dataset] = None,
) -> AcquisitionFunction:
"""
:param function: The acquisition function to update.
:param model: The model.
:param dataset: The data from the observer (optional).
:return: The updated acquisition function.
"""
up_fn = self._base_builder.update_acquisition_function(self._base_function, model, dataset)
if up_fn is self._base_function:
return function
else:
self._base_function = up_fn
@tf.function
def acquisition(x: TensorType) -> TensorType:
return tf.math.log(1 + tf.math.exp(self._base_function(x)))
return acquisition