import numpy as np
from sklearn.base import BaseEstimator
import derrom.optimizers as optimizers
[docs]class derrom_estimator(BaseEstimator):
"""
Delay Embedded Regressive Reduced Order Model (derrom)
This is the estimator class with facilitates the training of the model, and the prediction and scoring of trajectories. It is designed to be modular, such that different approaches for the dimensionality reduction, feature scaling, nonlinear transformation and regression weight optimization can be passed via objects that implement the appropriate methods.
Parameters
----------
rdim : int
number of reduced latent space dimensions. The default rdim = 1 is most likely an inappropriate choice...
DE_l : int
delay embedding length. The default DE_l = 1 corresponds to no embedding, i.e., only the most recent system state is used.
full_hist : bool
If set to False (default), the delay embedding is padded with the least recent state if no sufficient system history is available
intercept : bool
If set to True, a bias/intercept term is added to the regression step.
dim_reducer : dim_reducer object
Object that implements the train, reduce, and reconstruct methods. If set to None, no dimensionality reduction is performed. More details on the dimensionality reduction implementation in :doc:`dim_reducers`.
scaler : scaler object
Object that implements the train, transform, and inverse transform methods. If set to None, no feature scaling is performed. More details on the feature scaler implementation in :doc:`scalers`.
NL_transformer : transformer object
Object that implements the setup and transform methods. If set to None, no nonlinear transform of the delay-embedded system state is performed. More details on the nonlinear transformer implementation in :doc:`transformers`.
optimizer : optimizer object
Object that implements the solve method. In set to None, a least-squares method is applied. More details on the optimizer implementation in :doc:`optimizers`.
Class attributes: (attributes, which are generated during the initialization are omitted.)
Attributes
----------
w : numpy.ndarray
Regression weights obtained from the fit method contained in a matrix (2D numpy.ndarray)
reg_mode : str
Regression mode. Can be set to 'AR' (autoregression) by the fit method. Toggles the predict method to forecast in autonomous mode, i.e., to feed its own output back as an input.
"""
def __init__(self, rdim = 1, DE_l = 1, full_hist=False, intercept = False, dim_reducer = None, scaler = None, NL_transformer = None, optimizer = None):
self.w = None
self.reg_mode = None
self.n_target_vars = None
self.rdim = rdim
self.DE_l = DE_l
self.intercept = intercept
self.full_hist = full_hist
if dim_reducer != None:
self.dim_reducer = dim_reducer
self.reduce_dim = True
else:
self.reduce_dim = False
if scaler != None:
self.scaler = scaler
self.standardize = True
else:
self.standardize = False
if NL_transformer != None:
self.NL_transformer = NL_transformer
self.NL_transform = True
else:
self.NL_transform = False
if optimizer != None:
self.optimizer = optimizer
else:
self.optimizer = optimizers.lstsqrs()
def __compare_trajectories_targets(self,trajectories,targets):
n_trajectory_data_vectors = np.sum([trajectory.shape[0] for trajectory in trajectories])
n_target_data_vectors = np.sum([target.shape[0] for target in targets])
if (n_trajectory_data_vectors == n_target_data_vectors):
return True
else:
return False
def __build_DE_vec(self, matrix, row, DE_l):
if DE_l == 1:
return matrix[row]
else:
DE_vec = []
for k in range(DE_l):
if row+k < 0:
DE_vec.append(matrix[0])
else:
DE_vec.append(matrix[row+k])
return np.concatenate(DE_vec, axis=0)
def __build_DE_matrix(self, reduced_trajectories):
assert self.DE_l > 0
if self.DE_l == 1:
return np.concatenate(reduced_trajectories,axis=0)
else:
DE_matrix = []
for r in range(len(reduced_trajectories)):
if(self.full_hist == False):
nRows = reduced_trajectories[r].shape[0]
Delta_j = self.DE_l-1
else:
nRows = reduced_trajectories[r].shape[0]-(self.DE_l-1)
Delta_j = 0
nCols = self.rdim*self.DE_l
run_DE_matrix = np.zeros((nRows,nCols))
for j in range(nRows):
run_DE_matrix[j] = self. __build_DE_vec(reduced_trajectories[r][:,:self.rdim], j-Delta_j, self.DE_l)
DE_matrix.append(run_DE_matrix)
DE_matrix = np.concatenate(DE_matrix, axis=0)
return DE_matrix
def __build_target_matrix(self, targets):
if self.full_hist == False:
target_matrix = np.concatenate(targets, axis=0)
else:
assert self.DE_l > 0
target_matrix = []
for r in range(len(targets)):
target_matrix.append(targets[r][self.DE_l-1:])
target_matrix = np.concatenate(target_matrix, axis=0)
return target_matrix
[docs] def fit(self, trajectories, targets='AR', rdim = None, DE_l = None, intercept=None, full_hist=None, dim_reducer = None, scaler = None, NL_transformer = None, optimizer = None):
"""
fit a derrom model
The fit method takes care of the proper delay embedding.
Parameters
----------
trajectories : list
A list of the training trajectories, where each element of the list is expected to be a 2D numpy.ndarray that represents an individual trajectories. Time slices must be stored in the rows (first index) and the system state variables in the columns (second index). All trajectories must have the same number of variables (columns), the number of time slices, however, may vary.
targets : 'AR', list
A list of the training targets, where each element is an numpy.ndarray that corresponds to training trajectory with the identical list index. Each element must have the same number of rows as the corresponding trajectory. If set to 'AR', i.e., autoregression, the targets are automatically generated from the time-shifted trajectories and the last and first time slices are dropped from the training and target data, respectively.
The remaining parameters are identical to the init method
"""
n_trajectories = len(trajectories)
if targets == 'AR':
self.reg_mode = 'AR'
else:
self.reg_mode = 'reg'
n_targets = len(targets)
self.n_target_vars = targets[0][0].size
### check data consistency
assert self.__compare_trajectories_targets(trajectories,targets)
if rdim != None:
self.rdim = rdim
if DE_l != None:
self.DE_l = DE_l
if intercept != None:
self.intercept = intercept
if full_hist != None:
self.full_hist = full_hist
if dim_reducer != None:
self.dim_reducer = dim_reducer
self.reduce_dim = True
if scaler != None:
self.scaler = scaler
self.standardize = True
if NL_transformer != None:
self.NL_transformer = NL_transformer
self.NL_transform = True
if optimizer != None:
self.optimizer = optimizer
#apply the dimensionality reduction to get the reduced coefficient matrix with rdim features via the dim_reducer object
if self.reduce_dim == True:
self.dim_reducer.train(np.concatenate(trajectories,axis=0),self.rdim)
reduced_trajectories = [self.dim_reducer.reduce(trajectory,self.rdim) for trajectory in trajectories]
else:
reduced_trajectories = trajectories
self.rdim = trajectories[0].shape[1]
#apply data/feature scaling via scaler object
if self.standardize:
self.scaler.train(np.concatenate(reduced_trajectories,axis=0))
reduced_trajectories = [self.scaler.transform(reduced_trajectory) for reduced_trajectory in reduced_trajectories]
#create training data matrices
if self.reg_mode == 'reg':
training_matrix = self.__build_DE_matrix(reduced_trajectories)
target_matrix = self.__build_target_matrix(targets)
else:
training_matrix = self.__build_DE_matrix( [reduced_trajectory[:-1] for reduced_trajectory in reduced_trajectories] )
target_matrix = self.__build_target_matrix( [reduced_trajectory[1:] for reduced_trajectory in reduced_trajectories] )
#apply transformation to the DE state
if self.NL_transform == True:
self.NL_transformer.setup(training_matrix.shape[1])
training_matrix = self.NL_transformer.transform(training_matrix)
#add bias/intercept
if self.intercept:
training_matrix = np.concatenate( [ training_matrix, np.ones( (training_matrix.shape[0],1) ) ] , axis=1 )
#calculate weight matrix via optimizer object
self.w = self.optimizer.solve(training_matrix, target_matrix)
[docs] def predict(self, trajectory):
"""
Predicts targets for each snapshot of the input trajectory. In autoregression mode, the first time slice is used (the first DE_l in the case of full_hist = True) as initial conditions to generate a prediction with the same length (rows) as the input trajectory.
Parameters
----------
trajectory : 2D numpy.ndarray
Trajectory where the time slices are stored in the rows (first index) and the state variables in the columns (second indx)
Returns
-------
matrix : 2D numpy.ndarray
Calculated prediction
"""
if self.reg_mode == 'AR':
return self.forecast(trajectory,trajectory.shape[0])
else:
if trajectory.ndim == 1:
trajectory = trajectory.reshape(1,-1)
#apply the dimensionality reduction to the trajectory
if self.reduce_dim == True:
reduced_trajectory = self.dim_reducer.reduce(trajectory,self.rdim)
else:
reduced_trajectory = trajectory
#apply data/feature scaling
if self.standardize:
reduced_trajectory = self.scaler.transform(reduced_trajectory)
#setup numpy array for the prediction
if self.full_hist == True:
pred_length = trajectory.shape[0]-(self.DE_l-1)
if self.full_hist == False:
pred_length = trajectory.shape[0]
pred = np.zeros((pred_length,self.n_target_vars))
if self.NL_transform == True:
feature_matrix = self.NL_transformer.transform(self.__build_DE_matrix([reduced_trajectory]))
else:
feature_matrix = self.__build_DE_matrix([reduced_trajectory])
#add bias/intercept
if self.intercept:
feature_matrix = np.concatenate( [ feature_matrix, np.ones( (feature_matrix.shape[0],1) ) ] , axis=1 )
#let the machine predict the dynamics
for j in range(0, pred.shape[0]):
#predict the next step
pred[j] = feature_matrix[j] @ self.w
return pred
[docs] def forecast(self,init,n_steps):
"""
Forecast n steps into the future in autoregression mode
Parameters
----------
init : 2D numpy.ndarray
Initial condition. For full_hist = False, only the first snapshot (first row) of is used. Otherwise the first DE_l snapshots are used.
n_steps : int
Length of the forecasted trajectory. The initial condition is thus included in n_steps
Returns
-------
matrix : 2D numpy.ndarray
Forecasted trajectory
"""
assert self.reg_mode == 'AR'
#apply the dimensionality reduction to the initital conditions
if self.reduce_dim == True:
reduced_init = self.dim_reducer.reduce(init,self.rdim)
else:
reduced_init = init
#apply data/feature scaling
if self.standardize:
reduced_init = self.scaler.transform(reduced_init)
#setup numpy array for the auto prediction
pred = np.zeros((n_steps,reduced_init.shape[1]))
#build initial condition for the auto predictions
if (self.full_hist == False):
j_start = 1
pred[0] = reduced_init[0]
else:
j_start = self.DE_l
pred[:self.DE_l] = reduced_init[:self.DE_l]
#let the machine predict the dynamics
for j in range(j_start,pred.shape[0]):
#build the DE vector from the past steps
DE_vec = self.__build_DE_vec(pred[:,:self.rdim], j-self.DE_l, self.DE_l)
DE_vec = DE_vec.reshape((1,DE_vec.size))
#apply transformation to the DE state
if self.NL_transform == True:
transform = self.NL_transformer.transform(DE_vec)
#add intercept/bias
if self.intercept:
transform = np.append(transform, 1.0)
#predict the next step
pred[j] = transform @ self.w
#undo the data/feature scaling
if self.standardize:
pred = self.scaler.inverse_transform(pred)
#reconstruct the full from the reduced representation
if self.reduce_dim == True:
pred = self.dim_reducer.reconstruct(pred)
return pred
[docs] def get_error(self, trajectory=None, truth=None, pred=None, norm='rms'):
"""
Computes the regression error
If no prediction is supplied, but the the trajectory, the corresponding prediction is computed first.
Parameters
----------
trajectory : 2D numpy.ndarray
If no prediction is supplied, it can be computed from the trajectory. Interchangeable with truth in autoregression (AR) mode
truth : 2D numpy.ndarray
Ground truth, against which the prediction is compared. Interchangeable with trajectory in autoregression (AR) mode
pred : 2D numpy.ndarray
Prediction corresponding to the truth.
norm : str, "rms", "fro", "ma"
Error norm chosen by the corresponding string. "rms" refers to the root-mean-squared error regarding all matrix elements, "fro" refers to the frobenius norm (proportional to the rms error, but not normalized to the number of matrix elements), and "max" refers to the maximum absolute error regarding all matrix elements.
Returns
-------
regression error : float
the regression error for a valid norm and -1 for an invalid norm.
"""
if self.reg_mode == 'AR':
if truth is None and trajectory is None:
raise ValueError('no trajectory supplied')
elif truth is None:
truth = trajectory
elif trajectory is None:
trajectory = truth
else:
if truth is None:
raise ValueError('no truth supplied')
if pred is None:
if trajectory is not None:
pred = self.predict(trajectory)
else:
raise ValueError('no trajectory supplied to compute prediction')
err = -1.
if norm =='rms': #normalized Frobenius norm
err = np.sqrt( np.mean( np.square(truth-pred) ) )
elif norm == 'fro': #Frobenius norm
err = np.linalg.norm(truth-pred, ord='fro')
elif norm =='max': #absolute max norm
err = np.abs(truth-pred).max()
else:
print('unknown norm')
return err
[docs] def score_multiple_trajectories(self,trajectories, targets=None, predictions=None, **kwargs):
"""
helper function to obtain error scores for multiple trajectories. Used by the derrom.utils.get.get_KFold_CV_scores function.
Parameters
----------
trajectories : list
A list of the trajectories to be scored, where each element of the list is expected to be a 2D numpy.ndarray that represents an individual trajectories. Time slices must be stored in the rows (first index) and the system state variables in the columns (second index). All trajectories must have the same number of variables (columns), the number of time slices, however, may vary.
targets : list
A list of the targets, where each element is an numpy.ndarray that corresponds to the trajectory with the identical list index. Each element must have the same number of rows as the corresponding trajectory. If set to 'AR', i.e., autoregression, no targets are required.
predictions : list
A list of the predictions, where each element is an numpy.ndarray that corresponds to the trajectory with the identical list index. Supplying predictions reduces computaion time if derrom.utils.get.get_KFold_CV_scores is to be evaluated for multiple error norms.
Returns
-------
mean : float
mean regression error from all supplied trajectories
scores : list
list of all individual regression errors
"""
scores = []
if predictions is None:
for k in range(len(trajectories)):
if targets is None or self.reg_mode=='AR':
scores.append(self.get_error(trajectory=trajectories[k], **kwargs))
else:
scores.append(self.get_error(trajectory=trajectories[k], truth=targets[k], **kwargs))
else:
for k in range(len(trajectories)):
if targets is None or self.reg_mode=='AR':
scores.append(self.get_error(trajectory=trajectories[k], pred=predictions[k], **kwargs))
else:
scores.append(self.get_error(trajectory=trajectories[k], truth=targets[k], pred=predictions[k], **kwargs))
mean = np.mean(scores)
return mean, scores
[docs] def print_status(self):
"""
print values of the relevant class attributes
"""
assert self.w is not None
print('full_hist: ', self.full_hist)
print('intercept: ', self.intercept)
print('standardize: ', self.standardize)
print('rdim: ', self.rdim)
print('DE_l: ', self.DE_l)
print('weights shape: ', self.w.shape)