feat: add statistics for multiclassification + refactoring and improvements #35

Merged
merged 10 commits on Jun 26, 2024
Changes from 8 commits
3 changes: 2 additions & 1 deletion .gitignore
@@ -106,4 +106,5 @@ ui/*.sw?
## K3S SPECIFICS #
#####################

docker/k3s_data/kubeconfig/
docker/k3s_data/kubeconfig/
docker/k3s_data/images/
2 changes: 2 additions & 0 deletions docker-compose.yaml
@@ -150,6 +150,8 @@ services:
      - ./docker/k3s_data/manifests/spark-init.yaml:/var/lib/rancher/k3s/server/manifests/spark-init.yaml
      # Mount entrypoint
      - ./docker/k3s_data/init/entrypoint.sh:/opt/entrypoint/entrypoint.sh
      # Preload docker images
      - ./docker/k3s_data/images:/var/lib/rancher/k3s/agent/images
    expose:
      - "6443" # Kubernetes API Server
      - "80" # Ingress controller port 80
18 changes: 16 additions & 2 deletions spark/Dockerfile
Member Author commented: We will use the pyproject and poetry lock files from the spark project instead of manually listing the dependencies in the Dockerfile, as was done before.

@@ -1,5 +1,16 @@
FROM python:3.10.14-slim AS build

WORKDIR /build
COPY poetry.lock pyproject.toml ./
RUN pip install --no-cache-dir poetry==1.8.3 && \
poetry export -f requirements.txt -o requirements.txt


FROM spark:3.5.1-scala2.12-java17-python3-ubuntu

# Requirements from previous step
COPY --from=build /build/requirements.txt .

# Adding needed jar
RUN curl -o /opt/spark/jars/bcprov-jdk15on-1.70.jar https://repo1.maven.org/maven2/org/bouncycastle/bcprov-jdk15on/1.70/bcprov-jdk15on-1.70.jar && \
curl -o /opt/spark/jars/bcpkix-jdk15on-1.70.jar https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-jdk15on/1.70/bcpkix-jdk15on-1.70.jar && \
@@ -9,8 +20,11 @@ RUN curl -o /opt/spark/jars/bcprov-jdk15on-1.70.jar https://repo1.maven.org/mave

USER root

# Adding needed python libs that will be used by pyspark jobs
RUN pip install numpy pydantic pandas psycopg2-binary orjson scipy
RUN apt-get update && \
apt-get install -y --no-install-recommends gcc libpq-dev python3-dev

# Install requirements coming from pyproject
RUN pip install --no-cache-dir -r requirements.txt

USER spark

28 changes: 24 additions & 4 deletions spark/README.md
Member Author commented: How to use the local spark image

@@ -2,15 +2,35 @@

This folder contains files to create the Spark docker image that will be used to calculate metrics.

The custom image is created using the `Dockerfile` and it is a base Spark image where are installed some additional dependencies and loaded with the custom jobs located in the `jobs` folder.

To create an additional job, add a `.py` file in `jobs` folder (take as an example `reference_job.py` for the boilerplate)
The custom image is created using the `Dockerfile`: it is a base Spark image with additional dependencies installed and the custom jobs from the `jobs` folder loaded into it.

### Development

This is a poetry project that can be used to develop and test the jobs before putting them in the docker image.

NB: if additional python dependencies are needed, pleas add them in `Dockerfile` accordingly, and not only in the `pyproject.toml`
To create an additional job, add a `.py` file in the `jobs` folder (use `reference_job.py` as boilerplate) and write unit tests for it.
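
For illustration, a minimal job skeleton might look like the sketch below. The module name, arguments, and invocation here are assumptions for this example only; real jobs receive their configuration from the platform and follow the structure of `current_job.py`.

```python
import sys

from pyspark.sql import SparkSession


def main(spark_session: SparkSession, dataset_path: str) -> None:
    # Read the raw dataset; real jobs wrap it in the dataset models
    # (e.g. CurrentDataset) and write the computed metrics to the database,
    # as done in current_job.py.
    raw_dataframe = spark_session.read.csv(dataset_path, header=True)
    raw_dataframe.show()


if __name__ == "__main__":
    # Hypothetical invocation: the dataset path is passed as the first argument.
    spark = SparkSession.builder.appName("my_new_job").getOrCreate()
    main(spark, sys.argv[1])
```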

### End-to-end testing

Before publishing the image, it is possible to test the platform with new developments or improvements made to the spark image.

From this project folder, run

```bash
docker build . -t radicalbit-spark-py:develop && docker save radicalbit-spark-py:develop -o ../docker/k3s_data/images/radicalbit-spark-py:develop.tar
```

This will build the new image and save it in `docker/k3s_data/images/`.

To use this image in the Radicalbit Platform, the docker compose file must be modified by adding the following environment variable to the `api` container:

```yaml
SPARK_IMAGE: "radicalbit-spark-py:develop"
```

When the k3s cluster inside the docker compose starts, it automatically loads the saved image, which can then be used to test the code during development.

NB: when a new image is built and saved, the k3s container must be restarted

#### Formatting and linting

25 changes: 10 additions & 15 deletions spark/jobs/current_job.py
@@ -6,9 +6,11 @@
import orjson
from pyspark.sql.types import StructType, StructField, StringType

from jobs.metrics.statistics import calculate_statistics_current
from jobs.models.current_dataset import CurrentDataset
from jobs.models.reference_dataset import ReferenceDataset
from utils.current import CurrentMetricsService
from utils.models import JobStatus, ModelOut
from utils.spark import apply_schema_to_dataframe
from utils.db import update_job_status, write_to_db

from pyspark.sql import SparkSession
@@ -42,22 +44,15 @@ def main(
"fs.s3a.connection.ssl.enabled", "false"
)

current_schema = model.to_current_spark_schema()
current_dataset = spark_session.read.csv(current_dataset_path, header=True)
current_dataset = apply_schema_to_dataframe(current_dataset, current_schema)
current_dataset = current_dataset.select(
*[c for c in current_schema.names if c in current_dataset.columns]
)
reference_schema = model.to_reference_spark_schema()
reference_dataset = spark_session.read.csv(reference_dataset_path, header=True)
reference_dataset = apply_schema_to_dataframe(reference_dataset, reference_schema)
reference_dataset = reference_dataset.select(
*[c for c in reference_schema.names if c in reference_dataset.columns]
)
raw_current = spark_session.read.csv(current_dataset_path, header=True)
current_dataset = CurrentDataset(model=model, raw_dataframe=raw_current)
raw_reference = spark_session.read.csv(reference_dataset_path, header=True)
reference_dataset = ReferenceDataset(model=model, raw_dataframe=raw_reference)

metrics_service = CurrentMetricsService(
spark_session, current_dataset, reference_dataset, model=model
spark_session, current_dataset.current, reference_dataset.reference, model=model
)
statistics = metrics_service.calculate_statistics()
statistics = calculate_statistics_current(current_dataset)
data_quality = metrics_service.calculate_data_quality()
model_quality = metrics_service.calculate_model_quality_with_group_by_timestamp()
drift = metrics_service.calculate_drift()
Empty file added spark/jobs/metrics/__init__.py
148 changes: 148 additions & 0 deletions spark/jobs/metrics/statistics.py
Member Author commented: This will probably be squashed into a single method in a future refactoring.

@@ -0,0 +1,148 @@
from models.current_dataset import CurrentDataset
from models.reference_dataset import ReferenceDataset
import pyspark.sql.functions as F

N_VARIABLES = "n_variables"
N_OBSERVATION = "n_observations"
MISSING_CELLS = "missing_cells"
MISSING_CELLS_PERC = "missing_cells_perc"
DUPLICATE_ROWS = "duplicate_rows"
DUPLICATE_ROWS_PERC = "duplicate_rows_perc"
NUMERIC = "numeric"
CATEGORICAL = "categorical"
DATETIME = "datetime"


# FIXME use pydantic struct like data quality
def calculate_statistics_reference(
    reference_dataset: ReferenceDataset,
) -> dict[str, float]:
    number_of_variables = len(reference_dataset.get_all_variables())
    number_of_observations = reference_dataset.reference_count
    number_of_numerical = len(reference_dataset.get_numerical_variables())
    number_of_categorical = len(reference_dataset.get_categorical_variables())
    number_of_datetime = len(reference_dataset.get_datetime_variables())
    reference_columns = reference_dataset.reference.columns

    stats = (
        reference_dataset.reference.select(
            [
                F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c)
                if t not in ("datetime", "date", "timestamp", "bool", "boolean")
                else F.count(F.when(F.col(c).isNull(), c)).alias(c)
                for c, t in reference_dataset.reference.dtypes
            ]
        )
        .withColumn(MISSING_CELLS, sum([F.col(c) for c in reference_columns]))
        .withColumn(
            MISSING_CELLS_PERC,
            (F.col(MISSING_CELLS) / (number_of_variables * number_of_observations))
            * 100,
        )
        .withColumn(
            DUPLICATE_ROWS,
            F.lit(
                number_of_observations
                - reference_dataset.reference.dropDuplicates(
                    [
                        c
                        for c in reference_columns
                        if c != reference_dataset.model.timestamp.name
                    ]
                ).count()
            ),
        )
        .withColumn(
            DUPLICATE_ROWS_PERC,
            (F.col(DUPLICATE_ROWS) / number_of_observations) * 100,
        )
        .withColumn(N_VARIABLES, F.lit(number_of_variables))
        .withColumn(N_OBSERVATION, F.lit(number_of_observations))
        .withColumn(NUMERIC, F.lit(number_of_numerical))
        .withColumn(CATEGORICAL, F.lit(number_of_categorical))
        .withColumn(DATETIME, F.lit(number_of_datetime))
        .select(
            *[
                MISSING_CELLS,
                MISSING_CELLS_PERC,
                DUPLICATE_ROWS,
                DUPLICATE_ROWS_PERC,
                N_VARIABLES,
                N_OBSERVATION,
                NUMERIC,
                CATEGORICAL,
                DATETIME,
            ]
        )
        .toPandas()
        .to_dict(orient="records")[0]
    )

    return stats


def calculate_statistics_current(
    current_dataset: CurrentDataset,
) -> dict[str, float]:
    number_of_variables = len(current_dataset.get_all_variables())
    number_of_observations = current_dataset.current_count
    number_of_numerical = len(current_dataset.get_numerical_variables())
    number_of_categorical = len(current_dataset.get_categorical_variables())
    number_of_datetime = len(current_dataset.get_datetime_variables())
    reference_columns = current_dataset.current.columns

    stats = (
        current_dataset.current.select(
            [
                F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c)
                if t not in ("datetime", "date", "timestamp", "bool", "boolean")
                else F.count(F.when(F.col(c).isNull(), c)).alias(c)
                for c, t in current_dataset.current.dtypes
            ]
        )
        .withColumn(MISSING_CELLS, sum([F.col(c) for c in reference_columns]))
        .withColumn(
            MISSING_CELLS_PERC,
            (F.col(MISSING_CELLS) / (number_of_variables * number_of_observations))
            * 100,
        )
        .withColumn(
            DUPLICATE_ROWS,
            F.lit(
                number_of_observations
                - current_dataset.current.dropDuplicates(
                    [
                        c
                        for c in reference_columns
                        if c != current_dataset.model.timestamp.name
                    ]
                ).count()
            ),
        )
        .withColumn(
            DUPLICATE_ROWS_PERC,
            (F.col(DUPLICATE_ROWS) / number_of_observations) * 100,
        )
        .withColumn(N_VARIABLES, F.lit(number_of_variables))
        .withColumn(N_OBSERVATION, F.lit(number_of_observations))
        .withColumn(NUMERIC, F.lit(number_of_numerical))
        .withColumn(CATEGORICAL, F.lit(number_of_categorical))
        .withColumn(DATETIME, F.lit(number_of_datetime))
        .select(
            *[
                MISSING_CELLS,
                MISSING_CELLS_PERC,
                DUPLICATE_ROWS,
                DUPLICATE_ROWS_PERC,
                N_VARIABLES,
                N_OBSERVATION,
                NUMERIC,
                CATEGORICAL,
                DATETIME,
            ]
        )
        .toPandas()
        .to_dict(orient="records")[0]
    )

    return stats
Empty file added spark/jobs/models/__init__.py
97 changes: 97 additions & 0 deletions spark/jobs/models/current_dataset.py
@@ -0,0 +1,97 @@
from typing import List

from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType, StructField, StructType

from utils.models import ModelOut, ModelType, ColumnDefinition
from utils.spark import apply_schema_to_dataframe


class CurrentDataset:
    def __init__(self, model: ModelOut, raw_dataframe: DataFrame):
        current_schema = self.spark_schema(model)
        current_dataset = apply_schema_to_dataframe(raw_dataframe, current_schema)

        self.model = model
        self.current = current_dataset.select(
            *[c for c in current_schema.names if c in current_dataset.columns]
        )
        self.current_count = self.current.count()

    # FIXME this must exclude target when we will have separate current and ground truth
    @staticmethod
    def spark_schema(model: ModelOut):
        all_features = (
            model.features + [model.target] + [model.timestamp] + model.outputs.output
        )
        if model.outputs.prediction_proba and model.model_type == ModelType.BINARY:
            enforce_float = [
                model.target.name,
                model.outputs.prediction.name,
                model.outputs.prediction_proba.name,
            ]
        elif model.model_type == ModelType.BINARY:
            enforce_float = [model.target.name, model.outputs.prediction.name]
        else:
            enforce_float = []
        return StructType(
            [
                StructField(
                    name=feature.name,
                    dataType=model.convert_types(feature.type),
                    nullable=False,
                )
                if feature.name not in enforce_float
                else StructField(
                    name=feature.name,
                    dataType=DoubleType(),
                    nullable=False,
                )
                for feature in all_features
            ]
        )

    def get_numerical_features(self) -> List[ColumnDefinition]:
        return [feature for feature in self.model.features if feature.is_numerical()]

    def get_categorical_features(self) -> List[ColumnDefinition]:
        return [feature for feature in self.model.features if feature.is_categorical()]

    # FIXME this must exclude target when we will have separate current and ground truth
    def get_numerical_variables(self) -> List[ColumnDefinition]:
        all_features = (
            self.model.features
            + [self.model.target]
            + [self.model.timestamp]
            + self.model.outputs.output
        )
        return [feature for feature in all_features if feature.is_numerical()]

    # FIXME this must exclude target when we will have separate current and ground truth
    def get_categorical_variables(self) -> List[ColumnDefinition]:
        all_features = (
            self.model.features
            + [self.model.target]
            + [self.model.timestamp]
            + self.model.outputs.output
        )
        return [feature for feature in all_features if feature.is_categorical()]

    # FIXME this must exclude target when we will have separate current and ground truth
    def get_datetime_variables(self) -> List[ColumnDefinition]:
        all_features = (
            self.model.features
            + [self.model.target]
            + [self.model.timestamp]
            + self.model.outputs.output
        )
        return [feature for feature in all_features if feature.is_datetime()]

    # FIXME this must exclude target when we will have separate current and ground truth
    def get_all_variables(self) -> List[ColumnDefinition]:
        return (
            self.model.features
            + [self.model.target]
            + [self.model.timestamp]
            + self.model.outputs.output
        )