
Add support for optimize (file compaction) on Delta tables #934

Closed
wants to merge 3 commits

Conversation

vkorukanti
Collaborator

Overview

This work adds the OPTIMIZE (file compaction) command as outlined in issue #927.

Implementation Details

  • Add the OPTIMIZE SQL command to the grammar file DeltaSqlBase.g4
  • Handle the OPTIMIZE table SQL command in DeltaSqlParser.scala by creating a DeltaCommand implementation of OPTIMIZE (file compaction).
  • Implement the DeltaCommand for OPTIMIZE (file compaction) (OptimizeTableCommand). The algorithm is as follows (a rough illustrative sketch of the grouping and bin-packing step appears after this list):
    • List all the files in the latest snapshot that satisfy the given partition predicate
    • Go through the files and filter out all files that are larger than the minimum size (DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE)
    • Group the remaining files by partition
    • For each partition, divide the files into bins where the sum of the sizes of the files in a bin is <= DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE
    • Run a Spark job for each bin that reads the files in the bin and writes the output to a single file. The command can run DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS parallel jobs, each working on one bin.
    • At the end, create a DeltaLog transaction that adds the generated output files and removes the compacted files.
    • At each step, collect metrics to output from the OPTIMIZE SQL command.
  • Tests
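
For illustration, here is a minimal, hypothetical sketch of the grouping and bin-packing step above. The `FileInfo` type, object and function names, and parameters are placeholders for explanation only, not the PR's actual code:

```
import scala.collection.mutable.ArrayBuffer

object BinPacking {
  // Hypothetical stand-in for a file entry in the table snapshot.
  case class FileInfo(path: String, size: Long, partitionValues: Map[String, String])

  // Keep only files below the minimum size, group them by partition, and pack each
  // partition's files into bins whose total size stays at or under the target max size.
  def packIntoBins(
      candidateFiles: Seq[FileInfo],
      minFileSize: Long,
      maxFileSize: Long): Map[Map[String, String], Seq[Seq[FileInfo]]] = {
    candidateFiles
      .filter(_.size < minFileSize)
      .groupBy(_.partitionValues)
      .map { case (partition, files) =>
        val bins = ArrayBuffer.empty[Seq[FileInfo]]
        val currentBin = ArrayBuffer.empty[FileInfo]
        var currentBinSize = 0L
        files.sortBy(_.size).foreach { file =>
          // Start a new bin once adding this file would exceed the target output size.
          if (currentBin.nonEmpty && currentBinSize + file.size > maxFileSize) {
            bins += currentBin.toList
            currentBin.clear()
            currentBinSize = 0L
          }
          currentBin += file
          currentBinSize += file.size
        }
        if (currentBin.nonEmpty) bins += currentBin.toList
        partition -> bins.toList
      }
  }
}
```

Each resulting bin then corresponds to one Spark job that rewrites its files into a single output file.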

This resolves #927.

@vkorukanti vkorukanti force-pushed the optimize branch 2 times, most recently from 5cb3bbf to 58513e4 on February 10, 2022 07:43
.doc("Target file size produced by the OPTIMIZE command.")
.longConf
.checkValue(_ >= 0, "maxFileSize has to be positive")
.createWithDefault(1024 * 1024 * 1024)
Contributor

Is it a problem that the max and min file size are the same by default? If you assume the input and output size during compaction is the same (which obviously isn't exactly the case in reality), you'll never compact a file to above the min file size threshold. I guess they would just get filtered out as bins of 1 file, but not sure if that's suboptimal (pun intended)

Collaborator

min_size will avoid compacting already large files (> 1 GB in this case) in the table.

Contributor

Yeah, I'm just saying compacted files will never become "large", so they will always be reconsidered for compaction (and probably ignored from there, so maybe not a huge deal).

@Kimahriman
Contributor

Any plans yet for a Scala and Python API on DeltaTable?

@Kimahriman
Contributor

Tested this out today and found a couple things:

  • The parallelism doesn't seem to be working; I was only getting 1 job running at a time
  • Each job had to sort its data, since a write to a partitioned table requires sorting on the partition value by default. You can get around this using spark.sql.maxConcurrentOutputFileWriters=2 (see the snippet below), but it would probably be good to have this built in if possible, to avoid having to store/spill all the data before writing (the same problem exists by default using the current compaction method of replaceWhere)
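
For reference, the workaround mentioned above is a Spark SQL session setting (available in recent Spark versions); a minimal sketch, with the value 2 taken from the comment:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Allow the writer to keep multiple partition writers open concurrently so the
// partitioned write does not have to sort all data on the partition columns first.
spark.conf.set("spark.sql.maxConcurrentOutputFileWriters", "2")
```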

@zsxwing
Member

zsxwing commented Feb 17, 2022

Thanks a lot for testing this! See the following responses to the issues you raised:

The parallelism doesn't seem to be working, was only getting 1 job running at a time

How many partitions do you have and what's the size of each partition? Currently we pack files into bins and run each bin in one Spark job. In order to get more than one job, we need either more than one partition or a partition that has more than one bin.

Each job had to sort its data

Good catch. Since eliminating the sort is an optimization on top of the existing approach, we can leave it to a followup PR. @vkorukanti could you create a followup issue for this?

@zsxwing
Member

zsxwing commented Feb 17, 2022

Any plans yet for a Scala and Python API on DeltaTable?

Are you interested in working on these programming APIs? We would like to involve the community as much as possible in this big feature 😄

@vkorukanti
Collaborator Author

Tested this out today and found a couple things:
Thank you for testing it.

  • The parallelism doesn't seem to be working, was only getting 1 job running at a time

Good catch! This is a bug in the PR. I just pushed a fix.

  • Each job had to sort its data, since a write to a partitioned table requires sorting on the partition value by default. Can get around this using spark.sql.maxConcurrentOutputFileWriters=2, but would probably be good to have this built-in if possible to avoid having to store/spill all the data before writing (same problem exists by default using the current compaction method of replaceWhere)

Good point! Will look into this.

@zsxwing
Member

zsxwing commented Feb 17, 2022

Good catch! This is a bug in the PR. I just pushed a fix.

Ah, could you also add a test for this?

@zsxwing zsxwing left a comment
Member

Left some style comments.

@Kimahriman
Contributor

Any plans yet for a Scala and Python API on DeltaTable?

Are you interested in working on these programming APIs? We would like to involve the community as much as possible in this big feature 😄

Yeah, I can take a stab at it if no one gets to it first; just wanted to make sure it wasn't already being worked on, hah. There tend to be bursts of commits that get added out of the blue 😛

I am currently limited in how well I can optimize things with the current method, so I'm very curious to test this out and get it working quickly. The biggest thing I wanted to test was how well the one-job-per-file approach could be parallelized; I will want to use hundreds to thousands of cores to optimize (at least for the initial run). Currently we use a combo of maxPartitionBytes and openCostInBytes to do a single partition in a single job without a shuffle, just limited by having to do Delta log scans for each partition (a rough sketch of that approach is below).
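
For context, a rough sketch of the kind of manual per-partition compaction described above; the table path, predicate, and size values are placeholders, and the exact setup is an assumption rather than the poster's actual job:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Raise the per-task input limits so one partition's small files land in as few
// Spark input splits as possible (values are illustrative).
spark.conf.set("spark.sql.files.maxPartitionBytes", (8L * 1024 * 1024 * 1024).toString)
spark.conf.set("spark.sql.files.openCostInBytes", "0")

val tablePath = "/path/to/dir"   // placeholder table path
val predicate = "part = 25"      // placeholder partition predicate

// Rewrite just the selected partition; replaceWhere swaps it in transactionally.
spark.read.format("delta").load(tablePath)
  .where(predicate)
  .write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", predicate)
  .save(tablePath)
```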

@Kimahriman
Contributor

New observations from testing now that the threading is working:

  • Spark seems to be handling 1,000 threads fine
  • I'm testing it out using Zeppelin, and Zeppelin is not super happy trying to track updates for 1,000 jobs at once. Maybe not a huge deal; I don't know if this will become a problem anywhere else too (like in Databricks itself?)
  • When I originally tested with the default 15 threads, it was hard to stop my test. I tried multiple times to stop the optimize; killing things from Zeppelin would kill the running jobs, but then it would just start the next batch of jobs. Likewise, killing the stage in Spark killed that one job, but then it would just keep starting the next. It eventually stopped, but I'm not sure how (maybe I had to kill it 15 times or something?)

@zsxwing
Member

zsxwing commented Feb 18, 2022

@Kimahriman Thanks for testing this. Do you know how Zeppelin cancels a command along with running Spark jobs? Databricks has its own notebook implementation and its own command cancellation mechanism. Our Databricks experience doesn't apply to Zeppelin.

@Kimahriman
Contributor

I think it just cancels the "job group"; it creates a job group for a paragraph, basically, covering any job created by that paragraph. That's why I wasn't sure whether Databricks might do the same thing of just cancelling job groups. I'll play around with it a little more and see, but it's not a show stopper by any means. We normally wouldn't run this in Zeppelin anyway, I just used it for testing.

@Kimahriman
Contributor

The more common thing, I guess, is killing a stage in the UI. That seems to just kill that one job, but then it just carries on with the rest of them. What will happen in this case when things finally finish? Will the whole thing fail? Or will it partially succeed? The same thing could happen with an actual error in one of the jobs. Would it make sense to fail quickly as soon as one of the jobs fails?

@vkorukanti
Collaborator Author

The more common thing, I guess, is killing a stage in the UI. That seems to just kill that one job, but then it just carries on with the rest of them. What will happen in this case when things finally finish? Will the whole thing fail? Or will it partially succeed? The same thing could happen with an actual error in one of the jobs. Would it make sense to fail quickly as soon as one of the jobs fails?

What is the parallelization number set in your test case? Any failure in one of the jobs fails the OPTIMIZE command. This can be improved to cancel the pending jobs as soon as one failure is seen.

@Kimahriman
Contributor

Yeah, I guess that's the question: is it supposed to wait until it's done before it realizes it's going to fail? This was just with the default 15 threads.

@zsxwing
Member

zsxwing commented Feb 18, 2022

is it supposed to wait until it's done before it realizes it's going to fail?

Good catch. This is an issue with the Scala par collection we use here... I guess we need to remove the fancy Scala par collection and use regular Java multi-threaded code. But since this is a pretty big PR, it's better to merge this first and address issues in followup PRs.

@vkorukanti
Collaborator Author

vkorukanti commented Feb 18, 2022

is it supposed to wait until it's done before it realizes it's going to fail?

Good catch. This is an issue with the Scala par collection we use here... I guess we need to remove the fancy Scala par collection and use regular Java multi-threaded code. But since this is a pretty big PR, it's better to merge this first and address issues in followup PRs.

Couple of approaches here:

  • fail the optimize command as soon as there is a single job failure (a rough fail-fast sketch is below)
  • continue working and, at the end, capture the partial progress of the successful jobs and commit it. I am planning to work on this once this PR is merged.
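
To make the first option concrete, here is a minimal fail-fast sketch using a plain thread pool instead of a parallel collection; `runBins`, `compactBin`, and the `Bin` type parameter are placeholders, and this is not the code in this PR:

```
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object FailFastRunner {
  // Runs one task per bin on a bounded pool, skips bins that have not started
  // once any bin fails, and rethrows the first failure at the end.
  def runBins[Bin](bins: Seq[Bin], maxThreads: Int)(compactBin: Bin => Unit): Unit = {
    val pool = Executors.newFixedThreadPool(maxThreads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val firstError = new AtomicReference[Throwable](null)
    val futures = bins.map { bin =>
      Future {
        if (firstError.get() == null) {   // skip queued work after a failure
          try compactBin(bin)
          catch { case t: Throwable => firstError.compareAndSet(null, t); throw t }
        }
      }
    }
    try {
      Await.ready(Future.sequence(futures), Duration.Inf)
      Option(firstError.get()).foreach(t => throw t)
    } finally {
      pool.shutdown()
    }
  }
}
```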

@zsxwing zsxwing left a comment
Member

LGTM. Given this is a large PR, I prefer to get this merged first and we can complete the following two tasks in followup PRs:

  • Eliminate the unnecessary sort.
  • Better cancellation support for optimize.

@vkorukanti
Collaborator Author

LGTM. Given this is a large PR, I prefer to get this merged first and we can complete the following two tasks in followup PRs:

  • Eliminate the unnecessary sort.

Created issue #948

  • Better cancellation support for optimize.

Created issue #949

jbguerraz pushed a commit to jbguerraz/delta that referenced this pull request Jul 6, 2022
This PR adds the functionality to compact small files in a Delta table into larger files through the OPTIMIZE SQL command. File compaction potentially improves the performance of read queries on Delta tables. This processing (removing small files and adding large files) doesn't change the data in the table, and the changes are committed transactionally to the Delta log.

**Syntax of the SQL command**
```
OPTIMIZE ('/path/to/dir' | delta.table) [WHERE part = 25];
```
* The SQL command allows selecting a subset of partitions to run file compaction on.

**Configuration options**
- `optimize.minFileSize` - Files which are smaller than this threshold (in bytes) will be grouped together and rewritten as larger files.
- `optimize.maxFileSize` - Target file size produced by the OPTIMIZE command.
- `optimize.maxThreads` - Maximum number of parallel jobs allowed in the OPTIMIZE command (an illustrative usage sketch follows)
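
An illustrative usage sketch; the full key prefix `spark.databricks.delta.` is assumed here, and the path, predicate, and size values are placeholders:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Optional tuning of the compaction thresholds (sizes in bytes).
spark.conf.set("spark.databricks.delta.optimize.minFileSize", (256L * 1024 * 1024).toString)
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", (1024L * 1024 * 1024).toString)

// Compact only the selected partition of the table at the given path.
spark.sql("OPTIMIZE '/path/to/dir' WHERE part = 25")
```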

New test suite `OptimizeCompactionSuite.scala`

Closes delta-io#934

GitOrigin-RevId: f818d49b0f13296768e61f9f06fadf33a7831056
@vkorukanti vkorukanti deleted the optimize branch September 14, 2023 11:54