Conversation

nuno-faria
Contributor

Which issue does this PR close?

Rationale for this change

Controlling the memory used by the metadata cache prevents unbounded growth, which could otherwise lead to out-of-memory errors in long-running applications. Entries are evicted using an LRU (least recently used) policy.

The limit can be set with the runtime config datafusion.runtime.file_metadata_cache_limit (e.g., set datafusion.runtime.file_metadata_cache_limit = '1G').
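For example, from application code the same statement can be issued through a SessionContext. A minimal sketch (assuming the tokio runtime and the standard SessionContext SQL API; only the config key itself comes from this PR):

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Cap the file metadata cache at 100 MB instead of the default.
    ctx.sql("SET datafusion.runtime.file_metadata_cache_limit = '100M'")
        .await?
        .collect()
        .await?;
    Ok(())
}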

What changes are included in this PR?

  • Added the datafusion.runtime.file_metadata_cache_limit config (defaults to '1G').
  • Added the lru crate to implement the LRU semantics.
  • Added memory-related methods to the FileMetadata and FileMetadataCache traits (sketched below).
  • Updated the DefaultFilesMetadataCache struct to limit the memory used.
  • Updated the CachedParquetMetaData struct to provide the Parquet metadata size.
  • Added unit tests.
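A rough sketch of the shape of those memory-related hooks (the method names here are assumptions and the traits are simplified; the actual traits in the PR carry additional methods):

pub trait FileMetadata: Send + Sync {
    /// Approximate memory consumed by this cached entry, in bytes, so the
    /// cache can account for it against the configured limit.
    fn memory_size(&self) -> usize;
}

pub trait FileMetadataCache: Send + Sync {
    /// Current memory limit of the cache, in bytes.
    fn cache_limit(&self) -> usize;

    /// Update the limit (e.g. when the runtime config changes), evicting
    /// entries if the cache is now over budget.
    fn update_cache_limit(&self, limit: usize);
}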

Are these changes tested?

Yes

Are there any user-facing changes?

Added a new runtime config to control the metadata cache size; since metadata caching itself is disabled by default, existing applications are not impacted.

@github-actions bot added labels on Aug 4, 2025: documentation (Improvements or additions to documentation), core (Core DataFusion crate), execution (Related to the execution crate), datasource (Changes to the datasource crate)
Contributor Author

@nuno-faria nuno-faria left a comment

cc: @alamb

Cargo.toml Outdated
indexmap = "2.10.0"
itertools = "0.14"
log = "^0.4"
lru = "0.16.0"
Contributor Author

The lru crate appears to be well maintained and used by a number of popular crates (like tracing-log, redis, and aws-sdk-s3). Let me know if it is ok to include it.

Contributor

I agree it seems to be a reasonable crate. However, I think in general if we can avoid new dependencies in DataFusion that would be good -- our dependency trail is already quite large, and I realize one new dependency doesn't seem like much (but that is what we said when introducing all the existing ones too 😢 )

Note lru is also a net new dependency (no existing DataFusion dependency uses it)

It also has a bunch of unsafe code, which isn't necessarily a deal breaker by itself, but unless it is performance critical I think we should avoid a potential source of crashes / non-deterministic bugs

Contributor

However, I did some research and I think implementing an LRU cache in Rust that actually has O(1) properties will be non-trivial (there is a good writeup here: https://seanchen1991.github.io/posts/lru-cache/)

My personal preference would be to implement something custom but I am really torn about this, especially given it would be nice to implement other LRU caches (like listing and statistics, for example) 🤔

The best I could come up with was using a HashMap<Path, usize> that maps to an index in a VecDeque, with the VecDeque implementing the linked list described in the blog. I don't think it would be simple though

Contributor Author

I'll look into it.

Contributor

O(1) only tells you half of the story. A traditional LRU still requires expensive bookkeeping for a GET+MISS case since you need to preserve the LRU order for evictions -- or you perform a potentially expensive (partial) sort whenever you need to evict data. So I think the question should be whether this cache is read-optimized and whether your GET operations are concurrent. If both are answered with YES, then I suggest we just implement SIEVE. It's simple and quite performant.
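For reference, a minimal single-threaded sketch of the SIEVE policy (illustrative only and not this PR's code; lookups scan the queue linearly for brevity, whereas a real implementation would pair the queue with a hash map):

use std::collections::VecDeque;

struct Entry<K, V> {
    key: K,
    value: V,
    visited: bool,
}

pub struct SieveCache<K: PartialEq, V> {
    queue: VecDeque<Entry<K, V>>, // front = newest insertion, back = oldest
    hand: Option<usize>,          // None = start scanning from the oldest entry
    capacity: usize,
}

impl<K: PartialEq, V> SieveCache<K, V> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        Self { queue: VecDeque::new(), hand: None, capacity }
    }

    /// A hit only sets the visited bit: no reordering, which is what makes
    /// SIEVE cheap for read-heavy workloads compared to classic LRU.
    pub fn get(&mut self, key: &K) -> Option<&V> {
        for entry in self.queue.iter_mut() {
            if &entry.key == key {
                entry.visited = true;
                return Some(&entry.value);
            }
        }
        None
    }

    pub fn put(&mut self, key: K, value: V) {
        if self.get(&key).is_some() {
            return; // already cached; a real cache would replace the value here
        }
        if self.queue.len() >= self.capacity {
            self.evict();
        }
        self.queue.push_front(Entry { key, value, visited: false });
        if let Some(h) = self.hand.as_mut() {
            *h += 1; // pushing at the front shifts the existing indices by one
        }
    }

    /// Move the "hand" from the oldest entry toward the newest, clearing
    /// visited bits, and evict the first unvisited entry found.
    fn evict(&mut self) {
        let mut i = self.hand.unwrap_or(self.queue.len() - 1);
        loop {
            if self.queue[i].visited {
                self.queue[i].visited = false;
                i = if i == 0 { self.queue.len() - 1 } else { i - 1 };
            } else {
                self.queue.remove(i);
                self.hand = if i == 0 { None } else { Some(i - 1) };
                return;
            }
        }
    }
}

The visited bit gives recently accessed entries one extra pass before eviction, approximating LRU without mutating the queue order on every read.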

Contributor

I spoke with @XiangpengHao this afternoon and we have a proposal:

  1. Use a FIFO queue -- aka evict the oldest elements first (e.g. store Paths in a VecDeque or something and remove them from the HashSet in order when the cache is full); see the sketch at the end of this comment

Benefits of this approach

  1. Simple to implement
  2. Predictable, easy to explain behavior

Downsides:

  1. Can evict items that are frequently used
  2. Doesn't account for how costly an item is to keep in the cache

However, I think this is an appropriate tradeoff for the default implementation because

  1. a properly sized cache will have no evictions (so the eviction strategy for the simple case doesn't matter)
  2. users can supply their own implementations for more advanced usecases

I argue against including anything more complicated in the initial implementation because of the many tradeoffs. A cache eviction strategy needs to weigh multiple different factors, for example

  1. The cost (e.g. size in bytes) of keeping elements in the cache
  2. The benefit (e.g. how much will the cached metadata improve future queries)
  3. The cost of next miss (e.g. the cost to reload metadata from a local disk is likely much lower than from S3)

The relative tradeoffs between these factors likely vary substantially from system to system

Users with more advanced needs can implement whatever strategy is most appropriate to their system via the traits
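To make the FIFO proposal above concrete, here is a minimal sketch with a byte budget (hypothetical types and names, not DataFusion code):

use std::collections::{HashMap, VecDeque};
use std::hash::Hash;

pub struct FifoCache<K: Eq + Hash + Clone, V> {
    entries: HashMap<K, (V, usize)>, // value plus its approximate size in bytes
    order: VecDeque<K>,              // insertion order; front = oldest
    memory_used: usize,
    memory_limit: usize,
}

impl<K: Eq + Hash + Clone, V> FifoCache<K, V> {
    pub fn new(memory_limit: usize) -> Self {
        Self {
            entries: HashMap::new(),
            order: VecDeque::new(),
            memory_used: 0,
            memory_limit,
        }
    }

    /// Reads never touch the eviction order, so hits need no bookkeeping.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.entries.get(key).map(|(v, _)| v)
    }

    pub fn put(&mut self, key: K, value: V, size: usize) {
        if size > self.memory_limit {
            return; // an entry larger than the whole budget is never cached
        }
        if let Some((_, old_size)) = self.entries.remove(&key) {
            self.memory_used -= old_size;
            self.order.retain(|k| k != &key);
        }
        // Evict the oldest entries until the new one fits.
        while self.memory_used + size > self.memory_limit {
            match self.order.pop_front() {
                Some(oldest) => {
                    if let Some((_, s)) = self.entries.remove(&oldest) {
                        self.memory_used -= s;
                    }
                }
                None => break,
            }
        }
        self.memory_used += size;
        self.order.push_back(key.clone());
        self.entries.insert(key, (value, size));
    }
}

Because reads are pure lookups, a structure like this can sit behind an RwLock without the promotion bookkeeping that an LRU needs on every hit.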

Contributor Author

I had a custom LRU queue implementation almost finished when I got this notification 😅.
I left the details below.

Contributor

dropping https://crates.io/crates/s3-fifo as an interesting related thing

Contributor

I looked at the (very nicely) coded LRU implementation in this PR and I think it is quite good. Unless there are any objections, I think we can go with what is here

self.put(key, value)
}

fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
Contributor Author

I noticed that the remove method is the only one in the CacheAccessor trait that expects a &mut. This appears to be inconsistent with the other update methods, but I did not change it since the trait is public.
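For context, an abbreviated sketch of the signatures being discussed (written from memory and simplified; the real CacheAccessor trait is generic over key/value types and has more methods, and the omitted details may differ):

pub trait CacheAccessor<K, V> {
    /// Reads and inserts take &self, relying on interior mutability ...
    fn get(&self, k: &K) -> Option<V>;
    fn put(&self, key: K, value: V) -> Option<V>;

    /// ... while remove alone takes &mut self, which is the inconsistency
    /// noted above.
    fn remove(&mut self, k: &K) -> Option<V>;
}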

Contributor

Yeah, I suggest we propose this cleanup in a follow on PR / ticket so we can discuss it separately if desired

@alamb
Contributor

alamb commented Aug 4, 2025

Thank you -- I will review this later today

Contributor

@alamb alamb left a comment

Thank you @nuno-faria -- I found this PR well commented, and well tested as always 🏆

My only hesitation is adding the lru dependency -- however, I spent a while this afternoon trying to come up with a plausible alternative and I was not able to do so

I would be curious what you think / if you have some other way to implement an LRU cache without a new dependency


update_limit(&ctx, "2G").await;
assert_eq!(get_limit(&ctx), Some(2 * 1024 * 1024 * 1024));

update_limit(&ctx, "123K").await;
Contributor

nice

let mut source = ParquetSource::new(self.options.clone());

// Use the CachedParquetFileReaderFactory when metadata caching is enabled
if self.options.global.cache_metadata {
Contributor

What are your thoughts about (in a follow on PR) removing the options.cache_metadata and always trying to save the metadata (which will be a noop if there is no room)?

Contributor Author

I think caching by default would be good. The only situation where it wouldn't help would be one-time scans of parquet files that do not require the page index, but for large files the scan should largely outweigh the page index retrieval anyway.


file_statistic_cache: Option<FileStatisticsCache>,
list_files_cache: Option<ListFilesCache>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
file_metadata_cache: Arc<dyn FileMetadataCache>,
Contributor

Seeing the idea of having a default file_metadata_cache installed got me thinking about @BlakeOrth's comment here: #16971 (comment)

After this work to cache file metadata, it seems like we may want to consider adding default caches for ListFiles and FileStatistics as well 🤔 (as a follow on PR of course)



/// assert_eq!(lru_queue.pop(), Some((2, 20)));
/// assert_eq!(lru_queue.pop(), None);
/// ```
pub struct LruQueue<K: Eq + Hash + Clone, V> {
Contributor

I didn't see that you had implemented this -- I will review shortly

Contributor Author

This implementation uses no unsafe blocks. The "unsafest" part is when we upgrade the Weak pointers in the doubly-linked list and then unwrap them; however, we guarantee that the strong reference is always kept in the data map.
While I believe this implementation could be more efficient, I tried to keep it as simple as possible (e.g., get does a remove and a put, instead of something more complex). A quick bench shows that it reaches >1M puts/gets/pops per second, which should be enough.
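To illustrate the "get does a remove and a put" semantics, here is a deliberately simplified sketch (it promotes in O(n) by scanning a VecDeque; the LruQueue in this PR gets O(1) promotion from its doubly-linked list of Weak-pointer nodes, so this is not the PR's code):

use std::collections::{HashMap, VecDeque};
use std::hash::Hash;

pub struct SimpleLruQueue<K: Eq + Hash + Clone, V> {
    data: HashMap<K, V>,
    order: VecDeque<K>, // front = most recently used, back = least recently used
}

impl<K: Eq + Hash + Clone, V> SimpleLruQueue<K, V> {
    pub fn new() -> Self {
        Self { data: HashMap::new(), order: VecDeque::new() }
    }

    pub fn put(&mut self, key: K, value: V) -> Option<V> {
        let previous = self.remove(&key);
        self.order.push_front(key.clone());
        self.data.insert(key, value);
        previous
    }

    /// Promote the entry to most recently used: conceptually a remove
    /// followed by a put of its position in the queue.
    pub fn get(&mut self, key: &K) -> Option<&V> {
        if let Some(pos) = self.order.iter().position(|k| k == key) {
            let k = self.order.remove(pos).expect("position is valid");
            self.order.push_front(k);
        }
        self.data.get(key)
    }

    pub fn remove(&mut self, key: &K) -> Option<V> {
        if let Some(pos) = self.order.iter().position(|k| k == key) {
            self.order.remove(pos);
        }
        self.data.remove(key)
    }

    /// Evict and return the least recently used entry.
    pub fn pop(&mut self) -> Option<(K, V)> {
        let key = self.order.pop_back()?;
        let value = self.data.remove(&key)?;
        Some((key, value))
    }
}

Usage follows the doc example above: put entries, get to promote them to most recently used, pop to evict the least recently used.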

Contributor

I think we can start with this and simplify / improve the performance as follow on PRs if people hit issues with it

Contributor

@alamb alamb left a comment

Thank you @nuno-faria -- I think this PR looks great to me. There are several items I think we should improve, but they can also be done as follow-on PRs and are not strictly required here

  1. Change default to something more like 50MB
  2. Use usize rather than Option for cache
  3. Avoid unreachable / panic
  4. Update the config description:

Nice to consider:

  1. using parking_lot: https://github.com/nuno-faria/datafusion/pull/1/files

I also reviewed the test coverage using

cargo llvm-cov test --html -p datafusion-execution -- lru_queue

And all the code is covered:
[screenshot: lru_queue coverage report, 2025-08-05]

(BTW the only thing not covered is is_empty which I don't think is a problem):
[screenshot: coverage report showing only is_empty uncovered, 2025-08-05]


@nuno-faria
Contributor Author

Thanks @alamb for the review. I think the remaining TODOs are now finished.

> I also reviewed the test coverage using
>
> cargo llvm-cov test --html -p datafusion-execution -- lru_queue

Thanks for the tip. I added the missing test to get to 100% coverage.

@alamb
Contributor

alamb commented Aug 6, 2025

> Thanks @alamb for the review. I think the remaining TODOs are now finished.
>
> I also reviewed the test coverage using
>
> cargo llvm-cov test --html -p datafusion-execution -- lru_queue
>
> Thanks for the tip. I added the missing test to get to 100% coverage.

100% coverage -- now that is attention to detail!

@alamb
Contributor

alamb commented Aug 6, 2025

I just looked at the latest changes and they look great to me. I'll plan to merge this PR tomorrow unless anyone else would like time to review

@alamb alamb merged commit a9e6d4b into apache:main Aug 7, 2025
28 checks passed
@alamb
Contributor

alamb commented Aug 7, 2025

Let's go! Thanks again @nuno-faria

@nuno-faria nuno-faria deleted the cache_metadata_limit branch August 7, 2025 10:35
Comment on lines -177 to +320
-        self.metadata
-            .get(&k.location)
-            .map(|s| {
-                let (extra, metadata) = s.value();
-                if extra.size != k.size || extra.last_modified != k.last_modified {
-                    None
-                } else {
-                    Some(Arc::clone(metadata))
-                }
-            })
-            .unwrap_or(None)
+        let mut state = self.state.lock().unwrap();
+        state.get(k)
@abhita abhita Sep 17, 2025

Hi @nuno-faria @alamb
Trying to comprehend the requirement of locks over reads.
From what I can understand, for this operation the state of the MetadataCache is altered, since cache_hits is incremented for each read operation. Wouldn't this affect cache read throughput in the case of concurrent reads?

Could you help shed some light on why the lock is required here?

Contributor Author

In addition to cache_hits, the get operation also changes the queue itself, since it promotes elements to the top to maintain the least-recently-used ordering. Because the queue mutates, the lock is required.

As for the performance, I tested it at the time and it could comfortably handle millions of reads/writes per second, so it should not be a problem. In any case, the user can always supply a custom FileMetadataCache implementation if necessary.
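To make that concrete, a tiny sketch (hypothetical types, not the actual DefaultFilesMetadataCache): the public get takes &self, so the mutable LRU state has to sit behind a lock.

use std::sync::Mutex;

struct LruState {
    cache_hits: usize,
    // ... the LRU queue and the entry map also live here, and both are
    // mutated on every hit when the entry is promoted ...
}

pub struct SharedMetadataCache {
    state: Mutex<LruState>,
}

impl SharedMetadataCache {
    pub fn new() -> Self {
        Self { state: Mutex::new(LruState { cache_hits: 0 }) }
    }

    /// Looks like a read, but bumping the hit counter and promoting the
    /// entry both require exclusive access to the state.
    pub fn get(&self /* , k: &ObjectMeta */) {
        let mut state = self.state.lock().unwrap();
        state.cache_hits += 1;
        // look up the entry and move it to the front of the LRU queue here
    }
}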

Contributor

If you are seeing any issues or have suggestions for improvements @abhita we would love to hear them as well

@abhita abhita Sep 17, 2025

If maintaining the data structure for eviction is the key factor here, could we use a standard/common concurrent structure like DashMap for the cache entries and a separate tracking structure for the keys (i.e., a cache structure agnostic of eviction policies)?
This way, the code becomes much more extensible for plugging in custom eviction policies in the future.

This would also give us the opportunity to explore locking per key rather than the whole cache.
Any thoughts?
@nuno-faria @alamb

I'm referring to the liquid-cache implementation: https://github.com/XiangpengHao/liquid-cache/blob/d232270cfaf495ba257d748a51123673409f7c72/src/storage/src/cache/policies.rs#L85
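A rough sketch of that separation (this assumes the dashmap crate, which DataFusion does not currently depend on in this code path, and the EvictionPolicy trait here is hypothetical rather than an existing API):

use std::hash::Hash;

use dashmap::DashMap;

/// Eviction policy that only ever sees keys, so the entry storage stays
/// agnostic of how victims are chosen.
pub trait EvictionPolicy<K>: Send + Sync {
    fn on_insert(&self, key: &K);
    fn on_access(&self, key: &K);
    fn pick_victim(&self) -> Option<K>;
}

pub struct PolicyCache<K: Eq + Hash + Clone, V: Clone, P: EvictionPolicy<K>> {
    entries: DashMap<K, V>, // sharded locks, so concurrent reads rarely contend
    policy: P,
    max_entries: usize,
}

impl<K: Eq + Hash + Clone, V: Clone, P: EvictionPolicy<K>> PolicyCache<K, V, P> {
    pub fn new(policy: P, max_entries: usize) -> Self {
        Self { entries: DashMap::new(), policy, max_entries }
    }

    pub fn get(&self, key: &K) -> Option<V> {
        let value = self.entries.get(key).map(|r| r.value().clone());
        if value.is_some() {
            // Only the policy's own tracking structure is updated on a hit,
            // not the shared entry map.
            self.policy.on_access(key);
        }
        value
    }

    pub fn put(&self, key: K, value: V) {
        while self.entries.len() >= self.max_entries {
            match self.policy.pick_victim() {
                Some(victim) => {
                    self.entries.remove(&victim);
                }
                None => break,
            }
        }
        self.policy.on_insert(&key);
        self.entries.insert(key, value);
    }
}

Whether this actually beats a single Mutex for this workload would need to be shown with a benchmark.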

Contributor

@alamb alamb Sep 18, 2025

Thanks @abhita! I think if you are interested in alternative approaches, the best thing to do would be:

  1. Create some sort of benchmark (maybe make 10k little parquet files with small metadata) that shows cache management consuming significant time

Then you could explore various strategies to improve the performance

@abhita

Got it.
Curious why the cache data structure is tightly coupled to the eviction policy.
@alamb @nuno-faria

Contributor

I am not sure how to answer that question. It sounds like a more general software structure question rather than something specific about this code

Development

Successfully merging this pull request may close these issues.

[Parquet Metadata Cache]: Limit memory used