Skip to content

Commit

Permalink
Close local shards and GC mrecordlog queues on ingester startup (#3895)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Oct 6, 2023
1 parent c561720 commit e05b708
Show file tree
Hide file tree
Showing 68 changed files with 2,322 additions and 981 deletions.
4 changes: 2 additions & 2 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ matches = "0.1.9"
md5 = "0.7"
mime_guess = "2.0.4"
mockall = "0.11"
mrecordlog = "0.4"
mrecordlog = { git = "https://github.com/quickwit-oss/mrecordlog", rev = "ae1ca3f" }
new_string_template = "1.4.0"
nom = "7.1.3"
num_cpus = "1"
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-codegen/example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use tower::{Layer, Service};

pub use crate::error::HelloError;
pub use crate::hello::*;
use crate::hello::{Hello, HelloRequest, HelloResponse};

pub type HelloResult<T> = Result<T, HelloError>;

Expand Down
5 changes: 1 addition & 4 deletions quickwit/quickwit-common/src/tower/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,7 @@ impl<R> fmt::Debug for BufferLayer<R> {

impl<R> Clone for BufferLayer<R> {
fn clone(&self) -> Self {
Self {
bound: self.bound,
_phantom: PhantomData,
}
*self
}
}

Expand Down
154 changes: 96 additions & 58 deletions quickwit/quickwit-common/src/tower/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@ use std::cmp::{Eq, PartialEq};
use std::collections::HashMap;
use std::fmt;
use std::hash::Hash;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use futures::{Stream, StreamExt};
use tokio::sync::RwLock;

use super::Change;

/// A pool of `V` values identified by `K` keys. The pool can be updated manually by calling the
/// `add/remove` methods or by listening to a stream of changes.
pub struct Pool<K, V> {
inner: Arc<RwLock<InnerPool<K, V>>>,
pool: Arc<RwLock<HashMap<K, V>>>,
}

impl<K, V> fmt::Debug for Pool<K, V>
Expand All @@ -49,26 +48,21 @@ where
impl<K, V> Clone for Pool<K, V> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
pool: self.pool.clone(),
}
}
}

impl<K, V> Default for Pool<K, V> {
impl<K, V> Default for Pool<K, V>
where K: Eq + PartialEq + Hash
{
fn default() -> Self {
let inner = InnerPool {
map: HashMap::new(),
};
Self {
inner: Arc::new(RwLock::new(inner)),
pool: Arc::new(RwLock::new(HashMap::default())),
}
}
}

struct InnerPool<K, V> {
map: HashMap<K, V>,
}

impl<K, V> Pool<K, V>
where
K: Eq + PartialEq + Hash + Clone + Send + Sync + 'static,
Expand All @@ -85,10 +79,10 @@ where
.for_each(|change| async {
match change {
Change::Insert(key, service) => {
pool.insert(key, service).await;
pool.insert(key, service);
}
Change::Remove(key) => {
pool.remove(&key).await;
pool.remove(&key);
}
}
})
Expand All @@ -98,59 +92,97 @@ where
}

