Source code for derrom.utils

import numpy as np



#######################################
### KFold cross validation
#######################################

[docs]def get_KFolds(data_list, folds=5): """ Split the given list of trajectories into K-folds for cross-validation. Parameters ---------- data_list : list An list of trajectories, where each trajectory is a 2D numyp.ndarray. folds : int, optional The number of folds to create. Defaults to 5. Returns ------- folds : list A list of K folds, where each fold is a list of trajectories. """ KFolds = np.array_split(data_list, folds) #numpy returns list of ndarrays KFolds = [list(array) for array in KFolds] #covert ndarrays back to list such that KFolds is a list of lists of ndarrays (the individual trajectories) return KFolds
[docs]def get_KFold_CV_scores(model, trajectories, targets='AR', folds=5, seed=817, norms = ['rms'], train_kwargs={}): """ Conduct K-fold cross-validation on the given model using the given trajectories and targets, and return the scores for each fold. Parameters ---------- model : object An object that implements the `fit()` and `score_multiple_trajectories()` methods, which will be used to train and score the model. trajectories : list An list of trajectories, where each trajectory is a 2D numpy.ndarray. targets : list or str, optional An list of targets corresponding to the given trajectories, or 'AR' for autoregression mode. Defaults to 'AR'. folds : int, optional The number of folds to create for cross-validation. Defaults to 5. seed : int, optional The seed value to use for the random number generator when shuffling the trajectories. Defaults to 817. norms : list of str, optional A list of error norms to use for scoring the trajectories. Defaults to ['rms']. train_kwargs : dict, optional A dictionary of keyword arguments to pass to the `fit()` method when training the model. Defaults to {}. Returns ------- scores : list A list of n lists, where n is the number of error norms specified in `norms`. Each list contains the scores for each each error norm. """ if targets != 'AR': assert len(trajectories) == len(targets) #create shuffled copy of the trajectories rng = np.random.default_rng(seed=seed) shuffled_inds = [i for i in range(len(trajectories))] rng.shuffle(shuffled_inds) strajectories = [trajectories[ind] for ind in shuffled_inds] if targets != 'AR': stargets = [targets[ind] for ind in shuffled_inds] #split trajectories into folds KFold_trajectories = get_KFolds(strajectories, folds=folds) if targets != 'AR': KFold_targets = get_KFolds(stargets, folds=folds) scores = [[] for n in range(len(norms))] #of the individual folds - for each error norm in the norms list for k in range(folds): train_trajectories = KFold_trajectories.copy() test_trajectories = train_trajectories.pop(k) #test_trajectories = the kth fold, train_trajectories to remaining folds train_trajectories = [item for sublist in train_trajectories for item in sublist] #unpack the training folds into one flattened list if targets != 'AR': train_targets = KFold_targets.copy() test_targets = train_targets.pop(k) train_targets = [item for sublist in train_targets for item in sublist] else: test_targets = None #train the model on the training trajectories and get scores from the testing trajectories if targets == 'AR': model.fit(trajectories=train_trajectories, targets='AR', **train_kwargs) else: model.fit(trajectories=train_trajectories, targets=train_targets, **train_kwargs) predictions=[] for trajectory in test_trajectories: predictions.append(model.predict(trajectory)) #score the test trajectories for each error norm in the norms list for l,norm in enumerate(norms): fold_mean_score, fold_all_scores = model.score_multiple_trajectories(test_trajectories, test_targets, predictions=predictions, norm=norm) scores[l].append(fold_all_scores) for n in range(len(norms)): scores[n] = [item for sublist in scores[n] for item in sublist] return scores
####################################### ### save and load models ####################################### import pickle
[docs]def save_model(model,filename='../model.obj'): """ Save a model object to a file using the pickle module. Parameters ---------- model : object The model to be saved to a file. filename : str, optional The name of the file to save the object to. Defaults to '../model.obj'. """ file = open(filename, 'wb') pickle.dump(model, file)
[docs]def load_model(filename='../model.obj'): """ Load a Python object from a file using the pickle module. Parameters ---------- filename : str, optional The name of the file to load the object from. Defaults to '../model.obj'. Returns ------- model : object The loaded model object. """ file = open(filename, 'rb') return pickle.load(file)
####################################### ### save and load trajectories #######################################
[docs]def save_trajectories(trajectories, filename='../trajectories'): """ Save the given list of trajectories to a numpy .npz file. Parameters ---------- trajectories : list list of trajectories to save. filename : str, optional Name of the file to save the trajectories to. Defaults to '../trajectories'. """ np.savez(filename, trajectories)
[docs]def load_trajectories(filename='../trajectories.npz'): """ Load a list of trajectories from a numpy .npz file. Parameters ---------- filename : str, optional Name of the file to load the trajectories from. Defaults to '../trajectories.npz'. Returns ------- trajectories : list List of loaded trajectories if file exists, otherwise None. """ from os.path import exists if not exists(filename): print('trajectories file ot found') return None else: npz_trajectories = np.load(filename) trajectories = np.split(npz_trajectories['arr_0'], npz_trajectories['arr_0'].shape[0], axis=0) for k in range(len(trajectories)): trajectories[k] = np.reshape(trajectories[k], trajectories[k].shape[1:]) return trajectories
####################################### ### plot shortcuts ####################################### import matplotlib.pyplot as plt import matplotlib.colors as colors
[docs]def plot_trajectory(data,title='trajectory'): """ Plot a 2D trajectory using matplotlib. Parameters ---------- data : 2D numpy.ndarray A 2D array of the trajectory to be plotted. Each row represents a time step and each column represents a state variable. title : str, optional The title of the plot. Defaults to 'trajectory'. """ plt.imshow(data, aspect='auto', interpolation='none',origin='lower',cmap='Reds') plt.title(title) plt.ylabel(r'time $t$') plt.xlabel(r'state variable $s_n$') cb = plt.colorbar() cb.set_label('value') plt.show()
[docs]def plot_difference(test,truth,title='difference'): """ Plot the element-wise difference between two 2D arrays using matplotlib. Parameters ---------- test : numpy.ndarray The first 2D array to be compared. truth : numpy.ndarray The second 2D array to be compared. title : str, optional The title of the plot. Defaults to 'difference'. """ err = test-truth plt.imshow(err, aspect='auto', interpolation='none',origin='lower',cmap='bwr', norm=colors.CenteredNorm(vcenter=0.0)) plt.title(title) plt.ylabel(r'time $t$') plt.xlabel(r'state variable $s_n$') cb = plt.colorbar() cb.set_label('error') plt.show()
####################################### ### class to benchmark the dimensionality reduction with the KFold function #######################################
[docs]class reducer_helper_class: """ A helper class for benchmarking dimensionality reducers. Implements the `fit()` and `score_multiple()` methods to work with the `get_KFold_CV_scores()` function. Parameters ---------- dim_reducer : object or None, optional The dimensionality reduction object rdim : int, optional The reduced dimensionality, i.e., the number of components to retain after dimensionality reduction. Default is 1. Attributes ---------- rdim : int The reduced dimensionality, i.e., the number of components to retain after dimensionality reduction. dim_reducer : object The dimensionality reduction object. reg_mode : str This string is set to 'AR' to properly work the the KFold_CV function """ def __init__(self, dim_reducer = None, rdim = 1): self.rdim = rdim self.dim_reducer = dim_reducer self.reg_mode = 'AR' #fake AR to make the KFold CV scores work
[docs] def fit(self, trajectories, targets='AR', rdim=None, dim_reducer = None): if dim_reducer != None: self.dim_reducer = dim_reducer if self.dim_reducer == None: raise ValueError('no dim reducer object as been passed') if rdim != None: self.rdim = rdim data_matrix = np.concatenate(trajectories,axis=0) self.dim_reducer.train(data_matrix, self.rdim)
[docs] def predict(self, run, rdim=None): if rdim == None: rdim = self.rdim return self.dim_reducer.reconstruct( self.dim_reducer.reduce(run, rdim) )
[docs] def get_error(self, trajectory, pred=None, rdim=None, norm='rms'): if rdim == None: rdim = self.rdim if pred is None: pred = self.predict(trajectory, rdim=rdim) err=-1. if norm=='fro': err = np.linalg.norm(trajectory-pred, ord='fro') elif norm =='max': err = np.abs(trajectory-pred).max() elif norm == 'rms': err = np.sqrt( np.mean( np.square(trajectory-pred) ) ) else: print('unknown norm') return err
[docs] def score_multiple_trajectories(self,trajectories, targets=None, predictions=None, **kwargs): scores = [] if predictions is None: for k in range(len(trajectories)): scores.append(self.get_error(trajectory=trajectories[k], **kwargs)) else: for k in range(len(trajectories)): scores.append(self.get_error(trajectory=trajectories[k], pred=predictions[k], **kwargs)) mean = np.mean(scores) return mean, scores
####################################### ### ode integrator classes, which allow for the integration of derrom. #######################################
[docs]class ivp_integrator: """ Quick and dirty numerical integrator for solving initial value problems (IVPs), where either all or only parts of the right-hand-side of the equations of motion are provided by a data-driven model. Implements the `fit()` and `score_multiple()` methods to work with the `get_KFold_CV_scores()` function. Parameters ---------- model : object Model object with the methods 'predict' and 'fit'. 'predict' provides either all or parts of the derivatives. The 'fit' method can be invoked by the integrator during KFold_CV scoring. derivs : function, optional Function that calculates the system derivatives with respect to time. Default is None, in which case the 'predict' method of the model object is used. dt : float, optional Time step for numerical integration. Default is 1. dt_out : float, optional Time step for saving output data. Default is 1. method : {'Heun', 'Euler'}, optional Numerical integration method to use. Default is 'Heun'. Attributes ---------- model_hist_option : bool Boolean indicating whether the model's 'full_hist' attribute is True or False. """ def __init__(self, model, derivs=None, dt=1., dt_out=1., method='Heun'): self.dt = dt self.dt_out = dt_out self.method = method self.targets = 'AR' self.model = model self.model_hist_option = model.full_hist self.model.full_hist = True if derivs is None: self.derivs = self.model.predict else: self.derivs = derivs
[docs] def fit(self, **kwargs): """ fits the data driven model to the training data presented via the `**kwargs`. Used by the KFold_CV function """ self.model.full_hist = self.model_hist_option self.model.fit(**kwargs) self.model.full_hist = True
def __Euler(self, init, n_steps, dt, dt_out): sol = np.zeros((n_steps,init.shape[1])) sol[:init.shape[0]] = init state = sol[:1] j_out = int(dt_out/dt) j_max = sol.shape[0]*j_out for j in range(1,sol.shape[0]*j_out): state = state + dt*self.derivs(state) if j%j_out == 0: sol[j//j_out] = state return sol def __Euler_wdelay(self, init, n_steps, dt, dt_out): sol = np.zeros((n_steps,init.shape[1])) sol[0] = init[0] state = sol[:1] j_out = int(dt_out/dt) j_max = sol.shape[0]*j_out hist_length = (self.model.DE_l-1)*j_out + 1 hist_ind = hist_length - 1 hist = np.zeros((hist_length,init.shape[1])) for k in range(hist.shape[0]): hist[k] = init[0] for j in range(1,sol.shape[0]*j_out): vecs = np.stack( [ hist[(hist_ind - n*j_out + hist_length)%hist_length] for n in range(self.model.DE_l-1,-1,-1)] ) state = state + dt*self.derivs(vecs) hist_ind = (hist_ind+1)%hist_length hist[hist_ind] = state if j%j_out == 0: sol[j//j_out] = state return sol def __Heun(self, init, n_steps, dt, dt_out): sol = np.zeros((n_steps,init.shape[1])) sol[:init.shape[0]] = init state = sol[:1] j_out = int(dt_out/dt) j_max = sol.shape[0]*j_out for j in range(1,sol.shape[0]*j_out): f1 = self.derivs(state) f2 = self.derivs(state + dt*f1) state = state + 0.5*dt*(f1+f2) if j%j_out == 0: sol[j//j_out] = state return sol def __Heun_wdelay(self, init, n_steps, dt, dt_out): sol = np.zeros((n_steps,init.shape[1])) sol[0] = init[0] state = sol[:1] j_out = int(dt_out/dt) j_max = sol.shape[0]*j_out hist_length = (self.model.DE_l-1)*j_out + 1 hist_ind = hist_length - 1 hist = np.zeros((hist_length,init.shape[1])) for k in range(hist.shape[0]): hist[k] = init[0] for j in range(1,sol.shape[0]*j_out): vecs = np.stack( [hist[(hist_ind - n*j_out + hist_length)%hist_length] for n in range(self.model.DE_l-1,-1,-1)] ) f1 = self.derivs(vecs).flatten() hist_ind = (hist_ind+1)%hist_length vecs = np.stack( [hist[(hist_ind - n*j_out + hist_length)%hist_length] for n in range(self.model.DE_l-1,0,-1)]+[(state+dt*f1).flatten()] ) f2 = self.derivs(vecs) state = state + 0.5*dt*(f1+f2) hist[hist_ind] = state if j%j_out == 0: sol[j//j_out] = state return sol
[docs] def integrate(self, init, n_steps, dt=None, dt_out=None): """ Integrate the differential equations using the specified method and time step. Parameters ---------- init : 2D numpy.ndarray Initial state of the system. n_steps : int Number of integration steps to perform. dt : float, optional Time step for the integration. Defaults to `self.dt` if not specified. dt_out : float, optional Time step for outputting results. Defaults to `self.dt_out` if not specified. Returns ------- trajectory : 2D numpy.ndarray Array containing the state of the system at each time step. """ if dt is None: dt = self.dt if dt_out is None: dt_out = self.dt_out if self.method == 'Heun': if self.model.DE_l == 1: return self.__Heun(init,n_steps,dt,dt_out) else: return self.__Heun_wdelay(init,n_steps,dt,dt_out) elif self.method == 'Euler': if self.model.DE_l == 1: return self.__Euler(init,n_steps,dt,dt_out) else: return self.__Euler_wdelay(init,n_steps,dt,dt_out) else: raise ValueError('integration method >> ' + self.method + ' << does not exist')
[docs] def predict(self, trajectory): """ Use the first time steps of the given trajectory as initial conditions and integrate the system to obtain a solution with identical length. Intended to score the solutions against the input trajectories. Parameters ---------- trajectory : 2D numpy.ndarray Test trajectory, i.e., ground truth Returns ------- prediction : 2D numpy.ndarray An array containing the predicted trajectory of the system. """ return self.integrate(trajectory, trajectory.shape[0])
[docs] def get_error(self, truth, pred=None, norm='rms'): """ Calculates the error between the ground truth values and predicted values. Parameters ---------- truth: 2D numpy.ndarray The ground truth values. pred: 2D numpy.ndarray, optional The predicted values. If not provided, the function uses the predict method of the class to make predictions. norm: str, optional The type of norm to be used to calculate the error. Default is 'rms'. Possible values are: - 'rms': Normalized Frobenius norm - 'fro': Frobenius norm - 'max': Absolute maximum norm Returns ------- error : float The calculated error value. """ if pred is None: pred = self.predict(truth) assert pred.shape == truth.shape err = -1. if norm =='rms': #normalized Frobenius norm err = np.sqrt( np.mean( np.square(truth-pred) ) ) elif norm == 'fro': #Frobenius norm err = np.linalg.norm(truth-pred, ord='fro') elif norm =='max': #absolute max norm err = np.abs(truth-pred).max() else: print('unknown norm') return err
[docs] def score_multiple_trajectories(self, trajectories, targets=None, predictions=None, **kwargs): """ helper function to obtain error scores for multiple trajectories. Used by the derrom.utils.get.get_KFold_CV_scores function. Parameters ---------- trajectories : list A list of the trajectories to be scored, where each element of the list is expected to be a 2D numpy.ndarray that represents an individual trajectories. Time slices must be stored in the rows (first index) and the system state variables in the columns (second index). All trajectories must have the same number of variables (columns), the number of time slices, however, may vary. targets : list A list of the targets, where each element is an numpy.ndarray that corresponds to the trajectory with the identical list index. Each element must have the same number of rows as the corresponding trajectory. If set to 'AR', i.e., autoregression, no targets are required. predictions : list A list of the predictions, where each element is an numpy.ndarray that corresponds to the trajectory with the identical list index. Supplying predictions reduces computaion time if derrom.utils.get.get_KFold_CV_scores is to be evaluated for multiple error norms. Returns ------- mean : float mean regression error from all supplied trajectories scores : list list of all individual regression errors """ scores = [] if predictions is None: for k in range(len(trajectories)): scores.append(self.get_error(trajectories[k],**kwargs)) else: assert len(trajectories) == len(predictions) for k in range(len(trajectories)): scores.append(self.get_error(trajectories[k], pred=predictions[k], **kwargs)) mean = np.mean(scores) return mean, scores
[docs]class PHELPH_ivp_integrator(ivp_integrator):
[docs] def fit(self, trajectories, targets, **kwargs): el_trajectories = [trajectory[:,:-1] for trajectory in trajectories] self.model.fit(el_trajectories, targets, **kwargs)
[docs] def get_error(self, truth, pred=None, norm='rms'): if pred is None: pred = self.predict(truth) assert pred.shape == truth.shape I_truth = truth[:,-1] el_truth = truth[:,:-1] I_pred = pred[:,-1] el_pred = pred[:,:-1] err = -1. if norm =='rms': err = np.sqrt( np.mean( np.square(el_truth-el_pred) ) ) elif norm =='max': #absolute max norm err = np.abs(el_truth-el_pred).max() elif norm == 'I_max': err = (I_pred.max()- I_truth.max()) elif norm == 'I_max_pos': err = (np.argmax(I_pred)-np.argmax(I_truth))*self.dt_out elif norm == 'I_area': err = (np.sum(I_pred) - np.sum(I_truth)) else: print('unknown norm') return err