"""Source code for devito.types.sparse."""

from collections import OrderedDict
try:
    from collections.abc import Iterable
except ImportError:
    # Before python 3.10
    from collections import Iterable
from itertools import product

import sympy
import numpy as np
from cached_property import cached_property

from devito.finite_differences import generate_fd_shortcuts
from devito.mpi import MPI, SparseDistributor
from devito.operations import LinearInterpolator, PrecomputedInterpolator
from devito.symbolics import indexify, retrieve_function_carriers
from devito.tools import (ReducerMap, as_tuple, flatten, prod, filter_ordered,
                          is_integer, dtype_to_mpidtype)
from devito.types.dense import DiscreteFunction, SubFunction
from devito.types.dimension import (Dimension, ConditionalDimension, DefaultDimension,
                                    DynamicDimension)
from devito.types.dimension import dimensions as mkdims
from devito.types.basic import Symbol
from devito.types.equation import Eq, Inc
from devito.types.utils import IgnoreDimSort


__all__ = ['SparseFunction', 'SparseTimeFunction', 'PrecomputedSparseFunction',
           'PrecomputedSparseTimeFunction', 'MatrixSparseTimeFunction']


class AbstractSparseFunction(DiscreteFunction):

    """
    An abstract class to define behaviours common to all sparse functions.
    """

    _sparse_position = -1
    """Position of sparse index among the function indices."""

    _radius = 0
    """The radius of the stencil operators provided by the SparseFunction."""

    _sub_functions = ()
    """SubFunctions encapsulated within this AbstractSparseFunction."""

    __rkwargs__ = DiscreteFunction.__rkwargs__ + ('npoint_global', 'space_order')

    def __init_finalize__(self, *args, **kwargs):
        """
        Finalize construction: record the number of sparse points and the
        space order, then attach the derivative shortcut methods.
        """
        super(AbstractSparseFunction, self).__init_finalize__(*args, **kwargs)
        # `npoint` takes precedence over `npoint_global` when both are supplied
        self._npoint = kwargs.get('npoint', kwargs.get('npoint_global'))
        self._space_order = kwargs.get('space_order', 0)

        # Dynamically add derivative short-cuts
        self._fd = self.__fd_setup__()

    @classmethod
    def __indices_setup__(cls, *args, **kwargs):
        """
        Set up the Dimensions. Defaults to a single sparse Dimension named
        `p_<name>` if no `dimensions` are provided.
        """
        dimensions = as_tuple(kwargs.get('dimensions'))
        if not dimensions:
            dimensions = (Dimension(name='p_%s' % kwargs["name"]),)

        if args:
            # Rebuild case: `args` carry the indices to re-create with
            return tuple(dimensions), tuple(args)
        else:
            return dimensions, dimensions

    @classmethod
    def __shape_setup__(cls, **kwargs):
        """
        Derive the (local) shape. Without an explicit `shape`, the global
        `npoint` is decomposed across MPI ranks and the local size is used.
        """
        grid = kwargs.get('grid')
        # A Grid must have been provided
        if grid is None:
            raise TypeError('Need `grid` argument')
        shape = kwargs.get('shape')
        npoint = kwargs.get('npoint', kwargs.get('npoint_global'))
        if shape is None:
            glb_npoint = SparseDistributor.decompose(npoint, grid.distributor)
            shape = (glb_npoint[grid.distributor.myrank],)
        return shape

    def __fd_setup__(self):
        """
        Dynamically add derivative short-cuts.
        """
        return generate_fd_shortcuts(self.dimensions, self.space_order)

    def __distributor_setup__(self, **kwargs):
        """
        A `SparseDistributor` handles the SparseFunction decomposition based on
        physical ownership, and allows to convert between global and local indices.
        """
        # Decompose the global number of points over the sparse Dimension,
        # reusing the Grid's distributor for rank/topology information
        return SparseDistributor(
            kwargs.get('npoint', kwargs.get('npoint_global')),
            self._sparse_dim,
            kwargs['grid'].distributor
        )

    def __subfunc_setup__(self, key, suffix, dtype=None):
        """
        Create a SubFunction (e.g. coordinates, gridpoints) attached to self.

        Parameters
        ----------
        key : SubFunction, iterable or None
            Initializer for the SubFunction data. If a SubFunction, it is
            rebuilt so that its sparse Dimension matches self's; if an
            iterable, it is converted to a numpy array; if None, an empty
            SubFunction with the default shape `(npoint, grid.dim)` is built.
        suffix : str
            Suffix appended to self's name to form the SubFunction name.
        dtype : data-type, optional
            Override for the SubFunction dtype; by default inferred from
            `key` (np.int32 for integer data) or from self's dtype.
        """
        name = '%s_%s' % (self.name, suffix)

        if key is not None and not isinstance(key, SubFunction):
            key = np.array(key)

        # Infer dimensions and shape. NOTE: the original code assigned
        # `dimensions` once and then unconditionally reassigned it in both
        # branches of the inner if/else; the dead assignment has been removed.
        if key is not None:
            if key.ndim > 2:
                # Trailing dimensions beyond (point, coord) get generic names
                dimensions = (self._sparse_dim, Dimension(name='d'),
                              *mkdims("i", n=key.ndim-2))
            else:
                dimensions = (self._sparse_dim, Dimension(name='d'))
            shape = (self.npoint, self.grid.dim, *key.shape[2:])
        else:
            dimensions = (self._sparse_dim, Dimension(name='d'))
            shape = (self.npoint, self.grid.dim)

        # Check if already a SubFunction
        if isinstance(key, SubFunction):
            # Need to rebuild so the dimensions match the parent SparseFunction
            indices = (self.indices[self._sparse_position], *key.indices[1:])
            return key._rebuild(*indices, name=name, shape=shape,
                                alias=self.alias, halo=None)
        elif key is not None and not isinstance(key, Iterable):
            raise ValueError("`%s` must be either SubFunction "
                             "or iterable (e.g., list, np.ndarray)" % key)

        if key is None:
            # Fallback to default behaviour
            dtype = dtype or self.dtype
        else:
            # Only sanity-check the shape in the serial case; with MPI the
            # local shape legitimately differs from the global input shape
            if (shape != key.shape and key.shape != (shape[1],)) and \
                    self._distributor.nprocs == 1:
                raise ValueError("Incompatible shape for %s, `%s`; expected `%s`" %
                                 (suffix, key.shape[:2], shape))

            # Infer dtype
            if np.issubdtype(key.dtype.type, np.integer):
                dtype = dtype or np.int32
            else:
                dtype = dtype or self.dtype

        sf = SubFunction(
            name=name, dtype=dtype, dimensions=dimensions,
            shape=shape, space_order=0, initializer=key, alias=self.alias,
            distributor=self._distributor
        )

        if self.npoint == 0:
            # This is a corner case -- we might get here, for example, when
            # running with MPI and some processes get 0-size arrays after
            # domain decomposition. We "touch" the data anyway to avoid the
            # case ``self._data is None``
            sf.data

        return sf

    @property
    def _sparse_dim(self):
        # The Dimension spanning the sparse points
        return self.dimensions[self._sparse_position]

    @property
    def _mpitype(self):
        # The MPI datatype matching self's dtype
        return dtype_to_mpidtype(self.dtype)

    @property
    def _smpitype(self):
        # Mapper: SubFunction -> MPI datatype matching that SubFunction's dtype
        sfuncs = [getattr(self, s) for s in self._sub_functions
                  if getattr(self, s) is not None]
        return {s: dtype_to_mpidtype(s.dtype) for s in sfuncs}

    @property
    def _comm(self):
        # The MPI communicator of the underlying SparseDistributor
        return self._distributor.comm

    @property
    def _coords_indices(self):
        """
        The reference grid-point indices of each sparse point: taken directly
        from `gridpoints` if attached, otherwise derived from the coordinates
        via floor((coords - origin) / spacing).

        Raises
        ------
        ValueError
            If neither gridpoints nor coordinates are attached.
        """
        if self.gridpoints_data is not None:
            return self.gridpoints_data
        else:
            if self.coordinates_data is None:
                # Fix: the two adjacent string literals previously joined
                # without a separating space ("attachedto this ...")
                raise ValueError("No coordinates or gridpoints attached "
                                 "to this SparseFunction")
            return (
                np.floor((self.coordinates_data - self.grid.origin) / self.grid.spacing)
            ).astype(int)

    @property
    def _support(self):
        """
        The grid points surrounding each sparse point within the radius of self's
        injection/interpolation operators.
        """
        max_shape = np.array(self.grid.shape).reshape(1, self.grid.dim)
        # Clip each index into [0, grid shape] to stay within the domain
        minmax = lambda arr: np.minimum(max_shape, np.maximum(0, arr))
        return np.stack([minmax(self._coords_indices + s) for s in self._point_support],
                        axis=2)

    @property
    def _dist_datamap(self):
        """
        Mapper ``M : MPI rank -> required sparse data``.
        """
        return self.grid.distributor.glb_to_rank(self._support) or {}

    @property
    def npoint(self):
        # Number of sparse points on the local MPI rank
        return self.shape[self._sparse_position]

    @property
    def npoint_global(self):
        """
        Global `npoint`s. This only differs from `self.npoint` in an MPI context.

        Issues
        ------
        * https://github.com/devitocodes/devito/issues/1498
        """
        return self._npoint

    @property
    def space_order(self):
        """The space order."""
        return self._space_order

    @property
    def r(self):
        # Short-hand for the stencil operator radius
        return self._radius

    @property
    def gridpoints(self):
        # The reference grid points; fall back to deriving them from the
        # coordinates if no gridpoints SubFunction is attached
        try:
            return self._gridpoints
        except AttributeError:
            return self._coords_indices

    @property
    def gridpoints_data(self):
        # Rank-local view of the gridpoints data, or None if not attached
        try:
            return self._gridpoints.data._local.view(np.ndarray)
        except AttributeError:
            return None

    @property
    def coordinates(self):
        # The coordinates SubFunction, or None if not attached
        try:
            return self._coordinates
        except AttributeError:
            return None

    @property
    def coordinates_data(self):
        # Rank-local view of the coordinates data, or None if not attached
        try:
            return self.coordinates.data._local.view(np.ndarray)
        except AttributeError:
            return None

    @cached_property
    def _pos_symbols(self):
        # One integer position Symbol per grid Dimension
        return [Symbol(name='pos%s' % d, dtype=np.int32)
                for d in self.grid.dimensions]

    @cached_property
    def _point_increments(self):
        """Index increments in each Dimension for each point symbol."""
        return tuple(product(range(-self.r+1, self.r+1), repeat=self.grid.dim))

    @cached_property
    def _point_support(self):
        # `_point_increments` as a numpy array, for vectorized arithmetic
        return np.array(self._point_increments)

    @cached_property
    def _position_map(self):
        """
        Symbols map for the physical position of the sparse points relative to the grid
        origin.
        """
        return OrderedDict([((c - o)/d.spacing, p)
                            for p, c, d, o in zip(self._pos_symbols,
                                                  self._coordinate_symbols,
                                                  self.grid.dimensions,
                                                  self.grid.origin_symbols)])

    @cached_property
    def _dist_reorder_mask(self):
        """
        An ordering mask that puts ``self._sparse_position`` at the front.
        """
        ret = (self._sparse_position,)
        ret += tuple(i for i, d in enumerate(self.dimensions)
                     if d is not self._sparse_dim)
        return ret

    @cached_property
    def dist_origin(self):
        # Mapper: SubFunction -> global origin offset (populated by subclasses)
        return self._dist_origin

    def interpolate(self, *args, **kwargs):
        """
        Implement an interpolation operation from the grid onto the given sparse points
        """
        # Delegate to the interpolator attached by the concrete subclass
        return self.interpolator.interpolate(*args, **kwargs)

    def inject(self, *args, **kwargs):
        """
        Implement an injection operation from a sparse point onto the grid
        """
        # Delegate to the interpolator attached by the concrete subclass
        return self.interpolator.inject(*args, **kwargs)

    def guard(self, expr=None):
        """
        Generate guarded expressions, that is expressions that are evaluated
        by an Operator only if certain conditions are met.  The introduced
        condition, here, is that all grid points in the support of a sparse
        value must fall within the grid domain (i.e., *not* on the halo).

        Parameters
        ----------
        expr : expr-like, optional
            Input expression, from which the guarded expression is derived.
            If not specified, defaults to ``self``.

        Returns
        -------
        (expr-like, list)
            The guarded expression and the position temporaries it relies on.
        """
        conditions = {}

        # Position map and temporaries for it
        pmap = self._position_map

        # Temporaries for the position
        temps = self.interpolator._positions(self.dimensions)

        # One lower/upper bound condition per grid Dimension to avoid OOB.
        # (The original looped over `enumerate(...)` but never used the index.)
        for d, pos in zip(self.grid.dimensions, pmap.values()):
            lb = sympy.And(pos >= d.symbolic_min, evaluate=False)
            ub = sympy.And(pos <= d.symbolic_max, evaluate=False)
            conditions[d] = sympy.And(lb, ub, evaluate=False)
        condition = sympy.And(*conditions.values(), evaluate=False)
        cd = ConditionalDimension(self._sparse_dim.name,
                                  self._sparse_dim,
                                  condition=condition, indirect=True)

        if expr is None:
            out = self.indexify()._subs(self._sparse_dim, cd)
        else:
            # Guard every sparse function appearing in `expr`
            functions = {f for f in retrieve_function_carriers(expr)
                         if f.is_SparseFunction}
            out = indexify(expr).subs({f._sparse_dim: cd for f in functions})

        return out, temps

    def _dist_scatter_mask(self, dmap=None):
        """
        A mask to index into ``self.data``, which creates a new data array that
        logically contains N consecutive groups of sparse data values, where N
        is the number of MPI ranks. The i-th group contains the sparse data
        values accessible by the i-th MPI rank.  Thus, sparse data values along
        the boundary of two or more MPI ranks are duplicated.
        """
        dmap = dmap or self._dist_datamap
        # Sparse-point indices, grouped by destination rank (ranks in order)
        mask = np.array(flatten(dmap[i] for i in sorted(dmap)), dtype=int)
        # Full slices along every other Dimension
        ret = [slice(None) for _ in range(self.ndim)]
        ret[self._sparse_position] = mask
        return tuple(ret)

    def _dist_count(self, dmap=None):
        """
        A 2-tuple of comm-sized iterables, which tells how many sparse points
        is this MPI rank expected to send/receive to/from each other MPI rank.
        """
        dmap = dmap or self._dist_datamap
        comm = self._comm

        # Send counts are known locally; receive counts come from the peers
        ssparse = np.array([len(dmap.get(i, [])) for i in range(comm.size)], dtype=int)
        rsparse = np.empty(comm.size, dtype=int)
        comm.Alltoall(ssparse, rsparse)

        return ssparse, rsparse

    def _dist_alltoall(self, dmap=None):
        """
        The metadata necessary to perform an ``MPI_Alltoallv`` distributing the
        sparse data values across the MPI ranks needing them.

        Returns
        -------
        (sshape, scount, sdisp, rshape, rcount, rdisp)
            Total send/recv shapes, per-rank element counts, and per-rank
            displacements, as expected by ``Alltoallv``.
        """
        ssparse, rsparse = self._dist_count(dmap=dmap)

        # Per-rank shape of send/recv data
        sshape = []
        rshape = []
        for s, r in zip(ssparse, rsparse):
            handle = list(self.shape)
            handle[self._sparse_position] = s
            sshape.append(tuple(handle))

            handle = list(self.shape)
            handle[self._sparse_position] = r
            rshape.append(tuple(handle))

        # Per-rank count of send/recv data
        scount = tuple(prod(i) for i in sshape)
        rcount = tuple(prod(i) for i in rshape)

        # Per-rank displacement of send/recv data (it's actually all contiguous,
        # but the Alltoallv needs this information anyway). NOTE: the recv line
        # previously wrapped the cumsum in a redundant `tuple(...)`; removed
        # for consistency with the send line
        sdisp = np.concatenate([[0], np.cumsum(scount)[:-1]])
        rdisp = np.concatenate([[0], np.cumsum(rcount)[:-1]])

        # Total shape of send/recv data
        sshape = list(self.shape)
        sshape[self._sparse_position] = sum(ssparse)
        rshape = list(self.shape)
        rshape[self._sparse_position] = sum(rsparse)

        # May have to swap axes, as `MPI_Alltoallv` expects contiguous data, and
        # the sparse Dimension may not be the outermost
        sshape = tuple(sshape[i] for i in self._dist_reorder_mask)
        rshape = tuple(rshape[i] for i in self._dist_reorder_mask)

        return sshape, scount, sdisp, rshape, rcount, rdisp

    def _dist_subfunc_alltoall(self, subfunc, dmap=None):
        """
        The metadata necessary to perform an ``MPI_Alltoallv`` distributing
        self's SubFunction values across the MPI ranks needing them.

        Returns
        -------
        (sshape, scount, sdisp, rshape, rcount, rdisp)
            Total send/recv shapes, per-rank element counts, and per-rank
            displacements, as expected by ``Alltoallv``.
        """
        dmap = dmap or self._dist_datamap
        ssparse, rsparse = self._dist_count(dmap=dmap)

        # Per-rank shape of send/recv `coordinates`
        shape = subfunc.shape[1:]
        sshape = [(i, *shape) for i in ssparse]
        rshape = [(i, *shape) for i in rsparse]

        # Per-rank count of send/recv `coordinates`
        scount = [prod(i) for i in sshape]
        rcount = [prod(i) for i in rshape]

        # Per-rank displacement of send/recv `coordinates` (it's actually all
        # contiguous, but the Alltoallv needs this information anyway). NOTE:
        # the recv line previously wrapped the cumsum in a redundant
        # `tuple(...)`; removed for consistency with the send line
        sdisp = np.concatenate([[0], np.cumsum(scount)[:-1]])
        rdisp = np.concatenate([[0], np.cumsum(rcount)[:-1]])

        # Total shape of send/recv `coordinates`
        sshape = list(subfunc.shape)
        sshape[0] = sum(ssparse)
        rshape = list(subfunc.shape)
        rshape[0] = sum(rsparse)

        return sshape, scount, sdisp, rshape, rcount, rdisp

    def _dist_data_scatter(self, data=None):
        """
        A ``numpy.ndarray`` containing up-to-date data values belonging
        to the calling MPI rank. A data value belongs to a given MPI rank R
        if its coordinates fall within R's local domain.
        """
        data = data if data is not None else self.data._local

        # If not using MPI, don't waste time
        if self._distributor.nprocs == 1:
            return data

        # Compute dist map only once
        dmap = self._dist_datamap
        mask = self._dist_scatter_mask(dmap=dmap)

        # Pack sparse data values so that they can be sent out via an Alltoallv
        data = data[mask]
        # Move the sparse Dimension outermost so the send buffer is contiguous
        data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))

        # Send out the sparse point values
        _, scount, sdisp, rshape, rcount, rdisp = self._dist_alltoall(dmap=dmap)
        scattered = np.empty(shape=rshape, dtype=self.dtype)
        self._comm.Alltoallv([data, scount, sdisp, self._mpitype],
                             [scattered, rcount, rdisp, self._mpitype])

        # Unpack data values so that they follow the expected storage layout
        return np.ascontiguousarray(np.transpose(scattered, self._dist_reorder_mask))

    def _dist_subfunc_scatter(self, subfunc):
        """
        Scatter a SubFunction's values to the MPI ranks that logically own
        the corresponding sparse points. Returns {subfunc: scattered values}.
        """
        # If not using MPI, don't waste time
        if self._distributor.nprocs == 1:
            return {subfunc: subfunc.data}

        # Compute dist map only once
        dmap = self._dist_datamap
        mask = self._dist_scatter_mask(dmap=dmap)

        # Pack (reordered) SubFunction values so that they can be sent out via an Alltoallv
        sfuncd = subfunc.data._local[mask[self._sparse_position]]

        # Send out the sparse point SubFunction
        _, scount, sdisp, rshape, rcount, rdisp = \
            self._dist_subfunc_alltoall(subfunc, dmap=dmap)
        scattered = np.empty(shape=rshape, dtype=subfunc.dtype)
        self._comm.Alltoallv([sfuncd, scount, sdisp, self._smpitype[subfunc]],
                             [scattered, rcount, rdisp, self._smpitype[subfunc]])
        sfuncd = scattered

        # Translate global SubFunction values into local SubFunction values
        if self.dist_origin[subfunc] is not None:
            sfuncd = sfuncd - np.array(self.dist_origin[subfunc], dtype=subfunc.dtype)
        return {subfunc: sfuncd}

    def _dist_data_gather(self, data):
        """
        Gather sparse data values computed on other MPI ranks back into
        `self.data`; the inverse of `_dist_data_scatter`. No-op without MPI.
        """
        # If not using MPI, don't waste time
        if self._distributor.nprocs == 1:
            return

        # `data` may arrive as a C-level pointer object; reinterpret if so
        try:
            data = self._C_as_ndarray(data)
        except AttributeError:
            pass
        # Compute dist map only once
        dmap = self._dist_datamap
        mask = self._dist_scatter_mask(dmap=dmap)

        # Pack sparse data values so that they can be sent out via an Alltoallv
        data = np.ascontiguousarray(np.transpose(data, self._dist_reorder_mask))

        # Send back the sparse point values; note send/recv counts are swapped
        # w.r.t. the scatter, since the communication flows the other way
        sshape, scount, sdisp, rshape, rcount, rdisp = self._dist_alltoall(dmap=dmap)
        gathered = np.empty(shape=sshape, dtype=self.dtype)

        self._comm.Alltoallv([data, rcount, rdisp, self._mpitype],
                             [gathered, scount, sdisp, self._mpitype])

        # Unpack data values so that they follow the expected storage layout
        gathered = np.ascontiguousarray(np.transpose(gathered, self._dist_reorder_mask))
        self._data[mask] = gathered[:]

    def _dist_subfunc_gather(self, sfuncd, subfunc):
        """
        Gather a SubFunction's values back from the logically-owning MPI
        ranks; the inverse of `_dist_subfunc_scatter`. No-op without MPI.
        """
        # `sfuncd` may arrive as a C-level pointer object; reinterpret if so
        try:
            sfuncd = subfunc._C_as_ndarray(sfuncd)
        except AttributeError:
            pass
        # If not using MPI, don't waste time
        if self._distributor.nprocs == 1:
            return

        # Compute dist map only once
        dmap = self._dist_datamap
        mask = self._dist_scatter_mask(dmap=dmap)

        # Translate local SubFunction values back into global ones before sending
        if self.dist_origin[subfunc] is not None:
            sfuncd = sfuncd + np.array(self.dist_origin[subfunc], dtype=subfunc.dtype)

        # Send out the sparse point SubFunction values
        sshape, scount, sdisp, _, rcount, rdisp = \
            self._dist_subfunc_alltoall(subfunc, dmap=dmap)
        gathered = np.empty(shape=sshape, dtype=subfunc.dtype)
        self._comm.Alltoallv([sfuncd, rcount, rdisp, self._smpitype[subfunc]],
                             [gathered, scount, sdisp, self._smpitype[subfunc]])
        subfunc.data._local[mask[self._sparse_position]] = gathered[:]

        # Note: this method "mirrors" `_dist_scatter`: a sparse point that is sent
        # in `_dist_scatter` is here received; a sparse point that is received in
        # `_dist_scatter` is here sent.

    def _dist_scatter(self, data=None):
        """
        Scatter self's data and all attached SubFunctions' data; returns a
        mapper `function -> scattered values`.
        """
        mapper = {self: self._dist_data_scatter(data=data)}
        for i in self._sub_functions:
            if getattr(self, i) is not None:
                mapper.update(self._dist_subfunc_scatter(getattr(self, i)))
        return mapper

    def _dist_gather(self, data, *subfunc):
        """
        Gather self's data and all attached SubFunctions' data; the inverse
        of `_dist_scatter`. `subfunc` values match `self._sub_functions` order.
        """
        self._dist_data_gather(data)
        for (sg, s) in zip(subfunc, self._sub_functions):
            if getattr(self, s) is not None:
                self._dist_subfunc_gather(sg, getattr(self, s))

    def _eval_at(self, func):
        # Sparse functions are not staggered; evaluation point is self
        return self

    def _halo_exchange(self):
        # no-op for SparseFunctions
        return

    def _arg_defaults(self, alias=None):
        """
        Default argument values for an Operator call; the data scattered to
        the local rank, plus the implied sizes of each Dimension.
        """
        key = alias or self
        # Map self (and its SubFunctions) to the aliasing function's handles
        mapper = {self: key}
        for i in self._sub_functions:
            f = getattr(key, i)
            if f is not None:
                mapper[getattr(self, i)] = f

        args = ReducerMap()

        # Add in the sparse data (as well as any SubFunction data) belonging to
        # self's local domain only
        for k, v in self._dist_scatter().items():
            args[mapper[k].name] = v
            for i, s in zip(mapper[k].indices, v.shape):
                args.update(i._arg_defaults(_min=0, size=s))

        return args

    def _arg_values(self, **kwargs):
        """
        Argument values for an Operator call, honouring any user-provided
        override for self (either another sparse function or a raw array).
        """
        # Add value override for own data if it is provided, otherwise
        # use defaults
        if self.name in kwargs:
            new = kwargs.pop(self.name)
            if isinstance(new, AbstractSparseFunction):
                # Set new values and re-derive defaults
                values = new._arg_defaults(alias=self).reduce_all()
            else:
                # We've been provided a pure-data replacement (array)
                values = {}
                for k, v in self._dist_scatter(new).items():
                    values[k.name] = v
                    for i, s in zip(k.indices, v.shape):
                        # Strip halo/padding from the Dimension size
                        size = s - sum(k._size_nodomain[i])
                        values.update(i._arg_defaults(size=size))
        else:
            values = self._arg_defaults(alias=self).reduce_all()

        return values

    def _arg_apply(self, dataobj, *subfuncs, alias=None):
        """
        Post-Operator-call hook: gather the computed values back into the
        (possibly aliased) sparse function's data.
        """
        key = alias if alias is not None else self
        if isinstance(key, AbstractSparseFunction):
            # Gather into `self.data`
            key._dist_gather(dataobj, *subfuncs)
        elif self._distributor.nprocs > 1:
            raise NotImplementedError("Don't know how to gather data from an "
                                      "object of type `%s`" % type(key))


