from abc import ABC, abstractmethod
from .utils import data_misfit
from deepxde.backend import backend_name
[docs]
class ParameterBase(ABC):
    """Abstract base class for one group of experiment parameters.

    Construction always runs the same four steps, in this order:

    1. ``set_default`` creates the attributes with their default values,
    2. ``set_parameters`` overwrites the attributes named in ``param_dict``,
    3. ``update`` derives values that depend on the loaded settings,
    4. ``check_consistency`` validates the final state.
    """

    def __init__(self, param_dict):
        """Store ``param_dict`` and run the four initialization steps."""
        self.param_dict = param_dict
        # set default parameters
        self.set_default()
        # set parameters from param_dict if given
        self.set_parameters(self.param_dict)
        # make necessary updates of the parameters after loading param_dict
        self.update()
        # check consistency
        self.check_consistency()

    @abstractmethod
    def set_default(self):
        """Set the default values of all attributes."""

    @abstractmethod
    def check_consistency(self):
        """Check the consistency of the parameter data."""

    def _add_parameters(self, pdict: dict):
        """Add every key of ``pdict`` to the instance as an attribute.

        Unlike ``set_parameters`` this also creates attributes that do not
        exist yet. Anything that is not a dict is silently ignored.
        """
        if isinstance(pdict, dict):
            for key, value in pdict.items():
                setattr(self, key, value)

    def has_keys(self, keys):
        """Return True if every entry of ``keys`` is an attribute of the instance.

        ``keys`` must be a dict (its keys are tested) or a list; any other
        type yields False.
        """
        if isinstance(keys, (dict, list)):
            return all(hasattr(self, k) for k in keys)
        return False

    def set_parameters(self, pdict: dict):
        """Update the attributes that already exist with the values in ``pdict``.

        Keys of ``pdict`` that are not attributes of the instance are skipped,
        and anything that is not a dict is silently ignored.
        """
        if isinstance(pdict, dict):
            for key, value in pdict.items():
                # only update attributes that were created by set_default
                if hasattr(self, key):
                    setattr(self, key, value)

    def __str__(self):
        """Display all attributes except 'param_dict'."""
        rows = [
            f"\t\t{name}:\t{value!s}"
            for name, value in self.__dict__.items()
            if name != "param_dict"
        ]
        return "\t" + type(self).__name__ + ": \n" + "\n".join(rows) + "\n"

    def update(self):
        """Hook called after ``set_parameters``; does nothing by default."""
        pass
[docs]
class DomainParameter(ParameterBase):
    """Parameters of the computational domain."""

    def __init__(self, param_dict=None):
        """Build the domain parameters from ``param_dict`` (defaults if omitted)."""
        # a fresh dict per instance: the argument is stored as self.param_dict,
        # so a shared mutable default would leak between instances
        super().__init__({} if param_dict is None else param_dict)

    def set_default(self):
        """Set the default values of the domain settings."""
        # shape file to define the outer boundary of the domain
        self.shapefile = None
        # shapebox = [xmin, xmax, ymin, ymax]
        self.shapebox = [None]*4
        # save the margin
        self.margin = 0
        # number of collocation points used in the domain
        self.num_collocation_points = 0
        # static or time dependent problem
        self.time_dependent = False
        # start and end time
        self.start_time = 0
        self.end_time = 0

    def check_consistency(self):
        """Validate the time window and the bounding box.

        Raises:
            ValueError: if the problem is time dependent and ``start_time``
                is not before ``end_time``; if no shapefile is given and
                ``shapebox`` does not have four entries; or if a fully
                specified ``shapebox`` is not ordered as
                [xmin, xmax, ymin, ymax].
        """
        if self.time_dependent and self.start_time >= self.end_time:
            raise ValueError(f"'start_time' at {self.start_time} is ahead of 'end_time' at {self.end_time}")

        # NOTE(review): the shapebox checks are assumed to apply only when no
        # shapefile is given -- confirm against the upstream indentation
        if self.shapefile is None:
            if len(self.shapebox) != 4:
                raise ValueError("shapefile and shapebox are not defined, you need to define one of them")
            # the default box of four None entries is left unchecked
            if None not in self.shapebox:
                xmin, xmax, ymin, ymax = self.shapebox
                if xmin >= xmax:
                    raise ValueError("xmin>=xmax! `shapebox` should be in the order [xmin, xmax, ymin, ymax]")
                if ymin >= ymax:
                    raise ValueError("ymin>=ymax! `shapebox` should be in the order [xmin, xmax, ymin, ymax]")
