Source code for hypnettorch.data.timeseries.copy_data

#!/usr/bin/env python3
# Copyright 2019 Maria Cervera
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# title           :data/timeseries/copy_data.py
# author          :mc
# contact         :mariacer@ethz.ch
# created         :20/03/2020
# version         :1.0
# python_version  :3.7
"""
Dataset for the sequential copy task
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

A data handler for the copy task as described in: 

    https://arxiv.org/pdf/1410.5401.pdf

A typical use case for this dataset is an incremental learning setting. For
instance, a sequence of tasks with increasing lengths can be used for
curriculum learning or continual learning.

The class contains many options to modify the basic copy task. Many of these
variations target the continual learning use case (rather than curriculum
learning) by providing sets of distinct tasks with comparable difficulty.
Note, these variations typically extend the required input processing and are
not limited to plain copying.
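
A minimal usage sketch (argument values are illustrative; the constructor
signature is defined by :class:`CopyTask` below):

.. code-block:: python

    from hypnettorch.data.timeseries.copy_data import CopyTask

    data = CopyTask(min_input_len=5, max_input_len=10, seq_width=7,
                    num_train=100, num_test=50)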
"""
from warnings import warn

import matplotlib.pyplot as plt
import numpy as np
import torch

from hypnettorch.data.sequential_dataset import SequentialDataset

class CopyTask(SequentialDataset):
    """Data handler for the sequential copy task.

    In this task, a binary vector is presented as input, and the network has
    to learn to copy it. To ensure that the network cannot rely on
    intermediate information, there is a delay between the end of the input
    presentation and the output generation. The end of the input sequence is
    delimited by a binary bit, which is always zero except when the sequence
    finishes. This flag should not be copied.

    An instance of this class will represent copy task patterns of random
    length (by default) but fixed width (but see option ``out_width``). The
    length of input patterns will be sampled uniformly from the interval
    ``[min_input_len, max_input_len]``. Note that the actual length of the
    patterns ``pat_len`` might be smaller in the case where there are a
    certain number of zero-valued timesteps within the input. As such, every
    sequence is characterised by the following values:

    - ``pat_len``: the actual length of the binary pattern to be copied.
      Across this duration, half the pixels have value 1 and the other half
      have value 0.
    - ``input_len``: the length of the input presentation up until the stop
      flag. It is equal to the pattern length plus the number of zero-valued
      timesteps.
    - ``seq_len``: the length of the entire sequence, including input
      presentation, stop flag and output generation. It is therefore equal
      to the input length, plus one (stop flag), plus the pattern length
      (since during output reconstruction we don't care about reconstructing
      the zero-valued part of the input). For example, an input of length 5
      with ``pat_len = 3`` yields ``seq_len = 5 + 1 + 3 = 9``.

    Caution:
        Manipulations such as permutations or scattering/masking will be
        applied online in :meth:`output_to_torch_tensor`.

    Args:
        min_input_len (int): The minimum length of an input sequence.

            Note:
                The input length is the length of the presented input before
                the stop flag. It might include both a pattern to be copied
                and a set of zero-valued timesteps that do not need to be
                reconstructed.
        max_input_len (int): The maximum length of a pattern.
        seq_width (int): The width of each pattern.

            Note:
                Each pattern will have a certain length (across time) and a
                certain width.
        out_width (int, optional): If specified, a number smaller than
            ``seq_width`` is expected. In this case, only the first
            ``out_width`` input features are expected to be copied (i.e.,
            only those occur as target output features).
        num_train (int): Number of training samples.
        num_test (int): Number of test samples.
        num_val (int, optional): Number of validation samples.
        pat_len (int, optional): The actual length of the pattern within the
            input sequence (excluding zero-valued timesteps). By default, the
            value is ``-1``, meaning that the pattern length is identical to
            the input length and there are no zeroed timesteps. For other
            values, the input sequences will be zero-padded after ``pat_len``
            timesteps. The input sequence lengths therefore remain the same,
            but the actual duration of the patterns is reduced. This
            manipulation is useful to decouple sequence length and memory
            requirement for analysis.

            Note:
                We define the number of timesteps that are not zero, and
                therefore for values different than ``-1``, with the current
                implementation, we will obtain patterns of identical length
                (but different input sequence lengths).
        scatter_pattern (bool): Option only compatible with
            ``pat_len != -1``. If activated, the pattern is not concentrated
            at the beginning of the input sequence.
            Instead, the whole input sequence will be filled with a random
            pattern (i.e., no padding is used), but only a fixed and random
            (see option ``rseed_scatter``) number of timesteps from the input
            sequence are considered to create an output sequence of length
            ``pat_len``.
        permute_width (boolean, optional): If enabled, the generated pattern
            will be permuted along the width axis.
        permute_time (boolean, optional): If enabled, the generated pattern
            will be permuted along the temporal axis.
        permute_xor (bool): Only applicable if ``permute_width`` or
            ``permute_time`` is ``True``. If ``True``, the permuted and
            unpermuted output patterns will be combined into a new output
            pattern via a logical XOR operation.
        permute_xor_iter (int): Only applicable if ``permute_xor`` is set.
            The internal permutation is applied iteratively and XOR-ed with
            the previous target output to obtain a final target output.
        permute_xor_separate (bool): Only applicable if ``permute_xor`` is
            set and ``permute_xor_iter > 1``. If ``True``, a separate
            permutation matrix is used per iteration described by
            ``permute_xor_iter``. In this case, the input pattern is permuted
            ``permute_xor_iter`` times via separate permutation matrices, and
            the resulting patterns are sequentially XOR-ed with the original
            input pattern.

            Hence, this can be viewed as follows: ``permute_xor_iter`` random
            input pixels are assigned to each output pattern pixel. This
            output pattern pixel will be ``1`` if and only if the number of
            ones in those input pixels is odd.
        random_pad (bool, optional): If activated, the truncated part of the
            input (see option ``pat_len``) will be left as a random pattern,
            and not set to zero. Note that the loss computation is unaffected
            by this option.
        pad_after_stop (bool): This option affects how option ``pat_len`` is
            handled and therefore can only be used if ``pat_len`` is set. If
            ``True``, ``pat_len`` will determine the length of the input
            sequence (no padding is applied before the stop bit). The padding
            is instead moved to after the stop bit and therewith becomes part
            of the target output. I.e., the original input sequence length
            determines the output sequence length, which consists of zero
            padding and the input pattern of length ``pat_len``. Note, in
            this case, the options ``min_input_len`` and ``max_input_len``
            actually apply solely to the output.
        pairwise_permute (bool, optional): This option is only used if some
            permutation is activated. If enabled, it will force the
            permutation to be a pairwise switch between successive pixels.
            Note that this operation is deterministic, and will therefore be
            identical for different tasks, if more than one task is
            generated.
        revert_output_seq (bool, optional): If enabled, output sequences will
            be reverted along the time dimension. Note that this operation is
            deterministic, and will therefore be identical for different
            tasks, if more than one task is generated.
        rseed (int, optional): If ``None``, the current random state of numpy
            is used to generate the data. Otherwise, a new random state with
            the given seed is generated.
        rseed_permute (int, optional): Random seed for performing the
            permutations of the copy patterns. Only used if option
            ``permute_width`` or ``permute_time`` is activated. If ``None``,
            the current random state of numpy is used to generate the data.
            Otherwise, a new random state with the given seed is generated.
        rseed_scatter (int, optional): See option ``rseed``.
            Random seed for determining which timesteps of the input
            sequence to use for the output pattern if option
            ``scatter_pattern`` is activated.
    """
    def __init__(self, min_input_len, max_input_len, seq_width=7,
                 out_width=-1, num_train=100, num_test=100, num_val=None,
                 pat_len=-1, scatter_pattern=False, permute_width=False,
                 permute_time=False, permute_xor=False, permute_xor_iter=1,
                 permute_xor_separate=False, random_pad=False,
                 pad_after_stop=False, pairwise_permute=False,
                 revert_output_seq=False, rseed=None, rseed_permute=None,
                 rseed_scatter=None):
        super().__init__()

        if pad_after_stop and pat_len == -1:
            raise ValueError('Option "pad_after_stop" can only be set if ' +
                             '"pat_len" is defined.')
        if pad_after_stop and random_pad:
            # We could enforce a fixed random pattern instead of zero-padding.
            raise NotImplementedError()
        if pad_after_stop and scatter_pattern:
            raise ValueError('Options "pad_after_stop" and "scatter_pattern" ' +
                             'not compatible.')
        if revert_output_seq and (permute_time or permute_width):
            raise ValueError('Incompatible options: "revert_output_seq" and ' +
                             'permutations.')

        if out_width == -1:
            out_width = seq_width
        else:
            assert out_width <= seq_width
        if out_width < seq_width and permute_width:
            raise NotImplementedError('The combination of "out_width" ' +
                                      'with "permute_width" is currently ' +
                                      'not supported or tested.')

        # Set the random state.
        if rseed is not None:
            self._rstate = np.random.RandomState(rseed)
        else:
            self._rstate = np.random

        self._permute_width = permute_width
        self._permute_time = permute_time
        self._permute_xor = permute_xor
        self._permute_xor_iter = permute_xor_iter
        self._permute_xor_separate = permute_xor_separate
        self._rseed_permute = rseed_permute
        self._random_pad = random_pad
        self._pad_after_stop = pad_after_stop
        self._pairwise_permute = pairwise_permute
        self._revert_output_seq = revert_output_seq

        rstate_permute = None
        if permute_width or permute_time:
            # Set the random state for permutations.
            if rseed_permute is not None:
                rstate_permute = np.random.RandomState(rseed_permute)
            else:
                rstate_permute = np.random
            # Permutations are not implemented for sequences of varying
            # length.
            assert min_input_len == max_input_len
        elif permute_xor:
            warn('Option "permute_xor" has no effect if no ' +
                 'permutations are applied.')

        # We define the permutations in terms of a permutation vector, which
        # is applied to flattened arrays, which are then returned to their
        # original shapes.
        # Note that right now the code is not optimal, as we will loop over
        # all samples to apply permutations, even in the case when no actual
        # permutations are occurring. This is implemented like this for
        # sanity checks but should be fixed in the future.
        if pat_len == -1:
            pat_len_perm = min_input_len
        else:
            pat_len_perm = pat_len
            # Note, we don't overwrite `pat_len`, since for the
            # non-permutation cases we want it to be different for each
            # sample.
        self.permutation_ = None
        if (permute_width or permute_time) and permute_xor and \
                permute_xor_separate:
            self.permutation_ = []
            for _ in range(permute_xor_iter):
                self.permutation_.append(CopyTask.create_permutation_matrix( \
                    permute_time, permute_width, pat_len_perm, out_width,
                    rstate_permute, pairwise_permute=self._pairwise_permute))
        elif permute_width or permute_time or revert_output_seq:
            self.permutation_ = CopyTask.create_permutation_matrix( \
                permute_time, permute_width, pat_len_perm, out_width,
                rstate_permute, pairwise_permute=self._pairwise_permute,
                revert_output_seq=self._revert_output_seq)

        out_pat_steps = None
        if scatter_pattern:
            if pat_len == -1 or pat_len > min_input_len:
                raise ValueError('Option "pat_len=%d" invalid when ' % pat_len +
                                 'activating "scatter_pattern".')
            if min_input_len != max_input_len:
                warn('Option "scatter_pattern" only considers timesteps ' +
                     'below "min_input_len".')

            rstate_scatter = np.random
            if rseed_scatter is not None:
                rstate_scatter = np.random.RandomState(rseed_scatter)

            # Select timesteps to be used from the input to create the output
            # pattern.
            out_pat_steps = np.sort(rstate_scatter.choice( \
                np.arange(min_input_len), pat_len, replace=False))

        # Specify the internal data structure.
        # Note, it's not a typical classification dataset, since it consists
        # of `seq_width` many independent binary classification decisions per
        # timestep.
        self._data['classification'] = False
        self._data['is_one_hot'] = False
        self._data['sequence'] = True
        # Note, there will be an extra channel for the stop bit in each input
        # sample.
        self._data['in_shape'] = [seq_width + 1]
        self._data['out_shape'] = [out_width]
        self._data['train_inds'] = np.arange(num_train)
        self._data['test_inds'] = np.arange(num_train, num_train + num_test)
        if pad_after_stop:
            self._data['avg_input_length'] = pat_len
            self._data['avg_output_length'] = \
                (min_input_len + max_input_len) / 2
        else:
            self._data['avg_input_length'] = \
                (min_input_len + max_input_len) / 2
            self._data['avg_output_length'] = self._data['avg_input_length'] \
                if pat_len == -1 else pat_len
        # Note, the following two attributes apply to outputs if
        # `pad_after_stop` is set.
        self._data['min_input_len'] = min_input_len
        self._data['max_input_len'] = max_input_len
        self._data['pat_len'] = pat_len
        self._data['scatter_pattern'] = scatter_pattern
        self._data['scatter_steps'] = out_pat_steps

        # Note that by default the number of timesteps in x is two times the
        # maximum input length (one for presenting it, and one for producing
        # it as an output) plus a delay of 1 timestep between the two.
        # If a certain number of timesteps is zeroed out in the input
        # pattern, then the target pattern will only correspond to the
        # non-zeroed part. It may also be that the output has a padded part.
        # Note that this step is important when computing the accuracy and
        # loss on the correct output timesteps.
        if pat_len == -1:
            self._data['seq_len'] = self._data['max_input_len'] * 2 + 1
        else:
            # Note, the maximum sequence length is independent of the option
            # `pad_after_stop`.
            self._data['seq_len'] = self._data['max_input_len'] + 1 + pat_len

        if num_val is not None:
            n_start = num_train + num_test
            self._data['val_inds'] = np.arange(n_start, n_start + num_val)

        # Get the train and test data.
        train_x, train_y, train_l, train_nz = \
            self._generate_trial_samples(num_train)
        test_x, test_y, test_l, test_nz = \
            self._generate_trial_samples(num_test)

        # Create validation data if requested.
        if num_val is not None:
            val_x, val_y, val_l, val_nz = \
                self._generate_trial_samples(num_val)

            in_data = np.vstack([train_x, test_x, val_x])
            out_data = np.vstack([train_y, test_y, val_y])
            seq_lengths = np.concatenate([train_l, test_l, val_l])
            zeroed_ts = np.concatenate([train_nz, test_nz, val_nz])
        else:
            in_data = np.vstack([train_x, test_x])
            out_data = np.vstack([train_y, test_y])
            seq_lengths = np.concatenate([train_l, test_l])
            zeroed_ts = np.concatenate([train_nz, test_nz])

        self._data['in_data'] = in_data
        self._data['out_data'] = out_data
        # Note, inputs and outputs have the same lengths in this dataset.
        self._data['in_seq_lengths'] = seq_lengths
        self._data['out_seq_lengths'] = seq_lengths
        self._data['zeroed_ts'] = zeroed_ts

    def _generate_trial_samples(self, n_samples):
        """Generate a certain number of trials.

        Args:
            n_samples (int): The number of desired samples.

        Returns:
            (tuple): Tuple containing:

            - **x**: Matrix of trial inputs of shape
              ``[n_samples, in_size*time_steps]``.
            - **y**: Matrix of trial targets of shape
              ``[n_samples, out_size*time_steps]``.
            - **lengths**: Vector of trial lengths.
            - **zeroed_ts**: Vector of the numbers of zeroed-out timesteps.
        """
        lengths = []
        zeroed_ts = []
        in_size = self._data['in_shape'][0] - 1
        out_size = self._data['out_shape'][0]
        seq_len = self._data['max_input_len'] * 2 + 1
        scatter_pattern = self._data['scatter_pattern']

        # Randomly create a binary matrix.
        x = self._rstate.rand(self._data['seq_len'], n_samples, in_size)
        x = np.where(x > 0.5, 1., 0.)

        # Define y as the copy of x without the delimiter flag, delayed by
        # the maximum input length + 1.
        # Note that x and y have different dimensions in this step. Because
        # the output pattern might need to be manipulated differently for
        # different tasks in a way that requires having the entire input
        # sequence (i.e., when scattering patterns), we define y to be twice
        # the length of the maximum sample length plus one, and we will cut
        # it if necessary in :meth:`output_to_torch_tensor`.
        y = np.zeros((seq_len, n_samples, out_size))

        # Cut each sample to a specific length randomly sampled around the
        # average input length for this task.
        flag = np.zeros((self._data['seq_len'], n_samples, 1))
        for i in range(n_samples):
            if self._data['min_input_len'] != self._data['max_input_len']:
                sample_input_len = self._rstate.randint( \
                    self._data['min_input_len'],
                    self._data['max_input_len'] + 1)
            else:
                sample_input_len = self._data['min_input_len']
            sample_output_len = sample_input_len

            # If needed, zero-out the last timesteps of the input, as
            # specified by the value `pat_len`.
            pat_len = self._data['pat_len']
            if self._pad_after_stop and pat_len < sample_input_len:
                assert pat_len != -1
                sample_input_len = pat_len
            elif not self._pad_after_stop and pat_len != -1 and \
                    pat_len < sample_output_len:
                sample_output_len = pat_len
            num_zeroed_ts = 0 if pat_len == -1 else \
                max(0, sample_input_len - pat_len)
            if scatter_pattern or self._random_pad:
                assert not self._pad_after_stop
                x[sample_input_len:, i, :] = 0
            elif pat_len == -1 or pat_len > sample_input_len:
                x[sample_input_len:, i, :] = 0
            else:
                x[pat_len:, i, :] *= 0

            # Copy the content of x into y, after a delay of 1 timestep.
            oo = sample_output_len - pat_len if self._pad_after_stop else 0
            y[sample_input_len+1+oo:sample_input_len*2+1+oo, i, :] = \
                x[:sample_input_len, i, :out_size]

            # Add a flag indicator to easily recover sequence lengths
            # afterwards.
            y[sample_input_len, i, :] = -1  # FIXME it's ugly

            # Add the sequence stop flag.
            flag[sample_input_len, i, :] = 1.
            lengths.append(sample_input_len + 1 + sample_output_len)
            zeroed_ts.append(num_zeroed_ts)

        # Concatenate the flag to the inputs.
        x = np.concatenate((x, flag), axis=2)

        # Reshape x and y to fit the dataset class.
        x = self._flatten_array(x, ts_dim_first=True)
        y = self._flatten_array(y, ts_dim_first=True)

        return x, y, np.array(lengths), np.array(zeroed_ts)
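
    # Note on the layout above (an illustrative remark, not part of the
    # original code): `_flatten_array` with ``ts_dim_first=True`` turns the
    # ``[T, B, F]`` arrays built in :meth:`_generate_trial_samples` into
    # ``[B, T*F]`` matrices, i.e., the sample-major layout stored in
    # ``self._data``. E.g., 4 samples with ``seq_len = 9`` and
    # ``seq_width + 1 = 8`` input features yield a ``[4, 72]`` input matrix.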
    def output_to_torch_tensor(self, *args, **kwargs):
        """Similar to method :meth:`input_to_torch_tensor`, just for dataset
        outputs.

        Args:
            (....): See docstring of method
                :meth:`data.dataset.Dataset.output_to_torch_tensor`.

        Returns:
            (torch.Tensor): The given input ``y`` as PyTorch tensor. It has
            dimensions ``[T, B, *out_shape]``, where ``T`` is the number of
            time steps (see attribute :attr:`max_num_ts_out`), ``B`` is the
            batch size and ``out_shape`` refers to the output feature shape,
            see :attr:`data.dataset.Dataset.out_shape`.
        """
        y = super().output_to_torch_tensor(*args, **kwargs)

        n_samples = y.shape[1]
        scatter_pattern = self._data['scatter_pattern']
        scatter_steps = self._data['scatter_steps']
        input_len = self._data['min_input_len']

        # Scatter the contents of the pattern if needed.
        if scatter_pattern:
            for i in range(n_samples):
                # Get the correct sample length.
                sample_input_len = y[:, i, :][:, 0].tolist().index(-1)
                # Get the sample pattern length.
                pat_len = self._data['pat_len']
                if pat_len == -1 or pat_len > sample_input_len:
                    pat_len = sample_input_len
                # That's just the complete input pattern.
                y_pattern_original = y[sample_input_len+1:, i, :]
                # Note, the rest is cut away below.
                y[sample_input_len+1:sample_input_len+1+pat_len, i, :] = \
                    y_pattern_original[scatter_steps]

        # Delete the stop flag indicator.
        y[y == -1] = 0.

        # Cut timesteps that are irrelevant for the loss from the output
        # sequences. This line only has an effect if `pat_len` was set.
        y = y[:self._data['seq_len'], :, :]

        seq_len, n_samples, out_size = y.shape
        pat_len = self._data['pat_len']

        if self.permutation is not None:
            if pat_len == -1:
                output_len = input_len
            else:
                output_len = pat_len
            if self._pad_after_stop:
                output_len = input_len
                input_len = pat_len
            assert seq_len == input_len + 1 + output_len

        if self._permute_xor and self.permutation is not None:
            ### Apply permutation & XOR iteratively.
            y_pattern = y[input_len+1:, :, :]
            y_pattern_orig = y_pattern.clone()
            y_pattern_perm = torch.zeros_like(y_pattern)

            for p in range(self._permute_xor_iter):
                # Get the permutation to apply to all samples.
                curr_perm = self.permutation
                if self._permute_xor_separate:
                    curr_perm = self.permutation[p]

                for i in range(n_samples):
                    # What pattern to permute:
                    if self._permute_xor_separate:
                        # The original pattern.
                        sample_pattern = y_pattern_orig[:, i, :].clone()
                    else:
                        # The result of the last XOR operation.
                        sample_pattern = y_pattern[:, i, :].clone()
                    sample_pattern = sample_pattern.flatten()[curr_perm]
                    if self._pad_after_stop:
                        y_pattern_perm[-pat_len:, i, :] = \
                            sample_pattern.view((pat_len, out_size))
                    else:
                        y_pattern_perm[:, i, :] = \
                            sample_pattern.view((output_len, out_size))

                # Update y_pattern by applying the XOR operation.
                y_pattern = torch.logical_xor(y_pattern_perm.bool(),
                                              y_pattern.bool())
            y[-output_len:, :, :] = y_pattern.double()

        elif self.permutation is not None:
            ### Apply the permutation.
            # Note that the permutation can be applied either in the width or
            # time dimension, or both. For the first case, at a given
            # timestep :math:`t` the binary vector of activations is permuted
            # for the targets. For the second case, given a certain input
            # unit :math:`i`, the binary vector of activations is permuted
            # across time, only for the timesteps where the pattern is being
            # reconstructed (not during input). If both are active, the
            # permutation is performed independently for each point in both
            # dimensions.
            y_pattern = y[input_len+1:, :, :]
            y_pattern_perm = torch.zeros_like(y_pattern)
            for i in range(n_samples):
                sample_pattern = y_pattern[:, i, :].clone()
                sample_pattern = sample_pattern.flatten()[self.permutation]
                if self._pad_after_stop:
                    y_pattern_perm[-pat_len:, i, :] = \
                        sample_pattern.view((pat_len, out_size))
                else:
                    y_pattern_perm[:, i, :] = \
                        sample_pattern.view((output_len, out_size))

            # Add the copied or permuted pattern to the target sequences.
            y[-output_len:, :, :] = y_pattern_perm

        return y
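
    # Illustrative note (ours, not part of the original code): in the XOR
    # branch of :meth:`output_to_torch_tensor` above, each output pixel ends
    # up as the parity of the original pixel and the pixels mapped onto it by
    # the `permute_xor_iter` permutations. A minimal tensor analogue:
    #
    #     a = torch.tensor([1., 0., 1., 1.])
    #     perm = torch.tensor([2, 3, 0, 1])
    #     out = torch.logical_xor(a.bool(), a[perm].bool()).double()
    #     # out == tensor([0., 1., 0., 1.])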
    def get_out_pattern_bounds(self, sample_ids):
        """Get the start time step and length of the output pattern within
        the sequence.

        Note, input sequences may have varying length (even though they are
        padded to the same length).

        Assume we are considering an input of length 7, meaning that the
        total sequence would have length 15 = 7 + 1 + 7 (input pattern
        presentation, stop bit, output pattern copying). In addition, assume
        that the maximum input length is 10 (hence, the maximum sequence
        length is 21 = 10 + 1 + 10). In this case, all sequences are padded
        to have length 21. For the sample in consideration (with input
        length 7), the output pattern sequence starts at index 8 and has a
        length of 7, or less if the input contains some zeroed values.
        Hence, these two numbers would be returned for this sample.

        Args:
            (....): See docstring of method :meth:`data.sequential_data.\
SequentialDataset.get_in_seq_lengths`.

        Returns:
            (tuple): Tuple containing:

            - **start_inds** (numpy.ndarray): 1D array with the same length
              as ``sample_ids``, which contains the start index of the
              output pattern in a given sample.
            - **lengths** (numpy.ndarray): 1D array containing the lengths
              of the pattern per given sample.
        """
        seq_lengths = self.get_out_seq_lengths(sample_ids)
        # Get the actual number of zeroed timesteps per sample (since some
        # samples might be shorter than the number of timesteps to be masked
        # out, the actual number of zeroed timesteps might be smaller).
        zeroed_ts = self.get_zeroed_ts(sample_ids)

        # Compute the start index of the target pattern.
        if self._pad_after_stop:
            out_pat_start_inds = \
                np.ones_like(seq_lengths) * self._data['pat_len'] + 1
        else:
            out_pat_start_inds = (seq_lengths + zeroed_ts) // 2 + 1

        if self._pad_after_stop:
            lengths = zeroed_ts + self._data['pat_len']
            return out_pat_start_inds, lengths
        else:
            # The actual pattern length after zeroing part of the pattern.
            pat_len = (seq_lengths + zeroed_ts) // 2 - zeroed_ts
            assert np.all(np.equal(seq_lengths,
                                   out_pat_start_inds + pat_len))
            return out_pat_start_inds, pat_len
    def get_zeroed_ts(self, sample_ids):
        """Get the number of zeroed timesteps in each input pattern.

        Note, if ``scatter_pattern`` was activated in the constructor, then
        this number does not refer to the number of padded steps in the
        input sequence, but rather to the number of unused steps in the
        input sequence. However, those unused steps will still contain
        random patterns. The same applies if argument ``random_pad`` is
        used.

        Note, if ``pad_after_stop`` was activated, then the zeroed timesteps
        actually occur after the stop bit, i.e., in the output part of the
        sequence.

        Args:
            (....): See docstring of method :meth:`get_in_seq_lengths`.

        Returns:
            (numpy.ndarray): A 1D numpy array.
        """
        return self._data['zeroed_ts'][sample_ids]
    def get_identifier(self):
        """Returns the name of the dataset."""
        return 'Copy'
    def _plot_config(self, inputs, outputs=None, predictions=None):
        """Defines properties, used by the method :meth:`plot_samples`.

        This method can be overwritten, if these configs need to be
        different for a certain dataset.

        Args:
            (....): See docstring of method
                :meth:`data.dataset.Dataset._plot_config`.

        Returns:
            (dict): A dictionary with the plot configs.
        """
        plot_configs = dict()

        plot_configs['outer_wspace'] = 0.4
        plot_configs['outer_hspace'] = 0.5
        plot_configs['inner_hspace'] = 0.2
        plot_configs['inner_wspace'] = 0.8
        plot_configs['num_inner_rows'] = 1
        if outputs is not None:
            plot_configs['num_inner_rows'] += 1
        if predictions is not None:
            plot_configs['num_inner_rows'] += 1
        plot_configs['num_inner_cols'] = 1
        plot_configs['num_inner_plots'] = plot_configs['num_inner_rows']

        return plot_configs

    def _plot_sample(self, fig, inner_grid, num_inner_plots, ind, inputs,
                     outputs=None, predictions=None, equalize_size=False,
                     mask_predictions=False, sample_ids=None):
        """Add a custom sample plot to the given Axes object.

        Note, this method is called by the :meth:`plot_samples` method.

        Note, the number of inner subplots is configured via the method
        :meth:`_plot_config`.

        Args:
            (....): See docstring of method
                :meth:`data.dataset.Dataset._plot_sample`.
            inputs: 2D array with dimensions
                ``[1, self._data['in_shape'] * self._data['num_timesteps']]``.
            outputs: 1D array with dimensions
                ``[(self._data['in_shape']-1) * self._data['num_timesteps']]``.
            equalize_size (bool): Equalize the size of the input and output
                tensors by adding an empty row to the outputs (as they have
                one less unit).
            mask_predictions (bool): Mask the given predictions to only show
                predicted values during output pattern presentation time.
            sample_ids (numpy.ndarray): See option ``sample_ids`` of method
                :meth:`get_out_pattern_bounds`. Only required if
                ``mask_predictions`` is ``True``.
        """
        # Reshape the values to uncouple inputs and time.
        x = self._flatten_array(inputs, ts_dim_first=True, reverse=True,
                                feature_shape=self.in_shape)
        wdiff = self.in_shape[0] - self.out_shape[0]
        pdata = [x]
        plabel = ['inputs']
        if outputs is not None:
            t = self._flatten_array(outputs, ts_dim_first=True, reverse=True,
                                    feature_shape=self.out_shape)
            if equalize_size:
                t = np.concatenate(
                    [t, np.zeros((t.shape[0], t.shape[1], wdiff))], axis=2)
            pdata.append(t)
            plabel.append('outputs')
        if predictions is not None:
            y = self._flatten_array(predictions, ts_dim_first=True,
                                    reverse=True,
                                    feature_shape=self.out_shape)
            if equalize_size:
                y = np.concatenate(
                    [y, np.zeros((y.shape[0], y.shape[1], wdiff))], axis=2)
            if mask_predictions:
                assert sample_ids is not None
                ss, sl = self.get_out_pattern_bounds(sample_ids[[ind]])
                for sind in range(y.shape[1]):
                    y[:ss[sind], sind, :] = 0
                    y[(ss[sind]+sl[sind]):, sind, :] = 0
            pdata.append(y)
            plabel.append('predictions')

        for i, d in enumerate(pdata):
            ax = plt.Subplot(fig, inner_grid[i])
            # Note, we can't use `set_axis_off` if we want to keep the
            # y-label.
            ax.set_ylabel(plabel[i])
            ax.set_xticks([])
            ax.set_yticks([])
            ax.imshow(d.squeeze(axis=1).transpose())
            fig.add_subplot(ax)

    def __str__(self):
        """Print major characteristics of the current dataset."""
        return 'Data handler for the copy task dataset.\n' + \
            'Input Sequence width: %i \n' % self._data['in_shape'][0] + \
            'Output Sequence width: %i \n' % self._data['out_shape'][0] + \
            'Average input length: %.1f \n' % \
            self._data['avg_input_length'] + \
            'Average output length: %.1f \n' % \
            self._data['avg_output_length'] + \
            'Max. total number of timesteps: %i. \n' % \
            self._data['seq_len'] + \
            'Dataset contains %d training, %d validation and %d test ' % \
            (self.num_train_samples, self.num_val_samples,
             self.num_test_samples) + \
            'samples.'

    @property
    def permutation(self):
        """Getter for attribute :attr:`permutation_`."""
        return self.permutation_
    @staticmethod
    def create_permutation_matrix(permute_time, permute_width, pat_len_perm,
                                  seq_width, rstate_permute,
                                  pairwise_permute=False,
                                  revert_output_seq=False):
        """Create a permutation matrix.

        Args:
            pairwise_permute (boolean, optional): If ``True``, the
                permutations correspond to switching the position of
                neighboring pixels. For example, ``1234567`` would become
                ``2143657``. If the number of timesteps is odd, the last
                timestep is left unmoved.
            revert_output_seq (boolean, optional): If ``True``, the output
                sequences will be inverted along the time dimension, i.e., a
                pattern ``1234567`` would become ``7654321``.
        """
        P = None
        if permute_width or permute_time or revert_output_seq:
            P = np.arange(pat_len_perm * seq_width)
            if permute_width and pairwise_permute:
                raise NotImplementedError
            if permute_width and permute_time:
                P = rstate_permute.permutation(P)
            elif permute_time:
                P = P.reshape((pat_len_perm, seq_width))
                if pairwise_permute:
                    num_ts = P.shape[0]
                    order = [(b, a) for a, b in \
                        zip(np.arange(0, num_ts-1, 2),
                            np.arange(1, num_ts, 2))]
                    order = [item for it in order for item in it]
                    # If odd number of timesteps, last one remains
                    # unpermuted.
                    if num_ts % 2 != 0:
                        order.append(num_ts - 1)
                    assert len(order) == num_ts
                    P = P[order, :]
                else:
                    P = rstate_permute.permutation(P)
                P = P.flatten()
            elif permute_width:
                P = P.reshape((pat_len_perm, seq_width))
                P = rstate_permute.permutation(P.T).T
                P = P.flatten()
            elif revert_output_seq:
                P = P.reshape((pat_len_perm, seq_width))
                P = P[::-1]
                P = P.flatten()

        return P
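
# Illustrative sketch (ours, not part of the original module): show the index
# vector produced by :meth:`CopyTask.create_permutation_matrix` for a pairwise
# time permutation. The helper name `_demo_permutation` is hypothetical; the
# call signature is taken from the static method above.
def _demo_permutation():
    rstate = np.random.RandomState(42)
    # A 4-step, 2-pixel-wide pattern with pairwise switching of successive
    # timesteps.
    P = CopyTask.create_permutation_matrix(permute_time=True,
        permute_width=False, pat_len_perm=4, seq_width=2,
        rstate_permute=rstate, pairwise_permute=True)
    # Timesteps (0, 1) and (2, 3) are swapped in the flattened index vector:
    # expected [[2, 3], [0, 1], [6, 7], [4, 5]].
    print(P.reshape(4, 2))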
if __name__ == '__main__':
    pass
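    # Minimal usage sketch (ours, not part of the original module). Argument
    # values are illustrative; the constructor signature is defined by the
    # class above.
    data = CopyTask(5, 8, seq_width=7, num_train=4, num_test=2, rseed=42)
    print(data.get_identifier())
    # Start index and length of the output pattern for each training sample.
    starts, lengths = data.get_out_pattern_bounds(np.arange(4))
    print(starts, lengths)
    _demo_permutation()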