Source code for trieste.acquisition.function.multi_objective

# Copyright 2021 The Trieste Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module contains multi-objective acquisition function builders.
"""
from __future__ import annotations

import math
from itertools import combinations, product
from typing import Callable, Mapping, Optional, Sequence, cast

import tensorflow as tf
import tensorflow_probability as tfp

from ...data import Dataset
from ...models import ProbabilisticModel, ReparametrizationSampler
from ...models.interfaces import HasReparamSampler
from ...observer import OBJECTIVE
from ...types import Tag, TensorType
from ...utils import DEFAULTS
from ..interface import (
    AcquisitionFunction,
    AcquisitionFunctionBuilder,
    AcquisitionFunctionClass,
    GreedyAcquisitionFunctionBuilder,
    PenalizationFunction,
    ProbabilisticModelType,
    SingleModelAcquisitionBuilder,
)
from ..multi_objective.pareto import (
    Pareto,
    get_reference_point,
    prepare_default_non_dominated_partition_bounds,
)
from .function import ExpectedConstrainedImprovement


class ExpectedHypervolumeImprovement(SingleModelAcquisitionBuilder[ProbabilisticModel]):
    """
    Builder for the expected hypervolume improvement acquisition function.
    The implementation of the acquisition function largely follows :cite:`yang2019efficient`.
    """

    def __init__(
        self,
        reference_point_spec: (
            Sequence[float] | TensorType | Callable[..., TensorType]
        ) = get_reference_point,
    ):
        """
        :param reference_point_spec: determines how the reference point is calculated. If a
            Callable is specified, it is expected to take existing posterior mean-based
            observations (to screen out the observation noise) and return a reference point
            with shape [D] (where D is the number of objectives). If the Pareto front location
            is known, this argument can instead be used to specify a fixed reference point for
            every BO iteration. By default, a dynamic reference point updating strategy sets
            the reference point according to the datasets.
        """
        if callable(reference_point_spec):
            self._ref_point_spec: tf.Tensor | Callable[..., TensorType] = reference_point_spec
        else:
            self._ref_point_spec = tf.convert_to_tensor(reference_point_spec)
        self._ref_point = None
    def __repr__(self) -> str:
        """"""
        if callable(self._ref_point_spec):
            return f"ExpectedHypervolumeImprovement({self._ref_point_spec.__name__})"
        else:
            return f"ExpectedHypervolumeImprovement({self._ref_point_spec!r})"
    def prepare_acquisition_function(
        self,
        model: ProbabilisticModel,
        dataset: Optional[Dataset] = None,
    ) -> AcquisitionFunction:
        """
        :param model: The model.
        :param dataset: The data from the observer. Must be populated.
        :return: The expected hypervolume improvement acquisition function.
        """
        tf.debugging.Assert(dataset is not None, [tf.constant([])])
        dataset = cast(Dataset, dataset)
        tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")

        mean, _ = model.predict(dataset.query_points)

        if callable(self._ref_point_spec):
            self._ref_point = tf.cast(self._ref_point_spec(mean), dtype=mean.dtype)
        else:
            self._ref_point = tf.cast(self._ref_point_spec, dtype=mean.dtype)

        _pf = Pareto(mean)
        screened_front = _pf.front[tf.reduce_all(_pf.front <= self._ref_point, -1)]
        # prepare the partitioned bounds of the non-dominated region for calculating the
        # hypervolume improvement in this area
        _partition_bounds = prepare_default_non_dominated_partition_bounds(
            self._ref_point, screened_front
        )
        return expected_hv_improvement(model, _partition_bounds)
    def update_acquisition_function(
        self,
        function: AcquisitionFunction,
        model: ProbabilisticModel,
        dataset: Optional[Dataset] = None,
    ) -> AcquisitionFunction:
        """
        :param function: The acquisition function to update.
        :param model: The model.
        :param dataset: The data from the observer. Must be populated.
        """
        tf.debugging.Assert(dataset is not None, [tf.constant([])])
        dataset = cast(Dataset, dataset)
        tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")
        tf.debugging.Assert(isinstance(function, expected_hv_improvement), [tf.constant([])])

        mean, _ = model.predict(dataset.query_points)

        if callable(self._ref_point_spec):
            self._ref_point = self._ref_point_spec(mean)
        else:
            assert isinstance(self._ref_point_spec, tf.Tensor)  # specified a fixed ref point
            self._ref_point = tf.cast(self._ref_point_spec, dtype=mean.dtype)

        _pf = Pareto(mean)
        screened_front = _pf.front[tf.reduce_all(_pf.front <= self._ref_point, -1)]
        _partition_bounds = prepare_default_non_dominated_partition_bounds(
            self._ref_point, screened_front
        )
        function.update(_partition_bounds)  # type: ignore
        return function
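# Usage sketch (illustrative only, not part of the library source). Assuming a trained
# multi-output `model` (any ProbabilisticModel) and a populated `dataset` of past
# observations exist, the builder above can be used as follows; `query_points` is a
# hypothetical [N, D] tensor of candidate inputs:
#
#     ehvi_builder = ExpectedHypervolumeImprovement()  # dynamic reference point by default
#     ehvi = ehvi_builder.prepare_acquisition_function(model, dataset=dataset)
#     values = ehvi(query_points[:, None, :])  # [N, 1, D] -> [N, 1] acquisition values
#
# A known reference point can instead be fixed up front, e.g.
# ExpectedHypervolumeImprovement(reference_point_spec=[2.0, 2.0]).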
class expected_hv_improvement(AcquisitionFunctionClass):
    def __init__(
        self, model: ProbabilisticModel, partition_bounds: tuple[TensorType, TensorType]
    ):
        r"""
        Expected hypervolume (HV) improvement, calculated using Eq. 44 of
        :cite:`yang2019efficient`.

        The expected hypervolume improvement calculation in the non-dominated region
        can be decomposed into sub-calculations based on each partitioned cell.
        For easier calculation, this sub-calculation can be reformulated as a combination
        of two generalized expected improvements, corresponding to the Psi (Eq. 44) and
        Nu (Eq. 45) function calculations, respectively.

        Note:
        1. Since in Trieste we do not assume the use of a certain non-dominated region partition
        algorithm, we do not assume the last dimension of the partitioned cell has only one
        (lower) bound (i.e., minus infinity, as used in the :cite:`yang2019efficient` paper).
        This is not as efficient as in the original paper, but is applicable to different
        non-dominated partition algorithms.
        2. As the Psi and Nu functions in the original paper are defined for maximization
        problems, we invert our minimisation problem (to also be a maximisation), allowing use
        of the original notation and equations.

        :param model: The model of the objective function.
        :param partition_bounds: with shape ([N, D], [N, D]), partitioned non-dominated hypercell
            bounds for hypervolume improvement calculation
        :return: The expected_hv_improvement acquisition function modified for objective
            minimisation. This function will raise :exc:`ValueError` or
            :exc:`~tf.errors.InvalidArgumentError` if used with a batch size greater than one.
        """
        self._model = model
        self._lb_points = tf.Variable(
            partition_bounds[0], trainable=False, shape=[None, partition_bounds[0].shape[-1]]
        )
        self._ub_points = tf.Variable(
            partition_bounds[1], trainable=False, shape=[None, partition_bounds[1].shape[-1]]
        )
        self._cross_index = tf.constant(
            list(product(*[[0, 1]] * self._lb_points.shape[-1]))
        )  # [2^d, indices_at_dim]
    def update(self, partition_bounds: tuple[TensorType, TensorType]) -> None:
        """Update the acquisition function with new partition bounds."""
        self._lb_points.assign(partition_bounds[0])
        self._ub_points.assign(partition_bounds[1])
    @tf.function
    def __call__(self, x: TensorType) -> TensorType:
        tf.debugging.assert_shapes(
            [(x, [..., 1, None])],
            message="This acquisition function only supports batch sizes of one.",
        )
        normal = tfp.distributions.Normal(
            loc=tf.zeros(shape=1, dtype=x.dtype), scale=tf.ones(shape=1, dtype=x.dtype)
        )

        def Psi(a: TensorType, b: TensorType, mean: TensorType, std: TensorType) -> TensorType:
            return std * normal.prob((b - mean) / std) + (mean - a) * (
                1 - normal.cdf((b - mean) / std)
            )

        def nu(lb: TensorType, ub: TensorType, mean: TensorType, std: TensorType) -> TensorType:
            return (ub - lb) * (1 - normal.cdf((ub - mean) / std))

        def ehvi_based_on_partitioned_cell(
            neg_pred_mean: TensorType, pred_std: TensorType
        ) -> TensorType:
            r"""
            Calculate the ehvi based on cell i.
            """

            neg_lb_points, neg_ub_points = -self._ub_points, -self._lb_points

            neg_ub_points = tf.minimum(neg_ub_points, 1e10)  # clip to improve numerical stability

            psi_ub = Psi(
                neg_lb_points, neg_ub_points, neg_pred_mean, pred_std
            )  # [..., num_cells, out_dim]
            psi_lb = Psi(
                neg_lb_points, neg_lb_points, neg_pred_mean, pred_std
            )  # [..., num_cells, out_dim]

            psi_lb2ub = tf.maximum(psi_lb - psi_ub, 0.0)  # [..., num_cells, out_dim]
            nu_contrib = nu(neg_lb_points, neg_ub_points, neg_pred_mean, pred_std)

            stacked_factors = tf.concat(
                [tf.expand_dims(psi_lb2ub, -2), tf.expand_dims(nu_contrib, -2)], axis=-2
            )  # Take the cross product of psi_diff and nu across all outcomes
            # [..., num_cells, 2(operation_num, refer Eq. 45), num_obj]

            factor_combinations = tf.linalg.diag_part(
                tf.gather(stacked_factors, self._cross_index, axis=-2)
            )  # [..., num_cells, 2^d, 2(operation_num), num_obj]

            return tf.reduce_sum(tf.reduce_prod(factor_combinations, axis=-1), axis=-1)

        candidate_mean, candidate_var = self._model.predict(tf.squeeze(x, -2))
        candidate_std = tf.sqrt(candidate_var)

        neg_candidate_mean = -tf.expand_dims(candidate_mean, 1)  # [..., 1, out_dim]
        candidate_std = tf.expand_dims(candidate_std, 1)  # [..., 1, out_dim]

        ehvi_cells_based = ehvi_based_on_partitioned_cell(neg_candidate_mean, candidate_std)

        return tf.reduce_sum(
            ehvi_cells_based,
            axis=-1,
            keepdims=True,
        )
class BatchMonteCarloExpectedHypervolumeImprovement(
    SingleModelAcquisitionBuilder[HasReparamSampler]
):
    """
    Builder for the batch expected hypervolume improvement acquisition function.
    The implementation of the acquisition function largely follows
    :cite:`daulton2020differentiable`.
    """

    def __init__(
        self,
        sample_size: int,
        reference_point_spec: (
            Sequence[float] | TensorType | Callable[..., TensorType]
        ) = get_reference_point,
        *,
        jitter: float = DEFAULTS.JITTER,
    ):
        """
        :param sample_size: The number of samples from the model's predicted distribution for
            each batch of points.
        :param reference_point_spec: determines how the reference point is calculated. If a
            Callable is specified, it is expected to take existing posterior mean-based
            observations (to screen out the observation noise) and return a reference point
            with shape [D] (where D is the number of objectives). If the Pareto front location
            is known, this argument can instead be used to specify a fixed reference point for
            every BO iteration. By default, a dynamic reference point updating strategy sets
            the reference point according to the datasets.
        :param jitter: The size of the jitter to use when stabilising the Cholesky decomposition
            of the covariance matrix.
        :raise ValueError (or InvalidArgumentError): If ``sample_size`` is not positive, or
            ``jitter`` is negative.
        """
        tf.debugging.assert_positive(sample_size)
        tf.debugging.assert_greater_equal(jitter, 0.0)

        self._sample_size = sample_size
        self._jitter = jitter
        if callable(reference_point_spec):
            self._ref_point_spec: tf.Tensor | Callable[..., TensorType] = reference_point_spec
        else:
            self._ref_point_spec = tf.convert_to_tensor(reference_point_spec)
        self._ref_point = None
    def __repr__(self) -> str:
        """"""
        if callable(self._ref_point_spec):
            return (
                f"BatchMonteCarloExpectedHypervolumeImprovement({self._sample_size!r},"
                f" {self._ref_point_spec.__name__},"
                f" jitter={self._jitter!r})"
            )
        else:
            return (
                f"BatchMonteCarloExpectedHypervolumeImprovement({self._sample_size!r},"
                f" {self._ref_point_spec!r},"
                f" jitter={self._jitter!r})"
            )
    def prepare_acquisition_function(
        self,
        model: HasReparamSampler,
        dataset: Optional[Dataset] = None,
    ) -> AcquisitionFunction:
        """
        :param model: The model. Must have event shape [1].
        :param dataset: The data from the observer. Must be populated.
        :return: The batch expected hypervolume improvement acquisition function.
        """
        tf.debugging.Assert(dataset is not None, [tf.constant([])])
        dataset = cast(Dataset, dataset)
        tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.")

        mean, _ = model.predict(dataset.query_points)

        if callable(self._ref_point_spec):
            self._ref_point = tf.cast(self._ref_point_spec(mean), dtype=mean.dtype)
        else:
            self._ref_point = tf.cast(self._ref_point_spec, dtype=mean.dtype)

        _pf = Pareto(mean)
        screened_front = _pf.front[tf.reduce_all(_pf.front <= self._ref_point, -1)]
        # prepare the partitioned bounds of the non-dominated region for calculating the
        # hypervolume improvement in this area
        _partition_bounds = prepare_default_non_dominated_partition_bounds(
            self._ref_point, screened_front
        )

        if not isinstance(model, HasReparamSampler):
            raise ValueError(
                f"The batch Monte-Carlo expected hyper-volume improvement function only supports "
                f"models that implement a reparam_sampler method; received {model!r}"
            )

        sampler = model.reparam_sampler(self._sample_size)

        return batch_ehvi(sampler, self._jitter, _partition_bounds)
def batch_ehvi(
    sampler: ReparametrizationSampler[HasReparamSampler],
    sampler_jitter: float,
    partition_bounds: tuple[TensorType, TensorType],
) -> AcquisitionFunction:
    """
    :param sampler: The posterior sampler, which, given query points ``at``, is able to sample
        possible observations at ``at``.
    :param sampler_jitter: The size of the jitter to use in the sampler when stabilising the
        Cholesky decomposition of the covariance matrix.
    :param partition_bounds: with shape ([N, D], [N, D]), partitioned non-dominated hypercell
        bounds for hypervolume improvement calculation
    :return: The batch expected hypervolume improvement acquisition function for objective
        minimisation.
    """

    def acquisition(at: TensorType) -> TensorType:
        _batch_size = at.shape[-2]  # B

        def gen_q_subset_indices(q: int) -> tf.RaggedTensor:
            # generate all subsets of [1, ..., q] as indices
            indices = list(range(q))
            return tf.ragged.constant([list(combinations(indices, i)) for i in range(1, q + 1)])

        samples = sampler.sample(at, jitter=sampler_jitter)  # [..., S, B, num_obj]

        q_subset_indices = gen_q_subset_indices(_batch_size)

        hv_contrib = tf.zeros(tf.shape(samples)[:-2], dtype=samples.dtype)
        lb_points, ub_points = partition_bounds

        def hv_contrib_on_samples(
            obj_samples: TensorType,
        ) -> TensorType:  # calculate samples overlapped area's hvi for obj_samples
            # [..., S, Cq_j, j, num_obj] -> [..., S, Cq_j, num_obj]
            overlap_vertices = tf.reduce_max(obj_samples, axis=-2)

            overlap_vertices = tf.maximum(  # compare overlap vertices and lower bound of each cell
                tf.expand_dims(overlap_vertices, -3),  # expand a cell dimension
                lb_points[tf.newaxis, tf.newaxis, :, tf.newaxis, :],
            )  # [..., S, K, Cq_j, num_obj]

            lengths_j = tf.maximum(  # get hvi length per obj within each cell
                (ub_points[tf.newaxis, tf.newaxis, :, tf.newaxis, :] - overlap_vertices), 0.0
            )  # [..., S, K, Cq_j, num_obj]

            areas_j = tf.reduce_sum(  # sum over all subsets Cq_j -> [..., S, K]
                tf.reduce_prod(lengths_j, axis=-1), axis=-1  # calc hvi within each K
            )

            return tf.reduce_sum(areas_j, axis=-1)  # sum over cells -> [..., S]

        for j in tf.range(1, _batch_size + 1):  # Inclusion-Exclusion loop
            q_choose_j = tf.gather(q_subset_indices, j - 1).to_tensor()
            # gather all combinations having j points from q batch points (Cq_j)
            j_sub_samples = tf.gather(samples, q_choose_j, axis=-2)  # [..., S, Cq_j, j, num_obj]
            hv_contrib += tf.cast((-1) ** (j + 1), dtype=samples.dtype) * hv_contrib_on_samples(
                j_sub_samples
            )

        return tf.reduce_mean(hv_contrib, axis=-1, keepdims=True)  # average through MC

    return acquisition
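# Usage sketch (illustrative only). The batch variant requires a model exposing a
# reparametrization sampler (HasReparamSampler). `model`, `dataset` and `candidate_batches`
# (a hypothetical [N, B, D] tensor of candidate batches) are assumed to exist:
#
#     batch_builder = BatchMonteCarloExpectedHypervolumeImprovement(sample_size=250)
#     batch_acq = batch_builder.prepare_acquisition_function(model, dataset=dataset)
#     values = batch_acq(candidate_batches)  # one Monte Carlo EHVI estimate per batch, [N, 1]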
class ExpectedConstrainedHypervolumeImprovement(
    ExpectedConstrainedImprovement[ProbabilisticModelType]
):
    """
    Builder for the constrained expected hypervolume improvement acquisition function.
    This function essentially combines ExpectedConstrainedImprovement and
    ExpectedHypervolumeImprovement.
    """

    def __init__(
        self,
        objective_tag: Tag,
        constraint_builder: AcquisitionFunctionBuilder[ProbabilisticModelType],
        min_feasibility_probability: float | TensorType = 0.5,
        reference_point_spec: (
            Sequence[float] | TensorType | Callable[..., TensorType]
        ) = get_reference_point,
    ):
        """
        :param objective_tag: The tag for the objective data and model.
        :param constraint_builder: The builder for the constraint function.
        :param min_feasibility_probability: The minimum probability of feasibility for a
            "best point" to be considered feasible.
        :param reference_point_spec: determines how the reference point is calculated. If a
            Callable is specified, it is expected to take existing posterior mean-based feasible
            observations (to screen out the observation noise) and return a reference point with
            shape [D] (where D is the number of objectives). If the feasible Pareto front
            location is known, this argument can instead be used to specify a fixed reference
            point for every BO iteration. By default, a dynamic reference point updating
            strategy sets the reference point according to the datasets.
        """
        super().__init__(objective_tag, constraint_builder, min_feasibility_probability)
        if callable(reference_point_spec):
            self._ref_point_spec: tf.Tensor | Callable[..., TensorType] = reference_point_spec
        else:
            self._ref_point_spec = tf.convert_to_tensor(reference_point_spec)
        self._ref_point = None

    def __repr__(self) -> str:
        """"""
        if callable(self._ref_point_spec):
            return (
                f"ExpectedConstrainedHypervolumeImprovement({self._objective_tag!r},"
                f" {self._constraint_builder!r}, {self._min_feasibility_probability!r},"
                f" {self._ref_point_spec.__name__})"
            )
        else:
            return (
                f"ExpectedConstrainedHypervolumeImprovement({self._objective_tag!r},"
                f" {self._constraint_builder!r}, {self._min_feasibility_probability!r},"
                f" ref_point_specification={self._ref_point_spec!r})"
            )
    def _update_expected_improvement_fn(
        self, objective_model: ProbabilisticModelType, feasible_mean: TensorType
    ) -> None:
        """
        Set or update the unconstrained expected improvement function.

        :param objective_model: The objective model.
        :param feasible_mean: The mean of the feasible query points.
        """
        if callable(self._ref_point_spec):
            self._ref_point = tf.cast(
                self._ref_point_spec(feasible_mean),
                dtype=feasible_mean.dtype,
            )
        else:
            self._ref_point = tf.cast(self._ref_point_spec, dtype=feasible_mean.dtype)

        _pf = Pareto(feasible_mean)
        screened_front = _pf.front[tf.reduce_all(_pf.front <= self._ref_point, -1)]
        # prepare the partitioned bounds of the non-dominated region for calculating the
        # hypervolume improvement in this area
        _partition_bounds = prepare_default_non_dominated_partition_bounds(
            self._ref_point,
            screened_front,
        )

        self._expected_improvement_fn: Optional[AcquisitionFunction]
        if self._expected_improvement_fn is None:
            self._expected_improvement_fn = expected_hv_improvement(
                objective_model, _partition_bounds
            )
        else:
            tf.debugging.Assert(
                isinstance(self._expected_improvement_fn, expected_hv_improvement), []
            )
            self._expected_improvement_fn.update(_partition_bounds)  # type: ignore
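# Usage sketch (illustrative only). The constrained builder works on tagged model/data
# mappings, mirroring ExpectedConstrainedImprovement. Here `feasibility_builder` stands for
# any user-supplied AcquisitionFunctionBuilder over the constraint model (for instance a
# probability-of-feasibility style builder), and `models`/`datasets` are assumed mappings
# keyed by the objective and constraint tags:
#
#     echvi_builder = ExpectedConstrainedHypervolumeImprovement(
#         OBJECTIVE, feasibility_builder, min_feasibility_probability=0.5
#     )
#     echvi = echvi_builder.prepare_acquisition_function(models, datasets=datasets)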
class HIPPO(GreedyAcquisitionFunctionBuilder[ProbabilisticModelType]):
    r"""
    HIPPO: HIghly Parallelizable Pareto Optimization

    Builder of the acquisition function for greedily collecting batches in multi-objective
    optimization via HIPPO penalization, which penalizes batch points by their distance in the
    objective space. The resulting acquisition function takes in a set of pending points and
    returns a base multi-objective acquisition function penalized around those points.

    Penalization is applied to the acquisition function multiplicatively. However, to improve
    numerical stability, we perform additive penalization in a log space.
    """

    def __init__(
        self,
        objective_tag: Tag = OBJECTIVE,
        base_acquisition_function_builder: (
            AcquisitionFunctionBuilder[ProbabilisticModelType]
            | SingleModelAcquisitionBuilder[ProbabilisticModelType]
            | None
        ) = None,
    ):
        """
        Initializes the HIPPO acquisition function builder.

        :param objective_tag: The tag for the objective data and model.
        :param base_acquisition_function_builder: Base acquisition function to be penalized.
            Defaults to Expected Hypervolume Improvement; its constrained version is also
            supported.
        """
        self._objective_tag = objective_tag
        if base_acquisition_function_builder is None:
            self._base_builder: AcquisitionFunctionBuilder[ProbabilisticModelType] = (
                ExpectedHypervolumeImprovement().using(self._objective_tag)
            )
        else:
            if isinstance(base_acquisition_function_builder, SingleModelAcquisitionBuilder):
                self._base_builder = base_acquisition_function_builder.using(self._objective_tag)
            else:
                self._base_builder = base_acquisition_function_builder

        self._base_acquisition_function: Optional[AcquisitionFunction] = None
        self._penalization: Optional[PenalizationFunction] = None
        self._penalized_acquisition: Optional[AcquisitionFunction] = None
    def prepare_acquisition_function(
        self,
        models: Mapping[Tag, ProbabilisticModelType],
        datasets: Optional[Mapping[Tag, Dataset]] = None,
        pending_points: Optional[TensorType] = None,
    ) -> AcquisitionFunction:
        """
        Creates a new instance of the acquisition function.

        :param models: The models.
        :param datasets: The data from the observer. Must be populated.
        :param pending_points: The points we penalize with respect to.
        :return: The HIPPO acquisition function.
        :raise tf.errors.InvalidArgumentError: If the ``dataset`` is empty.
        """
        tf.debugging.Assert(datasets is not None, [tf.constant([])])
        datasets = cast(Mapping[Tag, Dataset], datasets)
        tf.debugging.Assert(datasets[self._objective_tag] is not None, [tf.constant([])])
        tf.debugging.assert_positive(
            len(datasets[self._objective_tag]),
            message=f"{self._objective_tag} dataset must be populated.",
        )

        acq = self._update_base_acquisition_function(models, datasets)
        if pending_points is not None and len(pending_points) != 0:
            acq = self._update_penalization(acq, models[self._objective_tag], pending_points)

        return acq
    def update_acquisition_function(
        self,
        function: AcquisitionFunction,
        models: Mapping[Tag, ProbabilisticModelType],
        datasets: Optional[Mapping[Tag, Dataset]] = None,
        pending_points: Optional[TensorType] = None,
        new_optimization_step: bool = True,
    ) -> AcquisitionFunction:
        """
        Updates the acquisition function.

        :param function: The acquisition function to update.
        :param models: The models.
        :param datasets: The data from the observer. Must be populated.
        :param pending_points: Points already chosen to be in the current batch (of shape [M, D]),
            where M is the number of pending points and D is the search space dimension.
        :param new_optimization_step: Indicates whether this call to update_acquisition_function
            marks the start of a new optimization step, or continues collecting the batch of
            points for the current step. Defaults to ``True``.
        :return: The updated acquisition function.
        """
        tf.debugging.Assert(datasets is not None, [tf.constant([])])
        datasets = cast(Mapping[Tag, Dataset], datasets)
        tf.debugging.Assert(datasets[self._objective_tag] is not None, [tf.constant([])])
        tf.debugging.assert_positive(
            len(datasets[self._objective_tag]),
            message=f"{self._objective_tag} dataset must be populated.",
        )
        tf.debugging.Assert(self._base_acquisition_function is not None, [tf.constant([])])

        if new_optimization_step:
            self._update_base_acquisition_function(models, datasets)

        if pending_points is None or len(pending_points) == 0:
            # no penalization required if no pending_points
            return cast(AcquisitionFunction, self._base_acquisition_function)

        return self._update_penalization(function, models[self._objective_tag], pending_points)
    def _update_penalization(
        self,
        function: AcquisitionFunction,
        model: ProbabilisticModel,
        pending_points: Optional[TensorType] = None,
    ) -> AcquisitionFunction:
        tf.debugging.assert_rank(pending_points, 2)

        if self._penalized_acquisition is not None and isinstance(
            self._penalization, hippo_penalizer
        ):
            # if possible, just update the penalization function variables
            # (the type ignore is due to mypy getting confused by tf.function)
            self._penalization.update(pending_points)  # type: ignore[unreachable]
            return self._penalized_acquisition
        else:
            # otherwise construct a new penalized acquisition function
            self._penalization = hippo_penalizer(model, pending_points)

        @tf.function
        def penalized_acquisition(x: TensorType) -> TensorType:
            log_acq = tf.math.log(
                cast(AcquisitionFunction, self._base_acquisition_function)(x)
            ) + tf.math.log(cast(PenalizationFunction, self._penalization)(x))
            return tf.math.exp(log_acq)

        self._penalized_acquisition = penalized_acquisition
        return penalized_acquisition

    def _update_base_acquisition_function(
        self,
        models: Mapping[Tag, ProbabilisticModelType],
        datasets: Optional[Mapping[Tag, Dataset]] = None,
    ) -> AcquisitionFunction:
        if self._base_acquisition_function is None:
            self._base_acquisition_function = self._base_builder.prepare_acquisition_function(
                models, datasets
            )
        else:
            self._base_acquisition_function = self._base_builder.update_acquisition_function(
                self._base_acquisition_function, models, datasets
            )
        return self._base_acquisition_function
class hippo_penalizer:
    r"""
    Returns the penalization function used for multi-objective greedy batch Bayesian
    optimization. A candidate point :math:`x` is penalized based on the Mahalanobis distance to
    a given pending point :math:`p_i`. Since we assume the objectives to be independent, the
    Mahalanobis distance between these points becomes a Euclidean distance normalized by the
    standard deviation. Penalties for multiple pending points are multiplied, and the resulting
    quantity is warped with the arctan function to the :math:`[0, 1]` interval.

    :param model: The model over the specified ``dataset``.
    :param pending_points: The points we penalize with respect to.
    :return: The penalization function. This function will raise :exc:`ValueError` or
        :exc:`~tf.errors.InvalidArgumentError` if used with a batch size greater than one.
    """

    def __init__(self, model: ProbabilisticModel, pending_points: TensorType):
        """Initialize the MO penalizer.

        :param model: The model.
        :param pending_points: The points we penalize with respect to.
        :raise ValueError: If pending points are empty or None.
        :return: The penalization function. This function will raise :exc:`ValueError` or
            :exc:`~tf.errors.InvalidArgumentError` if used with a batch size greater than one.
        """
        tf.debugging.Assert(
            pending_points is not None and len(pending_points) != 0, [tf.constant([])]
        )

        self._model = model
        self._pending_points = tf.Variable(
            pending_points, shape=[None, *pending_points.shape[1:]]
        )
        pending_means, pending_vars = self._model.predict(self._pending_points)
        self._pending_means = tf.Variable(pending_means, shape=[None, *pending_means.shape[1:]])
        self._pending_vars = tf.Variable(pending_vars, shape=[None, *pending_vars.shape[1:]])
    def update(self, pending_points: TensorType) -> None:
        """Update the penalizer with new pending points."""
        tf.debugging.Assert(
            pending_points is not None and len(pending_points) != 0, [tf.constant([])]
        )

        self._pending_points.assign(pending_points)
        pending_means, pending_vars = self._model.predict(self._pending_points)
        self._pending_means.assign(pending_means)
        self._pending_vars.assign(pending_vars)
    @tf.function
    def __call__(self, x: TensorType) -> TensorType:
        tf.debugging.assert_shapes(
            [(x, [..., 1, None])],
            message="This penalization function cannot be calculated for batches of points.",
        )

        # x is [N, 1, D]
        x = tf.squeeze(x, axis=1)  # x is now [N, D]

        x_means, x_vars = self._model.predict(x)
        # x_means is [N, K], x_vars is [N, K]
        # where K is the number of models/objectives

        # self._pending_points is [B, D] where B is the size of the batch collected so far
        tf.debugging.assert_shapes(
            [
                (x, ["N", "D"]),
                (self._pending_points, ["B", "D"]),
                (self._pending_means, ["B", "K"]),
                (self._pending_vars, ["B", "K"]),
                (x_means, ["N", "K"]),
                (x_vars, ["N", "K"]),
            ],
            message="""Encountered unexpected shapes while calculating mean and variance
                       of given point x and pending points""",
        )

        x_means_expanded = x_means[:, None, :]
        pending_means_expanded = self._pending_means[None, :, :]
        pending_vars_expanded = self._pending_vars[None, :, :]
        pending_stddevs_expanded = tf.sqrt(pending_vars_expanded)

        # this computes the Mahalanobis distance between x and the pending points;
        # since we assume the objectives to be independent, it reduces to a regular
        # Euclidean distance normalized by the standard deviation
        standardize_mean_diff = (
            tf.abs(x_means_expanded - pending_means_expanded) / pending_stddevs_expanded
        )  # [N, B, K]
        d = tf.norm(standardize_mean_diff, axis=-1)  # [N, B]

        # warp the distance so that the resulting value is from 0 to (nearly) 1
        warped_d = (2.0 / math.pi) * tf.math.atan(d)
        penalty = tf.reduce_prod(warped_d, axis=-1)  # [N,]

        return tf.reshape(penalty, (-1, 1))
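# Usage sketch (illustrative only): greedily assembling a batch with HIPPO. `models` and
# `datasets` are assumed mappings keyed by OBJECTIVE, and `optimize` stands for any routine
# that maximises an acquisition function over the search space (in Trieste this is normally
# driven by a greedy batch acquisition rule rather than called by hand):
#
#     hippo = HIPPO(OBJECTIVE)
#     acq = hippo.prepare_acquisition_function(models, datasets=datasets)
#     first_point = optimize(acq)  # shape [1, D]
#     acq = hippo.update_acquisition_function(
#         acq, models, datasets=datasets, pending_points=first_point
#     )
#     second_point = optimize(acq)  # penalized around `first_point`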