[docs]
class DataParameter(ParameterBase):
    """Collection of all data sets used in the experiment."""

    def __init__(self, param_dict=None):
        """Build the data parameters from ``param_dict`` (defaults if omitted)."""
        # a fresh dict per instance instead of a shared mutable default
        super().__init__({} if param_dict is None else param_dict)

    def set_default(self):
        """Start with no data set."""
        self.data = {}

    def check_consistency(self):
        """Require a non-empty ``data_path`` for every data set.

        Raises:
            ValueError: if the ``data_path`` of any data set is an empty string.
        """
        for name, single in self.data.items():
            if single.data_path == "":
                raise ValueError(f"{name}[\"data_path\"] can not be empty!")

    def __str__(self):
        """Display all data sets."""
        rows = ["\t\t" + name + ":\n" + str(single) for name, single in self.data.items()]
        return "\t" + type(self).__name__ + ": \n" + "\n".join(rows) + "\n"

    def update(self):
        """Convert every entry of ``self.data`` to a SingleDataParameter."""
        if self.data:
            self.data = {name: SingleDataParameter(settings) for name, settings in self.data.items()}
        # check consistency after update
        self.check_consistency()
[docs]
class SingleDataParameter(ParameterBase):
    """Parameters of a single data file."""

    def __init__(self, param_dict=None):
        """Build the data-file parameters from ``param_dict`` (defaults if omitted)."""
        # a fresh dict per instance instead of a shared mutable default
        super().__init__({} if param_dict is None else param_dict)

    def set_default(self):
        """Set the default values of the data-file settings."""
        # file path
        self.data_path = ""
        # length of each data in use; leave a variable without data (sol) empty or set it to None
        self.data_size = {}
        # name map k->v, k is the variable name in the PINN, v is the variable name in the data file
        self.name_map = {}
        # X name map k->v, k is the input name in the PINN, v is the coordinate name in the data file
        self.X_map = {}
        # sample data points only inside the polygon domain, rather than the bbox
        self.sample_only_inside = False
        # scaling factor, map using k->s, by default s=1, k is the same (or a subset) as in data_size
        self.scaling = {}
        # source of the data
        self.source = "ISSM"
        # default time point, None means not in use
        self.default_time = None

    def check_consistency(self):
        """Require ``source`` to be one of the supported data loaders.

        Raises:
            ValueError: if ``source`` is not in the list of known loaders.
        """
        if self.source not in ["ISSM", "ISSM Light", "mat", "h5", "nc"]:
            raise ValueError(f"Data loader of {self.source} is not implemented")

    def __str__(self):
        """Display all attributes except 'param_dict'."""
        rows = [
            "\t\t\t" + name + ":\t" + str(value)
            for name, value in self.__dict__.items()
            if name != "param_dict"
        ]
        return "\n".join(rows) + "\n"

    def update(self):
        """Fill in ``X_map``, ``name_map`` and ``scaling`` from ``data_size``."""
        # if the X coordinate names are not given, then use the defaults
        if not self.X_map:
            self.X_map["x"] = "x"
            self.X_map["y"] = "y"
            self.X_map["t"] = "t"

        # a data_size explicitly given as None means "no variables listed";
        # normalize it so the loop below does not fail on it
        if self.data_size is None:
            self.data_size = {}

        # every variable in data_size needs to be loaded from the data loader
        for variable in self.data_size:
            # if not given in name_map, use the same name for key and value
            if variable not in self.name_map:
                self.name_map[variable] = variable
            if variable not in self.scaling:
                self.scaling[variable] = 1.0

        # check consistency after update
        self.check_consistency()