class AbstractSparseTimeFunction(AbstractSparseFunction):

    """
    An abstract class to define behaviours common to all sparse time-varying functions.
    """

    _time_position = 0
    """Position of time index among the function indices."""

    __rkwargs__ = AbstractSparseFunction.__rkwargs__ + ('nt', 'time_order')

    def __init_finalize__(self, *args, **kwargs):
        """
        Finalize construction: record the time Dimension and time order
        before delegating to the base class.
        """
        self._time_dim = self.indices[self._time_position]
        self._time_order = kwargs.get('time_order', 1)
        if not isinstance(self.time_order, int):
            raise ValueError("`time_order` must be int")

        super().__init_finalize__(*args, **kwargs)

    def __fd_setup__(self):
        """
        Dynamically add derivative short-cuts.
        """
        # Unlike the base class, also generate time-derivative shortcuts
        return generate_fd_shortcuts(self.dimensions, self.space_order,
                                     to=self.time_order)

    @property
    def time_dim(self):
        """The time Dimension."""
        return self._time_dim

    @classmethod
    def __shape_setup__(cls, **kwargs):
        """
        Derive the shape. Without an explicit `shape`, require `nt` and
        prepend it to the sparse shape computed by the base class.
        """
        shape = kwargs.get('shape')
        if shape is None:
            nt = kwargs.get('nt')
            if not isinstance(nt, int):
                raise TypeError('Need `nt` int argument')
            if nt <= 0:
                raise ValueError('`nt` must be > 0')

            shape = list(AbstractSparseFunction.__shape_setup__(**kwargs))
            shape.insert(cls._time_position, nt)

        return tuple(shape)

    @classmethod
    def __indices_setup__(cls, *args, **kwargs):
        """
        Set up the Dimensions. Defaults to `(time, p_<name>)` if no
        `dimensions` are provided.
        """
        dimensions = as_tuple(kwargs.get('dimensions'))
        if not dimensions:
            dimensions = (kwargs['grid'].time_dim,
                          Dimension(name='p_%s' % kwargs["name"]),)

        if args:
            # Rebuild case: `args` carry the indices to re-create with
            return tuple(dimensions), tuple(args)
        else:
            return dimensions, dimensions

    @property
    def nt(self):
        # Number of timesteps
        return self.shape[self._time_position]

    @property
    def time_order(self):
        """The time order."""
        return self._time_order

    @property
    def _time_size(self):
        # Allocated size (incl. halo/padding) along the time Dimension
        return self.shape_allocated[self._time_position]


