Source code for trieste.space

# Copyright 2020 The Trieste Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module contains implementations of various types of search space."""
from __future__ import annotations

import operator
from abc import ABC, abstractmethod
from collections import Counter
from dataclasses import dataclass, field
from functools import reduce
from itertools import chain
from itertools import product as itertools_product
from typing import (
    Callable,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    TypeVar,
    Union,
    overload,
)

import gpflow.kernels
import numpy as np
import scipy.optimize as spo
import tensorflow as tf
import tensorflow_probability as tfp
from check_shapes import check_shapes
from gpflow.keras import tf_keras
from gpflow.kernels import ActivityCondition
from typing_extensions import Protocol, runtime_checkable

from .types import TensorType
from .utils import flatten_leading_dims

# Bound to SearchSpace so that methods such as ``product`` can be typed as returning the same
# concrete space type they were called on.
SearchSpaceType = TypeVar("SearchSpaceType", bound="SearchSpace")
""" A type variable bound to :class:`SearchSpace`. """

# Used wherever Python sequences (rather than tensors) are converted into tensors.
DEFAULT_DTYPE: tf.DType = tf.float64
""" Default dtype to use when none is provided. """

# An encoder maps a tensor of points to a tensor of (encoded) points.
EncoderFunction = Callable[[TensorType], TensorType]
""" Type alias for point encoders. These transform points from one search space to another. """
class SampleTimeoutError(Exception):
    """Raised when sampling from a search space has timed out."""

    pass
class NonlinearConstraint(spo.NonlinearConstraint):  # type: ignore[misc]
    """
    A wrapper class for nonlinear constraints on variables. The constraints expression is of the
    form::

        lb <= fun(x) <= ub

    :param fun: The function defining the nonlinear constraints; with input shape [..., D] and
        output shape [..., 1], returning a scalar value for each input point.
    :param lb: The lower bound of the constraint. Should be a scalar or of shape [1].
    :param ub: The upper bound of the constraint. Should be a scalar or of shape [1].
    :param keep_feasible: Keep the constraints feasible throughout optimization iterations if this
        is `True`.
    """

    def __init__(
        self,
        fun: Callable[[TensorType], TensorType],
        lb: Sequence[float] | TensorType,
        ub: Sequence[float] | TensorType,
        keep_feasible: bool = False,
    ):
        # The value and the gradient are always computed together and cached, so that asking
        # for both at the same point only evaluates the constraint function once.
        def _constraint_value_and_gradient(x: TensorType) -> Tuple[TensorType, TensorType]:
            val, grad = tfp.math.value_and_gradient(fun, x)

            tf.debugging.assert_shapes(
                [(val, [..., 1])],
                message="Nonlinear constraint only supports single output function.",
            )

            return tf.cast(val, dtype=x.dtype), tf.cast(grad, dtype=x.dtype)

        # Empty placeholders: the first real input never compares equal to them, which forces
        # an evaluation on the first call.
        cache_x: TensorType = tf.constant([])
        cache_f: TensorType = tf.constant([])
        cache_df_dx: TensorType = tf.constant([])

        def _refresh_cache(x: TensorType) -> None:
            # Recompute value and gradient only when the query point has changed.
            nonlocal cache_x, cache_f, cache_df_dx
            if not np.array_equal(x, cache_x):
                cache_f, cache_df_dx = _constraint_value_and_gradient(x)
                cache_x = x

        def val_fun(x: TensorType) -> TensorType:
            _refresh_cache(x)
            return cache_f

        def jac_fun(x: TensorType) -> TensorType:
            _refresh_cache(x)
            return cache_df_dx

        self._orig_fun = fun  # Used for constraints equality check.
        super().__init__(val_fun, lb, ub, jac=jac_fun, keep_feasible=keep_feasible)

    def residual(self, points: TensorType) -> TensorType:
        """
        Calculate the residuals between the constraint function and its lower/upper limits.

        :param points: The points to calculate the residuals for, with shape [..., D].
        :return: A tensor containing the lower and upper residual values with shape [..., 2].
        """
        tf.debugging.assert_rank_at_least(points, 2)
        non_d_axes = np.ones_like(points.shape)[:-1]  # Avoid adding axes shape to static graph.
        # Reshape the bounds so they broadcast against the leading (non-D) axes of ``points``.
        lb = tf.cast(tf.reshape(self.lb, (*non_d_axes, -1)), dtype=points.dtype)
        ub = tf.cast(tf.reshape(self.ub, (*non_d_axes, -1)), dtype=points.dtype)
        fval = self.fun(points)
        fval = tf.reshape(fval, (*points.shape[:-1], -1))  # Atleast 2D.
        fval = tf.cast(fval, dtype=points.dtype)
        values = [fval - lb, ub - fval]
        values = tf.concat(values, axis=-1)
        return values

    def __repr__(self) -> str:
        """"""
        # A single-line representation, without surrounding whitespace or stray quotes.
        return (
            f"NonlinearConstraint({self.fun!r}, {self.lb!r}, {self.ub!r}, "
            f"{self.keep_feasible!r})"
        )

    def __eq__(self, other: object) -> bool:
        """
        :param other: A constraint.
        :return: Whether the constraint is identical to this one.
        """
        if not isinstance(other, NonlinearConstraint):
            return False
        # Compare the user-supplied functions: ``self.fun`` is a fresh caching closure for
        # every instance, so it would never compare equal.
        return bool(
            self._orig_fun == other._orig_fun
            and tf.reduce_all(self.lb == other.lb)
            and tf.reduce_all(self.ub == other.ub)
            and self.keep_feasible == other.keep_feasible
        )
class LinearConstraint(spo.LinearConstraint):  # type: ignore[misc]
    """
    A wrapper class for linear constraints on variables. The constraints expression is of the
    form::

        lb <= A @ x <= ub

    :param A: The matrix defining the linear constraints with shape [M, D], where M is the
        number of constraints.
    :param lb: The lower bound of the constraint. Should be a scalar or of shape [M].
    :param ub: The upper bound of the constraint. Should be a scalar or of shape [M].
    :param keep_feasible: Keep the constraints feasible throughout optimization iterations if this
        is `True`.
    """

    def __init__(
        self,
        A: TensorType,
        lb: Sequence[float] | TensorType,
        ub: Sequence[float] | TensorType,
        keep_feasible: bool = False,
    ):
        super().__init__(A, lb, ub, keep_feasible=keep_feasible)

    def residual(self, points: TensorType) -> TensorType:
        """
        Calculate the residuals between the constraint function and its lower/upper limits.

        :param points: The points to calculate the residuals for, with shape [..., D].
        :return: A tensor containing the lower and upper residual values with shape [..., M*2].
        """
        tf.debugging.assert_rank_at_least(points, 2)
        non_d_axes = np.ones_like(points.shape)[:-1]  # Avoid adding axes shape to static graph.
        # Reshape the bounds so they broadcast against the leading (non-D) axes of ``points``.
        lb = tf.cast(tf.reshape(self.lb, (*non_d_axes, -1)), dtype=points.dtype)
        ub = tf.cast(tf.reshape(self.ub, (*non_d_axes, -1)), dtype=points.dtype)
        A = tf.cast(self.A, dtype=points.dtype)
        fval = tf.linalg.matmul(points, A, transpose_b=True)
        fval = tf.reshape(fval, (*points.shape[:-1], -1))  # Atleast 2D.
        values = [fval - lb, ub - fval]
        values = tf.concat(values, axis=-1)
        return values

    def __repr__(self) -> str:
        """"""
        # A single-line representation, without surrounding whitespace or stray quotes.
        return f"LinearConstraint({self.A!r}, {self.lb!r}, {self.ub!r}, {self.keep_feasible!r})"

    def __eq__(self, other: object) -> bool:
        """
        :param other: A constraint.
        :return: Whether the constraint is identical to this one.
        """
        if not isinstance(other, LinearConstraint):
            return False
        return bool(
            tf.reduce_all(self.A == other.A)
            and tf.reduce_all(self.lb == other.lb)
            and tf.reduce_all(self.ub == other.ub)
            and tf.reduce_all(self.keep_feasible == other.keep_feasible)
        )
# The two kinds of explicit constraint a search space can carry.
Constraint = Union[LinearConstraint, NonlinearConstraint]
""" Type alias for constraints. """
def _embed_constraint(
    constraint: Constraint, offset: int, part_dim: int, combined_dim: int
) -> Constraint:
    """Re-express a constraint defined on a ``part_dim``-wide sub-vector so it applies to a
    wider ``combined_dim``-wide flat vector, with the sub-vector occupying columns
    ``[offset, offset + part_dim)`` and all other columns ignored.

    Used to carry a search space's (positional) constraints through a Cartesian product, where
    the space's columns are relocated to a contiguous block of the combined flat vector.

    :param constraint: A :class:`LinearConstraint` or :class:`NonlinearConstraint` over the
        ``part_dim``-wide sub-vector.
    :param offset: Index of the sub-vector's first column in the combined vector.
    :param part_dim: Width of the sub-vector the constraint was defined on.
    :param combined_dim: Width of the combined vector.
    :return: An equivalent constraint over the combined vector.
    :raises ValueError: If the column block ``[offset, offset + part_dim)`` does not fit inside
        the combined vector, or if a linear constraint's matrix does not have ``part_dim``
        columns.
    :raises NotImplementedError: For an unsupported constraint type.
    """
    # Reject blocks that fall outside the combined vector up front. Otherwise the linear case
    # would fail inside ``tf.pad`` with negative padding, and the nonlinear case would silently
    # evaluate the constraint on a truncated slice.
    if offset < 0 or part_dim < 0 or offset + part_dim > combined_dim:
        raise ValueError(
            f"Columns [{offset}, {offset + part_dim}) do not fit inside a combined vector of "
            f"dimension {combined_dim}."
        )

    if isinstance(constraint, LinearConstraint):
        A = tf.convert_to_tensor(constraint.A)
        if int(A.shape[-1]) != part_dim:
            raise ValueError(
                f"LinearConstraint matrix A has {int(A.shape[-1])} columns but the space it "
                f"constrains has dimension {part_dim}; global constraints must span the full "
                f"flat vector."
            )
        # Zero columns on either side make the constraint ignore the other subspaces.
        padded = tf.pad(A, [[0, 0], [offset, combined_dim - offset - part_dim]])
        return LinearConstraint(padded, constraint.lb, constraint.ub, constraint.keep_feasible)

    if isinstance(constraint, NonlinearConstraint):
        # Wrap the user's original function rather than the caching wrapper stored in ``fun``.
        orig_fun = constraint._orig_fun

        def shifted_fun(
            x: TensorType,
            _fun: Callable[[TensorType], TensorType] = orig_fun,
            _o: int = offset,
            _w: int = part_dim,
        ) -> TensorType:
            # Defaults bind the current values, so the closure is immune to later rebinding.
            return _fun(x[..., _o : _o + _w])

        return NonlinearConstraint(
            shifted_fun, constraint.lb, constraint.ub, constraint.keep_feasible
        )

    raise NotImplementedError(
        f"Cannot embed constraint of type {type(constraint).__name__} into a wider space."
    )
