**Is your feature request related to a problem or challenge?**
I would like to propose APIs for, and maybe a non-default implementation of, a "filter cache". It's an idea I got from the paper *Predicate Caching: Query-Driven Secondary Indexing for Cloud Data Warehouses*, published by AWS.

Maybe I've misunderstood the paper, but what we ended up implementing in our production system, to great effect, is essentially a `HashMap<(file_path, row_group, filter), Bitmap>`.
We check this cache when scanning a file, before we even read the parquet metadata.
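For concreteness, here is a minimal sketch of that shape in Rust, assuming the `roaring` crate for the bitmap and a pre-hashed filter fingerprint (all names are illustrative, not a proposed API):

```rust
use std::collections::HashMap;

use roaring::RoaringBitmap;

/// Illustrative key: (file path, row group index, filter fingerprint).
/// The fingerprint is a hash of the serialized filter (more on that below).
type FilterKey = (String, usize, u64);

/// Maps each (file, row group, filter) triple to the rows that matched.
type FilterCacheMap = HashMap<FilterKey, RoaringBitmap>;
```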
The advantages of this system are:
- Very little data to cache: the key is relatively small and the values can be compressed using roaring bitmaps, etc.
- High selectivity: this can prune down to the row level
- Does not even require loading parquet metadata to check. This is especially important for cases where you can skip entire files -> zero IO / processing done to skip them.
- Works for arbitrary filters
- Could easily be serialized, e.g. to store it in Redis
- Multiple filters can be combined via their bitmaps (see the sketch after this list)
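To illustrate the last two points, here is a small sketch using the `roaring` crate (assuming that is the `Bitmap` in question):

```rust
use roaring::RoaringBitmap;

fn main() -> std::io::Result<()> {
    let a: RoaringBitmap = (0u32..1_000).collect();
    let b: RoaringBitmap = (500u32..1_500).collect();

    // Combining the cached results of two filters: rows matching both.
    let both = &a & &b;

    // Serializing for an external store such as Redis.
    let mut bytes = Vec::with_capacity(both.serialized_size());
    both.serialize_into(&mut bytes)?;
    let restored = RoaringBitmap::deserialize_from(&bytes[..])?;
    assert_eq!(both, restored);
    Ok(())
}
```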
Some more details.
We created an enum of the form:
```rust
enum FilterCacheResult {
    /// No rows in the container match
    None,
    /// All rows in the container match
    All,
    /// A known set of rows in the container match, with no false positives
    Exact(Bitmap),
}
```
The point of the `None` variant is that we can cache negative and partial results, including the results of bloom filters, page pruning, etc. So, for example, if we run the bloom filters and find that no rows match, we cache that -> next time we don't even need to read the bloom filters or the parquet metadata.
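To make the short-circuiting concrete, a hypothetical per-row-group check could look like the following, reusing `FilterKey` from the earlier sketch and the enum above (with `Bitmap` assumed to be a `RoaringBitmap`):

```rust
use std::collections::HashMap;

use roaring::RoaringBitmap;

/// What the scanner decides to do for one row group (illustrative).
enum RowGroupPlan {
    Skip,                    // cached None: skip with zero IO
    ScanAll,                 // cached All: scan without re-evaluating the filter
    ScanRows(RoaringBitmap), // cached Exact: read only these rows
    Evaluate,                // cache miss: normal pruning + scan, then populate
}

fn plan_row_group(
    cache: &HashMap<FilterKey, FilterCacheResult>,
    key: &FilterKey,
) -> RowGroupPlan {
    match cache.get(key) {
        Some(FilterCacheResult::None) => RowGroupPlan::Skip,
        Some(FilterCacheResult::All) => RowGroupPlan::ScanAll,
        Some(FilterCacheResult::Exact(rows)) => RowGroupPlan::ScanRows(rows.clone()),
        None => RowGroupPlan::Evaluate,
    }
}
```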
We also configured our cache policy (using S3-FIFO via Foyer, I think) to skip caching on the first hit, which avoids uselessly caching timestamps, UUIDs, etc. that would never result in another cache hit.
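Foyer handles this in our setup, but the policy itself is simple; here is a minimal, Foyer-free sketch of second-access admission (names are hypothetical):

```rust
use std::collections::HashSet;

/// Only admit a key into the cache once it has been requested before, so
/// one-off predicates (timestamps, UUIDs, ...) are never stored.
/// (In practice `seen` would itself be bounded, e.g. by a sketch or FIFO.)
struct SecondHitAdmission {
    seen: HashSet<u64>, // hashed keys requested at least once
}

impl SecondHitAdmission {
    fn should_cache(&mut self, key_hash: u64) -> bool {
        // `insert` returns false when the key was already present,
        // i.e. this is at least the second request for it.
        !self.seen.insert(key_hash)
    }
}
```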
Creating the cache key is relatively easy:

- For `PhysicalExpr` we convert it to protobuf and hash the protobuf bytes
- Bloom filters / stats filters just need a string key of the form `bloom(column)`

We make a cache entry per filter after splitting them using `split_conjunction` (see the sketch below).
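A rough sketch of the fingerprinting; the `conjuncts` would come from `split_conjunction`, and hashing the expression's display string here is only a self-contained stand-in for hashing the protobuf bytes described above:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use datafusion::physical_plan::PhysicalExpr;

/// One fingerprint per conjunct of the pushed-down predicate.
/// In our system the bytes are the protobuf encoding (via datafusion-proto);
/// `Display` output is used here only to keep the sketch self-contained.
fn filter_fingerprints(conjuncts: &[Arc<dyn PhysicalExpr>]) -> Vec<u64> {
    conjuncts
        .iter()
        .map(|expr| {
            let bytes = format!("{expr}").into_bytes();
            let mut hasher = DefaultHasher::new();
            bytes.hash(&mut hasher);
            hasher.finish()
        })
        .collect()
}
```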
Tagging authors of the paper that I could find on GitHub: @TobiasSchmidtDE, @andreaskipf, @DominikHorn
**Describe the solution you'd like**
APIs for custom implementations to be able to do this, and maybe even a default implementation that does simple in-memory caching. I think this could be a relatively simple trait that is passed into `ParquetSource`, and it would be pretty straightforward to integrate into the codebase. One possible shape is sketched below.
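This is purely a sketch of the proposed surface, not a worked-out design; names and parameters are obviously up for discussion:

```rust
/// Hypothetical trait a ParquetSource could consult before reading any
/// metadata; `FilterCacheResult` is the enum from above.
pub trait FilterCache: Send + Sync {
    /// Return the cached result for this (file, row group, filter) triple, if any.
    fn get(&self, file: &str, row_group: usize, filter_fingerprint: u64)
        -> Option<FilterCacheResult>;

    /// Record the outcome of evaluating a filter against a row group.
    fn put(&self, file: &str, row_group: usize, filter_fingerprint: u64,
           result: FilterCacheResult);
}
```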
Unfortunately I don't think this will give us big wins on benchmarks, since those don't scan the same data multiple times, but I expect a big impact for some real-world users of DataFusion.