Skip to content

Commit

Permalink
Fix DV Pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
brnaguiar committed Sep 19, 2023
1 parent 0ecf06e commit 7c78bd8
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
35 changes: 22 additions & 13 deletions src/collaborative/nodes/data_validation_nodes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from mlflow.utils.logging_utils import sys
from pyspark.sql.functions import max, min

from conf import catalog, paths
from conf import paths


class DataValidation:
Expand All @@ -11,16 +12,24 @@ def __init__(self, session, source, storage) -> None:

def validate_ratings(self, dataset: str, file_suffix: str) -> None:
    """Validate that every rating in the external dataset lies in [0, 5].

    Reads the raw CSV for ``dataset`` from the 01-external data layer and
    checks the global min/max of the ``rating`` column. On any failure
    (unreadable file, missing column, out-of-range value) it prints the
    error and terminates the process with exit code 1, so a downstream
    pipeline stage never runs on invalid data.

    Args:
        dataset: Dataset name used to build the input path (e.g. "ratings").
        file_suffix: File extension/suffix of the input file (e.g. ".csv").

    Raises:
        SystemExit: Always exits with status 1 when validation fails.
    """
    try:
        self.processed_data_path = paths.get_path(
            paths.DATA_01EXTERNAL,
            self.source,
            dataset,
            suffix=file_suffix,
            storage=self.storage,
            as_string=True,
        )
        data = self.session.read.csv(
            self.processed_data_path,
            sep=",",
            header=True,
            inferSchema=True,
        )
        # Single aggregation pass for both bounds; `max`/`min` here are
        # pyspark.sql.functions, not the builtins (module-level import).
        maxv, minv = data.agg(max("rating"), min("rating")).first().asDict().values()
        # Use an explicit raise instead of `assert`: asserts are stripped
        # under `python -O`, which would silently disable this check.
        if not (minv >= 0 and maxv <= 5):
            raise ValueError("Ratings are NOT in range [0, 5]")
    except Exception as e:
        # Same contract as before: report and abort the pipeline stage,
        # but send diagnostics to stderr rather than stdout.
        print(f"Exception {e} occurred, exiting...", file=sys.stderr)
        sys.exit(1)
4 changes: 2 additions & 2 deletions src/collaborative/pipelines/data_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
def run(
source: str = catalog.Sources.MOVIELENS,
dataset: str = catalog.Datasets.RATINGS,
file_suffix: str = catalog.FileFormat.PARQUET,
storage=globals.Storage.S3,
file_suffix: str = catalog.FileFormat.CSV,
storage=globals.Storage.DOCKER,
):
session = (
SparkSession.builder.appName("collab-dv")
Expand Down

0 comments on commit 7c78bd8

Please sign in to comment.