#!/usr/bin/env python3
# Copyright 2020 Benjamin Ehret
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# @title :data/timeseries/split_audioset.py
# @author :be
# @contact :behret@ethz.ch
# @created :02/04/2020
# @version :1.0
# @python_version :3.6.7
"""
Split Audioset Dataset
^^^^^^^^^^^^^^^^^^^^^^
The module :mod:`data.timeseries.split_audioset` contains a wrapper for data
handlers for the SplitAudioset task.
It is based on the module :mod:`data.special.split_mnist`.
"""
import numpy as np
from sklearn.model_selection import train_test_split
from hypnettorch.data.timeseries.audioset_data import AudiosetData
def get_split_audioset_handlers(data_path, use_one_hot=True, validation_size=0,
                                target_per_timestep=True,
                                num_classes_per_task=10, num_tasks=5,
                                rseed=None):
    """This function instantiates ``num_tasks`` objects of the class
    :class:`SplitAudioset`, each of which contains a disjoint set of labels.

    The SplitAudioset task consists of ``num_tasks`` tasks which consist of a
    classification problem with ``num_classes_per_task`` classes from our
    preprocessed Audioset data set (100 classes in total).

    Args:
        (....): See docstring of class
            :class:`data.timeseries.audioset_data.AudiosetData`.
        validation_size (int): The size of the validation set of each
            individual data handler.
        num_classes_per_task (int): Number of classes to put into one data
            handler. If ``2``, then every data handler will include 2 classes.
            Must be at least ``1``.
        num_tasks (int, optional): The number of data handlers that should be
            returned by this function. If ``None``, as many tasks as fit into
            the 100 available classes are created.
        rseed (int, optional): The ``rseed`` is passed when constructing
            instances of class :class:`SplitAudioset`. In addition, it is used
            to shuffle the classes before splitting Audioset into tasks. If
            ``None``, the global numpy random state is used for the shuffling.

    Returns:
        (list): A list of data handlers, each corresponding to a
        :class:`SplitAudioset` object.

    Raises:
        ValueError: If ``num_classes_per_task`` is smaller than ``1`` or if
            ``num_tasks * num_classes_per_task`` exceeds the number of
            available classes.
    """
    # Set the random seed.
    if rseed is not None:
        r_state = np.random.RandomState(rseed)
    else:
        r_state = np.random

    # Parse structure of subtasks.
    num_classes_total = 100

    assert num_tasks is None or num_tasks > 0

    # Without this check a value of 0 would trigger a division by zero below
    # (if `num_tasks` is None) or produce handlers without any label.
    if num_classes_per_task < 1:
        raise ValueError('Cannot create SplitAudioset datasets with %d '
                         % (num_classes_per_task) + 'classes per task.')

    if num_tasks is None:
        num_tasks = num_classes_total // num_classes_per_task

    fits = num_tasks >= 1 and \
        (num_tasks * num_classes_per_task) <= num_classes_total
    if not fits:
        raise ValueError('Cannot create SplitAudioset datasets for %d tasks '
                         % (num_tasks) + 'with %d classes per task.'
                         % (num_classes_per_task))

    print('Creating %d data handlers for SplitAudioSet tasks ...' % num_tasks)

    # Randomly shuffle the order of the classes to break the ontology order.
    task_order = r_state.permutation(np.arange(num_classes_total))

    # Determine which classes go into which subtasks.
    # For the experiment in which we successively extend the number of classes
    # / tasks, the classes need to be assigned to the different tasks in a
    # reproducible way (between different versions of the subtask, not only
    # over different runs). This only works if we have 10 or less
    # tasks/classes, which is sufficient for the experiment. If the number of
    # classes/tasks is higher, the described reproducibility is not required
    # anymore.
    if num_tasks <= 10 and num_classes_per_task <= 10:
        # Task i always draws from row i of a fixed 10 x 10 arrangement, so
        # its leading classes do not depend on the chosen split sizes.
        task_mat = np.reshape(task_order, (10, 10))
        task_classes = [task_mat[i, :num_classes_per_task]
                        for i in range(num_tasks)]
    else:
        # Consecutive, non-overlapping chunks of the shuffled class order.
        task_classes = [
            task_order[i * num_classes_per_task:(i + 1) * num_classes_per_task]
            for i in range(num_tasks)
        ]

    handlers = [
        SplitAudioset(data_path, validation_size=validation_size,
                      use_one_hot=use_one_hot,
                      target_per_timestep=target_per_timestep,
                      rseed=rseed, labels=tc)
        for tc in task_classes
    ]

    print('Creating data handlers for SplitAudioset tasks ... Done')

    return handlers
