Source code for tf_al.wrapper.mc_dropout

import math
import numpy as np
from sklearn.metrics import roc_auc_score
from scipy.special import digamma, beta
import tensorflow.keras as keras
import tensorflow as tf

from . import Model
from ..utils import beta_approximated_upper_joint_entropy



class McDropout(Model):
    """
        Wrapper class for neural networks that approximates a Bayesian model
        via Monte Carlo dropout.
    """

    def __init__(self, model, config=None, **kwargs):
        super().__init__(model, config=config, model_type="mc_dropout", **kwargs)
        self._approximation_type = "sampling"

        # disable batch norm
        # super().disable_batch_norm()

    def __call__(self, inputs, sample_size=10, batch_size=None, **kwargs):
        """
            Perform a prediction using MC dropout as a Bayesian approximation.

            Parameters:
                inputs (numpy.ndarray): Inputs going into the model.
                sample_size (int): Number of samples to draw from the approximate posterior. (default=10)
                batch_size (int): Size of the batches to split the inputs into. (default=None)
        """
        if batch_size is None:
            batch_size = len(inputs)

        if batch_size < 1:
            raise ValueError("Error in McDropout.__call__(). batch_size must be at least 1.")

        if sample_size < 1:
            raise ValueError("Error in McDropout.__call__(). sample_size must be at least 1.")

        total_len = len(inputs)
        num_batches = math.ceil(total_len/batch_size)
        batches = np.array_split(inputs, num_batches, axis=0)

        predictions = []
        for batch in batches:

            # Sample sample_size times for the given batch; training=True
            # keeps the dropout layers active during inference.
            posterior_samples = []
            for i in range(sample_size):
                posterior_samples.append(self._model(batch, training=True))

            # Sampled a single time or multiple times?
            if sample_size > 1:
                stacked = np.stack(posterior_samples, axis=1)
                predictions.append(stacked)
            else:
                predictions.append(posterior_samples[0])

        if len(predictions) == 1:
            return predictions[0]

        return np.vstack(predictions)

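    # Illustrative only (not part of the original source): with the dropout
    # layers kept active, repeated forward passes differ, so the stacked
    # samples approximate the posterior predictive. Assuming a compiled
    # wrapper `wrapper` and inputs `x` of shape (n, num_features):
    #
    #   preds = wrapper(x, sample_size=25)    # -> (n, 25, num_classes)
    #   single = wrapper(x, sample_size=1)    # -> (n, num_classes)
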
    def evaluate(self, inputs, targets, sample_size=10, **kwargs):
        """
            Evaluate the model on the given inputs and targets.
        """
        if len(inputs) != len(targets):
            raise ValueError("Error in McDropout.evaluate(). Targets and inputs are not of equal length.")

        # Returns: (batch_size, sample_size, target_len) or (batch_size, target_len)
        predictions = self.__call__(inputs, sample_size=sample_size, **kwargs)

        output_metrics = {}
        for metric in self.eval_metrics:
            if hasattr(metric, "__name__"):
                metric_name = metric.__name__
            else:
                metric_name = metric.name

            output_metrics[metric_name] = metric(targets, predictions, sample_size=sample_size)

        return output_metrics

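    # Illustrative only: evaluate() hands the raw posterior samples to every
    # compiled metric. The exact keys depend on the metrics passed to
    # compile(); the values below are made up.
    #
    #   wrapper.evaluate(x_val, y_val, sample_size=25)
    #   # -> {"loss": 0.41, "accuracy": 0.87}
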
    # if self.is_classification():
    #     loss, acc = self.__evaluate(predictions, targets, sample_size)
    #     return {"loss": loss, "accuracy": acc}

    # loss_fn = keras.losses.get(self._model.loss)
    # loss = loss_fn(predictions, targets).numpy()
    # return {"loss": np.mean(loss, axis=-1), "accuracy": []}

    def __evaluate(self, predictions, targets, sample_size):
        """
            Parameters:
                predictions (numpy.ndarray): The predictions made by the network,
                    of shape (batch, targets) or (batch, samples, targets).
                targets (numpy.ndarray): The target values.
                sample_size (int): The number of samples taken from the posterior.

            Returns:
                (list) The mean loss and the accuracy.
        """
        expectation = predictions
        if len(predictions.shape) == 3:
            expectation = np.average(predictions, axis=1)

        # Will fail in the regression case! Add a flag to the function?
        loss_fn = tf.keras.losses.get(self._model.loss)
        loss = loss_fn(targets, expectation)

        # Extend the dimension in the binary case
        extended = self._problem.extend_binary_predictions(predictions)
        pred_targets = np.argmax(extended, axis=-1)

        # A one-hot vector was passed
        if len(targets.shape) == 2:
            targets = np.argmax(targets, axis=1)

        # Extend the target dimension (multiple samples per prediction)
        if sample_size > 1:
            targets = np.vstack([targets]*sample_size).T

        acc = np.mean(pred_targets == targets)
        return [np.mean(loss.numpy()), acc]

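    # Worked toy example for __evaluate (illustrative): one datapoint,
    # two posterior samples, two classes.
    #
    #   preds = np.array([[[0.8, 0.2], [0.6, 0.4]]])
    #   np.average(preds, axis=1)    # -> [[0.7, 0.3]], expectation over samples
    #   np.argmax(preds, axis=-1)    # -> [[0, 0]], per-sample class votes
    #
    # The targets are tiled sample_size times so every vote is scored
    # against the true label.
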
    def compile(self, *args, **kwargs):
        self._model.compile(**kwargs)
        metrics = self._create_init_metrics(kwargs)
        metric_names = self._extract_metric_names(metrics)
        self.eval_metrics = self._init_metrics("sampling", metric_names)
        self._compile_params = kwargs

    # -----
    # Acquisition functions
    # ---------------------------
    def get_query_fn(self, name):
        fn = None
        if name == "max_entropy":
            fn = self.__max_entropy
        elif name == "bald":
            fn = self.__bald
        elif name == "max_var_ratio":
            fn = self.__max_var_ratio
        elif name == "std_mean":
            fn = self.__std_mean
        elif name == "baba":
            fn = self.__baba

        return fn

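    # Illustrative usage (not part of the original source): acquisition
    # functions are looked up by name and share the signature
    # fn(data, sample_size=..., **kwargs). Unknown names return None,
    # so callers may want to check for that.
    #
    #   bald = wrapper.get_query_fn("bald")
    #   scores = bald(x_unlabeled, sample_size=25)
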
    def __max_entropy(self, data, sample_size=10, **kwargs):
        """
            Select datapoints by maximising the predictive entropy.

            Parameters:
                data (numpy.ndarray): The unlabeled datapoints to score.
                sample_size (int): Number of posterior samples to draw. (default=10)
        """
        # Create predictions
        predictions = self.__call__(data, sample_size=sample_size)
        expectation = self.expectation(predictions)

        # Absolute value to prevent nan values and + 0.001 to prevent infinity values
        log_post = np.log(np.abs(expectation) + .001)

        # Calculate max-entropy
        return -np.sum(expectation*log_post, axis=1)

    def __bald(self, data, sample_size=10, **kwargs):
        # TODO: dimensions do not line up in the multi-class case

        # predictions shape (batch, num_predictions, num_classes)
        predictions = self.__call__(data, sample_size=sample_size)

        posterior = self.expectation(predictions)
        entropy = -self.__shannon_entropy(posterior)
        # first_term = -np.sum(posterior*np.log(np.abs(posterior) + .001), axis=1)

        # Missing dimension in binary case?
        predictions = self._problem.extend_binary_predictions(predictions)
        inner_sum = self.__shannon_entropy(predictions)
        # inner_sum = np.sum(predictions*np.log(np.abs(predictions) + .001), axis=1)
        disagreement = np.sum(inner_sum, axis=1)/predictions.shape[1]
        return entropy + disagreement

    def __max_var_ratio(self, data, sample_size=10, **kwargs):
        """
            Select datapoints by maximising the variation ratio.

            # (batch, predictions, classes) reduce to (batch, predictions (max-class))
            # 1 - (count of most common class / num predictions)
        """
        predictions = self.__call__(data, sample_size=sample_size)
        posterior = self.expectation(predictions)

        # Calculate max variation ratios
        return 1 - posterior.max(axis=1)

    def __std_mean(self, data, sample_size=10, **kwargs):
        """
            Maximise the mean standard deviation.
            Check the std mean calculation. Depending on the model type,
            the calculation of p(y=c|x, w) can differ.
            (Kampffmeyer et al. 2016; Kendall et al. 2015)

            Todo:
                Implement distinction for different model types.
        """
        # TODO: generalize for n classes
        # For binary classes
        predictions = self.__call__(data, sample_size=sample_size)

        # Calculate variance/standard deviation from samples
        variance = self.variance(predictions)
        std = np.sqrt(variance)

        # Mean over target variables
        return np.mean(std, axis=-1)

    def __baba(self, data, sample_size=10, **kwargs):
        """
            Normalized mutual information.

            Implementation of the acquisition function described in:
            BABA: Beta Approximation for Bayesian Active Learning, Jae Oh Woo
        """
        # predictions shape (batch, num_predictions, num_classes)
        predictions = self.__call__(data, sample_size=sample_size)

        sample_mean = self.expectation(predictions)
        entropy = -self.__shannon_entropy(sample_mean)
        disagreement = self.__disagreement(predictions)
        bald_term = self.__mutual_information(entropy, disagreement)

        # Beta approximation parameters
        sample_var = self.variance(predictions)
        a = ((np.power(sample_mean, 2)*(1-sample_mean))/(sample_var+.0001))-sample_mean
        b = ((1/sample_mean)-1)*a

        upper_joint_entropy = beta_approximated_upper_joint_entropy(a, b)
        return bald_term/np.abs(upper_joint_entropy)

    # --------------
    # Utils
    # --------------------

    def __disagreement(self, predictions):
        predictions = self._problem.extend_binary_predictions(predictions)
        inner_sum = self.__shannon_entropy(predictions)
        return np.sum(inner_sum, axis=1)/predictions.shape[1]

    def __mutual_information(self, entropy, disagreement):
        return entropy + disagreement

    def __shannon_entropy(self, values):
        """
            Calculate the Shannon entropy for the given values.
            Note: this returns sum(p*log(p)), i.e. the *negative* entropy;
            callers negate the result where the entropy itself is needed.
        """
        return np.sum(values*np.log(values + .001), axis=1)

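    # Worked example of the information quantities above (illustrative,
    # ignoring the +0.001 smoothing). For one binary datapoint with
    # posterior samples p1 = [0.9, 0.1] and p2 = [0.5, 0.5]:
    #
    #   mean      = [0.7, 0.3]
    #   H[mean]   = -(0.7*log(0.7) + 0.3*log(0.3))  ~ 0.611 nats
    #   E[H[p_i]] = (0.325 + 0.693) / 2             ~ 0.509 nats
    #   BALD      = H[mean] - E[H[p_i]]             ~ 0.102 nats
    #
    # A high BALD score means the individual samples disagree: the
    # uncertainty stems from the model parameters rather than from an
    # inherently ambiguous class.
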
    def expectation(self, predictions):
        """
            Calculate the mean of the output distribution.

            Returns:
                (numpy.ndarray) The expectation per datapoint.
        """
        # predictions -> (batch_size, num_predictions)
        predictions = self._problem.extend_binary_predictions(predictions)
        return np.average(predictions, axis=1)

    def variance(self, predictions):
        """
            Calculate the variance of the output distribution.

            Returns:
                (numpy.ndarray) The variance per datapoint and target.
        """
        predictions = self._problem.extend_binary_predictions(predictions)
        return np.var(predictions, axis=1)

    def std(self, predictions):
        """
            Calculate the standard deviation of the output distribution.

            Returns:
                (numpy.ndarray) The standard deviation per datapoint and target.
        """
        predictions = self._problem.extend_binary_predictions(predictions)
        return np.std(predictions, axis=1)
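

# A minimal usage sketch (not part of the original module). It assumes the
# surrounding tf_al API only in the ways the methods above suggest; the
# network architecture, metric names and pool data are illustrative.
if __name__ == "__main__":
    net = keras.Sequential([
        keras.layers.Dense(32, activation="relu", input_shape=(10,)),
        keras.layers.Dropout(0.5),   # stays active at inference via training=True
        keras.layers.Dense(2, activation="softmax"),
    ])

    wrapper = McDropout(net)
    wrapper.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

    # Score an unlabeled pool and pick the most informative points to label next
    x_pool = np.random.randn(100, 10).astype(np.float32)
    max_entropy = wrapper.get_query_fn("max_entropy")
    scores = max_entropy(x_pool, sample_size=25)
    query_indices = np.argsort(-scores)[:10]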