class SearchSpace(ABC):
    """
    The domain over which an objective function is optimized.
    """

    @abstractmethod
    def sample(self, num_samples: int, seed: Optional[int] = None) -> TensorType:
        """
        Draw random points from this search space.

        :param num_samples: The number of points to sample from this search space.
        :param seed: Random seed for reproducibility.
        :return: ``num_samples`` i.i.d. random points, sampled uniformly from this search space.
        """

    def contains(self, value: TensorType) -> TensorType:
        """Check membership, after validating the trailing dimension of ``value``.

        :param value: A point or points to check for membership of this :class:`SearchSpace`.
        :return: A boolean array showing membership for each point in value.
        :raise ValueError (or tf.errors.InvalidArgumentError): If ``value`` has a different
            dimensionality points from this :class:`SearchSpace`.
        """
        has_expected_shape = tf.rank(value) > 0 and tf.shape(value)[-1] == self.dimension
        tf.debugging.assert_equal(
            has_expected_shape,
            True,
            message=f"""
                Dimensionality mismatch: space is {self.dimension}, value is {tf.shape(value)[-1]}
                """,
        )
        return self._contains(value)

    @abstractmethod
    def _contains(self, value: TensorType) -> TensorType:
        """Space-specific membership test; the input shape has already been validated.

        :param value: A point or points to check for membership of this :class:`SearchSpace`.
        :return: A boolean array showing membership for each point in value.
        """

    def __contains__(self, value: TensorType) -> bool:
        """Implement the `in` operator. Broadcasting is not supported, because Python insists
        on converting the result to a single boolean.

        :param value: A single point to check for membership of this :class:`SearchSpace`.
        :return: `True` if ``value`` is a member of this search space, else `False`.
        :raise ValueError (or tf.errors.InvalidArgumentError): If ``value`` has a different
            dimensionality from this :class:`SearchSpace`.
        """
        is_single_point = tf.rank(value) == 1
        tf.debugging.assert_equal(
            is_single_point,
            True,
            message=f"""
                Rank mismatch: expected 1, got {tf.rank(value)}. To get a tensor of boolean
                membership values from a tensor of points, use `space.contains(value)`
                rather than `value in space`.
                """,
        )
        return self.contains(value)

    @property
    @abstractmethod
    def dimension(self) -> TensorType:
        """The number of inputs in this search space."""

    @property
    @abstractmethod
    def has_bounds(self) -> bool:
        """Whether the search space has meaningful numerical bounds."""

    @property
    @abstractmethod
    def lower(self) -> TensorType:
        """The lowest value taken by each search space dimension."""

    @property
    @abstractmethod
    def upper(self) -> TensorType:
        """The highest value taken by each search space dimension."""

    @abstractmethod
    def product(self: SearchSpaceType, other: SearchSpaceType) -> SearchSpaceType:
        """
        Combine this search space with another one of the same type.

        :param other: A search space of the same type as this search space.
        :return: The Cartesian product of this search space with the ``other``.
            Subclasses that do not support a same-type product should raise
            :exc:`NotImplementedError`; :meth:`__mul__` then falls back to a
            :class:`TaggedProductSearchSpace`.
        """

    @overload
    def __mul__(self: SearchSpaceType, other: SearchSpaceType) -> SearchSpaceType: ...

    @overload
    def __mul__(  # type: ignore[overload-cannot-match]
        self: SearchSpaceType,
        other: SearchSpace,
    ) -> SearchSpace:
        # mypy complains that this is superfluous, but it seems to use it fine to infer
        # that Box * Box = Box, while Box * Discrete = SearchSpace.
        ...

    def __mul__(self, other: SearchSpace) -> SearchSpace:
        """
        Implement the `*` operator as a Cartesian product.

        :param other: A search space.
        :return: The Cartesian product of this search space with the ``other``.
            If both spaces are of the same type (and have no constraints) then this calls the
            :meth:`product` method. Otherwise, it generates a :class:`TaggedProductSearchSpace`.
        """
        # Constrained spaces always go through a tagged product.
        unconstrained = not (self.has_constraints or other.has_constraints)
        if unconstrained and isinstance(other, type(self)):
            try:
                return self.product(other)
            except NotImplementedError:
                # Subclass opts out of a same-type product; fall back to a tagged product.
                pass
        return TaggedProductSearchSpace((self, other))

    def __pow__(self: SearchSpaceType, other: int) -> SearchSpaceType:
        """
        Return the Cartesian product of ``other`` instances of this search space. For example, for
        an exponent of `3`, and search space `s`, this is `s ** 3`, which is equivalent to
        `s * s * s`.

        :param other: The exponent, or number of instances of this search space to multiply
            together. Must be strictly positive.
        :return: The Cartesian product of ``other`` instances of this search space.
        :raise tf.errors.InvalidArgumentError: If the exponent ``other`` is less than 1.
        """
        tf.debugging.assert_positive(other, message="Exponent must be strictly positive")
        copies = [self] * other
        return reduce(operator.mul, copies)

    def discretize(self, num_samples: int) -> DiscreteSearchSpace:
        """
        Build a discrete approximation of this search space from random samples.

        :param num_samples: The number of points in the :class:`DiscreteSearchSpace`.
        :return: A discrete search space consisting of ``num_samples`` points sampled uniformly from
            this search space.
        :raise NotImplementedError: If this :class:`SearchSpace` has constraints.
        """
        if not self.has_constraints:
            return DiscreteSearchSpace(points=self.sample(num_samples))
        # Constraints are not supported.
        raise NotImplementedError(
            "Discretization is currently not supported in the presence of constraints."
        )

    @abstractmethod
    def __eq__(self, other: object) -> bool:
        """
        :param other: A search space.
        :return: Whether the search space is identical to this one.
        """

    @property
    def constraints(self) -> Sequence[Constraint]:
        """The sequence of explicit constraints specified in this search space."""
        return []

    def constraints_residuals(self, points: TensorType) -> TensorType:
        """
        Return residuals for all the constraints in this :class:`SearchSpace`.

        :param points: The points to get the residuals for, with shape [..., D].
        :return: A tensor of all the residuals with shape [..., C], where C is the total number of
            constraints.
        :raise NotImplementedError: If this :class:`SearchSpace` does not support constraints.
        """
        raise NotImplementedError("Constraints are currently not supported for this search space.")

    def is_feasible(self, points: TensorType) -> TensorType:
        """
        Checks if points satisfy the explicit constraints of this :class:`SearchSpace`.
        Note membership of the search space is not checked.

        :param points: The points to check constraints feasibility for, with shape [..., D].
        :return: A tensor of booleans. Returns `True` for each point if it is feasible in this
            search space, else `False`.
        :raise NotImplementedError: If this :class:`SearchSpace` has constraints.
        """
        # Subclasses that support constraints must override this method.
        if self.has_constraints:
            raise NotImplementedError("Feasibility check is not implemented for this search space.")
        # Without constraints every point is feasible.
        all_feasible = tf.ones(points.shape[:-1])
        return tf.cast(all_feasible, dtype=bool)

    @property
    def has_constraints(self) -> bool:
        """Returns `True` if this search space has any explicit constraints specified."""
        # Unconstrained by default; subclasses that carry constraints override this.
        return False
class GeneralDiscreteSearchSpace(SearchSpace):
    """
    An ABC representing different types of discrete search spaces (not just numerical).
    This contains a default implementation using explicitly provided points which subclasses
    may ignore.
    """

    def __init__(self, points: TensorType):
        """
        :param points: The points that define the discrete space, with shape ('N', 'D').
        :raise ValueError (or tf.errors.InvalidArgumentError): If ``points`` has an invalid shape.
        """
        tf.debugging.assert_shapes([(points, ("N", "D"))])
        self._points = points
        self._dimension = tf.shape(self._points)[-1]

    @property
    def points(self) -> TensorType:
        """All the points in this space."""
        return self._points

    @property
    def dimension(self) -> TensorType:
        """The number of inputs in this search space."""
        return self._dimension

    def _contains(self, value: TensorType) -> TensorType:
        # Compare every queried point against every stored point, coordinate by coordinate.
        coordinate_matches = tf.math.equal(self.points, tf.expand_dims(value, -2))  # [..., N, D]
        point_matches = tf.reduce_all(coordinate_matches, axis=-1)  # [..., N]
        return tf.reduce_any(point_matches, axis=-1)  # [...]

    def sample(self, num_samples: int, seed: Optional[int] = None) -> TensorType:
        """
        Draw points (with replacement) from the stored points.

        :param num_samples: The number of points to sample from this search space.
        :param seed: Random seed for reproducibility.
        :return: ``num_samples`` i.i.d. random points, sampled uniformly,
            from this search space.
        """
        if seed is not None:  # ensure reproducibility
            tf.random.set_seed(seed)

        if num_samples == 0:
            return self.points[:0, :]

        # Equal logits for every stored point give a uniform categorical distribution.
        uniform_logits = tf.ones((1, tf.shape(self.points)[0]))
        sampled_indices = tf.random.categorical(uniform_logits, num_samples, seed=seed)
        return tf.gather(self.points, sampled_indices)[0, :, :]  # [num_samples, D]
class DiscreteSearchSpace(GeneralDiscreteSearchSpace):
    r"""
    A discrete :class:`SearchSpace` representing a finite set of :math:`D`-dimensional points in
    :math:`\mathbb{R}^D`.

    For example:

    >>> points = tf.constant([[-1.0, 0.4], [-1.0, 0.6], [0.0, 0.4]])
    >>> search_space = DiscreteSearchSpace(points)
    >>> assert tf.constant([0.0, 0.4]) in search_space
    >>> assert tf.constant([1.0, 0.5]) not in search_space

    """

    def __repr__(self) -> str:
        """"""
        return f"DiscreteSearchSpace({self._points!r})"

    @property
    def has_bounds(self) -> bool:
        """Discrete numerical spaces always have bounds, derived from their points."""
        return True

    @property
    def lower(self) -> TensorType:
        """The lowest value taken across all points by each search space dimension."""
        return tf.reduce_min(self.points, -2)

    @property
    def upper(self) -> TensorType:
        """The highest value taken across all points by each search space dimension."""
        return tf.reduce_max(self.points, -2)

    def product(self, other: DiscreteSearchSpace) -> DiscreteSearchSpace:
        r"""
        Return the Cartesian product of the two :class:`DiscreteSearchSpace`\ s. For example:

            >>> sa = DiscreteSearchSpace(tf.constant([[0, 1], [2, 3]]))
            >>> sb = DiscreteSearchSpace(tf.constant([[4, 5, 6], [7, 8, 9]]))
            >>> (sa * sb).points.numpy()
            array([[0, 1, 4, 5, 6],
                   [0, 1, 7, 8, 9],
                   [2, 3, 4, 5, 6],
                   [2, 3, 7, 8, 9]], dtype=int32)

        :param other: A :class:`DiscreteSearchSpace` with :attr:`points` of the same dtype as this
            search space.
        :return: The Cartesian product of the two :class:`DiscreteSearchSpace`\ s.
        :raise TypeError: If one :class:`DiscreteSearchSpace` has :attr:`points` of a different
            dtype to the other.
        """
        # ``product`` is an ordinary method, not a binary-operator hook, so signal the mismatch
        # with the documented exception instead of leaking the ``NotImplemented`` sentinel.
        if self.points.dtype != other.points.dtype:
            raise TypeError(
                "Cannot take the product of DiscreteSearchSpaces whose points have different "
                f"dtypes: {self.points.dtype!r} and {other.points.dtype!r}."
            )

        # Pair every point of ``self`` with every point of ``other``.
        tile_self = tf.tile(self.points[:, None], [1, len(other.points), 1])
        tile_other = tf.tile(other.points[None], [len(self.points), 1, 1])
        cartesian_product = tf.concat([tile_self, tile_other], axis=2)
        product_space_dimension = self.points.shape[-1] + other.points.shape[-1]
        return DiscreteSearchSpace(tf.reshape(cartesian_product, [-1, product_space_dimension]))

    def __eq__(self, other: object) -> bool:
        """
        :param other: A search space.
        :return: Whether the search space is identical to this one.
        """
        if not isinstance(other, DiscreteSearchSpace):
            return NotImplemented
        # NOTE(review): ``tf.sort(..., 0)`` sorts each column independently, so two spaces whose
        # columns hold the same values but paired into different rows also compare equal here.
        return bool(tf.reduce_all(tf.sort(self.points, 0) == tf.sort(other.points, 0)))
class BooleanSearchSpace(DiscreteSearchSpace):
    r"""
    A 1-D :class:`DiscreteSearchSpace` restricted to :math:`\{0, 1\}`, representing a single
    Boolean indicator variable from the GDP formulation.

    Provides a distinct type for dispatch in validation and downstream consumers
    (e.g. GA bitflip mutation, kernel active/inactive checks).

    Example:

    >>> space = BooleanSearchSpace()
    >>> assert space.dimension == 1
    >>> assert tf.constant([0.0], dtype=tf.float64) in space
    >>> assert tf.constant([1.0], dtype=tf.float64) in space
    >>> assert tf.constant([2.0], dtype=tf.float64) not in space

    """

    def __init__(self, dtype: tf.DType = DEFAULT_DTYPE) -> None:
        """
        :param dtype: The dtype of the points. Defaults to :data:`DEFAULT_DTYPE`.
        """
        # The space consists of exactly the two points 0 and 1.
        indicator_values = tf.constant([[0], [1]], dtype=dtype)
        super().__init__(points=indicator_values)

    def __repr__(self) -> str:
        """"""
        return f"BooleanSearchSpace({self.points.dtype!r})"

    def __eq__(self, other: object) -> bool:
        # The points are fixed, so two Boolean spaces can only differ in their dtype.
        if isinstance(other, BooleanSearchSpace):
            return self.points.dtype == other.points.dtype
        return NotImplemented
@runtime_checkable
class HasOneHotEncoder(Protocol):
    """A categorical search space that contains default logic for one-hot encoding."""

    @property
    @abstractmethod
    def one_hot_encoder(self) -> EncoderFunction:
        """A one-hot encoder for points in the search space."""
def one_hot_encoder(space: SearchSpace) -> EncoderFunction:
    """Return the one-hot encoder of ``space`` if it provides one, else an identity encoder."""
    if isinstance(space, HasOneHotEncoder):
        return space.one_hot_encoder
    # Spaces without their own encoder are passed through unchanged.
    return lambda x: x
def cast_encoder(
    encoder: EncoderFunction,
    input_dtype: Optional[tf.DType] = None,
    output_dtype: Optional[tf.DType] = None,
) -> EncoderFunction:
    """Wrap ``encoder`` so that its input and/or output are cast to the given dtypes.

    :param encoder: The encoder to wrap.
    :param input_dtype: If given, the input is cast to this dtype before encoding.
    :param output_dtype: If given, the encoded result is cast to this dtype.
    :return: An encoder applying the requested casts around ``encoder``.
    """

    def cast_and_encode(x: TensorType) -> TensorType:
        encoder_input = x if input_dtype is None else tf.cast(x, input_dtype)
        encoded = encoder(encoder_input)
        return encoded if output_dtype is None else tf.cast(encoded, output_dtype)

    return cast_and_encode