[docs]
class NNParameter(ParameterBase):
    """Parameters of the neural network."""

    def __init__(self, param_dict=None):
        """Build the network parameters from ``param_dict`` (defaults if omitted)."""
        # a fresh dict per instance instead of a shared mutable default
        super().__init__({} if param_dict is None else param_dict)

    def set_default(self):
        """Set the default values of the network settings."""
        # nn architecture
        self.input_variables = []
        self.output_variables = []
        self.num_neurons = 0
        self.num_layers = 0
        self.activation = "tanh"
        self.initializer = "Glorot uniform"
        # fourier feature transform
        self.fft = False
        self.num_fourier_feature = 10
        self.sigma = 1.0
        self.B = None
        # parallel neural network
        self.is_parallel = False
        # scaling parameters
        self.input_lb = None
        self.input_ub = None
        self.output_lb = None
        self.output_ub = None

    def check_consistency(self):
        """Check that the input/output sizes agree with the other settings.

        Raises:
            TypeError: if ``B`` is given but is not a list.
            ValueError: if ``input_size``, ``output_size`` or the number of
                columns of ``B`` does not match the configured sizes.
        """
        if self.fft:
            num_features = self.num_fourier_feature*self.sigma_size
            if self.input_size != num_features*2:
                raise ValueError("'input_size' does not match the number of fourier feature")
            if self.B is not None:
                if not isinstance(self.B, list):
                    raise TypeError("'B' matrix need to be input in a list")
                # an empty list has no first row to measure, so it is
                # reported as a size mismatch as well
                if not self.B or len(self.B[0]) != num_features:
                    raise ValueError("Number of columns of 'B' matrix does not match the number of fourier feature")
        else:
            # input size of nn equals the number of input variables
            if self.input_size != len(self.input_variables):
                raise ValueError("'input_size' does not match the number of 'input_variables'")

        # output size of nn equals the number of output variables
        if self.output_size != len(self.output_variables):
            raise ValueError("'output_size' does not match the number of 'output_variables'")

    def is_output_scaling(self):
        """Return True if both output boundaries are provided."""
        return (self.output_lb is not None) and (self.output_ub is not None)

    def set_parameters(self, pdict: dict):
        """Load ``pdict`` and derive the sizes and per-layer lists.

        Sets ``input_size`` and ``output_size``, and converts ``num_neurons``
        and ``activation`` to lists. With ``fft`` enabled it also converts
        ``sigma`` to a list, sets ``sigma_size`` and recomputes ``input_size``.

        Raises:
            ValueError: if ``fft`` and ``is_parallel`` are both enabled.
        """
        super().set_parameters(pdict)
        self.input_size = len(self.input_variables)
        self.output_size = len(self.output_variables)

        # when num_neurons is a list, its length defines the number of layers
        if isinstance(self.num_neurons, list):
            self.num_layers = len(self.num_neurons)

        # update necessary parameters for the fourier feature transform
        if self.fft:
            if self.is_parallel:
                raise ValueError("FFT currently does not support parallel nets")
            # always convert sigma to a list
            if not isinstance(self.sigma, list):
                self.sigma = [self.sigma]
            # this size is needed to create and reshape B in nn.py
            self.sigma_size = len(self.sigma)
            self.input_size = self.num_fourier_feature*self.sigma_size*2

        # convert num_neurons to a list
        if not isinstance(self.num_neurons, list):
            self.num_neurons = [self.num_neurons]*self.num_layers

        # convert activation function to a list of num_layers+1 entries,
        # the extra one being for the output layer
        if not isinstance(self.activation, list):
            self.activation = [self.activation]*(self.num_layers+1)
