Source code for

#!/usr/bin/env python3
# Copyright 2020 Benjamin Ehret
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# title           :data/timeseries/
# author          :be
# contact
# created         :14/04/2020
# version         :1.0
# python_version  :3.7
"""
Sequence of Stroke MNIST Samples (SeqSMNIST) Dataset

A data handler to generate a set of sequential stroke MNIST tasks for continual
learning. The stroke MNIST data used here has already been preprocessed with
the script :mod:`data.timeseries.preprocess_smnist` (see also the corresponding
data handler in :mod:`data.timeseries.smnist_data`).

**The task**

Given a sequence of length ``n`` that is built from two smnist digits (e.g.
``2,5,5,2,2`` with ``n=5``), classify which of the ``2**n`` possible binary
sequences (classes) the presented sequence belongs to. E.g., for ``n=3`` the
number of classes would be 8, corresponding to all possible sequences of two
digits (``0`` and ``1`` here): ``000, 001, 010, 100, 011, 110, 101, 111``.

The individual tasks of the task family differ in which digits are used to
generate the binary sequences. Considering all possible pairs of distinct
digits, we can generate ``(10**2 - 10) / 2 = 45`` tasks.
"""
import numpy as np
import numpy.matlib as npm
import pickle
import os
import urllib.request
import itertools
from sklearn.preprocessing import OneHotEncoder

from import SequentialDataset

# generate sequences and classes
# generate dataset by sampling numbers from smnist
# write class that implements dataset interface
# write train file
# write hpsearch file

#def generate_tasks(self):