class SparseFunction(AbstractSparseFunction):

    """
    Tensor symbol representing a sparse array in symbolic equations.

    A SparseFunction carries multi-dimensional data that are not aligned with
    the computational grid. As such, each data value is associated some
    coordinates. A SparseFunction provides symbolic interpolation routines
    to convert between Functions and sparse data points. These are based upon
    standard [bi,tri]linear interpolation.

    Parameters
    ----------
    name : str
        Name of the symbol.
    npoint : int
        Number of sparse points.
    grid : Grid
        The computational domain from which the sparse points are sampled.
    coordinates : np.ndarray, optional
        The coordinates of each sparse point.
    space_order : int, optional
        Discretisation order for space derivatives. Defaults to 0.
    shape : tuple of ints, optional
        Shape of the object. Defaults to ``(npoint,)``.
    dimensions : tuple of Dimension, optional
        Dimensions associated with the object. Only necessary if the
        SparseFunction defines a multi-dimensional tensor.
    dtype : data-type, optional
        Any object that can be interpreted as a numpy data type. Defaults
        to ``np.float32``.
    initializer : callable or any object exposing the buffer interface, optional
        Data initializer. If a callable is provided, data is allocated lazily.
    allocator : MemoryAllocator, optional
        Controller for memory allocation. To be used, for example, when one wants
        to take advantage of the memory hierarchy in a NUMA architecture. Refer to
        `default_allocator.__doc__` for more information.

    Examples
    --------
    Creation

    >>> from devito import Grid, SparseFunction
    >>> grid = Grid(shape=(4, 4))
    >>> sf = SparseFunction(name='sf', grid=grid, npoint=2)
    >>> sf
    sf(p_sf)

    Inspection

    >>> sf.data
    Data([0., 0.], dtype=float32)
    >>> sf.coordinates
    sf_coords(p_sf, d)
    >>> sf.coordinates_data
    array([[0., 0.],
           [0., 0.]], dtype=float32)

    Symbolic interpolation routines

    >>> from devito import Function
    >>> f = Function(name='f', grid=grid)
    >>> exprs0 = sf.interpolate(f)
    >>> exprs1 = sf.inject(f, sf)

    Notes
    -----
    The parameters must always be given as keyword arguments, since SymPy
    uses ``*args`` to (re-)create the Dimension arguments of the symbolic object.

    About SparseFunction and MPI. There is a clear difference between:

    * Where the sparse points *physically* live, i.e., on which MPI rank. This
      depends on the user code, particularly on how the data is set up.
    * and which MPI rank *logically* owns a given sparse point. The logical
      ownership depends on where the sparse point is located within
      ``self.grid``.

    Right before running an Operator (i.e., upon a call to ``op.apply``), a
    SparseFunction "scatters" its physically owned sparse points so that each
    MPI rank gets temporary access to all of its logically owned sparse points.
    A "gather" operation, executed before returning control to user-land,
    updates the physically owned sparse points in ``self.data`` by collecting
    the values computed during ``op.apply`` from different MPI ranks.
    """

    is_SparseFunction = True

    _radius = 1
    """The radius of the stencil operators provided by the SparseFunction."""

    _sub_functions = ('coordinates',)

    __rkwargs__ = AbstractSparseFunction.__rkwargs__ + ('coordinates',)

    def __init_finalize__(self, *args, **kwargs):
        super().__init_finalize__(*args, **kwargs)
        self.interpolator = LinearInterpolator(self)

        # Set up sparse point coordinates
        coordinates = kwargs.get('coordinates', kwargs.get('coordinates_data'))
        self._coordinates = self.__subfunc_setup__(coordinates, 'coords')
        self._dist_origin = {self._coordinates: self.grid.origin_offset}

    @cached_property
    def _coordinate_symbols(self):
        """Symbol representing the coordinate values in each Dimension."""
        d_dim = self.coordinates.dimensions[1]
        return tuple([self.coordinates._subs(d_dim, i)
                      for i in range(self.grid.dim)])

    @cached_property
    def _decomposition(self):
        # Only the sparse Dimension is decomposed; other Dimensions map to None
        mapper = {self._sparse_dim: self._distributor.decomposition[self._sparse_dim]}
        return tuple(mapper.get(d) for d in self.dimensions)