[docs]
class PhysicsParameter(ParameterBase):
    """Parameters of the physics, i.e. the set of equations."""

    def __init__(self, param_dict=None):
        """Build the physics parameters and instantiate the equations."""
        # a fresh dict per instance instead of a shared mutable default
        super().__init__({} if param_dict is None else param_dict)
        self.setup_equations()

    def set_default(self):
        """Set the default values of the physics settings."""
        # name(s) and parameters of the equations
        self.equations = {}
        self.manual_data_weights = None

    def check_consistency(self):
        """No consistency checks are performed at this level."""
        pass

    def setup_equations(self):
        """Translate each entry of ``self.equations`` to an EquationParameter.

        The key of an entry is used as the equation type and its value as the
        ``param_dict`` passed to ``EquationParameter.create``; the created
        objects are stored back in ``self.equations`` under the same keys.
        """
        self.equations = {
            name: EquationParameter.create(name, param_dict=settings)
            for name, settings in self.equations.items()
        }

    def __str__(self):
        """Display all equations."""
        rows = ["\t\t" + name + ":\n" + str(equation) for name, equation in self.equations.items()]
        return "\t" + type(self).__name__ + ": \n" + "\n".join(rows) + "\n"
[docs]
class EquationParameter(ParameterBase):
    """Parameters of a single equation.

    Every subclass is registered in ``subclasses`` under its
    ``_EQUATION_TYPE`` so that ``create`` can instantiate it by name.
    """
    # registry shared by all subclasses: equation type -> class
    subclasses = {}

    def __init__(self, param_dict=None):
        """Build the equation parameters from ``param_dict`` (defaults if omitted)."""
        # a fresh dict per instance instead of a shared mutable default
        super().__init__({} if param_dict is None else param_dict)

    def __init_subclass__(cls, **kwargs):
        """Register the new subclass under its ``_EQUATION_TYPE``."""
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls._EQUATION_TYPE] = cls

    @classmethod
    def create(cls, equation_type, **kwargs):
        """Instantiate the subclass registered for ``equation_type``.

        Args:
            equation_type: key of the subclass in ``subclasses``.
            **kwargs: forwarded to the constructor of that subclass.

        Raises:
            ValueError: if no subclass is registered for ``equation_type``.
        """
        if equation_type not in cls.subclasses:
            raise ValueError(f"Equation type {equation_type} is not defined")
        return cls.subclasses[equation_type](**kwargs)

    def set_default(self):
        """Set the default values of the equation settings."""
        # list of input names
        self.input = []
        # list of output names
        self.output = []
        # lower and upper bound of output
        self.output_lb = []
        self.output_ub = []
        # weights of each output
        self.data_weights = []
        # names of residuals
        self.residuals = []
        # pde weights
        self.pde_weights = []
        # scalar variables: name:value
        self.scalar_variables = {}

    def check_consistency(self):
        """Check that the per-output and per-residual lists are consistent.

        Raises:
            ValueError: if ``output_lb``, ``output_ub`` or ``data_weights``
                differ in length from ``output``; if any lower bound is not
                smaller than its upper bound; or if ``pde_weights`` is not a
                list of the same length as ``residuals``.
        """
        num_outputs = len(self.output)
        if num_outputs != len(self.output_lb):
            raise ValueError("Size of 'output' does not match the size of 'output_lb'")
        if num_outputs != len(self.output_ub):
            raise ValueError("Size of 'output' does not match the size of 'output_ub'")
        if any(lower >= upper for lower, upper in zip(self.output_lb, self.output_ub)):
            raise ValueError("output_lb is not smaller than output_ub")
        if num_outputs != len(self.data_weights):
            raise ValueError("Size of 'output' does not match the size of 'data_weights'")

        # check the pde weights
        if not isinstance(self.pde_weights, list):
            raise ValueError("pde_weights is not a list")
        if len(self.pde_weights) != len(self.residuals):
            raise ValueError("Length of pde_weights does not match the length of residuals")

    def set_parameters(self, pdict: dict):
        """Load ``pdict``, merging dict values instead of replacing them.

        Overrides the base implementation so that for dict-valued attributes
        such as ``scalar_variables`` the given entries are merged into the
        existing dict and the remaining defaults are kept. All other values
        overwrite the attribute. Keys that are not attributes are skipped.
        """
        if isinstance(pdict, dict):
            for key, value in pdict.items():
                # only update attributes that already exist
                if not hasattr(self, key):
                    continue
                current = getattr(self, key)
                if isinstance(value, dict) and isinstance(current, dict):
                    # only update the dictionary, do not overwrite it
                    current.update(value)
                else:
                    # nothing to merge into (or not a dict): plain overwrite
                    setattr(self, key, value)
        # check consistency after update
        self.check_consistency()

    def __str__(self):
        """Display all attributes except 'param_dict'."""
        rows = [
            "\t\t\t" + name + ":\t" + str(value)
            for name, value in self.__dict__.items()
            if name != "param_dict"
        ]
        return "\n".join(rows) + "\n"
