diff --git a/examples/plot_distributed_array.py b/examples/plot_distributed_array.py
index 693153e..b3b5ea2 100644
--- a/examples/plot_distributed_array.py
+++ b/examples/plot_distributed_array.py
@@ -128,24 +128,76 @@
 
 # Create mask
 nsub = 2
-mask = np.repeat(np.arange(size // nsub), nsub)
-if rank == 0: print(f"Mask: {mask}")
+subsize = max(1, size // nsub)
+mask = np.repeat(np.arange(size // subsize), subsize)
+if rank == 0:
+    print("1D masked arrays")
+    print(f"Mask: {mask}")
 
 # Create and fill the distributed array
 x = pylops_mpi.DistributedArray(global_shape=global_shape,
                                 partition=Partition.SCATTER,
                                 mask=mask)
-x[:] = (MPI.COMM_WORLD.Get_rank() + 1) * np.ones(local_shape)
+x[:] = (MPI.COMM_WORLD.Get_rank() % subsize + 1.) * np.ones(local_shape)
 xloc = x.asarray()
 
 # Dot product
 dot = x.dot(x)
-dotloc = np.dot(xloc[local_shape * nsub * (rank // nsub):local_shape * nsub * (rank // nsub + 1)],
-                xloc[local_shape * nsub * (rank // nsub):local_shape * nsub * (rank // nsub + 1)])
+dotloc = np.dot(xloc[local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)],
+                xloc[local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)])
 print(f"Dot check (Rank {rank}): {np.allclose(dot, dotloc)}")
 
 # Norm
 norm = x.norm(ord=2)
-normloc = np.linalg.norm(xloc[local_shape * nsub * (rank // nsub):local_shape * nsub * (rank // nsub + 1)],
+normloc = np.linalg.norm(xloc[local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)],
                          ord=2)
 print(f"Norm check (Rank {rank}): {np.allclose(norm, normloc)}")
+
+###############################################################################
+# And with 2d-arrays distributed over axis=1
+extra_dim_shape = 2
+if rank == 0:
+    print("2D masked arrays (over axis=1)")
+
+# Create and fill the distributed array
+x = pylops_mpi.DistributedArray(global_shape=(extra_dim_shape, global_shape),
+                                partition=Partition.SCATTER,
+                                axis=1, mask=mask)
+x[:] = (MPI.COMM_WORLD.Get_rank() % subsize + 1.) * np.ones((extra_dim_shape, local_shape))
+xloc = x.asarray()
+
+# Dot product
+dot = x.dot(x)
+dotloc = np.dot(xloc[:, local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)].ravel(),
+                xloc[:, local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)].ravel())
+print(f"Dot check (Rank {rank}): {np.allclose(dot, dotloc)}")
+
+# Norm
+norm = x.norm(ord=2, axis=1)
+normloc = np.linalg.norm(xloc[:, local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)],
+                         ord=2, axis=1)
+print(f"Norm check (Rank {rank}): {np.allclose(norm, normloc)}")
+
+###############################################################################
+# And finally with 2d-arrays distributed over axis=0
+if rank == 0:
+    print("2D masked arrays (over axis=0)")
+
+# Create and fill the distributed array
+x = pylops_mpi.DistributedArray(global_shape=(global_shape, extra_dim_shape),
+                                partition=Partition.SCATTER,
+                                axis=0, mask=mask)
+x[:] = (MPI.COMM_WORLD.Get_rank() % subsize + 1.) * np.ones((local_shape, extra_dim_shape))
+xloc = x.asarray()
+
+# Dot product
+dot = x.dot(x)
+dotloc = np.dot(xloc[local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)].ravel(),
+                xloc[local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)].ravel())
+print(f"Dot check (Rank {rank}): {np.allclose(dot, dotloc)}")
+
+# Norm
+norm = x.norm(ord=2, axis=0)
+normloc = np.linalg.norm(xloc[local_shape * subsize * (rank // subsize):local_shape * subsize * (rank // subsize + 1)],
+                         ord=2, axis=0)
+print(f"Norm check (Rank {rank}): {np.allclose(norm, normloc)}")
diff --git a/pylops_mpi/DistributedArray.py b/pylops_mpi/DistributedArray.py
index 7cbb74b..eaba4f9 100644
--- a/pylops_mpi/DistributedArray.py
+++ b/pylops_mpi/DistributedArray.py
@@ -319,7 +319,7 @@ def to_dist(cls, x: NDArray,
             operations on the distributed array such as dot product or norm.
 
         Returns
-        ----------
+        -------
         dist_array : :obj:`DistributedArray`
             Distributed Array of the Global Array
         """
diff --git a/tests/test_distributedarray.py b/tests/test_distributedarray.py
index 0504127..ef6a526 100644
--- a/tests/test_distributedarray.py
+++ b/tests/test_distributedarray.py
@@ -45,28 +45,28 @@
 par5j = {'x': np.random.normal(300, 300, (500, 501)) +
         1.0j * np.random.normal(50, 50, (500, 501)),
         'partition': Partition.SCATTER, 'axis': 1}
-par6 = {'x': np.random.normal(100, 100, (500, 500)),
+par6 = {'x': np.random.normal(100, 100, (600, 600)),
         'partition': Partition.SCATTER, 'axis': 0}
-par6b = {'x': np.random.normal(100, 100, (500, 500)),
+par6b = {'x': np.random.normal(100, 100, (600, 600)),
         'partition': Partition.BROADCAST, 'axis': 0}
-par7 = {'x': np.random.normal(300, 300, (500, 500)),
+par7 = {'x': np.random.normal(300, 300, (600, 600)),
         'partition': Partition.SCATTER, 'axis': 0}
-par7b = {'x': np.random.normal(300, 300, (500, 500)),
+par7b = {'x': np.random.normal(300, 300, (600, 600)),
         'partition': Partition.BROADCAST, 'axis': 0}
-par8 = {'x': np.random.normal(100, 100, (1000,)),
+par8 = {'x': np.random.normal(100, 100, (1200,)),
         'partition': Partition.SCATTER, 'axis': 0}
-par8b = {'x': np.random.normal(100, 100, (1000,)),
+par8b = {'x': np.random.normal(100, 100, (1200,)),
         'partition': Partition.BROADCAST, 'axis': 0}
-par9 = {'x': np.random.normal(300, 300, (1000,)),
+par9 = {'x': np.random.normal(300, 300, (1200,)),
         'partition': Partition.SCATTER, 'axis': 0}
-par9b = {'x': np.random.normal(300, 300, (1000,)),
+par9b = {'x': np.random.normal(300, 300, (1200,)),
         'partition': Partition.BROADCAST, 'axis': 0}
@@ -192,3 +192,56 @@ def test_distributed_norm(par):
     assert_allclose(arr.norm(ord=np.inf, axis=par['axis']),
                     np.linalg.norm(par['x'], ord=np.inf, axis=par['axis']), rtol=1e-14)
     assert_allclose(arr.norm(), np.linalg.norm(par['x'].flatten()), rtol=1e-13)
+
+
+@pytest.mark.mpi(min_size=2)
+@pytest.mark.parametrize("par1, par2", [(par6, par7), (par6b, par7b),
+                                        (par8, par9), (par8b, par9b)])
+def test_distributed_maskeddot(par1, par2):
+    """Test Distributed Dot product with masked array"""
+    nsub = 3  # number of subcommunicators
+    subsize = max(1, MPI.COMM_WORLD.Get_size() // nsub)
+    mask = np.repeat(np.arange(nsub), subsize)
+    # Replicate x1 and x2 as required in masked arrays
+    x1, x2 = par1['x'], par2['x']
+    if par1['axis'] != 0:
+        x1 = np.swapaxes(x1, par1['axis'], 0)
+    for isub in range(1, nsub):
+        x1[(x1.shape[0] // nsub) * isub:(x1.shape[0] // nsub) * (isub + 1)] = x1[:x1.shape[0] // nsub]
+    if par1['axis'] != 0:
+        x1 = np.swapaxes(x1, 0, par1['axis'])
+    if par2['axis'] != 0:
+        x2 = np.swapaxes(x2, par2['axis'], 0)
+    for isub in range(1, nsub):
+        x2[(x2.shape[0] // nsub) * isub:(x2.shape[0] // nsub) * (isub + 1)] = x2[:x2.shape[0] // nsub]
+    if par2['axis'] != 0:
+        x2 = np.swapaxes(x2, 0, par2['axis'])
+
+    arr1 = DistributedArray.to_dist(x=x1, partition=par1['partition'], mask=mask, axis=par1['axis'])
+    arr2 = DistributedArray.to_dist(x=x2, partition=par2['partition'], mask=mask, axis=par2['axis'])
+    assert_allclose(arr1.dot(arr2), np.dot(x1.flatten(), x2.flatten()) / nsub, rtol=1e-14)
+
+
+@pytest.mark.mpi(min_size=2)
+@pytest.mark.parametrize("par", [(par6), (par6b), (par7), (par7b),
+                                 (par8), (par8b), (par9), (par9b)])
+def test_distributed_maskednorm(par):
+    """Test Distributed numpy.linalg.norm method with masked array"""
+    nsub = 3  # number of subcommunicators
+    subsize = max(1, MPI.COMM_WORLD.Get_size() // nsub)
+    mask = np.repeat(np.arange(nsub), subsize)
+    # Replicate x as required in masked arrays
+    x = par['x']
+    if par['axis'] != 0:
+        x = np.swapaxes(x, par['axis'], 0)
+    for isub in range(1, nsub):
+        x[(x.shape[0] // nsub) * isub:(x.shape[0] // nsub) * (isub + 1)] = x[:x.shape[0] // nsub]
+    if par['axis'] != 0:
+        x = np.swapaxes(x, 0, par['axis'])
+    arr = DistributedArray.to_dist(x=x, mask=mask, axis=par['axis'])
+    assert_allclose(arr.norm(ord=1, axis=par['axis']),
+                    np.linalg.norm(par['x'], ord=1, axis=par['axis']) / nsub, rtol=1e-14)
+    assert_allclose(arr.norm(ord=np.inf, axis=par['axis']),
+                    np.linalg.norm(par['x'], ord=np.inf, axis=par['axis']), rtol=1e-14)
+    assert_allclose(arr.norm(ord=2, axis=par['axis']),
+                    np.linalg.norm(par['x'], ord=2, axis=par['axis']) / np.sqrt(nsub), rtol=1e-13)