from collections import OrderedDict
from cached_property import cached_property
import numpy as np
from sympy.core.sympify import converter as sympify_converter
from devito.finite_differences import Differentiable
from devito.finite_differences.tools import make_shift_x0
from devito.types.basic import AbstractTensor
from devito.types.dense import Function, TimeFunction
from devito.types.utils import NODE
__all__ = ['TensorFunction', 'TensorTimeFunction', 'VectorFunction', 'VectorTimeFunction']
# [docs]  (Sphinx HTML extraction artifact; not part of the original module)
class TensorFunction(AbstractTensor):
    """
    Tensor valued Function represented as a Matrix.
    Each component is a Function or TimeFunction.

    A TensorFunction and the classes that inherit from it take the same parameters as
    a DiscreteFunction and additionally:

    Parameters
    ----------
    name : str
        Name of the symbol.
    grid : Grid, optional
        Carries shape, dimensions, and dtype of the TensorFunction. When grid is not
        provided, shape and dimensions must be given. For MPI execution, a
        Grid is compulsory.
    space_order : int or 3-tuple of ints, optional
        Discretisation order for space derivatives. Defaults to 1. ``space_order`` also
        impacts the number of points available around a generic point of interest. By
        default, ``space_order`` points are available on both sides of a generic point of
        interest, including those nearby the grid boundary. Sometimes, fewer points
        suffice; in other scenarios, more points are necessary. In such cases, instead of
        an integer, one can pass a 3-tuple ``(o, lp, rp)`` indicating the discretization
        order (``o``) as well as the number of points on the left (``lp``) and right
        (``rp``) sides of a generic point of interest.
    shape : tuple of ints, optional
        Shape of the domain region in grid points. Only necessary if ``grid`` isn't given.
    dimensions : tuple of Dimension, optional
        Dimensions associated with the object. Only necessary if ``grid`` isn't given.
    dtype : data-type, optional
        Any object that can be interpreted as a numpy data type. Defaults
        to ``np.float32``.
    staggered : tuple of Dimension, optional
        Staggering of each component; needs to have the size of the tensor. Defaults
        to NODE on the diagonal and pairwise Dimension staggering off-diagonal.
    initializer : callable or any object exposing the buffer interface, optional
        Data initializer. If a callable is provided, data is allocated lazily.
    allocator : MemoryAllocator, optional
        Controller for memory allocation. To be used, for example, when one wants
        to take advantage of the memory hierarchy in a NUMA architecture. Refer to
        `default_allocator.__doc__` for more information.
    padding : int or tuple of ints, optional
        .. deprecated:: shouldn't be used; padding is now automatically inserted.
        Allocate extra grid points to maximize data access alignment. When a tuple
        of ints, one int per Dimension should be provided.
    symmetric : bool, optional
        Whether the tensor is symmetric or not. Defaults to True.
    diagonal : bool, optional
        Whether the tensor is diagonal or not. Defaults to False.
    """

    _is_TimeDependent = False

    # Class used to instantiate each component of the tensor
    _sub_type = Function

    _class_priority = 10
    # Higher than Differentiable so mixed arithmetic dispatches to this class
    _op_priority = Differentiable._op_priority + 1.

    def __init_finalize__(self, *args, **kwargs):
        grid = kwargs.get('grid')
        dimensions = kwargs.get('dimensions')
        # Cache the space Dimensions shared by all components
        inds, _ = Function.__indices_setup__(grid=grid,
                                             dimensions=dimensions)
        self._space_dimensions = inds

    @classmethod
    def __subfunc_setup__(cls, *args, **kwargs):
        """
        Creates the components of the TensorFunction
        either from input or from input Dimensions.
        """
        comps = kwargs.get("components")
        if comps is not None:
            # Pre-built components supplied by the caller; use them verbatim
            return comps
        grid = kwargs.get("grid")
        if grid is None:
            dims = kwargs.get('dimensions')
            if dims is None:
                raise TypeError("Need either `grid` or `dimensions`")
        else:
            dims = grid.dimensions
        stagg = kwargs.get("staggered", None)
        name = kwargs.get("name")
        symm = kwargs.get('symmetric', True)
        diag = kwargs.get('diagonal', False)
        funcs = []
        # Fill tensor, only upper diagonal if symmetric
        for i, d in enumerate(dims):
            funcs2 = [0 for _ in range(len(dims))]
            start = i if (symm or diag) else 0
            stop = i + 1 if diag else len(dims)
            for j in range(start, stop):
                # Component name encodes the (row, col) Dimension pair
                kwargs["name"] = "%s_%s%s" % (name, d.name, dims[j].name)
                # Default staggering: NODE on the diagonal, pairwise off-diagonal
                kwargs["staggered"] = (stagg[i][j] if stagg is not None
                                       else (NODE if i == j else (d, dims[j])))
                funcs2[j] = cls._sub_type(**kwargs)
            funcs.append(funcs2)
        # Symmetrize and fill diagonal if symmetric
        if symm:
            funcs = np.array(funcs) + np.triu(np.array(funcs), k=1).T
            funcs = funcs.tolist()
        return funcs

    def __getattr__(self, name):
        """
        Try calling a dynamically created FD shortcut.

        Notes
        -----
        This method acts as a fallback for __getattribute__
        """
        try:
            # Zero entries have no FD shortcuts; leave them untouched
            return self.applyfunc(lambda x: x if x == 0 else getattr(x, name))
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are no longer masked as AttributeError
            raise AttributeError("%r object has no attribute %r" % (self.__class__, name))

    def _eval_at(self, func):
        """
        Evaluate tensor at func location
        """
        def entries(i, j, func):
            # Components without `_eval_at` (e.g. zeros) are returned as-is
            return getattr(self[i, j], '_eval_at', lambda x: self[i, j])(func[i, j])
        entry = lambda i, j: entries(i, j, func)
        return self._new(self.rows, self.cols, entry)

    @classmethod
    def __indices_setup__(cls, **kwargs):
        return Function.__indices_setup__(grid=kwargs.get('grid'),
                                          dimensions=kwargs.get('dimensions'))

    @property
    def _symbolic_functions(self):
        # Union of the symbolic functions of all (unique) components
        return frozenset().union(*[a._symbolic_functions for a in self.values()])

    @property
    def is_TimeDependent(self):
        return self._is_TimeDependent

    @property
    def space_dimensions(self):
        """Spatial dimensions."""
        return self._space_dimensions

    @cached_property
    def space_order(self):
        """The space order for all components."""
        return ({a.space_order for a in self} - {None}).pop()

    @property
    def is_diagonal(self):
        """Whether the tensor is diagonal."""
        return all(self[i, j] == 0 for j in range(self.cols)
                   for i in range(self.rows) if i != j)

    def _evaluate(self, **kwargs):
        return self.applyfunc(lambda x: getattr(x, 'evaluate', x))

    def values(self):
        """The unique components: diagonal only, upper triangle, or all entries."""
        if self.is_diagonal:
            return [self[i, i] for i in range(self.shape[0])]
        elif self.is_symmetric:
            # Drop duplicated symmetric entries, preserving first-seen order
            return list(OrderedDict.fromkeys(super().values()))
        else:
            return super().values()

    def div(self, shift=None):
        """
        Divergence of the TensorFunction (is a VectorFunction).
        """
        comps = []
        func = vec_func(self)
        ndim = len(self.space_dimensions)
        shift_x0 = make_shift_x0(shift, (ndim, ndim))
        for i in range(ndim):
            # i-th component: sum_j d/dx_j of T[j, i]
            comps.append(sum(getattr(self[j, i], 'd%s' % d.name)
                             (x0=shift_x0(shift, d, i, j))
                             for j, d in enumerate(self.space_dimensions)))
        return func._new(comps)

    @property
    def laplace(self):
        """
        Laplacian of the TensorFunction.
        """
        comps = []
        func = vec_func(self)
        # j-th component: sum of second derivatives of row j along each Dimension
        for j in range(len(self.space_dimensions)):
            comps.append(sum(getattr(self[j, i], 'd%s2' % d.name)
                             for i, d in enumerate(self.space_dimensions)))
        return func._new(comps)

    def grad(self, shift=None):
        raise AttributeError("Gradient of a second order tensor not supported")

    def new_from_mat(self, mat):
        """Build a new tensor of the appropriate (time-dependent?) class from `mat`."""
        func = tens_func(self)
        return func._new(self.rows, self.cols, mat)

    def classof_prod(self, other, mat):
        """Class of the product `self * other`, based on shape and time dependence."""
        try:
            is_mat = len(mat[0]) > 1
        except TypeError:
            is_mat = False
        is_time = (getattr(self, '_is_TimeDependent', False) or
                   getattr(other, '_is_TimeDependent', False))
        return mat_time_dict[(is_time, is_mat)]
