Skip to content

Commit

Permalink
Fix reallocation #50
Browse files (browse the repository at this point in the history)
Fix reallocation bug introduced by moving the aggregation of center sums to the Python code.
  • Loading branch information
SylvanBrocard authored Dec 6, 2024
2 parents a23683b + 3645a04 commit cfd70e2
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 35 deletions.
2 changes: 2 additions & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#define ASSUMED_NR_FEATURES 128 /**< Maximum number of features */
#define WRAM_FEATURES_SIZE \
512 /**< max size of the WRAM array holding points features in bytes */
#define DMA_ALIGN 8U /**< DMA alignment */
#define MAX_MRAM_TRANSFER_SIZE 2048 /**< Maximum size of a MRAM transfer */
/**@}*/

// Performance tracking
Expand Down
4 changes: 2 additions & 2 deletions src/dpu_kmeans/_dimm.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def transform(self, X):
Returns
-------
Xt : ndarray, dtype={np.float32, np.float64}
Xt : ndarray, dtype={np.int8, np.int16, np.int32}
Quantized data.
"""
Expand Down Expand Up @@ -108,7 +108,7 @@ def inverse_transform(self, Xt):
check_is_fitted(self)

# adding 0.5 to compensate for rounding previously
return ((Xt + 0.5) / self.scale_factor).astype(self.input_dtype)
return ((Xt + np.sign(Xt) * 0.5) / self.scale_factor).astype(self.input_dtype)


ld = LinearDiscretizer() # linear discretization transformer
Expand Down
13 changes: 10 additions & 3 deletions src/dpu_kmeans/_kmeans.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def _lloyd_iter_dpu(
sample_weight,
x_squared_norms,
n_threads,
verbose,
):
"""Single iteration of K-means lloyd algorithm with dense input on DPU.
Expand Down Expand Up @@ -96,11 +97,17 @@ def _lloyd_iter_dpu(
n_clusters = points_in_clusters.shape[0]
points_in_clusters[:] = points_in_clusters_per_dpu.sum(axis=0)[:n_clusters]

centers_sum_int = centers_sum_int_per_dpu.sum(axis=0)

reallocate_timer = 0
if any(points_in_clusters == 0):
# If any cluster has no points, we need to set the centers to the
# furthest points in the cluster from the previous iteration.
print("Warning: some clusters have no points, relocating empty clusters")
if verbose:
print(
"Warning: some clusters have no points, relocating empty clusters.",
"Relocation is not implemented on DPU and will be done on CPU.",
)
tic = time.perf_counter()

centers_old = _dimm.ld.inverse_transform(centers_old_int)
Expand All @@ -121,7 +128,7 @@ def _lloyd_iter_dpu(
sample_weight,
x_squared_norms,
centers_old,
centers_old,
centers_sum_new,
weight_in_clusters,
labels,
center_shift,
Expand All @@ -146,7 +153,6 @@ def _lloyd_iter_dpu(
toc = time.perf_counter()
reallocate_timer = toc - tic

centers_sum_int = centers_sum_int_per_dpu.sum(axis=0)
np.floor_divide(
centers_sum_int,
points_in_clusters[:, None],
Expand Down Expand Up @@ -272,6 +278,7 @@ def _kmeans_single_lloyd_dpu(
sample_weight,
x_squared_norms,
n_threads,
verbose,
)

# if verbose:
Expand Down
35 changes: 14 additions & 21 deletions src/dpu_program/kmeans_dpu_kernel.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@

#include "common.h"

/*================== DEFINES ============================*/
#define MAX_MRAM_TRANSFER_SIZE 2048 /**< Maximum size of a MRAM transfer */
#define MAX_MRAM_INT64_TRANSFER \
256 /**< Maximum size of a MRAM transfer for int64_t */

/*================== VARIABLES ==========================*/
/*------------------ LOCAL ------------------------------*/
/** @name Globals
Expand Down Expand Up @@ -309,29 +304,27 @@ void final_reduce(uint8_t tasklet_id) {
barrier_wait(&sync_barrier);

if (!compute_inertia) {
unsigned mram_transfer_size = ncluster_features * sizeof(*centers_sum);
unsigned sums_size = ncluster_features * sizeof(*centers_sum);
// rounding up to multiple of 8
mram_transfer_size = (mram_transfer_size + 7) & (unsigned)-8;
// TODO: fix this, this can be over 2048
const unsigned my_mram_offset_int64 = tasklet_id * MAX_MRAM_INT64_TRANSFER;
const unsigned my_mram_offset = my_mram_offset_int64 * sizeof(int64_t);
const unsigned my_mram_transfer_size =
my_mram_offset + MAX_MRAM_TRANSFER_SIZE <= mram_transfer_size
sums_size = (sums_size + DMA_ALIGN - 1) & -DMA_ALIGN;

const unsigned my_sum_offset = tasklet_id * MAX_MRAM_TRANSFER_SIZE;
const unsigned my_sum_transfer_size =
my_sum_offset + MAX_MRAM_TRANSFER_SIZE <= sums_size
? MAX_MRAM_TRANSFER_SIZE
: my_mram_offset <= mram_transfer_size
? mram_transfer_size - my_mram_offset
: 0;
if (my_mram_transfer_size > 0) {
mram_write(centers_sum + my_mram_offset_int64,
centers_sum_mram + my_mram_offset_int64,
my_mram_transfer_size);
: my_sum_offset <= sums_size ? sums_size - my_sum_offset
: 0;
if (my_sum_transfer_size > 0) {
mram_write((char *)centers_sum + my_sum_offset,
(__mram_ptr char *)centers_sum_mram + my_sum_offset,
my_sum_transfer_size);
}

if (mutex_trylock(write_mutex)) {
// writing the partial sums and counts to MRAM
uint16_t counts_size = nclusters * sizeof(*centers_count);
unsigned counts_size = nclusters * sizeof(*centers_count);
// rounding up to multiple of 8
counts_size = (counts_size + 7) & -8;
counts_size = (counts_size + DMA_ALIGN - 1) & -DMA_ALIGN;
mram_write(centers_count, centers_count_mram, counts_size);
}
} else {
Expand Down
62 changes: 53 additions & 9 deletions tests/test_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""

import numpy as np
from sklearn.cluster import KMeans, kmeans_plusplus
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs
from sklearn.metrics import adjusted_rand_score

Expand All @@ -17,9 +17,13 @@
def test_clustering_dpu_then_cpu():
"""Make clustering on DPUs and then on CPU, and compare the results."""
n_clusters = 15
n_features = 8
n_points = int(1e4)

# Generating data
data = make_blobs(int(1e4), 8, centers=n_clusters, random_state=42)[0].astype(
data = make_blobs(n_points, n_features, centers=n_clusters, random_state=42)[
0
].astype(
np.float32,
)

Expand All @@ -44,9 +48,13 @@ def test_clustering_dpu_then_cpu():
def test_clustering_cpu_then_dpu():
"""Make clustering on CPUs and then on DPU, and compare the results."""
n_clusters = 15
n_features = 8
n_points = int(1e4)

# Generating data
data = make_blobs(int(1e4), 8, centers=n_clusters, random_state=42)[0].astype(
data = make_blobs(n_points, n_features, centers=n_clusters, random_state=42)[
0
].astype(
np.float32,
)

Expand All @@ -71,9 +79,13 @@ def test_clustering_cpu_then_dpu():
def test_clustering_dpu_then_dpu():
"""Make clustering on DPU twice, and compare the results."""
n_clusters = 15
n_features = 8
n_points = int(1e4)

# Generating data
data = make_blobs(int(1e4), 8, centers=n_clusters, random_state=42)[0].astype(
data = make_blobs(n_points, n_features, centers=n_clusters, random_state=42)[
0
].astype(
np.float32,
)
data_copy = data.copy()
Expand Down Expand Up @@ -103,16 +115,47 @@ def test_large_dimensionality():
"""Test the clustering with a large features * clusters product."""
n_clusters = 24
n_features = 128
n_points = int(1e4)

# Generating data
data = make_blobs(n_points, n_features, centers=n_clusters, random_state=42)[
0
].astype(np.float32)

rng = np.random.default_rng(42)
init = rng.choice(data, n_clusters, replace=False)

# Clustering with DPUs
dpu_kmeans = DPUKMeans(n_clusters, init=init, n_init=1, verbose=False, n_dpu=4)
dpu_kmeans.fit(data)

# Clustering with CPU
kmeans = KMeans(n_clusters, init=init, n_init=1, algorithm="full")
kmeans.fit(data)

# Comparison
rand_score = adjusted_rand_score(dpu_kmeans.labels_, kmeans.labels_)

assert rand_score > 1 - 1e-2
assert kmeans.n_iter_ * 2 / 3 < dpu_kmeans.n_iter_ < kmeans.n_iter_ * 1.5


def test_relocation():
"""Test the clustering with a bad initialization.
The initialization is such that the clusters are not well separated.
This will cause at least one relocation.
"""
n_clusters = 15
n_features = 8
n_points = int(1e4)

# Generating data
data = make_blobs(int(1e4), n_features, centers=n_clusters, random_state=42)[
data = make_blobs(n_points, n_features, centers=n_clusters, random_state=42)[
0
].astype(np.float32)

# use KMeans++ initialization here for the sake of having a shorter test
# otherwise we get a lot of clusters relocations with a low number of points
# in high dimensionality
init = kmeans_plusplus(data, n_clusters, random_state=42)[0]
init = np.tile(data[0], (n_clusters, 1))

# Clustering with DPUs
dpu_kmeans = DPUKMeans(n_clusters, init=init, n_init=1, verbose=False, n_dpu=4)
Expand All @@ -134,3 +177,4 @@ def test_large_dimensionality():
test_clustering_cpu_then_dpu()
test_clustering_dpu_then_dpu()
test_large_dimensionality()
test_relocation()

0 comments on commit cfd70e2

Please sign in to comment.