Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: revise point cloud plumbing and add unit tests #205

Merged
merged 14 commits into from
Feb 6, 2023
Merged
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ disallow_untyped_defs = true
disallow_incomplete_defs = true
strict = true
exclude = [
"dist/",
"sdist/",
"tests/",
]

Expand Down
3 changes: 1 addition & 2 deletions src/phoenix/pointcloud/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
from .pointcloud import DriftPointCloud
from .projectors import UMAPProjector

25 changes: 25 additions & 0 deletions src/phoenix/pointcloud/clustering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from dataclasses import asdict, dataclass
from typing import List, Set

import numpy as np
import numpy.typing as npt
from hdbscan import HDBSCAN
from typing_extensions import TypeAlias

RowIndex: TypeAlias = int
Cluster: TypeAlias = Set[RowIndex]
Matrix: TypeAlias = npt.NDArray[np.float64]


@dataclass(frozen=True)
class Hdbscan:
    """Hyperparameters for HDBSCAN-based clustering.

    The field names mirror the keyword arguments of ``hdbscan.HDBSCAN``,
    because the instance is expanded via ``asdict(self)`` when the
    estimator is constructed in :meth:`find_clusters`.
    """

    # Smallest group of points that will be considered a cluster.
    min_cluster_size: int = 20
    # How conservative the clustering is. ``hdbscan.HDBSCAN`` expects an
    # integer for ``min_samples``; the previous ``float`` annotation was a
    # typo (the default was already the int 1).
    min_samples: int = 1

    def find_clusters(self, mat: Matrix) -> List[Cluster]:
        """Cluster the rows of ``mat``.

        Returns one set of row indices per cluster. Rows labeled -1 by
        HDBSCAN (noise) are excluded from every cluster.
        """
        cluster_ids: npt.NDArray[np.int_] = HDBSCAN(**asdict(self)).fit_predict(mat)
        # ``initial=-1`` makes ``max`` well-defined on an empty label array
        # (yielding -1, hence zero clusters) instead of raising ValueError.
        ans: List[Cluster] = [set() for _ in range(int(cluster_ids.max(initial=-1)) + 1)]
        for row_idx, cluster_id in enumerate(cluster_ids):
            if cluster_id > -1:
                ans[cluster_id].add(row_idx)
        return ans
150 changes: 58 additions & 92 deletions src/phoenix/pointcloud/pointcloud.py
Original file line number Diff line number Diff line change
@@ -1,103 +1,69 @@
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List
from typing import Dict, Hashable, List, Mapping, Protocol, Set, Tuple, TypeVar

MAX_UMAP_POINTS = 500
import numpy as np
import numpy.typing as npt
from typing_extensions import TypeAlias

Identifier = TypeVar("Identifier", bound=Hashable)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a need for this generic? I'm trying to understand what you mean by an identifier here. A docstring could help.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's mostly a placeholder. I see that we have used uuid (which is a str) as an identifier for the prediction_id, so it could be just a str in the dict key, but it's a dynamic language, so it could work if the prediction_id is a number (e.g. row number). Making it generic here is just to keep the typing flexible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it — I think we should probably consider not over-abstracting, but this is neat.

Vector: TypeAlias = npt.NDArray[np.float64]
Matrix: TypeAlias = npt.NDArray[np.float64]
ClusterId: TypeAlias = int
RowIndex: TypeAlias = int
Cluster: TypeAlias = Set[RowIndex]

class Coordinates(ABC):
@abstractmethod
def get_coordinates(self) -> List[float]:
pass

class DimensionalityReducer(Protocol):
    """Structural interface for anything that can project a matrix of row
    vectors into a lower-dimensional space.
    """

    def project(self, mat: Matrix, n_components: int) -> Matrix:
        ...

@dataclass
class Coordinates2D(Coordinates):
x: float
y: float

def get_coordinates(self) -> List[float]:
return [float(self.x), float(self.y)]


@dataclass
class Coordinates3D(Coordinates):
x: float
y: float
z: float

def get_coordinates(self) -> List[float]:
return [float(self.x), float(self.y), float(self.z)]


@dataclass(frozen=True)
class InferenceAttributes:
prediction_label: str
# prediction_score: float,
actual_label: str
# actual_score: float,
raw_text_data: str
# link_to_data: str,


@dataclass(frozen=True)
class Point:
id: int
coordinates: Coordinates
inference_attributes: InferenceAttributes


@dataclass(frozen=True)
class Cluster:
id: int
point_ids: List[int]
purity_score: float
class ClustersFinder(Protocol):
    """Structural interface for anything that can group the rows of a
    matrix into clusters of row indices.
    """

    def find_clusters(self, mat: Matrix) -> List[Cluster]:
        ...


@dataclass(frozen=True)
class PointCloud:
    """Pipeline that projects high-dimensional vectors into a
    low-dimensional space and clusters the projections.

    The two collaborators are injected as structural interfaces, so any
    object with a matching ``project`` / ``find_clusters`` method works.
    """

    # NOTE: field names are camelCase to stay compatible with existing
    # callers that construct PointCloud with these keyword arguments.
    dimensionalityReducer: DimensionalityReducer
    clustersFinder: ClustersFinder

    def generate(
        self,
        data: Mapping[Identifier, Vector],
        n_components: int = 3,
    ) -> Tuple[Dict[Identifier, Vector], Dict[Identifier, ClusterId]]:
        """
        Given a set of vectors, projects them onto lower dimensions, and
        finds clusters among the projections.

        Parameters
        ----------
        data : Mapping[Identifier, Vector]
            Mapping of input vectors by their identifiers.

        n_components : int, default=3
            Number of dimensions in the projected space.

        Returns
        -------
        projections : Dict[Identifier, Vector]
            Projected vectors in the low-dimensional space, mapped back to
            the input vectors' identifiers.

        cluster_membership : Dict[Identifier, ClusterId]
            Cluster membership by way of cluster_ids in the form of integers
            0,1,2,... mapped back to the input vectors' identifiers. Note
            that some vectors may not belong to any cluster and are excluded
            here.
        """
        # Guard the empty mapping: ``zip(*{}.items())`` is ``zip()`` and
        # unpacking it raises ValueError, so short-circuit instead.
        if not data:
            return {}, {}
        identifiers, vectors = zip(*data.items())
        projections = self.dimensionalityReducer.project(
            np.stack(vectors), n_components=n_components
        )
        clusters = self.clustersFinder.find_clusters(projections)
        return dict(zip(identifiers, projections)), {
            identifiers[row_index]: cluster_id
            for cluster_id, cluster in enumerate(clusters)
            for row_index in cluster
        }

@staticmethod
def _points_to_json(points: List[Point]) -> List[Dict[str, Any]]:
    """Serialize each point into the JSON-ready dict shape consumed by the
    UMAP widget: a coordinate list plus a metaData payload.
    """
    return [
        {
            "position": point.coordinates.get_coordinates(),
            "metaData": {
                "id": int(point.id),
                "rawTextData": [point.inference_attributes.raw_text_data],
                "predictionLabel": point.inference_attributes.prediction_label,
                "actualLabel": point.inference_attributes.actual_label,
            },
        }
        for point in points
    ]

@staticmethod
def _clusters_to_json(clusters: List[Cluster]) -> List[Dict[str, Any]]:
    """Serialize each cluster into the JSON-ready dict shape consumed by
    the UMAP widget.
    """
    return [
        {
            "id": int(cluster.id),
            "pointIds": cluster.point_ids,
            "purityScore": cluster.purity_score,
        }
        for cluster in clusters
    ]
Loading