
Avoid repeated reading of the DeltaLog #65

Merged · osopardo1 merged 16 commits into Qbeast-io:main on Feb 4, 2022

Conversation

@osopardo1 (Member) commented Jan 17, 2022

This PR fixes #61

The changes made are:

  • OTreeIndex now extends FileIndex instead of TahoeLogFileIndex
  • QueryExecutor doesn't need previouslyMatchedFiles anymore
  • QbeastFile is now named QbeastBlock
  • Added more metadata to QbeastBlock: size and modificationTime
  • CubeStatus now contains QbeastBlock objects instead of the path
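As a rough sketch of the renamed class, the shape below follows the PR description (`size` and `modificationTime` added alongside the path, and `CubeStatus` holding blocks instead of bare paths). The exact field names and the full `CubeStatus` definition in the repository may differ; this is an illustration, not the actual qbeast-spark code:

```scala
// Hypothetical sketch of the renamed block metadata, not the actual
// qbeast-spark definition: a block now carries file-level metadata
// (size, modificationTime) in addition to its path.
case class QbeastBlock(
    path: String,           // file path inside the table directory
    size: Long,             // file size in bytes (new in this PR)
    modificationTime: Long  // last-modified timestamp (new in this PR)
)

// CubeStatus now holds the blocks themselves instead of just paths,
// so listFiles can be answered without re-reading the DeltaLog.
case class CubeStatus(blocks: Seq[QbeastBlock])

val block = QbeastBlock("part-0000.parquet", 1024L, 1642428000000L)
val status = CubeStatus(Seq(block))
println(status.blocks.map(_.size).sum) // total bytes tracked by the cube
```

Keeping the size and timestamp on the block is what lets `listFiles` be served from the snapshot instead of going back to the log.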

@osopardo1 osopardo1 requested a review from cugni January 17, 2022 14:58
@cugni (Member) left a comment

I don't understand why we now do buildCubesStatuses in memory; what was the need?
Also, we could rename some classes/methods to improve readability and be more consistent with the documentation.

@osopardo1 (Member, Author) commented Jan 26, 2022

#65 (comment)

In relation to this comment, I made the following comparisons between different approaches (collect before, collect after, and the code in the main branch) on an indexed store_sales table of the 10 GB TPC-DS dataset:

```scala
df.write
  .format("qbeast")
  .option("columnsToIndex", "ss_item_sk,ss_customer_sk")
  .option("cubeSize", "300000")
  .save(qbeastDir)
```

The times compared come from two different methods:

  • Query. I compared running a count on the whole dataset with both the Delta and Qbeast implementations:

```scala
val qbeast = spark.read.format("qbeast").load(qbeastDir)
val delta = spark.read.format("delta").load(qbeastDir)

qbeast.count()
delta.count()
```

  • ListFiles. This is the method used by TahoeLogFileIndex and OTreeIndex to filter the files needed for a query. It is useful for checking whether we improved anything by reading the log only once:

```scala
val deltaLog = DeltaLog.forTable(spark, qbeastDir)
val tahoeLogFileIndex =
  TahoeLogFileIndex(spark, deltaLog, deltaLog.dataPath, deltaLog.snapshot)
val oTreeIndex = OTreeIndex(tahoeLogFileIndex)

tahoeLogFileIndex.listFiles(partitionFilters = Seq.empty, dataFilters = Seq.empty)
oTreeIndex.listFiles(partitionFilters = Seq.empty, dataFilters = Seq.empty)
```
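For reference, timings like the ones below can be taken with a trivial wall-clock wrapper along these lines (my own helper, an assumption about the methodology; the PR does not show how the numbers were gathered):

```scala
// Minimal wall-clock timing helper (an assumption for illustration,
// not code from the repository).
object Timing {
  /** Runs `block` once, returning its result and the elapsed seconds. */
  def time[A](block: => A): (A, Double) = {
    val start = System.nanoTime()
    val result = block
    val elapsed = (System.nanoTime() - start) / 1e9
    (result, elapsed)
  }
}

// Usage against the indexes above, e.g.:
// val (_, seconds) =
//   Timing.time(oTreeIndex.listFiles(Seq.empty, Seq.empty))
// println(f"listFiles took $seconds%.2fs")
```

A single warm run per implementation is enough for the rough comparison below; for anything finer, averaging several runs would be preferable.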

These are the implementations tested:

  • Delta → qbeast indexed but read as delta
  • Qbeast main branch → qbeast indexed but read with the main version
  • Qbeast PR65 commit f2445ec → Collect after groupBy in buildCubeStatuses
  • Qbeast PR65 commit 36eea23 → Collect before groupBy in buildCubeStatuses
| Operation | Delta | Qbeast Main | Qbeast PR65 Collect After | Qbeast PR65 Collect Before |
| --- | --- | --- | --- | --- |
| Query | 1.38s | 2.94s | 2.36s | 1.73s |
| ListFiles | 0.18s | 1.4s | 0.9s | 0.3s |

The tests were run locally. Feel free to discuss this approach here, and to suggest reverting the changes! Thank you!

Commits pushed:

  • Change QbeastFile to QbeastBlock
  • Make matchingBlocks method protected
  • Filtering by revision on DeltaQbeastSnapshot
@osopardo1 (Member, Author) commented Jan 27, 2022

I ran more tests on a distributed cluster, just to have more numbers. Here are the results:

mode: DISTRIBUTED, cluster: 4 nodes, data: 10 GB store_sales, columnsToIndex: ss_customer_sk, ss_item_sk, cubeSize: default, num files: 5000+

| Operation | Delta | Qbeast Main | Qbeast PR65 Collect After | Qbeast PR65 Collect Before |
| --- | --- | --- | --- | --- |
| Query | 8.2s | 10.74s | 10.05s | 10.2s |
| ListFiles | 0.43s | 2.1s | 0.91s | 0.53s |

mode: DISTRIBUTED, cluster: 4 nodes, data: 10 GB store_sales, columnsToIndex: ss_customer_sk, ss_item_sk, cubeSize: 500000, num files: 800+

| Operation | Delta | Qbeast Main | Qbeast PR65 Collect After | Qbeast PR65 Collect Before |
| --- | --- | --- | --- | --- |
| Query | 1.64s | 2.19s | 3.14s | 2.55s |
| ListFiles | 0.32s | 0.65s | 0.96s | 0.42s |

mode: DISTRIBUTED, cluster: 4 nodes, data: 100 GB store_sales, columnsToIndex: ss_customer_sk, ss_item_sk, cubeSize: 50000000, num files: 86

| Operation | Delta | Qbeast Main | Qbeast PR65 Collect After | Qbeast PR65 Collect Before |
| --- | --- | --- | --- | --- |
| Query | 0.96s | 1.47s | 1.46s | 1.83s |
| ListFiles | 0.24s | 0.59s | 0.59s | 0.42s |

We can see two things:

  • There is a penalty for reading multiple files.
  • For the largest DeltaLogs (for example the first one, with 5000+ files), this PR gives better results on listFiles. In the other cases, the results are similar.

I will revert to a collect() after the groupBy, because collecting before it can cause memory problems (see delta-io/delta#779).
We could think about optimization later.
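The trade-off can be illustrated with plain Scala collections (a toy model, not Spark code; cube ids and block names are made up): collecting before the groupBy means the driver receives one entry per file and groups them in memory, while aggregating first means only one small summary per cube reaches the driver.

```scala
// Toy model: (cubeId, block) pairs standing in for DeltaLog rows.
val rows: Seq[(String, String)] =
  (1 to 10000).map(i => (s"cube-${i % 10}", s"block-$i"))

// "Collect before": the driver holds all 10000 rows and groups locally.
// Fast for small logs, but driver memory grows with the number of files
// (the failure mode reported in delta-io/delta#779).
val collectedThenGrouped: Map[String, Seq[String]] =
  rows.groupBy(_._1).map { case (k, v) => k -> v.map(_._2) }

// "Collect after": aggregate to one summary per cube first (here just a
// block count), so only 10 small entries would need to reach the driver.
val aggregatedThenCollected: Map[String, Int] =
  rows.groupBy(_._1).map { case (k, v) => k -> v.size }

println(collectedThenGrouped.values.map(_.size).sum) // all 10000 blocks held
println(aggregatedThenCollected.size)                // only 10 summaries
```

In Spark the second shape corresponds to running the groupBy as a distributed aggregation and collecting the aggregated result, which is why it is the safer default despite being slightly slower in some of the runs above.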

@osopardo1 osopardo1 requested a review from cugni January 27, 2022 10:30
@osopardo1 osopardo1 merged commit 0bc21a6 into Qbeast-io:main Feb 4, 2022
@osopardo1 osopardo1 deleted the 61-reduce-delta-log-reads branch February 26, 2022 08:01
Successfully merging this pull request may close these issues:

  • Why do we read twice the delta log when doing a query?