
refactor!: revise point cloud plumbing and add unit tests #205

Merged
14 commits merged on Feb 6, 2023
3 changes: 1 addition & 2 deletions src/phoenix/pointcloud/__init__.py
@@ -1,2 +1 @@
from .pointcloud import DriftPointCloud
from .projectors import UMAPProjector

27 changes: 27 additions & 0 deletions src/phoenix/pointcloud/clustering.py
@@ -0,0 +1,27 @@
from dataclasses import asdict, dataclass

import numpy as np
import numpy.typing as npt
from hdbscan import HDBSCAN

DEFAULT_MIN_CLUSTER_SIZE = 20
DEFAULT_MIN_SAMPLES = 1


@dataclass(frozen=True)
class Parameters:
min_cluster_size: int = DEFAULT_MIN_CLUSTER_SIZE
min_samples: float = DEFAULT_MIN_SAMPLES


@dataclass(frozen=True)
class Hdbscan:
parameters: Parameters

def find_clusters(self, arr: npt.NDArray[np.float64]) -> list[set[int]]:
cluster_ids: npt.NDArray[np.int_] = HDBSCAN(**asdict(self.parameters)).fit_predict(arr)
ans: list[set[int]] = [set() for _ in range(np.max(cluster_ids) + 1)]
for i, cluster_id in enumerate(cluster_ids):
if cluster_id > -1:
ans[cluster_id].add(i)
return ans
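
For reference, a small deterministic sketch of the regrouping that find_clusters performs on HDBSCAN's output: fit_predict yields one cluster id per row (-1 marks noise), and the method collects row indices into one set per cluster. The labels below are hard-coded so the sketch does not depend on a fitted model.

import numpy as np

cluster_ids = np.array([0, 0, 1, -1, 1, 0])            # per-row labels, as fit_predict returns
groups = [set() for _ in range(np.max(cluster_ids) + 1)]
for row, cluster_id in enumerate(cluster_ids):
    if cluster_id > -1:                                 # -1 marks noise and is dropped
        groups[cluster_id].add(row)

print(groups)  # [{0, 1, 5}, {2, 4}]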
117 changes: 24 additions & 93 deletions src/phoenix/pointcloud/pointcloud.py
@@ -1,103 +1,34 @@
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List
from typing import Hashable, Mapping, Protocol, Set, TypeVar

MAX_UMAP_POINTS = 500
import numpy as np
import numpy.typing as npt

Identifier = TypeVar("Identifier", bound=Hashable)
Contributor:

Is there a need for this generic? I'm trying to understand what you mean by an identifier here. A docstring could help.

Contributor Author:

It's mostly a placeholder. We have used a uuid (which is a str) as the identifier for prediction_id, so the dict key could be just a str, but since it's a dynamic language it would also work if the prediction_id were a number (e.g. a row number). Making it generic here just keeps the typing flexible.

Contributor:

Got it - I think we should probably consider not over-abstracting, but this is neat.
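
A minimal sketch (not part of the diff; first_key is a hypothetical helper) of what the Identifier TypeVar buys: the same signature type-checks whether callers key the mapping by prediction-id strings or by row numbers.

from typing import Hashable, Mapping, Sequence, TypeVar

Identifier = TypeVar("Identifier", bound=Hashable)


def first_key(vectors: Mapping[Identifier, Sequence[float]]) -> Identifier:
    # The inferred return type follows whatever key type the caller used.
    return next(iter(vectors))


uuid_key = first_key({"uuid-0": [0.1, 0.2]})         # Identifier binds to str
row_key = first_key({0: [0.1, 0.2], 1: [0.3, 0.4]})  # Identifier binds to int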


class Coordinates(ABC):
@abstractmethod
def get_coordinates(self) -> List[float]:
pass

class DimensionalityReducer(Protocol):
def project(self, arr: npt.NDArray[np.float64], n_components: int) -> npt.NDArray[np.float64]:
...

@dataclass
class Coordinates2D(Coordinates):
x: float
y: float

def get_coordinates(self) -> List[float]:
return [float(self.x), float(self.y)]


@dataclass
class Coordinates3D(Coordinates):
x: float
y: float
z: float

def get_coordinates(self) -> List[float]:
return [float(self.x), float(self.y), float(self.z)]


@dataclass(frozen=True)
class InferenceAttributes:
prediction_label: str
# prediction_score: float,
actual_label: str
# actual_score: float,
raw_text_data: str
# link_to_data: str,


@dataclass(frozen=True)
class Point:
id: int
coordinates: Coordinates
inference_attributes: InferenceAttributes


@dataclass(frozen=True)
class Cluster:
id: int
point_ids: List[int]
purity_score: float
class ClustersFinder(Protocol):
def find_clusters(self, arr: npt.NDArray[np.float64]) -> list[Set[int]]:
...


@dataclass(frozen=True)
class DriftPointCloud:
primary_points: List[Point]
reference_points: List[Point]
clusters: List[Cluster]

# For demo - passing to umap widget
def to_json(self) -> str:
primary_pts_json = self._points_to_json(self.primary_points)
reference_pts_json = self._points_to_json(self.reference_points)
clusters_json = self._clusters_to_json(self.clusters)

data = {
"primaryData": primary_pts_json,
"referenceData": reference_pts_json,
"clusters": clusters_json,
}
return json.dumps(data)

@staticmethod
def _points_to_json(points: List[Point]) -> List[Dict[str, Any]]:
pts_json = []
for point in points:
point_json_obj = {
"position": point.coordinates.get_coordinates(),
"metaData": {
"id": int(point.id),
"rawTextData": [point.inference_attributes.raw_text_data],
"predictionLabel": point.inference_attributes.prediction_label,
"actualLabel": point.inference_attributes.actual_label,
},
}
pts_json.append(point_json_obj)
return pts_json

@staticmethod
def _clusters_to_json(clusters: List[Cluster]) -> List[Dict[str, Any]]:
clusters_json = []
for cluster in clusters:
cluster_json_obj = {
"id": int(cluster.id),
"pointIds": cluster.point_ids,
"purityScore": cluster.purity_score,
}
clusters_json.append(cluster_json_obj)
return clusters_json
class PointCloud:
dimensionalityReducer: DimensionalityReducer
clustersFinder: ClustersFinder

def generate(
self,
vectors: Mapping[Identifier, npt.NDArray[np.float64]],
n_components: int,
) -> tuple[dict[Identifier, npt.NDArray[np.float64]], list[Set[Identifier]]]:
ids, vs = zip(*vectors.items())
projections = self.dimensionalityReducer.project(np.stack(vs), n_components=n_components)
return dict(zip(ids, projections)), [
set(ids[i] for i in c) for c in self.clustersFinder.find_clusters(projections)
]
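
A minimal sketch of how the two protocols are satisfied structurally, assuming PointCloud is the dataclass defined above and is importable from phoenix.pointcloud.pointcloud; NoopReducer and OneClusterFinder are hypothetical stubs standing in for UMAP and HDBSCAN.

import numpy as np
import numpy.typing as npt

from phoenix.pointcloud.pointcloud import PointCloud


class NoopReducer:
    def project(self, arr: npt.NDArray[np.float64], n_components: int) -> npt.NDArray[np.float64]:
        return arr[:, :n_components]           # stub: keep the first n_components columns


class OneClusterFinder:
    def find_clusters(self, arr: npt.NDArray[np.float64]) -> list[set[int]]:
        return [set(range(len(arr)))]           # stub: put every row in a single cluster


vectors = {"uuid-a": np.ones(4), "uuid-b": np.zeros(4)}
projections, clusters = PointCloud(NoopReducer(), OneClusterFinder()).generate(vectors, n_components=2)
# projections maps each identifier to its 2-D row; clusters == [{"uuid-a", "uuid-b"}]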
197 changes: 16 additions & 181 deletions src/phoenix/pointcloud/projectors.py
@@ -1,193 +1,28 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Sequence, Tuple, Type, Union, cast
from dataclasses import asdict, dataclass
from typing import cast

import numpy as np
from hdbscan import HDBSCAN
from numpy.typing import ArrayLike
import numpy.typing as npt
from umap import UMAP

from phoenix.datasets import Dataset

from .pointcloud import (
Cluster,
Coordinates,
Coordinates2D,
Coordinates3D,
InferenceAttributes,
Point,
)

MAX_UMAP_POINTS = 500
DEFAULT_MIN_CLUSTER_SIZE = 20
DEFAULT_MIN_SAMPLES = 1
DEFAULT_N_NEIGHBORS = 15
DEFAULT_MIN_DIST = 0.1


@dataclass(frozen=True)
class UMAPProjector:
hyperparameters: Dict[str, Union[int, float, str]]

def __post__init__(self) -> None:
if "n_neighbors" in self.hyperparameters and (
not isinstance(self.hyperparameters["n_neighbors"], int)
or self.hyperparameters["n_neighbors"] not in (2, 3)
):
raise ValueError(
"Projection dimensionality not supported. Must be integer value: 2 or 3 (2D/3D)."
)

@staticmethod
def _move_to_center(projections: np.ndarray) -> np.ndarray: # type: ignore
# Calculate Center of Mass
cm = np.sum(projections, axis=0) / projections.shape[0]
return projections - cm # type: ignore

@staticmethod
def _build_points(
primary_projections: np.ndarray, # type: ignore
reference_projections: np.ndarray, # type: ignore
primary_dataset: Dataset,
reference_dataset: Dataset,
embedding_feature: str,
) -> Tuple[List[Point], List[Point]]:
primary_points: List[Point] = []
reference_points: List[Point] = []

# Number of dimensions in projections: 2D or 3D
N = primary_projections.shape[-1]
c: Type[Coordinates]
if N == 2:
c = Coordinates2D
elif N == 3:
c = Coordinates3D
else:
raise ValueError("Projections should be done to 2D or 3D.")

# Build primary points
prediction_label_col = primary_dataset.get_prediction_label_column()
actual_label_col = primary_dataset.get_actual_label_column()
raw_text_data_col = primary_dataset.get_embedding_raw_data_column(embedding_feature)
for i in range(len(primary_projections)):
inference_attributes = InferenceAttributes(
prediction_label=prediction_label_col[i],
actual_label=actual_label_col[i],
raw_text_data=raw_text_data_col[i],
)
primary_points.append(
Point(
id=i,
coordinates=c(*[primary_projections[i][k] for k in range(N)]),
inference_attributes=inference_attributes,
)
)
# Build reference points
prediction_label_col = reference_dataset.get_prediction_label_column()
actual_label_col = reference_dataset.get_actual_label_column()
raw_text_data_col = reference_dataset.get_embedding_raw_data_column(embedding_feature)
for i in range(len(reference_projections)):
inference_attributes = InferenceAttributes(
prediction_label=prediction_label_col[i],
actual_label=actual_label_col[i],
raw_text_data=raw_text_data_col[i],
)
reference_points.append(
Point(
id=i + len(primary_projections),
coordinates=c(*[reference_projections[i][k] for k in range(N)]),
inference_attributes=inference_attributes,
)
)
return primary_points, reference_points

@staticmethod
def _build_clusters(
cluster_ids: np.ndarray, # type: ignore
primary_points: List[Point],
reference_points: List[Point],
) -> List[Cluster]:
unique_cluster_ids = np.unique(cluster_ids)
# map cluster_id to point_ids inside the cluster
map_cluster_id_point_ids: Dict[int, List[int]] = {
id: [] for id in unique_cluster_ids if id != -1
}
# map cluster_id to the count of primary points in the cluster
map_cluster_id_primary_count: Dict[int, int] = {
id: 0 for id in unique_cluster_ids if id != -1
}
# map cluster_id to the count of reference points in the cluster
map_cluster_id_reference_count: Dict[int, int] = {
id: 0 for id in unique_cluster_ids if id != -1
}

primary_cluster_ids = cluster_ids[: len(primary_points)]
reference_cluster_ids = cluster_ids[len(primary_points) :]
# Check that there are as many coordinates as cluster IDs
# This is a defensive test, since this should be guaranteed by UMAP & HDBSCAN libraries
if len(reference_cluster_ids) != len(reference_points):
raise ValueError(
f"There should be equal number of point coordinates as cluster IDs. "
f"len(reference_cluster_ids) = {len(reference_cluster_ids)}. "
f"len(reference_points) = {len(reference_points)}."
)
class Parameters:
n_neighbors: int = DEFAULT_N_NEIGHBORS
min_dist: float = DEFAULT_MIN_DIST

for i, cluster_id in enumerate(primary_cluster_ids):
if cluster_id == -1: # Exclude "unknown" cluster
continue
map_cluster_id_point_ids[cluster_id].append(primary_points[i].id)
map_cluster_id_primary_count[cluster_id] += 1
for i, cluster_id in enumerate(reference_cluster_ids):

if cluster_id == -1: # Exclude "unknown" cluster
continue
map_cluster_id_point_ids[cluster_id].append(reference_points[i].id)
map_cluster_id_reference_count[cluster_id] += 1

clusters: List[Cluster] = []
for cluster_id, point_ids in map_cluster_id_point_ids.items():
primary_count = map_cluster_id_primary_count[cluster_id]
reference_count = map_cluster_id_reference_count[cluster_id]
purity_score = (reference_count - primary_count) / (reference_count + primary_count)
clusters.append(Cluster(id=cluster_id, point_ids=point_ids, purity_score=purity_score))
return clusters

def project(
self, primary_dataset: Dataset, reference_dataset: Dataset, embedding_feature: str
) -> Tuple[List[Point], List[Point], List[Cluster]]:
# Sample down our datasets to max 2500 rows for UMAP performance
points_per_dataset = MAX_UMAP_POINTS // 2
sampled_primary_dataset = primary_dataset.sample(num=points_per_dataset)
sampled_reference_dataset = reference_dataset.sample(num=MAX_UMAP_POINTS // 2)

primary_embeddings = np.stack(
cast(
Sequence[ArrayLike],
sampled_primary_dataset.get_embedding_vector_column(embedding_feature),
)
)
reference_embeddings = np.stack(
cast(
Sequence[ArrayLike],
sampled_reference_dataset.get_embedding_vector_column(embedding_feature),
)
)
@dataclass(frozen=True)
class Umap:
parameters: Parameters

embeddings = np.concatenate([primary_embeddings, reference_embeddings])
umap = UMAP(**self.hyperparameters)
projections: np.ndarray = umap.fit_transform(embeddings) # type: ignore
projections = self._move_to_center(projections)
# Find clusters
hdbscan = HDBSCAN(
min_cluster_size=DEFAULT_MIN_CLUSTER_SIZE, min_samples=DEFAULT_MIN_SAMPLES
)
cluster_ids: np.ndarray[Any, Any] = hdbscan.fit_predict(projections)
def _center(self, arr: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
return cast(npt.NDArray[np.float64], arr - np.sum(arr, axis=0) / arr.shape[0])

primary_points, reference_points = self._build_points(
projections[:points_per_dataset],
projections[points_per_dataset:],
sampled_primary_dataset,
sampled_reference_dataset,
embedding_feature,
def project(self, data: npt.NDArray[np.float64], n_components: int) -> npt.NDArray[np.float64]:
return self._center(
UMAP(**asdict(self.parameters), n_components=n_components).fit_transform(data)
)

clusters = self._build_clusters(cluster_ids, primary_points, reference_points)

return primary_points, reference_points, clusters
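
A sketch of calling the new Umap wrapper, assuming umap-learn is installed; the random data is only illustrative. Because project centers its result, the projected cloud has an approximately zero mean.

import numpy as np

from phoenix.pointcloud.projectors import Parameters, Umap

embeddings = np.random.default_rng(0).normal(size=(100, 64))
projections = Umap(Parameters(n_neighbors=15, min_dist=0.1)).project(embeddings, n_components=2)

print(projections.shape)         # (100, 2)
print(projections.mean(axis=0))  # ~[0, 0], since _center subtracts the column means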
25 changes: 25 additions & 0 deletions src/phoenix/pointcloud/umap.py
@@ -0,0 +1,25 @@
from dataclasses import dataclass

from .clustering import Hdbscan
from .clustering import Parameters as HdbscanParameters
from .pointcloud import PointCloud
from .projectors import Parameters as UmapParameters
from .projectors import Umap

MAX_UMAP_POINTS = 500
DEFAULT_MIN_CLUSTER_SIZE = 20
DEFAULT_MIN_SAMPLES = 1


@dataclass(frozen=True)
class UmapPointCloud(PointCloud):
umap_param: UmapParameters
hdbscan_param: HdbscanParameters

def __post_init__(self) -> None:
super().__init__(
dimensionalityReducer=Umap(self.umap_param), clustersFinder=Hdbscan(self.hdbscan_param)
)

# TODO: extract embedding tensor and metadata from dataset, filtered and sampled
# TODO: return points to the caller, depending on the gql interface
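
Roughly the pipeline this class assembles (the same wiring __post_init__ performs above) can be put together by hand, assuming umap-learn and hdbscan are installed and that PointCloud accepts the two collaborators shown in pointcloud.py; the random vectors only stand in for embedding data.

import numpy as np

from phoenix.pointcloud.clustering import Hdbscan
from phoenix.pointcloud.clustering import Parameters as HdbscanParameters
from phoenix.pointcloud.pointcloud import PointCloud
from phoenix.pointcloud.projectors import Parameters as UmapParameters
from phoenix.pointcloud.projectors import Umap

rng = np.random.default_rng(0)
vectors = {f"pred-{i}": rng.normal(size=32) for i in range(200)}   # stand-in embeddings

cloud = PointCloud(
    dimensionalityReducer=Umap(UmapParameters()),
    clustersFinder=Hdbscan(HdbscanParameters(min_cluster_size=10)),
)
projections, clusters = cloud.generate(vectors, n_components=2)
# projections: {"pred-0": array([x, y]), ...}; clusters: a list of sets of prediction ids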