def one_hot_encoded_space(space: SearchSpace) -> SearchSpace:
    """Return a bounded search space corresponding to the one-hot encoding of ``space``.

    :param space: The search space to encode.
    :return: The encoded space; spaces without a one-hot encoder are returned unchanged.
    :raise NotImplementedError: If ``space`` has a one-hot encoder but is neither a
        :class:`GeneralDiscreteSearchSpace` nor a :class:`TaggedProductSearchSpace`.
    """
    supports_encoding = isinstance(space, HasOneHotEncoder)

    # Discrete spaces are encoded by encoding each of their points.
    if isinstance(space, GeneralDiscreteSearchSpace) and supports_encoding:
        return DiscreteSearchSpace(space.one_hot_encoder(space.points))

    # Product spaces are encoded subspace by subspace, keeping the original tags.
    if isinstance(space, TaggedProductSearchSpace):
        spaces = [one_hot_encoded_space(space.get_subspace(tag)) for tag in space.subspace_tags]
        return TaggedProductSearchSpace(spaces=spaces, tags=space.subspace_tags)

    if supports_encoding:
        raise NotImplementedError(f"Unsupported one-hot-encoded space {type(space)}")

    return space
class CategoricalSearchSpace(GeneralDiscreteSearchSpace, HasOneHotEncoder):
    r"""
    A categorical :class:`SearchSpace` representing a finite set :math:`\mathcal{C}` of categories,
    or a finite Cartesian product :math:`\mathcal{C}_1 \times \cdots \times \mathcal{C}_n` of
    such sets.

    For example:

    >>> CategoricalSearchSpace(5)
    CategoricalSearchSpace([('0', '1', '2', '3', '4')])
    >>> CategoricalSearchSpace(["Red", "Green", "Blue"])
    CategoricalSearchSpace([('Red', 'Green', 'Blue')])
    >>> CategoricalSearchSpace([2,3])
    CategoricalSearchSpace([('0', '1'), ('0', '1', '2')])
    >>> CategoricalSearchSpace([["R", "G", "B"], ["Y", "N"]])
    CategoricalSearchSpace([('R', 'G', 'B'), ('Y', 'N')])

    Note that internally categories are represented by numeric indices:

    >>> rgb = CategoricalSearchSpace(["Red", "Green", "Blue"])
    >>> assert tf.constant([1], dtype=tf.float64) in rgb
    >>> assert tf.constant([3], dtype=tf.float64) not in rgb
    >>> rgb.to_tags(tf.constant([[1], [0], [2]]))
    <tf.Tensor: shape=(3, 1), dtype=string, numpy=
    array([[b'Green'],
           [b'Red'],
           [b'Blue']], dtype=object)>

    """

    def __init__(
        self,
        categories: int | Sequence[int] | Sequence[str] | Sequence[Sequence[str]],
        dtype: tf.DType = DEFAULT_DTYPE,
    ):
        """
        :param categories: Number of categories or category names. Can be an array for
            multidimensional spaces.
        :param dtype: The dtype of the returned indices, either tf.float32 or tf.float64.
        :raise TypeError: If ``categories`` is neither a description by numbers nor by names.
        :raise ValueError: If any number of categories is not positive, or any list of category
            names is empty.
        """
        # A single count or a flat list of names describes a one-dimensional space.
        if isinstance(categories, int) or any(isinstance(x, str) for x in categories):
            categories = [categories]  # type: ignore[assignment]

        if not isinstance(categories, Sequence) or not (
            all(
                isinstance(x, Sequence)
                and not isinstance(x, str)
                and all(isinstance(y, str) for y in x)
                for x in categories
            )
            or all(isinstance(x, int) for x in categories)
        ):
            raise TypeError(
                f"Invalid category description {categories!r}: "
                "expected either numbers or names."
            )

        elif any(isinstance(x, int) for x in categories):
            # Categories given as counts: name each category after its index.
            category_lens: Sequence[int] = categories  # type: ignore[assignment]
            if any(x <= 0 for x in category_lens):
                raise ValueError("Numbers of categories must be positive")
            tags = [tuple(f"{i}" for i in range(n)) for n in category_lens]
        else:
            category_names: Sequence[Sequence[str]] = categories  # type: ignore[assignment]
            if any(len(ts) == 0 for ts in category_names):
                raise ValueError("Category name lists cannot be empty")
            tags = [tuple(ts) for ts in category_names]

        self._tags = tags
        self._dtype = dtype

        # The points are all combinations of per-dimension category indices.
        ranges = [tf.range(len(ts), dtype=dtype) for ts in tags]
        meshgrid = tf.meshgrid(*ranges, indexing="ij")
        points = (
            tf.reshape(tf.stack(meshgrid, axis=-1), [-1, len(tags)])
            if tags
            # Keep the requested dtype for the empty space too.
            else tf.zeros([0, 0], dtype=dtype)
        )

        super().__init__(points)

    def __repr__(self) -> str:
        """"""
        return f"CategoricalSearchSpace({self._tags!r})"

    @property
    def has_bounds(self) -> bool:
        """Categorical indices carry no meaningful numerical bounds."""
        return False

    @property
    def lower(self) -> TensorType:
        """Not available for categorical spaces; always raises :exc:`AttributeError`."""
        raise AttributeError("Categorical search spaces do not have numerical bounds")

    @property
    def upper(self) -> TensorType:
        """Not available for categorical spaces; always raises :exc:`AttributeError`."""
        raise AttributeError("Categorical search spaces do not have numerical bounds")

    @property
    def tags(self) -> Sequence[Sequence[str]]:
        """The tags of the categories."""
        return self._tags

    @property
    def one_hot_encoder(self) -> EncoderFunction:
        """A one-hot encoder for the numerical indices. Note that binary categories
        are left unchanged instead of adding an unnecessary second feature."""

        def binary_encoder(x: TensorType) -> TensorType:
            # no need to one-hot encode binary categories (but we should still validate)
            tf.debugging.Assert(tf.reduce_all((x == 0) | (x == 1)), [tf.constant([])])
            return x

        def encoder(x: TensorType) -> TensorType:
            flat_x, unflatten = flatten_leading_dims(x)
            tf.debugging.assert_equal(tf.shape(flat_x)[-1], len(self.tags))
            # Each column holds the indices of one categorical dimension.
            columns = tf.split(flat_x, len(self.tags), axis=1)
            column_encoders = [
                (
                    binary_encoder
                    if len(ts) == 2
                    else tf_keras.layers.CategoryEncoding(
                        num_tokens=len(ts), output_mode="one_hot"
                    )
                )
                for ts in self.tags
            ]
            encoded = tf.concat(
                [
                    tf.cast(column_encoder(tf.reshape(column, [-1, 1])), dtype=x.dtype)
                    for column_encoder, column in zip(column_encoders, columns)
                ],
                axis=1,
            )
            return unflatten(encoded)

        return encoder

    def to_tags(self, indices: TensorType) -> TensorType:
        """
        Convert a tensor of indices (such as one returned by :meth:`sample`) to one of category tags.

        :param indices: A tensor of integer indices.
        :return: A tensor of string tags.
        :raise ValueError: If ``indices`` has a floating dtype and contains non-integral values.
        """
        if indices.dtype.is_floating:
            if not tf.reduce_all(tf.math.equal(indices, tf.math.floor(indices))):
                raise ValueError("Non-integral indices passed to to_tags")
            indices = tf.cast(indices, dtype=tf.int32)

        def extract_tags(row: TensorType) -> TensorType:
            # Look up the tag of each dimension's index in that dimension's tag list.
            return tf.stack(
                [tf.gather(tf.constant(self._tags[i]), row[i]) for i in range(len(row))]
            )

        return tf.map_fn(extract_tags, indices, dtype=tf.string)

    def product(self, other: CategoricalSearchSpace) -> CategoricalSearchSpace:
        r"""
        Return the Cartesian product of the two :class:`CategoricalSearchSpace`\ s. For example:

        >>> rgb = CategoricalSearchSpace(["Red", "Green", "Blue"])
        >>> yn = CategoricalSearchSpace(["Yes", "No"])
        >>> rgb * yn
        CategoricalSearchSpace([('Red', 'Green', 'Blue'), ('Yes', 'No')])

        :param other: A :class:`CategoricalSearchSpace`.
        :return: The Cartesian product of the two :class:`CategoricalSearchSpace`\ s, using the
            dtype of this search space.
        """
        # Carry this space's dtype over; otherwise the product would silently revert to the
        # default dtype.
        return CategoricalSearchSpace(tuple(chain(self.tags, other.tags)), dtype=self._dtype)

    def __eq__(self, other: object) -> bool:
        """
        :param other: A search space.
        :return: Whether the search space is identical to this one.
        """
        if not isinstance(other, CategoricalSearchSpace):
            return NotImplemented
        return self.tags == other.tags
[docs] class Box(SearchSpace): r""" Continuous :class:`SearchSpace` representing a :math:`D`-dimensional box in :math:`\mathbb{R}^D`. Mathematically it is equivalent to the Cartesian product of :math:`D` closed bounded intervals in :math:`\mathbb{R}`. """ @overload def __init__( self, lower: Sequence[float], upper: Sequence[float], constraints: Optional[Sequence[Constraint]] = None, ctol: float | TensorType = 1e-7, ): ... @overload def __init__( self, lower: TensorType, upper: TensorType, constraints: Optional[Sequence[Constraint]] = None, ctol: float | TensorType = 1e-7, ): ... def __init__( self, lower: Sequence[float] | TensorType, upper: Sequence[float] | TensorType, constraints: Optional[Sequence[Constraint]] = None, ctol: float | TensorType = 1e-7, ): r""" If ``lower`` and ``upper`` are `Sequence`\ s of floats (such as lists or tuples), they will be converted to tensors of dtype `DEFAULT_DTYPE`. :param lower: The lower (inclusive) bounds of the box. Must have shape [D] for positive D, and if a tensor, must have float type. :param upper: The upper (inclusive) bounds of the box. Must have shape [D] for positive D, and if a tensor, must have float type. :param constraints: Sequence of explicit input constraints for this search space. :param ctol: Tolerance to use to check constraints satisfaction. :raise ValueError (or tf.errors.InvalidArgumentError): If any of the following are true: - ``lower`` and ``upper`` have invalid shapes. - ``lower`` and ``upper`` do not have the same floating point type. - ``upper`` is not greater or equal to ``lower`` across all dimensions. 
""" tf.debugging.assert_shapes([(lower, ["D"]), (upper, ["D"])]) tf.assert_rank(lower, 1) tf.assert_rank(upper, 1) tf.debugging.assert_non_negative(ctol, message="Tolerance must be non-negative") if isinstance(lower, Sequence): self._lower = tf.constant(lower, dtype=DEFAULT_DTYPE) self._upper = tf.constant(upper, dtype=DEFAULT_DTYPE) else: self._lower = tf.convert_to_tensor(lower) self._upper = tf.convert_to_tensor(upper) tf.debugging.assert_same_float_dtype([self._lower, self._upper]) tf.debugging.assert_less_equal(self._lower, self._upper) self._dimension = tf.shape(self._upper)[-1] if constraints is None: self._constraints: Sequence[Constraint] = [] else: self._constraints = constraints self._ctol = ctol def __repr__(self) -> str: """""" return f"Box({self._lower!r}, {self._upper!r}, {self._constraints!r}, {self._ctol!r})" @property
[docs] def has_bounds(self) -> bool: return True
@property
[docs] def lower(self) -> tf.Tensor: """The lower bounds of the box.""" return self._lower
@property
[docs] def upper(self) -> tf.Tensor: """The upper bounds of the box.""" return self._upper
@property
[docs] def dimension(self) -> TensorType: """The number of inputs in this search space.""" return self._dimension
@property
[docs] def constraints(self) -> Sequence[Constraint]: """The sequence of explicit constraints specified in this search space.""" return self._constraints
def _contains(self, value: TensorType) -> TensorType:
    """
    For each point in ``value``, return `True` if the point is a member of this search space,
    else `False`. A point is a member if all of its coordinates lie in the closed intervals
    bounded by the lower and upper bounds.

    :param value: A point or points to check for membership of this :class:`SearchSpace`.
    :return: A boolean array showing membership for each point in value.
    """
    above_lower = tf.reduce_all(value >= self._lower, axis=-1)
    below_upper = tf.reduce_all(value <= self._upper, axis=-1)
    return above_lower & below_upper
