From 7e7261d9c8eab2ee0f781500502483f316009a1e Mon Sep 17 00:00:00 2001 From: Stainless Bot Date: Thu, 3 Oct 2024 00:16:33 +0000 Subject: [PATCH] feat: feat: add async batch uploads & improve client-side upload latency --- pyproject.toml | 2 + src/openlayer/lib/data/__init__.py | 7 +- src/openlayer/lib/data/_upload.py | 1 - src/openlayer/lib/data/batch_inferences.py | 74 ++++++++++++++++------ 4 files changed, 61 insertions(+), 23 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 587c3459..9c6d3cef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,9 @@ dependencies = [ "sniffio", "cached-property; python_version < '3.8'", "pandas; python_version >= '3.7'", + "pyarrow>=11.0.0", "pyyaml>=6.0", + "requests_toolbelt>=1.0.0", ] requires-python = ">= 3.7" classifiers = [ diff --git a/src/openlayer/lib/data/__init__.py b/src/openlayer/lib/data/__init__.py index 89cdc091..5072e313 100644 --- a/src/openlayer/lib/data/__init__.py +++ b/src/openlayer/lib/data/__init__.py @@ -4,9 +4,14 @@ "StorageType", "upload_reference_dataframe", "upload_batch_inferences", + "upload_batch_inferences_async", "update_batch_inferences", ] from ._upload import StorageType -from .batch_inferences import update_batch_inferences, upload_batch_inferences +from .batch_inferences import ( + update_batch_inferences, + upload_batch_inferences, + upload_batch_inferences_async, +) from .reference_dataset import upload_reference_dataframe diff --git a/src/openlayer/lib/data/_upload.py b/src/openlayer/lib/data/_upload.py index 2695133e..6127a890 100644 --- a/src/openlayer/lib/data/_upload.py +++ b/src/openlayer/lib/data/_upload.py @@ -5,7 +5,6 @@ """ import os -import shutil from enum import Enum from typing import Optional diff --git a/src/openlayer/lib/data/batch_inferences.py b/src/openlayer/lib/data/batch_inferences.py index dbc7d805..c8821c1a 100644 --- a/src/openlayer/lib/data/batch_inferences.py +++ b/src/openlayer/lib/data/batch_inferences.py @@ -1,23 +1,21 @@ """Upload a batch of inferences to the Openlayer platform.""" -import os import time -import shutil -import tarfile import tempfile from typing import Optional import httpx import pandas as pd +import pyarrow as pa from . import StorageType, _upload -from .. import utils from ... import Openlayer from ..._utils import maybe_transform from ...types.inference_pipelines import data_stream_params +import asyncio -def upload_batch_inferences( +async def upload_batch_inferences_async( client: Openlayer, inference_pipeline_id: str, config: data_stream_params.Config, @@ -25,6 +23,7 @@ def upload_batch_inferences( dataset_path: Optional[str] = None, storage_type: Optional[StorageType] = None, merge: bool = False, + verbose: bool = False, ) -> None: """Uploads a batch of inferences to the Openlayer platform.""" if dataset_df is None and dataset_path is None: @@ -33,7 +32,7 @@ def upload_batch_inferences( raise ValueError("Only one of dataset_df or dataset_path should be provided.") uploader = _upload.Uploader(client, storage_type) - object_name = f"batch_data_{time.time()}_{inference_pipeline_id}.tar.gz" + object_name = f"batch_data_{time.time()}_{inference_pipeline_id}.arrow" # Fetch presigned url presigned_url_response = client.storage.presigned_url.create( @@ -42,26 +41,34 @@ def upload_batch_inferences( # Write dataset and config to temp directory with tempfile.TemporaryDirectory() as tmp_dir: - temp_file_path = f"{tmp_dir}/dataset.csv" + # If DataFrame is provided, convert it to Arrow Table and write it using IPC + # writer if dataset_df is not None: - dataset_df.to_csv(temp_file_path, index=False) - else: - shutil.copy(dataset_path, temp_file_path) + temp_file_path = f"{tmp_dir}/dataset.arrow" + if verbose: + print("Converting DataFrame to pyarrow Table...") + pa_table = pa.Table.from_pandas(dataset_df) + pa_schema = pa_table.schema - # Copy relevant files to tmp dir - config["label"] = "production" - utils.write_yaml( - maybe_transform(config, data_stream_params.Config), - f"{tmp_dir}/dataset_config.yaml", - ) + if verbose: + print( + "Writing Arrow Table using RecordBatchStreamWriter to " + f"{temp_file_path}" + ) + with pa.ipc.RecordBatchStreamWriter(temp_file_path, pa_schema) as writer: + writer.write_table(pa_table, max_chunksize=16384) + else: + object_name = f"batch_data_{time.time()}_{inference_pipeline_id}.csv" + temp_file_path = dataset_path - tar_file_path = os.path.join(tmp_dir, object_name) - with tarfile.open(tar_file_path, mode="w:gz") as tar: - tar.add(tmp_dir, arcname=os.path.basename("monitoring_data")) + # camelCase the config + config = maybe_transform(config, data_stream_params.Config) - # Upload to storage + # Upload tarball to storage + if verbose: + print("Uploading dataset to storage via presigned URL...") uploader.upload( - file_path=tar_file_path, + file_path=temp_file_path, object_name=object_name, presigned_url_response=presigned_url_response, ) @@ -73,10 +80,35 @@ def upload_batch_inferences( body={ "storageUri": presigned_url_response.storage_uri, "performDataMerge": merge, + "config": config, }, ) +def upload_batch_inferences( + client: Openlayer, + inference_pipeline_id: str, + config: data_stream_params.Config, + dataset_df: Optional[pd.DataFrame] = None, + dataset_path: Optional[str] = None, + storage_type: Optional[StorageType] = None, + merge: bool = False, + verbose: bool = False, +) -> None: + asyncio.run( + upload_batch_inferences_async( + client, + inference_pipeline_id, + config, + dataset_df, + dataset_path, + storage_type, + merge, + verbose, + ) + ) + + def update_batch_inferences( client: Openlayer, inference_pipeline_id: str,