Skip to content

Commit 9076ec4

Browse files
authored
chore: Adds size weighting and time-to-idle support to LruParquetMetadataCache via moka (#24)
1 parent 9caae4f commit 9076ec4

File tree

2 files changed

+27
-79
lines changed

2 files changed

+27
-79
lines changed

datafusion/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ itertools = "0.9.0"
7272
lru = "0.6.5"
7373
serde = { version = "1.0", features = ["rc"] }
7474
serde_derive = "1.0"
75+
moka = "0.8.2"
7576
tracing = "0.1.25"
7677
tracing-futures = { version = "0.2.5", features = ["tokio", "tokio-executor"] }
7778

datafusion/src/physical_plan/parquet.rs

Lines changed: 26 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
2020
use std::fmt;
2121
use std::fs::File;
22-
use std::sync::{Arc, Mutex};
22+
use std::sync::Arc;
2323
use std::task::{Context, Poll};
2424
use std::{any::Any, convert::TryInto};
2525

@@ -52,13 +52,16 @@ use parquet::file::{
5252

5353
use fmt::Debug;
5454
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
55+
use std::fmt::Formatter;
56+
use std::time::Duration;
5557

5658
use tokio::sync::mpsc::{channel, Receiver, Sender};
5759
use tokio_stream::wrappers::ReceiverStream;
5860

5961
use crate::datasource::datasource::{ColumnStatistics, Statistics};
6062
use async_trait::async_trait;
6163
use futures::stream::{Stream, StreamExt};
64+
use moka::sync::Cache;
6265
use parquet::file::metadata::ParquetMetaData;
6366

6467
use super::SQLMetric;
@@ -135,48 +138,6 @@ pub trait ParquetMetadataCache: Debug + Sync + Send {
135138
(*metadata).clone(),
136139
))
137140
}
138-
139-
/// Returns a copy of the cache stats.
140-
fn stats(&self) -> ParquetMetadataCacheStats;
141-
}
142-
143-
/// Stats for ParquetMetadataCache.
144-
#[derive(Clone, Debug)]
145-
pub struct ParquetMetadataCacheStats {
146-
hits: u64,
147-
misses: u64,
148-
}
149-
150-
impl ParquetMetadataCacheStats {
151-
/// Returns a new ParquetMetadataCacheStats
152-
pub fn new() -> Self {
153-
ParquetMetadataCacheStats { hits: 0, misses: 0 }
154-
}
155-
156-
/// Returns the number of cache reads.
157-
pub fn reads(&self) -> u64 {
158-
self.hits + self.misses
159-
}
160-
161-
/// Returns the number of cache hits.
162-
pub fn hits(&self) -> u64 {
163-
self.hits
164-
}
165-
166-
/// Returns the numbere of cache misses.
167-
pub fn misses(&self) -> u64 {
168-
self.misses
169-
}
170-
171-
/// Increments the number of cache hits.
172-
pub fn hit(&mut self) {
173-
self.hits += 1;
174-
}
175-
176-
/// Increments the number of cache misses.
177-
pub fn miss(&mut self) {
178-
self.misses += 1;
179-
}
180141
}
181142

182143
/// Default MetadataCache, does not cache anything
@@ -194,59 +155,45 @@ impl ParquetMetadataCache for NoopParquetMetadataCache {
194155
fn metadata(&self, _key: &str, file: &File) -> Result<Arc<ParquetMetaData>> {
195156
Ok(Arc::new(footer::parse_metadata(file)?))
196157
}
197-
198-
fn stats(&self) -> ParquetMetadataCacheStats {
199-
ParquetMetadataCacheStats::new()
200-
}
201158
}
202159

203160
/// LruMetadataCache, caches parquet metadata.
204-
#[derive(Debug)]
205161
pub struct LruParquetMetadataCache {
206-
data: Mutex<LruParquetMetadataCacheData>,
207-
}
208-
209-
#[derive(Debug)]
210-
struct LruParquetMetadataCacheData {
211-
cache: lru::LruCache<String, Arc<ParquetMetaData>>,
212-
stats: ParquetMetadataCacheStats,
162+
cache: Cache<String, Arc<ParquetMetaData>>,
213163
}
214164

215165
impl LruParquetMetadataCache {
216166
/// Creates a new LruMetadataCache
217-
pub fn new(metadata_cache_capacity: usize) -> Arc<Self> {
167+
pub fn new(max_capacity: u64, time_to_idle: Duration) -> Arc<Self> {
218168
Arc::new(LruParquetMetadataCache {
219-
data: Mutex::new(LruParquetMetadataCacheData {
220-
cache: lru::LruCache::new(metadata_cache_capacity),
221-
stats: ParquetMetadataCacheStats::new(),
222-
}),
169+
cache: moka::sync::Cache::builder()
170+
.weigher(|_, value: &Arc<ParquetMetaData>| value.metadata_size())
171+
.max_capacity(max_capacity)
172+
.time_to_idle(time_to_idle)
173+
.build(),
223174
})
224175
}
225176
}
226177

178+
impl Debug for LruParquetMetadataCache {
179+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
180+
f.debug_struct("LruParquetMetadataCache")
181+
.field("cache", &"<moka::sync::Cache>")
182+
.finish()
183+
}
184+
}
185+
227186
impl ParquetMetadataCache for LruParquetMetadataCache {
228187
fn metadata(&self, key: &str, file: &File) -> Result<Arc<ParquetMetaData>> {
229-
{
230-
let mut data = self.data.lock().unwrap();
231-
let metadata = data.cache.get(&key.to_string());
232-
if let Some(metadata) = metadata {
233-
let result = Ok(metadata.clone());
234-
data.stats.hit();
235-
return result;
236-
} else {
237-
data.stats.miss();
188+
let k = key.to_string();
189+
match self.cache.get(&k) {
190+
Some(metadata) => Ok(metadata),
191+
None => {
192+
let metadata = Arc::new(footer::parse_metadata(file)?);
193+
self.cache.insert(k, metadata.clone());
194+
Ok(metadata)
238195
}
239196
}
240-
let metadata = Arc::new(footer::parse_metadata(file)?);
241-
{
242-
let mut data = self.data.lock().unwrap();
243-
data.cache.put(key.to_string(), metadata.clone());
244-
}
245-
Ok(metadata)
246-
}
247-
248-
fn stats(&self) -> ParquetMetadataCacheStats {
249-
self.data.lock().unwrap().stats.clone()
250197
}
251198
}
252199

0 commit comments

Comments
 (0)