import math
from copy import deepcopy
import numpy as np
class Pool:
    """
    Pool that holds information about labeled and unlabeled inputs.

    The attribute 'indices' holds information about the labeled inputs.
    Each value of self.indices can take the following states:
        (value==-1) Corresponding input is labeled
        (value!=-1) Corresponding input is not labeled, the value is the
                    position of the input inside the pool

    Parameters:
        inputs (numpy.ndarray): Inputs to the network.
        targets (numpy.ndarray): Already known targets, used for experimental runs. (default=None)
        target_shape (tuple()): The shape of the target storage, only used when no targets are given. If None, a flat array of len(inputs) is used. (default=None)
    """

    def __init__(self, inputs, targets=None, target_shape=None):
        self.__inputs = inputs
        self.__true_targets = targets
        # Every input starts unlabeled: its entry holds its own position.
        self.__indices = np.arange(len(inputs))

        if targets is not None:
            self.__targets = np.zeros(targets.shape)
        elif target_shape is None:
            self.__targets = np.zeros(len(inputs))
        else:
            self.__targets = np.zeros(target_shape)

    def init(self, size):
        """
        Initialize the pool with specific number of labels.
        Only applicable when pool in pseudo mode.

        An integer size labels exactly `size` randomly drawn datapoints,
        distributed over the classes in a round robin like scheme.
        A list or array is used for explicit initialization instead.

        Parameters:
            size (int|list|np.ndarray): Either the number of datapoints to initialize or an explicit list or array of indices to initialize.

        Raises:
            ValueError: When the pool is not in pseudo mode, size has an unsupported type, size is smaller than 1 or more labels are requested than unlabeled datapoints exist.
            IndexError: When explicit indices are out of range.
        """
        # Accept numpy integers (e.g. the result of len()/np.sum()) as well.
        is_int = isinstance(size, (int, np.integer))
        is_explicit = isinstance(size, (list, np.ndarray))

        if not self.is_pseudo():
            raise ValueError("Error in Pool.init(size). Can't initialize pool using init(size) when not in pseudo mode. Initialize pool with targets, to put Pool in pseudo mode.")

        if not (is_int or is_explicit):
            raise ValueError("Error in Pool.init(size). Expected size to be an integer, list or numpy array of indices.")

        # Initialize explicit indices
        if is_explicit:
            self.__init_explicit_indices(size)
            return

        if size < 1:
            raise ValueError("Error in Pool.init(size). Can't initialize pool with {} targets. Use a positive integer >= 1.".format(size))

        # Compare against what is still unlabeled, not the total pool size,
        # otherwise the selection loop could never satisfy the request.
        num_unlabeled = self.get_length_unlabeled()
        if num_unlabeled < size:
            raise ValueError("Error in Pool.init(size). Can't initialize pool, not enough targets. {} targets required, {} are available.".format(size, num_unlabeled))

        # Initialize pool with one-hot-vector labels
        true_target_shape = self.__true_targets.shape
        if len(true_target_shape) > 1 and true_target_shape[-1] > 1:
            self.__init_with_one_hot_vectors(size)
            return

        # WARNING: Will only work for categorical targets
        # Flatten so that column targets of shape (n, 1) are handled like
        # flat targets of shape (n,).
        self.__init_round_robin(size, np.ravel(self.__true_targets))

    def __init_with_one_hot_vectors(self, size):
        """
        Initialize the pool when the true targets are one-hot encoded.

        Parameters:
            size (int): The number of datapoints to label.
        """
        class_ids = np.argmax(self.__true_targets, axis=-1)
        self.__init_round_robin(size, class_ids)

    def __init_round_robin(self, size, class_ids):
        """
        Label exactly `size` randomly drawn datapoints, visiting the
        classes in a round robin like scheme.

        Parameters:
            size (int): The number of datapoints to label.
            class_ids (numpy.ndarray): One class identifier per input of the pool.

        Raises:
            ValueError: When class_ids does not hold exactly one label per input.
        """
        class_ids = np.asarray(class_ids)
        if class_ids.ndim != 1 or len(class_ids) != len(self.__indices):
            raise ValueError("Error in Pool.init(size). Expected exactly one class label per input, got labels of shape {}.".format(class_ids.shape))

        unique_classes = np.unique(class_ids)
        num_to_select = self.__adapt_init_size(size, len(unique_classes))
        remaining = int(size)

        while remaining > 0:
            progressed = False
            for class_id in unique_classes:
                unlabeled_indices = self.get_unlabeled_indices()
                selector = class_ids[unlabeled_indices] == class_id
                candidates = unlabeled_indices[selector]

                # Move to next class, when none available of this type
                if len(candidates) == 0:
                    continue

                # Never draw more than is still requested, otherwise the
                # last round would label more datapoints than asked for.
                num_selected = min(self.__adapt_num_to_select(candidates, num_to_select), remaining)
                selected_indices = np.random.choice(candidates, num_selected, replace=False)

                # Update pool
                self.annotate(selected_indices, self.__true_targets[selected_indices])
                remaining -= num_selected
                progressed = True

                if remaining < 1:
                    break

            # Defensive: nothing left to draw from, avoid looping forever.
            if not progressed:
                break

    def __init_explicit_indices(self, indices):
        """
        Initializes the pool with explicitly given indices.

        NOTE(review): the indices address positions inside the array of
        currently unlabeled indices. For a pool without any labels these
        equal the positions inside the pool.

        Parameters:
            indices (list|numpy.ndarray): A list of indices which to use

        Raises:
            IndexError: When an index is out of range.
        """
        try:
            unlabeled_indices = self.get_unlabeled_indices()
            selected_indices = unlabeled_indices[indices]
            selected_targets = self.__true_targets[selected_indices]
            self.annotate(selected_indices, selected_targets)
        except IndexError as e:
            raise IndexError("Error in Pool.init(size). " + str(e).capitalize() + ".") from e

    def __adapt_num_to_select(self, available, num_to_select):
        """
        Adapts the number of elements to select next.

        Parameters:
            available (numpy.ndarray): The available elements.
            num_to_select (int): The number of elements to select.

        Returns:
            (int) the adapted number of elements selectable.
        """
        return min(len(available), num_to_select)

    def get_targets_by(self, indices):
        """
        Get the currently stored targets for given indices.

        Parameters:
            indices (numpy.ndarray): The indices for which to get the targets.

        Returns:
            (numpy.ndarray) the stored targets at the given indices.
        """
        return self.__targets[indices]

    def __setitem__(self, indices, targets):
        """
        Shortcut to annotate function.

        Parameters:
            indices (numpy.ndarray): For which indices to set values.
            targets (numpy.ndarray): The targets to set.
        """
        self.annotate(indices, targets)

    def annotate(self, indices, targets=None):
        """
        Annotate inputs of given indices with given targets.

        Parameters:
            indices (numpy.ndarray): The indices to annotate.
            targets (numpy.ndarray): The labels to set for the given annotations. When None, the true targets passed at construction are used. (default=None)

        Raises:
            ValueError: When no targets are given and the pool holds no true targets.
        """
        if targets is None:
            if self.__true_targets is None:
                raise ValueError("Error in Pool.annotate(). Can't annotate inputs, targets is None.")

            targets = self.__true_targets[indices]

        # Store the targets first, so a failing assignment does not leave
        # inputs marked as labeled without a target.
        self.__targets[indices] = targets
        self.__indices[indices] = -1

    # ---------
    # Utilities
    # -------------------

    def __adapt_init_size(self, size, available):
        """
        Determine how many datapoints to select per class and round.

        Parameters:
            size (int): The total number of datapoints to select.
            available (int): The number of different classes.

        Returns:
            (int) size divided by available (rounded down) when there are fewer classes than datapoints to select, else 1.
        """
        if available < size:
            return size // available

        return 1

    def has_unlabeled(self):
        """
        Has pool any unlabeled inputs?

        Returns:
            (bool) true or false depending whether unlabeled data exists.
        """
        return np.any(self.__indices != -1)

    def has_labeled(self):
        """
        Has pool labeled inputs?

        Returns:
            (bool) true or false depending whether or not there are labeled inputs.
        """
        return np.any(self.__indices == -1)

    def is_pseudo(self):
        """
        Is the pool in pseudo mode?
        Meaning, true target labels are already known?

        Returns:
            (bool) indicating whether or not true labels are existent.
        """
        return self.__true_targets is not None

    def __deepcopy__(self, memo):
        """
        Create a new pool over the same inputs and true targets.

        NOTE(review): the copy starts without any labels and shares the
        input/target arrays with this pool. Kept as is, because callers
        may rely on getting a fresh pool. Confirm that this is intended.

        Parameters:
            memo (dict): The memo dictionary of copy.deepcopy.

        Returns:
            (Pool) a new and unlabeled pool.
        """
        # Pass on the shape of the target storage, else a pool created
        # without true targets would lose its target_shape.
        clone = Pool(self.__inputs, self.__true_targets, target_shape=self.__targets.shape)
        memo[id(self)] = clone
        return clone

    # ---------
    # Setter/Getter
    # -------------------

    def get_indices(self):
        """
        Returns the current labeling state.

        Returns:
            (numpy.ndarray) the indices state. (-1) indicating a labeled input.
        """
        return self.__indices

    def get_labeled_indices(self):
        """
        Get the indices of labeled datapoints.

        Returns:
            (numpy.ndarray) of indices of datapoints that already have been labeled.
        """
        return np.flatnonzero(self.__indices == -1)

    def get_unlabeled_indices(self):
        """
        Get all unlabeled indices for this pool.

        Returns:
            (numpy.ndarray) an array of indices.
        """
        return self.__indices[self.__indices != -1]

    def get_length_labeled(self):
        """
        Get the number of labeled inputs.

        Returns:
            (int) The number of labeled inputs.
        """
        return np.sum(self.__indices == -1)

    def get_length_unlabeled(self):
        """
        Get the number of unlabeled inputs.

        Returns:
            (int) the number of unlabeled inputs
        """
        return np.sum(self.__indices != -1)

    def get_labeled_data(self):
        """
        Get inputs and targets of datapoints which are currently labeled.

        Returns:
            (tuple(numpy.ndarray, numpy.ndarray)) inputs and corresponding targets.
        """
        selector = self.__indices == -1
        return self.__inputs[selector], self.__targets[selector]

    def get_unlabeled_data(self):
        """
        Get data and their indices of datapoints which are currently not labeled.

        Returns:
            (tuple(numpy.ndarray, numpy.ndarray)) The inputs and their indices in the pool
        """
        selector = self.__indices != -1
        return self.__inputs[selector], self.__indices[selector]