diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 6c2b4a22..bf7ae798 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -14,8 +14,9 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest] - python-version: ['3.8', '3.9', '3.10', '3.11', '3.12'] + python-version: ['3.9', '3.10', '3.11', '3.12'] mpi: ['mpich', 'openmpi', 'intelmpi'] + rank: ['2', '3', '4'] exclude: - os: macos-latest mpi: 'intelmpi' @@ -52,4 +53,4 @@ jobs: - name: Install pylops-mpi run: pip install . - name: Testing using pytest-mpi - run: mpiexec -n 2 pytest --with-mpi + run: mpiexec -n ${{ matrix.rank }} pytest --with-mpi diff --git a/examples/plot_distributed_array.py b/examples/plot_distributed_array.py index 6e1168a3..b3b5ea28 100644 --- a/examples/plot_distributed_array.py +++ b/examples/plot_distributed_array.py @@ -16,13 +16,18 @@ plt.close("all") np.random.seed(42) +# MPI parameters +size = MPI.COMM_WORLD.Get_size() # number of ranks +rank = MPI.COMM_WORLD.Get_rank() # rank of current process + + # Defining the global shape of the distributed array global_shape = (10, 5) ############################################################################### -# Let's start by defining the -# class with the input parameters ``global_shape``, -# ``partition``, and ``axis``. Here's an example implementation of the class with ``axis=0``. +# Let's start by defining the class with the input parameters ``global_shape``, +# ``partition``, and ``axis``. Here's an example implementation of the class +# with ``axis=0``. arr = pylops_mpi.DistributedArray(global_shape=global_shape, partition=pylops_mpi.Partition.SCATTER, axis=0) @@ -72,6 +77,9 @@ pylops_mpi.plot_local_arrays(arr2, "Distributed Array - 2", vmin=0, vmax=1) ############################################################################### +# Let's now move on to consider various operations that one can perform on +# :py:class:`pylops_mpi.DistributedArray` objects. +# # **Scaling** - Each process operates on its local portion of # the array and scales the corresponding elements by a given scalar. scale_arr = .5 * arr1 @@ -101,3 +109,95 @@ # of the array and multiplies the corresponding elements together. mult_arr = arr1 * arr2 pylops_mpi.plot_local_arrays(mult_arr, "Multiplication", vmin=0, vmax=1) + +############################################################################### +# Finally, let's look at the case where parallelism could be applied over +# multiple axes - and more specifically one belonging to the model/data and one +# to the operator. This kind of "2D"-parallelism requires repeating parts of +# the model/data over groups of ranks. However, when global operations such as +# ``dot`` or ``norm`` are applied to a ``pylops_mpi.DistributedArray`` of +# this kind, we need to ensure that the repeated portions do not all contribute +# to the computation. This can be achieved via the ``mask`` input parameter: +# a list of size equal to the number of ranks, whose elements contain the index +# of the subgroup/subcommunicator each rank belongs to (with partial arrays +# in different groups being identical to each other).
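+# For instance, if this example is run over 4 ranks with ``nsub=2`` subgroups
+# (a launch configuration assumed here purely for illustration), the mask
+# created below is ``[0, 0, 1, 1]``: ranks 0 and 1 form one subcommunicator and
+# ranks 2 and 3 the other, obtained internally via
+# ``base_comm.Split(color=mask[rank], key=rank)``.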
+ +# Defining the local and global shape of the distributed array +local_shape = 5 +global_shape = local_shape * size + +# Create mask +nsub = 2 +subsize = max(1, size // nsub) +mask = np.repeat(np.arange(size // subsize), subsize) +if rank == 0: + print("1D masked arrays") + print(f"Mask: {mask}") + +# Create and fill the distributed array +x = pylops_mpi.DistributedArray(global_shape=global_shape, + partition=Partition.SCATTER, + mask=mask) +x[:] = (MPI.COMM_WORLD.Get_rank() % subsize + 1.) * np.ones(local_shape) +xloc = x.asarray() + +# Dot product +dot = x.dot(x) +dotloc = np.dot(xloc[local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)], + xloc[local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)]) +print(f"Dot check (Rank {rank}): {np.allclose(dot, dotloc)}") + +# Norm +norm = x.norm(ord=2) +normloc = np.linalg.norm(xloc[local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)], + ord=2) +print(f"Norm check (Rank {rank}): {np.allclose(norm, normloc)}") + +############################################################################### +# And with 2d-arrays distributed over axis=1 +extra_dim_shape = 2 +if rank == 0: + print("2D masked arrays (over axis=1)") + +# Create and fill the distributed array +x = pylops_mpi.DistributedArray(global_shape=(extra_dim_shape, global_shape), + partition=Partition.SCATTER, + axis=1, mask=mask) +x[:] = (MPI.COMM_WORLD.Get_rank() % subsize + 1.) * np.ones((extra_dim_shape, local_shape)) +xloc = x.asarray() + +# Dot product +dot = x.dot(x) +dotloc = np.dot(xloc[:, local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)].ravel(), + xloc[:, local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)].ravel()) +print(f"Dot check (Rank {rank}): {np.allclose(dot, dotloc)}") + +# Norm +norm = x.norm(ord=2, axis=1) +normloc = np.linalg.norm(xloc[:, local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)], + ord=2, axis=1) +print(f"Norm check (Rank {rank}): {np.allclose(norm, normloc)}") + +############################################################################### +# And finally with 2d-arrays distributed over axis=0 +if rank == 0: + print("2D masked arrays (over axis=0)") + +# Create and fill the distributed array +x = pylops_mpi.DistributedArray(global_shape=(global_shape, extra_dim_shape), + partition=Partition.SCATTER, + axis=0, mask=mask) +x[:] = (MPI.COMM_WORLD.Get_rank() % subsize + 1.) * np.ones((local_shape, extra_dim_shape)) +xloc = x.asarray() + +# Dot product +dot = x.dot(x) +dotloc = np.dot(xloc[local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)].ravel(), + xloc[local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)].ravel()) +print(f"Dot check (Rank {rank}): {np.allclose(dot, dotloc)}") + +# Norm +norm = x.norm(ord=2, axis=0) +normloc = np.linalg.norm(xloc[local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)], + ord=2, axis=0) +print(f"Norm check (Rank {rank}): {np.allclose(norm, normloc)}") diff --git a/pylops_mpi/DistributedArray.py b/pylops_mpi/DistributedArray.py index 0b2e2c64..b858077f 100644 --- a/pylops_mpi/DistributedArray.py +++ b/pylops_mpi/DistributedArray.py @@ -14,10 +14,14 @@ class Partition(Enum): Distributing data among different processes. - - ``BROADCAST``: Distributes data to all processes. 
+ - ``BROADCAST``: Distributes data to all processes + (ensuring that data is kept consistent across processes) + - ``UNSAFE_BROADCAST``: Distributes data to all processes + (without ensuring that data is kept consistent across processes) - ``SCATTER``: Distributes unique portions to each process. """ BROADCAST = "Broadcast" + UNSAFE_BROADCAST = "UnsafeBroadcast" SCATTER = "Scatter" @@ -41,7 +45,7 @@ def local_split(global_shape: Tuple, base_comm: MPI.Comm, local_shape : :obj:`tuple` Shape of the local array. """ - if partition == Partition.BROADCAST: + if partition in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST]: local_shape = global_shape # Split the array else: @@ -62,10 +66,13 @@ class DistributedArray: .. warning:: When setting the partition of the DistributedArray to :obj:`pylops_mpi.Partition.BROADCAST`, it is crucial to be aware that any attempts to make arrays different from rank to rank will be - overwritten by the actions of rank 0. This means that if you modify - the DistributedArray on a specific rank, and are using broadcast to - synchronize the arrays across all ranks, the modifications made by other - ranks will be discarded and overwritten with the value at rank 0. + overwritten by the actions of rank 0. This is accomplished internally + by broadcasting the content of rank 0 every time a modification of + the array is attempted. Such a behaviour does, however, incur a + communication cost that is unnecessary when the user knows that the + content of the array is not modified differently across ranks. To + avoid this extra communication, one can use :obj:`pylops_mpi.Partition.UNSAFE_BROADCAST` + instead. Parameters ---------- @@ -75,11 +82,14 @@ class DistributedArray: MPI Communicator over which array is distributed. Defaults to ``mpi4py.MPI.COMM_WORLD``. partition : :obj:`Partition`, optional - Broadcast or Scatter the array. Defaults to ``Partition.SCATTER``. + Broadcast, UnsafeBroadcast, or Scatter the array. Defaults to ``Partition.SCATTER``. axis : :obj:`int`, optional Axis along which distribution occurs. Defaults to ``0``. local_shapes : :obj:`list`, optional List of tuples or integers representing local shapes at each rank. + mask : :obj:`list`, optional + Mask defining subsets of ranks to consider when performing 'global' + operations on the distributed array such as dot product or norm.
engine : :obj:`str`, optional Engine used to store array (``numpy`` or ``cupy``) dtype : :obj:`str`, optional @@ -90,6 +100,7 @@ def __init__(self, global_shape: Union[Tuple, Integral], base_comm: Optional[MPI.Comm] = MPI.COMM_WORLD, partition: Partition = Partition.SCATTER, axis: int = 0, local_shapes: Optional[List[Union[Tuple, Integral]]] = None, + mask: Optional[List[Integral]] = None, engine: Optional[str] = "numpy", dtype: Optional[DTypeLike] = np.float64): if isinstance(global_shape, Integral): @@ -98,13 +109,15 @@ def __init__(self, global_shape: Union[Tuple, Integral], raise IndexError(f"Axis {axis} out of range for DistributedArray " f"of shape {global_shape}") if partition not in Partition: - raise ValueError(f"Should be either {Partition.BROADCAST} " - f"or {Partition.SCATTER}") + raise ValueError(f"Should be either {Partition.BROADCAST}, " + f"{Partition.UNSAFE_BROADCAST} or {Partition.SCATTER}") self.dtype = dtype self._global_shape = _value_or_sized_to_tuple(global_shape) self._base_comm = base_comm self._partition = partition self._axis = axis + self._mask = mask + self._sub_comm = base_comm if mask is None else base_comm.Split(color=mask[base_comm.rank], key=base_comm.rank) local_shapes = local_shapes if local_shapes is None else [_value_or_sized_to_tuple(local_shape) for local_shape in local_shapes] self._check_local_shapes(local_shapes) @@ -122,6 +135,9 @@ def __setitem__(self, index, value): `Partition.SCATTER` - Local Arrays are assigned their unique values. + + `Partition.UNSAFE_BROADCAST` - Local Arrays are assigned their + unique values. + `Partition.BROADCAST` - The value at rank-0 is broadcasted and is assigned to all the ranks. @@ -168,6 +184,16 @@ def local_shape(self): """ return self._local_shape + @property + def mask(self): + """Mask of the Distributed array + + Returns + ------- + mask : :obj:`list` + """ + return self._mask + @property def engine(self): """Engine of the Distributed array @@ -249,6 +275,16 @@ def local_shapes(self): """ return self.base_comm.allgather(self.local_shape) + @property + def sub_comm(self): + """MPI Sub-Communicator + + Returns + ------- + sub_comm : :obj:`MPI.Comm` + """ + return self._sub_comm + def asarray(self): """Global view of the array @@ -260,7 +296,7 @@ def asarray(self): Global Array gathered at all ranks """ # Since the global array was replicated at all ranks - if self.partition == Partition.BROADCAST: + if self.partition in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST]: # Get only self.local_array. return self.local_array # Gather all the local arrays and apply concatenation. @@ -272,7 +308,8 @@ def to_dist(cls, x: NDArray, base_comm: MPI.Comm = MPI.COMM_WORLD, partition: Partition = Partition.SCATTER, axis: int = 0, - local_shapes: Optional[List[Tuple]] = None): + local_shapes: Optional[List[Tuple]] = None, + mask: Optional[List[Integral]] = None): """Convert A Global Array to a Distributed Array Parameters ---------- @@ -287,9 +324,12 @@ def to_dist(cls, x: NDArray, Axis of Distribution local_shapes : :obj:`list`, optional Local Shapes at each rank. + mask : :obj:`list`, optional + Mask defining subsets of ranks to consider when performing 'global' + operations on the distributed array such as dot product or norm.
Returns - ---------- + ------- dist_array : :obj:`DistributedArray` Distributed Array of the Global Array """ @@ -298,9 +338,10 @@ def to_dist(cls, x: NDArray, partition=partition, axis=axis, local_shapes=local_shapes, + mask=mask, engine=get_module_name(get_array_module(x)), dtype=x.dtype) - if partition == Partition.BROADCAST: + if partition in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST]: dist_array[:] = x else: slices = [slice(None)] * x.ndim @@ -319,7 +360,7 @@ def _check_local_shapes(self, local_shapes): raise ValueError(f"Length of local shapes is not equal to number of processes; " f"{len(local_shapes)} != {self.size}") # Check if local shape == global shape - if self.partition is Partition.BROADCAST and local_shapes[self.rank] != self.global_shape: + if self.partition in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST] and local_shapes[self.rank] != self.global_shape: raise ValueError(f"Local shape is not equal to global shape at rank = {self.rank};" f"{local_shapes[self.rank]} != {self.global_shape}") elif self.partition is Partition.SCATTER: @@ -339,6 +380,12 @@ def _check_partition_shape(self, dist_array): raise ValueError(f"Local Array Shape Mismatch - " f"{self.local_shape} != {dist_array.local_shape}") + def _check_mask(self, dist_array): + """Check mask of the Array + """ + if not np.array_equal(self.mask, dist_array.mask): + raise ValueError("Mask of both the arrays must be same") + def _allreduce(self, send_buf, recv_buf=None, op: MPI.Op = MPI.SUM): """MPI Allreduce operation """ @@ -348,12 +395,22 @@ def _allreduce(self, send_buf, recv_buf=None, op: MPI.Op = MPI.SUM): self.base_comm.Allreduce(send_buf, recv_buf, op) return recv_buf + def _allreduce_subcomm(self, send_buf, recv_buf=None, op: MPI.Op = MPI.SUM): + """MPI Allreduce operation with subcommunicator + """ + if recv_buf is None: + return self.sub_comm.allreduce(send_buf, op) + # For MIN and MAX which require recv_buf + self.sub_comm.Allreduce(send_buf, recv_buf, op) + return recv_buf + def __neg__(self): arr = DistributedArray(global_shape=self.global_shape, base_comm=self.base_comm, partition=self.partition, axis=self.axis, local_shapes=self.local_shapes, + mask=self.mask, engine=self.engine, dtype=self.dtype) arr[:] = -self.local_array @@ -381,11 +438,13 @@ def add(self, dist_array): """Distributed Addition of arrays """ self._check_partition_shape(dist_array) + self._check_mask(dist_array) SumArray = DistributedArray(global_shape=self.global_shape, base_comm=self.base_comm, dtype=self.dtype, partition=self.partition, local_shapes=self.local_shapes, + mask=self.mask, engine=self.engine, axis=self.axis) SumArray[:] = self.local_array + dist_array.local_array @@ -395,6 +454,7 @@ def iadd(self, dist_array): """Distributed In-place Addition of arrays """ self._check_partition_shape(dist_array) + self._check_mask(dist_array) self[:] = self.local_array + dist_array.local_array return self @@ -403,12 +463,14 @@ def multiply(self, dist_array): """ if isinstance(dist_array, DistributedArray): self._check_partition_shape(dist_array) + self._check_mask(dist_array) ProductArray = DistributedArray(global_shape=self.global_shape, base_comm=self.base_comm, dtype=self.dtype, partition=self.partition, local_shapes=self.local_shapes, + mask=self.mask, engine=self.engine, axis=self.axis) if isinstance(dist_array, DistributedArray): @@ -423,13 +485,15 @@ def dot(self, dist_array): """Distributed Dot Product """ self._check_partition_shape(dist_array) + self._check_mask(dist_array) + # Convert to Partition.SCATTER if 
Partition.BROADCAST x = DistributedArray.to_dist(x=self.local_array) \ - if self.partition is Partition.BROADCAST else self + if self.partition in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST] else self y = DistributedArray.to_dist(x=dist_array.local_array) \ - if self.partition is Partition.BROADCAST else dist_array + if self.partition in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST] else dist_array # Flatten the local arrays and calculate dot product - return self._allreduce(np.dot(x.local_array.flatten(), y.local_array.flatten())) + return self._allreduce_subcomm(np.dot(x.local_array.flatten(), y.local_array.flatten())) def _compute_vector_norm(self, local_array: NDArray, axis: int, ord: Optional[int] = None): @@ -456,23 +520,36 @@ def _compute_vector_norm(self, local_array: NDArray, raise ValueError(f"norm-{ord} not possible for vectors") elif ord == 0: # Count non-zero then sum reduction - recv_buf = self._allreduce(np.count_nonzero(local_array, axis=axis).astype(np.float64)) + recv_buf = self._allreduce_subcomm(np.count_nonzero(local_array, axis=axis).astype(np.float64)) elif ord == np.inf: # Calculate max followed by max reduction - recv_buf = self._allreduce(np.max(np.abs(local_array), axis=axis).astype(np.float64), - recv_buf, op=MPI.MAX) + recv_buf = self._allreduce_subcomm(np.max(np.abs(local_array), axis=axis).astype(np.float64), + recv_buf, op=MPI.MAX) recv_buf = np.squeeze(recv_buf, axis=axis) elif ord == -np.inf: # Calculate min followed by min reduction - recv_buf = self._allreduce(np.min(np.abs(local_array), axis=axis).astype(np.float64), - recv_buf, op=MPI.MIN) + recv_buf = self._allreduce_subcomm(np.min(np.abs(local_array), axis=axis).astype(np.float64), + recv_buf, op=MPI.MIN) recv_buf = np.squeeze(recv_buf, axis=axis) else: - recv_buf = self._allreduce(np.sum(np.abs(np.float_power(local_array, ord)), axis=axis)) + recv_buf = self._allreduce_subcomm(np.sum(np.abs(np.float_power(local_array, ord)), axis=axis)) recv_buf = np.power(recv_buf, 1. / ord) return recv_buf + def zeros_like(self): + """Creates a copy of the DistributedArray filled with zeros + """ + arr = DistributedArray(global_shape=self.global_shape, + base_comm=self.base_comm, + partition=self.partition, + axis=self.axis, + local_shapes=self.local_shapes, + engine=self.engine, + dtype=self.dtype) + arr[:] = 0. 
+ return arr + def norm(self, ord: Optional[int] = None, axis: int = -1): """Distributed numpy.linalg.norm method @@ -486,7 +563,7 @@ def norm(self, ord: Optional[int] = None, """ # Convert to Partition.SCATTER if Partition.BROADCAST x = DistributedArray.to_dist(x=self.local_array) \ - if self.partition is Partition.BROADCAST else self + if self.partition in [Partition.BROADCAST, Partition.UNSAFE_BROADCAST] else self if axis == -1: # Flatten the local arrays and calculate norm return x._compute_vector_norm(x.local_array.flatten(), axis=0, ord=ord) @@ -503,6 +580,7 @@ def conj(self): partition=self.partition, axis=self.axis, local_shapes=self.local_shapes, + mask=self.mask, engine=self.engine, dtype=self.dtype) conj[:] = self.local_array.conj() @@ -516,6 +594,7 @@ def copy(self): partition=self.partition, axis=self.axis, local_shapes=self.local_shapes, + mask=self.mask, engine=self.engine, dtype=self.dtype) arr[:] = self.local_array @@ -538,6 +617,7 @@ def ravel(self, order: Optional[str] = "C"): local_shapes = [(np.prod(local_shape, axis=-1), ) for local_shape in self.local_shapes] arr = DistributedArray(global_shape=np.prod(self.global_shape), local_shapes=local_shapes, + mask=self.mask, partition=self.partition, engine=self.engine, dtype=self.dtype) diff --git a/pylops_mpi/basicoperators/BlockDiag.py b/pylops_mpi/basicoperators/BlockDiag.py index 7911692f..28511105 100644 --- a/pylops_mpi/basicoperators/BlockDiag.py +++ b/pylops_mpi/basicoperators/BlockDiag.py @@ -1,7 +1,8 @@ import numpy as np from scipy.sparse.linalg._interface import _get_dtype from mpi4py import MPI -from typing import Optional, Sequence +from typing import Optional, Sequence, List +from numbers import Integral from pylops import LinearOperator from pylops.utils import DTypeLike @@ -28,6 +29,9 @@ class MPIBlockDiag(MPILinearOperator): One or more :class:`pylops.LinearOperator` to be stacked. base_comm : :obj:`mpi4py.MPI.Comm`, optional Base MPI Communicator. Defaults to ``mpi4py.MPI.COMM_WORLD``. + mask : :obj:`list`, optional + Mask defining subsets of ranks to consider when performing 'global' operations on + the distributed array such as dot product or norm. dtype : :obj:`str`, optional Type of elements in input array. 
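A minimal usage sketch of the ``mask`` parameter documented above (illustrative only, not part of the patch; it assumes a launch with 4 ranks split into ``nsub=2`` subgroups, and that the operator and the model vector are given the same mask):

import numpy as np
from mpi4py import MPI
import pylops
import pylops_mpi

rank = MPI.COMM_WORLD.Get_rank()
size = MPI.COMM_WORLD.Get_size()
nsub = 2                                             # assumed number of subgroups
mask = list(np.repeat(np.arange(nsub), size // nsub))

# One small dense operator per rank, stacked block-diagonally across ranks
Op = pylops.MatrixMult(np.eye(5) + np.ones((5, 5)))
BDiag = pylops_mpi.MPIBlockDiag(ops=[Op, ], mask=mask)

# Model vector distributed with the same mask: content is identical within a subgroup
x = pylops_mpi.DistributedArray(global_shape=5 * size, mask=mask)
x[:] = (mask[rank] + 1.) * np.ones(5)
y = BDiag @ x                                        # output carries the same mask
print(rank, y.norm())                                # norm reduced over one subgroup only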
@@ -95,8 +99,10 @@ class MPIBlockDiag(MPILinearOperator): def __init__(self, ops: Sequence[LinearOperator], base_comm: MPI.Comm = MPI.COMM_WORLD, + mask: Optional[List[Integral]] = None, dtype: Optional[DTypeLike] = None): self.ops = ops + self.mask = mask mops = np.zeros(len(self.ops), dtype=np.int64) nops = np.zeros(len(self.ops), dtype=np.int64) for iop, oper in enumerate(self.ops): @@ -116,7 +122,7 @@ def __init__(self, ops: Sequence[LinearOperator], def _matvec(self, x: DistributedArray) -> DistributedArray: ncp = get_module(x.engine) y = DistributedArray(global_shape=self.shape[0], local_shapes=self.local_shapes_n, - engine=x.engine, dtype=self.dtype) + mask=self.mask, engine=x.engine, dtype=self.dtype) y1 = [] for iop, oper in enumerate(self.ops): y1.append(oper.matvec(x.local_array[self.mmops[iop]: @@ -128,7 +134,7 @@ def _matvec(self, x: DistributedArray) -> DistributedArray: def _rmatvec(self, x: DistributedArray) -> DistributedArray: ncp = get_module(x.engine) y = DistributedArray(global_shape=self.shape[1], local_shapes=self.local_shapes_m, - engine=x.engine, dtype=self.dtype) + mask=self.mask, engine=x.engine, dtype=self.dtype) y1 = [] for iop, oper in enumerate(self.ops): y1.append(oper.rmatvec(x.local_array[self.nnops[iop]: diff --git a/pylops_mpi/waveeqprocessing/MDC.py b/pylops_mpi/waveeqprocessing/MDC.py index 1d6524d8..18980bfc 100644 --- a/pylops_mpi/waveeqprocessing/MDC.py +++ b/pylops_mpi/waveeqprocessing/MDC.py @@ -20,7 +20,7 @@ def _MDC(G, nt, nv, nfmax, dt=1., dr=1., twosided=True, Used to be able to provide operators from different libraries to MDC. It operates in the same way as public method - (PoststackLinearModelling) but has additional input parameters allowing + (MPIMDC) but has additional input parameters allowing passing a different operator and additional arguments to be passed to such operator. @@ -81,8 +81,10 @@ def MPIMDC(G, nt, nv, nfreq, dt=1., dr=1., twosided=True, base_comm: MPI.Comm = MPI.COMM_WORLD): r"""Multi-dimensional convolution. - Apply multi-dimensional convolution between two datasets. Model and data - should be provided after flattening 2- or 3-dimensional arrays of size + Apply multi-dimensional convolution between two datasets in a distributed + fashion, with ``G`` distributed over ranks across the frequency axis. + Model and data are broadcasted and should be provided after flattening + 2- or 3-dimensional arrays of size :math:`[n_t \times n_r (\times n_{vs})]` and :math:`[n_t \times n_s (\times n_{vs})]` (or :math:`2*n_t-1` for ``twosided=True``), respectively. @@ -91,7 +93,7 @@ def MPIMDC(G, nt, nv, nfreq, dt=1., dr=1., twosided=True, ---------- G : :obj:`numpy.ndarray` Multi-dimensional convolution kernel in frequency domain of size - :math:`[n_{fmax} \times n_s \times n_r]` + :math:`[n_{f,rank} \times n_s \times n_r]` nt : :obj:`int` Number of samples along time axis for model and data (note that this must be equal to ``2*n_t-1`` when working with ``twosided=True``. 
diff --git a/tests/test_distributedarray.py b/tests/test_distributedarray.py index 05041279..8ee47a85 100644 --- a/tests/test_distributedarray.py +++ b/tests/test_distributedarray.py @@ -45,28 +45,28 @@ par5j = {'x': np.random.normal(300, 300, (500, 501)) + 1.0j * np.random.normal(50, 50, (500, 501)), 'partition': Partition.SCATTER, 'axis': 1} -par6 = {'x': np.random.normal(100, 100, (500, 500)), +par6 = {'x': np.random.normal(100, 100, (600, 600)), 'partition': Partition.SCATTER, 'axis': 0} -par6b = {'x': np.random.normal(100, 100, (500, 500)), +par6b = {'x': np.random.normal(100, 100, (600, 600)), 'partition': Partition.BROADCAST, 'axis': 0} -par7 = {'x': np.random.normal(300, 300, (500, 500)), +par7 = {'x': np.random.normal(300, 300, (600, 600)), 'partition': Partition.SCATTER, 'axis': 0} -par7b = {'x': np.random.normal(300, 300, (500, 500)), +par7b = {'x': np.random.normal(300, 300, (600, 600)), 'partition': Partition.BROADCAST, 'axis': 0} -par8 = {'x': np.random.normal(100, 100, (1000,)), +par8 = {'x': np.random.normal(100, 100, (1200,)), 'partition': Partition.SCATTER, 'axis': 0} -par8b = {'x': np.random.normal(100, 100, (1000,)), +par8b = {'x': np.random.normal(100, 100, (1200,)), 'partition': Partition.BROADCAST, 'axis': 0} -par9 = {'x': np.random.normal(300, 300, (1000,)), +par9 = {'x': np.random.normal(300, 300, (1200,)), 'partition': Partition.SCATTER, 'axis': 0} -par9b = {'x': np.random.normal(300, 300, (1000,)), +par9b = {'x': np.random.normal(300, 300, (1200,)), 'partition': Partition.BROADCAST, 'axis': 0} @@ -192,3 +192,69 @@ def test_distributed_norm(par): assert_allclose(arr.norm(ord=np.inf, axis=par['axis']), np.linalg.norm(par['x'], ord=np.inf, axis=par['axis']), rtol=1e-14) assert_allclose(arr.norm(), np.linalg.norm(par['x'].flatten()), rtol=1e-13) + + +@pytest.mark.mpi(min_size=2) +@pytest.mark.parametrize("par1, par2", [(par6, par7), (par6b, par7b), + (par8, par9), (par8b, par9b)]) +def test_distributed_maskeddot(par1, par2): + """Test Distributed Dot product with masked array""" + # number of subcommunicators + if MPI.COMM_WORLD.Get_size() % 2 == 0: + nsub = 2 + elif MPI.COMM_WORLD.Get_size() % 3 == 0: + nsub = 3 + else: + pass + subsize = max(1, MPI.COMM_WORLD.Get_size() // nsub) + mask = np.repeat(np.arange(nsub), subsize) + print('subsize, mask', subsize, mask) + # Replicate x1 and x2 as required in masked arrays + x1, x2 = par1['x'], par2['x'] + if par1['axis'] != 0: + x1 = np.swapaxes(x1, par1['axis'], 0) + for isub in range(1, nsub): + x1[(x1.shape[0] // nsub) * isub:(x1.shape[0] // nsub) * (isub + 1)] = x1[:x1.shape[0] // nsub] + if par1['axis'] != 0: + x1 = np.swapaxes(x1, 0, par1['axis']) + if par2['axis'] != 0: + x2 = np.swapaxes(x2, par2['axis'], 0) + for isub in range(1, nsub): + x2[(x2.shape[0] // nsub) * isub:(x2.shape[0] // nsub) * (isub + 1)] = x2[:x2.shape[0] // nsub] + if par2['axis'] != 0: + x2 = np.swapaxes(x2, 0, par2['axis']) + + arr1 = DistributedArray.to_dist(x=x1, partition=par1['partition'], mask=mask, axis=par1['axis']) + arr2 = DistributedArray.to_dist(x=x2, partition=par2['partition'], mask=mask, axis=par2['axis']) + assert_allclose(arr1.dot(arr2), np.dot(x1.flatten(), x2.flatten()) / nsub, rtol=1e-14) + + +@pytest.mark.mpi(min_size=2) +@pytest.mark.parametrize("par", [(par6), (par6b), (par7), (par7b), + (par8), (par8b), (par9), (par9b)]) +def test_distributed_maskednorm(par): + """Test Distributed numpy.linalg.norm method with masked array""" + # number of subcommunicators + if MPI.COMM_WORLD.Get_size() % 2 == 0: + nsub = 2 + elif 
MPI.COMM_WORLD.Get_size() % 3 == 0: + nsub = 3 + else: + pass + subsize = max(1, MPI.COMM_WORLD.Get_size() // nsub) + mask = np.repeat(np.arange(nsub), subsize) + # Replicate x as required in masked arrays + x = par['x'] + if par['axis'] != 0: + x = np.swapaxes(x, par['axis'], 0) + for isub in range(1, nsub): + x[(x.shape[0] // nsub) * isub:(x.shape[0] // nsub) * (isub + 1)] = x[:x.shape[0] // nsub] + if par['axis'] != 0: + x = np.swapaxes(x, 0, par['axis']) + arr = DistributedArray.to_dist(x=x, mask=mask, axis=par['axis']) + assert_allclose(arr.norm(ord=1, axis=par['axis']), + np.linalg.norm(par['x'], ord=1, axis=par['axis']) / nsub, rtol=1e-14) + assert_allclose(arr.norm(ord=np.inf, axis=par['axis']), + np.linalg.norm(par['x'], ord=np.inf, axis=par['axis']), rtol=1e-14) + assert_allclose(arr.norm(ord=2, axis=par['axis']), + np.linalg.norm(par['x'], ord=2, axis=par['axis']) / np.sqrt(nsub), rtol=1e-13) diff --git a/tests/test_fredholm.py b/tests/test_fredholm.py index 1baeb57c..b3e34f73 100644 --- a/tests/test_fredholm.py +++ b/tests/test_fredholm.py @@ -20,7 +20,7 @@ size = MPI.COMM_WORLD.Get_size() par1 = { - "nsl": 6, + "nsl": 12, "ny": 6, "nx": 4, "nz": 5, @@ -30,7 +30,7 @@ "dtype": "float32", } # real, saved Gt par2 = { - "nsl": 6, + "nsl": 12, "ny": 6, "nx": 4, "nz": 5, @@ -40,7 +40,7 @@ "dtype": "float32", } # real, unsaved Gt par3 = { - "nsl": 6, + "nsl": 12, "ny": 6, "nx": 4, "nz": 5, @@ -50,7 +50,7 @@ "dtype": "complex64", } # complex, saved Gt par4 = { - "nsl": 6, + "nsl": 12, "ny": 6, "nx": 4, "nz": 5, @@ -60,7 +60,7 @@ "dtype": "complex64", } # complex, unsaved Gt par5 = { - "nsl": 6, + "nsl": 12, "ny": 6, "nx": 4, "nz": 1, @@ -70,7 +70,7 @@ "dtype": "float32", } # real, saved Gt, nz=1 par6 = { - "nsl": 6, + "nsl": 12, "ny": 6, "nx": 4, "nz": 1, diff --git a/tests/test_linearop.py b/tests/test_linearop.py index 411679f5..4d16f3d9 100644 --- a/tests/test_linearop.py +++ b/tests/test_linearop.py @@ -87,8 +87,8 @@ def test_transpose(par): range(size)] BDiag = pylops.BlockDiag(ops=ops) Top = BDiag.T - assert_allclose(Top_x_np, Top @ x_global, rtol=1e-14) - assert_allclose(Top_y_np, Top.H @ y_global, rtol=1e-14) + assert_allclose(Top_x_np, Top @ x_global, rtol=1e-9) + assert_allclose(Top_y_np, Top.H @ y_global, rtol=1e-9) @pytest.mark.mpi(min_size=2) @@ -121,8 +121,8 @@ def test_scaled(par): range(size)] BDiag = pylops.BlockDiag(ops=ops) Sop = BDiag * -4 - assert_allclose(Sop_x_np, Sop @ x_global, rtol=1e-14) - assert_allclose(Sop_y_np, Sop.H @ y_global, rtol=1e-14) + assert_allclose(Sop_x_np, Sop @ x_global, rtol=1e-9) + assert_allclose(Sop_y_np, Sop.H @ y_global, rtol=1e-9) @pytest.mark.mpi(min_size=2) @@ -155,8 +155,8 @@ def test_power(par): range(size)] BDiag = pylops.BlockDiag(ops=ops) Pop = BDiag ** 3 - assert_allclose(Pop_x_np, Pop @ x_global, rtol=1e-14) - assert_allclose(Pop_y_np, Pop.H @ y_global, rtol=1e-14) + assert_allclose(Pop_x_np, Pop @ x_global, rtol=1e-9) + assert_allclose(Pop_y_np, Pop.H @ y_global, rtol=1e-9) @pytest.mark.mpi(min_size=2) @@ -195,8 +195,8 @@ def test_sum(par): range(size)] BDiag2 = pylops.BlockDiag(ops=ops2) Sop = BDiag + BDiag2 - assert_allclose(Sop_x_np, Sop @ x_global, rtol=1e-14) - assert_allclose(Sop_y_np, Sop.H @ y_global, rtol=1e-14) + assert_allclose(Sop_x_np, Sop @ x_global, rtol=1e-9) + assert_allclose(Sop_y_np, Sop.H @ y_global, rtol=1e-9) @pytest.mark.mpi(min_size=2) @@ -235,8 +235,8 @@ def test_product(par): range(size)] BDiag2 = pylops.BlockDiag(ops=ops2) Pop = BDiag * BDiag2 - assert_allclose(Pop_x_np, Pop @ x_global, rtol=1e-14) 
- assert_allclose(Pop_y_np, Pop.H @ y_global, rtol=1e-14) + assert_allclose(Pop_x_np, Pop @ x_global, rtol=1e-9) + assert_allclose(Pop_y_np, Pop.H @ y_global, rtol=1e-9) @pytest.mark.mpi(min_size=2) @@ -269,8 +269,8 @@ def test_conj(par): range(size)] BDiag = pylops.BlockDiag(ops=ops) Cop = BDiag.conj() - assert_allclose(Cop_x_np, Cop @ x_global, rtol=1e-14) - assert_allclose(Cop_y_np, Cop.H @ y_global, rtol=1e-14) + assert_allclose(Cop_x_np, Cop @ x_global, rtol=1e-9) + assert_allclose(Cop_y_np, Cop.H @ y_global, rtol=1e-9) @pytest.mark.mpi(min_size=2) @@ -299,8 +299,8 @@ def test_mpilinop(par): y_adj = y_adj_dist.asarray() if rank == 0: - assert_allclose(y, Fop @ x_global, rtol=1e-14) - assert_allclose(y_adj, Fop.H @ x_adj_global, rtol=1e-14) + assert_allclose(y, Fop @ x_global, rtol=1e-9) + assert_allclose(y_adj, Fop.H @ x_adj_global, rtol=1e-9) @pytest.mark.mpi(min_size=2) @@ -329,7 +329,7 @@ def test_fwd_mpilinop(par): VStack = pylops.VStack(ops=[(i + 1) * Sop for i in range(size)]) FullOp = VStack @ Fop y_np = FullOp @ x_global - assert_allclose(y, y_np.flatten(), rtol=1e-14) + assert_allclose(y, y_np.flatten(), rtol=1e-9) @pytest.mark.mpi(min_size=2) @@ -358,4 +358,4 @@ def test_adj_mpilinop(par): VStack = pylops.VStack(ops=[(i + 1) * Sop for i in range(size)]) FullOp = VStack @ Fop y_np = FullOp.H @ x_global - assert_allclose(y, y_np.flatten(), rtol=1e-14) + assert_allclose(y, y_np.flatten(), rtol=1e-9)
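The scaling factors asserted in ``test_distributed_maskednorm`` follow from the data replication across subgroups: the masked norm is computed over a single copy of the data, whereas the reference NumPy norm sees all ``nsub`` copies. A small self-contained sketch of the underlying identities (illustrative only, not part of the patch):

import numpy as np

nsub = 2
b = np.random.normal(0., 1., (5, 3))      # one block of data
x = np.concatenate([b] * nsub, axis=0)    # nsub identical blocks stacked along axis 0

# Repeating a block nsub times scales the 1-norm by nsub, the 2-norm by
# sqrt(nsub), and leaves the inf-norm unchanged - hence the /nsub, /sqrt(nsub)
# and unscaled comparisons used in the masked-norm test.
assert np.allclose(np.linalg.norm(x, ord=1, axis=0) / nsub,
                   np.linalg.norm(b, ord=1, axis=0))
assert np.allclose(np.linalg.norm(x, ord=2, axis=0) / np.sqrt(nsub),
                   np.linalg.norm(b, ord=2, axis=0))
assert np.allclose(np.linalg.norm(x, ord=np.inf, axis=0),
                   np.linalg.norm(b, ord=np.inf, axis=0))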