class SeqSMNIST(SequentialDataset):
    """Datahandler for one sequential stroke MNIST task.

    Every sample is a concatenation of ``sequence_length`` stroke MNIST digits
    drawn from the two classes given by ``digits``. The label of a sample is
    the index of its binary digit pattern (or one of two groups of patterns if
    ``two_class`` is set). See the module docstring for the task description.

    Note:
        A single target is generated per sample. If ``target_per_timestep`` is
        ``True``, this target is repeated ``num_time_steps`` times along the
        feature axis so that it spans the entire (padded) sequence.

    Args:
        data_path (str): Where should the dataset be read from? If not
            existing, the dataset will be downloaded into this folder.
        use_one_hot (bool): Whether the class labels should be represented in
            a one-hot encoding.
        num_train (int): Number of training samples to be generated.
        num_test (int): Number of test samples to be generated.
        num_val (int): Number of validation samples to be generated.
        target_per_timestep (bool): If activated, the target of a sample will
            be copied across the entire sequence. Else, there is a single
            target for the entire sequence (rather than one per timestep).
        sequence_length (int): The length ``n`` of the binary sequence to be
            classified. This also affects the number of classes, which is
            ``2**n`` (unless ``two_class`` is set).
        digits (tuple): The two digits that shall be used for generating the
            binary sequence.
        two_class (bool): When true, instead of classifying each possible
            sequence individually, sequences are randomly grouped into two
            classes. This makes the number of classes (and therefore the
            chance level) independent of the sequence length.
        upsample_control (bool): If ``True``, instead of building sequences of
            digits, we upsample single digits by a factor given by
            ``sequence_length``. Only implemented for ``two_class=True``.
        fix_class_partition (bool): Only used if ``two_class`` is set. If
            ``True``, the grouping of sequences into two classes is drawn from
            a dedicated random state with fixed seed ``42`` instead of the
            random state of this instance.
        rseed (int): Seed for numpy random state. If ``None``, numpy's global
            random module is used.

    Raises:
        RuntimeError: If the dataset file does not exist and cannot be
            downloaded.
        NotImplementedError: If ``upsample_control`` is requested without
            ``two_class``.
    """

    def __init__(self, data_path, use_one_hot=True, num_train=1600,
                 num_test=400, num_val=0, target_per_timestep=True,
                 sequence_length=4, digits=(0, 1), two_class=False,
                 upsample_control=False, fix_class_partition=False,
                 rseed=None):
        super().__init__()

        # Use a private random state if a seed is given, otherwise fall back
        # to numpy's global random module.
        if rseed is not None:
            self._rstate = np.random.RandomState(rseed)
        else:
            self._rstate = np.random

        self.target_per_timestep = target_per_timestep
        self.two_class = two_class

        data_path = os.path.join(data_path,
                                 'sequential/smnist/ss_mnist_data.pickle')
        data = self._load_raw_smnist(data_path)

        # The first ``smnist_n_train`` entries of the original training set
        # serve as training pool, the remainder as validation pool.
        smnist_n_train = 50000
        x_data_train = data[0][:smnist_n_train]
        y_data_train = data[1][:smnist_n_train]
        x_data_test = data[2]
        y_data_test = data[3]
        x_data_val = data[0][smnist_n_train:]
        y_data_val = data[1][smnist_n_train:]

        # Determine which sequence classes are mapped to label 1 in the
        # two-class setting.
        if two_class:
            num_classes = 2**sequence_length
            if sequence_length == 1:
                class_partition = np.asarray([1])
            else:
                # Randomly group sequences into 2 classes.
                class_partition = self._rstate.choice(
                    num_classes, int(num_classes/2), replace=False)
            # NOTE(review): the indentation of this branch was lost in the
            # available source; it is assumed to apply to every two-class
            # configuration (including ``sequence_length == 1``) - confirm.
            if fix_class_partition:
                # Use the same random partition for all tasks.
                rstate_partition = np.random.RandomState(42)
                class_partition = rstate_partition.choice(
                    num_classes, int(num_classes/2), replace=False)
        else:
            class_partition = None

        # NOTE(review): 117 is presumably the maximum number of timesteps of
        # a single stroke MNIST digit - confirm against the preprocessing.
        max_seq_len = 117 * sequence_length

        def generate(x_pool, y_pool, n_samples):
            return self._generate_data(
                x_pool, y_pool, max_seq_len, digits, sequence_length,
                n_samples, use_one_hot, class_partition, upsample_control)

        # The order of these calls determines the consumption of the random
        # state and must therefore not be changed.
        x_train, y_train, sample_lengths_train = \
            generate(x_data_train, y_data_train, num_train)
        x_test, y_test, sample_lengths_test = \
            generate(x_data_test, y_data_test, num_test)
        x_val, y_val, sample_lengths_val = \
            generate(x_data_val, y_data_val, num_val)

        # Samples are stored in the order: validation, training, test.
        in_data = np.vstack((x_val, x_train, x_test))
        out_data = np.vstack((y_val, y_train, y_test))
        sample_lengths = np.hstack((sample_lengths_val,
                                    sample_lengths_train,
                                    sample_lengths_test))

        # Specify internal data structure.
        self._data['classification'] = True
        self._data['sequence'] = True
        if two_class:
            self._data['num_classes'] = 2
        else:
            self._data['num_classes'] = 2**sequence_length
        # Quadruple per timestep: (dx, dy, eos, eod).
        self._data['in_shape'] = [4]
        self._data['out_shape'] = [self._data['num_classes']
                                   if use_one_hot else 1]
        # Maximum number of timesteps, sequences will be padded to this
        # length.
        self._data['num_time_steps'] = max_seq_len
        self._data['is_one_hot'] = use_one_hot
        self._data['in_data'] = in_data
        self._data['out_data'] = out_data

        if num_val > 0:
            self._data['val_inds'] = np.arange(num_val)
        self._data['train_inds'] = np.arange(num_val, num_val + num_train)
        self._data['test_inds'] = np.arange(num_val + num_train,
                                            num_val + num_train + num_test)

        self._data['in_seq_lengths'] = sample_lengths
        if target_per_timestep:
            self._data['out_seq_lengths'] = sample_lengths
        else:
            self._data['out_seq_lengths'] = np.ones_like(sample_lengths)

    @staticmethod
    def _load_raw_smnist(data_path):
        """Return the unpickled stroke MNIST data, downloading it if missing.

        Args:
            data_path (str): Full path of the pickle file.

        Returns:
            The object stored in the pickle file. The constructor indexes it
            as ``(x_train, y_train, x_test, y_test)``.

        Raises:
            RuntimeError: If the file is missing and the download fails.
        """
        # If dataset does not exist in dataset folder, download it from
        # dropbox.
        # FIXME Dropbox link might become invalid in the near future.
        if not os.path.exists(data_path):
            os.makedirs(os.path.dirname(data_path), exist_ok=True)

            # NOTE(review): the download link is blank in the available
            # source. Until it is restored, the download attempt fails and
            # the error below asks the user to provide the file manually.
            url = ""
            try:
                with urllib.request.urlopen(url) as response:
                    raw = response.read()
            except Exception as err:
                raise RuntimeError(
                    'SMNIST data cannot be downloaded. ' +
                    'If you are working on the cluster, please manually ' +
                    'copy the pickled dataset into the following location: '
                    '%s. ' % (data_path) +
                    'If the dropbox link (%s) ' % url +
                    'is invalid, please rebuild the dataset using the ' +
                    'script "preprocess_smnist.py".') from err

            with open(data_path, "wb") as f:
                f.write(raw)

        # NOTE: Unpickling executes arbitrary code. Only load dataset files
        # from a trusted source.
        with open(data_path, 'rb') as f:
            return pickle.load(f)

    def _generate_data(self, x_data, y_data, max_seq_len, digits, seq_len,
                       n_samples, use_one_hot, class_partition,
                       upsample_control):
        """Generates data for a single sequence stroke MNIST task with
        specified length and digits to use.

        Args:
            x_data (list): Original stroke mnist input data. Every entry is
                indexed as ``[timestep, feature]`` with 4 features per
                timestep (features 2 and 3 are compared against zero to
                detect the end of a stroke or digit).
            y_data (list): Original stroke mnist labels. The digit of an entry
                is determined via ``argmax``.
            max_seq_len (int): The maximum length of a sequence (i.e. number
                of timesteps of a sample). Samples are zero-padded to this
                length.
            digits (tuple): The two digits used to build the sequence.
            seq_len (int): The length of the sequence of digits to build.
            n_samples (int): The number of samples that should be generated.
                Samples are distributed as evenly as possible across classes.
            use_one_hot (bool): Whether or not to use one hot encodings.
            class_partition (list): If sequences should be grouped into 2
                different classes, this list contains the sequence classes
                that are mapped to label ``1`` (all others become ``0``).
            upsample_control (bool): See constructor docstring.

        Returns:
            (tuple): Tuple containing:

            - **in_data** (numpy.ndarray): The zero-padded inputs of shape
              ``[n_samples, max_seq_len, 4]`` after being passed through
              ``self._flatten_array``.
            - **out_data** (numpy.ndarray): The targets, one row per sample.
              A single target has ``n_classes`` entries if ``use_one_hot``
              and one entry otherwise; it is repeated ``max_seq_len`` times
              if ``self.target_per_timestep`` is set.
            - **sample_lengths** (numpy.ndarray): The original unpadded
              sequence lengths.
        """
        # Modify seq_len in case we do upsampling control: a sample then
        # consists of a single (upsampled) digit.
        if upsample_control:
            upsample_factor = seq_len
            seq_len = 1
            if not self.two_class:
                raise NotImplementedError()

        # Construct all possible classes, e.g. "00", "01", "10", "11".
        classes = ["".join(seq)
                   for seq in itertools.product("01", repeat=seq_len)]

        # Get the right number of samples per class to get a balanced data
        # set with the desired n_samples. The first ``remainder`` classes
        # receive one extra sample.
        base, remainder = divmod(n_samples, len(classes))
        n_samples_per_class = [base + (1 if c_idx < remainder else 0)
                               for c_idx in range(len(classes))]

        # Find indices of samples with the wanted digit class.
        digit_labels = np.asarray([np.argmax(y) for y in y_data])
        digit_idx = [np.where(digit_labels == digits[0])[0],
                     np.where(digit_labels == digits[1])[0]]

        # Generate samples for every class.
        samples = []
        labels = []
        for label, pattern in enumerate(classes):
            digits_to_sample = [int(char) for char in pattern]
            for _ in range(n_samples_per_class[label]):
                parts = []
                for d in digits_to_sample:
                    rand_idx = self._rstate.randint(len(digit_idx[d]))
                    parts.append(x_data[digit_idx[d][rand_idx]])
                # Concatenate the digits along the time axis.
                samples.append(np.vstack(parts))
                labels.append(label)

        # If configured, sort labels into 2 classes.
        labels = np.asarray(labels)
        if self.two_class and not upsample_control:
            lbl_mask = np.isin(labels, class_partition)
            labels[~lbl_mask] = 0
            labels[lbl_mask] = 1

        if upsample_control:
            for i, s in enumerate(samples):
                # Initial timestep is absolute start position of digit. To
                # translate to a higher resolution image, we can just
                # multiply the absolute position by the scaling factor.
                rows = [s[0, :] * upsample_factor]
                for t in range(1, s.shape[0]):
                    # Don't do upsampling at end of strokes or end of digits.
                    if s[t, 2] == 0 and s[t, 3] == 0:
                        # Repeat original stroke "upsample_factor" times,
                        # such that the relative stroke length is identical
                        # if images are normalized to same resolution.
                        rows.extend([s[t, :]] * upsample_factor)
                    else:
                        rows.append(s[t, :])
                # Stack once instead of growing the array row by row.
                samples[i] = np.vstack(rows)

        # Structure output data.
        out_data = labels.reshape(-1, 1)
        if use_one_hot:
            n_classes = 2 if self.two_class else 2**seq_len

            # FIXME We shouldn't call this method if the validation set size
            # is zero.
            if out_data.size == 0:
                # Only widen the empty array to the expected number of
                # columns, such that it can be stacked with other splits.
                out_data = np.tile(out_data, (1, n_classes))
            else:
                # FIXME use internal method `_to_one_hot` and set required
                # class attributes beforehand.
                one_hot_encoder = OneHotEncoder(
                    categories=[range(n_classes)])
                one_hot_encoder.fit(np.arange(n_classes).reshape(-1, 1))
                out_data = one_hot_encoder.transform(out_data).toarray()

        if self.target_per_timestep:
            # Repeat the target of each sample for every timestep.
            out_data = np.tile(np.asarray(out_data), (1, max_seq_len))

        # Structure input data: zero-pad every sample to max_seq_len.
        in_data = np.zeros((n_samples, max_seq_len, 4))
        sample_lengths = np.zeros(n_samples)
        for i, s in enumerate(samples):
            in_data[i, :s.shape[0], :] = s
            sample_lengths[i] = s.shape[0]

        in_data = self._flatten_array(in_data)

        return in_data, out_data, sample_lengths

    def get_identifier(self):
        """Returns the name of the dataset."""
        return 'Sequence SMNIST'

    def _plot_sample(self, fig, inner_grid, num_inner_plots, ind, inputs,
                     outputs=None, predictions=None, sample_ids=None,
                     is_one_hot=None):
        """Not implemented for this dataset."""
        raise NotImplementedError()

    def _plot_config(self, inputs, outputs=None, predictions=None):
        """Not implemented for this dataset."""
        raise NotImplementedError()

    def __str__(self):
        """Print major characteristics of the current dataset."""
        return 'Data handler for sequential SMNIST'
if __name__ == '__main__':
    # This module only provides the data handler; there is nothing to do when
    # it is executed as a script.
    pass