Skip to content

Commit

Permalink
always compute inertia on DPUs
Browse files Browse the repository at this point in the history
profiling shows the walltime is negligible. Avoids having to do an extra E-step at the end of the algorithm.
  • Loading branch information
SylvanBrocard committed Dec 6, 2024
1 parent 028c235 commit 10d3f5d
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 84 deletions.
5 changes: 2 additions & 3 deletions src/dpu_kmeans/_kmeans.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ def _kmeans_single_lloyd_dpu(
Number of iterations run.
"""
compute_inertia = _dimm.ctr.compute_inertia
scale_factor = _dimm.ld.scale_factor
n_clusters = centers_init.shape[0]
dtype = _dimm.ld.dtype
Expand Down Expand Up @@ -304,7 +303,7 @@ def _kmeans_single_lloyd_dpu(
)

if verbose:
inertia = compute_inertia(centers_int) / scale_factor**2
inertia = _dimm.ctr.inertia / scale_factor**2
print(f"Iteration {i}, inertia {inertia}.")

centers_int, centers_new_int = centers_new_int, centers_int
Expand All @@ -322,7 +321,7 @@ def _kmeans_single_lloyd_dpu(
centers[:] = _dimm.ld.inverse_transform(centers_int)

tic = time.perf_counter()
inertia = compute_inertia(centers_int) / scale_factor**2
inertia = _dimm.ctr.inertia / scale_factor**2
toc = time.perf_counter()
inertia_timer = toc - tic

Expand Down
75 changes: 33 additions & 42 deletions src/dpu_program/kmeans_dpu_kernel.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ uint16_t ncluster_features;
__host struct task_parameters p_h;
__host int nclusters_host;
__host int npoints;
__host int compute_inertia = 0;
// __host unsigned int membership_size_in_bytes;
/**@}*/

Expand Down Expand Up @@ -189,15 +188,11 @@ void initialize(uint8_t tasklet_id) {
// c_clusters, ncluster_features * sizeof(int32_t));

// reinitializing center counters and sums
if (!compute_inertia) {
memset(centers_count, 0, sizeof(*centers_count) * nclusters);
memset(centers_sum, 0, sizeof(*centers_sum) * ncluster_features);
}
memset(centers_count, 0, sizeof(*centers_count) * nclusters);
memset(centers_sum, 0, sizeof(*centers_sum) * ncluster_features);

// reinitializing inertia table
else {
memset(inertia_tasklets, 0, sizeof(*inertia_tasklets) * NR_TASKLETS);
}
memset(inertia_tasklets, 0, sizeof(*inertia_tasklets) * NR_TASKLETS);
}

barrier_wait(&sync_barrier);
Expand Down Expand Up @@ -303,37 +298,35 @@ void final_reduce(uint8_t tasklet_id) {

barrier_wait(&sync_barrier);

if (!compute_inertia) {
unsigned sums_size = ncluster_features * sizeof(*centers_sum);
unsigned sums_size = ncluster_features * sizeof(*centers_sum);
// rounding up to multiple of 8
sums_size = (sums_size + DMA_ALIGN - 1) & -DMA_ALIGN;

const unsigned my_sum_offset = tasklet_id * MAX_MRAM_TRANSFER_SIZE;
const unsigned my_sum_transfer_size =
my_sum_offset + MAX_MRAM_TRANSFER_SIZE <= sums_size
? MAX_MRAM_TRANSFER_SIZE
: my_sum_offset <= sums_size ? sums_size - my_sum_offset
: 0;
if (my_sum_transfer_size > 0) {
mram_write((char *)centers_sum + my_sum_offset,
(__mram_ptr char *)centers_sum_mram + my_sum_offset,
my_sum_transfer_size);
}

if (mutex_trylock(write_mutex)) {
// writing the partial sums and counts to MRAM
unsigned counts_size = nclusters * sizeof(*centers_count);
// rounding up to multiple of 8
sums_size = (sums_size + DMA_ALIGN - 1) & -DMA_ALIGN;

const unsigned my_sum_offset = tasklet_id * MAX_MRAM_TRANSFER_SIZE;
const unsigned my_sum_transfer_size =
my_sum_offset + MAX_MRAM_TRANSFER_SIZE <= sums_size
? MAX_MRAM_TRANSFER_SIZE
: my_sum_offset <= sums_size ? sums_size - my_sum_offset
: 0;
if (my_sum_transfer_size > 0) {
mram_write((char *)centers_sum + my_sum_offset,
(__mram_ptr char *)centers_sum_mram + my_sum_offset,
my_sum_transfer_size);
}
counts_size = (counts_size + DMA_ALIGN - 1) & -DMA_ALIGN;
mram_write(centers_count, centers_count_mram, counts_size);
}

if (mutex_trylock(write_mutex)) {
// writing the partial sums and counts to MRAM
unsigned counts_size = nclusters * sizeof(*centers_count);
// rounding up to multiple of 8
counts_size = (counts_size + DMA_ALIGN - 1) & -DMA_ALIGN;
mram_write(centers_count, centers_count_mram, counts_size);
}
} else {
if (tasklet_id == 0) {
// summing inertia
inertia = 0;
for (int i_tasklet = 0; i_tasklet < NR_TASKLETS; i_tasklet++) {
inertia += inertia_tasklets[i_tasklet];
}
if (mutex_trylock(write_count_mutex)) {
// summing inertia
inertia = 0;
for (int i_tasklet = 0; i_tasklet < NR_TASKLETS; i_tasklet++) {
inertia += inertia_tasklets[i_tasklet];
}
}
}
Expand Down Expand Up @@ -443,13 +436,11 @@ int main() {
#endif

#ifndef PERF_COUNTER
if (!compute_inertia) {
task_reduce(index, point_base_index, w_features);
} else {
inertia_tasklets[tasklet_id] += min_dist;
}
task_reduce(index, point_base_index, w_features);
inertia_tasklets[tasklet_id] += min_dist;
#else
task_reduce(index, point_base_index, w_features, tasklet_counters);
inertia_tasklets[tasklet_id] += min_dist;
#endif
}

Expand Down
40 changes: 6 additions & 34 deletions src/host_program/lloyd_iter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,31 +141,12 @@ void Container::lloyd_iter(const py::array_t<int_feature> &old_centers,
* has been moved to the python code */
}

auto Container::compute_inertia(const py::array_t<int_feature> &old_centers)
-> int64_t {
auto Container::get_inertia() -> int64_t {
if (!allset_) {
throw std::runtime_error("No DPUs allocated");
}
int compute_inertia = 1;
DPU_CHECK(dpu_broadcast_to(
allset_.value(), "c_clusters", 0, old_centers.data(),
static_cast<size_t>(old_centers.nbytes()), DPU_XFER_DEFAULT),
throw std::runtime_error("Failed to broadcast old centers"));

DPU_CHECK(dpu_broadcast_to(allset_.value(), "compute_inertia", 0,
&compute_inertia, sizeof(int), DPU_XFER_DEFAULT),
throw std::runtime_error("Failed to broadcast compute inertia"));

auto tic = std::chrono::steady_clock::now();
//============RUNNING ONE LLOYD ITERATION ON THE DPU==============
DPU_CHECK(dpu_launch(allset_.value(), DPU_SYNCHRONOUS),
throw std::runtime_error("Failed to launch DPUs"));
//================================================================
auto toc = std::chrono::steady_clock::now();
p_.time_seconds += std::chrono::duration<double>{toc - tic}.count();

tic = std::chrono::steady_clock::now();
/* copy back inertia (device to host) */
/* Copy back inertia (device to host) */
dpu_set_t dpu{};
uint32_t each_dpu = 0;
DPU_FOREACH(allset_.value(), dpu, each_dpu) {
Expand All @@ -176,18 +157,9 @@ auto Container::compute_inertia(const py::array_t<int_feature> &old_centers)
sizeof(inertia_per_dpu_[0]), DPU_XFER_DEFAULT),
throw std::runtime_error("Failed to push transfer"));

/* sum partial inertia */
int64_t inertia =
std::accumulate(inertia_per_dpu_.cbegin(), inertia_per_dpu_.cend(), 0LL);

compute_inertia = 0;
DPU_CHECK(dpu_broadcast_to(allset_.value(), "compute_inertia", 0,
&compute_inertia, sizeof(int), DPU_XFER_DEFAULT),
throw std::runtime_error("Failed to broadcast compute inertia"));

toc = std::chrono::steady_clock::now();

p_.pim_cpu_time = std::chrono::duration<double>{toc - tic}.count();
auto toc = std::chrono::steady_clock::now();
p_.cpu_pim_time += std::chrono::duration<double>{toc - tic}.count();

return inertia;
return std::accumulate(inertia_per_dpu_.cbegin(), inertia_per_dpu_.cend(),
0LL);
}
6 changes: 2 additions & 4 deletions src/kmeans.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,9 @@ class Container {
py::array_t<int> &centers_pcount);

/**
* @brief Runs one E step of the K-Means algorithm and gets inertia.
* @brief Get the inertia computed in the E step of the previous iteration.
*
* @param old_centers [in] Discretized coordinates of the current
* centroids.
* @return int64_t The inertia.
*/
auto compute_inertia(const py::array_t<int_feature> &old_centers) -> int64_t;
auto get_inertia() -> int64_t;
};
2 changes: 1 addition & 1 deletion src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ PYBIND11_MODULE(_core, m) {
.def_property_readonly("hash", &Container::hash)
.def_property_readonly("binary_path", &Container::binary_path)
.def_property_readonly("data_size", &Container::data_size)
.def_property_readonly("inertia", &Container::get_inertia)
.def("load_kernel", &Container::load_kernel)
.def("load_array_data", &Container::load_array_data)
.def("load_n_clusters", &Container::load_nclusters)
.def("free_dpus", &Container::free_dpus)
.def("lloyd_iter", &Container::lloyd_iter)
.def("compute_inertia", &Container::compute_inertia)
.def("reset_timer", &Container::reset_timer)
.def_property_readonly("dpu_run_time", &Container::get_dpu_run_time)
.def_property_readonly("cpu_pim_time", &Container::get_cpu_pim_time)
Expand Down

0 comments on commit 10d3f5d

Please sign in to comment.