Faster ListingTable partition listing (#6182) #6183
Conversation
```rust
let batch = RecordBatch::try_new(schema.clone(), arrays)?;

// TODO: Plumb this down
```
This was a pre-existing issue
I think it was even worse (the old code creates an entire `SessionContext`, right)?

I'll give this a try tomorrow, thanks for this!
"" => path, | ||
p => path.strip_prefix(p)?.strip_prefix(DELIMITER)?, | ||
}; | ||
let stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?; |
The previous logic would return `None` for an exact match
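To illustrate the exact-match issue, here is a minimal sketch using std's `str::strip_prefix` in place of the actual `object_store` `Path` API (the `strip` helper and the `/` delimiter are illustrative, not the real DataFusion code):

```rust
// Hypothetical reconstruction of the previous prefix-stripping logic.
fn strip<'a>(path: &'a str, prefix: &str) -> Option<&'a str> {
    match prefix {
        "" => Some(path),
        // Strip the prefix, then the delimiter that follows it. When the
        // path equals the prefix exactly, nothing remains after the first
        // strip, so stripping "/" yields None.
        p => path.strip_prefix(p)?.strip_prefix('/'),
    }
}

fn main() {
    // Normal case: the prefix and delimiter are removed.
    assert_eq!(strip("a/b/c", "a/b"), Some("c"));
    // Exact match: the old logic returns None instead of Some("").
    assert_eq!(strip("a/b", "a/b"), None);
    println!("ok");
}
```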
@kylebrooks-8451 did you manage to give this a go, and did it help your use-case?
Marking as ready for review, as it resolves a longstanding TODO and should scale significantly better. Perhaps @yahoNanJing you might be able to give this a test, as I seem to remember you having some non-trivial workloads that exercise this functionality?
@tustvold - We did test this on a very large table; we gave it 10 minutes to list and it was still running. For reference, PyArrow with the adlfs fsspec takes around 2-3 minutes on this same table. Let me test it again on this latest commit. Edit: Correction - this was using a trailing
That is disappointing; I presume you are running in release mode? I will push a commit that adds some logs so that we can get some insight into what it is spending its time doing.
I see the commit to add logs. Let me build this in release on an Azure VM and try to run against our largest parquet dataset.
Aah, I didn't realise this was Azure... Azure Blob Storage is notoriously slow, still we should be able to at least match fsspec. Interested to see where it is spending time.
Still debugging this, I noticed this error:
Aah, I worried that might happen, we should probably limit the maximum number of concurrent requests when listing the partitions. Will push a fix later today.
@tustvold It's working now and very fast, 7 seconds for this table. I'm worried these might not be real results, because it seems to not be reading any parquet files, only the partition folder. The inferred schema contains only partition columns, no actual data from the parquet files. Is there some setting I'm missing to read the files / infer their schema?
Is it possible your query has predicates that aren't satisfied by any of the partitions, i.e. it is pruning everything out? |
Following some investigation with @kylebrooks-8451 I believe the conclusions to be:
Thank you @tustvold -- I think this PR really improves the code. ❤️
My only concern about this PR is that it removes some tests -- there is probably a good reason for doing so, but I wanted to get the answer / rationale before approving
cc @thinkharderdev / @Dandandan I wonder if you have time to review this idea. It seems like a great idea to me, but I don't think I have a great way to test it.
```diff
@@ -153,225 +151,239 @@ pub fn split_files(
         .collect()
 }

 struct Partition {
```
I think adding some comments about specifically what the `depth` and `files` fields mean would help readability. Like, are the files only files, or do they include paths, and what does `depth` signify?
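As a rough sketch of the kind of documentation being requested, the fields might be described like this (the field types and exact semantics here are assumptions for illustration, not the actual DataFusion definition):

```rust
/// Hypothetical annotated sketch of the `Partition` struct; field types
/// and meanings are assumed for illustration only.
struct Partition {
    /// Path of this partition directory (assumed type)
    path: String,
    /// Number of path segments below the table root, i.e. how many
    /// partition columns have a bound value at this node
    depth: usize,
    /// The data files (not sub-directories) found directly under `path`,
    /// populated once this partition has been listed
    files: Option<Vec<String>>,
}

fn main() {
    let p = Partition {
        path: "table/year=2022".to_string(),
        depth: 1,
        files: None,
    };
    assert_eq!(p.depth, 1);
    assert!(p.files.is_none());
    println!("ok: {}", p.path);
}
```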
```rust
futures.push(partition.list(store));

while let Some((partition, paths)) = futures.next().await.transpose()? {
    if let Some(next) = pending.pop() {
```
Is there an invariant that if `pending` is non-empty, then `futures.len()` prior to the loop is `CONCURRENCY_LIMIT`? I am trying to work out why only one pending future is pushed to `futures` rather than pushing while `futures.len() < CONCURRENCY_LIMIT`.
Each iteration of the loop can at most complete one future, therefore freeing up at most one "slot" in `futures`. If `pending` contains anything it implies that we were at `CONCURRENCY_LIMIT` before we polled `futures`, and therefore we can only add at most one future.
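The invariant can be demonstrated with a minimal synchronous simulation: in-flight work is capped, each loop iteration completes exactly one unit, and refilling a single slot from `pending` keeps the cap. All names here (`drive`, the task type) are illustrative, not the actual DataFusion code:

```rust
const CONCURRENCY_LIMIT: usize = 4;

/// Simulates the refill loop; returns the maximum number of
/// tasks ever in flight, which never exceeds CONCURRENCY_LIMIT.
fn drive(mut pending: Vec<u32>) -> usize {
    let mut in_flight: Vec<u32> = Vec::new();
    // Initial fill before the loop, up to the limit.
    while in_flight.len() < CONCURRENCY_LIMIT {
        match pending.pop() {
            Some(task) => in_flight.push(task),
            None => break,
        }
    }
    let mut max_observed = in_flight.len();
    // Each iteration "completes" one task, freeing exactly one slot...
    while let Some(_done) = in_flight.pop() {
        // ...so if pending is non-empty we were at the limit, and
        // pushing a single task is enough to restore it.
        if let Some(next) = pending.pop() {
            in_flight.push(next);
        }
        max_observed = max_observed.max(in_flight.len());
    }
    max_observed
}

fn main() {
    // With plenty of work, in-flight count stays pinned at the limit.
    assert_eq!(drive((0..10).collect()), CONCURRENCY_LIMIT);
    // With less work than the limit, it never reaches the limit.
    assert_eq!(drive((0..2).collect()), 2);
    println!("ok");
}
```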
```rust
    None => true,
};

let glob_match = self.contains(path);
```
this is a nice refactoring
```rust
    )
);
}

#[test]
fn test_path_batch_roundtrip_no_partiton() {
```
Why were these tests removed?
This is testing logic that no longer exists; we no longer encode filenames to a `RecordBatch`, filter it, and convert it back, instead evaluating the expressions directly.
I can file a ticket for this if you would like.
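To make the "evaluate directly" idea concrete, here is a minimal sketch: instead of encoding paths into a `RecordBatch`, filtering it, and decoding back, the partition values parsed from each path are checked against the predicate directly. The `partition_value` helper, the `year` column, and the paths are all hypothetical:

```rust
/// Extracts a Hive-style `column=value` segment from a path, if present.
fn partition_value<'a>(path: &'a str, column: &str) -> Option<&'a str> {
    path.split('/')
        .find_map(|seg| seg.strip_prefix(column)?.strip_prefix('='))
}

fn main() {
    let paths = [
        "table/year=2021/file_a.parquet",
        "table/year=2022/file_b.parquet",
    ];
    // Evaluate the predicate `year = '2022'` directly on each path,
    // with no RecordBatch round-trip.
    let kept: Vec<_> = paths
        .iter()
        .filter(|p| partition_value(p, "year") == Some("2022"))
        .collect();
    assert_eq!(kept, vec![&"table/year=2022/file_b.parquet"]);
    println!("ok");
}
```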
For some perspective, this PR is able to list a ~27,000 partition table in Azure Blob Storage in 7 seconds, whereas the PyArrow Dataset using the adlfs fsspec FileSystem takes 2.5 minutes on the same table. The old DataFusion code before this PR never finished after waiting > 10 minutes. Fantastic work @tustvold!
🎉 that is amazing -- thank you for the measurement @kylebrooks-8451 |
Schema inference PR: #6366
Which issue does this PR close?
Closes #6182
Rationale for this change
There were reports of slow listing performance
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?