# Copyright 2021 The Trieste Contributors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
This module contains multi-objective acquisition function builders.
from __future__ import annotations

import math
from itertools import combinations, product
from typing import Callable, Mapping, Optional, Sequence, cast

import tensorflow as tf
import tensorflow_probability as tfp

from import Dataset
from ...models import ProbabilisticModel, ReparametrizationSampler
from ...models.interfaces import HasReparamSampler
from import OBJECTIVE
from ...types import Tag, TensorType
from ...utils import DEFAULTS
from ..interface import (
from ..multi_objective.pareto import (
from .function import ExpectedConstrainedImprovement

[docs] class ExpectedHypervolumeImprovement(SingleModelAcquisitionBuilder[ProbabilisticModel]): """ Builder for the expected hypervolume improvement acquisition function. The implementation of the acquisition function largely follows :cite:`yang2019efficient` """ def __init__( self, reference_point_spec: ( Sequence[float] | TensorType | Callable[..., TensorType] ) = get_reference_point, ): """ :param reference_point_spec: this method is used to determine how the reference point is calculated. If a Callable function specified, it is expected to take existing posterior mean-based observations (to screen out the observation noise) and return a reference point with shape [D] (D represents number of objectives). If the Pareto front location is known, this arg can be used to specify a fixed reference point in each bo iteration. A dynamic reference point updating strategy is used by default to set a reference point according to the datasets. """ if callable(reference_point_spec): self._ref_point_spec: tf.Tensor | Callable[..., TensorType] = reference_point_spec else: self._ref_point_spec = tf.convert_to_tensor(reference_point_spec) self._ref_point = None
[docs] def __repr__(self) -> str: """""" if callable(self._ref_point_spec): return f"ExpectedHypervolumeImprovement({self._ref_point_spec.__name__})" else: return f"ExpectedHypervolumeImprovement({self._ref_point_spec!r})"
[docs] def prepare_acquisition_function( self, model: ProbabilisticModel, dataset: Optional[Dataset] = None, ) -> AcquisitionFunction: """ :param model: The model. :param dataset: The data from the observer. Must be populated. :return: The expected hypervolume improvement acquisition function. """ tf.debugging.Assert(dataset is not None, [tf.constant([])]) dataset = cast(Dataset, dataset) tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.") mean, _ = model.predict(dataset.query_points) if callable(self._ref_point_spec): self._ref_point = tf.cast(self._ref_point_spec(mean), dtype=mean.dtype) else: self._ref_point = tf.cast(self._ref_point_spec, dtype=mean.dtype) _pf = Pareto(mean) screened_front = _pf.front[tf.reduce_all(_pf.front <= self._ref_point, -1)] # prepare the partitioned bounds of non-dominated region for calculating of the # hypervolume improvement in this area _partition_bounds = prepare_default_non_dominated_partition_bounds( self._ref_point, screened_front ) return expected_hv_improvement(model, _partition_bounds)
[docs] def update_acquisition_function( self, function: AcquisitionFunction, model: ProbabilisticModel, dataset: Optional[Dataset] = None, ) -> AcquisitionFunction: """ :param function: The acquisition function to update. :param model: The model. :param dataset: The data from the observer. Must be populated. """ tf.debugging.Assert(dataset is not None, [tf.constant([])]) dataset = cast(Dataset, dataset) tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.") tf.debugging.Assert(isinstance(function, expected_hv_improvement), [tf.constant([])]) mean, _ = model.predict(dataset.query_points) if callable(self._ref_point_spec): self._ref_point = self._ref_point_spec(mean) else: assert isinstance(self._ref_point_spec, tf.Tensor) # specified a fixed ref point self._ref_point = tf.cast(self._ref_point_spec, dtype=mean.dtype) _pf = Pareto(mean) screened_front = _pf.front[tf.reduce_all(_pf.front <= self._ref_point, -1)] _partition_bounds = prepare_default_non_dominated_partition_bounds( self._ref_point, screened_front ) function.update(_partition_bounds) # type: ignore return function
[docs] class expected_hv_improvement(AcquisitionFunctionClass): def __init__(self, model: ProbabilisticModel, partition_bounds: tuple[TensorType, TensorType]): r""" expected Hyper-volume (HV) calculating using Eq. 44 of :cite:`yang2019efficient` paper. The expected hypervolume improvement calculation in the non-dominated region can be decomposed into sub-calculations based on each partitioned cell. For easier calculation, this sub-calculation can be reformulated as a combination of two generalized expected improvements, corresponding to Psi (Eq. 44) and Nu (Eq. 45) function calculations, respectively. Note: 1. Since in Trieste we do not assume the use of a certain non-dominated region partition algorithm, we do not assume the last dimension of the partitioned cell has only one (lower) bound (i.e., minus infinity, which is used in the :cite:`yang2019efficient` paper). This is not as efficient as the original paper, but is applicable to different non-dominated partition algorithm. 2. As the Psi and nu function in the original paper are defined for maximization problems, we inverse our minimisation problem (to also be a maximisation), allowing use of the original notation and equations. :param model: The model of the objective function. :param partition_bounds: with shape ([N, D], [N, D]), partitioned non-dominated hypercell bounds for hypervolume improvement calculation :return: The expected_hv_improvement acquisition function modified for objective minimisation. This function will raise :exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size greater than one. """ self._model = model self._lb_points = tf.Variable( partition_bounds[0], trainable=False, shape=[None, partition_bounds[0].shape[-1]] ) self._ub_points = tf.Variable( partition_bounds[1], trainable=False, shape=[None, partition_bounds[1].shape[-1]] ) self._cross_index = tf.constant( list(product(*[[0, 1]] * self._lb_points.shape[-1])) ) # [2^d, indices_at_dim]
[docs] def update(self, partition_bounds: tuple[TensorType, TensorType]) -> None: """Update the acquisition function with new partition bounds.""" self._lb_points.assign(partition_bounds[0]) self._ub_points.assign(partition_bounds[1])
[docs] def __call__(self, x: TensorType) -> TensorType: tf.debugging.assert_shapes( [(x, [..., 1, None])], message="This acquisition function only supports batch sizes of one.", ) normal = tfp.distributions.Normal( loc=tf.zeros(shape=1, dtype=x.dtype), scale=tf.ones(shape=1, dtype=x.dtype) ) def Psi(a: TensorType, b: TensorType, mean: TensorType, std: TensorType) -> TensorType: return std * normal.prob((b - mean) / std) + (mean - a) * ( 1 - normal.cdf((b - mean) / std) ) def nu(lb: TensorType, ub: TensorType, mean: TensorType, std: TensorType) -> TensorType: return (ub - lb) * (1 - normal.cdf((ub - mean) / std)) def ehvi_based_on_partitioned_cell( neg_pred_mean: TensorType, pred_std: TensorType ) -> TensorType: r""" Calculate the ehvi based on cell i. """ neg_lb_points, neg_ub_points = -self._ub_points, -self._lb_points neg_ub_points = tf.minimum(neg_ub_points, 1e10) # clip to improve numerical stability psi_ub = Psi( neg_lb_points, neg_ub_points, neg_pred_mean, pred_std ) # [..., num_cells, out_dim] psi_lb = Psi( neg_lb_points, neg_lb_points, neg_pred_mean, pred_std ) # [..., num_cells, out_dim] psi_lb2ub = tf.maximum(psi_lb - psi_ub, 0.0) # [..., num_cells, out_dim] nu_contrib = nu(neg_lb_points, neg_ub_points, neg_pred_mean, pred_std) stacked_factors = tf.concat( [tf.expand_dims(psi_lb2ub, -2), tf.expand_dims(nu_contrib, -2)], axis=-2 ) # Take the cross product of psi_diff and nu across all outcomes # [..., num_cells, 2(operation_num, refer Eq. 45), num_obj] factor_combinations = tf.linalg.diag_part( tf.gather(stacked_factors, self._cross_index, axis=-2) ) # [..., num_cells, 2^d, 2(operation_num), num_obj] return tf.reduce_sum(tf.reduce_prod(factor_combinations, axis=-1), axis=-1) candidate_mean, candidate_var = self._model.predict(tf.squeeze(x, -2)) candidate_std = tf.sqrt(candidate_var) neg_candidate_mean = -tf.expand_dims(candidate_mean, 1) # [..., 1, out_dim] candidate_std = tf.expand_dims(candidate_std, 1) # [..., 1, out_dim] ehvi_cells_based = ehvi_based_on_partitioned_cell(neg_candidate_mean, candidate_std) return tf.reduce_sum( ehvi_cells_based, axis=-1, keepdims=True, )
[docs] class BatchMonteCarloExpectedHypervolumeImprovement( SingleModelAcquisitionBuilder[HasReparamSampler] ): """ Builder for the batch expected hypervolume improvement acquisition function. The implementation of the acquisition function largely follows :cite:`daulton2020differentiable` """ def __init__( self, sample_size: int, reference_point_spec: ( Sequence[float] | TensorType | Callable[..., TensorType] ) = get_reference_point, *, jitter: float = DEFAULTS.JITTER, ): """ :param sample_size: The number of samples from model predicted distribution for each batch of points. :param reference_point_spec: this method is used to determine how the reference point is calculated. If a Callable function specified, it is expected to take existing posterior mean-based observations (to screen out the observation noise) and return a reference point with shape [D] (D represents number of objectives). If the Pareto front location is known, this arg can be used to specify a fixed reference point in each bo iteration. A dynamic reference point updating strategy is used by default to set a reference point according to the datasets. :param jitter: The size of the jitter to use when stabilising the Cholesky decomposition of the covariance matrix. :raise ValueError (or InvalidArgumentError): If ``sample_size`` is not positive, or ``jitter`` is negative. """ tf.debugging.assert_positive(sample_size) tf.debugging.assert_greater_equal(jitter, 0.0) self._sample_size = sample_size self._jitter = jitter if callable(reference_point_spec): self._ref_point_spec: tf.Tensor | Callable[..., TensorType] = reference_point_spec else: self._ref_point_spec = tf.convert_to_tensor(reference_point_spec) self._ref_point = None
[docs] def __repr__(self) -> str: """""" if callable(self._ref_point_spec): return ( f"BatchMonteCarloExpectedHypervolumeImprovement({self._sample_size!r}," f" {self._ref_point_spec.__name__}," f" jitter={self._jitter!r})" ) else: return ( f"BatchMonteCarloExpectedHypervolumeImprovement({self._sample_size!r}," f" {self._ref_point_spec!r}" f" jitter={self._jitter!r})" )
[docs] def prepare_acquisition_function( self, model: HasReparamSampler, dataset: Optional[Dataset] = None, ) -> AcquisitionFunction: """ :param model: The model. Must have event shape [1]. :param dataset: The data from the observer. Must be populated. :return: The batch expected hypervolume improvement acquisition function. """ tf.debugging.Assert(dataset is not None, [tf.constant([])]) dataset = cast(Dataset, dataset) tf.debugging.assert_positive(len(dataset), message="Dataset must be populated.") mean, _ = model.predict(dataset.query_points) if callable(self._ref_point_spec): self._ref_point = tf.cast(self._ref_point_spec(mean), dtype=mean.dtype) else: self._ref_point = tf.cast(self._ref_point_spec, dtype=mean.dtype) _pf = Pareto(mean) screened_front = _pf.front[tf.reduce_all(_pf.front <= self._ref_point, -1)] # prepare the partitioned bounds of non-dominated region for calculating of the # hypervolume improvement in this area _partition_bounds = prepare_default_non_dominated_partition_bounds( self._ref_point, screened_front ) if not isinstance(model, HasReparamSampler): raise ValueError( f"The batch Monte-Carlo expected hyper-volume improvement function only supports " f"models that implement a reparam_sampler method; received {model!r}" ) sampler = model.reparam_sampler(self._sample_size) return batch_ehvi(sampler, self._jitter, _partition_bounds)
[docs] def batch_ehvi( sampler: ReparametrizationSampler[HasReparamSampler], sampler_jitter: float, partition_bounds: tuple[TensorType, TensorType], ) -> AcquisitionFunction: """ :param sampler: The posterior sampler, which given query points `at`, is able to sample the possible observations at 'at'. :param sampler_jitter: The size of the jitter to use in sampler when stabilising the Cholesky decomposition of the covariance matrix. :param partition_bounds: with shape ([N, D], [N, D]), partitioned non-dominated hypercell bounds for hypervolume improvement calculation :return: The batch expected hypervolume improvement acquisition function for objective minimisation. """ def acquisition(at: TensorType) -> TensorType: _batch_size = at.shape[-2] # B def gen_q_subset_indices(q: int) -> tf.RaggedTensor: # generate all subsets of [1, ..., q] as indices indices = list(range(q)) return tf.ragged.constant([list(combinations(indices, i)) for i in range(1, q + 1)]) samples = sampler.sample(at, jitter=sampler_jitter) # [..., S, B, num_obj] q_subset_indices = gen_q_subset_indices(_batch_size) hv_contrib = tf.zeros(tf.shape(samples)[:-2], dtype=samples.dtype) lb_points, ub_points = partition_bounds def hv_contrib_on_samples( obj_samples: TensorType, ) -> TensorType: # calculate samples overlapped area's hvi for obj_samples # [..., S, Cq_j, j, num_obj] -> [..., S, Cq_j, num_obj] overlap_vertices = tf.reduce_max(obj_samples, axis=-2) overlap_vertices = tf.maximum( # compare overlap vertices and lower bound of each cell: tf.expand_dims(overlap_vertices, -3), # expand a cell dimension lb_points[tf.newaxis, tf.newaxis, :, tf.newaxis, :], ) # [..., S, K, Cq_j, num_obj] lengths_j = tf.maximum( # get hvi length per obj within each cell (ub_points[tf.newaxis, tf.newaxis, :, tf.newaxis, :] - overlap_vertices), 0.0 ) # [..., S, K, Cq_j, num_obj] areas_j = tf.reduce_sum( # sum over all subsets Cq_j -> [..., S, K] tf.reduce_prod(lengths_j, axis=-1), axis=-1 # calc hvi within each K ) return tf.reduce_sum(areas_j, axis=-1) # sum over cells -> [..., S] for j in tf.range(1, _batch_size + 1): # Inclusion-Exclusion loop q_choose_j = tf.gather(q_subset_indices, j - 1).to_tensor() # gather all combinations having j points from q batch points (Cq_j) j_sub_samples = tf.gather(samples, q_choose_j, axis=-2) # [..., S, Cq_j, j, num_obj] hv_contrib += tf.cast((-1) ** (j + 1), dtype=samples.dtype) * hv_contrib_on_samples( j_sub_samples ) return tf.reduce_mean(hv_contrib, axis=-1, keepdims=True) # average through MC return acquisition
[docs] class ExpectedConstrainedHypervolumeImprovement( ExpectedConstrainedImprovement[ProbabilisticModelType] ): """ Builder for the constrained expected hypervolume improvement acquisition function. This function essentially combines ExpectedConstrainedImprovement and ExpectedHypervolumeImprovement. """ def __init__( self, objective_tag: Tag, constraint_builder: AcquisitionFunctionBuilder[ProbabilisticModelType], min_feasibility_probability: float | TensorType = 0.5, reference_point_spec: ( Sequence[float] | TensorType | Callable[..., TensorType] ) = get_reference_point, ): """ :param objective_tag: The tag for the objective data and model. :param constraint_builder: The builder for the constraint function. :param min_feasibility_probability: The minimum probability of feasibility for a "best point" to be considered feasible. :param reference_point_spec: this method is used to determine how the reference point is calculated. If a Callable function specified, it is expected to take existing posterior mean-based feasible observations (to screen out the observation noise) and return a reference point with shape [D] (D represents number of objectives). If the feasible Pareto front location is known, this arg can be used to specify a fixed reference point in each bo iteration. A dynamic reference point updating strategy is used by default to set a reference point according to the datasets. """ super().__init__(objective_tag, constraint_builder, min_feasibility_probability) if callable(reference_point_spec): self._ref_point_spec: tf.Tensor | Callable[..., TensorType] = reference_point_spec else: self._ref_point_spec = tf.convert_to_tensor(reference_point_spec) self._ref_point = None def __repr__(self) -> str: """""" if callable(self._ref_point_spec): return ( f"ExpectedConstrainedHypervolumeImprovement({self._objective_tag!r}," f" {self._constraint_builder!r}, {self._min_feasibility_probability!r}," f" {self._ref_point_spec.__name__})" ) else: return ( f"ExpectedConstrainedHypervolumeImprovement({self._objective_tag!r}, " f" {self._constraint_builder!r}, {self._min_feasibility_probability!r}," f" ref_point_specification={repr(self._ref_point_spec)!r}" )
[docs] def _update_expected_improvement_fn( self, objective_model: ProbabilisticModelType, feasible_mean: TensorType ) -> None: """ Set or update the unconstrained expected improvement function. :param objective_model: The objective model. :param feasible_mean: The mean of the feasible query points. """ if callable(self._ref_point_spec): self._ref_point = tf.cast( self._ref_point_spec(feasible_mean), dtype=feasible_mean.dtype, ) else: self._ref_point = tf.cast(self._ref_point_spec, dtype=feasible_mean.dtype) _pf = Pareto(feasible_mean) screened_front = _pf.front[tf.reduce_all(_pf.front <= self._ref_point, -1)] # prepare the partitioned bounds of non-dominated region for calculating of the # hypervolume improvement in this area _partition_bounds = prepare_default_non_dominated_partition_bounds( self._ref_point, screened_front, ) self._expected_improvement_fn: Optional[AcquisitionFunction] if self._expected_improvement_fn is None: self._expected_improvement_fn = expected_hv_improvement( objective_model, _partition_bounds ) else: tf.debugging.Assert( isinstance(self._expected_improvement_fn, expected_hv_improvement), [] ) self._expected_improvement_fn.update(_partition_bounds) # type: ignore
[docs] class HIPPO(GreedyAcquisitionFunctionBuilder[ProbabilisticModelType]): r""" HIPPO: HIghly Parallelizable Pareto Optimization Builder of the acquisition function for greedily collecting batches by HIPPO penalization in multi-objective optimization by penalizing batch points by their distance in the objective space. The resulting acquistion function takes in a set of pending points and returns a base multi-objective acquisition function penalized around those points. Penalization is applied to the acquisition function multiplicatively. However, to improve numerical stability, we perform additive penalization in a log space. """ def __init__( self, objective_tag: Tag = OBJECTIVE, base_acquisition_function_builder: ( AcquisitionFunctionBuilder[ProbabilisticModelType] | SingleModelAcquisitionBuilder[ProbabilisticModelType] | None ) = None, ): """ Initializes the HIPPO acquisition function builder. :param objective_tag: The tag for the objective data and model. :param base_acquisition_function_builder: Base acquisition function to be penalized. Defaults to Expected Hypervolume Improvement, also supports its constrained version. """ self._objective_tag = objective_tag if base_acquisition_function_builder is None: self._base_builder: AcquisitionFunctionBuilder[ProbabilisticModelType] = ( ExpectedHypervolumeImprovement().using(self._objective_tag) ) else: if isinstance(base_acquisition_function_builder, SingleModelAcquisitionBuilder): self._base_builder = base_acquisition_function_builder.using(self._objective_tag) else: self._base_builder = base_acquisition_function_builder self._base_acquisition_function: Optional[AcquisitionFunction] = None self._penalization: Optional[PenalizationFunction] = None self._penalized_acquisition: Optional[AcquisitionFunction] = None
[docs] def prepare_acquisition_function( self, models: Mapping[Tag, ProbabilisticModelType], datasets: Optional[Mapping[Tag, Dataset]] = None, pending_points: Optional[TensorType] = None, ) -> AcquisitionFunction: """ Creates a new instance of the acquisition function. :param models: The models. :param datasets: The data from the observer. Must be populated. :param pending_points: The points we penalize with respect to. :return: The HIPPO acquisition function. :raise tf.errors.InvalidArgumentError: If the ``dataset`` is empty. """ tf.debugging.Assert(datasets is not None, [tf.constant([])]) datasets = cast(Mapping[Tag, Dataset], datasets) tf.debugging.Assert(datasets[self._objective_tag] is not None, [tf.constant([])]) tf.debugging.assert_positive( len(datasets[self._objective_tag]), message=f"{self._objective_tag} dataset must be populated.", ) acq = self._update_base_acquisition_function(models, datasets) if pending_points is not None and len(pending_points) != 0: acq = self._update_penalization(acq, models[self._objective_tag], pending_points) return acq
[docs] def update_acquisition_function( self, function: AcquisitionFunction, models: Mapping[Tag, ProbabilisticModelType], datasets: Optional[Mapping[Tag, Dataset]] = None, pending_points: Optional[TensorType] = None, new_optimization_step: bool = True, ) -> AcquisitionFunction: """ Updates the acquisition function. :param function: The acquisition function to update. :param models: The models. :param datasets: The data from the observer. Must be populated. :param pending_points: Points already chosen to be in the current batch (of shape [M,D]), where M is the number of pending points and D is the search space dimension. :param new_optimization_step: Indicates whether this call to update_acquisition_function is to start of a new optimization step, of to continue collecting batch of points for the current step. Defaults to ``True``. :return: The updated acquisition function. """ tf.debugging.Assert(datasets is not None, [tf.constant([])]) datasets = cast(Mapping[Tag, Dataset], datasets) tf.debugging.Assert(datasets[self._objective_tag] is not None, [tf.constant([])]) tf.debugging.assert_positive( len(datasets[self._objective_tag]), message=f"{self._objective_tag} dataset must be populated.", ) tf.debugging.Assert(self._base_acquisition_function is not None, [tf.constant([])]) if new_optimization_step: self._update_base_acquisition_function(models, datasets) if pending_points is None or len(pending_points) == 0: # no penalization required if no pending_points return cast(AcquisitionFunction, self._base_acquisition_function) return self._update_penalization(function, models[self._objective_tag], pending_points)
def _update_penalization( self, function: AcquisitionFunction, model: ProbabilisticModel, pending_points: Optional[TensorType] = None, ) -> AcquisitionFunction: tf.debugging.assert_rank(pending_points, 2) if self._penalized_acquisition is not None and isinstance( self._penalization, hippo_penalizer ): # if possible, just update the penalization function variables # (the type ignore is due to mypy getting confused by tf.function) self._penalization.update(pending_points) # type: ignore[unreachable] return self._penalized_acquisition else: # otherwise construct a new penalized acquisition function self._penalization = hippo_penalizer(model, pending_points) @tf.function def penalized_acquisition(x: TensorType) -> TensorType: log_acq = tf.math.log( cast(AcquisitionFunction, self._base_acquisition_function)(x) ) + tf.math.log(cast(PenalizationFunction, self._penalization)(x)) return tf.math.exp(log_acq) self._penalized_acquisition = penalized_acquisition return penalized_acquisition def _update_base_acquisition_function( self, models: Mapping[Tag, ProbabilisticModelType], datasets: Optional[Mapping[Tag, Dataset]] = None, ) -> AcquisitionFunction: if self._base_acquisition_function is None: self._base_acquisition_function = self._base_builder.prepare_acquisition_function( models, datasets ) else: self._base_acquisition_function = self._base_builder.update_acquisition_function( self._base_acquisition_function, models, datasets ) return self._base_acquisition_function
[docs] class hippo_penalizer: r""" Returns the penalization function used for multi-objective greedy batch Bayesian optimization. A candidate point :math:`x` is penalized based on the Mahalanobis distance to a given pending point :math:`p_i`. Since we assume objectives to be independent, the Mahalanobis distance between these points becomes a Eucledian distance normalized by standard deviation. Penalties for multiple pending points are multiplied, and the resulting quantity is warped with the arctan function to :math:`[0, 1]` interval. :param model: The model over the specified ``dataset``. :param pending_points: The points we penalize with respect to. :return: The penalization function. This function will raise :exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size greater than one. """ def __init__(self, model: ProbabilisticModel, pending_points: TensorType): """Initialize the MO penalizer. :param model: The model. :param pending_points: The points we penalize with respect to. :raise ValueError: If pending points are empty or None. :return: The penalization function. This function will raise :exc:`ValueError` or :exc:`~tf.errors.InvalidArgumentError` if used with a batch size greater than one.""" tf.debugging.Assert( pending_points is not None and len(pending_points) != 0, [tf.constant([])] ) self._model = model self._pending_points = tf.Variable(pending_points, shape=[None, *pending_points.shape[1:]]) pending_means, pending_vars = self._model.predict(self._pending_points) self._pending_means = tf.Variable(pending_means, shape=[None, *pending_means.shape[1:]]) self._pending_vars = tf.Variable(pending_vars, shape=[None, *pending_vars.shape[1:]])
[docs] def update(self, pending_points: TensorType) -> None: """Update the penalizer with new pending points.""" tf.debugging.Assert( pending_points is not None and len(pending_points) != 0, [tf.constant([])] ) self._pending_points.assign(pending_points) pending_means, pending_vars = self._model.predict(self._pending_points) self._pending_means.assign(pending_means) self._pending_vars.assign(pending_vars)
@tf.function def __call__(self, x: TensorType) -> TensorType: tf.debugging.assert_shapes( [(x, [..., 1, None])], message="This penalization function cannot be calculated for batches of points.", ) # x is [N, 1, D] x = tf.squeeze(x, axis=1) # x is now [N, D] x_means, x_vars = self._model.predict(x) # x_means is [N, K], x_vars is [N, K] # where K is the number of models/objectives # self._pending_points is [B, D] where B is the size of the batch collected so far tf.debugging.assert_shapes( [ (x, ["N", "D"]), (self._pending_points, ["B", "D"]), (self._pending_means, ["B", "K"]), (self._pending_vars, ["B", "K"]), (x_means, ["N", "K"]), (x_vars, ["N", "K"]), ], message="""Encountered unexpected shapes while calculating mean and variance of given point x and pending points""", ) x_means_expanded = x_means[:, None, :] pending_means_expanded = self._pending_means[None, :, :] pending_vars_expanded = self._pending_vars[None, :, :] pending_stddevs_expanded = tf.sqrt(pending_vars_expanded) # this computes Mahalanobis distance between x and pending points # since we assume objectives to be independent # it reduces to regular Eucledian distance normalized by standard deviation standardize_mean_diff = ( tf.abs(x_means_expanded - pending_means_expanded) / pending_stddevs_expanded ) # [N, B, K] d = tf.norm(standardize_mean_diff, axis=-1) # [N, B] # warp the distance so that resulting value is from 0 to (nearly) 1 warped_d = (2.0 / math.pi) * tf.math.atan(d) penalty = tf.reduce_prod(warped_d, axis=-1) # [N,] return tf.reshape(penalty, (-1, 1))