# Copyright 2020 The Trieste Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module contains functions of different methods for
partitioning the dominated/non-dominated region in multi-objective optimization problems."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Optional
import tensorflow as tf
from ...types import TensorType
from ...utils.misc import DEFAULTS
from .dominance import non_dominated
def prepare_default_non_dominated_partition_bounds(
    reference: TensorType,
    observations: Optional[TensorType] = None,
    anti_reference: Optional[TensorType] = None,
) -> tuple[TensorType, TensorType]:
    """
    Prepare the default non-dominated partition boundary for acquisition function usage.
    This functionality will trigger different partition according to objective numbers, if
    objective number is 2, an `ExactPartition2dNonDominated` will be used. If the objective
    number is larger than 2, a `DividedAndConquerNonDominated` will be used.

    :param reference: a reference point to use, with shape [D].
        Defines the upper bound of the hypervolume.
        Should be equal to or bigger than the anti-ideal point of the Pareto set.
        For comparing results across runs, the same reference point must be used.
    :param observations: The observations for all objectives, with shape [N, D], if not specified
        or is an empty Tensor, a single non-dominated partition bounds constructed by reference
        and anti_reference point will be returned.
    :param anti_reference: a worst point to use with shape [D].
        Defines the lower bound of the hypercell. If not specified, will use a default value:
        -[1e10] * D.
    :return: lower, upper bounds of the partitioned cell, each with shape [N, D]
    :raise ValueError (or `tf.errors.InvalidArgumentError`): If ``reference`` has an invalid
        shape.
    :raise ValueError (or `tf.errors.InvalidArgumentError`): If ``anti_reference`` has an
        invalid shape, or contains a value larger than ``reference``.
    """

    def is_empty_obs(obs: Optional[TensorType]) -> bool:
        # Check the *argument* rather than the enclosing ``observations`` variable, so the
        # helper is correct for any tensor it is given (previously it accidentally closed
        # over ``observations``).
        return obs is None or tf.equal(tf.size(obs), 0)

    def specify_default_anti_reference_point(
        ref: TensorType, obs: Optional[TensorType]
    ) -> TensorType:
        # -1e10 per dimension acts as a finite stand-in for -inf.
        anti_ref = -1e10 * tf.ones(shape=(tf.shape(ref)), dtype=ref.dtype)
        tf.debugging.assert_greater_equal(
            ref,
            anti_ref,
            message=f"reference point: {ref} containing at least one value below default "
            "anti-reference point ([-1e10, ..., -1e10]), try specify a lower "
            "anti-reference point.",
        )
        if not is_empty_obs(obs):  # make sure given (valid) observations are larger than -1e10
            tf.debugging.assert_greater_equal(
                obs,
                anti_ref,
                message=f"observations: {obs} containing at least one value below default "
                "anti-reference point ([-1e10, ..., -1e10]), try specify a lower "
                "anti-reference point.",
            )
        return anti_ref

    tf.debugging.assert_shapes([(reference, ["D"])])
    if anti_reference is None:
        # if anti_reference point is not specified, use a -1e10 as default (act as -inf)
        anti_reference = specify_default_anti_reference_point(reference, observations)
    else:
        # anti_reference point is specified
        tf.debugging.assert_shapes([(anti_reference, ["D"])])

    if is_empty_obs(observations):  # if no valid observations
        # A plain ``assert`` would be stripped under ``python -O`` and would raise
        # AssertionError rather than the documented ValueError, so raise explicitly.
        if not tf.reduce_all(tf.less_equal(anti_reference, reference)):
            raise ValueError(
                f"anti_reference point: {anti_reference} contains at least one value larger "
                f"than reference point: {reference}"
            )
        return tf.expand_dims(anti_reference, 0), tf.expand_dims(reference, 0)
    elif tf.shape(observations)[-1] > 2:
        return DividedAndConquerNonDominated(observations).partition_bounds(
            anti_reference, reference
        )
    else:
        return ExactPartition2dNonDominated(observations).partition_bounds(
            anti_reference, reference
        )
@dataclass(frozen=True)
class _BoundedVolumes:
    """
    Index-based description of a pseudo-cell decomposition: row ``i`` of ``lower_idx`` and
    ``upper_idx`` indexes into an augmented Pareto front to give the lower and upper corner
    of the ``i``-th hypercell.
    """

    # indices forming the lower corners of the cells
    lower_idx: TensorType
    # indices forming the upper corners of the cells
    upper_idx: TensorType

    def __post_init__(self) -> None:
        # both index tensors must describe the same number of cells N in the same dimension D
        shape_spec = [(self.lower_idx, ["N", "D"]), (self.upper_idx, ["N", "D"])]
        tf.debugging.assert_shapes(shape_spec)
