
Data retrieval is grindingly slow with DeltaTable().to_pandas #631

Closed · boonware opened this issue Jun 9, 2022 · 13 comments

@boonware commented Jun 9, 2022

Environment

Delta-rs version: deltalake v0.5.7

Binding: Python

Environment:

  • Cloud provider: S3 / IBM COS
  • OS: macOS / Linux

Bug

What happened:
I have a Python Flask application that exposes a REST API for accessing the contents of a Delta Lake table from a web browser application. API requests that return records are grindingly slow, often taking in excess of 2 minutes. The table is partitioned and contains on the order of a few hundred (fewer than 1,000) files. I have profiled the application and the bottleneck is the to_pandas call. I see similar results if I replace that call with to_pyarrow_table or to_pyarrow_dataset.

Why is this call so slow?

dataset = DeltaTable(url).to_pandas(partitions=partitions, columns=columns).to_dict()

Is there a recommended approach that I am missing here?

What you expected to happen:

Data could be returned from the table in a reasonable length of time, i.e. the latency expected by a reactive web application.

How to reproduce it:
Run the code above with S3 or IBM COS as the backing storage, against a table with on the order of 100K records, partitioned on a timestamp field so that it produces on the order of 100 files.
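
A minimal sketch of that setup (hypothetical paths and column names; assumes a deltalake release recent enough to include the Python writer, and a local path in place of S3/IBM COS to keep it self-contained):

import pandas as pd
from deltalake import write_deltalake

# ~100K records spanning ~70 distinct dates; partitioning on the date
# column yields on the order of 100 partition files, as described above.
df = pd.DataFrame({
    "event_date": pd.date_range("2022-01-01", periods=100_000, freq="min").strftime("%Y-%m-%d"),
    "value": range(100_000),
})
write_deltalake("/tmp/demo_table", df, partition_by=["event_date"])  # the report uses S3 / IBM COS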

More details:
As stated above, application profiling shows the delay is not at the REST controller level.

@wjones127 (Collaborator) commented

It's hard to say what it could be. My initial guess is that the bottleneck is IO. Have you checked how many files you are retrieving?

len(list(DeltaTable(url).to_pyarrow_dataset(partitions=partitions).get_fragments()))

By default, we use our internal filesystem, but you might get better performance with a pyarrow filesystem.

from pyarrow.fs import S3FileSystem

fs = S3FileSystem(...) # configure
dataset = DeltaTable(url).to_pandas(partitions=partitions, columns=columns, filesystem=fs).to_dict()

> Data could be returned from the table in a reasonable length of time, i.e. the latency expected by a reactive web application.

To be clear, I don't think Delta Lake will ever have comparable latency to a dedicated OLAP database. I would guess at best latency will be in the 1 - 5 second range, unless you implement some caching.
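
One way to implement that caching in the Flask app described above: a minimal sketch that memoizes materialized results so repeated API calls don't re-read S3 (all names here are illustrative, not from the original report):

from functools import lru_cache
from deltalake import DeltaTable

TABLE_URL = "s3://my-bucket/my-table"  # hypothetical

@lru_cache(maxsize=32)
def load_partition(partition_value: str, columns: tuple):
    # lru_cache requires hashable arguments, so columns is passed as a
    # tuple rather than a list; convert back before handing it on.
    partitions = [("event_date", "=", partition_value)]
    return DeltaTable(TABLE_URL).to_pandas(partitions=partitions, columns=list(columns))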

@MrPowers (Contributor) commented

@boonware - I wouldn't expect reading multiple files into a pandas DataFrame via delta-rs to provide low-latency queries.

Query engines that are optimized to read multiple files in parallel, like Spark or Dask, should be a lot faster.

Even single-node query engines like polars or DataFusion should provide better performance.

Make sure to query the data and leverage partition filters, predicate pushdown filters, and column pruning (see the sketch below). Or just use a database.
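
A minimal sketch of those three techniques with the Python bindings (table path, partition column, and predicate are illustrative, not from this issue):

import pyarrow.dataset as pads
from deltalake import DeltaTable

dt = DeltaTable("s3://my-bucket/my-table")  # hypothetical table
# Partition filter: prunes whole files using the Delta log metadata.
dataset = dt.to_pyarrow_dataset(partitions=[("event_date", "=", "2022-06-01")])
# Predicate pushdown and column pruning: applied while scanning the Parquet files.
df = dataset.to_table(
    filter=pads.field("value") > 10,
    columns=["event_date", "value"],
).to_pandas()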

@someaveragepunter commented
Hopefully this thread is still being monitored after being closed.
To clarify, I've benchmarked a few comparisons reading the same parquet file(s), of sizes between 1 MB and 100 MB, from S3:

- pyarrow over s3fs: 5x faster
- duckdb: 3x faster
- awswrangler: 3x faster

than delta-rs. The files were written with delta-rs but read back with the 3 packages above.

I like delta-rs for all the benefits it provides over raw parquet; however, I cannot justify trading the performance for the features.
Is getting performance parity (or better) with pyarrow and duckdb on the delta-rs roadmap?

@MrPowers (Contributor) commented

@someaveragepunter - can you please provide the sample data and the queries, so we can take a look? Are you using partition filtering/metadata filtering? Are you saying that DuckDB is faster than pandas for certain queries?

@someaveragepunter commented
Let me try and narrow the use cases down. I was testing with partition filtering, but I recall that even non-partitioned reads were slower. I just wanted to check whether it was something the core contributors were prepared to look at before I spent the time cleaning up my example.

One thing that would help is if you could recommend some public Delta table data readily available on S3 that I could test against, rather than needing to share my sample data.

thanks @MrPowers

@MrPowers (Contributor) commented

@someaveragepunter - I've been running benchmarks with the h2o groupby dataset.

Here are instructions on how to generate the dataset.

Here's a talk I gave at the Data + AI Summit a couple of months back showing how Delta Lake made some queries run a lot faster. You need to formulate the queries properly to get good performance. Feel free to send your query code in the meantime and I can take a look.

@someaveragepunter commented Aug 17, 2023

I think the key point I'm emphasizing here is reading from S3 (blob store) as opposed to a local or network filesystem.
I've had to use a different dataset which is much larger (700 MB) than my original test. In this case:
a) pyarrow on s3fs is twice as fast as delta-rs;
b) duckdb is twice as fast when loading all columns, but roughly on par when selecting only 2 columns.

If you could write your h2o files as a Delta table and share them publicly on S3, I could try running some partitioned tests.

Example code below:


import boto3
from swarkn.helpers import timer, init_logger #pip install swarkn
from deltalake import DeltaTable as DT

init_logger()

# simple delta table with 2 parquet files, 700 MB and 100 MB, no partitions
path = 's3://nyc-taxi-test/temp/weather_sorted_delta'
columns = ['flow_date', 'ssr'] # column pruning

boto3.setup_default_session()
creds = boto3.DEFAULT_SESSION.get_credentials()

############################## S3FS + pyarrow ##########################

import pyarrow.dataset as ds
from pyarrow import fs

s3 = fs.S3FileSystem()
with timer(): #1s negligible setup cost
    dataset = ds.dataset(path.replace('s3://', ''),
        format="parquet",
        filesystem=s3,
        #partitioning="hive"
    )

with timer(): #80s
    pyarrow_table = dataset.to_table(
        # columns=columns # 8s
    )




#################### DUCKDB #########################
import duckdb

con = duckdb.connect(database=':memory:')

con.execute(f"""
INSTALL httpfs;
LOAD httpfs;
SET s3_region='eu-west-2';
SET s3_access_key_id='{creds.access_key}';
SET s3_secret_access_key='{creds.secret_key}';
""")

with timer(): # 17s
    out = con.execute(f"select flow_date, ssr from read_parquet('{path}/*.parquet')").df()


with timer(): # 89s
    out = con.execute(f"select * from read_parquet('{path}/*.parquet')").df()

#################### DELTA Lake #########################

with timer():
    det = DT(path,
        storage_options=dict(
            AWS_REGION='eu-west-2',
            AWS_ACCESS_KEY_ID=creds.access_key,
            AWS_SECRET_ACCESS_KEY=creds.secret_key,
            AWS_S3_ALLOW_UNSAFE_RENAME='true'
        )
    )
with timer(): # 160s
    dfpd = det.to_pyarrow_table(
        # columns=columns, # 16s
    )

deltalake.__version__  # '0.10.1'
duckdb.__version__     # '0.8.1'
boto3.__version__      # '1.28.16'
pyarrow.__version__    # '12.0.1'

Python 3.11, Windows

@MrPowers (Contributor) commented Aug 21, 2023

@someaveragepunter - can you try to execute the query like this:

import pathlib
import duckdb
from deltalake import DeltaTable

table = DeltaTable(f"{pathlib.Path.home()}/data/delta/G1_1e9_1e2_0_0")
dataset = table.to_pyarrow_dataset()
quack = duckdb.arrow(dataset)
quack.filter("id1 = 'id016' and v2 > 10")

This notebook shows the performance gains you can get by using to_pyarrow_dataset instead of to_pyarrow_table.

I think you're seeing bad performance because you're loading all the data into memory.

@someaveragepunter commented
to_pyarrow_dataset() is still lazy and hasn't downloaded any data. How do I retrieve that data into a pandas or polars DataFrame? I still have to do table.to_pyarrow_dataset().to_table(), right? Which takes the same amount of (slow) time.

Can I ask that you execute the code above, perform the necessary adjustments, and test the performance gain?

Again, I have to stress: this is slowness I'm seeing using S3 blob storage. Your example above suggests that you're testing with a local filesystem.

@MrPowers (Contributor) commented

> I still have to do table.to_pyarrow_dataset().to_table(), right?

No, quack.filter("id1 = 'id016' and v2 > 10") runs the query. You don't need to convert to a PyArrow Table. Converting an entire dataset to a PyArrow table is an antipattern and will give bad performance.
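
A minimal sketch of that pattern, materializing only the filtered result (reusing the hypothetical table from the earlier comment; .df() converts just the query output to pandas):

import pathlib
import duckdb
from deltalake import DeltaTable

table = DeltaTable(f"{pathlib.Path.home()}/data/delta/G1_1e9_1e2_0_0")
dataset = table.to_pyarrow_dataset()  # lazy: nothing is read yet
quack = duckdb.arrow(dataset)
# Only rows matching the predicate are scanned and brought into memory.
df = quack.filter("id1 = 'id016' and v2 > 10").df()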

> pyarrow on s3fs is twice as fast as delta-rs

I'm still trying to figure out the benchmark you're running. There are query engines (pandas, Spark, DuckDB), Lakehouse Storage Systems (Delta Lake), and in memory formats (Arrow). Shouldn't the benchmark comparison be something like "A DuckDB query run on Parquet files stored in S3 runs faster/slower than the same DuckDB query run on a Delta Table with the same data stored in S3"?
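
A sketch of that apples-to-apples comparison, reusing the S3 path from the benchmark above (illustrative only, not a verified benchmark; credentials are omitted, see the SET statements in the earlier snippet):

import duckdb
from deltalake import DeltaTable

path = 's3://nyc-taxi-test/temp/weather_sorted_delta'
con = duckdb.connect(database=':memory:')
con.execute("INSTALL httpfs; LOAD httpfs; SET s3_region='eu-west-2';")

# (1) DuckDB directly over the raw Parquet files in S3.
raw = con.execute(f"select flow_date, ssr from read_parquet('{path}/*.parquet')").df()

# (2) The same DuckDB query over the Delta table, exposed as an Arrow dataset
# (DuckDB resolves the Python variable 'dataset' via its replacement scans).
dataset = DeltaTable(path).to_pyarrow_dataset()
via_delta = con.execute("select flow_date, ssr from dataset").df()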

@someaveragepunter commented
The comparison I'm running at this point is a query without any filters, i.e. I genuinely want the entire contents of the parquet files within that folder.
I'm comparing the time it takes to pull the entire contents of the parquet files back using:
a) duckdb (not the query engine, but the httpfs mechanism it uses to read the parquet files);
b) pyarrow over s3fs;
c) delta-rs (which seems to be the slowest amongst the 3).

@mattiasu96 commented
If I may jump in, I have a pretty similar problem.

I am using a Delta table (hosted on S3) within Databricks.

I am well aware that I shouldn't (theoretically) be loading all the data into memory, and that this slows down execution, but due to some legacy code, at a certain point I need to convert my dataset to pandas.

I made a simple experiment: a 1.9 GB parquet file resulting in a table of 8,885,805 rows × 133 columns.

If I load the parquet files directly into pandas through pd.read_parquet(), it takes about 50 seconds overall to load the dataset.

If I save my dataset as a Delta table (with the same partitioning) and do:

from deltalake import DeltaTable
dt = DeltaTable("/my_table_path/")
df = dt.to_pandas()

this takes 2 minutes and 50 seconds, which is a significant performance regression.

For bigger datasets, this result scales accordingly, with Delta Lake taking an insane amount of time to convert to pandas.
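
If the conversion to pandas is unavoidable, one workaround to try is letting pyarrow scan the table's live data files directly. A minimal sketch using file_uris() (note: partition columns stored only in directory names are not reconstructed by this sketch):

import pyarrow.dataset as pads
from deltalake import DeltaTable

dt = DeltaTable("/my_table_path/")
# file_uris() lists only the files the Delta log considers live,
# so this scans the same data that to_pandas() would read.
ds = pads.dataset(dt.file_uris(), format="parquet")
df = ds.to_table().to_pandas()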

@snsancar commented Jan 3, 2024

> @someaveragepunter - can you try to execute the query like this:
>
> table = DeltaTable(f"{pathlib.Path.home()}/data/delta/G1_1e9_1e2_0_0")
> dataset = table.to_pyarrow_dataset()
> quack = duckdb.arrow(dataset)
> quack.filter("id1 = 'id016' and v2 > 10")
>
> This notebook shows the performance gains you can get by using to_pyarrow_dataset instead of to_pyarrow_table.
>
> I think you're seeing bad performance because you're loading all the data into memory.

I tested this; it is much faster than converting to a pandas DataFrame (huge difference, thanks).