class SparseTimeFunction(AbstractSparseTimeFunction, SparseFunction):

    """
    Tensor symbol representing a space- and time-varying sparse array in
    symbolic equations.

    Like SparseFunction, SparseTimeFunction carries multi-dimensional data
    that are not aligned with the computational grid. As such, each data value
    is associated some coordinates. A SparseTimeFunction provides symbolic
    interpolation routines to convert between TimeFunctions and sparse data
    points. These are based upon standard [bi,tri]linear interpolation.

    Parameters
    ----------
    name : str
        Name of the symbol.
    npoint : int
        Number of sparse points.
    nt : int
        Number of timesteps along the time Dimension.
    grid : Grid
        The computational domain from which the sparse points are sampled.
    coordinates : np.ndarray, optional
        The coordinates of each sparse point.
    space_order : int, optional
        Discretisation order for space derivatives. Defaults to 0.
    time_order : int, optional
        Discretisation order for time derivatives. Defaults to 1.
    shape : tuple of ints, optional
        Shape of the object. Defaults to ``(nt, npoint)``.
    dimensions : tuple of Dimension, optional
        Dimensions associated with the object. Only necessary if the
        SparseFunction defines a multi-dimensional tensor.
    dtype : data-type, optional
        Any object that can be interpreted as a numpy data type. Defaults
        to ``np.float32``.
    initializer : callable or any object exposing the buffer interface, optional
        Data initializer. If a callable is provided, data is allocated lazily.
    allocator : MemoryAllocator, optional
        Controller for memory allocation. To be used, for example, when one wants
        to take advantage of the memory hierarchy in a NUMA architecture. Refer to
        `default_allocator.__doc__` for more information.

    Examples
    --------
    Creation

    >>> from devito import Grid, SparseTimeFunction
    >>> grid = Grid(shape=(4, 4))
    >>> sf = SparseTimeFunction(name='sf', grid=grid, npoint=2, nt=3)
    >>> sf
    sf(time, p_sf)

    Inspection

    >>> sf.data
    Data([[0., 0.],
          [0., 0.],
          [0., 0.]], dtype=float32)
    >>> sf.coordinates
    sf_coords(p_sf, d)
    >>> sf.coordinates_data
    array([[0., 0.],
           [0., 0.]], dtype=float32)

    Symbolic interpolation routines

    >>> from devito import TimeFunction
    >>> f = TimeFunction(name='f', grid=grid)
    >>> exprs0 = sf.interpolate(f)
    >>> exprs1 = sf.inject(f, sf)

    Notes
    -----
    The parameters must always be given as keyword arguments, since SymPy
    uses ``*args`` to (re-)create the Dimension arguments of the symbolic object.
    """

    is_SparseTimeFunction = True

    __rkwargs__ = tuple(filter_ordered(AbstractSparseTimeFunction.__rkwargs__ +
                                       SparseFunction.__rkwargs__))

    def interpolate(self, expr, u_t=None, p_t=None, increment=False):
        """
        Generate equations interpolating an arbitrary expression into ``self``.

        Parameters
        ----------
        expr : expr-like
            Input expression to interpolate.
        u_t : expr-like, optional
            Time index at which the interpolation is performed.
        p_t : expr-like, optional
            Time index at which the result of the interpolation is stored.
        increment: bool, optional
            If True, generate increments (Inc) rather than assignments (Eq).
        """
        # Apply optional time symbol substitutions to expr
        subs = {}
        if u_t is not None:
            time = self.grid.time_dim
            t = self.grid.stepping_dim
            expr = expr.subs({time: u_t, t: u_t})

        if p_t is not None:
            subs = {self.time_dim: p_t}

        return super().interpolate(expr, increment=increment, self_subs=subs)

    def inject(self, field, expr, u_t=None, p_t=None, implicit_dims=None):
        """
        Generate equations injecting an arbitrary expression into a field.

        Parameters
        ----------
        field : Function
            Input field into which the injection is performed.
        expr : expr-like
            Injected expression.
        u_t : expr-like, optional
            Time index at which the interpolation is performed.
        p_t : expr-like, optional
            Time index at which the result of the interpolation is stored.
        implicit_dims : Dimension or list of Dimension, optional
            An ordered list of Dimensions that do not explicitly appear in the
            injection expression, but that should be honored when constructing
            the operator.
        """
        # Apply optional time symbol substitutions to field and expr
        if u_t is not None:
            field = field.subs({field.time_dim: u_t})
        if p_t is not None:
            expr = expr.subs({self.time_dim: p_t})

        return super().inject(field, expr, implicit_dims=implicit_dims)