class _BoundIndexPartition:
    """
    A collection of partition strategies that are based on storing the index of pareto fronts
    & other auxiliary points
    """

    # the (non-dominated) pareto front, set by concrete subclasses — shape [M, D]
    front: TensorType
    # index-based cell decomposition, set by concrete subclasses
    _bounds: _BoundedVolumes

    def __new__(cls, *args: Any, **kwargs: Any) -> Any:
        # acts as an abstract base: only concrete subclasses may be instantiated
        if cls is _BoundIndexPartition:
            raise TypeError("BoundIndexPartition may not be instantiated directly")
        return object.__new__(cls)

    def partition_bounds(
        self, anti_reference: TensorType, reference: TensorType
    ) -> tuple[TensorType, TensorType]:
        """
        Get the partitioned hypercell's lower and upper bounds.

        :param anti_reference: a worst point to use with shape [D].
            Defines the lower bound of the hypercell
        :param reference: a reference point to use, with shape [D].
            Defines the upper bound of the hypervolume.
            Should be equal to or bigger than the anti-ideal point of the Pareto set.
            For comparing results across runs, the same reference point must be used.
        :return: lower, upper bounds of the partitioned cell, each with shape [N, D]
        :raise ValueError (or `tf.errors.InvalidArgumentError`): If ``reference`` has an invalid
            shape.
        """
        # reference must dominate the front, which must dominate the anti-reference
        tf.debugging.assert_greater_equal(reference, self.front)
        tf.debugging.assert_greater_equal(self.front, anti_reference)
        tf.debugging.assert_type(anti_reference, self.front.dtype)
        tf.debugging.assert_type(reference, self.front.dtype)
        tf.debugging.assert_shapes(
            [
                (self._bounds.lower_idx, ["N", "D"]),
                (self._bounds.upper_idx, ["N", "D"]),
                (self.front, ["M", "D"]),
                (reference, ["D"]),
                (anti_reference, ["D"]),
            ]
        )
        # concatenate the pseudo front to have the same corresponding of bound index:
        # index 0 is the anti-reference, indices 1..M are the front, index M+1 the reference
        pseudo_pfront = tf.concat((anti_reference[None], self.front, reference[None]), axis=0)
        N = tf.shape(self._bounds.upper_idx)[0]
        D = tf.shape(self._bounds.upper_idx)[1]
        # build [N*D, 2] gather indices pairing each stored row index with its dimension
        idx = tf.tile(tf.range(D), (N,))
        lower_idx = tf.stack((tf.reshape(self._bounds.lower_idx, [-1]), idx), axis=1)
        upper_idx = tf.stack((tf.reshape(self._bounds.upper_idx, [-1]), idx), axis=1)
        lower = tf.reshape(tf.gather_nd(pseudo_pfront, lower_idx), [N, D])
        upper = tf.reshape(tf.gather_nd(pseudo_pfront, upper_idx), [N, D])
        return lower, upper
class ExactPartition2dNonDominated(_BoundIndexPartition):
    """
    Exact partition of non-dominated space, used as a default option when the
    objective number equals 2.
    """

    def __init__(self, front: TensorType):
        """
        :param front: non-dominated pareto front, with shape [M, 2].
        :raise tf.errors.InvalidArgumentError: If ``front`` contains dominated points.
        """
        tf.debugging.assert_equal(
            tf.reduce_all(non_dominated(front)[1]),
            True,
            message=f"\ninput {front} contains dominated points",
        )
        self.front = tf.gather_nd(front, tf.argsort(front[:, :1], axis=0))  # sort input front
        self._bounds = self._get_bound_index()

    def _get_bound_index(self) -> _BoundedVolumes:
        # Compute the cells covering the non-dominated region for 2 dimension case
        # this assumes the Pareto set has been sorted in ascending order on the first
        # objective, which implies the second objective is sorted in descending order
        len_front, number_of_objectives = self.front.shape
        # pseudo front indices: row 0 is reserved for the ideal point, row len_front + 1
        # for the anti-ideal point; front rows are shifted by +1 accordingly
        pseudo_front_idx = tf.concat(
            [
                tf.zeros([1, number_of_objectives], dtype=tf.int32),
                tf.argsort(self.front, axis=0) + 1,
                tf.ones([1, number_of_objectives], dtype=tf.int32) * len_front + 1,
            ],
            axis=0,
        )
        # each cell i spans, in the first objective, from front point i to i + 1; in the
        # second objective its upper bound follows the reversed (descending) sort order
        range_ = tf.range(len_front + 1)[:, None]
        lower_result = tf.concat([range_, tf.zeros_like(range_)], axis=-1)
        upper_result = tf.concat(
            [range_ + 1, pseudo_front_idx[::-1, 1:][: pseudo_front_idx[-1, 0]]], axis=-1
        )
        return _BoundedVolumes(lower_result, upper_result)