/// Returns whether the pool is empty.
pub async fn is_empty(&self) -> bool {
self.inner.read().await.map.is_empty()
pub fn is_empty(&self) -> bool {
self.pool
.read()
.expect("lock should not be poisoned")
.is_empty()
}

/// Returns the number of values in the pool.
pub async fn len(&self) -> usize {
self.inner.read().await.map.len()
pub fn len(&self) -> usize {
self.pool.read().expect("lock should not be poisoned").len()
}

/// Returns all the keys in the pool.
pub async fn keys(&self) -> Vec<K> {
self.inner.read().await.map.keys().cloned().collect()
pub fn keys(&self) -> Vec<K> {
self.pool
.read()
.expect("lock should not be poisoned")
.iter()
.map(|(key, _)| key.clone())
.collect()
}

/// Returns all the values in the pool.
pub fn values(&self) -> Vec<V> {
self.pool
.read()
.expect("lock should not be poisoned")
.iter()
.map(|(_, value)| value.clone())
.collect()
}

/// Returns all the key-value pairs in the pool.
pub async fn all(&self) -> Vec<(K, V)> {
self.inner
pub fn pairs(&self) -> Vec<(K, V)> {
self.pool
.read()
.await
.map
.expect("lock should not be poisoned")
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.map(|(key, value)| (key.clone(), value.clone()))
.collect()
}

/// Returns the value associated with the given key.
pub async fn get<Q>(&self, key: &Q) -> Option<V>
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
Q: Hash + Eq + ?Sized,
K: Borrow<Q>,
{
self.inner.read().await.map.get(key).cloned()
self.pool
.read()
.expect("lock should not be poisoned")
.contains_key(key)
}

/// Finds a key in the pool that satisfies the given predicate.
pub async fn find(&self, func: impl Fn(&K) -> bool) -> Option<K> {
self.inner
/// Returns the value associated with the given key.
pub fn get<Q>(&self, key: &Q) -> Option<V>
where
Q: Hash + Eq + ?Sized,
K: Borrow<Q>,
{
self.pool
.read()
.await
.map
.keys()
.find(|k| func(k))
.expect("lock should not be poisoned")
.get(key)
.cloned()
}

/// Finds a key in the pool that satisfies the given predicate.
pub fn find(&self, func: impl Fn(&K, &V) -> bool) -> Option<(K, V)> {
self.pool
.read()
.expect("lock should not be poisoned")
.iter()
.find(|(key, value)| func(key, value))
.map(|(key, value)| (key.clone(), value.clone()))
}

/// Adds a value to the pool.
pub async fn insert(&self, key: K, service: V) {
self.inner.write().await.map.insert(key, service);
pub fn insert(&self, key: K, service: V) {
self.pool
.write()
.expect("lock should not be poisoned")
.insert(key, service);
}

/// Removes a value from the pool.
pub async fn remove(&self, key: &K) {
self.inner.write().await.map.remove(key);
pub fn remove(&self, key: &K) {
self.pool
.write()
.expect("lock should not be poisoned")
.remove(key);
}
}

Expand All @@ -159,11 +191,8 @@ where K: Eq + PartialEq + Hash
{
fn from_iter<I>(iter: I) -> Self
where I: IntoIterator<Item = (K, V)> {
let key_values = HashMap::from_iter(iter);
let inner = InnerPool { map: key_values };

Self {
inner: Arc::new(RwLock::new(inner)),
pool: Arc::new(RwLock::new(HashMap::from_iter(iter))),
}
}
}
Expand All @@ -180,39 +209,48 @@ mod tests {
async fn test_pool() {
let (change_stream_tx, change_stream_rx) = tokio::sync::mpsc::channel(10);
let change_stream = ReceiverStream::new(change_stream_rx);

let pool = Pool::default();
pool.listen_for_changes(change_stream);
assert!(pool.is_empty().await);
assert_eq!(pool.len().await, 0);

assert!(pool.is_empty());
assert_eq!(pool.len(), 0);

change_stream_tx.send(Change::Insert(1, 11)).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
assert!(!pool.is_empty().await);
assert_eq!(pool.len().await, 1);
assert_eq!(pool.get(&1).await, Some(11));

assert!(!pool.is_empty());
assert_eq!(pool.len(), 1);

assert!(pool.contains_key(&1));
assert_eq!(pool.get(&1), Some(11));

change_stream_tx.send(Change::Insert(2, 21)).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
assert_eq!(pool.len().await, 2);
assert_eq!(pool.get(&2).await, Some(21));

assert_eq!(pool.find(|k| *k == 1).await, Some(1));
assert_eq!(pool.len(), 2);
assert_eq!(pool.get(&2), Some(21));

assert_eq!(pool.find(|k, _| *k == 1), Some((1, 11)));

let mut all_nodes = pool.all().await;
all_nodes.sort();
assert_eq!(all_nodes, vec![(1, 11), (2, 21)]);
let mut pairs = pool.pairs();
pairs.sort();

assert_eq!(pairs, vec![(1, 11), (2, 21)]);

change_stream_tx.send(Change::Insert(1, 12)).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
assert_eq!(pool.get(&1).await, Some(12));

assert_eq!(pool.get(&1), Some(12));

change_stream_tx.send(Change::Remove(1)).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
assert_eq!(pool.len().await, 1);

assert_eq!(pool.len(), 1);

change_stream_tx.send(Change::Remove(2)).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
assert!(pool.is_empty().await);
assert_eq!(pool.len().await, 0);

assert!(pool.is_empty());
}
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ impl IndexConfig {
default_search_fields: vec![
"body".to_string(),
r#"attributes.server"#.to_string(),
r#"attributes.server\.status"#.to_string(),
r"attributes.server\.status".to_string(),
],
};
IndexConfig {
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ documentation = "https://quickwit.io/docs/"
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
fnv = { workspace = true }
dyn-clone = { workspace = true }
http = { workspace = true }
hyper = { workspace = true }
Expand Down
Loading

0 comments on commit e05b708

Please sign in to comment.