[docs]
class TrainingParameter(ParameterBase):
    """Parameters of the training."""

    def __init__(self, param_dict=None):
        """Build the training parameters from ``param_dict`` (defaults if omitted)."""
        # a fresh dict per instance instead of a shared mutable default
        super().__init__({} if param_dict is None else param_dict)

    def set_default(self):
        """Set the default values of the training settings."""
        # random seed, set default to None
        self.random_seed = None
        # number of epochs
        self.epochs = 0
        # optimization method
        self.optimizers = "adam"
        # general loss function
        self.loss_functions = "MSE"
        # additional loss functions, specified as a dict
        self.additional_loss = {}
        # learning rate
        self.learning_rate = 0.001
        # decay steps
        self.decay_steps = 0
        # decay rate
        self.decay_rate = 0.0
        # list of the weights
        self.loss_weights = []
        # setting the callbacks
        self.has_callbacks = False
        # dde.callbacks.EarlyStopping(min_delta=min_delta, patience=patience)
        self.min_delta = None
        self.patience = None
        # dde.callbacks.PDEPointResampler(period=period)
        self.period = None
        # dde.callbacks.ModelCheckpoint(filepath, verbose=1, save_better_only=True)
        self.checkpoint = False
        # set mini-batch (currently only supported with pytorch)
        self.mini_batch = None
        # path to save the results
        self.save_path = "./"
        # if save the results and history
        self.is_save = True
        # if plot the results and history, and save figures
        self.is_plot = False

    def check_consistency(self):
        """Convert ``epochs`` and ``optimizers`` to lists and compare their lengths.

        Raises:
            ValueError: if the two lists differ in length.
        """
        # convert epochs and optimizers to list
        if not isinstance(self.epochs, list):
            self.epochs = [self.epochs]
        if not isinstance(self.optimizers, list):
            self.optimizers = [self.optimizers]
        # length of epochs should match optimizers
        if len(self.epochs) != len(self.optimizers):
            raise ValueError("Length of epochs does not match the length of optimizers")

    def check_callbacks(self):
        """Return True if any of the callback-related settings is given."""
        checks = (
            self.has_EarlyStopping,
            self.has_PDEPointResampler,
            self.has_ModelCheckpoint,
            self.has_MiniBatch,
        )
        # evaluated lazily and in this order, stopping at the first hit
        return any(check() for check in checks)

    def has_EarlyStopping(self):
        """Return True if ``min_delta`` or ``patience`` is given for early stopping.

        When only one of the two is given, the other one is set to 0.
        """
        has_es = (self.min_delta is not None) or (self.patience is not None)
        # complete a partially specified setting
        if has_es:
            if self.min_delta is None:
                self.min_delta = 0
            if self.patience is None:
                self.patience = 0
        return has_es

    def has_ModelCheckpoint(self):
        """Return the ``checkpoint`` setting used for checkpointing."""
        return self.checkpoint

    def has_PDEPointResampler(self):
        """Return True if ``period`` is given for the resampler."""
        return self.period is not None

    def has_MiniBatch(self):
        """Return True if ``mini_batch`` is given."""
        return self.mini_batch is not None

    def update(self):
        """Convert the additional losses and settle the callback settings.

        Each entry of ``additional_loss`` is converted to a
        LossFunctionParameter, ``mini_batch`` is reset to None on backends
        other than pytorch and paddle, and ``has_callbacks`` is refreshed.
        """
        # update additional loss
        if self.additional_loss:
            self.additional_loss = {
                name: LossFunctionParameter(settings)
                for name, settings in self.additional_loss.items()
            }

        # because DeepXDE only supports mini-batch with pytorch and paddle,
        # automatically set mini_batch=None for other backends; the message is
        # only shown when a mini-batch was actually requested
        if self.mini_batch is not None and backend_name not in ["pytorch", "paddle"]:
            print(f"Mini Batch is currently not supported with backend {backend_name}!")
            self.mini_batch = None

        # add callback settings if any of them is given
        self.has_callbacks = self.check_callbacks()

        # check consistency after update
        self.check_consistency()
