From 85a1126dc735104c2f2887462281b9025cbc533a Mon Sep 17 00:00:00 2001
From: Ian Mathews
Date: Mon, 20 May 2024 17:11:59 -0400
Subject: [PATCH] refactor: notebook output tables (#40)

* feat: refactor to new notebook output table API
* chore: bump version
* fix: support both resumable and non-resumable notebook uploads
* fix: resumable vs. standard upload
* add logging for debugging
* refactor: remove ability to pass string to notebook.create_output_table
* fix: open file to be uploaded
* chore: final cleanup, remove debug logs
---
 setup.py                               |  2 +-
 src/redivis/_version.py                |  2 +-
 src/redivis/classes/Base.py            |  1 -
 src/redivis/classes/Dataset.py         |  1 -
 src/redivis/classes/File.py            |  1 -
 src/redivis/classes/Notebook.py        | 93 +++++++++++++++-----------
 src/redivis/classes/Query.py           |  4 +-
 src/redivis/classes/Row.py             |  1 -
 src/redivis/classes/Table.py           |  5 +-
 src/redivis/classes/Upload.py          | 15 +++--
 src/redivis/common/retryable_upload.py |  1 -
 11 files changed, 75 insertions(+), 51 deletions(-)

diff --git a/setup.py b/setup.py
index 2d2841c..134c02d 100644
--- a/setup.py
+++ b/setup.py
@@ -43,7 +43,7 @@
     "pyarrow >= 13.0.0",
     "requests >= 2.0.0",
     "shapely >= 2.0.1",
-    "tqdm == 4.64.0",
+    "tqdm >= 4.64.0",
 ]
 
 # What packages are optional?
diff --git a/src/redivis/_version.py b/src/redivis/_version.py
index e113045..6a57210 100644
--- a/src/redivis/_version.py
+++ b/src/redivis/_version.py
@@ -1 +1 @@
-__version__ = "0.14.7"
+__version__ = "0.14.8"
diff --git a/src/redivis/classes/Base.py b/src/redivis/classes/Base.py
index 767081d..29e62ee 100644
--- a/src/redivis/classes/Base.py
+++ b/src/redivis/classes/Base.py
@@ -1,4 +1,3 @@
-import json
 
 
 class Base:
diff --git a/src/redivis/classes/Dataset.py b/src/redivis/classes/Dataset.py
index 296a0e2..be339d6 100644
--- a/src/redivis/classes/Dataset.py
+++ b/src/redivis/classes/Dataset.py
@@ -2,7 +2,6 @@
 from .Query import Query
 from .Base import Base
 from urllib.parse import quote as quote_uri
-import warnings
 from ..common.api_request import make_request, make_paginated_request
 
 
diff --git a/src/redivis/classes/File.py b/src/redivis/classes/File.py
index 64c5f18..e45b036 100644
--- a/src/redivis/classes/File.py
+++ b/src/redivis/classes/File.py
@@ -8,7 +8,6 @@
 from ..common.api_request import make_request
 from urllib.parse import quote as quote_uri
 
-
 class File(Base):
     def __init__(
         self,
diff --git a/src/redivis/classes/Notebook.py b/src/redivis/classes/Notebook.py
index a8006d7..0d8aaf0 100644
--- a/src/redivis/classes/Notebook.py
+++ b/src/redivis/classes/Notebook.py
@@ -1,13 +1,15 @@
 from .Table import Table
 from .Base import Base
 from ..common.api_request import make_request
+from tqdm.auto import tqdm
+
 import pathlib
 import uuid
 import os
 import pyarrow as pa
 import pyarrow.dataset as pa_dataset
 import pyarrow.parquet as pa_parquet
-import pandas as pd
+from ..common.retryable_upload import perform_resumable_upload, perform_standard_upload
 
 class Notebook(Base):
     def __init__(
@@ -17,46 +19,61 @@ def __init__(
         self.current_notebook_job_id = current_notebook_job_id
 
     def create_output_table(self, data=None, *, name=None, append=False, geography_variables=None):
-        if type(data) == str:
-            temp_file_path = str
+        temp_file_path = f"/tmp/redivis/out/{uuid.uuid4()}"
+        pathlib.Path(temp_file_path).parent.mkdir(exist_ok=True, parents=True)
+
+        import geopandas
+        import pandas as pd
+
+        if isinstance(data, geopandas.GeoDataFrame):
+            if geography_variables is None:
+                geography_variables = list(data.select_dtypes('geometry'))
+            data.to_wkt().to_parquet(path=temp_file_path, index=False)
+        elif isinstance(data, pd.DataFrame):
+            data.to_parquet(path=temp_file_path, index=False)
+        elif isinstance(data, pa_dataset.Dataset):
+            pa_dataset.write_dataset(data, temp_file_path, format='parquet', basename_template='part-{i}.parquet', max_partitions=1)
+            temp_file_path = f'{temp_file_path}/part-0.parquet'
+        elif isinstance(data, pa.Table):
+            pa_parquet.write_table(data, temp_file_path)
         else:
-            temp_file_path = f"/tmp/redivis/out/{uuid.uuid4()}"
-            pathlib.Path(temp_file_path).parent.mkdir(exist_ok=True, parents=True)
-
-            import geopandas
-
-            if isinstance(data, geopandas.GeoDataFrame):
-                if geography_variables is None:
-                    geography_variables = list(data.select_dtypes('geometry'))
-                data.to_wkt().to_parquet(path=temp_file_path, index=False)
-            elif isinstance(data, pd.DataFrame):
-                data.to_parquet(path=temp_file_path, index=False)
-            elif isinstance(data, pa_dataset.Dataset):
-                pa_dataset.write_dataset(data, temp_file_path, format='parquet', basename_template='part-{i}.parquet', max_partitions=1)
-                temp_file_path = f'{temp_file_path}/part-0.parquet'
-            elif isinstance(data, pa.Table):
-                pa_parquet.write_table(data, temp_file_path)
+            # importing polars is causing an IllegalInstruction error on ARM + Docker. Import inline to avoid crashes elsewhere
+            # TODO: revert once fixed upstream
+            import polars
+            if isinstance(data, polars.LazyFrame):
+                data.sink_parquet(temp_file_path)
+            elif isinstance(data, polars.DataFrame):
+                data.write_parquet(temp_file_path)
             else:
-                # importing polars is causing an IllegalInstruction error on ARM + Docker. Import inline to avoid crashes elsewhwere
-                # TODO: revert once fixed upstream
-                import polars
-                if isinstance(data, polars.LazyFrame):
-                    data.sink_parquet(temp_file_path)
-                elif isinstance(data, polars.DataFrame):
-                    data.write_parquet(temp_file_path)
-                else:
-                    raise Exception('Unknown datatype provided to notebook.create_output_table. Must either by a file path, or an instance of pandas.DataFrame, pyarrow.Dataset, pyarrow.Table, dask.DataFrame, polars.LazyFrame, or polars.DataFrame')
+                raise Exception('Unknown datatype provided to notebook.create_output_table. Must be an instance of pandas.DataFrame, pyarrow.Dataset, pyarrow.Table, dask.DataFrame, polars.LazyFrame, or polars.DataFrame')
+
+        size = os.stat(temp_file_path).st_size
+
+        pbar_bytes = tqdm(total=size, unit='B', leave=False, unit_scale=True)
+
+        res = make_request(
+            method="POST",
+            path=f"/notebookJobs/{self.current_notebook_job_id}/tempUploads",
+            payload={"tempUploads": [{"size": size, "resumable": size > 1e8}]},
+        )
+        temp_upload = res["results"][0]
 
         with open(temp_file_path, 'rb') as f:
-            res = make_request(
-                method="PUT",
-                path=f"/notebookJobs/{self.current_notebook_job_id}/outputTable",
-                query={"name": name, "append": append, "geographyVariables": geography_variables},
-                payload=f,
-                parse_payload=False
-            )
-
-        if type(data) != str:
-            os.remove(temp_file_path)
+            if temp_upload["resumable"]:
+                perform_resumable_upload(data=f, progressbar=pbar_bytes,
+                                         temp_upload_url=temp_upload["url"])
+            else:
+                perform_standard_upload(data=f, temp_upload_url=temp_upload["url"],
+                                        progressbar=pbar_bytes)
+
+        pbar_bytes.close()
+        os.remove(temp_file_path)
+
+        res = make_request(
+            method="PUT",
+            path=f"/notebookJobs/{self.current_notebook_job_id}/outputTable",
+            payload={"name": name, "append": append, "geographyVariables": geography_variables, "tempUploadId": temp_upload["id"]},
+        )
 
         return Table(name=res["name"], properties=res)
diff --git a/src/redivis/classes/Query.py b/src/redivis/classes/Query.py
index b33f444..e53d798 100644
--- a/src/redivis/classes/Query.py
+++ b/src/redivis/classes/Query.py
@@ -3,7 +3,6 @@
 import time
 import warnings
 import pyarrow as pa
-import pandas as pd
 from ..common.api_request import make_request
 from ..common.list_rows import list_rows
 
@@ -107,6 +106,8 @@ def to_pandas_dataframe(self, max_results=None, *, geography_variable="", progre
             batch_preprocessor=batch_preprocessor
         )
 
+        import pandas as pd
+
         if dtype_backend == 'numpy_nullable':
             df = arrow_table.to_pandas(self_destruct=True, date_as_object=date_as_object, types_mapper={
                 pa.int64(): pd.Int64Dtype(),
@@ -146,6 +147,7 @@ def to_geopandas_dataframe(self, max_results=None, *, geography_variable="", pro
             coerce_schema=False,
             batch_preprocessor=batch_preprocessor
         )
+        import pandas as pd
 
         if dtype_backend == 'numpy_nullable':
             df = arrow_table.to_pandas(self_destruct=True, date_as_object=date_as_object, types_mapper={
diff --git a/src/redivis/classes/Row.py b/src/redivis/classes/Row.py
index 167ec4a..2130a1b 100644
--- a/src/redivis/classes/Row.py
+++ b/src/redivis/classes/Row.py
@@ -1,5 +1,4 @@
 import copy
-import operator
 from typing import Any, Dict, Iterable, Tuple
 
 
diff --git a/src/redivis/classes/Table.py b/src/redivis/classes/Table.py
index 0e7bb73..8e314c8 100644
--- a/src/redivis/classes/Table.py
+++ b/src/redivis/classes/Table.py
@@ -7,7 +7,6 @@
 import concurrent.futures
 import pathlib
 import pyarrow as pa
-import pandas as pd
 from tqdm.auto import tqdm
 from threading import Event
 
@@ -394,6 +393,8 @@ def to_pandas_dataframe(self, max_results=None, *, variables=None, progress=True
             batch_preprocessor=batch_preprocessor
         )
 
+        import pandas as pd
+
         if dtype_backend == 'numpy_nullable':
             df = arrow_table.to_pandas(self_destruct=True, date_as_object=date_as_object, types_mapper={
                 pa.int64(): pd.Int64Dtype(),
@@ -429,6 +430,8 @@ def to_geopandas_dataframe(self, max_results=None, *, variables=None, geography_
             batch_preprocessor=batch_preprocessor
         )
 
+        import pandas as pd
+
         if dtype_backend == 'numpy_nullable':
             df = arrow_table.to_pandas(self_destruct=True, date_as_object=date_as_object, types_mapper={
                 pa.int64(): pd.Int64Dtype(),
diff --git a/src/redivis/classes/Upload.py b/src/redivis/classes/Upload.py
index dfa8828..5bb0c81 100644
--- a/src/redivis/classes/Upload.py
+++ b/src/redivis/classes/Upload.py
@@ -6,7 +6,6 @@
 import warnings
 import io
 import pyarrow as pa
-import pandas as pd
 from tqdm.auto import tqdm
 
 from ..common.util import get_geography_variable, get_warning
@@ -15,7 +14,7 @@
 from .Base import Base
 from .Variable import Variable
 from ..common.api_request import make_request, make_paginated_request
-from ..common.retryable_upload import perform_resumable_upload
+from ..common.retryable_upload import perform_resumable_upload, perform_standard_upload
 
 class Upload(Base):
     def __init__(
@@ -68,10 +67,16 @@ def create(
         res = make_request(
             method="POST",
             path=f"{self.table.uri}/tempUploads",
-            payload={"tempUploads": [{"size": size, "name": self.name, "resumable": True}]},
+            payload={"tempUploads": [{"size": size, "name": self.name, "resumable": size > 1e7}]},
         )
         temp_upload = res["results"][0]
-        perform_resumable_upload(data=data, temp_upload_url=temp_upload["url"], progressbar=pbar_bytes)
+        if temp_upload["resumable"]:
+            perform_resumable_upload(data=data, progressbar=pbar_bytes,
+                                     temp_upload_url=temp_upload["url"])
+        else:
+            perform_standard_upload(data=data, temp_upload_url=temp_upload["url"],
+                                    progressbar=pbar_bytes)
+
 
         if progress:
             pbar_bytes.close()
@@ -257,6 +262,7 @@ def to_pandas_dataframe(self, max_results=None, *, variables=None, progress=True
             coerce_schema=True,
             batch_preprocessor=batch_preprocessor
         )
+        import pandas as pd
 
         if dtype_backend == 'numpy_nullable':
             df = arrow_table.to_pandas(self_destruct=True, date_as_object=date_as_object, types_mapper={
@@ -291,6 +297,7 @@ def to_geopandas_dataframe(self, max_results=None, *, variables=None, geography_
             coerce_schema=True,
             batch_preprocessor=batch_preprocessor
         )
+        import pandas as pd
 
         if dtype_backend == 'numpy_nullable':
             df = arrow_table.to_pandas(self_destruct=True, date_as_object=date_as_object, types_mapper={
diff --git a/src/redivis/common/retryable_upload.py b/src/redivis/common/retryable_upload.py
index fe359a2..94791d9 100644
--- a/src/redivis/common/retryable_upload.py
+++ b/src/redivis/common/retryable_upload.py
@@ -4,7 +4,6 @@
 import requests
 import os
 import logging
-from ..common.api_request import make_request
 from tqdm.utils import CallbackIOWrapper
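
Usage sketch (not part of the patch itself): the snippet below illustrates how the refactored output-table flow is expected to be called from inside a Redivis notebook once this change ships. The redivis.current_notebook() accessor is an assumption here; the create_output_table() signature, the accepted data types, and the temp-upload behavior (a standard upload for small files, a resumable upload above the ~1e8-byte threshold in Notebook.py) are taken from the diff above.

import pandas as pd
import redivis

# Build an in-memory DataFrame; string file paths are no longer accepted by
# create_output_table after this refactor.
df = pd.DataFrame({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

notebook = redivis.current_notebook()  # assumed accessor for the running notebook job

# Per the Notebook.py changes: the data is written to a temporary parquet file under
# /tmp/redivis/out/, uploaded via a temp upload (resumable only when the file is large),
# and then registered as the notebook's output table via the PUT .../outputTable request.
table = notebook.create_output_table(df, name="my_output", append=False)
print(table.name)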