def _sample(self, num_samples: int, seed: Optional[int] = None) -> TensorType: # Internal common method to sample randomly from the space. dim = tf.shape(self._lower)[-1] return tf.random.uniform( (num_samples, dim), minval=self._lower, maxval=self._upper, dtype=self._lower.dtype, seed=seed, )
[docs] def sample(self, num_samples: int, seed: Optional[int] = None) -> TensorType: """ Sample randomly from the space. :param num_samples: The number of points to sample from this search space. :param seed: Random seed for reproducibility. :return: ``num_samples`` i.i.d. random points, sampled uniformly, from this search space with shape '[num_samples, D]' , where D is the search space dimension. """ tf.debugging.assert_non_negative(num_samples) if seed is not None: # ensure reproducibility tf.random.set_seed(seed) return self._sample(num_samples, seed)
def _sample_halton( self, start: int, num_samples: int, seed: Optional[int] = None, ) -> TensorType: # Internal common method to sample from the space using a Halton sequence. tf.debugging.assert_non_negative(num_samples) if num_samples == 0: return tf.constant([], dtype=self._lower.dtype) if seed is not None: # ensure reproducibility tf.random.set_seed(seed) dim = tf.shape(self._lower)[-1] sequence_indices = tf.range(start=start, limit=start + num_samples, dtype=tf.int32) return (self._upper - self._lower) * tfp.mcmc.sample_halton_sequence( dim=dim, sequence_indices=sequence_indices, dtype=self._lower.dtype, seed=seed ) + self._lower
[docs] def sample_halton(self, num_samples: int, seed: Optional[int] = None) -> TensorType: """ Sample from the space using a Halton sequence. The resulting samples are guaranteed to be diverse and are reproducible by using the same choice of ``seed``. :param num_samples: The number of points to sample from this search space. :param seed: Random seed for the halton sequence :return: ``num_samples`` of points, using halton sequence with shape '[num_samples, D]' , where D is the search space dimension. """ return self._sample_halton(0, num_samples, seed)
[docs] def sample_sobol(self, num_samples: int, skip: Optional[int] = None) -> TensorType: """ Sample a diverse set from the space using a Sobol sequence. If ``skip`` is specified, then the resulting samples are reproducible. :param num_samples: The number of points to sample from this search space. :param skip: The number of initial points of the Sobol sequence to skip :return: ``num_samples`` of points, using sobol sequence with shape '[num_samples, D]' , where D is the search space dimension. """ tf.debugging.assert_non_negative(num_samples) if num_samples == 0: return tf.constant([], dtype=self._lower.dtype) if skip is None: # generate random skip skip = tf.random.uniform([1], maxval=2**16, dtype=tf.int32)[0] dim = tf.shape(self._lower)[-1] return (self._upper - self._lower) * tf.math.sobol_sample( dim=dim, num_results=num_samples, dtype=self._lower.dtype, skip=skip ) + self._lower
[docs] def _sample_feasible_loop( self, num_samples: int, sampler: Callable[[], TensorType], max_tries: int = 100, ) -> TensorType: """ Rejection sampling using provided callable. Try ``max_tries`` number of times to find ``num_samples`` feasible points. :param num_samples: The number of feasible points to sample from this search space. :param sampler: Callable to return samples. Called potentially multiple times. :param max_tries: Maximum attempts to sample the requested number of points. :return: ``num_samples`` feasible points sampled using ``sampler``. :raise SampleTimeoutError: If ``max_tries`` are exhausted before ``num_samples`` are sampled. """ xs = [] count = 0 tries = 0 while count < num_samples and tries < max_tries: tries += 1 xi = sampler() mask = self.is_feasible(xi) xo = tf.boolean_mask(xi, mask) xs.append(xo) count += xo.shape[0] if count < num_samples: raise SampleTimeoutError( f"""Failed to sample {num_samples} feasible point(s), even after {tries} attempts. Sampled only {count} feasible point(s).""" ) xs = tf.concat(xs, axis=0)[:num_samples] return xs
[docs] def sample_feasible( self, num_samples: int, seed: Optional[int] = None, max_tries: int = 100 ) -> TensorType: """ Sample feasible points randomly from the space. :param num_samples: The number of feasible points to sample from this search space. :param seed: Random seed for reproducibility. :param max_tries: Maximum attempts to sample the requested number of points. :return: ``num_samples`` i.i.d. random points, sampled uniformly, from this search space with shape '[num_samples, D]' , where D is the search space dimension. :raise SampleTimeoutError: If ``max_tries`` are exhausted before ``num_samples`` are sampled. """ tf.debugging.assert_non_negative(num_samples) # Without constraints or zero-num-samples use the normal sample method directly. if not self.has_constraints or num_samples == 0: return self.sample(num_samples, seed) if seed is not None: # ensure reproducibility tf.random.set_seed(seed) def _sampler() -> TensorType: return self._sample(num_samples, seed) return self._sample_feasible_loop(num_samples, _sampler, max_tries)
[docs] def sample_halton_feasible( self, num_samples: int, seed: Optional[int] = None, max_tries: int = 100 ) -> TensorType: """ Sample feasible points from the space using a Halton sequence. The resulting samples are guaranteed to be diverse and are reproducible by using the same choice of ``seed``. :param num_samples: The number of feasible points to sample from this search space. :param seed: Random seed for the halton sequence :param max_tries: Maximum attempts to sample the requested number of points. :return: ``num_samples`` of points, using halton sequence with shape '[num_samples, D]' , where D is the search space dimension. :raise SampleTimeoutError: If ``max_tries`` are exhausted before ``num_samples`` are sampled. """ tf.debugging.assert_non_negative(num_samples) # Without constraints or zero-num-samples use the normal sample method directly. if not self.has_constraints or num_samples == 0: return self.sample_halton(num_samples, seed) start = 0 def _sampler() -> TensorType: nonlocal start # Global seed is set on every call in _sample_halton() so that we always sample from # the same (randomised) sequence, and skip the relevant number of beginning samples. samples = self._sample_halton(start, num_samples, seed) start += num_samples return samples return self._sample_feasible_loop(num_samples, _sampler, max_tries)
[docs] def sample_sobol_feasible( self, num_samples: int, skip: Optional[int] = None, max_tries: int = 100 ) -> TensorType: """ Sample a diverse set of feasible points from the space using a Sobol sequence. If ``skip`` is specified, then the resulting samples are reproducible. :param num_samples: The number of feasible points to sample from this search space. :param skip: The number of initial points of the Sobol sequence to skip :param max_tries: Maximum attempts to sample the requested number of points. :return: ``num_samples`` of points, using sobol sequence with shape '[num_samples, D]' , where D is the search space dimension. :raise SampleTimeoutError: If ``max_tries`` are exhausted before ``num_samples`` are sampled. """ tf.debugging.assert_non_negative(num_samples) # Without constraints or zero-num-samples use the normal sample method directly. if not self.has_constraints or num_samples == 0: return self.sample_sobol(num_samples, skip) if skip is None: # generate random skip skip = tf.random.uniform([1], maxval=2**16, dtype=tf.int32)[0] _skip: TensorType = skip # To keep mypy happy. def _sampler() -> TensorType: nonlocal _skip samples = self.sample_sobol(num_samples, skip=_skip) # Skip the relevant number of beginning samples from previous iterations. _skip += num_samples return samples return self._sample_feasible_loop(num_samples, _sampler, max_tries)
[docs] def product(self, other: Box) -> Box: r""" Return the Cartesian product of the two :class:`Box`\ es (concatenating their respective lower and upper bounds). For example: >>> unit_interval = Box([0.0], [1.0]) >>> square_at_origin = Box([-2.0, -2.0], [2.0, 2.0]) >>> new_box = unit_interval * square_at_origin >>> new_box.lower.numpy() array([ 0., -2., -2.]) >>> new_box.upper.numpy() array([1., 2., 2.]) :param other: A :class:`Box` with bounds of the same type as this :class:`Box`. :return: The Cartesian product of the two :class:`Box`\ es. :raise TypeError: If the bounds of one :class:`Box` have different dtypes to those of the other :class:`Box`. """ if self.lower.dtype is not other.lower.dtype: return NotImplemented product_lower_bound = tf.concat([self._lower, other.lower], axis=-1) product_upper_bound = tf.concat([self._upper, other.upper], axis=-1) return Box(product_lower_bound, product_upper_bound)
[docs] def __eq__(self, other: object) -> bool: """ :param other: A search space. :return: Whether the search space is identical to this one. """ if not isinstance(other, Box): return NotImplemented return bool( tf.reduce_all(self.lower == other.lower) and tf.reduce_all(self.upper == other.upper) # Constraints match only if they are exactly the same (in the same order). and self._constraints == other._constraints )
[docs] def constraints_residuals(self, points: TensorType) -> TensorType: """ Return residuals for all the constraints in this :class:`SearchSpace`. :param points: The points to get the residuals for, with shape [..., D]. :return: A tensor of all the residuals with shape [..., C], where C is the total number of constraints. """ residuals = [constraint.residual(points) for constraint in self._constraints] residuals = tf.concat(residuals, axis=-1) return residuals
[docs] def is_feasible(self, points: TensorType) -> TensorType: """ Checks if points satisfy the explicit constraints of this :class:`SearchSpace`. Note membership of the search space is not checked. :param points: The points to check constraints feasibility for, with shape [..., D]. :return: A tensor of booleans. Returns `True` for each point if it is feasible in this search space, else `False`. """ return tf.math.reduce_all(self.constraints_residuals(points) >= -self._ctol, axis=-1)
@property
[docs] def has_constraints(self) -> bool: """Returns `True` if this search space has any explicit constraints specified.""" return len(self._constraints) > 0
class CollectionSearchSpace(SearchSpace):
    r"""
    Abstract :class:`SearchSpace` that groups several :class:`SearchSpace` objects, each
    identified by a unique tag, and gives access to them individually. The subspaces are kept
    side by side; this class does not combine them in any way.
    """

    def __init__(self, spaces: Sequence[SearchSpace], tags: Optional[Sequence[str]] = None):
        r"""
        Build a :class:`CollectionSearchSpace` from a list ``spaces`` of other spaces. When
        ``tags`` is given it supplies the subspace identifiers; otherwise the subspaces are
        labelled ``"0"``, ``"1"``, ... in order.

        :param spaces: A sequence of :class:`SearchSpace` objects representing the space's
            subspaces.
        :param tags: An optional list of tags giving the unique identifiers of
            the space's subspaces.
        :raise ValueError (or tf.errors.InvalidArgumentError): If ``spaces`` has a different
            length to ``tags`` when ``tags`` is provided or if ``tags`` contains duplicates.
        """
        number_of_subspaces = len(spaces)

        if tags is not None:
            number_of_tags = len(tags)
            tf.debugging.assert_equal(
                number_of_tags,
                number_of_subspaces,
                message=f"""
                    Number of tags must match number of subspaces but
                    received {number_of_tags} tags and {number_of_subspaces} subspaces.
                """,
            )
            tf.debugging.assert_equal(
                number_of_tags,
                len(set(tags)),
                message=f"Subspace names must be unique but received {tags}.",
            )
        else:
            # No tags supplied: fall back to the positional index as the identifier.
            tags = [str(position) for position in range(number_of_subspaces)]

        self._spaces = dict(zip(tags, spaces))
        self._tags = tuple(tags)  # avoid accidental modification by users

    def __repr__(self) -> str:
        return f"""{self.__class__.__name__}(spaces =
                {[self.get_subspace(tag) for tag in self.subspace_tags]},
                tags = {self.subspace_tags})
                """

    @property
    def has_bounds(self) -> bool:
        """Whether the search space has meaningful numerical bounds, i.e. whether every
        subspace does."""
        for name in self.subspace_tags:
            if not self.get_subspace(name).has_bounds:
                return False
        return True

    @property
    def subspace_lower(self) -> Sequence[TensorType]:
        """The lower bounds of each subspace, listed in the order the subspaces were given at
        initialization."""
        return [self.get_subspace(name).lower for name in self.subspace_tags]

    @property
    def subspace_upper(self) -> Sequence[TensorType]:
        """The upper bounds of each subspace, listed in the order the subspaces were given at
        initialization."""
        return [self.get_subspace(name).upper for name in self.subspace_tags]

    @property
    def subspace_tags(self) -> tuple[str, ...]:
        """Return the names of the subspaces contained in this space."""
        return self._tags

    @property
    def subspace_dimension(self) -> Sequence[TensorType]:
        """The number of inputs of each subspace, listed in the order the subspaces were given
        at initialization."""
        return [self.get_subspace(name).dimension for name in self.subspace_tags]

    def get_subspace(self, tag: str) -> SearchSpace:
        """
        Look up a single subspace by its tag.

        :param tag: The tag specifying the target subspace.
        :return: Target subspace.
        :raise ValueError (or tf.errors.InvalidArgumentError): If ``tag`` is not one of this
            space's subspace tags.
        """
        is_known_tag = tag in self.subspace_tags
        tf.debugging.assert_equal(
            is_known_tag,
            True,
            message=f"""
                Attempted to access a subspace that does not exist. This space only contains
                subspaces with the tags {self.subspace_tags} but received {tag}.
            """,
        )
        return self._spaces[tag]

    def subspace_sample(self, num_samples: int, seed: Optional[int] = None) -> Sequence[TensorType]:
        """
        Draw random samples from every subspace independently, returning one batch per subspace
        in the order the subspaces were given at initialization.

        :param num_samples: The number of points to sample from each subspace.
        :param seed: Optional tf.random seed.
        :return: ``num_samples`` i.i.d. random points, sampled uniformly,
            from each search subspace with shape '[num_samples, D]' , where D is the search space
            dimension.
        """
        tf.debugging.assert_non_negative(num_samples)
        if seed is not None:  # ensure reproducibility
            tf.random.set_seed(seed)

        batches = []
        for offset, name in enumerate(self._tags):
            # ensure subspaces (which may be identical) don't all use the same seed
            subspace_seed = None if seed is None else seed + offset
            batches.append(self._spaces[name].sample(num_samples, seed=subspace_seed))
        return batches

    def __eq__(self, other: object) -> bool:
        """
        :param other: A search space.
        :return: Whether the search space is identical to this one, i.e. it has the same tags
            (in the same order) mapped to equal subspaces.
        """
        if not isinstance(other, type(self)):
            return NotImplemented
        if self._tags != other._tags:
            return False
        return self._spaces == other._spaces
class TaggedProductSearchSpace(CollectionSearchSpace, HasOneHotEncoder):
    r"""
    Product :class:`SearchSpace` consisting of a product of
    multiple :class:`SearchSpace`. This class provides functionality for
    accessing either the resulting combined search space or each individual space.

    This class is useful for defining mixed search spaces, for example:

        context_space = DiscreteSearchSpace(tf.constant([[-0.5, 0.5]]))
        decision_space = Box([-1, -2], [2, 3])
        mixed_space = TaggedProductSearchSpace(spaces=[context_space, decision_space])

    Note: the dtype of all the component search spaces must be the same.

    Points of the product space are flat vectors whose components appear in the same order as
    the subspaces were given at initialization.
    """

    def __init__(self, spaces: Sequence[SearchSpace], tags: Optional[Sequence[str]] = None):
        r"""
        Build a :class:`TaggedProductSearchSpace` from a list ``spaces`` of other spaces. When
        ``tags`` is given it supplies the subspace identifiers; otherwise the subspaces are
        labelled numerically.

        :param spaces: A sequence of :class:`SearchSpace` objects representing the space's
            subspaces.
        :param tags: An optional list of tags giving the unique identifiers of
            the space's subspaces.
        :raise ValueError (or tf.errors.InvalidArgumentError): If ``spaces`` has a different
            length to ``tags`` when ``tags`` is provided or if ``tags`` contains duplicates.
        """
        super().__init__(spaces, tags)

        sizes = self.subspace_dimension
        # An exclusive cumulative sum gives the first flat-vector column of each subspace.
        offsets = tf.cumsum(sizes, exclusive=True)

        self._subspace_sizes_by_tag = dict(zip(self._tags, sizes))
        self._subspace_starting_indices = dict(zip(self._tags, offsets))
        self._dimension = tf.cast(tf.reduce_sum(sizes), dtype=tf.int32)

    @property
    @check_shapes("return: [D]")
    def lower(self) -> TensorType:
        """The lowest values taken by each space dimension, concatenated across subspaces."""
        per_subspace = self.subspace_lower
        if not per_subspace:
            # No subspaces: there is nothing to concatenate, so return an empty bound vector.
            return tf.constant([], dtype=DEFAULT_DTYPE)
        return tf.concat(per_subspace, axis=-1)

    @property
    @check_shapes("return: [D]")
    def upper(self) -> TensorType:
        """The highest values taken by each space dimension, concatenated across subspaces."""
        per_subspace = self.subspace_upper
        if not per_subspace:
            # No subspaces: there is nothing to concatenate, so return an empty bound vector.
            return tf.constant([], dtype=DEFAULT_DTYPE)
        return tf.concat(per_subspace, axis=-1)

    @property
    @check_shapes("return: []")
    def dimension(self) -> TensorType:
        """The number of inputs in this product search space."""
        return self._dimension

    def fix_subspace(self, tag: str, values: TensorType) -> TaggedProductSearchSpace:
        """
        Return a new :class:`TaggedProductSearchSpace` with the specified subspace replaced with
        a :class:`DiscreteSearchSpace` containing ``values`` as its points. This is useful if you
        wish to restrict subspaces to sets of representative points.

        :param tag: The tag specifying the target subspace.
        :param values: The values used to populate the new discrete subspace.
        :return: New :class:`TaggedProductSearchSpace` with the specified subspace replaced with
            a :class:`DiscreteSearchSpace` containing ``values`` as its points.
        """
        replaced: list[SearchSpace] = []
        for name in self.subspace_tags:
            if name == tag:
                replaced.append(DiscreteSearchSpace(points=values))
            else:
                replaced.append(self.get_subspace(name))
        return TaggedProductSearchSpace(spaces=replaced, tags=self.subspace_tags)

    def get_subspace_component(self, tag: str, values: TensorType) -> TensorType:
        """
        Returns the components of ``values`` lying in a particular subspace.

        :param tag: Subspace tag.
        :param values: Points from the :class:`TaggedProductSearchSpace` of shape [N,Dprod].
        :return: The sub-components of ``values`` lying in the specified subspace, of shape
            [N, Dsub], where Dsub is the dimensionality of the specified subspace.
        """
        first_column = self._subspace_starting_indices[tag]
        end_column = first_column + self._subspace_sizes_by_tag[tag]
        return values[..., first_column:end_column]

    def _contains(self, value: TensorType) -> TensorType:
        """
        Return `True` if ``value`` is a member of this search space, else `False`. A point is a
        member if each of its subspace components lie in each subspace.

        Recall that individual points in product spaces are represented with their inputs in the
        same order as specified when initializing the space.

        :param value: A point to check for membership of this :class:`SearchSpace`.
        :return: `True` if ``value`` is a member of this search space, else `False`. May return a
            scalar boolean `TensorType` instead of the `bool` itself.
        :raise ValueError (or tf.errors.InvalidArgumentError): If ``value`` has a different
            dimensionality from the search space.
        """
        memberships = []
        for name in self._tags:
            component = self.get_subspace_component(name, value)
            memberships.append(self._spaces[name].contains(component))
        # Stack over subspaces and require membership in every one of them.
        return tf.reduce_all(memberships, axis=0)

    @check_shapes("return: [num_samples, D]")
    def sample(self, num_samples: int, seed: Optional[int] = None) -> TensorType:
        """
        Sample randomly from the space by sampling from each subspace
        and concatenating the resulting samples.

        :param num_samples: The number of points to sample from this search space.
        :param seed: Optional tf.random seed.
        :return: ``num_samples`` i.i.d. random points, sampled uniformly,
            from this search space with shape '[num_samples, D]' , where D is the search space
            dimension.
        """
        return tf.concat(self.subspace_sample(num_samples, seed), -1)

    def product(self, other: TaggedProductSearchSpace) -> TaggedProductSearchSpace:
        r"""
        Return the Cartesian product of the two :class:`TaggedProductSearchSpace`\ s,
        building a tree of :class:`TaggedProductSearchSpace`\ s.

        :param other: A search space of the same type as this search space.
        :return: The Cartesian product of this search space with the ``other``.
        """
        return TaggedProductSearchSpace(spaces=[self, other])

    @property
    def one_hot_encoder(self) -> EncoderFunction:
        """An encoder that one-hot-encodes all subspaces that support it (and leaves
        the other subspaces unchanged)."""

        def encode_subspace(name: str, points: TensorType) -> TensorType:
            # Encode one subspace's columns when that subspace provides an encoder.
            part = self.get_subspace_component(name, points)
            subspace = self.get_subspace(name)
            if isinstance(subspace, HasOneHotEncoder):
                return subspace.one_hot_encoder(part)
            return part

        def encoder(x: TensorType) -> TensorType:
            return tf.concat([encode_subspace(name, x) for name in self.subspace_tags], axis=-1)

        return encoder
[docs] def _reject_duplicate_tags(seq_name: str, tags: Sequence[str]) -> None: """Raise ``ValueError`` if ``tags`` contains duplicates (shared by the hierarchy helper and :class:`HierarchicalSearchSpace`).""" duplicates = sorted(t for t, count in Counter(tags).items() if count > 1) if duplicates: raise ValueError(f"`{seq_name}` contains duplicate tags: {duplicates}.")
[docs] def _build_tag_to_columns_map(tag_sizes: Sequence[tuple[str, int]]) -> dict[str, list[int]]: """Lay tags out in order along the flat vector, each occupying ``size`` consecutive columns. :param tag_sizes: ``(tag, dimension)`` pairs in flat-vector order. :return: a mapping from tag to its flat-vector column indices. """ tag_to_columns: dict[str, list[int]] = {} cursor = 0 for tag, size in tag_sizes: tag_to_columns[tag] = list(range(cursor, cursor + size)) cursor += size return tag_to_columns
@dataclass(frozen=True)
class HierarchyNode:
    """One node of a :class:`HierarchicalSearchSpace` hierarchy, described purely by tags.

    A node lists the non-indicator (feature) subspaces it owns and the indicator values under
    which it is active. Everything is referenced by tag: the owning
    :class:`HierarchicalSearchSpace` translates the tags into the flat-vector column layout (and
    into a :class:`gpflow.kernels.HierarchyNode`) internally, which is why ``subspaces`` is given
    once to the space instead of to each node.

    :param name: Human-readable label for the node (also the kernel-association identifier).
    :param subspace_tags: Tags of the non-indicator subspaces owned by this node. Each must be a
        key of the space's ``subspaces`` and define numerical bounds (e.g. a :class:`Box`).
        Must be non-empty and contain no duplicates.
    :param activity_condition_tags: ``{indicator_tag: required_value}`` gating this node; empty
        (the default) means the node is unconditionally active.
    """

    name: str
    subspace_tags: Sequence[str]
    activity_condition_tags: Mapping[str, int] = field(default_factory=dict)
def _resolve_node(
    node: HierarchyNode,
    subspaces: Mapping[str, SearchSpace],
    tag_to_columns: Mapping[str, Sequence[int]],
) -> gpflow.kernels.HierarchyNode:
    """Translate a tag-based :class:`HierarchyNode` into gpflow's column-based node.

    gpflow identifies features by integer flat-vector column (``feature_dims``) and gates nodes
    with an :class:`ActivityCondition` keyed on columns too. Both are looked up in
    ``tag_to_columns``. Required indicator values are converted with ``int`` so that K-ary
    categorical category indices are kept as integers. Node-local validation beyond the checks
    below is left to gpflow.

    :param node: The tag-based node to resolve.
    :param subspaces: Mapping from tag to subspace, used to read each owned subspace's bounds.
    :param tag_to_columns: Mapping from tag to the flat-vector columns that tag occupies.
    :return: The equivalent :class:`gpflow.kernels.HierarchyNode`.
    :raise ValueError: If the node owns no subspaces or lists one twice, if a referenced tag is
        unknown, if an owned subspace has no numerical bounds, or if an indicator does not
        occupy exactly one column.
    """
    owned_tags = node.subspace_tags
    if not owned_tags:
        raise ValueError(
            f"node '{node.name}': `subspace_tags` must be non-empty; every node must own at "
            "least one (feature) subspace."
        )
    _reject_duplicate_tags(f"node '{node.name}' subspace_tags", owned_tags)

    columns_out: list[int] = []
    bounds_out: list[tuple[float, float]] = []
    for owned in owned_tags:
        if owned not in tag_to_columns:
            raise ValueError(
                f"subspace_tag '{owned}' is not a key of `subspaces` {list(tag_to_columns)}."
            )
        subspace = subspaces[owned]
        # Owned subspaces are passed on as (lower, upper) feature bounds, so a subspace that
        # reports no numerical bounds cannot be represented and is rejected.
        if not subspace.has_bounds:
            raise ValueError(
                f"subspace_tag '{owned}' refers to a {type(subspace).__name__} without numerical "
                "bounds; non-indicator subspaces must define bounds (e.g. a Box or a discrete "
                "space)."
            )
        lows = tf.cast(subspace.lower, dtype=tf.float64).numpy().tolist()
        highs = tf.cast(subspace.upper, dtype=tf.float64).numpy().tolist()
        for column, low, high in zip(tag_to_columns[owned], lows, highs):
            columns_out.append(column)
            bounds_out.append((float(low), float(high)))

    column_requirements: dict[int, int] = {}
    for indicator, wanted in node.activity_condition_tags.items():
        if indicator not in tag_to_columns:
            raise ValueError(
                f"activity_condition_tags key '{indicator}' is not a key of `subspaces` "
                f"{list(tag_to_columns)}."
            )
        indicator_columns = tag_to_columns[indicator]
        if len(indicator_columns) != 1:
            raise ValueError(
                f"Indicator '{indicator}' must be 1-dimensional, got "
                f"{len(indicator_columns)} columns."
            )
        column_requirements[indicator_columns[0]] = int(wanted)

    condition = ActivityCondition(requirements=column_requirements)
    return gpflow.kernels.HierarchyNode(
        name=node.name,
        feature_dims=columns_out,
        feature_bounds=tf.constant(bounds_out, dtype=tf.float64),
        activity_condition=condition,
    )
INACTIVE_CONSTRAINT_RESIDUAL: float = 1e10
"""Large positive residual returned for an inactive :class:`ConditionalConstraint`, so the
point is trivially feasible with respect to that constraint. This is inspired by the "big-M"
MI(N)LP reformulation of a generalized disjunctive program: allocating feasible-with-huge-
margin on the inactive branch."""


@dataclass(frozen=True)
class ConditionalConstraint:
    r"""A disjunctive constraint :math:`h(x) \leq 0` that only applies when a set of indicator
    variables take specified values.

    For a point where every entry of ``indicator_conditions`` holds, the wrapped ``constraint``
    is evaluated on the columns of that point selected by ``active_subspace_tags`` (concatenated
    in the given order). For any other point the residual is
    :data:`INACTIVE_CONSTRAINT_RESIDUAL`, so the point counts as feasible for this constraint.
    Because everything is referenced by tag, a conditional constraint survives
    :meth:`HierarchicalSearchSpace.product` unchanged.

    :param constraint: The underlying :class:`LinearConstraint` / :class:`NonlinearConstraint`,
        defined on the concatenated ``active_subspace_tags`` slice.
    :param indicator_conditions: ``{indicator_tag: required_value}`` gating the constraint.
    :param active_subspace_tags: Non-indicator subspace tags whose columns the constraint reads.
    """

    constraint: Constraint
    indicator_conditions: Mapping[str, int]
    active_subspace_tags: Sequence[str]

    def residual(self, points: TensorType, space: HierarchicalSearchSpace) -> TensorType:
        """Residuals of the wrapped constraint, replaced by the big-M value wherever the
        indicator conditions are not met.

        :param points: Points in the flat-vector representation, shape ``[..., D]``.
        :param space: The owning :class:`HierarchicalSearchSpace` (provides tag slicing).
        :return: Residuals with shape ``[..., C]`` matching the wrapped constraint.
        """
        # Start with every point active and switch points off one indicator at a time.
        is_active = tf.ones(tf.shape(points)[:-1], dtype=tf.bool)
        for indicator, wanted in self.indicator_conditions.items():
            column = space.get_subspace_component(indicator, points)  # [..., 1]
            wanted_value = tf.cast(int(wanted), column.dtype)
            # A half-unit tolerance matches the indicator value to the nearest integer.
            matches = tf.abs(column[..., 0] - wanted_value) < 0.5
            is_active = is_active & matches

        selected = [space.get_subspace_component(tag, points) for tag in self.active_subspace_tags]
        constraint_residual = self.constraint.residual(tf.concat(selected, axis=-1))  # [..., C]

        residual_shape = tf.shape(constraint_residual)
        big_m = tf.constant(INACTIVE_CONSTRAINT_RESIDUAL, dtype=constraint_residual.dtype)
        relaxed_residual = tf.fill(residual_shape, big_m)
        # Repeat the per-point activity flag across the C residual columns.
        active_mask = tf.broadcast_to(is_active[..., None], residual_shape)
        return tf.where(active_mask, constraint_residual, relaxed_residual)
@dataclass(frozen=True)
class LogicalProposition:
    r"""
    A feasibility rule that depends on the indicator variables only, :math:`\Omega(Y)`.

    ``fun`` is called with ``{indicator_tag: values}``, where each value has shape ``[..., 1]``
    (values in the indicator's permitted set), and must return a boolean tensor of shape
    ``[...]``.

    Logical propositions are checked by :meth:`HierarchicalSearchSpace.is_feasible` but are
    **not** included in :meth:`HierarchicalSearchSpace.constraints_residuals`, as they have no
    useful gradient for continuous optimizers. They reference indicators by tag, so they
    survive :meth:`HierarchicalSearchSpace.product` unchanged.

    :param fun: Maps ``{indicator_tag: [..., 1]}`` to a boolean feasibility tensor ``[...]``.
    :param name: Optional human-readable label.
    """

    fun: Callable[[Mapping[str, TensorType]], TensorType]
    name: str = ""

    def __repr__(self) -> str:
        # Only the label is shown; the callable has no useful textual form.
        class_name = type(self).__name__
        return f"{class_name}(name={self.name!r})"
class HierarchicalSearchSpace(CollectionSearchSpace):
    r"""
    A :class:`SearchSpace` that extends :class:`CollectionSearchSpace` with a hierarchy
    specification describing conditional activation of subspaces.

    The hierarchy is described by a sequence of tag-based :class:`HierarchyNode` objects (each
    owning ``subspace_tags`` and gated by ``activity_condition_tags``); the space resolves them
    to gpflow's column-based representation internally (see :meth:`to_gpflow_hierarchy`).

    Variables fall into three roles:

    - **Indicators** (:class:`BooleanSearchSpace` or dimension-1
      :class:`CategoricalSearchSpace`), inferred by role as the subspaces referenced as
      ``activity_condition_tags`` keys in the hierarchy. Boolean indicators take values in
      :math:`\{0, 1\}`, ``K``-ary categorical indicators in :math:`\{0, \ldots, K-1\}`.
      Indicators are always unconditional; they must not appear in any node's
      ``subspace_tags``.
    - **Unconditional variables** owned by a node with empty ``activity_condition_tags``.
      Always active.
    - **Conditional variables** owned by a node with non-empty ``activity_condition_tags``.
      Active only when the referenced indicators take the required values.

    Points are represented as flat vectors concatenated in the iteration order of
    ``subspaces`` (a ``dict`` preserves insertion order in Python 3.7+), the same convention as
    :class:`TaggedProductSearchSpace`.

    .. note::
        ``sample`` and ``contains`` operate on the full flat vector and do **not** enforce
        "active-branch" semantics: every subspace always contributes its columns regardless of
        the indicator values, and inactive-branch columns are neither masked nor ignored. This
        matches the flat-vector convention expected by the hierarchical GP kernel, which reads
        the indicator columns itself to decide which features are active.

    .. note::
        Non-indicator subspaces must define numerical bounds (e.g. :class:`Box`).
        Categorical (non-indicator) subspaces are not supported, since the hierarchy encodes
        each non-indicator dimension as a ``(lower, upper)`` bound and there is no one-hot
        handling here; constructing such a space raises ``ValueError``.

    Example::

        subspaces = {
            "x1": Box([0.0], [1.0]),
            "y1": BooleanSearchSpace(),
            "x2": Box([0.0], [5.0]),
            "x4": Box([-2.0], [2.0]),
            "x3": Box([-1.0], [1.0]),
        }
        hierarchy = [
            HierarchyNode("shared", subspace_tags=["x1"]),
            HierarchyNode("branch_A", subspace_tags=["x2", "x4"],
                          activity_condition_tags={"y1": 1}),
            HierarchyNode("branch_B", subspace_tags=["x3"],
                          activity_condition_tags={"y1": 0}),
        ]
        space = HierarchicalSearchSpace(subspaces, hierarchy)
    """

    def __init__(
        self,
        subspaces: Mapping[str, SearchSpace],
        hierarchy: Sequence[HierarchyNode],
        constraints: Optional[Sequence[Constraint]] = None,
        conditional_constraints: Sequence[ConditionalConstraint] = (),
        logical_propositions: Sequence[LogicalProposition] = (),
        ctol: float | TensorType = 1e-7,
    ) -> None:
        """
        Build the space, resolve the tag-based hierarchy to columns, and validate it.

        :param subspaces: Tag-keyed mapping of subspaces; iteration order defines the
            flat-vector layout.
        :param hierarchy: A sequence of tag-based :class:`HierarchyNode` objects defining the
            conditional structure. Every non-indicator subspace tag must appear in at least one
            node's ``subspace_tags``. Indicators are inferred by role: a subspace referenced as
            an ``activity_condition_tags`` key is an indicator and must be a
            :class:`BooleanSearchSpace` or a dimension-1 :class:`CategoricalSearchSpace`.
        :param constraints: Optional explicit (global) constraints enforced on the full
            flat-vector representation, following the same ``constraints`` contract as
            :class:`Box`. Consumed by :meth:`constraints_residuals` and :meth:`is_feasible`, so
            a constrained space plugs into the usual Bayesian optimization machinery unchanged.
        :param conditional_constraints: Disjunctive constraints, each enforced only when its
            indicator conditions hold (see :class:`ConditionalConstraint`); inactive rows
            contribute :data:`INACTIVE_CONSTRAINT_RESIDUAL`.
        :param logical_propositions: Indicator-only feasibility constraints (see
            :class:`LogicalProposition`); applied in :meth:`is_feasible` but excluded from
            :meth:`constraints_residuals` (no continuous gradient).
        :param ctol: Tolerance used by :meth:`is_feasible` when checking that residuals are
            non-negative.
        :raises ValueError: If any validation rule is violated.
        """
        subspaces = dict(subspaces)  # decouple from caller, preserve order
        super().__init__(list(subspaces.values()), list(subspaces.keys()))
        self._hierarchy = tuple(hierarchy)
        # Filled in by ``_validate`` once the indicator tags are known.
        self._indicator_value_sets: dict[str, tuple[int, ...]] = {}
        self._constraints: Sequence[Constraint] = [] if constraints is None else list(constraints)
        self._conditional_constraints: tuple[ConditionalConstraint, ...] = tuple(
            conditional_constraints
        )
        self._logical_propositions: tuple[LogicalProposition, ...] = tuple(logical_propositions)
        self._ctol = ctol

        # Per-tag column counts and starting offsets into the flat vector; the exclusive
        # cumulative sum yields the first column of each subspace.
        subspace_sizes = self.subspace_dimension
        self._subspace_sizes_by_tag: dict[str, TensorType] = dict(zip(self._tags, subspace_sizes))
        self._subspace_starting_indices: dict[str, TensorType] = dict(
            zip(self._tags, tf.cumsum(subspace_sizes, exclusive=True))
        )
        self._dimension = tf.cast(tf.reduce_sum(subspace_sizes), dtype=tf.int32)

        # Pre-compute a flat tag -> [column indices] map and its inverse
        # (column index -> owning tag) for resolving tag-based nodes and for the
        # activity/query methods.
        self._tag_to_columns = _build_tag_to_columns_map(
            [(tag, int(self._subspace_sizes_by_tag[tag])) for tag in self._tags]
        )
        self._column_to_tag = {
            col: tag for tag, cols in self._tag_to_columns.items() for col in cols
        }
        self._total_columns = sum(len(cols) for cols in self._tag_to_columns.values())

        # Resolve the tag-based nodes to gpflow's column-based representation, used by the
        # kernel (:meth:`to_gpflow_hierarchy`) and by column-level validation. ``subspaces``
        # is needed only here.
        self._gpflow_hierarchy: tuple[gpflow.kernels.HierarchyNode, ...] = tuple(
            _resolve_node(node, subspaces, self._tag_to_columns) for node in self._hierarchy
        )

        # Infer the indicator tags by role: an indicator is a subspace referenced as an
        # ``activity_condition`` key somewhere in the hierarchy (kept in subspace order).
        referenced_tags = {tag for node in self._hierarchy for tag in node.activity_condition_tags}
        self._indicator_tags = tuple(tag for tag in self._tags if tag in referenced_tags)

        # Must run last: it reads the attributes set above and populates
        # ``_indicator_value_sets``.
        self._validate()

    def _validate(self) -> None:
        """
        Check the space-level hierarchy rules and record each indicator's permitted values.

        Populates ``self._indicator_value_sets`` as a side effect.

        :raises ValueError: If an indicator is not a :class:`BooleanSearchSpace` or a
            dimension-1 :class:`CategoricalSearchSpace`, a non-indicator subspace has no
            numerical bounds, a node owns an indicator or requires a value outside the
            indicator's permitted set, a non-indicator subspace is owned by no node, or a
            conditional constraint references an unknown indicator, a non-permitted value, an
            unknown subspace tag, or an indicator as an active subspace.
        """
        # Node-shape and tag-existence checks already ran during resolution to the gpflow
        # representation (see ``_resolve_node``): every ``subspace_tags`` / ``activity_condition``
        # key exists, is bounded where required, and indicators are single-column. This method
        # adds the space-level semantics. Indicators are inferred (by role) from the unique
        # subspace tags, so they cannot be duplicated or unknown.

        # Indicators must reference a BooleanSearchSpace or a 1-D CategoricalSearchSpace;
        # record each indicator's permitted value set.
        for itag in self._indicator_tags:
            sub = self.get_subspace(itag)
            if isinstance(sub, BooleanSearchSpace):
                self._indicator_value_sets[itag] = (0, 1)
            elif isinstance(sub, CategoricalSearchSpace):
                if len(sub.tags) != 1:
                    raise ValueError(
                        f"Indicator tag '{itag}' must reference a dimension-1 "
                        f"CategoricalSearchSpace, got a categorical of dimension "
                        f"{len(sub.tags)}."
                    )
                # One integer code per category of the single categorical dimension.
                self._indicator_value_sets[itag] = tuple(range(len(sub.tags[0])))
            else:
                raise ValueError(
                    f"Indicator tag '{itag}' must reference a BooleanSearchSpace or a "
                    f"dimension-1 CategoricalSearchSpace, got {type(sub).__name__}."
                )

        # Non-indicator subspaces must expose numerical bounds: the hierarchy
        # encodes each as a (lower, upper) ``feature_bounds`` row, which requires
        # ``lower``/``upper`` to be defined. Categorical (non-indicator) subspaces
        # have no numerical bounds (``has_bounds`` is False) and no one-hot
        # handling here, so reject them with a clear error rather than letting
        # ``sub.lower``/``sub.upper`` raise an opaque ``AttributeError`` later.
        for ntag in self.non_indicator_tags:
            if not self.get_subspace(ntag).has_bounds:
                raise ValueError(
                    f"Non-indicator subspace '{ntag}' "
                    f"({type(self.get_subspace(ntag)).__name__}) has no numerical "
                    f"bounds and is not supported in a HierarchicalSearchSpace. "
                    f"Non-indicator subspaces must define numerical bounds (e.g. Box)."
                )

        all_tags = set(self.subspace_tags)
        indicator_set = set(self._indicator_tags)

        for node in self._hierarchy:
            # A tag cannot be both an owned feature and an indicator.
            owned_indicators = [t for t in node.subspace_tags if t in indicator_set]
            if owned_indicators:
                raise ValueError(
                    f"HierarchyNode '{node.name}' lists indicator tag(s) {owned_indicators} in "
                    f"`subspace_tags`; an indicator cannot be owned as a feature."
                )
            # Each required value must be in the gating indicator's permitted set.
            for ind_tag, required in node.activity_condition_tags.items():
                permitted = self._indicator_value_sets[ind_tag]
                if int(required) not in permitted:
                    raise ValueError(
                        f"HierarchyNode '{node.name}' requires value {required!r} for indicator "
                        f"'{ind_tag}', which is not in its permitted set {list(permitted)}."
                    )

        # Every non-indicator subspace must be owned by some node.
        covered = {tag for node in self._hierarchy for tag in node.subspace_tags}
        orphans = sorted(set(self.non_indicator_tags) - covered)
        if orphans:
            raise ValueError(
                f"Non-indicator subspace tags {orphans} are orphaned: they do not appear in "
                f"any HierarchyNode.subspace_tags."
            )

        # Conditional constraints: indicator conditions must reference declared indicators
        # with permitted values, and active_subspace_tags must be existing non-indicators.
        for i, cc in enumerate(self._conditional_constraints):
            for ind_tag, required in cc.indicator_conditions.items():
                # Membership is checked first so the lookup below cannot raise ``KeyError``.
                if ind_tag not in indicator_set:
                    raise ValueError(
                        f"conditional_constraints[{i}] references indicator '{ind_tag}', "
                        f"which is not a declared indicator tag {sorted(indicator_set)}."
                    )
                permitted = self._indicator_value_sets[ind_tag]
                if int(required) not in permitted:
                    raise ValueError(
                        f"conditional_constraints[{i}] requires indicator '{ind_tag}' = "
                        f"{required!r}, which is not in its permitted set {list(permitted)}."
                    )
            for tag in cc.active_subspace_tags:
                if tag not in all_tags:
                    raise ValueError(
                        f"conditional_constraints[{i}] active_subspace_tag '{tag}' is not a "
                        f"key of `subspaces` {sorted(all_tags)}."
                    )
                if tag in indicator_set:
                    raise ValueError(
                        f"conditional_constraints[{i}] active_subspace_tag '{tag}' is an "
                        f"indicator; constraints operate on the non-indicator slice only."
                    )
def __eq__(self, other: object) -> bool:
    """
    Compare two hierarchical search spaces.

    :param other: A search space.
    :return: Whether the search space is identical to this one, including its hierarchy,
        indicator tags and constraints (the base-class comparison only covers the subspaces
        and their tags). ``NotImplemented`` if ``other`` is not a
        :class:`HierarchicalSearchSpace`.
    """
    if not isinstance(other, HierarchicalSearchSpace):
        return NotImplemented
    if not super().__eq__(other):
        return False

    # Compared in this order, stopping at the first mismatch.
    compared_attributes = (
        "_indicator_tags",
        "_constraints",
        "_conditional_constraints",
        "_logical_propositions",
        "_hierarchy",
    )
    return all(getattr(self, name) == getattr(other, name) for name in compared_attributes)
def __repr__(self) -> str:
    """Return a textual form listing subspaces, tags, hierarchy, constraints and tolerance."""
    described = {
        "spaces": [self.get_subspace(tag) for tag in self.subspace_tags],
        "tags": list(self.subspace_tags),
        "hierarchy": list(self._hierarchy),
        "indicator_tags": list(self._indicator_tags),
        "constraints": self._constraints,
        "conditional_constraints": self._conditional_constraints,
        "logical_propositions": self._logical_propositions,
        "ctol": self._ctol,
    }
    arguments = ", ".join(f"{label}={value!r}" for label, value in described.items())
    return f"{type(self).__name__}({arguments})"

@property
def hierarchy(self) -> tuple[HierarchyNode, ...]:
    """The tag-based hierarchy specification: the :class:`HierarchyNode` objects as supplied."""
    return self._hierarchy
def to_gpflow_hierarchy(self) -> list[gpflow.kernels.HierarchyNode]:
    """
    Return the hierarchy resolved to gpflow's column-based
    :class:`gpflow.kernels.HierarchyNode` objects, ready to build a hierarchical kernel, e.g.
    ``ArcHierarchical(space.to_gpflow_hierarchy(), active_dims=...)``.

    :return: A new list on every call; the stored tuple is not exposed.
    """
    return [*self._gpflow_hierarchy]
@property
def indicator_tags(self) -> tuple[str, ...]:
    """The indicator tags of this space (Boolean or categorical subspaces), as a tuple.

    :return: The stored tuple of indicator tags.
    """
    return self._indicator_tags
@property
def indicator_dims(self) -> list[int]:
    """
    Flat-vector column indices of the indicators, in the order of ``indicator_tags``.

    These are the columns referenced by the keys of each node's
    ``activity_condition.requirements`` (gpflow's convention). Only the first column of each
    indicator subspace is reported, since an indicator has dimension 1.
    """
    columns_by_tag = self._tag_to_columns
    return [columns_by_tag[indicator_tag][0] for indicator_tag in self._indicator_tags]
@property
def indicator_value_sets(self) -> dict[str, tuple[int, ...]]:
    """
    The permitted integer values for each indicator tag, as recorded during validation.

    ``(0, 1)`` for a :class:`BooleanSearchSpace` indicator and ``(0, ..., K-1)`` for a
    ``K``-ary :class:`CategoricalSearchSpace` indicator.

    :return: A shallow copy, so callers cannot modify the stored mapping.
    """
    return {**self._indicator_value_sets}
@property
def non_indicator_tags(self) -> tuple[str, ...]:
    """All subspace tags that are not indicators, in the order of ``subspace_tags``."""
    indicators = frozenset(self._indicator_tags)

    def is_plain_subspace(tag: str) -> bool:
        return tag not in indicators

    return tuple(filter(is_plain_subspace, self.subspace_tags))
@property
def constraints(self) -> Sequence[Constraint]:
    """The explicit (global) constraints enforced on the full flat-vector representation.

    :return: The stored constraint sequence itself, not a copy.
    """
    return self._constraints
@property
def ctol(self) -> float | TensorType:
    """The tolerance applied when checking the explicit constraints.

    :return: The value passed as ``ctol`` at construction.
    """
    return self._ctol
@property
def conditional_constraints(self) -> tuple[ConditionalConstraint, ...]:
    """The disjunctive constraints gated by indicator conditions.

    :return: The stored tuple of :class:`ConditionalConstraint` objects.
    """
    return self._conditional_constraints
@property
def logical_propositions(self) -> tuple[LogicalProposition, ...]:
    """The indicator-only feasibility constraints.

    :return: The stored tuple of :class:`LogicalProposition` objects.
    """
    return self._logical_propositions
@property
def has_constraints(self) -> bool:
    """``True`` if any global, conditional, or logical constraints are present."""
    constraint_groups = (
        self._constraints,
        self._conditional_constraints,
        self._logical_propositions,
    )
    # A group counts as present when its container is non-empty.
    return any(constraint_groups)
def constraints_residuals(self, points: TensorType) -> TensorType:
    """
    Return residuals for all global and conditional constraints in this search space.

    Global constraints are evaluated on the full flat vector. Conditional constraints are
    evaluated through :meth:`ConditionalConstraint.residual`, which yields
    :data:`INACTIVE_CONSTRAINT_RESIDUAL` on rows where their indicator conditions do not
    hold. Logical propositions are excluded (no continuous gradient); use
    :meth:`is_feasible` to account for them.

    :param points: The points to get the residuals for, with shape ``[..., D]``.
    :return: A tensor of all the residuals with shape ``[..., C]``, where ``C`` is the total
        number of constraint residual columns (global constraints first, then conditional).
    :raise NotImplementedError: If this search space has no global or conditional constraints.
    """
    global_residuals = [constraint.residual(points) for constraint in self._constraints]
    conditional_residuals = [
        conditional.residual(points, self) for conditional in self._conditional_constraints
    ]
    all_residuals = global_residuals + conditional_residuals
    if not all_residuals:
        raise NotImplementedError("No constraints to compute residuals for in this search space.")
    return tf.concat(all_residuals, axis=-1)
def is_feasible(self, points: TensorType) -> TensorType:
    """
    Check whether points satisfy all constraints of this search space: global and
    conditional constraints (via their residuals) and logical propositions.

    Membership of the space (the conditional activity structure) is not checked here.

    :param points: The points to check, with shape ``[..., D]``.
    :return: A boolean tensor of shape ``[...]``, ``True`` where the point is feasible.
    """
    has_residual_constraints = bool(self._constraints or self._conditional_constraints)
    if has_residual_constraints:
        # A point passes when every residual column is at least ``-ctol``.
        residuals = self.constraints_residuals(points)
        feasible = tf.math.reduce_all(residuals >= -self._ctol, axis=-1)
    else:
        # Nothing to evaluate: start from "all feasible" over the leading dimensions.
        feasible = tf.cast(tf.ones(tf.shape(points)[:-1]), dtype=bool)

    if not self._logical_propositions:
        return feasible

    # Each proposition receives ``{indicator_tag: [..., 1]}`` sliced from the flat vector.
    indicator_values = {
        tag: self.get_subspace_component(tag, points) for tag in self._indicator_tags
    }
    for proposition in self._logical_propositions:
        feasible = tf.logical_and(feasible, proposition.fun(indicator_values))
    return feasible
@property
@check_shapes("return: []")
def dimension(self) -> TensorType:
    """The total number of dimensions across all subspaces.

    :return: The scalar stored in ``self._dimension``.
    """
    return self._dimension
@property
@check_shapes("return: [D]")
def lower(self) -> TensorType:
    """
    The lowest values taken by each dimension, concatenated across subspaces.

    :return: A tensor of shape ``[D]``; an empty :data:`DEFAULT_DTYPE` tensor when there are
        no per-subspace lower bounds to concatenate.
    """
    per_subspace_lower = self.subspace_lower
    if not per_subspace_lower:
        return tf.constant([], dtype=DEFAULT_DTYPE)
    return tf.concat(per_subspace_lower, axis=-1)
@property
@check_shapes("return: [D]")
def upper(self) -> TensorType:
    """
    The highest values taken by each dimension, concatenated across subspaces.

    :return: A tensor of shape ``[D]``; an empty :data:`DEFAULT_DTYPE` tensor when there are
        no per-subspace upper bounds to concatenate.
    """
    per_subspace_upper = self.subspace_upper
    if not per_subspace_upper:
        return tf.constant([], dtype=DEFAULT_DTYPE)
    return tf.concat(per_subspace_upper, axis=-1)
@check_shapes("return: [num_samples, D]")
def sample(self, num_samples: int, seed: Optional[int] = None) -> TensorType:
    """
    Sample from the space by sampling each subspace and concatenating the results.

    Every subspace is sampled and contributes its columns; active-branch semantics are
    **not** enforced, so inactive-branch columns are still filled.

    :param num_samples: The number of points to sample.
    :param seed: Optional seed forwarded to the per-subspace sampling.
    :return: The sampled points, with shape ``[num_samples, D]``.
    """
    return tf.concat(self.subspace_sample(num_samples, seed), axis=-1)
def _contains(self, value: TensorType) -> TensorType:
    """
    Test membership by requiring every subspace component to lie in its own subspace.

    Active-branch semantics are not enforced: inactive-branch columns are still checked
    against their subspace.

    :param value: A point or points to check.
    :return: A boolean tensor, ``True`` where all subspace components are members.
    """
    membership_per_subspace = []
    for tag in self._tags:
        component = self.get_subspace_component(tag, value)
        membership_per_subspace.append(self._spaces[tag].contains(component))
    # Reduce over the stacked per-subspace results.
    return tf.reduce_all(membership_per_subspace, axis=0)
def get_subspace_component(self, tag: str, values: TensorType) -> TensorType:
    """
    Extract the columns of ``values`` that belong to one subspace.

    :param tag: The subspace tag.
    :param values: Points from this space, shape ``[..., D]``; only the last axis is sliced.
    :return: The sub-components, shape ``[..., D_sub]``.
    """
    first_column = self._subspace_starting_indices[tag]
    stop_column = first_column + self._subspace_sizes_by_tag[tag]  # exclusive
    return values[..., first_column:stop_column]
def _node_subspace_tags(self, node: HierarchyNode) -> list[str]:
    """
    Return the non-indicator subspace tags a node owns, de-duplicated in first-seen order.

    :param node: The hierarchy node whose ``subspace_tags`` are read.
    :return: A new list of unique tags, preserving the order of ``node.subspace_tags``.
    """
    # ``dict.fromkeys`` keeps first-seen order with O(1) membership, avoiding the
    # quadratic ``tag not in list`` scan.
    return list(dict.fromkeys(node.subspace_tags))
def active_subspace_tags(self, indicator_config: Mapping[str, int]) -> list[str]:
    """
    Return the non-indicator subspace tags that are active for an indicator configuration.

    :param indicator_config: A mapping ``{indicator_tag: value}`` providing a value for every
        indicator of this space. Extra keys are ignored.
    :return: The active non-indicator subspace tags, each listed once, in the order they are
        first met while walking the hierarchy.
    :raise ValueError: If ``indicator_config`` omits any indicator.
    """
    self._require_complete_config(indicator_config)
    # Dict keys act as an insertion-ordered set: re-adding a tag owned by several active
    # nodes keeps its first position, without a linear ``not in list`` scan per tag.
    active: dict[str, None] = {}
    for node in self._hierarchy:
        if self._node_is_active(node, indicator_config):
            active.update(dict.fromkeys(self._node_subspace_tags(node)))
    return list(active)
def enumerate_tasks(self, feasible_only: bool = False) -> list[dict[str, int]]:
    r"""
    Return indicator configurations as the Cartesian product of each indicator's permitted
    value set.

    Both Boolean and ``K``-ary categorical indicators contribute the integer values
    ``[0, 1, ..., K-1]`` (with ``K = 2`` for Boolean indicators); the total number of
    configurations equals :math:`\prod_k |\mathcal{C}_k|`.

    :param feasible_only: By default the full Cartesian product is returned, ignoring any
        :class:`LogicalProposition`. When ``True``, configurations that violate any logical
        proposition are dropped (global/conditional constraints, which involve the continuous
        variables, are not considered here).
    :return: A list of ``{indicator_tag: value}`` dictionaries, one per task. A space without
        indicators yields the single empty configuration ``[{}]``.
    """
    tags = self._indicator_tags
    if not tags:
        return [{}]

    value_sets: list[Sequence[int]] = [list(self._indicator_value_sets[tag]) for tag in tags]
    tasks = [dict(zip(tags, combination)) for combination in itertools_product(*value_sets)]

    if not (feasible_only and self._logical_propositions):
        return tasks

    # Evaluate the propositions on all configurations at once, using one ``[N, 1]`` column
    # per indicator tag.
    indicator_values = {
        tag: tf.constant([[task[tag]] for task in tasks], dtype=DEFAULT_DTYPE) for tag in tags
    }
    keep = tf.ones(len(tasks), dtype=bool)
    for proposition in self._logical_propositions:
        keep = tf.logical_and(keep, proposition.fun(indicator_values))
    return [task for task, is_kept in zip(tasks, keep.numpy()) if is_kept]
def is_active(self, tag: str, indicator_config: Mapping[str, int]) -> bool:
    """
    Check whether a non-indicator subspace is active for a given indicator configuration.

    :param tag: A non-indicator subspace tag.
    :param indicator_config: A mapping ``{indicator_tag: value}`` providing a value for every
        indicator of this space. Extra keys are ignored.
    :return: ``True`` if at least one node that owns ``tag`` is active, else ``False``.
    :raise ValueError: If ``indicator_config`` omits any indicator.
    """
    self._require_complete_config(indicator_config)
    return any(
        tag in self._node_subspace_tags(node) and self._node_is_active(node, indicator_config)
        for node in self._hierarchy
    )
def node_for_subspace(self, tag: str) -> list[HierarchyNode]:
    """
    Return the :class:`HierarchyNode` objects that own the given subspace tag.

    :param tag: A non-indicator subspace tag.
    :return: The owning nodes in hierarchy order; empty if no node lists the tag.
    """

    def owns_tag(node: HierarchyNode) -> bool:
        return tag in self._node_subspace_tags(node)

    return list(filter(owns_tag, self._hierarchy))
def _require_complete_config(self, indicator_config: Mapping[str, int]) -> None:
    """
    Reject a partial indicator configuration.

    ``indicator_config`` must provide a value for every indicator of this space; extra keys
    are ignored. This catches forgotten or mistyped indicator keys, which would otherwise be
    silently treated as unsatisfied.

    :param indicator_config: A mapping ``{indicator_tag: value}``.
    :raise ValueError: If any indicator tag of this space is absent from the mapping.
    """
    required_tags = self._indicator_tags
    missing = [tag for tag in required_tags if tag not in indicator_config]
    if not missing:
        return
    raise ValueError(
        f"`indicator_config` must provide a value for every indicator "
        f"{list(self._indicator_tags)}; missing: {missing}."
    )
def product(self, other: HierarchicalSearchSpace) -> HierarchicalSearchSpace:
    """
    Return a new :class:`HierarchicalSearchSpace` combining this space and ``other``.

    Tags in the two spaces must be disjoint. The (tag-based) hierarchy nodes, conditional
    constraints and logical propositions reference subspaces and indicators by tag, so they
    are simply concatenated; the combined space re-resolves the nodes against the merged
    column layout. Global (positional) constraints are remapped via
    :func:`_embed_constraint`: ``self``'s keep columns ``[0, D_self)`` and ``other``'s shift to
    ``[D_self, D_self + D_other)``. The combined space takes the tighter (minimum) of the two
    operands' constraint tolerances.

    :param other: Another :class:`HierarchicalSearchSpace`.
    :return: The combined hierarchical space.
    :raises ValueError: If the two spaces share any tags.
    """
    overlap = set(self.subspace_tags) & set(other.subspace_tags)
    if overlap:
        raise ValueError(f"Cannot combine spaces with overlapping tags: {overlap}")

    # ``self``'s subspaces come first so its columns keep their positions.
    merged_subspaces: dict[str, SearchSpace] = {}
    for operand in (self, other):
        for tag in operand.subspace_tags:
            merged_subspaces[tag] = operand.get_subspace(tag)

    self_dim = int(self.dimension)
    other_dim = int(other.dimension)
    merged_dim = self_dim + other_dim

    # Re-express each positional constraint over the merged column layout.
    merged_constraints = [
        _embed_constraint(constraint, 0, self_dim, merged_dim)
        for constraint in self._constraints
    ]
    merged_constraints.extend(
        _embed_constraint(constraint, self_dim, other_dim, merged_dim)
        for constraint in other._constraints
    )

    # Tag-based pieces compose unchanged across disjoint tags.
    return HierarchicalSearchSpace(
        merged_subspaces,
        [*self.hierarchy, *other.hierarchy],
        constraints=merged_constraints,
        conditional_constraints=[
            *self._conditional_constraints,
            *other._conditional_constraints,
        ],
        logical_propositions=[*self._logical_propositions, *other._logical_propositions],
        ctol=min(self.ctol, other.ctol),
    )
def _node_is_active(self, node: HierarchyNode, indicator_config: Mapping[str, int]) -> bool:
    """
    Check whether all of a node's ``activity_condition_tags`` are met by ``indicator_config``.

    Both mappings are keyed by indicator tag. Values are compared as integers, so ``K``-ary
    categorical indicators are matched exactly rather than by Boolean truthiness. A complete
    configuration is assumed; a missing indicator key raises ``KeyError`` here.

    :param node: The hierarchy node whose activity conditions are tested.
    :param indicator_config: A mapping ``{indicator_tag: value}``.
    :return: ``True`` if every condition holds (also when the node has no conditions).
    """
    return all(
        int(indicator_config[indicator_tag]) == int(required_value)
        for indicator_tag, required_value in node.activity_condition_tags.items()
    )
class TaggedMultiSearchSpace(CollectionSearchSpace):
    r"""
    A :class:`SearchSpace` made up of a collection of multiple :class:`SearchSpace` subspaces,
    each with a unique tag. All subspaces must have the same dimensionality.

    Each subspace is treated as an independent space and not combined in any way. This class
    provides functionality for accessing all the subspaces at once by using the usual search space
    methods, as well as for accessing individual subspaces.

    When accessing all subspaces at once from this class (e.g. `lower()`, `upper()`, `sample()`),
    the returned tensors have an extra dimension corresponding to the subspaces.

    This class can be useful to represent a collection of search spaces that do not interact with
    each other. For example, it is used to implement batch trust region rules in the
    :class:`BatchTrustRegion` class.
    """

    def __init__(self, spaces: Sequence[SearchSpace], tags: Optional[Sequence[str]] = None):
        r"""
        Build a :class:`TaggedMultiSearchSpace` from a list ``spaces`` of other spaces. If
        ``tags`` are provided then they form the identifiers of the subspaces, otherwise the
        subspaces are labelled numerically.

        :param spaces: A sequence of :class:`SearchSpace` objects representing the space's subspaces
        :param tags: An optional list of tags giving the unique identifiers of
            the space's subspaces.
        :raise ValueError (or tf.errors.InvalidArgumentError): If ``spaces`` has a different
            length to ``tags`` when ``tags`` is provided or if ``tags`` contains duplicates.
        :raise ValueError (or tf.errors.InvalidArgumentError): If ``spaces`` has a different
            dimension to each other.
        """

        # At least one subspace is required.
        tf.debugging.assert_greater(
            len(spaces),
            0,
            message=f"""
                At least one subspace is required but received {len(spaces)}.
                """,
        )

        # Collapse the subspace dimensions into a set: exactly one distinct value means all
        # subspaces share the same dimension.
        tf.debugging.assert_equal(
            len({int(space.dimension) for space in spaces}),
            1,
            message=f"""
                All subspaces must have the same dimension but received
                {[int(space.dimension) for space in spaces]}.
                """,
        )

        super().__init__(spaces, tags)

    @property
    @check_shapes("return: [V, D]")
    def lower(self) -> TensorType:
        """Returns the stacked lower bounds of all the subspaces.

        :return: The lower bounds of shape [V, D], where V is the number of subspaces and D is
            the dimensionality of each subspace.
        """
        return tf.stack(self.subspace_lower, axis=0)

    @property
    @check_shapes("return: [V, D]")
    def upper(self) -> TensorType:
        """Returns the stacked upper bounds of all the subspaces.

        :return: The upper bounds of shape [V, D], where V is the number of subspaces and D is
            the dimensionality of each subspace.
        """
        return tf.stack(self.subspace_upper, axis=0)

    @property
    @check_shapes("return: []")
    def dimension(self) -> TensorType:
        """The number of inputs in this search space."""
        # The constructor asserts that all subspaces share one dimension, so the first
        # subspace's dimension is used.
        return self.get_subspace(self.subspace_tags[0]).dimension

    @check_shapes("return: [num_samples, V, D]")
    def sample(self, num_samples: int, seed: Optional[int] = None) -> TensorType:
        """
        Sample randomly from the space by sampling from each subspace
        and returning the resulting samples stacked along the second axis in the same order as
        specified when initializing the space.

        :param num_samples: The number of points to sample from each subspace.
        :param seed: Optional tf.random seed.
        :return: ``num_samples`` i.i.d. random points, sampled uniformly,
            from each search subspace with shape '[num_samples, V, D]' , where V is the number of
            subspaces and D is the search space dimension.
        """
        return tf.stack(self.subspace_sample(num_samples, seed), axis=1)

    def _contains(self, value: TensorType) -> TensorType:
        """
        Return `True` if ``value`` is a member of this search space, else `False`. A point is a
        member if it is a member of any of the subspaces.

        :param value: A point or points to check for membership of this :class:`SearchSpace`.
        :return: A boolean array showing membership for each point in value.
        """
        # Each subspace's ``_contains`` is called directly and the per-subspace results are
        # OR-ed together along the stacked (first) axis.
        return tf.reduce_any(
            [self.get_subspace(tag)._contains(value) for tag in self.subspace_tags], axis=0
        )

    def product(self, other: TaggedMultiSearchSpace) -> TaggedMultiSearchSpace:
        r"""
        Return a bigger collection of two :class:`TaggedMultiSearchSpace`\ s, regenerating the
        tags.

        :param other: A search space of the same type as this search space.
        :return: The product of this search space with the ``other``.
        """
        # No ``tags`` argument is passed, so the existing tags of both operands are dropped.
        return TaggedMultiSearchSpace(
            spaces=tuple(self._spaces.values()) + tuple(other._spaces.values())
        )

    def discretize(self, num_samples: int) -> DiscreteSearchSpace:
        """
        :param num_samples: The number of points in the :class:`DiscreteSearchSpace`.
        :return: A discrete search space consisting of ``num_samples`` points sampled uniformly from
            this search space.
        :raise NotImplementedError: If this :class:`SearchSpace` has constraints.
        """
        if self.has_constraints:  # Constraints are not supported.
            raise NotImplementedError(
                "Discretization is currently not supported in the presence of constraints."
            )
        samples = self.sample(num_samples)  # Sample num_samples points from each subspace.
        samples = tf.reshape(samples, [-1, self.dimension])  # Flatten the samples across subspaces.
        samples = tf.random.shuffle(samples)[:num_samples]  # Randomly pick num_samples points.
        return DiscreteSearchSpace(points=samples)