[docs]
class LossFunctionParameter(ParameterBase):
    """Parameters of a customized loss function."""

    def __init__(self, param_dict=None):
        """Build the loss-function parameters from ``param_dict`` (defaults if omitted)."""
        # a fresh dict per instance instead of a shared mutable default
        super().__init__({} if param_dict is None else param_dict)

    def set_default(self):
        """Set the default values of the loss-function settings."""
        # name of the loss term, should avoid using existing names in the system,
        # TODO: make sure this name is not in use
        self.name = ""
        # loss functions
        self.function = "MSE"
        # weight of this loss function
        self.weight = 1.0

    def check_consistency(self):
        """Look up ``function`` with ``data_misfit.get``.

        The returned value is discarded.
        NOTE(review): presumably the lookup fails for an unknown function
        name -- confirm against ``utils.data_misfit``.
        """
        data_misfit.get(self.function)
[docs]
class Parameters(ParameterBase):
    """Parameters of the PINN: training, domain, data, nn and physics."""

    def __init__(self, param_dict=None):
        """Build all parameter groups from ``param_dict`` (defaults if omitted)."""
        # a fresh dict per instance instead of a shared mutable default
        super().__init__({} if param_dict is None else param_dict)

    def __str__(self):
        """Display all parameter groups."""
        groups = (self.training, self.domain, self.data, self.nn, self.physics)
        return "Parameters: \n" + "".join(str(group) for group in groups)

    def set_default(self):
        """Create every parameter group with its default values."""
        self.training = TrainingParameter()
        self.domain = DomainParameter()
        self.data = DataParameter()
        self.nn = NNParameter()
        self.physics = PhysicsParameter()

    def set_parameters(self, param_dict):
        """Rebuild every parameter group from the same ``param_dict``.

        Each group receives the full dict and picks the keys it knows.
        """
        self.training = TrainingParameter(param_dict)
        self.domain = DomainParameter(param_dict)
        self.data = DataParameter(param_dict)
        self.nn = NNParameter(param_dict)
        self.physics = PhysicsParameter(param_dict)

    def check_consistency(self):
        """No cross-group checks are performed yet."""
        # TODO: check that the names in each data set's 'data_size' match the
        # names in nn 'output_variables'
        pass

    def update(self):
        """Give every data set a ``data_size`` entry for each output variable.

        Entries that already exist are kept; missing ones are added with the
        value None.
        """
        # DataParameter stores datasets in self.data.data (dict of SingleDataParameter)
        datasets = getattr(self.data, "data", None)
        if not isinstance(datasets, dict):
            return
        for dataset in datasets.values():
            # dataset should be a SingleDataParameter
            if getattr(dataset, "data_size", None) is None:
                dataset.data_size = {}
            for variable in self.nn.output_variables:
                dataset.data_size.setdefault(variable, None)