Source code for trieste.models.gpflux.models

# Copyright 2021 The Trieste Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing import Any, Callable, Mapping, Optional

import dill
import gpflow
import tensorflow as tf
from gpflow.inducing_variables import InducingPoints
from gpflow.keras import tf_keras
from gpflux.layers import GPLayer, LatentVariableLayer
from gpflux.models import DeepGP
from tensorflow.python.keras.callbacks import Callback

from ... import logging
from ...data import Dataset
from ...space import EncoderFunction
from ...types import TensorType
from ..interfaces import (
    EncodedTrainableProbabilisticModel,
    HasReparamSampler,
    HasTrajectorySampler,
    ReparametrizationSampler,
    TrajectorySampler,
)
from ..optimizer import KerasOptimizer
from ..utils import (
    write_summary_data_based_metrics,
    write_summary_kernel_parameters,
    write_summary_likelihood_parameters,
)
from .interface import GPfluxPredictor
from .sampler import (
    DeepGaussianProcessDecoupledTrajectorySampler,
    DeepGaussianProcessReparamSampler,
)


class DeepGaussianProcess(
    GPfluxPredictor, EncodedTrainableProbabilisticModel, HasReparamSampler, HasTrajectorySampler
):
    """
    A :class:`TrainableProbabilisticModel` wrapper for a GPflux :class:`~gpflux.models.DeepGP`
    with :class:`GPLayer` or :class:`LatentVariableLayer` layers: this class does not support
    e.g. Keras layers. We provide simple architectures that can be used with this class in the
    `architectures.py` file.
    """

    def __init__(
        self,
        model: DeepGP | Callable[[], DeepGP],
        optimizer: KerasOptimizer | None = None,
        num_rff_features: int = 1000,
        continuous_optimisation: bool = True,
        compile_args: Optional[Mapping[str, Any]] = None,
        encoder: EncoderFunction | None = None,
    ):
        """
        :param model: The underlying GPflux deep Gaussian process model. Passing in a named
            closure rather than a model can help when copying or serialising.
        :param optimizer: The optimizer wrapper with necessary specifications for compiling and
            training the model. Defaults to :class:`~trieste.models.optimizer.KerasOptimizer` with
            :class:`~tf.optimizers.Adam` optimizer, mean squared error metric and a dictionary of
            default arguments for the Keras `fit` method: 400 epochs, batch size of 1000, and
            verbose 0. A custom callback that reduces the optimizer learning rate is used as well.
            See https://keras.io/api/models/model_training_apis/#fit-method for a list of possible
            arguments.
        :param num_rff_features: The number of random Fourier features used to approximate the
            kernel when calling :meth:`trajectory_sampler`. We use a default of 1000 as it
            typically performs well for a wide range of kernels. Note that very smooth kernels
            (e.g. RBF) can be well-approximated with fewer features.
        :param continuous_optimisation: If True (default), the optimizer will keep track of the
            number of epochs across BO iterations and use this number as initial_epoch. This is
            essential to allow monitoring of model training across BO iterations.
        :param compile_args: Keyword arguments to pass to the ``compile`` method of the Keras
            model (:class:`~tf.keras.Model`).
            See https://keras.io/api/models/model_training_apis/#compile-method for a list of
            possible arguments. The ``optimizer`` and ``metrics`` arguments must not be included.
        :param encoder: Optional encoder with which to transform query points before generating
            predictions.
        :raise ValueError: If ``model`` has unsupported layers, ``num_rff_features`` is not
            positive, the ``optimizer`` is not of a supported type, or ``compile_args`` contains
            disallowed arguments.
        """
        if compile_args is None:
            compile_args = {}

        if not {"optimizer", "metrics"}.isdisjoint(compile_args):
            raise ValueError(
                "optimizer and metrics arguments must not be included in compile_args."
            )

        if isinstance(model, DeepGP):
            self._model_closure = None
        else:
            self._model_closure = model
            model = model()

        for layer in model.f_layers:
            if not isinstance(layer, (GPLayer, LatentVariableLayer)):
                raise ValueError(
                    f"`DeepGaussianProcess` can only be built out of `GPLayer` or "
                    f"`LatentVariableLayer`, received {type(layer)} instead."
                )

        super().__init__(optimizer, encoder)

        if num_rff_features <= 0:
            raise ValueError(
                f"num_rff_features must be greater than zero, got {num_rff_features}."
            )
        self._num_rff_features = num_rff_features

        if not isinstance(self.optimizer.optimizer, tf_keras.optimizers.Optimizer):
            raise ValueError(
                f"Optimizer for `DeepGaussianProcess` must be an instance of a "
                f"`tf.optimizers.Optimizer` or `tf.keras.optimizers.Optimizer`, "
                f"received {type(self.optimizer.optimizer)} instead."
            )

        if not isinstance(
            self.optimizer.optimizer.lr, tf_keras.optimizers.schedules.LearningRateSchedule
        ):
            self.original_lr = self.optimizer.optimizer.lr.numpy()

        epochs = 400

        def scheduler(epoch: int, lr: float) -> float:
            if epoch == epochs // 2:
                return lr * 0.1
            else:
                return lr

        if not self.optimizer.fit_args:
            self.optimizer.fit_args = {
                "verbose": 0,
                "epochs": epochs,
                "batch_size": 1000,
                "callbacks": [tf_keras.callbacks.LearningRateScheduler(scheduler)],
            }

        if self.optimizer.metrics is None:
            self.optimizer.metrics = ["mse"]

        self._model_gpflux = model
        # inputs and targets need to be redone with a float64 dtype to avoid setting the keras
        # backend to float64, this is likely to be fixed in GPflux, see issue:
        # https://github.com/secondmind-labs/GPflux/issues/76
        self._model_gpflux.inputs = tf_keras.Input(
            tuple(self._model_gpflux.inputs.shape[:-1]),
            name=self._model_gpflux.inputs.name,
            dtype=tf.float64,
        )
        self._model_gpflux.targets = tf_keras.Input(
            tuple(self._model_gpflux.targets.shape[:-1]),
            name=self._model_gpflux.targets.name,
            dtype=tf.float64,
        )

        self._model_keras = model.as_training_model()
        self._model_keras.compile(
            optimizer=self.optimizer.optimizer, metrics=self.optimizer.metrics, **compile_args
        )
        self._absolute_epochs = 0
        self._continuous_optimisation = continuous_optimisation

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()

        # when using a model closure, store the model parameters, not the model itself
        if self._model_closure is not None:
            state["_model_gpflux"] = gpflow.utilities.parameter_dict(self._model_gpflux)
            state["_model_keras"] = gpflow.utilities.parameter_dict(self._model_keras)

        # use to_json and get_weights to save any optimizer fit_arg callback models
        callbacks: list[Callback] = self._optimizer.fit_args.get("callbacks", [])
        callback: Callback
        saved_models: list[tf_keras.Model] = []
        tensorboard_writers: list[dict[str, Any]] = []
        try:
            for callback in callbacks:
                # serialize the callback models before pickling the optimizer
                saved_models.append(callback.model)
                if callback.model is self._model_keras:
                    # no need to serialize the main model, just use a special value instead
                    callback.model = ...
                elif callback.model:
                    callback.model = (callback.model.to_json(), callback.model.get_weights())
                # don't pickle tensorboard writers either; they'll be recreated when needed
                if isinstance(callback, tf_keras.callbacks.TensorBoard):
                    tensorboard_writers.append(callback._writers)
                    callback._writers = {}
            state["_optimizer"] = dill.dumps(state["_optimizer"])
        except Exception as e:
            raise NotImplementedError(
                "Failed to copy DeepGaussianProcess optimizer due to unsupported callbacks."
            ) from e
        finally:
            # revert original state, even if the pickling failed
            for callback, model in zip(
                self._optimizer.fit_args.get("callbacks", []), saved_models
            ):
                callback.model = model
            for callback, writers in zip(
                (cb for cb in callbacks if isinstance(cb, tf_keras.callbacks.TensorBoard)),
                tensorboard_writers,
            ):
                callback._writers = writers

        # do the same thing for the history callback
        if self._model_keras.history:
            history_model = self._model_keras.history.model
            try:
                if history_model is self._model_keras:
                    # no need to serialize the main model, just use a special value instead
                    self._model_keras.history.model = ...
                elif history_model:
                    self._model_keras.history.model = (
                        history_model.to_json(),
                        history_model.get_weights(),
                    )
                state["_history"] = dill.dumps(self._model_keras.history)
            finally:
                self._model_keras.history.model = history_model

        # don't try to serialize any other copies of the history callback
        if isinstance(state.get("_last_optimization_result"), tf_keras.callbacks.History):
            state["_last_optimization_result"] = ...

        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__.update(state)

        # regenerate the models using the model closure
        if self._model_closure is not None:
            dgp: DeepGP = state["_model_closure"]()
            self._model_gpflux = dgp
            # inputs and targets need to be redone with a float64 dtype to avoid setting the keras
            # backend to float64, this is likely to be fixed in GPflux, see issue:
            # https://github.com/secondmind-labs/GPflux/issues/76
            self._model_gpflux.inputs = tf_keras.Input(
                tuple(self._model_gpflux.inputs.shape[:-1]),
                name=self._model_gpflux.inputs.name,
                dtype=tf.float64,
            )
            self._model_gpflux.targets = tf_keras.Input(
                tuple(self._model_gpflux.targets.shape[:-1]),
                name=self._model_gpflux.targets.name,
                dtype=tf.float64,
            )
            self._model_keras = dgp.as_training_model()

        # unpickle the optimizer, and restore all the callback models
        self._optimizer = dill.loads(self._optimizer)
        for callback in self._optimizer.fit_args.get("callbacks", []):
            if callback.model is ...:
                callback.set_model(self._model_keras)
            elif callback.model:
                model_json, weights = callback.model
                model = tf_keras.models.model_from_json(model_json)
                model.set_weights(weights)
                callback.set_model(model)

        # recompile the model
        self._model_keras.compile(self._optimizer.optimizer)

        # assign the model parameters
        if self._model_closure is not None:
            gpflow.utilities.multiple_assign(self._model_gpflux, state["_model_gpflux"])
            gpflow.utilities.multiple_assign(self._model_keras, state["_model_keras"])

        # restore the history (including any model it contains)
        if "_history" in state:
            self._model_keras.history = dill.loads(state["_history"])
            if self._model_keras.history.model is ...:
                self._model_keras.history.set_model(self._model_keras)
            elif self._model_keras.history.model:
                model_json, weights = self._model_keras.history.model
                model = tf_keras.models.model_from_json(model_json)
                model.set_weights(weights)
                self._model_keras.history.set_model(model)

        # recover optimization result if necessary (and possible)
        if state.get("_last_optimization_result") is ...:
            self._last_optimization_result = getattr(self._model_keras, "history")
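    # Illustrative sketch (not part of the class): a `DeepGaussianProcess` is typically built
    # from a model-building closure so that it can be copied and serialised via the
    # __getstate__/__setstate__ machinery above. `build_vanilla_deep_gp` is assumed to be
    # available from `trieste.models.gpflux` (see `architectures.py`); `query_points`,
    # `observations` and `search_space` are hypothetical, and the exact builder signature may
    # differ between trieste versions.
    #
    #     from trieste.data import Dataset
    #     from trieste.models.gpflux import DeepGaussianProcess, build_vanilla_deep_gp
    #
    #     dataset = Dataset(query_points, observations)  # hypothetical training data
    #     model = DeepGaussianProcess(
    #         lambda: build_vanilla_deep_gp(dataset, search_space),  # closure aids pickling
    #         continuous_optimisation=True,
    #     )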
    def __repr__(self) -> str:
        """"""
        return f"DeepGaussianProcess({self.model_gpflux!r}, {self.optimizer.optimizer!r})"
    @property
    def model_gpflux(self) -> DeepGP:
        return self._model_gpflux

    @property
    def model_keras(self) -> tf_keras.Model:
        return self._model_keras
    def sample_encoded(self, query_points: TensorType, num_samples: int) -> TensorType:
        trajectory = self.trajectory_sampler().get_trajectory()
        expanded_query_points = tf.expand_dims(query_points, -2)  # [N, 1, D]
        tiled_query_points = tf.tile(expanded_query_points, [1, num_samples, 1])  # [N, S, D]
        return tf.transpose(trajectory(tiled_query_points), [1, 0, 2])  # [S, N, L]
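    # Illustrative sketch: callers normally use the public `sample` method from the encoded-model
    # interface, which is assumed to encode the query points and delegate to `sample_encoded`
    # above. Shapes assume N query points of dimension D and L output dimensions.
    #
    #     samples = model.sample(query_points, num_samples=10)  # [10, N, L]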
    def reparam_sampler(self, num_samples: int) -> ReparametrizationSampler[GPfluxPredictor]:
        """
        Return a reparametrization sampler for a :class:`DeepGaussianProcess` model.

        :param num_samples: The number of samples to obtain.
        :return: The reparametrization sampler.
        """
        return DeepGaussianProcessReparamSampler(num_samples, self)
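    # Illustrative sketch: a reparametrization sampler draws samples that stay consistent across
    # calls, which is useful for Monte Carlo acquisition functions. The `sample` call below
    # follows the `ReparametrizationSampler` interface from `..interfaces`; the batch layout of
    # `query_points_batch` (a hypothetical name) is documented on the sampler itself.
    #
    #     sampler = model.reparam_sampler(num_samples=100)
    #     samples = sampler.sample(query_points_batch)  # 100 samples per query point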
    def trajectory_sampler(self) -> TrajectorySampler[GPfluxPredictor]:
        """
        Return a trajectory sampler. For :class:`DeepGaussianProcess`, we build trajectories
        using a decoupled random Fourier feature approximation of the model.

        :return: The trajectory sampler.
        """
        return DeepGaussianProcessDecoupledTrajectorySampler(self, self._num_rff_features)
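    # Illustrative sketch: a trajectory is a deterministic function drawn (approximately) from
    # the model, e.g. for Thompson sampling; `sample_encoded` above uses the same mechanism.
    # Trajectories expect a batch dimension, as in the tiling performed there.
    #
    #     trajectory = model.trajectory_sampler().get_trajectory()
    #     values = trajectory(query_points[:, None, :])  # [N, 1, L] for one draw per point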
    def update_encoded(self, dataset: Dataset) -> None:
        inputs = dataset.query_points
        new_num_data = inputs.shape[0]
        self.model_gpflux.num_data = new_num_data

        # Update num_data for each layer, as well as make sure dataset shapes are ok
        for i, layer in enumerate(self.model_gpflux.f_layers):
            if hasattr(layer, "num_data"):
                layer.num_data = new_num_data

            if isinstance(layer, LatentVariableLayer):
                inputs = layer(inputs)
                continue

            if isinstance(layer.inducing_variable, InducingPoints):
                inducing_variable = layer.inducing_variable
            else:
                inducing_variable = layer.inducing_variable.inducing_variable

            if inputs.shape[-1] != inducing_variable.Z.shape[-1]:
                raise ValueError(
                    f"Shape {inputs.shape} of input to layer {layer} is incompatible with shape"
                    f" {inducing_variable.Z.shape} of that layer. Trailing dimensions must match."
                )

            if (
                i == len(self.model_gpflux.f_layers) - 1
                and dataset.observations.shape[-1] != layer.q_mu.shape[-1]
            ):
                raise ValueError(
                    f"Shape {dataset.observations.shape} of new observations is incompatible"
                    f" with shape {layer.q_mu.shape} of existing observations. Trailing"
                    f" dimensions must match."
                )

            inputs = layer(inputs)
    def optimize_encoded(self, dataset: Dataset) -> tf_keras.callbacks.History:
        """
        Optimize the model with the specified `dataset`.

        :param dataset: The data with which to optimize the `model`.
        """
        fit_args = dict(self.optimizer.fit_args)

        # Tell optimizer how many epochs have been used before: the optimizer will "continue"
        # optimization across multiple BO iterations rather than start fresh at each iteration.
        # This allows us to monitor training across iterations.
        if "epochs" in fit_args:
            fit_args["epochs"] = fit_args["epochs"] + self._absolute_epochs

        hist = self.model_keras.fit(
            {"inputs": dataset.query_points, "targets": dataset.observations},
            **fit_args,
            initial_epoch=self._absolute_epochs,
        )

        if self._continuous_optimisation:
            self._absolute_epochs = self._absolute_epochs + len(hist.history["loss"])

        # Reset lr in case there was an lr schedule: a schedule will have changed the learning
        # rate, so that the next time we call `optimize` the starting learning rate would be
        # different. Therefore, we make sure the learning rate is set back to its initial value.
        # However, this is not needed for `LearningRateSchedule` instances.
        if not isinstance(
            self.optimizer.optimizer.lr, tf_keras.optimizers.schedules.LearningRateSchedule
        ):
            self.optimizer.optimizer.lr.assign(self.original_lr)

        return hist
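    # Illustrative sketch: training settings can be customised by passing a `KerasOptimizer`
    # wrapper at construction time; the constructor then leaves `fit_args` untouched rather than
    # applying its defaults. The field names follow `trieste.models.optimizer.KerasOptimizer`,
    # and `dgp_closure` is a hypothetical `() -> DeepGP` builder; check your trieste version.
    #
    #     import tensorflow as tf
    #     from trieste.models.optimizer import KerasOptimizer
    #
    #     optimizer = KerasOptimizer(
    #         tf.optimizers.Adam(0.01),
    #         fit_args={"epochs": 200, "batch_size": 64, "verbose": 0},
    #     )
    #     model = DeepGaussianProcess(dgp_closure, optimizer=optimizer)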
    def log(self, dataset: Optional[Dataset] = None) -> None:
        """
        Log model training information at a given optimization step to TensorBoard. We log a few
        summary statistics of losses, layer KL divergences and metrics (as provided in
        ``optimizer``): the ``final`` value at the end of training, and the ``diff`` between the
        initial and final epochs. We also log epoch statistics, but as histograms rather than
        time series. We also log several training data based metrics, such as the root mean
        square error between predictions and observations.

        For custom logs the user will need to subclass the model and overwrite this method.

        :param dataset: Optional data that can be used to log additional data-based model
            summaries.
        """
        summary_writer = logging.get_tensorboard_writer()
        if summary_writer:
            with summary_writer.as_default(step=logging.get_step_number()):
                logging.scalar("epochs/num_epochs", len(self.model_keras.history.epoch))
                for idx, layer in enumerate(self.model_gpflux.f_layers):
                    write_summary_kernel_parameters(layer.kernel, prefix=f"layer[{idx}]/")
                write_summary_likelihood_parameters(self.model_gpflux.likelihood_layer.likelihood)
                for k, v in self.model_keras.history.history.items():
                    logging.histogram(f"{k}/epoch", lambda: v)
                    logging.scalar(f"{k}/final", lambda: v[-1])
                    logging.scalar(f"{k}/diff", lambda: v[0] - v[-1])
                if dataset:
                    write_summary_data_based_metrics(
                        dataset=dataset, model=self, prefix="training_"
                    )
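# Illustrative sketch: `log` only writes summaries when a TensorBoard writer has been registered
# with trieste's logging module, e.g.
#
#     import tensorflow as tf
#     from trieste import logging
#
#     logging.set_tensorboard_writer(tf.summary.create_file_writer("logs/model"))
#
# after which each call to `log` records losses, kernel and likelihood parameters and
# (optionally) data-based metrics at the current optimization step.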