class DividedAndConquerNonDominated(_BoundIndexPartition):
    """
    branch and bound procedure algorithm. a divide and conquer method introduced
    in :cite:`Couckuyt2012`.
    """

    def __init__(self, front: TensorType, threshold: TensorType | float = 0):
        """
        :param front: non-dominated pareto front.
        :param threshold: a threshold used to screen out cells in partition : when its volume is
            below this threshold, its rejected directly in order to be more computationally
            efficient, if setting above 0, this partition strategy tends to return an
            approximated partition.
        :raise tf.errors.InvalidArgumentError: If ``front`` contains dominated points.
        """
        tf.debugging.assert_equal(
            tf.reduce_all(non_dominated(front)[1]),
            True,
            message=f"\ninput {front} contains dominated points",
        )
        # The algorithm below does not rely on the front being sorted, so the input is
        # stored as given. (A previous revision computed a sorted copy and immediately
        # overwrote it with the unsorted input; that dead assignment has been removed.)
        self.front = front
        self._bounds = self._get_bound_index(threshold)

    def _get_bound_index(self, threshold: TensorType | float = 0) -> _BoundedVolumes:
        # Compute the index-based cell decomposition of the non-dominated region by
        # recursively splitting candidate cells (branch and bound over a cell stack).
        len_front, number_of_objectives = self.front.shape
        lower_result = tf.zeros([0, number_of_objectives], dtype=tf.int32)
        upper_result = tf.zeros([0, number_of_objectives], dtype=tf.int32)

        # pad the front with a pseudo ideal point (min - 1) and anti-ideal point (max + 1)
        min_front = tf.reduce_min(self.front, axis=0, keepdims=True) - 1
        max_front = tf.reduce_max(self.front, axis=0, keepdims=True) + 1
        pseudo_front = tf.concat([min_front, self.front, max_front], axis=0)
        pseudo_front_idx = tf.concat(
            [
                tf.zeros([1, number_of_objectives], dtype=tf.int32),
                tf.argsort(self.front, axis=0)
                + 1,  # +1 as index zero is reserved for the ideal point
                tf.ones([1, number_of_objectives], dtype=tf.int32) * len_front + 1,
            ],
            axis=0,
        )

        # start with a single cell spanning the whole (padded) region; cells awaiting
        # examination are kept as a stack with shape [n_cells, 2, D]
        divide_conquer_cells = tf.stack(
            [
                tf.zeros(number_of_objectives, dtype=tf.int32),
                (int(pseudo_front_idx.shape[0]) - 1)
                * tf.ones(number_of_objectives, dtype=tf.int32),
            ],
            axis=0,
        )[None]

        # volume of the whole padded region, used to express cell volumes as fractions
        # when screening against ``threshold``
        total_size = tf.reduce_prod(max_front - min_front)

        def while_body(
            divide_conquer_cells: TensorType,
            lower_result: TensorType,
            upper_result: TensorType,
        ) -> tuple[TensorType, TensorType, TensorType]:
            # pop the last cell off the stack
            divide_conquer_cells_unstacked = tf.unstack(divide_conquer_cells, axis=0)
            cell = divide_conquer_cells_unstacked[-1]
            divide_conquer_cells_new = tf.cond(
                tf.not_equal(tf.size(divide_conquer_cells_unstacked[:-1]), 0),
                lambda: tf.stack(divide_conquer_cells_unstacked[:-1]),
                lambda: tf.zeros([0, 2, number_of_objectives], dtype=tf.int32),
            )

            # translate the cell's index corners into actual objective-space corners
            arr = tf.range(number_of_objectives)
            lower_idx = tf.gather_nd(pseudo_front_idx, tf.stack((cell[0], arr), -1))
            upper_idx = tf.gather_nd(pseudo_front_idx, tf.stack((cell[1], arr), -1))
            lower = tf.gather_nd(pseudo_front, tf.stack((lower_idx, arr), -1))
            upper = tf.gather_nd(pseudo_front, tf.stack((upper_idx, arr), -1))

            # accept the cell if its upper corner is dominated by every front point
            test_accepted = self._is_test_required((upper - DEFAULTS.JITTER) < self.front)
            lower_result_final, upper_result_final = tf.cond(
                test_accepted,
                lambda: self._accepted_test_body(lower_result, upper_result, lower_idx, upper_idx),
                lambda: (lower_result, upper_result),
            )

            # if not accepted but not fully rejected either, split the cell and push the
            # halves back onto the stack
            test_rejected = self._is_test_required((lower + DEFAULTS.JITTER) < self.front)
            divide_conquer_cells_final = tf.cond(
                tf.logical_and(test_rejected, tf.logical_not(test_accepted)),
                lambda: self._rejected_test_body(
                    cell, lower, upper, divide_conquer_cells_new, total_size, threshold
                ),
                lambda: divide_conquer_cells_new,
            )

            return divide_conquer_cells_final, lower_result_final, upper_result_final

        # loop until the cell stack is exhausted; result tensors grow, hence the
        # ``None`` leading dimensions in the shape invariants
        _, lower_result_final, upper_result_final = tf.while_loop(
            lambda divide_conquer_cells, lower_result, upper_result: len(divide_conquer_cells) > 0,
            while_body,
            loop_vars=[divide_conquer_cells, lower_result, upper_result],
            shape_invariants=[
                tf.TensorShape([None, 2, number_of_objectives]),
                tf.TensorShape([None, number_of_objectives]),
                tf.TensorShape([None, number_of_objectives]),
            ],
        )
        return _BoundedVolumes(lower_result_final, upper_result_final)

    @staticmethod
    def _is_test_required(smaller: TensorType) -> TensorType:
        # True iff every front point is strictly smaller than the test point in at
        # least one objective
        idx_dom_augm = tf.reduce_any(smaller, axis=1)
        is_dom_augm = tf.reduce_all(idx_dom_augm)
        return is_dom_augm

    @staticmethod
    def _accepted_test_body(
        lower_result: TensorType,
        upper_result: TensorType,
        lower_idx: TensorType,
        upper_idx: TensorType,
    ) -> tuple[TensorType, TensorType]:
        # append the accepted cell's index corners to the accumulated results
        lower_result_accepted = tf.concat([lower_result, lower_idx[None]], axis=0)
        upper_result_accepted = tf.concat([upper_result, upper_idx[None]], axis=0)
        return lower_result_accepted, upper_result_accepted

    @classmethod
    def _rejected_test_body(
        cls,
        cell: TensorType,
        lower: TensorType,
        upper: TensorType,
        divide_conquer_cells: TensorType,
        total_size: TensorType,
        threshold: TensorType,
    ) -> TensorType:
        # split the cell only if it is larger than a unit cell and its relative volume
        # exceeds the screening threshold; otherwise drop it
        divide_conquer_cells_dist = cell[1] - cell[0]
        hc_size = tf.math.reduce_prod(upper - lower, axis=0, keepdims=True)

        not_unit_cell = tf.reduce_any(divide_conquer_cells_dist > 1)
        vol_above_thresh = tf.reduce_all((hc_size[0] / total_size) > threshold)
        divide_conquer_cells_rejected = tf.cond(
            tf.logical_and(not_unit_cell, vol_above_thresh),
            lambda: cls._divide_body(divide_conquer_cells, divide_conquer_cells_dist, cell),
            lambda: tf.identity(divide_conquer_cells),
        )
        return divide_conquer_cells_rejected

    @staticmethod
    def _divide_body(
        divide_conquer_cells: TensorType,
        divide_conquer_cells_dist: TensorType,
        cell: TensorType,
    ) -> TensorType:
        # split the cell in two halves along its longest edge and push both halves
        # onto the cell stack
        edge_size = tf.reduce_max(divide_conquer_cells_dist)
        idx = tf.argmax(divide_conquer_cells_dist)
        edge_size1 = int(tf.round(tf.cast(edge_size, dtype=tf.float32) / 2.0))
        edge_size2 = int(edge_size - edge_size1)

        # first half: shrink the upper corner along dimension ``idx`` by edge_size1
        sparse_edge_size1 = tf.concat(
            [tf.zeros([idx]), edge_size1 * tf.ones([1]), tf.zeros([len(cell[1]) - idx - 1])], axis=0
        )
        upper = tf.identity(cell[1]) - tf.cast(sparse_edge_size1, dtype=tf.int32)
        divide_conquer_cells_new = tf.concat(
            [divide_conquer_cells, tf.stack([tf.identity(cell[0]), upper], axis=0)[None]], axis=0
        )

        # second half: raise the lower corner along dimension ``idx`` by edge_size2
        sparse_edge_size2 = tf.concat(
            [tf.zeros([idx]), edge_size2 * tf.ones([1]), tf.zeros([len(cell[1]) - idx - 1])], axis=0
        )
        lower = tf.identity(cell[0]) + tf.cast(sparse_edge_size2, dtype=tf.int32)
        divide_conquer_cells_final = tf.concat(
            [divide_conquer_cells_new, tf.stack([lower, tf.identity(cell[1])], axis=0)[None]],
            axis=0,
        )
        return divide_conquer_cells_final