Commit

lint and remove dead code
SylvanBrocard committed Dec 6, 2024
1 parent cfd70e2 commit 028c235
Showing 2 changed files with 64 additions and 57 deletions.
13 changes: 7 additions & 6 deletions pyproject.toml
@@ -1,8 +1,5 @@
[build-system]
-requires = [
-    "pybind11",
-    "scikit-build-core>=0.10",
-]
+requires = ["pybind11", "scikit-build-core>=0.10"]
build-backend = "scikit_build_core.build"

[project]
@@ -57,10 +54,14 @@ build-type = "Release"

[tool.scikit-build.cmake.define]
NR_TASKLETS = "16"
-UPMEM_HOME = {env = "UPMEM_HOME", default = "/usr"}
+UPMEM_HOME = { env = "UPMEM_HOME", default = "/usr" }

[tool.ruff.lint]
-ignore = ["COM812"]
+select = ["ALL"]
+ignore = ["COM812", "ANN", "T201", "TRY003"]
+
+[tool.ruff.lint.pep8-naming]
+ignore-names = ["X*"]

[tool.ruff.lint.per-file-ignores]
"tests/*.py" = ["S101", "INP001"]
108 changes: 57 additions & 51 deletions src/dpu_kmeans/_kmeans.py
@@ -19,15 +19,12 @@

import time
import warnings
-from lib2to3.pgen2 import driver
-from os import scandir

import numpy as np
import scipy.sparse as sp
from sklearn.cluster import KMeans as KMeansCPU
from sklearn.cluster._k_means_common import _relocate_empty_clusters_dense
from sklearn.cluster._kmeans import (
-    _is_same_clustering,
    _labels_inertia_threadpool_limit,
    _openmp_effective_n_threads,
    lloyd_iter_chunked_dense,
@@ -67,17 +64,44 @@ def _lloyd_iter_dpu(
    Parameters
    ----------
-    centers_old : ndarray of shape (n_clusters, n_features), dtype=floating
+    centers_old_int : ndarray of shape (n_clusters, n_features), dtype=int
        Centers before previous iteration, placeholder for the centers after
        previous iteration.
-    centers_new : ndarray of shape (n_clusters, n_features), dtype=floating
+    centers_new_int : ndarray of shape (n_clusters, n_features), dtype=int
        Centers after previous iteration, placeholder for the new centers
        computed during this iteration.
    weight_in_clusters : ndarray of shape (n_clusters,), dtype=floating
        Placeholder for the sums of the weights of every observation assigned
        to each center.
+    centers_sum_int : ndarray of shape (n_clusters, n_features), dtype=int
+        Placeholder for the sums of the points assigned to each center.
+    centers_sum_int_per_dpu : ndarray of shape (n_dpu, n_clusters, n_features), \
+            dtype=int
+        Placeholder for the sums of the points assigned to each center on each DPU.
+    points_in_clusters : ndarray of shape (n_clusters,), dtype=int
+        Placeholder for the number of points assigned to each center.
+    points_in_clusters_per_dpu : ndarray of shape (n_dpu, n_clusters_round), \
+            dtype=int
+        Placeholder for the number of points assigned to each center on each DPU.
    X : {ndarray} of shape (n_samples, n_features)
        The observations to cluster.
    sample_weight : ndarray of shape (n_samples,), dtype={np.float32, np.float64}
        The weights for each observation in X.
    x_squared_norms : ndarray of shape (n_samples,), dtype={np.float32, np.float64}
        Precomputed x_squared_norms.
    n_threads : int
        The number of OpenMP threads to use for the computation. Parallelism is
        sample-wise on the main cython loop which assigns each sample to its
        closest center.
    verbose : bool
        Verbosity mode.

    Returns
    -------
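The per-DPU placeholders documented above are reduced on the host along the DPU axis. A minimal NumPy sketch of that layout and reduction, with illustrative sizes (n_clusters_round is presumably n_clusters padded up for DPU alignment, hence the [:n_clusters] truncation in the hunk below):

import numpy as np

# Illustrative sizes only; on hardware, n_dpu is the allocated DPU count.
n_dpu, n_clusters, n_clusters_round, n_features = 4, 3, 8, 2

centers_sum_int_per_dpu = np.zeros((n_dpu, n_clusters, n_features), dtype=np.int64)
points_in_clusters_per_dpu = np.zeros((n_dpu, n_clusters_round), dtype=np.int32)

# Host-side reduction over the DPU axis, mirroring _lloyd_iter_dpu below.
centers_sum_int = centers_sum_int_per_dpu.sum(axis=0)
points_in_clusters = points_in_clusters_per_dpu.sum(axis=0)[:n_clusters]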
@@ -97,7 +121,7 @@
    n_clusters = points_in_clusters.shape[0]
    points_in_clusters[:] = points_in_clusters_per_dpu.sum(axis=0)[:n_clusters]

-    centers_sum_int = centers_sum_int_per_dpu.sum(axis=0)
+    centers_sum_int[:] = centers_sum_int_per_dpu.sum(axis=0)

    reallocate_timer = 0
    if any(points_in_clusters == 0):
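The empty-cluster branch above hands off to sklearn's _relocate_empty_clusters_dense (the call itself is collapsed in this diff). A self-contained sketch of the underlying idea, not the sklearn internals: a center that attracted no points is re-seeded on a sample far from its current center.

import numpy as np

# Toy data: both nearby points land on the first center, so the second
# center ends up empty and gets re-seeded on the farthest sample.
X = np.array([[0.0, 0.0], [0.1, 0.0], [5.0, 5.0]])
centers = np.array([[0.0, 0.0], [50.0, 50.0]])

d2 = ((X[:, None, :] - centers[None, :, :]) ** 2).sum(axis=-1)
labels = d2.argmin(axis=1)
counts = np.bincount(labels, minlength=len(centers))

for c in np.flatnonzero(counts == 0):
    farthest = d2[np.arange(len(X)), labels].argmax()
    centers[c] = X[farthest]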
@@ -136,7 +160,6 @@
        update_centers=False,
    )

-    # weight_in_clusters = points_in_clusters.astype(float)
    weight_in_clusters[:] = points_in_clusters
    _relocate_empty_clusters_dense(
        X,
@@ -177,7 +200,9 @@ def _kmeans_single_lloyd_dpu(
    tol=1e-4,
    n_threads=1,
):
-    """Perform a single run of k-means lloyd on DPU, assumes preparation completed prior.
+    """Perform a single run of k-means lloyd on DPU.
+
+    Assumes preparation completed prior.

    Parameters
    ----------
@@ -239,7 +264,6 @@
    # Buffers to avoid new allocations at each iteration.
    centers = centers_init
    centers_int = np.empty_like(centers, dtype=dtype)
-    # centers_new = np.empty_like(centers, dtype=np.float32)
    centers_new_int = np.empty_like(centers, dtype=dtype)
    centers_sum_int = np.empty_like(centers, dtype=np.int64)
    centers_sum_int_per_dpu = np.empty(
@@ -252,13 +276,11 @@
        dtype=np.int32,
    )

-    # points_in_clusters_per_dpu = np.empty((n_dpu, n_clusters_round), dtype=np.int32)
-    # partial_sums = np.empty((n_clusters, n_dpu, n_features), dtype=np.int64)
-
    if sp.issparse(X):
-        raise ValueError("Sparse matrix not supported")
-    else:
-        lloyd_iter = _lloyd_iter_dpu
+        msg = "Sparse matrix not supported"
+        raise NotImplementedError(msg)
+
+    lloyd_iter = _lloyd_iter_dpu

    # quantize the centroids
    centers_int[:] = _dimm.ld.transform(centers)
@@ -281,11 +303,9 @@
            verbose,
        )

-        # if verbose:
-        #     _, inertia = _labels_inertia_threadpool_limit(
-        #         X, sample_weight, x_squared_norms, centers_new, n_threads
-        #     )
-        #     print(f"Iteration {i}, inertia {inertia}.")
+        if verbose:
+            inertia = compute_inertia(centers_int) / scale_factor**2
+            print(f"Iteration {i}, inertia {inertia}.")

        centers_int, centers_new_int = centers_new_int, centers_int
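The scale_factor**2 division above (and in the final inertia computation further down) works because quantizing points and centers by a common factor multiplies every squared distance, and hence the inertia, by that factor squared. A small sketch of the identity; the names are illustrative, and the package's actual quantizer is _dimm.ld:

import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 2))
centers = rng.normal(size=(3, 2))
scale_factor = 255.0

def inertia(points, ctrs):
    # Sum of squared distances from each sample to its closest center.
    d2 = ((points[:, None, :] - ctrs[None, :, :]) ** 2).sum(axis=-1)
    return d2.min(axis=1).sum()

# Inertia computed in the scaled domain, mapped back to float scale.
scaled = inertia(X * scale_factor, centers * scale_factor) / scale_factor**2
assert np.isclose(inertia(X, centers), scaled)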

@@ -301,13 +321,6 @@
    # convert the centroids back to float
    centers[:] = _dimm.ld.inverse_transform(centers_int)

-    # host side E step of the algorithm
-    # tic = time.perf_counter()
-    # labels, inertia = _labels_inertia_threadpool_limit(
-    #     X, sample_weight, x_squared_norms, centers, n_threads
-    # )
-    # toc = time.perf_counter()
-
    tic = time.perf_counter()
    inertia = compute_inertia(centers_int) / scale_factor**2
    toc = time.perf_counter()
@@ -374,9 +387,18 @@ class KMeans(KMeansCPU):
    copy_x is False. If the original data is sparse, but not in CSR format,
    a copy will be made even if copy_x is False.
+    n_dpu : int, default=0
+        Number of DPUs to use for the computation. If 0, all available DPUs.
+    reload_data : bool, default=True
+        When True, data sent to the DPUs is hashed and compared to the previous
+        data. If the data is the same, it is not retransferred to the DPUs.
+        When False, the hash is skipped and the data is reused. The correct
+        data and DPU kernel must be already loaded.

    Examples
    --------
    >>> import numpy as np
    >>> from dpu_kmeans import KMeans
    >>> X = np.array([[1, 2], [1, 4], [1, 0], [10, 2], [10, 4], [10, 0]])
@@ -385,6 +407,7 @@ class KMeans(KMeansCPU):
    >>> print(kmeans.cluster_centers_)
    [[ 0.9998627  2.       ]
     [10.000137   2.       ]]
+
    """

    def __init__(
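As a usage note for the two DPU-specific parameters documented above, a hypothetical sketch (it assumes at least 4 DPUs are available; only the constructor arguments and fit come from this class):

import numpy as np
from dpu_kmeans import KMeans

X = np.array([[1, 2], [1, 4], [1, 0], [10, 2], [10, 4], [10, 0]])

# First fit: the data is hashed and transferred to the 4 requested DPUs.
km = KMeans(n_clusters=2, n_dpu=4).fit(X)

# Later fit on the same resident data: reload_data=False skips the hash
# check and reuses what is already on the DPUs (data and kernel must match).
km2 = KMeans(n_clusters=2, n_dpu=4, reload_data=False).fit(X)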
@@ -421,6 +444,7 @@ def fit(self, X, y=None, sample_weight=None):
        -------
        self : KMeans
            Fitted estimator.
+
        """
        tic = time.perf_counter()

@@ -453,7 +477,8 @@
        if hasattr(init, "__array__"):
            init -= X_mean
        else:
-            raise NotImplementedError("Sparse initialization is not supported.")
+            msg = "Sparse matrix not supported"
+            raise NotImplementedError(msg)

        # precompute squared norms of data points
        x_squared_norms = row_norms(X, squared=True)
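row_norms is sklearn's helper from sklearn.utils.extmath; with squared=True it returns each sample's squared Euclidean norm, which the E step reuses to speed up distance computations. A quick illustration:

import numpy as np
from sklearn.utils.extmath import row_norms

X = np.array([[3.0, 4.0], [1.0, 0.0]])
print(row_norms(X, squared=True))  # [25.  1.]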
@@ -555,22 +580,3 @@ def fit(self, X, y=None, sample_weight=None):
        self.pim_cpu_time_ = best_pim_cpu_time
        self.train_time_ = train_time
        return self
-
-    # def _kmeans(self):
-    #     log_iterations = np.require(
-    #         np.zeros(1, dtype=np.int32), requirements=["A", "C"]
-    #     )
-    #     log_time = np.require(np.zeros(1, dtype=np.float64), requirements=["A", "C"])
-
-    #     clusters = _dimm.ctr.kmeans(
-    #         self.n_clusters,
-    #         self.n_clusters,
-    #         False,
-    #         self.verbose,
-    #         self.n_init,
-    #         self.max_iter,
-    #         log_iterations,
-    #         log_time,
-    #     )
-
-    #     return clusters, log_iterations[0], log_time[0]