class SplitAudioset(AudiosetData):
    """An instance of the class shall represent a SplitAudioset task.

    The constructor loads the complete data set via the base class and then
    overwrites the internal data structure such that only samples whose label
    is contained in ``labels`` are kept.

    Args:
        (....): See docstring of class
            :class:`data.timeseries.audioset_data.AudiosetData`.
        validation_size (int): The size of the validation set of each
            individual data handler. The validation samples are taken from
            the (filtered) training set via a stratified split.
        labels (list): The labels that should be part of this task. Must be
            non-empty, unique and within ``[0, num_classes)``.
        full_out_dim (bool): Choose the original Audioset labels instead of
            the new task output dimension. This option will affect the
            attributes :attr:`data.dataset.Dataset.num_classes` and
            :attr:`data.dataset.Dataset.out_shape`.

    Raises:
        ValueError: If ``labels`` is empty or if ``validation_size`` is not
            smaller than the number of remaining training samples.
    """
    def __init__(self, data_path, use_one_hot=True, validation_size=1000,
                 target_per_timestep=True, rseed=None, labels=[0, 1],
                 full_out_dim=False):
        # Note, we build the validation set below!
        super().__init__(data_path, use_one_hot=use_one_hot, validation_size=0,
                         target_per_timestep=target_per_timestep, rseed=rseed)

        self._full_out_dim = full_out_dim

        # Ranges are materialized; lists are copied so that the handler never
        # aliases the caller's list or the shared (mutable) default argument.
        # Other sequence types (e.g., numpy arrays) are kept as they are.
        if isinstance(labels, (range, list)):
            labels = list(labels)

        # An empty selection would otherwise pass the checks below and
        # produce a handler without any sample.
        if len(labels) == 0:
            raise ValueError('At least one label is required to create a ' +
                             'SplitAudioset task.')

        assert np.all(np.array(labels) >= 0) and \
            np.all(np.array(labels) < self.num_classes) and \
            len(labels) == len(np.unique(labels))

        self._labels = labels

        # Snapshot of the full data set, taken before the internal data
        # structure is modified.
        train_ins = self.get_train_inputs()
        test_ins = self.get_test_inputs()

        train_outs = self.get_train_outputs()
        test_outs = self.get_test_outputs()

        # Get labels.
        if self.is_one_hot:
            train_labels = self._to_one_hot(train_outs, reverse=True)
            test_labels = self._to_one_hot(test_outs, reverse=True)
        else:
            train_labels = train_outs
            test_labels = test_outs

        # Note, the label stays the same for all timesteps.
        train_labels = train_labels[:, 0]
        test_labels = test_labels[:, 0]

        assert train_labels.size == self.num_train_samples and \
            test_labels.size == self.num_test_samples

        # Select all samples whose label belongs to this task.
        train_mask = np.isin(train_labels, labels)
        test_mask = np.isin(test_labels, labels)

        train_ins = train_ins[train_mask, :]
        test_ins = test_ins[test_mask, :]

        train_outs = train_outs[train_mask, :]
        test_outs = test_outs[test_mask, :]

        # Old sample ids for new data, used to extract correct sequence
        # lengths. The order (train first, then test) has to match the order
        # in which inputs and outputs are concatenated below.
        prev_train_inds = self._data['train_inds'][train_mask]
        prev_test_inds = self._data['test_inds'][test_mask]
        in_seq_lengths = np.concatenate([
            self._data['in_seq_lengths'][prev_train_inds],
            self._data['in_seq_lengths'][prev_test_inds]])
        out_seq_lengths = np.concatenate([
            self._data['out_seq_lengths'][prev_train_inds],
            self._data['out_seq_lengths'][prev_test_inds]])

        num_train = train_outs.shape[0]
        num_test = test_outs.shape[0]

        if validation_size > 0:
            if validation_size >= num_train:
                raise ValueError('Validation set size must be smaller than ' +
                                 '%d.' % num_train)
            # Stratified split, i.e., the class proportions of the filtered
            # training set are retained in the validation set.
            train_inds, val_inds = train_test_split(
                np.arange(num_train), test_size=validation_size,
                shuffle=True, random_state=rseed,
                stratify=train_labels[train_mask])
        else:
            train_inds = np.arange(num_train)

        # Test samples are stored behind the training samples.
        test_inds = np.arange(num_train, num_train + num_test)

        outputs = np.concatenate([train_outs, test_outs], axis=0)

        if not full_out_dim:
            # Transform outputs, e.g., if 1-hot [0,0,0,1,0,0,0,0,0,0] -> [0,1]

            # Note, the method assumes `full_out_dim` when later called by a
            # user. We just misuse the function to call it inside the
            # constructor.
            # NOTE(review): `transform_outputs` is not defined in the visible
            # part of this class; presumably it is provided elsewhere (e.g.,
            # by the base class) - confirm.
            self._full_out_dim = True
            outputs = self.transform_outputs(outputs)
            self._full_out_dim = full_out_dim

            # Note, we may also have to adapt the output shape appropriately.
            if self.is_one_hot:
                self._data['out_shape'] = [len(labels)]

        images = np.concatenate([train_ins, test_ins], axis=0)

        ### Overwrite internal data structure. Only keep desired labels.

        # Note, we continue to pretend to be a 100 class problem, such that
        # the user has easy access to the correct labels and has the original
        # 1-hot encodings.
        if not full_out_dim:
            self._data['num_classes'] = len(labels)
        else:
            self._data['num_classes'] = 100
        self._data['in_data'] = images
        self._data['out_data'] = outputs
        self._data['train_inds'] = train_inds
        self._data['test_inds'] = test_inds
        if validation_size > 0:
            self._data['val_inds'] = val_inds
        self._data['in_seq_lengths'] = in_seq_lengths
        self._data['out_seq_lengths'] = out_seq_lengths

        n_val = 0
        if validation_size > 0:
            n_val = val_inds.size

        print('Created Audioset task with labels %s and %d train, %d test '
              % (str(labels), train_inds.size, test_inds.size) +
              'and %d val samples.' % (n_val))

    def get_identifier(self):
        """Returns the name of the dataset."""
        return 'SplitAudioset'
# Running this module as a script is intentionally a no-op.
if __name__ == '__main__':
    pass