Why bucket table is not supported in Delta #524

Open
LantaoJin opened this issue Sep 24, 2020 · 16 comments
Labels
question Questions on how to use Delta Lake

Comments

@LantaoJin
Contributor

Why is bucketing not supported in Delta? I have implemented bucketed tables in Delta, including:

  1. create a bucketed table using Delta
  2. convert a bucketed table to Delta
  3. read, insert, and update/delete/merge on a bucketed Delta table via SQL
  4. bucketed joins (without a shuffle)

Do we need this feature? I can file a PR.
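
For reference, a sketch of what item 1 could look like, assuming the proposal reuses Spark's existing CLUSTERED BY bucketing syntax; the table and column names are illustrative, and Delta rejects this DDL today:

// Hypothetical DDL sketch - assumes Spark's existing bucketing syntax is reused.
spark.sql("""
  CREATE TABLE events (user_id BIGINT, payload STRING)
  USING delta
  CLUSTERED BY (user_id) INTO 16 BUCKETS
""")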

@zsxwing
Member

zsxwing commented Oct 5, 2020

Hidden partitioning (#490) is a better solution for this. It's pretty easy to support bucket tables on top of hidden partitioning.

@vprus

vprus commented Apr 10, 2021

@zsxwing: Could you explain how bucketing can be implemented on top of partitioning? Say we have a users table bucketed by user_id, so we can join with this table without a shuffle. That works out of the box in plain Spark, and I'd rather have the same in Delta. With hidden partitioning, I'd have to specify partitioning based on something like pmod(hash($"user_id"), N)? That will probably add unnecessary path elements, which is inelegant, but the key question is whether Spark will skip the shuffle in this case. In other words, will the dataframe created when reading such a table have the proper outputPartitioning?
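
For concreteness, a minimal sketch of that workaround, assuming a DataFrame users with a user_id column (path and bucket count illustrative):

import org.apache.spark.sql.functions.{col, hash, lit, pmod}

val numBuckets = 16
users
  .withColumn("user_bucket", pmod(hash(col("user_id")), lit(numBuckets)))
  .write.format("delta")
  .partitionBy("user_bucket")
  .save("/tmp/users_delta")

// A plain file scan of this table reports no hash outputPartitioning, so a
// join on user_id would still insert an exchange; only partition pruning on
// user_bucket is gained.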

@MasterDDT

MasterDDT commented Aug 30, 2021

I'm also interested in this. I'd like to write a Delta table to S3 with df.repartition(5, "someCol"), and it would be nice if Delta could write HashPartitioning(5, someCol) into the metadata along with the 5 files. Then on read it could restore that partitioning and avoid future shuffles.

I don't like using DataFrameWriter.bucketBy because it requires a metastore; the above should be doable without one. Also, DataFrameWriter.partitionBy produces one partition directory per value, which is not great when the value is a hash.
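
To make the gap concrete, a small sketch of that flow (path illustrative): the write-time partitioning is applied but never recorded, so a reader cannot recover it.

import org.apache.spark.sql.functions.col

val df = spark.range(1000).toDF("someCol")

// Writes 5 files, hash-partitioned by someCol at write time...
df.repartition(5, col("someCol"))
  .write.format("delta")
  .save("s3://bucket/table")

// ...but HashPartitioning(5, someCol) is not recorded in the Delta log, so
// this read reports no useful outputPartitioning and later joins re-shuffle.
val back = spark.read.format("delta").load("s3://bucket/table")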

@allisonport-db allisonport-db added the question Questions on how to use Delta Lake label Oct 8, 2021
@alainbryden

alainbryden commented Dec 14, 2022

Switching to bucketed parquet tables to avoid shuffles has brought my processing pipeline down from 2 hours to 5 minutes.

Today, Databricks returns warnings (hints?) that I am not using Delta tables:

Accelerate queries with Delta: This query is on a non-Delta table with many small files. To improve the performance of queries, convert to Delta and run the OPTIMIZE command on the table

but if I attempt to switch formats, I get the error:

"AnalysisException: Operation not allowed: Bucketing is not supported for Delta tables"

Ideally this would be supported, but at the very least, Databricks shouldn't be hinting to switch to Delta if the current format is bucketed parquet.
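
For anyone comparing, the bucketed-parquet setup that avoids the shuffle today does require a metastore table, roughly like this (names illustrative; both sides of the join must share the bucketing spec):

// Works today, but only via saveAsTable / a metastore:
df.write.format("parquet")
  .bucketBy(16, "user_id")
  .sortBy("user_id")
  .saveAsTable("users_bucketed")

// Two tables bucketed the same way can join without an exchange:
spark.table("users_bucketed").join(spark.table("events_bucketed"), "user_id")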

@scottsand-db
Collaborator

Hi @alainbryden - if you have Databricks-specific questions, please contact the appropriate Databricks support.

We can only answer questions about open-source Delta Lake :)

@alainbryden

@scottsand-db This is not my ticket; I merely added my experience as a comment. The original question does not appear to be Databricks-specific. I suggest reopening.

@scottsand-db scottsand-db reopened this Jan 3, 2023
@cb-sukumarnataraj

Any update on this feature request? We are also interested in this feature.

@MasterDDT

MasterDDT commented Jun 1, 2023

@dennyglee Hi from ActionIQ, is this ticket written clearly and does it have the right tags to get triaged?

@dennyglee
Contributor

Hi @MasterDDT - the ticket is clearly written, but the concern I immediately have is that df.repartition() would require changes to how Spark would "decide" to run the HashPartitioning(y, $col). We would also want to design the API around how Trino, Rust, Python (and all of the other frameworks) would interact with this metadata. We'll probably have to noodle on this design a bit so that all systems could leverage it, eh?!

@MasterDDT

MasterDDT commented Jun 2, 2023

There's already the _delta_log/* metadata that all clients need to read - could you stick it in there?

Yeah, I thought maybe the implementation of the hash function could be an issue; I don't know if df.repartition(y, $col) produces a different data layout depending on engine (Spark vs Trino) version, Java version, CPU architecture, etc. It could be best effort: if you write with (sparkVer=x, javaVer=x, ...) and read it back with the same versions, the reader would honor the metadata and give you an already-partitioned dataset.

Essentially what we want is to get away from using a metastore - it's only useful (for us) to keep schema + partitioning information, which Delta could do too, and then the data lives next to the metadata, which is very nice for portability. We might still use a metastore, but then it's more like a cache (we don't care about dropping it).
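
Purely as an assumption about where it could live: Delta already persists table properties in the metaData action of the log, so a bucketing spec could ride along as configuration that every client reading the log would see. The property keys below are hypothetical and not part of the Delta protocol:

// Hypothetical property keys - not an existing Delta feature; shown only to
// illustrate that TBLPROPERTIES land in the log's metaData.configuration.
spark.sql("""
  ALTER TABLE delta.`s3://somewhere` SET TBLPROPERTIES (
    'bucketing.columns'      = 'someCol',
    'bucketing.numBuckets'   = '5',
    'bucketing.hashFunction' = 'spark-murmur3',
    'bucketing.writerInfo'   = 'spark=3.4,jvm=11'
  )
""")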

tdas pushed a commit to tdas/delta that referenced this issue Jun 6, 2023
tdas pushed a commit to tdas/delta that referenced this issue Jun 8, 2023
@ltltlt

ltltlt commented Apr 30, 2024

It's been a year - any updates on supporting this feature? Basically, our workload is the same as @MasterDDT's: we need to bucketBy UserId, filter by a specific UserId, and read only one bucket instead of all the data. We have tried Z-ordering by UserId and the perf is not ideal.
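
Under the pmod(hash) workaround sketched earlier in this thread, single-user lookups can at least prune down to one bucket manually (path and bucket count illustrative):

import org.apache.spark.sql.functions.{col, hash, lit, pmod}

val uid = 12345L
spark.read.format("delta").load("/tmp/users_delta")
  .where(col("user_bucket") === pmod(hash(lit(uid)), lit(16))) // prunes to one partition
  .where(col("user_id") === uid)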

@wudanzy

wudanzy commented May 28, 2024

Hi all, what's the status of this feature? We also want to bucket by a list of columns, so consumers can leverage that info for joins and aggregations.

I investigated Z-ordering and liquid clustering, but they are for data skipping.

@shubham-goel99

This might be a naïve question, but why do we want to support bucketing using df.repartition instead of the DataFrameWriter.bucketBy operation?

@MasterDDT

MasterDDT commented Jul 10, 2024

As a simpler implementation, you could skip supporting the Spark DataFrame partitioning 🤷. Ultimately, here is what I'm looking for:

// Proposed (not existing) write options:
someDF
  .write.format("delta")
  .option("newHashBucketingColumn", "foo")
  .option("newHashBucketingNumPartitions", "5")
  .save("s3://somewhere")

// HashPartitioning is org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
spark.read.format("delta").load("s3://somewhere")
  .queryExecution.sparkPlan.outputPartitioning match {
    case h: HashPartitioning if h.numPartitions == 5 => // yes!
    case _ => // no
  }

This would allow me to take two Delta tables with the same bucketing spec and join them without a shuffle. Otherwise, I am pretty sure that join will always shuffle no matter what Z-order/liquid clustering/etc. you do.

@wudanzy

wudanzy commented Aug 7, 2024

Hi all, I have done a lot of investigation and put together this design for implementing bucketing in Delta.

Please take a look at the design and share your thoughts! Also, I assume everyone in this thread is interested in the bucketing feature, so please upvote it in any channel to help speed things up.

@wudanzy

wudanzy commented Aug 7, 2024

Hi all, since this issue is labeled as a question, I have formally filed a feature request. Please upvote that feature request (or provide any other help) to make it happen sooner!

I am willing to contribute this feature, but since I am new to the Delta community, I will need some suggestions and guidance from all of you to make it happen - any help is welcome!