class PrecomputedSparseFunction(AbstractSparseFunction):
    """
    Tensor symbol representing a sparse array in symbolic equations; unlike
    SparseFunction, PrecomputedSparseFunction uses externally-defined data
    for interpolation.

    Parameters
    ----------
    name : str
        Name of the symbol.
    npoint : int
        Number of sparse points.
    grid : Grid
        The computational domain from which the sparse points are sampled.
    r : int
        Number of gridpoints in each Dimension to interpolate a single sparse
        point to. E.g. `r=2` for linear interpolation.
    coordinates : np.ndarray, optional
        The coordinates of each sparse point.
    gridpoints : np.ndarray, optional
        An array carrying the *reference* grid point corresponding to each
        sparse point. Of all the gridpoints that one sparse point would be
        interpolated to, this is the grid point closest to the origin, i.e.
        the one with the lowest value of each coordinate Dimension. Must be
        a two-dimensional array of shape `(npoint, grid.ndim)`.
    interpolation_coeffs : np.ndarray, optional
        An array containing the coefficient for each of the r^2 (2D) or r^3
        (3D) gridpoints that each sparse point will be interpolated to. The
        coefficient is split across the n Dimensions such that the
        contribution of the point (i, j, k) will be multiplied by
        `interp_coeffs[..., i]*interp_coeffs[...,j]*interp_coeffs[...,k]`.
        So for `r=6`, we will store 18 coefficients per sparse point (instead
        of potentially 216). Must be a three-dimensional array of shape
        `(npoint, grid.ndim, r)`.
    space_order : int, optional
        Discretisation order for space derivatives. Defaults to 0.
    shape : tuple of ints, optional
        Shape of the object. Defaults to `(npoint,)`.
    dimensions : tuple of Dimension, optional
        Dimensions associated with the object. Only necessary if the
        SparseFunction defines a multi-dimensional tensor.
    dtype : data-type, optional
        Any object that can be interpreted as a numpy data type. Defaults
        to `np.float32`.
    initializer : callable or any object exposing the buffer interface, optional
        Data initializer. If a callable is provided, data is allocated lazily.
    allocator : MemoryAllocator, optional
        Controller for memory allocation. To be used, for example, when one wants
        to take advantage of the memory hierarchy in a NUMA architecture. Refer to
        `default_allocator.__doc__` for more information.

    Notes
    -----
    The parameters must always be given as keyword arguments, since SymPy
    uses `*args` to (re-)create the Dimension arguments of the symbolic object.
    """

    _sub_functions = ('gridpoints', 'coordinates', 'interpolation_coeffs')

    __rkwargs__ = (AbstractSparseFunction.__rkwargs__ +
                   ('r', 'gridpoints', 'coordinates', 'interpolation_coeffs'))

    def __init_finalize__(self, *args, **kwargs):
        super().__init_finalize__(*args, **kwargs)

        # Process kwargs
        coordinates = kwargs.get('coordinates', kwargs.get('coordinates_data'))
        gridpoints = kwargs.get('gridpoints', kwargs.get('gridpoints_data'))
        interpolation_coeffs = kwargs.get('interpolation_coeffs',
                                          kwargs.get('interpolation_coeffs_data'))

        # Grid points per sparse point (2 in the case of bilinear and trilinear)
        r = kwargs.get('r')
        if not is_integer(r):
            raise TypeError('Need `r` int argument')
        if r <= 0:
            raise ValueError('`r` must be > 0')

        # Make sure radius matches the coefficients size. If the coefficient
        # array covers `r` points per Dimension (i.e. `r` was given as a
        # diameter rather than a radius), halve it.
        if interpolation_coeffs is not None:
            nr = interpolation_coeffs.shape[-1]
            if nr // 2 != r:
                if nr == r:
                    r = r // 2
                else:
                    # Bug fix: the format arguments were swapped (`r` was
                    # printed as the shape and `nr` as the radius)
                    raise ValueError("Interpolation coefficients shape %d do "
                                     "not match specified radius %d" % (nr, r))
        self._radius = r

        if coordinates is not None and gridpoints is not None:
            raise ValueError("Either `coordinates` or `gridpoints` must be "
                             "provided, but not both")

        # Specifying only `npoints` is acceptable; this will require the user
        # to setup the coordinates data later on.
        # Bug fix: fall back to `npoint_global`, consistently with the guard
        # on `self.npoint` below (which, per AbstractSparseFunction, is also
        # populated from `npoint_global`); previously, passing only
        # `npoint_global` left `npoint` as None and `np.zeros` raised
        npoint = kwargs.get('npoint', kwargs.get('npoint_global'))
        if self.npoint and coordinates is None and gridpoints is None:
            coordinates = np.zeros((npoint, self.grid.dim))

        if coordinates is not None:
            self._coordinates = self.__subfunc_setup__(coordinates, 'coords')
            self._gridpoints = None
            self._dist_origin = {self._coordinates: self.grid.origin_offset}
        else:
            assert gridpoints is not None
            self._coordinates = None
            self._gridpoints = self.__subfunc_setup__(gridpoints, 'gridpoints',
                                                      dtype=np.int32)
            self._dist_origin = {self._gridpoints: self.grid.origin_ioffset}

        # Setup the interpolation coefficients. These are compulsory
        self._interpolation_coeffs = \
            self.__subfunc_setup__(interpolation_coeffs, 'interp_coeffs')
        self._dist_origin.update({self._interpolation_coeffs: None})

        self.interpolator = PrecomputedInterpolator(self)

    @property
    def interpolation_coeffs(self):
        """ The Precomputed interpolation coefficients."""
        return self._interpolation_coeffs

    @property
    def interpolation_coeffs_data(self):
        # Rank-local view of the coefficients, as a plain ndarray
        return self.interpolation_coeffs.data._local.view(np.ndarray)

    @cached_property
    def _coordinate_symbols(self):
        """Symbol representing the coordinate values in each Dimension."""
        if self.gridpoints is not None:
            # Reconstruct physical coordinates from the integer reference
            # gridpoints: `gp*spacing + origin`, per Dimension
            d_dim = self.gridpoints.dimensions[1]
            return tuple([self.gridpoints._subs(d_dim, di) * d.spacing + o
                          for ((di, d), o) in zip(enumerate(self.grid.dimensions),
                                                  self.grid.origin)])
        else:
            d_dim = self.coordinates.dimensions[1]
            return tuple([self.coordinates._subs(d_dim, i)
                          for i in range(self.grid.dim)])

    @cached_property
    def _position_map(self):
        """
        Symbol for each grid index according to the coordinates.

        Notes
        -----
        The expression `(coord - origin)/spacing` could also be computed in the
        mathematically equivalent expanded form `coord/spacing -
        origin/spacing`. This particular form is problematic when a sparse
        point is in close proximity of the grid origin, since due to a larger
        machine precision error it may cause a +-1 error in the computation of
        the position. We mitigate this problem by computing the positions
        individually (hence the need for a position map).
        """
        if self.gridpoints is not None:
            # When reference gridpoints are given directly, the position map
            # is simply gridpoint -> position symbol, one entry per Dimension
            ddim = self.gridpoints.dimensions[-1]
            return OrderedDict((self.gridpoints._subs(ddim, di), p)
                               for (di, p) in zip(range(self.grid.dim),
                                                  self._pos_symbols))
        else:
            return super()._position_map
class PrecomputedSparseTimeFunction(AbstractSparseTimeFunction,
                                    PrecomputedSparseFunction):
    """
    Tensor symbol representing a space- and time-varying sparse array in
    symbolic equations; unlike SparseTimeFunction, PrecomputedSparseTimeFunction
    uses externally-defined data for interpolation.

    Parameters
    ----------
    name : str
        Name of the symbol.
    npoint : int
        Number of sparse points.
    grid : Grid
        The computational domain from which the sparse points are sampled.
    r : int
        Number of gridpoints in each Dimension to interpolate a single sparse
        point to. E.g. `r=2` for linear interpolation.
    coordinates : np.ndarray, optional
        The coordinates of each sparse point.
    gridpoints : np.ndarray, optional
        An array carrying the *reference* grid point corresponding to each
        sparse point. Of all the gridpoints that one sparse point would be
        interpolated to, this is the grid point closest to the origin, i.e.
        the one with the lowest value of each coordinate Dimension. Must be
        a two-dimensional array of shape `(npoint, grid.ndim)`.
    interpolation_coeffs : np.ndarray, optional
        An array containing the coefficient for each of the r^2 (2D) or r^3
        (3D) gridpoints that each sparse point will be interpolated to. The
        coefficient is split across the n Dimensions such that the
        contribution of the point (i, j, k) will be multiplied by
        `interp_coeffs[..., i]*interp_coeffs[...,j]*interp_coeffs[...,k]`.
        So for `r=6`, we will store 18 coefficients per sparse point (instead
        of potentially 216). Must be a three-dimensional array of shape
        `(npoint, grid.ndim, r)`.
    space_order : int, optional
        Discretisation order for space derivatives. Defaults to 0.
    time_order : int, optional
        Discretisation order for time derivatives. Default to 1.
    shape : tuple of ints, optional
        Shape of the object. Defaults to `(npoint,)`.
    dimensions : tuple of Dimension, optional
        Dimensions associated with the object. Only necessary if the
        SparseFunction defines a multi-dimensional tensor.
    dtype : data-type, optional
        Any object that can be interpreted as a numpy data type. Defaults
        to `np.float32`.
    initializer : callable or any object exposing the buffer interface, optional
        Data initializer. If a callable is provided, data is allocated lazily.
    allocator : MemoryAllocator, optional
        Controller for memory allocation. To be used, for example, when one wants
        to take advantage of the memory hierarchy in a NUMA architecture. Refer to
        `default_allocator.__doc__` for more information.

    Notes
    -----
    The parameters must always be given as keyword arguments, since SymPy
    uses ``*args`` to (re-)create the Dimension arguments of the symbolic
    object.
    """

    # Reconstruction kwargs: the union of both parents' kwargs, with
    # duplicates removed while preserving first-occurrence order
    # (filter_ordered). All behavior comes from the two base classes.
    __rkwargs__ = tuple(filter_ordered(AbstractSparseTimeFunction.__rkwargs__ +
                                       PrecomputedSparseFunction.__rkwargs__))