# [docs]  (Sphinx HTML extraction artifact; not part of the original module)
class VectorFunction(TensorFunction):
    """
    Vector valued space varying Function as a rank 1 tensor of Function.
    """

    is_VectorValued = True
    is_TensorValued = False

    _sub_type = Function

    _is_TimeDependent = False

    @property
    def is_transposed(self):
        # A single-row matrix is a transposed (row) vector
        return self.shape[0] == 1

    @classmethod
    def __subfunc_setup__(cls, *args, **kwargs):
        """
        Creates the components of the VectorFunction
        either from input or from input dimensions.
        """
        comps = kwargs.get("components")
        if comps is not None:
            # Caller supplied pre-built components; use them as-is
            return comps
        grid = kwargs.get("grid")
        if grid is not None:
            dims = grid.dimensions
        else:
            dims = kwargs.get('dimensions')
            if dims is None:
                raise TypeError("Need either `grid` or `dimensions`")
        stagg = kwargs.get("staggered", None)
        base_name = kwargs.get("name")
        funcs = []
        for i, d in enumerate(dims):
            # One component per Dimension, staggered along it by default
            kwargs["name"] = "%s_%s" % (base_name, d.name)
            kwargs["staggered"] = d if stagg is None else stagg[i]
            funcs.append(cls._sub_type(**kwargs))
        return funcs

    # Custom repr and str
    def __str__(self):
        body = ''.join(' %-2s,' % c for c in self)
        return "Vector(%s)" % body[1:-1]

    __repr__ = __str__

    def div(self, shift=None):
        """
        Divergence of the VectorFunction, creates the divergence Function.
        """
        shift_x0 = make_shift_x0(shift, (len(self.space_dimensions),))
        terms = []
        for i, d in enumerate(self.space_dimensions):
            deriv = getattr(self[i], 'd%s' % d.name)
            terms.append(deriv(x0=shift_x0(shift, d, None, i)))
        return sum(terms)

    @property
    def laplace(self):
        """
        Laplacian of the VectorFunction, creates the Laplacian VectorFunction.
        """
        func = vec_func(self)
        comps = []
        for comp in self:
            comps.append(sum(getattr(comp, 'd%s2' % d.name)
                             for d in self.space_dimensions))
        return func._new(comps)

    @property
    def curl(self):
        """
        Gradient of the (3D) VectorFunction, creates the curl VectorFunction.
        """
        if len(self.space_dimensions) != 3:
            raise AttributeError("Curl only supported for 3D VectorFunction")
        # The curl of a VectorFunction is a VectorFunction
        d0, d1, d2 = ['d%s' % d.name for d in self.space_dimensions]
        comps = [getattr(self[2], d1) - getattr(self[1], d2),
                 getattr(self[0], d2) - getattr(self[2], d0),
                 getattr(self[1], d0) - getattr(self[0], d1)]
        func = vec_func(self)
        return func._new(3, 1, comps)

    def grad(self, shift=None):
        """
        Gradient of the VectorFunction, creates the gradient TensorFunction.
        """
        func = tens_func(self)
        ndim = len(self.space_dimensions)
        shift_x0 = make_shift_x0(shift, (ndim, ndim))
        mat = []
        for i, comp in enumerate(self):
            # Row i: derivatives of component i along each space Dimension
            row = [getattr(comp, 'd%s' % d.name)(x0=shift_x0(shift, d, i, j))
                   for j, d in enumerate(self.space_dimensions)]
            mat.append(row)
        return func._new(mat)

    def outer(self, other):
        """Outer product with another vector, creates a TensorFunction."""
        # NOTE(review): both ranges use `self.cols` — for a column vector that is 1;
        # confirm whether `self.rows` was intended for the non-transposed case
        rng = range(self.cols)
        comps = [[self[i] * other[j] for i in rng] for j in rng]
        func = tens_func(self, other)
        return func._new(comps)

    def new_from_mat(self, mat):
        """Build a new vector of the appropriate (time-dependent?) class from `mat`."""
        return vec_func(self)._new(self.rows, 1, mat)
