diff --git a/CMakeLists.txt b/CMakeLists.txt index 3373ca9..7bbc573 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,10 @@ cmake_minimum_required(VERSION 3.15...3.27) set(CMAKE_C_COMPILER "gcc") set(CMAKE_CXX_COMPILER "g++") -project(${SKBUILD_PROJECT_NAME} VERSION ${SKBUILD_PROJECT_VERSION}) +project( + ${SKBUILD_PROJECT_NAME} + VERSION ${SKBUILD_PROJECT_VERSION} + LANGUAGES CXX) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # set(CMAKE_VERBOSE_MAKEFILE ON) @@ -33,7 +36,7 @@ ExternalProject_Add( -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} # temporary workaround until SDK distributes fixed toolchain file -DCMAKE_TOOLCHAIN_FILE=${CMAKE_CURRENT_SOURCE_DIR}/src/dpu_program/dpu.cmake -) + BUILD_ALWAYS TRUE) # =================== BUILDING THE HOST BINARY ====================== @@ -43,6 +46,7 @@ find_package(pybind11 CONFIG REQUIRED) include(CheckIPOSupported) include(${UPMEM_HOME}/share/upmem/cmake/include/host/DpuHost.cmake) include(cmake/CPM.cmake) +include(cmake/CompilerWarnings.cmake) cpmaddpackage( NAME @@ -56,12 +60,34 @@ cpmaddpackage( pybind11_add_module(_core MODULE src/main.cpp src/host_program/dimm_manager.cpp src/host_program/lloyd_iter.cpp) +target_sources( + _core + PRIVATE FILE_SET + common_headers + TYPE + HEADERS + BASE_DIRS + src + FILES + src/common.h) +target_sources( + _core + PRIVATE FILE_SET + host_headers + TYPE + HEADERS + BASE_DIRS + src + FILES + src/kmeans.hpp) target_link_libraries(_core PRIVATE ${DPU_HOST_LIBRARIES} fmt stdc++fs) target_include_directories(_core SYSTEM PUBLIC ${DPU_HOST_INCLUDE_DIRECTORIES}) target_link_directories(_core PUBLIC ${DPU_HOST_LINK_DIRECTORIES}) -target_compile_features(_core PUBLIC c_std_99 cxx_std_17) -target_compile_options(_core PRIVATE -Wall -Wextra $<$:-Ofast>) +target_compile_features(_core PUBLIC cxx_std_17) +target_compile_options(_core PRIVATE $<$:-Ofast> + $<$:-Og>) +dpu_kmeans_set_project_warnings(_core "" "") target_compile_definitions( _core diff --git a/cmake/CompilerWarnings.cmake b/cmake/CompilerWarnings.cmake new file mode 100644 index 0000000..30dc277 --- /dev/null +++ b/cmake/CompilerWarnings.cmake @@ -0,0 +1,68 @@ +# from here: +# +# https://github.com/lefticus/cppbestpractices/blob/master/02-Use_the_Tools_Available.md + +function(dpu_kmeans_set_project_warnings project_name CLANG_WARNINGS + GCC_WARNINGS) + if("${CLANG_WARNINGS}" STREQUAL "") + set(CLANG_WARNINGS + -Wall + -Wextra # reasonable and standard + -Wshadow # warn the user if a variable declaration shadows one from a + # parent context + -Wnon-virtual-dtor # warn the user if a class with virtual functions has + # a non-virtual destructor. 
This helps + # catch hard to track down memory errors + -Wold-style-cast # warn for c-style casts + -Wcast-align # warn for potential performance problem casts + -Wunused # warn on anything being unused + -Woverloaded-virtual # warn if you overload (not override) a virtual + # function + -Wpedantic # warn if non-standard C++ is used + -Wconversion # warn on type conversions that may lose data + -Wsign-conversion # warn on sign conversions + -Wnull-dereference # warn if a null dereference is detected + -Wdouble-promotion # warn if float is implicit promoted to double + -Wformat=2 # warn on security issues around functions that format output + # (ie printf) + -Wimplicit-fallthrough # warn on statements that fallthrough without an + # explicit annotation + ) + endif() + + if("${GCC_WARNINGS}" STREQUAL "") + set(GCC_WARNINGS + ${CLANG_WARNINGS} + -Wmisleading-indentation # warn if indentation implies blocks where + # blocks do not exist + -Wduplicated-cond # warn if if / else chain has duplicated conditions + -Wduplicated-branches # warn if if / else branches have duplicated code + -Wlogical-op # warn about logical operations being used where bitwise + # were probably wanted + -Wuseless-cast # warn if you perform a cast to the same type + -Wsuggest-override # warn if an overridden member function is not marked + # 'override' or 'final' + ) + endif() + + if(CMAKE_CXX_COMPILER_ID MATCHES ".*Clang" OR CMAKE_C_COMPILER_ID MATCHES + ".*Clang") + set(PROJECT_WARNINGS_CXX ${CLANG_WARNINGS}) + elseif(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set(PROJECT_WARNINGS_CXX ${GCC_WARNINGS}) + else() + message( + AUTHOR_WARNING + "No compiler warnings set for CXX compiler: '${CMAKE_CXX_COMPILER_ID}'") + endif() + + # use the same warning flags for C + set(PROJECT_WARNINGS_C "${PROJECT_WARNINGS_CXX}") + + target_compile_options( + ${project_name} + PRIVATE # C++ warnings + $<$:${PROJECT_WARNINGS_CXX}> + # C warnings + $<$:${PROJECT_WARNINGS_C}>) +endfunction() diff --git a/pyproject.toml b/pyproject.toml index b058d85..cf76b4f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,3 +58,9 @@ build-type = "Release" [tool.scikit-build.cmake.define] NR_TASKLETS = "16" UPMEM_HOME = {env = "UPMEM_HOME", default = "/usr"} + +[tool.ruff.lint] +ignore = ["COM812"] + +[tool.ruff.lint.per-file-ignores] +"tests/*.py" = ["S101", "INP001"] diff --git a/src/common.h b/src/common.h index 5e5b769..86addff 100644 --- a/src/common.h +++ b/src/common.h @@ -25,8 +25,8 @@ * @brief Data size constraints */ /**@{*/ -#define ASSUMED_NR_CLUSTERS 32 /**< Maximum number of clusters */ -#define ASSUMED_NR_FEATURES 34 /**< Maximum number of features */ +#define ASSUMED_NR_CLUSTERS 32 /**< Maximum number of clusters */ +#define ASSUMED_NR_FEATURES 128 /**< Maximum number of features */ #define WRAM_FEATURES_SIZE \ 512 /**< max size of the WRAM array holding points features in bytes */ /**@}*/ @@ -75,3 +75,10 @@ typedef int32_t int_feature; (MRAM_SIZE / FEATURE_TYPE * 8 / \ 2) /**< How many features we fit into one DPU's MRAM. Can be increased \ further. 
*/ + +struct __attribute__((aligned(8))) task_parameters { + uint8_t nfeatures; + uint8_t task_size_in_points; + uint16_t task_size_in_features; + uint16_t task_size_in_bytes; +}; diff --git a/src/dpu_kmeans/_core.pyi b/src/dpu_kmeans/_core.pyi index 3681e2f..b0c8e3b 100644 --- a/src/dpu_kmeans/_core.pyi +++ b/src/dpu_kmeans/_core.pyi @@ -14,6 +14,7 @@ from __future__ import annotations import numpy import os +import typing __all__ = ['Container', 'FEATURE_TYPE'] class Container: """ @@ -21,13 +22,12 @@ class Container: Container object to interface with the DPUs """ - nr_dpus: int @staticmethod def _pybind11_conduit_v1_(*args, **kwargs): ... def __init__(self) -> None: ... - def allocate(self) -> None: + def allocate(self, arg0: int) -> None: ... def compute_inertia(self, arg0: numpy.ndarray[numpy.int16]) -> int: ... @@ -35,7 +35,7 @@ class Container: ... def lloyd_iter(self, arg0: numpy.ndarray[numpy.int16], arg1: numpy.ndarray[numpy.int64], arg2: numpy.ndarray[numpy.int32]) -> None: ... - def load_array_data(self, arg0: numpy.ndarray[numpy.int16], arg1: int, arg2: int) -> None: + def load_array_data(self, arg0: numpy.ndarray[numpy.int16], arg1: str) -> None: ... def load_kernel(self, arg0: os.PathLike) -> None: ... @@ -44,12 +44,27 @@ class Container: def reset_timer(self) -> None: ... @property + def allocated(self) -> bool: + ... + @property + def binary_path(self) -> os.PathLike | None: + ... + @property def cpu_pim_time(self) -> float: ... @property + def data_size(self) -> int | None: + ... + @property def dpu_run_time(self) -> float: ... @property + def hash(self) -> bytes | None: + ... + @property + def nr_dpus(self) -> int: + ... + @property def pim_cpu_time(self) -> float: ... FEATURE_TYPE: int = 16 diff --git a/src/dpu_kmeans/_dimm.py b/src/dpu_kmeans/_dimm.py index 159da92..2c5fcfb 100644 --- a/src/dpu_kmeans/_dimm.py +++ b/src/dpu_kmeans/_dimm.py @@ -9,9 +9,6 @@ # pylint: disable=global-statement -import atexit -import sys - import numpy as np import xxhash @@ -26,18 +23,9 @@ from ._core import FEATURE_TYPE, Container -_allocated = False # whether the DPUs have been allocated -_kernel = None # name of the currently loaded binary -_data_id = None # ID of the currently loaded data -_data_checksum = None # the checksum of the currently loaded data -_data_size = None # size of the currently loaded data - _kernels_lib = {"kmeans": files("dpu_kmeans").joinpath("dpu_program/kmeans_dpu_kernel")} ctr = Container() -ctr.nr_dpus = 0 - -_requested_dpus = 0 class LinearDiscretizer(TransformerMixin, BaseEstimator): @@ -126,106 +114,26 @@ def inverse_transform(self, Xt): ld = LinearDiscretizer() # linear discretization transformer -def set_n_dpu(n_dpu: int): - """Set the number of DPUs to ask for during the allocation.""" - global _allocated - global _requested_dpus - if _allocated and _requested_dpus != n_dpu: - free_dpus() - if not _allocated: - _requested_dpus = n_dpu - ctr.nr_dpus = n_dpu - ctr.allocate() - _allocated = True - - -def get_n_dpu(): - """Return the number of allocated DPUs.""" - return ctr.nr_dpus - - def load_kernel(kernel: str, verbose: int = False): """Load a given kernel into the allocated DPUs.""" - global _kernel - global _allocated - global _data_id - global _data_checksum - global _data_size - if not _allocated: - ctr.allocate() - _allocated = True - if not _kernel == kernel: + if ctr.binary_path != kernel: if verbose: print(f"loading new kernel : {kernel}") - _kernel = kernel - ref = _kernels_lib[kernel] - with as_file(ref) as dpu_binary: + with 
as_file(_kernels_lib[kernel]) as dpu_binary: ctr.load_kernel(dpu_binary) - _data_id = None - _data_checksum = None - _data_size = None def load_data(X, verbose: int = False): """Load a dataset into the allocated DPUs.""" - global _data_checksum - global _data_size - # compute the checksum of X h = xxhash.xxh3_64() h.update(X) X_checksum = h.digest() - if _data_checksum != X_checksum: + if ctr.hash != X_checksum: if verbose: print("loading new data") - _data_checksum = X_checksum Xt = ld.fit_transform(X) - ctr.load_array_data( - Xt, - Xt.shape[0], - Xt.shape[1], - ) - _data_size = sys.getsizeof(Xt) + ctr.load_array_data(Xt, X_checksum) elif verbose: print("reusing previously loaded data") - - -def reset_timer(verbose=False): - """Reset the DPU execution timer.""" - if verbose: - print("resetting inner timer") - ctr.reset_timer() - - -def get_dpu_run_time(): - """Return the DPU execution timer.""" - return ctr.dpu_run_time - - -def get_cpu_pim_time(): - """Return the time to load the data to the DPU memory.""" - return ctr.cpu_pim_time - - -def get_pim_cpu_time(): - """Return the time to get the inertia from the DPU memory.""" - return ctr.pim_cpu_time - - -def free_dpus(verbose: int = False): - """Frees all allocated DPUs.""" - global _allocated - global _kernel - global _data_id - global _data_checksum - global _data_size - if _allocated: - if verbose: - print("freeing dpus") - ctr.free_dpus() - _allocated = False - _kernel = None - _data_id = None - _data_checksum = None - _data_size = None diff --git a/src/dpu_kmeans/_kmeans.py b/src/dpu_kmeans/_kmeans.py index 662540a..baaca2c 100644 --- a/src/dpu_kmeans/_kmeans.py +++ b/src/dpu_kmeans/_kmeans.py @@ -100,7 +100,7 @@ def _lloyd_iter_dpu( if any(points_in_clusters == 0): # If any cluster has no points, we need to set the centers to the # furthest points in the cluster from the previous iteration. 
- # print("Warning: some clusters have no points, relocating empty clusters") + print("Warning: some clusters have no points, relocating empty clusters") tic = time.perf_counter() centers_old = _dimm.ld.inverse_transform(centers_old_int) @@ -132,7 +132,12 @@ def _lloyd_iter_dpu( # weight_in_clusters = points_in_clusters.astype(float) weight_in_clusters[:] = points_in_clusters _relocate_empty_clusters_dense( - X, sample_weight, centers_old, centers_sum_new, weight_in_clusters, labels + X, + sample_weight, + centers_old, + centers_sum_new, + weight_in_clusters, + labels, ) points_in_clusters[:] = weight_in_clusters @@ -232,12 +237,12 @@ def _kmeans_single_lloyd_dpu( centers_new_int = np.empty_like(centers, dtype=dtype) centers_sum_int = np.empty_like(centers, dtype=np.int64) centers_sum_int_per_dpu = np.empty( - (_dimm.get_n_dpu(), centers.shape[0], centers.shape[1]), + (_dimm.ctr.nr_dpus, centers.shape[0], centers.shape[1]), dtype=np.int64, ) points_in_clusters = np.empty(n_clusters, dtype=np.int32) points_in_clusters_per_dpu = np.empty( - (_dimm.get_n_dpu(), _align_8_bytes(n_clusters, np.dtype(np.int32))), + (_dimm.ctr.nr_dpus, _align_8_bytes(n_clusters, np.dtype(np.int32))), dtype=np.int32, ) @@ -447,8 +452,7 @@ def fit(self, X, y=None, sample_weight=None): x_squared_norms = row_norms(X, squared=True) # allocate DPUs if not yet done - if self.n_dpu: - _dimm.set_n_dpu(self.n_dpu) + _dimm.ctr.allocate(self.n_dpu) if self.reload_data: # load kmeans kernel if not yet done @@ -456,7 +460,7 @@ def fit(self, X, y=None, sample_weight=None): # transfer the data points to the DPUs _dimm.load_data(X, verbose=self.verbose) - self.cpu_pim_time_ = _dimm.get_cpu_pim_time() + self.cpu_pim_time_ = _dimm.ctr.cpu_pim_time kmeans_single = _kmeans_single_lloyd_dpu self._check_mkl_vcomp(X, X.shape[0]) @@ -476,7 +480,7 @@ def fit(self, X, y=None, sample_weight=None): print("Initialization complete") # reset perf timer - _dimm.reset_timer(verbose=self.verbose) + _dimm.ctr.reset_timer() # run a k-means once tic = time.perf_counter() @@ -492,8 +496,8 @@ def fit(self, X, y=None, sample_weight=None): ) toc = time.perf_counter() main_loop_timer = toc - tic - dpu_run_time = _dimm.get_dpu_run_time() - pim_cpu_time = _dimm.get_pim_cpu_time() + dpu_run_time = _dimm.ctr.dpu_run_time + pim_cpu_time = _dimm.ctr.pim_cpu_time train_time += main_loop_timer # determine if these results are the best so far diff --git a/src/dpu_program/.clang-tidy b/src/dpu_program/.clang-tidy new file mode 100644 index 0000000..cefc93c --- /dev/null +++ b/src/dpu_program/.clang-tidy @@ -0,0 +1,8 @@ +Checks: > + *, + -llvmlibc*, + -fuchsia*, + -altera*, + -android*, + -cpp*, + -hicpp*, diff --git a/src/dpu_program/CMakeLists.txt b/src/dpu_program/CMakeLists.txt index f1ea4d5..cad1f23 100644 --- a/src/dpu_program/CMakeLists.txt +++ b/src/dpu_program/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.15...3.27) -project(dpu_program) +project(dpu_program LANGUAGES C) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # set(CMAKE_VERBOSE_MAKEFILE ON) @@ -14,6 +14,7 @@ if(NOT DEFINED UPMEM_HOME) endif() include(CheckIPOSupported) +include(../../cmake/CompilerWarnings.cmake) if(NOT CMAKE_BUILD_TYPE) message(FATAL_ERROR "CMAKE_BUILD_TYPE must be set") @@ -27,13 +28,24 @@ if(EXISTS "/sys/class/dpu_rank/dpu_rank0/dpu_chip_id") endif() endif() -add_compile_options(-mcpu=${CHIP_VERSION} -Wall -Wextra $<$:-Og>) -add_link_options(-mcpu=${CHIP_VERSION}) - add_executable(kmeans_dpu_kernel kmeans_dpu_kernel.c) +target_sources( + kmeans_dpu_kernel + PRIVATE 
FILE_SET + common_headers + TYPE + HEADERS + BASE_DIRS + ../ + FILES + ../common.h) +target_compile_options(kmeans_dpu_kernel PRIVATE -mcpu=${CHIP_VERSION} + $<$:-Og>) target_compile_definitions(kmeans_dpu_kernel PUBLIC NR_TASKLETS=${NR_TASKLETS}) -target_link_options(kmeans_dpu_kernel PUBLIC -DNR_TASKLETS=${NR_TASKLETS}) +target_link_options(kmeans_dpu_kernel PUBLIC -mcpu=${CHIP_VERSION} + -DNR_TASKLETS=${NR_TASKLETS}) +dpu_kmeans_set_project_warnings(kmeans_dpu_kernel "" "") check_ipo_supported( RESULT ipo_supported diff --git a/src/dpu_program/kmeans_dpu_kernel.c b/src/dpu_program/kmeans_dpu_kernel.c index 94bb92e..438fe1d 100644 --- a/src/dpu_program/kmeans_dpu_kernel.c +++ b/src/dpu_program/kmeans_dpu_kernel.c @@ -17,7 +17,12 @@ #include #include -#include "../common.h" +#include "common.h" + +/*================== DEFINES ============================*/ +#define MAX_MRAM_TRANSFER_SIZE 2048 /**< Maximum size of a MRAM transfer */ +#define MAX_MRAM_INT64_TRANSFER \ + 256 /**< Maximum size of a MRAM transfer for int64_t */ /*================== VARIABLES ==========================*/ /*------------------ LOCAL ------------------------------*/ @@ -25,15 +30,11 @@ * Global variables shared between tasklets */ /**@{*/ -unsigned int itask_in_features; -unsigned int itask_in_points; +int itask_in_features; +int itask_in_points; // unsigned int cluster_transfer_size; -uint8_t nfeatures; uint8_t nclusters; uint16_t ncluster_features; -uint8_t task_size_in_points; -uint16_t task_size_in_bytes; -uint16_t task_size_in_features; /**@}*/ /*------------------ INPUT ------------------------------*/ @@ -41,13 +42,10 @@ uint16_t task_size_in_features; * Variables for host application communication */ /**@{*/ -__host int nfeatures_host; -__host unsigned int nclusters_host; -__host unsigned int npoints; -__host unsigned int task_size_in_points_host; -__host unsigned int task_size_in_bytes_host; -__host unsigned int task_size_in_features_host; -__host unsigned int compute_inertia = 0; +__host struct task_parameters p_h; +__host int nclusters_host; +__host int npoints; +__host int compute_inertia = 0; // __host unsigned int membership_size_in_bytes; /**@}*/ @@ -112,7 +110,7 @@ __mram_noinit int64_t /**@}*/ /*================== SYNCHRONIZATION =====================*/ -BARRIER_INIT(sync_barrier, NR_TASKLETS); +BARRIER_INIT(sync_barrier, NR_TASKLETS) MUTEX_INIT(task_mutex); MUTEX_INIT(write_mutex); MUTEX_INIT(write_count_mutex); @@ -149,8 +147,8 @@ bool taskDispatch(int *current_itask_in_points, int *current_itask_in_features, *current_itask_in_features = itask_in_features; // update the index - itask_in_points += task_size_in_points; - itask_in_features += task_size_in_features; + itask_in_points += p_h.task_size_in_points; + itask_in_features += p_h.task_size_in_features; mutex_unlock(task_mutex); @@ -159,7 +157,7 @@ bool taskDispatch(int *current_itask_in_points, int *current_itask_in_features, perfcounter_get() - tasklet_counters[DISPATCH_TIC]; #endif - return (unsigned)*current_itask_in_points < npoints; + return *current_itask_in_points < npoints; } /** @@ -178,17 +176,11 @@ void initialize(uint8_t tasklet_id) { active_tasklets = 0; #endif - // downcasting some host variables - nfeatures = nfeatures_host; - nclusters = nclusters_host; - ncluster_features = nclusters * nfeatures; - task_size_in_points = task_size_in_points_host; - task_size_in_bytes = task_size_in_bytes_host; - task_size_in_bytes = (task_size_in_bytes + 7) & -8; - task_size_in_features = task_size_in_features_host; + nclusters = 
(uint8_t)nclusters_host; + ncluster_features = nclusters * p_h.nfeatures; // defining how much data we read/write from MRAM for each centroid - // cluster_transfer_size = nfeatures * sizeof(**centers_sum_tasklets); + // cluster_transfer_size = p_h.nfeatures * sizeof(**centers_sum_tasklets); // rounding cluster_transfer_size up to a multiple of 8 // cluster_transfer_size = (cluster_transfer_size + 7) & -8; @@ -208,30 +200,31 @@ void initialize(uint8_t tasklet_id) { } // reinitializing inertia table - else + else { memset(inertia_tasklets, 0, sizeof(*inertia_tasklets) * NR_TASKLETS); + } } barrier_wait(&sync_barrier); // pre-computing index lookup tables - uint16_t cluster_base_index = tasklet_id * nfeatures; + uint16_t cluster_base_index = tasklet_id * p_h.nfeatures; for (uint8_t icluster = tasklet_id; icluster < nclusters; icluster += NR_TASKLETS) { cluster_base_indices[icluster] = cluster_base_index; - cluster_base_index += NR_TASKLETS * nfeatures; + cluster_base_index += NR_TASKLETS * p_h.nfeatures; } - uint16_t point_base_index = tasklet_id * nfeatures; - for (uint8_t ipoint = tasklet_id; ipoint < task_size_in_points; + uint16_t point_base_index = tasklet_id * p_h.nfeatures; + for (uint8_t ipoint = tasklet_id; ipoint < p_h.task_size_in_points; ipoint += NR_TASKLETS) { point_base_indices[ipoint] = point_base_index; - point_base_index += NR_TASKLETS * nfeatures; + point_base_index += NR_TASKLETS * p_h.nfeatures; } // reinitializing center counters and sums per tasklet // memset(centers_count_tasklets[tasklet_id], 0, - // sizeof(**centers_count_tasklets) * nclusters); + // sizeof(**centers_count_tasklets) * p_h.nclusters); // memset(centers_sum_tasklets[tasklet_id], 0, sizeof(**centers_sum_tasklets) // * ncluster_features); } @@ -265,8 +258,8 @@ void task_reduce(uint8_t icluster, uint16_t point_base_index, tasklet_counters[ARITH_TIC] = perfcounter_get(); #endif mutex_lock(write_mutex); -#pragma unroll(ASSUMED_NR_FEATURES) - for (uint8_t idim = 0; idim < nfeatures; idim++) { +#pragma clang loop unroll(enable) + for (uint8_t idim = 0; idim < p_h.nfeatures; idim++) { // centers_sum_tasklets[tasklet_id][cluster_base_indices[icluster] + idim] // += w_features[point_base_index + idim]; centers_sum[cluster_base_index + idim] += @@ -293,9 +286,9 @@ void task_reduce(uint8_t icluster, uint16_t point_base_index, void final_reduce(uint8_t tasklet_id) { // barrier_wait(&sync_barrier); - // uint16_t cluster_base_index = tasklet_id * nfeatures; + // uint16_t cluster_base_index = tasklet_id * p_h.nfeatures; // #pragma must_iterate(1, ASSUMED_NR_CLUSTERS, 1) - // for (uint8_t icluster = tasklet_id; icluster < nclusters; icluster += + // for (uint8_t icluster = tasklet_id; icluster < p_h.nclusters; icluster += // NR_TASKLETS) // { // #pragma must_iterate(1, NR_TASKLETS, 1) @@ -306,33 +299,49 @@ void final_reduce(uint8_t tasklet_id) { // #pragma unroll(ASSUMED_NR_FEATURES) // #pragma must_iterate(1, ASSUMED_NR_FEATURES, 1) - // for (uint8_t ifeature = 0; ifeature < nfeatures; ifeature++) + // for (uint8_t ifeature = 0; ifeature < p_h.nfeatures; ifeature++) // centers_sum[cluster_base_index + ifeature] += // centers_sum_tasklets[itasklet][cluster_base_index + ifeature]; // } - // cluster_base_index += nfeatures * NR_TASKLETS; + // cluster_base_index += p_h.nfeatures * NR_TASKLETS; // } barrier_wait(&sync_barrier); - if (tasklet_id == 0) { - if (!compute_inertia) { - // TODO: this can probably go, just transfer from WRAM - // writing the partial sums and counts to MRAM - uint16_t mram_transfer_size = nclusters 
* sizeof(*centers_count); - // rounding up to multiple of 8 - mram_transfer_size = (mram_transfer_size + 7) & -8; - mram_write(centers_count, centers_count_mram, mram_transfer_size); + if (!compute_inertia) { + unsigned mram_transfer_size = ncluster_features * sizeof(*centers_sum); + // rounding up to multiple of 8 + mram_transfer_size = (mram_transfer_size + 7) & (unsigned)-8; + // TODO: fix this, this can be over 2048 + const unsigned my_mram_offset_int64 = tasklet_id * MAX_MRAM_INT64_TRANSFER; + const unsigned my_mram_offset = my_mram_offset_int64 * sizeof(int64_t); + const unsigned my_mram_transfer_size = + my_mram_offset + MAX_MRAM_TRANSFER_SIZE <= mram_transfer_size + ? MAX_MRAM_TRANSFER_SIZE + : my_mram_offset <= mram_transfer_size + ? mram_transfer_size - my_mram_offset + : 0; + if (my_mram_transfer_size > 0) { + mram_write(centers_sum + my_mram_offset_int64, + centers_sum_mram + my_mram_offset_int64, + my_mram_transfer_size); + } - mram_transfer_size = ncluster_features * sizeof(*centers_sum); + if (mutex_trylock(write_mutex)) { + // writing the partial sums and counts to MRAM + uint16_t counts_size = nclusters * sizeof(*centers_count); // rounding up to multiple of 8 - mram_transfer_size = (mram_transfer_size + 7) & -8; - mram_write(centers_sum, centers_sum_mram, mram_transfer_size); - } else + counts_size = (counts_size + 7) & -8; + mram_write(centers_count, centers_count_mram, counts_size); + } + } else { + if (tasklet_id == 0) { // summing inertia inertia = 0; - for (int i_tasklet = 0; i_tasklet < NR_TASKLETS; i_tasklet++) - inertia += inertia_tasklets[i_tasklet]; + for (int i_tasklet = 0; i_tasklet < NR_TASKLETS; i_tasklet++) { + inertia += inertia_tasklets[i_tasklet]; + } + } } } @@ -370,7 +379,7 @@ void counters_tally(uint8_t tasklet_id, perfcounter_t *tasklet_counters) { * @return 0 on success */ int main() { - uint8_t tasklet_id = me(); + uint8_t tasklet_id = (uint8_t)me(); int current_itask_in_points; int current_itask_in_features; @@ -398,15 +407,15 @@ int main() { #endif mram_read(&t_features[current_itask_in_features], w_features, - task_size_in_bytes); + p_h.task_size_in_bytes); uint8_t max_ipoint = - (task_size_in_points < npoints - current_itask_in_points) - ? task_size_in_points - : npoints - current_itask_in_points; + (p_h.task_size_in_points < npoints - current_itask_in_points) + ? 
p_h.task_size_in_points + : (uint8_t)(npoints - current_itask_in_points); for (uint8_t ipoint = 0; ipoint < max_ipoint; ipoint++) { uint64_t min_dist = UINT64_MAX; - uint8_t index = -1; + uint8_t index = UINT8_MAX; uint16_t point_base_index = point_base_indices[ipoint]; #ifdef PERF_COUNTER @@ -417,14 +426,15 @@ int main() { uint64_t dist = 0; /* Euclidean distance squared */ uint16_t cluster_base_index = cluster_base_indices[icluster]; -#pragma unroll(ASSUMED_NR_FEATURES) - for (uint8_t idim = 0; idim < nfeatures; idim++) { - volatile int_feature diff = (w_features[point_base_index + idim] - - c_clusters[cluster_base_index + idim]); +#pragma clang loop unroll(enable) + for (uint8_t idim = 0; idim < p_h.nfeatures; idim++) { + volatile int_feature diff = + (int_feature)(w_features[point_base_index + idim] - + c_clusters[cluster_base_index + idim]); #if FEATURE_TYPE == 32 dist += (int64_t)diff * diff; /* sum of squares */ #else - dist += diff * diff; /* sum of squares */ + dist += (uint32_t)(diff * diff); /* sum of squares */ #endif } /* see if distance is smaller than previous ones: @@ -440,10 +450,11 @@ int main() { #endif #ifndef PERF_COUNTER - if (!compute_inertia) + if (!compute_inertia) { task_reduce(index, point_base_index, w_features); - else + } else { inertia_tasklets[tasklet_id] += min_dist; + } #else task_reduce(index, point_base_index, w_features, tasklet_counters); #endif @@ -476,9 +487,9 @@ int main() { // printf("nreal_points: %d\n", npoints); // // printf("maxes: "); - // // for(int ifeature = 0; ifeature < nfeatures; ifeature++){ + // // for(int ifeature = 0; ifeature < p_h.nfeatures; ifeature++){ // // int64_t max_mean = 0; - // // for(int icluster = 0; icluster 0 // &¢ers_sum[cluster_base_indices[icluster]+ifeature] // /centers_count[icluster] > max_mean) @@ -495,7 +506,7 @@ int main() { // // } // // printf("\n"); - // for(int ifeature = 0; ifeature< nfeatures; ifeature++) + // for(int ifeature = 0; ifeature< p_h.nfeatures; ifeature++) // { // printf("count cluster %d : %d\n", ifeature, centers_count[ifeature]); // } diff --git a/src/host_program/dimm_manager.cpp b/src/host_program/dimm_manager.cpp index a2691bf..f646597 100644 --- a/src/host_program/dimm_manager.cpp +++ b/src/host_program/dimm_manager.cpp @@ -9,63 +9,108 @@ #include #include +#include #include #include #include +#include #include -#include "../kmeans.hpp" +#include "kmeans.hpp" extern "C" { #include #include } -/** - * @brief Allocates all DPUs - * - * @param p Algorithm parameters. 
- */ -void Container::allocate() { - if (p_.ndpu == 0U) { - DPU_ASSERT(dpu_alloc(DPU_ALLOCATE_ALL, nullptr, &p_.allset)); +void Container::allocate(uint32_t ndpu) { + if ((requested_dpus_ == ndpu || p_.ndpu == ndpu) && allset_) { + return; + } + requested_dpus_ = ndpu; + if (allset_) { + free_dpus(); + } + allset_.emplace(); + if (ndpu == 0U) { + DPU_CHECK(dpu_alloc(DPU_ALLOCATE_ALL, nullptr, &allset_.value()), + throw std::runtime_error("Failed to allocate DPUs")); } else { - DPU_ASSERT(dpu_alloc(p_.ndpu, nullptr, &p_.allset)); + DPU_CHECK(dpu_alloc(ndpu, nullptr, &allset_.value()), + throw std::runtime_error("Failed to allocate DPUs")); } - p_.allocated = true; - DPU_ASSERT(dpu_get_nr_dpus(p_.allset, &p_.ndpu)); + DPU_CHECK(dpu_get_nr_dpus(allset_.value(), &p_.ndpu), + throw std::runtime_error("Failed to get number of DPUs")); inertia_per_dpu_.resize(p_.ndpu); } +void Container::free_dpus() { + if (!allset_) { + return; + } + DPU_CHECK(dpu_free(allset_.value()), + throw std::runtime_error("Failed to free DPUs")); + allset_.reset(); + hash_.reset(); + binary_path_.reset(); + data_size_.reset(); + p_.ndpu = 0; +} + +void Container::load_kernel(const fs::path &binary_path) { + if (binary_path_ == binary_path) { + return; + } + if (!allset_) { + throw std::runtime_error("No DPUs allocated"); + } + DPU_CHECK(dpu_load(allset_.value(), binary_path.c_str(), nullptr), + throw std::runtime_error("Failed to load kernel")); + binary_path_ = binary_path; + hash_.reset(); + data_size_.reset(); +} + /** - * @brief Frees the DPUs. + * @brief Utility function to check the cast of a value. * - * @param p Algorithm parameters. + * @tparam T type to cast to + * @tparam U type of the value to cast + * @param name name of the variable to be cast + * @param value value to cast + * @return T the casted value */ -void Container::free_dpus() { - if (p_.allocated) { - DPU_ASSERT(dpu_free(p_.allset)); - p_.allocated = false; +template +static constexpr auto checked_cast(std::string_view name, U value) -> T { + if (value > std::numeric_limits::max()) { + throw std::overflow_error(fmt::format("{} is too large: {} (max {})", name, + value, + std::numeric_limits::max())); } + return static_cast(value); } /** - * @brief Loads a binary in the DPUs. + * @brief utility macro to create a name-value pair for checked_cast * - * @param p Algorithm parameters. 
- * @param DPU_BINARY path to the binary */ -void Container::load_kernel(const std::filesystem::path &binary_path) { - DPU_ASSERT(dpu_load(p_.allset, binary_path.c_str(), nullptr)); -} +#define VN(var) #var, var void Container::broadcast_number_of_clusters() const { - unsigned int nclusters_short = p_.nclusters; - DPU_ASSERT(dpu_broadcast_to(p_.allset, "nclusters_host", 0, &nclusters_short, - sizeof(nclusters_short), DPU_XFER_DEFAULT)); + if (!allset_) { + throw std::runtime_error("No DPUs allocated"); + } + checked_cast(VN(p_.nclusters)); + DPU_CHECK( + dpu_broadcast_to(allset_.value(), "nclusters_host", 0, &p_.nclusters, + sizeof(p_.nclusters), DPU_XFER_DEFAULT), + throw std::runtime_error("Failed to broadcast number of clusters")); } void Container::populate_dpus(const py::array_t &py_features) { + if (!allset_) { + throw std::runtime_error("No DPUs allocated"); + } auto features = py_features.unchecked<2>(); nreal_points_.resize(p_.ndpu); @@ -77,30 +122,36 @@ void Container::populate_dpus(const py::array_t &py_features) { dpu_set_t dpu{}; uint32_t each_dpu = 0; int64_t next = 0; - DPU_FOREACH(p_.allset, dpu, each_dpu) { + DPU_FOREACH(allset_.value(), dpu, each_dpu) { int64_t current = next; /* The C API takes a non-const pointer but does not modify the data */ - DPU_ASSERT(dpu_prepare_xfer( - dpu, const_cast(features.data(next, 0)))); + DPU_CHECK(dpu_prepare_xfer( + dpu, const_cast(features.data(next, 0))), + throw std::runtime_error("Failed to prepare transfer")); padding_points -= p_.npointperdpu; next = std::max(0L, -padding_points); int64_t nreal_points_dpu = next - current; - if (nreal_points_dpu > std::numeric_limits::max()) { - throw std::length_error( - fmt::format("Too many points for one DPU : {}", nreal_points_dpu)); - } - nreal_points_[each_dpu] = static_cast(nreal_points_dpu); + nreal_points_[each_dpu] = checked_cast(VN(nreal_points_dpu)); + } + auto features_count_per_dpu = p_.npointperdpu * p_.nfeatures; + if (features_count_per_dpu > MAX_FEATURE_DPU) { + throw std::length_error(fmt::format("Too many features for one DPU : {}", + features_count_per_dpu)); } - DPU_ASSERT(dpu_push_xfer(p_.allset, DPU_XFER_TO_DPU, "t_features", 0, - p_.npointperdpu * p_.nfeatures * sizeof(int_feature), - DPU_XFER_DEFAULT)); + DPU_CHECK(dpu_push_xfer(allset_.value(), DPU_XFER_TO_DPU, "t_features", 0, + static_cast(features_count_per_dpu) * + sizeof(int_feature), + DPU_XFER_DEFAULT), + throw std::runtime_error("Failed to push transfer")); - DPU_FOREACH(p_.allset, dpu, each_dpu) { - DPU_ASSERT(dpu_prepare_xfer(dpu, &nreal_points_[each_dpu])); + DPU_FOREACH(allset_.value(), dpu, each_dpu) { + DPU_CHECK(dpu_prepare_xfer(dpu, &nreal_points_[each_dpu]), + throw std::runtime_error("Failed to prepare transfer")); } - DPU_ASSERT(dpu_push_xfer(p_.allset, DPU_XFER_TO_DPU, "npoints", 0, - sizeof(int), DPU_XFER_DEFAULT)); + DPU_CHECK(dpu_push_xfer(allset_.value(), DPU_XFER_TO_DPU, "npoints", 0, + sizeof(int), DPU_XFER_DEFAULT), + throw std::runtime_error("Failed to push transfer")); const auto toc = std::chrono::steady_clock::now(); p_.cpu_pim_time = std::chrono::duration{toc - tic}.count(); @@ -133,8 +184,8 @@ void Container::load_nclusters(int nclusters) { /* task size in points should fit in an int */ int64_t task_size_in_points_64 = (p_.npointperdpu + ntasks - 1) / ntasks; if (task_size_in_points_64 > std::numeric_limits::max()) { - throw std::length_error(fmt::format("task size in points is too large: {}", - task_size_in_points_64)); + throw std::overflow_error(fmt::format( + "task size in points is 
too large: {}", task_size_in_points_64)); } /* task size has to be at least 1 and at most max_task_size */ int task_size_in_points = @@ -161,6 +212,9 @@ void Container::load_nclusters(int nclusters) { void Container::broadcast_parameters() { /* parameters to calculate once here and send to the DPUs. */ + if (!allset_) { + throw std::runtime_error("No DPUs allocated"); + } /* compute the iteration variables for the DPUs */ int task_size_in_bytes = get_task_size(); @@ -170,19 +224,16 @@ void Container::broadcast_parameters() { task_size_in_bytes / static_cast(sizeof(int_feature)); int task_size_in_points = task_size_in_features / p_.nfeatures; + /* validate variables width for the DPUs */ + task_parameters params_host{checked_cast(VN(p_.nfeatures)), + checked_cast(VN(task_size_in_points)), + checked_cast(VN(task_size_in_features)), + checked_cast(VN(task_size_in_bytes))}; + /* send computation parameters to the DPUs */ - DPU_ASSERT(dpu_broadcast_to(p_.allset, "nfeatures_host", 0, &p_.nfeatures, - sizeof(p_.nfeatures), DPU_XFER_DEFAULT)); - - DPU_ASSERT(dpu_broadcast_to(p_.allset, "task_size_in_points_host", 0, - &task_size_in_points, sizeof(task_size_in_points), - DPU_XFER_DEFAULT)); - DPU_ASSERT(dpu_broadcast_to(p_.allset, "task_size_in_bytes_host", 0, - &task_size_in_bytes, sizeof(task_size_in_bytes), - DPU_XFER_DEFAULT)); - DPU_ASSERT(dpu_broadcast_to(p_.allset, "task_size_in_features_host", 0, - &task_size_in_features, - sizeof(task_size_in_features), DPU_XFER_DEFAULT)); + DPU_CHECK(dpu_broadcast_to(allset_.value(), "p_h", 0, ¶ms_host, + sizeof(task_parameters), DPU_XFER_DEFAULT), + throw std::runtime_error("Failed to broadcast parameters")); } void Container::transfer_data(const py::array_t &data_int) { @@ -191,11 +242,21 @@ void Container::transfer_data(const py::array_t &data_int) { } void Container::load_array_data(const py::array_t &data_int, - int64_t npoints, int nfeatures) { - p_.npoints = npoints; - p_.nfeatures = nfeatures; - p_.npadded = ((p_.npoints + 8 * p_.ndpu - 1) / (8 * p_.ndpu)) * 8 * p_.ndpu; + const std::string &hash) { + if (hash_ == hash) { + return; + } + hash_ = hash; + + p_.npoints = data_int.shape(0); + p_.nfeatures = checked_cast(VN(data_int.shape(1))); + auto alignment = 8 * p_.ndpu; + p_.npadded = ((p_.npoints + alignment - 1) / alignment) * alignment; p_.npointperdpu = p_.npadded / p_.ndpu; + data_size_ = data_int.nbytes(); + transfer_data(data_int); } + +#undef VN diff --git a/src/host_program/lloyd_iter.cpp b/src/host_program/lloyd_iter.cpp index 81dd153..11b8eb7 100644 --- a/src/host_program/lloyd_iter.cpp +++ b/src/host_program/lloyd_iter.cpp @@ -4,14 +4,15 @@ * @brief Performs one iteration of the Lloyd K-Means algorithm. * */ - +#include #include #include #include +#include #include -#include "../kmeans.hpp" +#include "kmeans.hpp" #ifdef PERF_COUNTER #include @@ -20,26 +21,29 @@ extern "C" { #include -#include "../common.h" +#include "common.h" } void Container::lloyd_iter(const py::array_t &old_centers, py::array_t ¢ers_psum, py::array_t ¢ers_pcount) { - dpu_set_t dpu{}; /* Iteration variable for the DPUs. */ - uint32_t each_dpu = 0; /* Iteration variable for the DPUs. 
*/ - + if (!allset_) { + throw std::runtime_error("No DPUs allocated"); + } #ifdef PERF_COUNTER std::array counters_mean = {}; std::vector> counters(p_.ndpu); #endif - DPU_ASSERT(dpu_broadcast_to(p_.allset, "c_clusters", 0, old_centers.data(), - old_centers.nbytes(), DPU_XFER_DEFAULT)); + DPU_CHECK(dpu_broadcast_to( + allset_.value(), "c_clusters", 0, old_centers.data(), + static_cast(old_centers.nbytes()), DPU_XFER_DEFAULT), + throw std::runtime_error("Failed to broadcast old centers")); const auto tic = std::chrono::steady_clock::now(); //============RUNNING ONE LLOYD ITERATION ON THE DPU============== - DPU_ASSERT(dpu_launch(p_.allset, DPU_SYNCHRONOUS)); + DPU_CHECK(dpu_launch(allset_.value(), DPU_SYNCHRONOUS), + throw std::runtime_error("Failed to launch DPUs")); //================================================================ const auto toc = std::chrono::steady_clock::now(); p_.time_seconds += std::chrono::duration{toc - tic}.count(); @@ -47,10 +51,10 @@ void Container::lloyd_iter(const py::array_t &old_centers, /* Performance tracking */ #ifdef PERF_COUNTER DPU_FOREACH(p_.allset, dpu, each_dpu) { - DPU_ASSERT(dpu_prepare_xfer(dpu, counters[each_dpu].data())); + DPU_CHECK(dpu_prepare_xfer(dpu, counters[each_dpu].data())); } - DPU_ASSERT(dpu_push_xfer(p_.allset, DPU_XFER_FROM_DPU, "host_counters", 0, - sizeof(counters[0]), DPU_XFER_DEFAULT)); + DPU_CHECK(dpu_push_xfer(p_.allset, DPU_XFER_FROM_DPU, "host_counters", 0, + sizeof(counters[0]), DPU_XFER_DEFAULT)); for (int icounter = 0; icounter < HOST_COUNTERS; icounter++) { int nonzero_dpus = 0; @@ -95,23 +99,43 @@ void Container::lloyd_iter(const py::array_t &old_centers, #endif /* copy back membership count per dpu (device to host) */ - DPU_FOREACH(p_.allset, dpu, each_dpu) { - DPU_ASSERT( + dpu_set_t dpu{}; + uint32_t each_dpu = 0; + DPU_FOREACH(allset_.value(), dpu, each_dpu) { + DPU_CHECK( // TODO: direct access - dpu_prepare_xfer(dpu, centers_pcount.mutable_data(each_dpu))); + dpu_prepare_xfer(dpu, centers_pcount.mutable_data(each_dpu)), + throw std::runtime_error("Failed to prepare transfer")); } - DPU_ASSERT(dpu_push_xfer( - p_.allset, DPU_XFER_FROM_DPU, "centers_count_mram", 0, - centers_pcount.itemsize() * centers_pcount.shape(1), DPU_XFER_DEFAULT)); + auto nr_clusters = centers_pcount.shape(1); + if (nr_clusters > ASSUMED_NR_CLUSTERS) { + throw std::length_error( + fmt::format("Too many clusters for one DPU : {}", nr_clusters)); + } + DPU_CHECK(dpu_push_xfer( + allset_.value(), DPU_XFER_FROM_DPU, "centers_count_mram", 0, + static_cast(centers_pcount.itemsize() * nr_clusters), + DPU_XFER_DEFAULT), + throw std::runtime_error("Failed to push transfer")); /* copy back centroids partial sums (device to host) */ - DPU_FOREACH(p_.allset, dpu, each_dpu) { - DPU_ASSERT(dpu_prepare_xfer(dpu, centers_psum.mutable_data(each_dpu))); + DPU_FOREACH(allset_.value(), dpu, each_dpu) { + DPU_CHECK(dpu_prepare_xfer(dpu, centers_psum.mutable_data(each_dpu, 0, 0)), + throw std::runtime_error("Failed to prepare transfer")); + } + auto nr_clusters_x_nr_features = + centers_psum.shape(1) * centers_psum.shape(2); + if (nr_clusters_x_nr_features > ASSUMED_NR_CLUSTERS * ASSUMED_NR_FEATURES) { + throw std::length_error( + fmt::format("Too many clusters x features for one DPU : {}", + nr_clusters_x_nr_features)); } - DPU_ASSERT(dpu_push_xfer( - p_.allset, DPU_XFER_FROM_DPU, "centers_sum_mram", 0, - centers_psum.itemsize() * centers_psum.shape(1) * centers_psum.shape(2), - DPU_XFER_DEFAULT)); + DPU_CHECK( + dpu_push_xfer(allset_.value(), 
DPU_XFER_FROM_DPU, "centers_sum_mram", 0, + static_cast(centers_psum.itemsize() * + nr_clusters_x_nr_features), + DPU_XFER_DEFAULT), + throw std::runtime_error("Failed to push transfer")); /* averaging the new centers and summing the centers count * has been moved to the python code */ @@ -119,16 +143,23 @@ void Container::lloyd_iter(const py::array_t &old_centers, auto Container::compute_inertia(const py::array_t &old_centers) -> int64_t { + if (!allset_) { + throw std::runtime_error("No DPUs allocated"); + } int compute_inertia = 1; - DPU_ASSERT(dpu_broadcast_to(p_.allset, "c_clusters", 0, old_centers.data(), - old_centers.nbytes(), DPU_XFER_DEFAULT)); + DPU_CHECK(dpu_broadcast_to( + allset_.value(), "c_clusters", 0, old_centers.data(), + static_cast(old_centers.nbytes()), DPU_XFER_DEFAULT), + throw std::runtime_error("Failed to broadcast old centers")); - DPU_ASSERT(dpu_broadcast_to(p_.allset, "compute_inertia", 0, &compute_inertia, - sizeof(int), DPU_XFER_DEFAULT)); + DPU_CHECK(dpu_broadcast_to(allset_.value(), "compute_inertia", 0, + &compute_inertia, sizeof(int), DPU_XFER_DEFAULT), + throw std::runtime_error("Failed to broadcast compute inertia")); auto tic = std::chrono::steady_clock::now(); //============RUNNING ONE LLOYD ITERATION ON THE DPU============== - DPU_ASSERT(dpu_launch(p_.allset, DPU_SYNCHRONOUS)); + DPU_CHECK(dpu_launch(allset_.value(), DPU_SYNCHRONOUS), + throw std::runtime_error("Failed to launch DPUs")); //================================================================ auto toc = std::chrono::steady_clock::now(); p_.time_seconds += std::chrono::duration{toc - tic}.count(); @@ -137,19 +168,22 @@ auto Container::compute_inertia(const py::array_t &old_centers) /* copy back inertia (device to host) */ dpu_set_t dpu{}; uint32_t each_dpu = 0; - DPU_FOREACH(p_.allset, dpu, each_dpu) { - DPU_ASSERT(dpu_prepare_xfer(dpu, &(inertia_per_dpu_[each_dpu]))); + DPU_FOREACH(allset_.value(), dpu, each_dpu) { + DPU_CHECK(dpu_prepare_xfer(dpu, &(inertia_per_dpu_[each_dpu])), + throw std::runtime_error("Failed to prepare transfer")); } - DPU_ASSERT(dpu_push_xfer(p_.allset, DPU_XFER_FROM_DPU, "inertia", 0, - sizeof(inertia_per_dpu_[0]), DPU_XFER_DEFAULT)); + DPU_CHECK(dpu_push_xfer(allset_.value(), DPU_XFER_FROM_DPU, "inertia", 0, + sizeof(inertia_per_dpu_[0]), DPU_XFER_DEFAULT), + throw std::runtime_error("Failed to push transfer")); /* sum partial inertia */ int64_t inertia = std::accumulate(inertia_per_dpu_.cbegin(), inertia_per_dpu_.cend(), 0LL); compute_inertia = 0; - DPU_ASSERT(dpu_broadcast_to(p_.allset, "compute_inertia", 0, &compute_inertia, - sizeof(int), DPU_XFER_DEFAULT)); + DPU_CHECK(dpu_broadcast_to(allset_.value(), "compute_inertia", 0, + &compute_inertia, sizeof(int), DPU_XFER_DEFAULT), + throw std::runtime_error("Failed to broadcast compute inertia")); toc = std::chrono::steady_clock::now(); diff --git a/src/kmeans.hpp b/src/kmeans.hpp index a8705aa..9f643a3 100644 --- a/src/kmeans.hpp +++ b/src/kmeans.hpp @@ -9,9 +9,11 @@ #include #include +#include #include #include +#include #include extern "C" { @@ -34,13 +36,13 @@ struct kmeans_params { int nclusters; /**< Number of clusters */ int isOutput; /**< Whether to print debug information */ uint32_t ndpu; /**< Number of allocated dpu */ - dpu_set_t allset; /**< Struct of the allocated dpu set */ - bool allocated; /**< Whether the DPUs are allocated */ double time_seconds; /**< Perf counter */ double cpu_pim_time; /**< Time to populate the DPUs */ double pim_cpu_time; /**< Time to transfer inertia from the CPU */ }; 
+namespace fs = std::filesystem; + /** * @brief Container class for interfacing with python * @@ -49,10 +51,14 @@ struct kmeans_params { */ class Container { private: - kmeans_params p_{}; /**< Struct containing various algorithm parameters. */ - std::vector inertia_per_dpu_; /**< Iteration buffer to read inertia - from the DPUs. */ - std::vector nreal_points_; /* number of real data points on each dpu */ + kmeans_params p_{}; /**< Algorithm parameters. */ + std::optional allset_{}; /**< Set of DPUs. */ + uint32_t requested_dpus_{0}; /**< Number of requested DPUs. */ + std::vector inertia_per_dpu_; /**< Internal iteration buffer. */ + std::vector nreal_points_; /**< Real data points per dpu. */ + std::optional hash_; /**< Hash of the data. */ + std::optional binary_path_; /**< Path to the binary. */ + std::optional data_size_; /**< Size of the data. */ /** * @brief Broadcast current number of clusters to the DPUs @@ -100,13 +106,25 @@ class Container { auto operator=(Container &&) -> Container & = default; /** - * @brief Allocates DPUs. + * @brief Allocates DPUs if necessary + * + * @param ndpu Number of DPUs to allocate. 0 means all available DPUs. */ - void allocate(); + void allocate(uint32_t ndpu); [[nodiscard]] auto get_ndpu() const -> size_t { return p_.ndpu; } - void set_ndpu(uint32_t ndpu) { p_.ndpu = ndpu; } + [[nodiscard]] auto allocated() const -> bool { return allset_.has_value(); } + + [[nodiscard]] auto hash() const -> std::optional { return hash_; } + + [[nodiscard]] auto binary_path() const -> std::optional { + return binary_path_; + } + + [[nodiscard]] auto data_size() const -> std::optional { + return data_size_; + } void reset_timer() { p_.time_seconds = 0.0; } @@ -136,7 +154,7 @@ class Container { * @param threshold Parameter to declare convergence. */ void load_array_data(const py::array_t &data_int, - int64_t npoints, int nfeatures); + const std::string &hash); /** * @brief Informs the DPUs of the number of clusters for that iteration. diff --git a/src/main.cpp b/src/main.cpp index 322d2d3..80d79df 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include "kmeans.hpp" @@ -44,7 +45,11 @@ PYBIND11_MODULE(_core, m) { )pbdoc") .def(py::init<>()) .def("allocate", &Container::allocate) - .def_property("nr_dpus", &Container::get_ndpu, &Container::set_ndpu) + .def_property_readonly("nr_dpus", &Container::get_ndpu) + .def_property_readonly("allocated", &Container::allocated) + .def_property_readonly("hash", &Container::hash) + .def_property_readonly("binary_path", &Container::binary_path) + .def_property_readonly("data_size", &Container::data_size) .def("load_kernel", &Container::load_kernel) .def("load_array_data", &Container::load_array_data) .def("load_n_clusters", &Container::load_nclusters) diff --git a/tests/test_functional.py b/tests/test_functional.py index 51dcce9..ef2adc8 100644 --- a/tests/test_functional.py +++ b/tests/test_functional.py @@ -1,39 +1,37 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -"""Module for automated functional testing -Tests the K-Means at the service level""" +"""Module for automated functional testing. 
-# Author: Sylvan Brocard -# License: MIT +Tests the K-Means at the service level + +:Author: Sylvan Brocard +:License: MIT +""" import numpy as np -from sklearn.cluster import KMeans +from sklearn.cluster import KMeans, kmeans_plusplus from sklearn.datasets import make_blobs from sklearn.metrics import adjusted_rand_score from dpu_kmeans import KMeans as DPUKMeans -from dpu_kmeans import _dimm - -N_CLUSTERS = 15 def test_clustering_dpu_then_cpu(): - """Make clustering on DPUs and then on CPU, and compare the results""" + """Make clustering on DPUs and then on CPU, and compare the results.""" + n_clusters = 15 # Generating data - data = make_blobs(int(1e4), 8, centers=N_CLUSTERS, random_state=42)[0].astype( - np.float32 + data = make_blobs(int(1e4), 8, centers=n_clusters, random_state=42)[0].astype( + np.float32, ) - init = data[:N_CLUSTERS] + rng = np.random.default_rng(42) + init = rng.choice(data, n_clusters, replace=False) # Clustering with DPUs - _dimm.set_n_dpu(4) - dpu_kmeans = DPUKMeans(N_CLUSTERS, init=init, n_init=1, verbose=False) + dpu_kmeans = DPUKMeans(n_clusters, init=init, n_init=1, verbose=False, n_dpu=4) dpu_kmeans.fit(data) # Clustering with CPU - kmeans = KMeans(N_CLUSTERS, init=init, n_init=1, algorithm="full") + kmeans = KMeans(n_clusters, init=init, n_init=1, algorithm="full") kmeans.fit(data) # Comparison @@ -44,22 +42,23 @@ def test_clustering_dpu_then_cpu(): def test_clustering_cpu_then_dpu(): - """Make clustering on CPUs and then on DPU, and compare the results""" + """Make clustering on CPUs and then on DPU, and compare the results.""" + n_clusters = 15 # Generating data - data = make_blobs(int(1e4), 8, centers=N_CLUSTERS, random_state=42)[0].astype( - np.float32 + data = make_blobs(int(1e4), 8, centers=n_clusters, random_state=42)[0].astype( + np.float32, ) - init = data[:N_CLUSTERS] + rng = np.random.default_rng(42) + init = rng.choice(data, n_clusters, replace=False) # Clustering with CPU - kmeans = KMeans(N_CLUSTERS, init=init, n_init=1, algorithm="full") + kmeans = KMeans(n_clusters, init=init, n_init=1, algorithm="full") kmeans.fit(data) # Clustering with DPUs - _dimm.set_n_dpu(4) - dpu_kmeans = DPUKMeans(N_CLUSTERS, init=init, n_init=1, verbose=False) + dpu_kmeans = DPUKMeans(n_clusters, init=init, n_init=1, verbose=False, n_dpu=4) dpu_kmeans.fit(data) # Comparison @@ -70,25 +69,25 @@ def test_clustering_cpu_then_dpu(): def test_clustering_dpu_then_dpu(): - """Make clustering on DPU twice, and compare the results""" + """Make clustering on DPU twice, and compare the results.""" + n_clusters = 15 # Generating data - data = make_blobs(int(1e4), 8, centers=N_CLUSTERS, random_state=42)[0].astype( - np.float32 + data = make_blobs(int(1e4), 8, centers=n_clusters, random_state=42)[0].astype( + np.float32, ) data_copy = data.copy() - init = data[:N_CLUSTERS] + rng = np.random.default_rng(42) + init = rng.choice(data, n_clusters, replace=False) # Clustering with DPUs - _dimm.set_n_dpu(4) - - dpu_kmeans = DPUKMeans(N_CLUSTERS, init=init, n_init=1, verbose=False) + dpu_kmeans = DPUKMeans(n_clusters, init=init, n_init=1, verbose=False, n_dpu=4) dpu_kmeans.fit(data) n_iter_1 = dpu_kmeans.n_iter_ dpu_labels_1 = dpu_kmeans.labels_ - dpu_kmeans = DPUKMeans(N_CLUSTERS, init=init, n_init=1, verbose=False) + dpu_kmeans = DPUKMeans(n_clusters, init=init, n_init=1, verbose=False, n_dpu=4) dpu_kmeans.fit(data_copy) n_iter_2 = dpu_kmeans.n_iter_ dpu_labels_2 = dpu_kmeans.labels_ @@ -100,7 +99,38 @@ def test_clustering_dpu_then_dpu(): assert n_iter_1 == n_iter_2 +def 
test_large_dimensionality(): + """Test the clustering with a large features * clusters product.""" + n_clusters = 24 + n_features = 128 + + # Generating data + data = make_blobs(int(1e4), n_features, centers=n_clusters, random_state=42)[ + 0 + ].astype(np.float32) + + # use KMeans++ initialization here for the sake of having a shorter test + # otherwise we get a lot of clusters relocations with a low number of points + # in high dimensionality + init = kmeans_plusplus(data, n_clusters, random_state=42)[0] + + # Clustering with DPUs + dpu_kmeans = DPUKMeans(n_clusters, init=init, n_init=1, verbose=False, n_dpu=4) + dpu_kmeans.fit(data) + + # Clustering with CPU + kmeans = KMeans(n_clusters, init=init, n_init=1, algorithm="full") + kmeans.fit(data) + + # Comparison + rand_score = adjusted_rand_score(dpu_kmeans.labels_, kmeans.labels_) + + assert rand_score > 1 - 1e-2 + assert kmeans.n_iter_ * 2 / 3 < dpu_kmeans.n_iter_ < kmeans.n_iter_ * 1.5 + + if __name__ == "__main__": test_clustering_dpu_then_cpu() test_clustering_cpu_then_dpu() test_clustering_dpu_then_dpu() + test_large_dimensionality()
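
A sizing note on the new parallel MRAM write in final_reduce: with the constants in this diff (NR_TASKLETS = 16 from pyproject.toml, ASSUMED_NR_CLUSTERS = 32, ASSUMED_NR_FEATURES = 128, 8-byte partial sums), the worst-case centers_sum buffer is 32 x 128 x 8 = 32768 bytes, and 16 tasklets x MAX_MRAM_TRANSFER_SIZE (2048 bytes) covers exactly that, so splitting the write across tasklets keeps each mram_write within a single 2048-byte transfer; the remaining TODO presumably concerns configurations where this bound no longer holds.

For context, a minimal sketch of how the refactored interface is driven from Python after this change: allocation, kernel/data caching state, and timers now hang off the shared Container instance (_dimm.ctr) instead of the module-level globals and helper functions removed from _dimm.py. The n_dpu=4 value and the blob parameters below are placeholders taken from the tests; a DPU-capable machine and the built _core extension are assumed.

    import numpy as np
    from sklearn.datasets import make_blobs

    from dpu_kmeans import KMeans as DPUKMeans
    from dpu_kmeans import _dimm

    data = make_blobs(int(1e4), 8, centers=15, random_state=42)[0].astype(np.float32)
    init = data[:15]

    # n_dpu is passed through the estimator; fit() calls _dimm.ctr.allocate(n_dpu)
    # internally, replacing the removed _dimm.set_n_dpu() helper.
    dpu_kmeans = DPUKMeans(15, init=init, n_init=1, verbose=False, n_dpu=4)
    dpu_kmeans.fit(data)

    # State that previously lived in _dimm module globals is now exposed as
    # read-only Container properties.
    print(_dimm.ctr.nr_dpus)       # number of allocated DPUs
    print(_dimm.ctr.allocated)     # True once allocate() has run
    print(_dimm.ctr.binary_path)   # path of the loaded DPU kernel, or None
    print(_dimm.ctr.hash)          # xxhash digest of the loaded data, or None
    print(_dimm.ctr.data_size)     # size in bytes of the loaded data, or None
    print(_dimm.ctr.dpu_run_time)  # timers replace get_dpu_run_time() and friends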