feat: upload a batch of inferences
whoseoyster committed Jul 23, 2024
1 parent eff6bf0 commit fa3eb50
Showing 5 changed files with 125 additions and 12 deletions.
54 changes: 54 additions & 0 deletions examples/monitoring/upload_batch_data.py
@@ -0,0 +1,54 @@
import os

import pandas as pd
from openlayer import Openlayer
from openlayer.lib import data
from openlayer.types.inference_pipelines import data_stream_params

# The Openlayer client reads the API key from this environment variable.
os.environ["OPENLAYER_API_KEY"] = "YOUR_API_KEY"
pipeline_id = "YOUR_INFERENCE_PIPELINE_ID"

# A single-row batch of inferences; in practice the DataFrame can hold any
# number of rows, one per inference.
df = pd.DataFrame(
    {
        "CreditScore": [600],
        "Geography": ["France"],
        "Gender": ["Male"],
        "Age": [40],
        "Tenure": [5],
        "Balance": [100000],
        "NumOfProducts": [1],
        "HasCrCard": [1],
        "IsActiveMember": [1],
        "EstimatedSalary": [50000],
        "AggregateRate": [0.5],
        "Year": [2020],
        "Prediction": [0],
    }
)

config = data_stream_params.ConfigTabularClassificationData(
    categorical_feature_names=["Gender", "Geography"],
    class_names=["Retained", "Exited"],
    feature_names=[
        "CreditScore",
        "Geography",
        "Gender",
        "Age",
        "Tenure",
        "Balance",
        "NumOfProducts",
        "HasCrCard",
        "IsActiveMember",
        "EstimatedSalary",
        "AggregateRate",
        "Year",
    ],
    predictions_column_name="Prediction",
)

data.upload_batch_inferences(
    client=Openlayer(),
    inference_pipeline_id=pipeline_id,
    dataset_df=df,
    config=config,
)
1 change: 0 additions & 1 deletion examples/monitoring/upload_reference_dataset.py
@@ -51,5 +51,4 @@
     inference_pipeline_id=pipeline_id,
     dataset_df=df,
     config=config,
-    storage_type=data.StorageType.FS,
 )
3 changes: 2 additions & 1 deletion src/openlayer/lib/data/__init__.py
@@ -1,6 +1,7 @@
"""Data upload functions."""

__all__ = ["upload_reference_dataframe", "StorageType"]
__all__ = ["StorageType", "upload_reference_dataframe", "upload_batch_inferences"]

from ._upload import StorageType
from .reference_dataset import upload_reference_dataframe
from .batch_inferences import upload_batch_inferences
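
For reference, the full public surface of the data module after this change (names taken from the diff above):

from openlayer.lib.data import (
    StorageType,
    upload_batch_inferences,
    upload_reference_dataframe,
)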
66 changes: 66 additions & 0 deletions src/openlayer/lib/data/batch_inferences.py
@@ -0,0 +1,66 @@
"""Upload a batch of inferences to the Openlayer platform."""

import os
import tarfile
import tempfile
import time
from typing import Optional

import httpx
import pandas as pd

from ... import Openlayer
from ..._utils import maybe_transform
from ...types.inference_pipelines import data_stream_params
from .. import utils
from . import StorageType, _upload


def upload_batch_inferences(
    client: Openlayer,
    inference_pipeline_id: str,
    dataset_df: pd.DataFrame,
    config: data_stream_params.Config,
    storage_type: Optional[StorageType] = None,
) -> None:
    """Uploads a batch of inferences to the Openlayer platform."""
    uploader = _upload.Uploader(client, storage_type)
    object_name = f"batch_data_{time.time()}_{inference_pipeline_id}.tar.gz"

    # Fetch a presigned URL for the upload
    presigned_url_response = client.storage.presigned_url.create(
        object_name=object_name,
    )

    # Write the dataset and its config to a temp directory
    with tempfile.TemporaryDirectory() as tmp_dir:
        temp_file_path = f"{tmp_dir}/dataset.csv"
        dataset_df.to_csv(temp_file_path, index=False)

        # Tag the batch as production data and write the config alongside it
        config["label"] = "production"
        utils.write_yaml(
            maybe_transform(config, data_stream_params.Config),
            f"{tmp_dir}/dataset_config.yaml",
        )

        # Bundle the directory as monitoring_data/. The tarball is created
        # inside tmp_dir itself, so filter it out to keep the archive from
        # including a partially written copy of itself.
        tar_file_path = os.path.join(tmp_dir, object_name)
        with tarfile.open(tar_file_path, mode="w:gz") as tar:
            tar.add(
                tmp_dir,
                arcname="monitoring_data",
                filter=lambda info: None if info.name.endswith(".tar.gz") else info,
            )

        # Upload to storage
        uploader.upload(
            file_path=tar_file_path,
            object_name=object_name,
            presigned_url_response=presigned_url_response,
        )

    # Notify the backend
    client.post(
        f"/inference-pipelines/{inference_pipeline_id}/data",
        cast_to=httpx.Response,
        body={
            "storageUri": presigned_url_response.storage_uri,
            "performDataMerge": False,
        },
    )
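
To sanity-check a bundle this function produces, one can list its members; a minimal sketch, assuming a local copy of the tarball (the file name is illustrative):

import tarfile

bundle_path = "batch_data_example.tar.gz"  # hypothetical local copy of the bundle
with tarfile.open(bundle_path, mode="r:gz") as tar:
    # Given the archiving step above, the expected members are the dataset
    # and its config under the monitoring_data/ prefix.
    print(tar.getnames())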
13 changes: 3 additions & 10 deletions src/openlayer/lib/data/reference_dataset.py
@@ -1,7 +1,6 @@
"""Upload reference datasets to the Openlayer platform."""

import os
import shutil
import tarfile
import tempfile
import time
@@ -23,9 +22,7 @@ def upload_reference_dataframe(
     config: data_stream_params.Config,
     storage_type: Optional[StorageType] = None,
 ) -> None:
-    """Upload a reference dataset to the Openlayer platform and update the
-    inference pipeline with the new reference dataset.
-    """
+    """Uploads a reference dataset to the Openlayer platform."""
     uploader = _upload.Uploader(client, storage_type)
     object_name = f"reference_dataset_{time.time()}_{inference_pipeline_id}.tar.gz"
 
Expand All @@ -40,14 +37,11 @@ def upload_reference_dataframe(
dataset_df.to_csv(temp_file_path, index=False)

# Copy relevant files to tmp dir
folder_path = os.path.join(tmp_dir, "reference")
os.mkdir(folder_path)
config["label"] = "reference"
utils.write_yaml(
maybe_transform(config, data_stream_params.Config),
f"{folder_path}/dataset_config.yaml",
f"{tmp_dir}/dataset_config.yaml",
)
shutil.copy(temp_file_path, folder_path)

tar_file_path = os.path.join(tmp_dir, object_name)
with tarfile.open(tar_file_path, mode="w:gz") as tar:
@@ -61,8 +55,7 @@
         )
 
     # Notify the backend
-    response = client.inference_pipelines.update(
+    client.inference_pipelines.update(
         inference_pipeline_id=inference_pipeline_id,
         reference_dataset_uri=presigned_url_response.storage_uri,
     )
-    print(response)
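
For comparison with the batch example above, a minimal sketch of the updated reference-dataset call, reusing pipeline_id, df, and config from examples/monitoring/upload_batch_data.py (both assumptions):

from openlayer import Openlayer
from openlayer.lib import data

# storage_type now defaults to None, so the removed StorageType.FS
# argument is no longer passed.
data.upload_reference_dataframe(
    client=Openlayer(),
    inference_pipeline_id=pipeline_id,
    dataset_df=df,
    config=config,
)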