# [docs]  (Sphinx HTML extraction artifact; not part of the original module)
class TensorTimeFunction(TensorFunction):
    """
    Time varying TensorFunction.
    """

    is_TensorValued = True

    # Components are TimeFunctions rather than plain Functions
    _sub_type = TimeFunction

    _is_TimeDependent = True

    @cached_property
    def time_order(self):
        """The time order for all components."""
        orders = {comp.time_order for comp in self}
        orders.discard(None)
        return orders.pop()
# [docs]  (Sphinx HTML extraction artifact; not part of the original module)
class VectorTimeFunction(VectorFunction, TensorTimeFunction):
    """
    Time varying VectorFunction.
    """
    is_VectorValued = True
    is_TensorValued = False
    # Components are TimeFunctions rather than plain Functions
    _sub_type = TimeFunction
    _is_TimeDependent = True
    # Presumably the index of the time Dimension within each component's
    # indices — TODO confirm against TimeFunction
    _time_position = 0
# Dispatch table keyed on (is_time_dependent, is_matrix) — used by
# TensorFunction.classof_prod to pick the class of a product's result
mat_time_dict = {(True, True): TensorTimeFunction, (True, False): VectorTimeFunction,
                 (False, True): TensorFunction, (False, False): VectorFunction}
def vec_func(*funcs):
    """
    Return the vector class matching `funcs`: VectorTimeFunction if any
    input is time-dependent, VectorFunction otherwise.
    """
    # Generator expression avoids materializing a throwaway list inside any()
    if any(getattr(f, 'is_TimeDependent', False) for f in funcs):
        return VectorTimeFunction
    return VectorFunction
def tens_func(*funcs):
    """
    Return the tensor class matching `funcs`: TensorTimeFunction if any
    input is time-dependent, TensorFunction otherwise.
    """
    # Generator expression avoids materializing a throwaway list inside any()
    if any(getattr(f, 'is_TimeDependent', False) for f in funcs):
        return TensorTimeFunction
    return TensorFunction
def sympify_tensor(arg):
    # Identity converter: the object is returned unchanged by sympify
    return arg
# Register with SymPy's sympify machinery so TensorFunction instances
# pass through sympify untouched
sympify_converter[TensorFunction] = sympify_tensor