# *** MatrixSparse*Function API # This is mostly legacy stuff which often escapes the devito's modus operandi class DynamicSubFunction(SubFunction): def _arg_defaults(self, **kwargs): return {} class MatrixSparseTimeFunction(AbstractSparseTimeFunction): """ A specialised type of SparseTimeFunction where the interpolation is externally defined. Currently, this means that the (integer) grid points and associated coefficients for each sparse point are explicitly provided as separate SubFunctions. Additionally, this class allows sources and receivers to be constructed from multiple locations, each with their own coefficients. This is to support injection and sampling of dipole (and more general) sources and receivers, without needing to store multiple versions of the sample data that vary only by a scalar constant. matrix: scipy.sparse matrix A scipy-style sparse matrix with a row for each physical point in the grid, and a column for each index into the data array. r: int or Mapping[Dimension, Optional[int]] The number of gridpoints in each Dimension used to inject/interpolate each physical point. e.g. bi-/tri-linear interplation would use 2 coefficients in each Dimension. The Mapping version of this parameter allows a different number of grid points in each Dimension. If a Dimension maps to None, this has a special interpretation - sources are not localised to coordinates in that Dimension. This is loosely equivalent to specifying r[dim] = dim_size, and with all gridpoint locations along that Dimension equal to zero. par_dim: Dimension If set, this is the Dimension used to split the sources for parallel injection. The source injection loop becomes a loop over this spatial Dimension, and then a loop over sources which touch that spatial Dimension coordinate. This defaults to grid.dimensions[0], and if specified must correspond to one of the grid.dimensions. 
other parameters as per SparseTimeFunction Location/coefficient data: msf.gridpoints.data[iloc, idim]: int integer, position (in global coordinates) of the _minimum_ index that location index `iloc` is interpolated from / injected into, in Dimension `idim` where idim is an index into the grid.dimensions msf.interpolation_coefficients: Dict[Dimension, np.ndarray] For each Dimension, there is an array of interpolation coefficients for each location `iloc`. This array is of shape (nloc, r), and is also available as msf.coefficients_x.data[iloc, ir] These are the coefficients that are multiplied by sample values at the gridpoints in the range: [msf.gridpoints.data[iloc, idim], msf.gridpoints.data[iloc, idim] + r) NOTE: *** restriction on space order of functions being sampled/injected into The halo of the function being interpolated/injected into must be larger than r, otherwise out of bounds access may result. NOTE: *** explicit scatter/gather semantics Before using this in an Operator, msf.manual_scatter() must be called to distribute the data. This only needs to be done once for any number of calls to the Operator (e.g. for checkpointing), if the data, gridpoints and coefficients have not changed. This is true whether or not MPI is being used, and independent of the MPI_Size. Likewise, after all time steps have been run, data must be collected from remote ranks using msf.manual_gather() before relying on any of the data from msf.data[:] .. note:: The parameters must always be given as keyword arguments, since SymPy uses `*args` to (re-)create the Dimension arguments of the symbolic function. 
""" is_SparseFunction = True is_SparseTimeFunction = True _time_position = 0 """Position of time index among the function indices.""" # We use DiscreteFunction instead of AbstractSparseTimeFunction # because we want to get rid of 'npoint' __rkwargs__ = (DiscreteFunction.__rkwargs__ + ('dimensions', 'r', 'matrix', 'nt', 'grid')) def __init_finalize__(self, *args, **kwargs): # The crucial argument to DugSparseTimeFunction is a sparse # matrix mapping a "source" or "receiver" to a set of locations self.matrix = kwargs.pop('matrix') from devito.data.allocators import default_allocator self._allocator = kwargs.get("allocator", default_allocator()) # Rows are locations, columns are source/receivers nloc, npoint = self.matrix.shape super().__init_finalize__(*args, **kwargs, npoint=npoint) # Grid points per sparse point r = kwargs.get('r') if r is None: raise ValueError('MatrixSparseTimeFunction requires parameter `r`') if is_integer(r): if r <= 0: raise ValueError('MatrixSparseTimeFunction requires r > 0') # convert to dictionary with same size in all dims r = {dim: r for dim in self.grid.dimensions} # Validate radius is set correctly for all grid Dimensions for d in self.grid.dimensions: if d not in r: raise ValueError("dimension %s not specified in r mapping" % d) if r[d] is None: continue if not is_integer(r[d]) or r[d] <= 0: raise ValueError('invalid parameter value r[%s] = %s' % (d, r[d])) # TODO is this going to cause some trouble with users of self.r? self._radius = r # Get the parallelism Dimension for injection self._par_dim = kwargs.get("par_dim") if self._par_dim is not None: assert self._par_dim in self.grid.dimensions else: self._par_dim = self.grid.dimensions[0] # This has one value per Dimension (e.g. size=3 for 3D) # Maybe this should be unique per SparseFunction, # but I can't see a need yet. 
ddim = Dimension('d') # Sources have their own Dimension # As do Locations locdim = Dimension('loc_%s' % self.name) self._gridpoints = SubFunction( name="%s_gridpoints" % self.name, dtype=np.int32, dimensions=(locdim, ddim), shape=(nloc, self.grid.dim), allocator=self._allocator, space_order=0, parent=self) # There is a coefficient array per grid Dimension # I could pack these into one array but that seems less readable? self.interpolation_coefficients = {} self.interpolation_coefficients_t_bogus = {} self.rdims = [] for d in self.grid.dimensions: if self._radius[d] is not None: rdim = DefaultDimension( name='r%s_%s' % (d.name, self.name), default_value=self._radius[d] ) self.rdims.append(rdim) coeff_dim = rdim coeff_shape = self._radius[d] else: coeff_dim = d coeff_shape = self.grid.dimension_map[d].glb self.interpolation_coefficients[d] = SubFunction( name="%s_coefficients_%s" % (self.name, d.name), dtype=self.dtype, dimensions=(locdim, coeff_dim), shape=(nloc, coeff_shape), allocator=self._allocator, space_order=0, parent=self) # For the _sub_functions, these must be named attributes of # this SparseFunction object setattr( self, "coefficients_%s" % d.name, self.interpolation_coefficients[d]) # We also need arrays to represent the sparse matrix map # The shapes are bogus; these are really only used when # constructing the expression, # - the mpi logic dynamically constructs arrays to feed to the # operator C code. 
self.nnzdim = Dimension('nnz_%s' % self.name) # In the non-MPI case, at least, we should fill these in once if self._distributor.nprocs == 1: m_coo = self.matrix.tocoo(copy=False) nnz_size = m_coo.nnz else: nnz_size = 1 self._mrow = DynamicSubFunction( name='mrow_%s' % self.name, dtype=np.int32, dimensions=(self.nnzdim,), shape=(nnz_size,), space_order=0, parent=self, allocator=self._allocator, ) self._mcol = DynamicSubFunction( name='mcol_%s' % self.name, dtype=np.int32, dimensions=(self.nnzdim,), shape=(nnz_size,), space_order=0, parent=self, allocator=self._allocator, ) self._mval = DynamicSubFunction( name='mval_%s' % self.name, dtype=self.dtype, dimensions=(self.nnzdim,), shape=(nnz_size,), space_order=0, parent=self, allocator=self._allocator, ) # This loop maintains a map of nnz indices which touch each # coordinate of the parallised injection Dimension # This takes the form of a list of nnz indices, and a start/end # position in that list for each index in the parallel dim self.par_dim_to_nnz_dim = DynamicDimension('par_dim_to_nnz_%s' % self.name) # This map acts as an indirect sort of the sources according to their # position along the parallelisation dimension self._par_dim_to_nnz_map = DynamicSubFunction( name='par_dim_to_nnz_map_%s' % self.name, dtype=np.int32, dimensions=(self.par_dim_to_nnz_dim,), # shape is unknown at this stage shape=(1,), space_order=0, parent=self, ) self._par_dim_to_nnz_m = DynamicSubFunction( name='par_dim_to_nnz_m_%s' % self.name, dtype=np.int32, dimensions=(self._par_dim,), # shape is unknown at this stage shape=(1,), space_order=0, parent=self, ) self._par_dim_to_nnz_M = DynamicSubFunction( name='par_dim_to_nnz_M_%s' % self.name, dtype=np.int32, dimensions=(self._par_dim,), # shape is unknown at this stage shape=(1,), space_order=0, parent=self, ) if self._distributor.nprocs == 1: self._mrow.data[:] = m_coo.row self._mcol.data[:] = m_coo.col self._mval.data[:] = m_coo.data # self._fd = generate_fd_shortcuts(self) 
self.scatter_result = None self.scattered_data = None def free_data(self): # The sympy cache holds the symbol references, but we can break the link # between the symbol and the data, thus causing the memory to be freed # This renders the object useless self._data = None self._gridpoints._data = None self._mrow._data = None self._mcol._data = None self._mval._data = None for f in self.interpolation_coefficients.values(): f._data = None self.scatter_result = None self.scattered_data = None __distributor_setup__ = DiscreteFunction.__distributor_setup__ @property def dt(self): t = self.time_dim dt = self.time_dim.spacing return (-1 * self.subs(t, t - dt) + self.subs(t, t + dt))/(2 * dt) @property def dt2(self): t = self.time_dim dt = self.time_dim.spacing return (self.subs(t, t - dt) - 2 * self + self.subs(t, t + dt))/(dt*dt) @property def mrow(self): return self._mrow @property def mcol(self): return self._mcol @property def mval(self): return self._mval @property def par_dim_to_nnz_map(self): return self._par_dim_to_nnz_map @property def par_dim_to_nnz_m(self): return self._par_dim_to_nnz_m @property def par_dim_to_nnz_M(self): return self._par_dim_to_nnz_M @property def _sub_functions(self): return ('gridpoints', *['coefficients_%s' % d.name for d in self.grid.dimensions], 'mrow', 'mcol', 'mval', 'par_dim_to_nnz_map', 'par_dim_to_nnz_m', 'par_dim_to_nnz_M') def interpolate(self, expr, u_t=None, p_t=None): """Creates a :class:`sympy.Eq` equation for the interpolation of an expression onto this sparse point collection. :param expr: The expression to interpolate. :param u_t: (Optional) time index to use for indexing into field data in `expr`. :param p_t: (Optional) time index to use for indexing into the sparse point data. 
""" expr = indexify(expr) # Apply optional time symbol substitutions to expr if u_t is not None: time = self.grid.time_dim t = self.grid.stepping_dim expr = expr.subs(t, u_t).subs(time, u_t) gridpoints = self._gridpoints.indexed mrow = self._mrow.indexed mcol = self._mcol.indexed mval = self._mval.indexed tdim, pdim = self.indices locdim, ddim = self._gridpoints.indices nnzdim = self.nnzdim row = mrow[nnzdim] dim_subs = [(pdim, mcol[nnzdim])] coeffs = [mval[nnzdim]] for i, d in enumerate(self.grid.dimensions): _, rd = self.interpolation_coefficients[d].dimensions coefficients = self.interpolation_coefficients[d].indexed # If radius is set to None, then the coefficient array is # actually the full size of the grid Dimension itself if self._radius[d] is not None: dim_subs.append((d, rd + gridpoints[row, i])) else: assert d is rd coeffs.append(coefficients[row, rd]) # Apply optional time symbol substitutions to lhs of assignment lhs = self if p_t is None else self.subs(tdim, p_t) lhs = lhs.subs([(pdim, mcol[nnzdim])]) rhs = prod(coeffs) * expr.subs(dim_subs) return [Eq(self, 0), Inc(lhs, rhs)] def inject(self, field, expr, u_t=None, p_t=None): """Symbol for injection of an expression onto a grid :param field: The grid field into which we inject. :param expr: The expression to inject. :param u_t: (Optional) time index to use for indexing into `field`. :param p_t: (Optional) time index to use for indexing into `expr`. 
""" expr = indexify(expr) field = indexify(field) tdim, pdim = self.indices par_dim_to_nnz_dim = self.par_dim_to_nnz_dim locdim, ddim = self.gridpoints.indices # Apply optional time symbol substitutions to field and expr if u_t is not None: field = field.subs(field.indices[0], u_t) if p_t is not None: expr = expr.subs(tdim, p_t) gridpoints = self._gridpoints.indexed mrow = self._mrow.indexed mcol = self._mcol.indexed mval = self._mval.indexed partonnz = self._par_dim_to_nnz_map.indexed nnz_index = partonnz[par_dim_to_nnz_dim] row = mrow[nnz_index] dim_subs = [(pdim, mcol[nnz_index])] coeffs = [mval[nnz_index]] # Devito requires a fixed ordering of Dimensions across # all loops, which means we need to respect that when constructing # the loops for this injection. # to that end, we keep the pairs (x, rx) (y, ry) together in the # ordering. par_dim_seen = False implicit_dims_for_range = [tdim] implicit_dims_for_inject = [tdim] for i, d in enumerate(self.grid.dimensions): _, rd = self.interpolation_coefficients[d].dimensions coefficients = self.interpolation_coefficients[d].indexed # There are four cases here. if d is self._par_dim: if self._radius[d] is None: # If d is the parallelism Dimension, AND this Dimension is # non-local (i.e. all sources touch all indices, and # gridpoint for this dim is ignored) coeffs.append(coefficients[row, d]) else: # d is the parallelism Dimension, so the index into # the coefficients array is derived from the value of # this Dimension minus the gridpoint of the point coeffs.append(coefficients[row, d - gridpoints[row, i]]) # loop dim here is always d implicit_dims_for_range.append(d) implicit_dims_for_inject.append(d) implicit_dims_for_inject.append(par_dim_to_nnz_dim) par_dim_seen = True else: if self._radius[d] is None: # d is not the parallelism Dimension, AND this Dimension # is non-local (i.e. 
all sources touch all indices, # and gridpoint for this dim is ignored) # the loop is therefore over the original Dimension d coeffs.append(coefficients[row, d]) loop_dim = d else: # d is not the parallelism Dimension, and it _is_ # local. In this case the loop is over the radius Dimension # and we need to substitute d with the offset from the # grid point dim_subs.append((d, rd + gridpoints[row, i])) coeffs.append(coefficients[row, rd]) loop_dim = rd implicit_dims_for_inject.append(loop_dim) if not par_dim_seen: implicit_dims_for_range.append(loop_dim) rhs = prod(coeffs) * expr field = field.subs(dim_subs) out = [ Eq( par_dim_to_nnz_dim.symbolic_min, self._par_dim_to_nnz_m, implicit_dims=tuple(implicit_dims_for_range) ), Eq( par_dim_to_nnz_dim.symbolic_max, self._par_dim_to_nnz_M, implicit_dims=tuple(implicit_dims_for_range) ), Inc( field, rhs.subs(dim_subs), implicit_dims=IgnoreDimSort(implicit_dims_for_inject), ), ] return out @classmethod def __shape_setup__(cls, **kwargs): # This happens before __init__, so we have to get 'npoint' # from the matrix _, npoint = kwargs['matrix'].shape return kwargs.get('shape', (kwargs.get('nt'), npoint)) @property def _arg_names(self): """Return a tuple of argument names introduced by this function.""" return tuple([self.name, self.name + "_" + self.gridpoints.name] + ['%s_%s' % (self.name, x.name) for x in self.interpolation_coefficients.values()]) @property def gridpoints(self): return self._gridpoints def _rank_to_points(self): """ For each rank in self._distributor, return a numpy array of int32s for the positions within this rank's self.gridpoints/self.interpolation_coefficients (i.e. the locdim) which must be injected into that rank. Any given location may require injection into several ranks, based on the radius of the injection stencil and its proximity to a rank boundary. 
It is assumed, for now, that any given location may be completely sampled from within one rank - so when gathering the data, any point sampled from more than one rank may have duplicates discarded. This implies that the radius of the sampling is less than the halo size of the Functions being sampled from. It also requires that the halos be exchanged before interpolation (must verify that this occurs). """ distributor = self._distributor # Along each Dimension, the coordinate indices are broken into # 2*decomposition_size+3 groups, numbered starting at 0 # Group 2*i contributes only to rank i-1 # Group 2*i+1 contributes to rank i-1 and rank i # Obviously this means groups 0 and 1 are "bad" - they contribute # to points to the left of the domain (rank -1) # So is group 2*decomp_size+1 and 2*decomp_size+2 # (these contributes to rank "decomp_size") # binned_gridpoints will hold which group the particular # point is along that decomposed Dimension. binned_gridpoints = np.empty_like(self._gridpoints.data) dim_group_dim_rank = [] for idim, dim in enumerate(self.grid.dimensions): decomp = distributor.decomposition[idim] decomp_size = len(decomp) dim_breaks = np.empty([2*decomp_size+2], dtype=np.int32) dim_r = self.r[dim] if dim_r is None: # size is the whole grid dim_r = self.grid.dimension_map[dim].glb # Define the split dim_breaks[:-2:2] = [ decomp_part[0] - self.r + 1 for decomp_part in decomp] dim_breaks[-2] = decomp[-1][-1] + 1 - self.r + 1 dim_breaks[1:-1:2] = [ decomp_part[0] for decomp_part in decomp] dim_breaks[-1] = decomp[-1][-1] + 1 # Handle the radius is None case by ensuring we treat # all grid points in that direction as zero gridpoints_dim = self._gridpoints.data[:, idim] if self.r[dim] is None: gridpoints_dim = np.zeros_like(gridpoints_dim) try: binned_gridpoints[:, idim] = np.digitize( gridpoints_dim, dim_breaks) except ValueError as e: raise ValueError( "decomposition failed! Are some ranks too skinny?" 
            ) from e

            # NOTE(review): the code above (start of this method) is outside
            # the visible chunk; indentation of this section assumes it sits
            # inside a per-dimension loop -- confirm against full source.
            # Map each group index along this Dimension to the set of
            # Dimension-local ranks whose subdomain the group can touch.
            # `None` stands for "outside the global domain" on either side.
            this_group_rank_map = {
                0: {None},
                1: {None, 0},
                **{2*i+2: {i} for i in range(decomp_size)},
                **{2*i+2+1: {i, i+1} for i in range(decomp_size-1)},
                2*decomp_size+1: {decomp_size-1, None},
                2*decomp_size+2: {None}}
            dim_group_dim_rank.append(this_group_rank_map)

        # This allows the points to be grouped into non-overlapping sets
        # based on their bin in each Dimension. For each set we build a list
        # of points.
        bins, inverse, counts = np.unique(
            binned_gridpoints, return_inverse=True, return_counts=True,
            axis=0)
        # inverse is now a "unique bin number" for each point gridpoints
        # we want to turn that into a list of points for each bin
        # so we argsort
        inverse_argsort = np.argsort(inverse).astype(np.int32)
        cumulative_counts = np.cumsum(counts)
        # bin tuple -> int32 indices of the points falling into that bin
        gp_map = {tuple(bi): inverse_argsort[cci-ci:cci]
                  for bi, cci, ci in zip(bins, cumulative_counts, counts)
                  }

        # the result is now going to be a concatenation of these lists
        # for each of the output ranks
        # each bin has a set of ranks -> each rank has a set (possibly empty)
        # of bins

        # For each rank get the per-dimension coordinates
        # TODO maybe we should cache this on the distributor
        dim_ranks_to_glb = {
            tuple(distributor.comm.Get_coords(rank)): rank
            for rank in range(distributor.comm.Get_size())}

        global_rank_to_bins = {}
        from itertools import product
        for bi in bins:
            # This is a list of sets for the Dimension-specific rank
            dim_rank_sets = [dgdr[bii]
                             for dgdr, bii in zip(dim_group_dim_rank, bi)]
            # Convert these to an absolute rank
            # This is where we will throw a KeyError if there are points OOB
            for dim_ranks in product(*dim_rank_sets):
                global_rank = dim_ranks_to_glb[tuple(dim_ranks)]
                global_rank_to_bins\
                    .setdefault(global_rank, set())\
                    .add(tuple(bi))

        # One (possibly empty) int32 array of point indices per rank
        empty = np.array([], dtype=np.int32)
        return [np.concatenate((
            empty,
            *[gp_map[bi] for bi in global_rank_to_bins.get(rank, [])]))
            for rank in range(distributor.comm.Get_size())]

    def _build_par_dim_to_nnz(self, active_gp, active_mrow):
        """
        Build the arrays mapping each local index along the parallel
        Dimension ``self._par_dim`` to the range of matrix nonzeros whose
        injection stencil touches that index.

        Parameters
        ----------
        active_gp : array-like
            Local gridpoints, indexed as ``active_gp[point, dim]``.
        active_mrow : array-like
            Row (point) indices of the active matrix nonzeros.

        Returns
        -------
        dict
            Maps ``self._par_dim_to_nnz_map`` to a reordering of the
            nonzeros sorted by parallel coordinate, and
            ``self._par_dim_to_nnz_m`` / ``self._par_dim_to_nnz_M`` to the
            first/last relevant (reordered) nonzero index for each
            coordinate value along the parallel Dimension.
        """
        # The case where we parallelise over a non-local index is
        # suboptimal, but supported. In this case, the actual grid point
        # locations are ignored and all points are touched.
        pardim_index = self.grid.dimensions.index(self._par_dim)
        if self._radius[self._par_dim] is None:
            # early exit with degenerate case - no reordering and all
            # coordinate values touch all parts of the array
            nnz_M = active_mrow.size - 1
            return {
                self._par_dim_to_nnz_map:
                    np.arange(active_mrow.size, dtype=np.int32),
                self._par_dim_to_nnz_m: np.zeros(
                    (self.grid.shape_local[pardim_index],), dtype=np.int32
                ),
                self._par_dim_to_nnz_M: np.full(
                    (self.grid.shape_local[pardim_index],), nnz_M,
                    dtype=np.int32
                ),
            }

        # Get the radius along the parallel Dimension
        r = self._radius[self._par_dim]

        # now, the parameters can be devito.Data, which doesn't like fancy
        # indexing very much. So, we convert to regular numpy arrays
        active_gp = np.array(active_gp)
        active_mrow = np.array(active_mrow)

        # sort the injected nonzero indices by parallel coordinate
        pardim_coordinates_nnz = active_gp[active_mrow, pardim_index]
        reordering = np.argsort(pardim_coordinates_nnz)
        pardim_reordered = pardim_coordinates_nnz[reordering]

        # now each x coordinate that we inject into has a range
        # of relevant entries in the reordered array
        # we don't worry about MPI here; by the time this function is called,
        # all gridpoints have been renumbered to local offsets

        # this coordinate is touched by any source with gridpoint >= x - r + 1
        # and gridpoint <= x
        all_xs = np.arange(self.grid.shape_local[pardim_index])
        # This should satisfy:
        # x_reordered[i-1] < x - r + 1 <= x_reordered[i]
        reordered_m = np.searchsorted(pardim_reordered, all_xs - r + 1,
                                      side='left')
        # x_reordered[i-1] <= x < x_reordered[i]
        reordered_M = np.searchsorted(pardim_reordered, all_xs,
                                      side='right') - 1

        # return output suitable for scatter
        return {
            self._par_dim_to_nnz_map: reordering.astype(np.int32),
            self._par_dim_to_nnz_m: reordered_m.astype(np.int32),
            self._par_dim_to_nnz_M: reordered_M.astype(np.int32),
        }

    def manual_scatter(self, *, data_all_zero: bool = False):
        """
        Distribute this function's data, gridpoints, interpolation
        coefficients and matrix entries to all MPI ranks, storing the
        per-rank arrays in ``self.scatter_result`` (later returned by
        ``_dist_scatter``) and the local data in ``self.scattered_data``.

        All sources/receivers must live on rank 0; everything is broadcast,
        and each rank then drops matrix rows whose stencil cannot touch its
        subdomain and zeroes coefficients referring to out-of-domain points.

        Parameters
        ----------
        data_all_zero : bool, optional
            If True, skip broadcasting ``self.data`` and fill the non-root
            receive buffers with zeros instead.
        """
        distributor = self._distributor

        # Serial case: everything is already local; no copies needed
        if distributor.nprocs == 1:
            self.scattered_data = self.data
            self.scatter_result = {
                self: self.data,
                **{
                    getattr(self, k): getattr(self, k).data
                    for k in self._sub_functions
                },
                self.mrow: self.mrow.data,
                self.mcol: self.mcol.data,
                self.mval: self.mval.data,
                **self._build_par_dim_to_nnz(self.gridpoints.data,
                                             self.mrow.data),
            }
            return

        # Generate the matrix arrays
        m_coo = self.matrix.tocoo(copy=False)

        # HACK: for now, only take npoints != 0 on rank 0
        # Broadcast all the data, gridpoints, coefficients to all ranks
        # Each rank then ignores any of the data which isn't in its own
        # domain.
        if distributor.myrank != 0 and self.npoint != 0:
            raise ValueError("can only accept sources/receivers on rank 0")

        # args[self.mrow.name] = m_coo.row.copy()
        # args[self.mcol.name] = m_coo.col.copy()
        # args[self.mval.name] = m_coo.data.copy()
        # args.update(self.nnzdim._arg_defaults(size=m_coo.nnz))

        # Send out data
        # Send out gridpoints
        # Send out coefficients
        # Send out matrix rows, cols, data
        r_tuple = tuple(self.r[dim] for dim in self.grid.dimensions)
        # Broadcast the sizes first so non-root ranks can allocate buffers
        npoint, nloc, nnz, ndim, r_tuple_bcast, nt = distributor.comm.bcast(
            (self.npoint,
             self._gridpoints.data.shape[0],
             m_coo.nnz,
             self._gridpoints.data.shape[-1],
             r_tuple,
             self.data.shape[self._time_position]), root=0)

        # important that all ranks have the same ndims and same r
        assert r_tuple == r_tuple_bcast
        assert ndim == self._gridpoints.data.shape[-1]

        # handle None radius
        r_tuple_no_none = tuple(
            ri if ri is not None else self.grid.dimension_map[d].glb
            for ri, d in zip(r_tuple, self.grid.dimensions)
        )

        # now all ranks can allocate the buffers to receive into
        if distributor.myrank != 0:
            if data_all_zero:
                scattered_data = np.zeros([nt, npoint], dtype=self.dtype)
            else:
                scattered_data = np.empty([nt, npoint], dtype=self.dtype)
            scattered_gp = np.empty([nloc, ndim], dtype=np.int32)
            scattered_coeffs = [
                np.empty([nloc, r_tuple_no_none[idim]], dtype=self.dtype)
                for idim in range(ndim)
            ]
            scattered_mrow = np.empty([nnz], dtype=np.int32)
            scattered_mcol = np.empty([nnz], dtype=np.int32)
            scattered_mval = np.empty([nnz], dtype=self.dtype)
        else:
            scattered_data = self.data
            # These are copies because we mess with them down below
            scattered_gp = self._gridpoints.data.copy()
            scattered_coeffs = [
                self.interpolation_coefficients[d].data.copy()
                for d in self.grid.dimensions]
            scattered_mrow = m_coo.row.copy()
            scattered_mcol = m_coo.col.copy()
            scattered_mval = m_coo.data.copy()

        if not data_all_zero:
            distributor.comm.Bcast(scattered_data, root=0)
        for arr in [scattered_gp, *scattered_coeffs,
                    scattered_mrow, scattered_mcol, scattered_mval]:
            distributor.comm.Bcast(arr, root=0)

        # now recreate the matrix to only contain points in our
        # local domain.

        # along each Dimension, each point is in one of 5 groups
        # 0 - completely to the left
        # 1 - to the left, but the injection stencil touches our domain
        # 2 - completely in our domain
        # 3 - in the domain, but the injection stencil includes points
        #     to the right
        # 4 - completely to the right
        active_mrow = scattered_mrow
        active_mcol = scattered_mcol
        active_mval = scattered_mval

        # first, build a reduced matrix excluding any points outside our
        # domain
        for idim, (dim, mycoord) in enumerate(zip(
                self.grid.dimensions, distributor.mycoords)):
            _left = distributor.decomposition[idim][mycoord][0]
            _right = distributor.decomposition[idim][mycoord][-1] + 1
            this_dim_r = self.r[dim]
            effective_gridpoints = scattered_gp[active_mrow, idim]
            if this_dim_r is None:
                # None radius: the stencil spans the whole global Dimension,
                # so every point is treated as touching our subdomain
                this_dim_r = self.grid.dimension_map[dim].glb
                effective_gridpoints = np.zeros_like(effective_gridpoints)

            # rewrite the matrix to remove the rows in groups 0 and 4
            mask = (
                (effective_gridpoints >= _left - this_dim_r + 1)
                & (effective_gridpoints < _right))
            which = np.nonzero(mask)
            active_mrow = active_mrow[which]
            active_mcol = active_mcol[which]
            active_mval = active_mval[which]

        # then, zero any of the coefficients which refer to points outside
        # our domain. Do this on all the gridpoints for now, since this is
        # a hack anyway
        for idim, (dim, mycoord) in enumerate(zip(
                self.grid.dimensions, distributor.mycoords)):
            _left = distributor.decomposition[idim][mycoord][0]
            _right = distributor.decomposition[idim][mycoord][-1] + 1

            # points to the left have the first few coeffs zeroed
            this_dim_r = self.r[dim]
            effective_gridpoints = scattered_gp[:, idim]
            if this_dim_r is None:
                this_dim_r = self.grid.dimension_map[dim].glb
                effective_gridpoints = np.zeros_like(effective_gridpoints)
            trim_size = np.clip(_left - effective_gridpoints, 0, this_dim_r)
            for ir in range(this_dim_r):
                # which points need zeroing?
                mask = (trim_size > ir)
                scattered_coeffs[idim][mask, ir] = 0

            # points to the right have the last few coeffs zeroed
            trim_size = np.clip(
                effective_gridpoints - (_right - this_dim_r), 0, this_dim_r)
            for ir in range(this_dim_r):
                # which points need zeroing?
                mask = (trim_size > ir)
                scattered_coeffs[idim][mask, -(ir+1)] = 0

            # finally, we translate to local coordinates
            # no need for this in the broadcasted Dimensions
            if self.r[dim] is not None:
                scattered_gp[:, idim] -= _left

        self.scattered_data = scattered_data
        self.scatter_result = {
            self: scattered_data,
            self.gridpoints: scattered_gp,
            **{
                self.interpolation_coefficients[d]: scattered_coeffs[idim]
                for idim, d in enumerate(self.grid.dimensions)
            },
            self.mrow: active_mrow,
            self.mcol: active_mcol,
            self.mval: active_mval,
            **self._build_par_dim_to_nnz(scattered_gp, active_mrow),
        }

    def _dist_scatter(self, data=None):
        """
        Return the per-rank scatter mapping prepared by ``manual_scatter``.

        Raises if ``manual_scatter`` has not been called yet.
        """
        assert data is None
        if self.scatter_result is None:
            raise Exception("_dist_scatter called before manual_scatter called")
        return self.scatter_result

    # The implementation in AbstractSparseFunction now relies on us
    # having a .coordinates property, which we don't have.
def _arg_apply(self, dataobj, *subfuncs, alias=None): key = alias if alias is not None else self if isinstance(key, AbstractSparseFunction): # Gather into `self.data` key._dist_gather(self._C_as_ndarray(dataobj)) elif self._distributor.nprocs > 1: raise NotImplementedError("Don't know how to gather data from an " "object of type `%s`" % type(key)) def manual_gather(self): # data, in this case, is set to whatever dist_scatter provided? # on rank 0, this is the original data array (hack...) distributor = self._distributor # If not using MPI, don't waste time if distributor.nprocs == 1: return # This relies on all ranks having a copy of all data. Which feels "bad". if distributor.myrank != 0: distributor.comm.Reduce( self.scattered_data, None, op=MPI.SUM, root=0 ) else: distributor.comm.Reduce( MPI.IN_PLACE, self.scattered_data, # Note: on rank 0 data === scattered_data. op=MPI.SUM, root=0 ) def _dist_gather(self, data): pass