from abc import ABC, abstractmethod
from ..parameter import DataParameter, SingleDataParameter
from ..physics import Constants
import numpy as np
import deepxde as dde
[docs]
class DataBase(ABC):
""" Base class of data
"""
subclasses = {}
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
cls.subclasses[cls._DATA_TYPE] = cls
[docs]
@classmethod
def create(cls, data_type, **kwargs):
if data_type not in cls.subclasses:
raise ValueError(f"Data type {format(data_type)} is not defined")
return cls.subclasses[data_type](**kwargs)
def __init__(self, parameters=SingleDataParameter()):
# parameters
self.parameters = parameters
# load data to dict
self.X_dict = {}
self.data_dict = {}
self.mask_dict = {}
self.mesh_dict = {}
# input to PINN
self.X = None
# reference solution of the output of PINN
self.sol = None
[docs]
@abstractmethod
def get_ice_coordinates(self, mask_name=""):
""" get ice masks if available from the data
"""
pass
[docs]
@abstractmethod
def load_data(self, domain, physics):
""" load data from `self.parameters.data_path`, within the given `domain` and `physics`
"""
pass
[docs]
@abstractmethod
def prepare_training_data(self):
""" prepare training data according to the `data_size`
"""
pass
[docs]
class Data(Constants):
""" class of data with all data used
"""
def __init__(self, parameters=DataParameter()):
super().__init__()
self.parameters = parameters
# create all instances of Data based on its source, we can have multiple data from the same source
self.data = {k:DataBase.create(parameters.data[k].source, parameters=parameters.data[k]) for k in parameters.data}
# input to PINN
self.X = {}
# reference solution of the output of PINN
self.sol = {}
[docs]
def get_ice_coordinates(self, mask_name=""):
""" get the coordinates of ice covered region from all the data, put them in one array
"""
return np.vstack([self.data[k].get_ice_coordinates(mask_name=mask_name) for k in self.data])
[docs]
def load_data(self, domain=None, physics=None):
""" laod all the data in `self.data`
"""
for k in self.data:
self.data[k].load_data(domain, physics)
[docs]
def prepare_training_data(self, transient=False, default_time=0.0):
""" merge all `X` and `sol` in `self.data` to `self.X` and `self.sol` with the keys
Args:
transient: if the problem is a time dependent simulation
default_time: default value of the third column (time) in `X`, if not provided by the data
"""
# prepare the training data according to data_size
for key in self.data:
self.data[key].prepare_training_data()
# merge all X and sol
for xkey, xval in self.data[key].X.items():
# check if the data has time dimension, if not, append one column with default_time to the end
if transient:
if xval.shape[1] < 3:
# check default_time setting in the data
if self.data[key].parameters.default_time is not None:
default_time = self.data[key].parameters.default_time
xval = np.hstack((xval, np.ones([xval.shape[0],1])*default_time))
if xkey not in self.X:
self.X[xkey] = xval.astype(dde.config.default_float())
else:
self.X[xkey] = np.vstack((self.X[xkey], xval.astype(dde.config.default_float())))
for xkey in self.data[key].sol:
if xkey not in self.sol:
self.sol[xkey] = self.data[key].sol[xkey].astype(dde.config.default_float())
else:
self.sol[xkey] = np.vstack((self.sol[xkey], self.data[key].sol[xkey].astype(dde.config.default_float())))