import deepxde as dde
import deepxde.backend as bkd
from ..parameter import PhysicsParameter
from . import EquationBase
import itertools
from ..utils import slice_column, jacobian, ppow, default_float_type
[docs]
class Physics:
    """Collection of all the physics equations used as constraints in the PINN.

    On construction, one equation object is created per entry of
    ``parameters.equations``.  The per-equation (local) input/output variable
    names are merged into global lists, and the per-equation output bounds,
    data weights, residual names and PDE weights are aggregated.

    Attributes set by ``__init__``:
        parameters: the parameter object this instance was built from.
        equations: list of equation objects built by ``EquationBase.create``.
        input_var: global list of input variable names.
        output_var: global list of output variable names.
        output_lb: per-output lower bound (minimum over the equations).
        output_ub: per-output upper bound (maximum over the equations).
        data_weights: per-output data weight (maximum over the equations).
        residuals: concatenation of every equation's ``residuals``.
        pde_weights: concatenation of every equation's ``pde_weights``.
    """

    def __init__(self, parameters=None):
        """Build all equations and aggregate their variables and settings.

        Args:
            parameters: object exposing ``equations``, ``manual_output_lb``,
                ``manual_output_ub`` and ``manual_data_weights``.  When
                omitted, a fresh ``PhysicsParameter()`` is created for this
                instance.
        """
        # Create the default per call.  A ``PhysicsParameter()`` placed in the
        # signature would be evaluated once, at definition time, and then be
        # shared by every instance constructed without an argument.
        if parameters is None:
            parameters = PhysicsParameter()
        self.parameters = parameters

        # add all physics
        self.equations = [
            EquationBase.create(eq, parameters=self.parameters.equations[eq])
            for eq in self.parameters.equations
        ]

        # update the (global) input/output variable lists from the
        # local_input_var and local_output_var of each equation
        self.input_var = self._update_global_variables(
            [p.local_input_var for p in self.equations]
        )
        self.output_var = self._update_global_variables(
            [p.local_output_var for p in self.equations]
        )

        # let every equation know where its variables sit in the global lists
        for p in self.equations:
            p.update_id(self.input_var, self.output_var)

        # for each output variable take the loosest bounds and the largest
        # data weight among the equations that define a value for it
        self.output_lb = []
        self.output_ub = []
        self.data_weights = []
        for k in self.output_var:
            self.output_lb.append(
                min(p.output_lb[k] for p in self.equations if k in p.output_lb)
            )
            self.output_ub.append(
                max(p.output_ub[k] for p in self.equations if k in p.output_ub)
            )
            self.data_weights.append(
                max(p.data_weights[k] for p in self.equations if k in p.data_weights)
            )

        # manually override output lower and/or upper bounds
        self._apply_manual_values(self.output_lb, self.parameters.manual_output_lb)
        self._apply_manual_values(self.output_ub, self.parameters.manual_output_ub)
        # manually update data weights
        self._apply_manual_values(self.data_weights, self.parameters.manual_data_weights)

        # flatten the residual names and PDE weights of all equations
        self.residuals = list(
            itertools.chain.from_iterable(p.residuals for p in self.equations)
        )
        self.pde_weights = list(
            itertools.chain.from_iterable(p.pde_weights for p in self.equations)
        )

    def _apply_manual_values(self, values, manual_values):
        """Override entries of a per-output list using a name -> value mapping.

        Args:
            values: list aligned with ``self.output_var``; modified in place.
            manual_values: mapping from output variable name to the new value,
                or ``None`` to leave ``values`` untouched.  Names that are not
                in ``self.output_var`` are ignored.
        """
        if manual_values is None:
            return
        output_ids = {name: i for i, name in enumerate(self.output_var)}
        for name, value in manual_values.items():
            if name in output_ids:
                values[output_ids[name]] = value

    def _update_global_variables(self, local_var_list):
        """Merge the local variables of all equations into one list of names.

        Args:
            local_var_list: list of the local variable dicts of the equations.

        Returns:
            List of the unique keys, in order of first appearance.
        """
        # merging into a dict removes duplicates and keeps insertion order
        global_var = {}
        for d in local_var_list:
            global_var.update(d)
        return list(global_var)

    def pdes(self, nn_input_var, nn_output_var):
        """Evaluate the PDEs of every equation; signature required by deepxde.

        Args:
            nn_input_var: input tensor to the nn
            nn_output_var: output tensor from the nn

        Returns:
            One flat list concatenating what each equation's ``pde`` returns.
        """
        eq = []
        for p in self.equations:
            eq.extend(p.pde(nn_input_var, nn_output_var))
        return eq

    def vel_mag(self, nn_input_var, nn_output_var, X):
        """Velocity magnitude, a wrapper for a PointSetOperatorBC func call.

        Args:
            nn_input_var: input tensor to the nn (not used here)
            nn_output_var: output tensor from the nn
            X: NumPy array of the collocation points defined on the boundary,
                required by deepxde (not used here)

        Returns:
            ``ppow(u**2 + v**2 + 1.0e-30, 0.5)`` built from the 'u' and 'v'
            output columns.

        Raises:
            ValueError: if 'u' or 'v' is not in ``self.output_var``.
        """
        uid = self.output_var.index('u')
        vid = self.output_var.index('v')
        u = slice_column(nn_output_var, uid)
        v = slice_column(nn_output_var, vid)
        # NOTE(review): the 1.0e-30 offset presumably keeps the square root
        # differentiable where the velocity is exactly zero -- confirm.
        vel = ppow((bkd.square(u) + bkd.square(v) + 1.0e-30), 0.5)
        return vel

    def surf_x(self, nn_input_var, nn_output_var, X):
        """dsdx: ``jacobian`` of output 's' with respect to input 'x'.

        Args:
            nn_input_var: input tensor to the nn
            nn_output_var: output tensor from the nn
            X: unused; kept for the PointSetOperatorBC call signature
        """
        sid = self.output_var.index('s')
        xid = self.input_var.index('x')
        return jacobian(nn_output_var, nn_input_var, i=sid, j=xid)

    def surf_y(self, nn_input_var, nn_output_var, X):
        """dsdy: ``jacobian`` of output 's' with respect to input 'y'.

        Args:
            nn_input_var: input tensor to the nn
            nn_output_var: output tensor from the nn
            X: unused; kept for the PointSetOperatorBC call signature
        """
        sid = self.output_var.index('s')
        yid = self.input_var.index('y')
        return jacobian(nn_output_var, nn_input_var, i=sid, j=yid)

    def user_defined_gradient(self, output_var, input_var):
        """Return a PointSetOperatorBC-style function computing d(output_var)/d(input_var).

        TODO: implement jax version

        Args:
            output_var: string name of output variable (dependent variable)
            input_var: string name of input variable (independent variable)

        Returns:
            A function ``(nn_input_var, nn_output_var, X)``.  The variable
            names are resolved to indices when that function is called, not
            when it is created.
        """
        def _wrapper(nn_input_var, nn_output_var, X):
            yid = self.output_var.index(output_var)
            xid = self.input_var.index(input_var)
            return jacobian(nn_output_var, nn_input_var, i=yid, j=xid)
        return _wrapper

    def calving_front(self, nn_input_var, nn_output_var, X):
        """Calculate the calving front boundary condition.

        Delegates to ``_bc`` of the first equation whose ``_EQUATION_TYPE``
        equals "CALVINGFRONT" (case insensitive).

        Args:
            nn_input_var: input tensor to the nn
            nn_output_var: output tensor from the nn
            X: NumPy array of the collocation points defined on the boundary,
                required by deepxde (not used here)

        Raises:
            ValueError: if no calving front equation is configured.
        """
        front = next(
            (p for p in self.equations if p._EQUATION_TYPE.upper() == "CALVINGFRONT"),
            None,
        )
        if front is None:
            # Fail with a clear message instead of indexing the list with None.
            raise ValueError(
                "No equation of type 'CALVINGFRONT' is defined in the physics"
            )
        return front._bc(nn_input_var, nn_output_var)

    def operator(self, pname):
        """Grab a pde operator, used for testing the pdes and plotting.

        Args:
            pname: pde operator name (string), case insensitive

        Returns:
            The ``pde`` attribute of the first equation whose
            ``_EQUATION_TYPE`` matches ``pname``, or ``None`` if none matches.
        """
        wanted = pname.lower()
        for p in self.equations:
            if p._EQUATION_TYPE.lower() == wanted:
                return p.pde
        return None