Vacuum is very slow #395

Closed

lyogev opened this issue Apr 21, 2020 · 10 comments

@lyogev

lyogev commented Apr 21, 2020

Hi,
I just started using Delta Lake and noticed that vacuum creates a single Spark job per file. Does this make any sense?

@rahulsmahadev
Collaborator

Hey @lyogev, have you tried setting a lower value for spark.sql.sources.parallelPartitionDiscovery.parallelism? It is 10000 by default in Apache Spark.
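For example, a minimal sketch (the value 64 and the table path are placeholders; this assumes the session-level conf is picked up by the listing job that vacuum runs):

// Lower the partition-discovery parallelism before running vacuum;
// 64 is an arbitrary value, tune it to your table size.
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "64")

// Then vacuum as usual
import io.delta.tables.DeltaTable
DeltaTable.forPath(spark, "/path/to/delta-table").vacuum()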

@lyogev
Author

lyogev commented Apr 22, 2020

Yes, but it doesn't change the fact that I'm getting 200 jobs on each vacuum (my output is 200 files each time).
Note that this is a non-partitioned table.

[Screenshot: Screen Shot 2020-04-22 at 12 32 14]

@brkyvz
Collaborator

brkyvz commented Apr 22, 2020

@lyogev If you're dealing with smaller tables, you can definitely reduce that configuration. It doesn't launch 1 Spark job per file; it actually launches 1 Spark job with 200 partitions that's supposed to list your table directory. It pulls the partitions to the Driver 1 by 1 to avoid OOMing the Driver, which is why Vacuum ends up launching 200 Spark jobs.

There is room for optimization here. We cache the file listing. We can count the number of files, and then repartition the result according to a heuristic to avoid launching 200 small jobs. Is this something you would consider contributing? :)

@lyogev
Author

lyogev commented Apr 26, 2020

Hi @brkyvz, I would love to contribute.
However, right now we are heavily invested in Hudi.
I have performed some tests on Delta, as the open source version seemed mature enough to test.
So I would love to share some insights; in the end, if we decide to switch from Hudi to Delta, I would definitely contribute.
This is what I have right now :)

Pros for Delta

  1. Ingestion is much faster, 2X to 4X, when using merge in Delta vs upsert in Hudi (copy on write).
  2. The system is very simple to use: far fewer configurations, and the API is clean.

Cons for Delta

  1. No Hive metastore support; without this we can't use it.
  2. Slow vacuuming (the above issue).
  3. Queries are slower with Delta: simple select queries take 2-3X longer, and a count(1) query is 10X slower (tested on Databricks).
  4. The auto optimize and auto compaction features are missing from the open source version; these are included in Hudi.

So basically the slow query performance + missing Hive support is what makes this unattractive for now, although I would love that ingestion speed!

@redsk

redsk commented Apr 26, 2020

There is room for optimization here. We cache the file listing. We can count the number of files, and then repartition the result according to a heuristic to avoid launching 200 small jobs. Is this something you would consider contributing? :)

@brkyvz If I understand the code correctly, it would be a matter of changing VacuumCommand.scala from this

        val diff = allFilesAndDirs
          .where('modificationTime < deleteBeforeTimestamp || 'isDir)
          .mapPartitions { fileStatusIterator => ...

to something like this

        val diffFiles = allFilesAndDirs
          .where('modificationTime < deleteBeforeTimestamp || 'isDir)

        val diff = diffFiles
          .repartition(repartitionHeuristic(diffFiles))
          .mapPartitions { fileStatusIterator => ...

        // 1024 files per partition, up to a max of 128 partitions
        // (returns Int because repartition expects an Int)
        def repartitionHeuristic(diffFiles: Dataset[SerializableFileStatus]): Int = {
          val nrFiles = diffFiles.count
          math.min(128L, math.max(nrFiles / 1024L, 1L)).toInt
        }

Or would you rather envision something more sophisticated?

@prasadvaze

prasadvaze commented Apr 26, 2020

So when you said no hive metastore support - what do you mean exactly?


@lyogev
Author

lyogev commented Apr 26, 2020

@prasadvaze I mean you can't register tables yet, per #85

@prasadvaze

prasadvaze commented Apr 27, 2020

@lyogev per the comment by @abiratsis in #375, it looks like he was able to register a Delta table in the Hive metastore:

val someDf = spark.emptyDataset[SomeSchema].toDF
// first create Delta table
someDf.write.format("delta").partitionBy("day").save(outputPath)
// then register it to hiveMetastore
spark.sql(s"CREATE TABLE ${tableName} USING DELTA LOCATION '${outputPath}'")

The only strange behaviour is the unexpected path of the Delta table as already mentioned in issue #85
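For what it's worth, once the table is registered it should be queryable by name like any other metastore table; a quick sketch, assuming the spark session and tableName from the snippet above:

// Query the registered Delta table by name
spark.sql(s"SELECT COUNT(*) FROM ${tableName}").show()
// Or via the DataFrame API
spark.table(tableName).printSchema()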

@brkyvz
Collaborator

brkyvz commented Apr 27, 2020

@lyogev @prasadvaze MetaStore support should be coming in the next two weeks. We're working on getting Delta to compile with Spark 3.0 this week. Then we can release the support for Hive MetaStore tables.

@redsk I would do something very similar to that, a bit later in the code (right before the delete).
I wouldn't cap max parallelism; that would lead to scaling issues. However, the simple heuristic of math.max(nrFiles / 1024L, 1L) seems good to me. We may want to make the 1024 configurable instead of hardcoded.
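For reference, a configurable version of that heuristic might look roughly like this (the config key is hypothetical and used only for illustration; spark is assumed to be the active SparkSession):

// Hypothetical config key; Delta does not define it today.
val filesPerPartition: Long = spark.conf
  .getOption("spark.databricks.delta.vacuum.filesPerPartition")
  .map(_.toLong)
  .getOrElse(1024L)

// No upper cap on parallelism, per the comment above:
// one partition per filesPerPartition files, with at least one partition.
def repartitionHeuristic(nrFiles: Long): Int =
  math.max(nrFiles / filesPerPartition, 1L).toInt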

@mohamed-a-abdelaziz

@brkyvz I think the bottleneck is deleting the files one by one. I was able to parallelize it and delete around 300K files in 10 minutes using a >100 GB, 32-CPU cluster.
Kindly check my PR #529
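(For readers following along: the general idea of pushing the deletes down to the executors, instead of deleting one by one on the driver, could be sketched roughly as below. This is only a sketch, not the actual change in #529; pathsToDelete is assumed to be a Dataset[String] of absolute file paths and spark the active session.)

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import spark.implicits._

// Delete files in parallel on the executors and count how many were removed.
// A real patch would ship the session's Hadoop conf to the executors
// (it is not serializable as-is) instead of building a fresh one.
val deletedCount: Long = pathsToDelete
  .mapPartitions { paths =>
    val hadoopConf = new Configuration()
    Iterator.single(
      paths.count { p =>
        val path = new Path(p)
        // FileSystem instances are cached per scheme/authority by Hadoop.
        path.getFileSystem(hadoopConf).delete(path, false)
      }.toLong
    )
  }
  .reduce(_ + _)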

brkyvz closed this as completed in 40182f3 on Oct 15, 2020