Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solve oddness bug #52

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ sdist.exclude = ["benchmarks", ".dvc", ".dvcignore", ".github", ".devcontainer"]
sdist.include = ["src/dpu_kmeans/dpu_program", "src/dpu_kmeans/_core.*.so"]

[tool.scikit-build.cmake]
build-type = "Release"
build-type = "Debug"

[tool.scikit-build.cmake.define]
NR_TASKLETS = "16"
Expand Down
11 changes: 11 additions & 0 deletions src/dpu_program/kmeans_dpu_kernel.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <mram.h>
#include <mutex.h>
#include <perfcounter.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

Expand Down Expand Up @@ -392,6 +393,16 @@ int main() {
tasklet_counters[MAIN_TIC] = perfcounter_get();
#endif

// // rounding up to multiple of 8
// const unsigned read_size = (p_h.task_size_in_bytes + DMA_ALIGN - 1) &
// -DMA_ALIGN; if((uintptr_t)&t_features[current_itask_in_features] %
// DMA_ALIGN != 0) {
// halt(1);
// }
const int align_offset = (int)((unsigned)current_itask_in_features &
((DMA_ALIGN / sizeof(int_feature)) - 1));
__mram_ptr int_feature *mram_ptr =
&t_features[current_itask_in_features - align_offset];
mram_read(&t_features[current_itask_in_features], w_features,
p_h.task_size_in_bytes);

Expand Down
56 changes: 52 additions & 4 deletions src/host_program/lloyd_iter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
#include <pybind11/numpy.h>
#include <pybind11/pybind11.h>

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <numeric>
#include <utility>

#include "kmeans.hpp"

Expand All @@ -24,6 +26,51 @@ extern "C" {
#include "common.h"
}

namespace {
/**
* @brief Global array to temporarily store the old centers with padding.
* Used when the cluster centers buffer isn't 4 aligned.
*/
std::array<int_feature, ASSUMED_NR_CLUSTERS * ASSUMED_NR_FEATURES>
old_centers_padded;

/**
* @brief Rounds a number to the next multiple of 4.
*
* @param x Number to round.
* @return size_t Rounded number.
*/
constexpr auto round_to_4(size_t x) -> size_t { return (x + 3U) & ~3U; }

/**
* @brief Aligns the cluster centers array length to 4 bytes if needed.
* If the incoming array isn't 4 aligned, it will be copied to a global array.
*
* @param old_centers Cluster centers array.
* @return constexpr auto Pair of the source pointer and the transfer size.
*/
inline auto align_c_clusters(const py::array_t<int_feature> &old_centers) {
auto xfer_size = static_cast<size_t>(old_centers.nbytes());
const auto *src_ptr = old_centers.data();

if (reinterpret_cast<uintptr_t>(src_ptr) % 4 != 0) {
throw std::runtime_error("old_centers array is not 4 aligned");
}

/* If the length of the clusters array isn't 4 aligned, we need
* to copy it to a new array that will accept padding */
constexpr int kWramAligment = 4;
if (xfer_size % kWramAligment != 0) {
std::copy_n(old_centers.data(), old_centers.size(),
old_centers_padded.begin());
xfer_size = round_to_4(xfer_size);
src_ptr = old_centers_padded.data();
}

return std::pair{src_ptr, xfer_size};
}
} // namespace

void Container::lloyd_iter(const py::array_t<int_feature> &old_centers,
py::array_t<int64_t> &centers_psum,
py::array_t<int> &centers_pcount) {
Expand All @@ -35,15 +82,16 @@ void Container::lloyd_iter(const py::array_t<int_feature> &old_centers,
std::vector<std::array<uint64_t, HOST_COUNTERS>> counters(p_.ndpu);
#endif

DPU_CHECK(dpu_broadcast_to(
allset_.value(), "c_clusters", 0, old_centers.data(),
static_cast<size_t>(old_centers.nbytes()), DPU_XFER_DEFAULT),
const auto [src_ptr, xfer_size] = align_c_clusters(old_centers);

DPU_CHECK(dpu_broadcast_to(allset_.value(), "c_clusters", 0, src_ptr,
xfer_size, DPU_XFER_DEFAULT),
throw std::runtime_error("Failed to broadcast old centers"));

const auto tic = std::chrono::steady_clock::now();
//============RUNNING ONE LLOYD ITERATION ON THE DPU==============
DPU_CHECK(dpu_launch(allset_.value(), DPU_SYNCHRONOUS),
throw std::runtime_error("Failed to launch DPUs"));
throw std::runtime_error("Failed to run DPUs"));
//================================================================
const auto toc = std::chrono::steady_clock::now();
p_.time_seconds += std::chrono::duration<double>{toc - tic}.count();
Expand Down
45 changes: 40 additions & 5 deletions tests/test_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def test_clustering_dpu_then_dpu():
def test_large_dimensionality():
"""Test the clustering with a large features * clusters product."""
n_clusters = 24
n_features = 128
n_features = 71
n_points = int(1e4)

# Generating data
Expand Down Expand Up @@ -172,9 +172,44 @@ def test_relocation():
assert kmeans.n_iter_ * 2 / 3 < dpu_kmeans.n_iter_ < kmeans.n_iter_ * 1.5


def test_odd():
"""Test alignment issues when n_clusters * n_features is odd.

Occurs when transferring data to the DPUs.
"""
n_clusters = 15
n_features = 7
n_points = int(1e4)

# Generating data
data = make_blobs(n_points, n_features, centers=n_clusters, random_state=42)[
0
].astype(
np.float32,
)

rng = np.random.default_rng(42)
init = rng.choice(data, n_clusters, replace=False)

# Clustering with DPUs
dpu_kmeans = DPUKMeans(n_clusters, init=init, n_init=1, verbose=False, n_dpu=4)
dpu_kmeans.fit(data)

# Clustering with CPU
kmeans = KMeans(n_clusters, init=init, n_init=1, algorithm="full")
kmeans.fit(data)

# Comparison
rand_score = adjusted_rand_score(dpu_kmeans.labels_, kmeans.labels_)

assert rand_score > 1 - 1e-2
assert kmeans.n_iter_ * 2 / 3 < dpu_kmeans.n_iter_ < kmeans.n_iter_ * 1.5


if __name__ == "__main__":
test_clustering_dpu_then_cpu()
test_clustering_cpu_then_dpu()
test_clustering_dpu_then_dpu()
# test_clustering_dpu_then_cpu()
# test_clustering_cpu_then_dpu()
# test_clustering_dpu_then_dpu()
test_large_dimensionality()
test_relocation()
# test_relocation()
test_odd()
Loading