Skip to content

Commit

Permalink
Perform a single mutation per index in file-backed metastore (#3921)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Oct 9, 2023
1 parent 99fbb70 commit 9e798b3
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ impl FileBackedIndex {
}

/// Index UID accessor.
pub fn index_uid(&self) -> IndexUid {
self.metadata.index_uid.clone()
pub fn index_uid(&self) -> &IndexUid {
&self.metadata.index_uid
}

/// Index metadata accessor.
Expand Down Expand Up @@ -445,12 +445,13 @@ impl FileBackedIndex {

/// Adds a source.
///
/// Registers `source_config` in the index metadata, then inserts an empty shard set for the
/// new source into `per_source_shards`.
///
/// # Errors
///
/// Propagates the error returned by `IndexMetadata::add_source`; in that case no shard set
/// is inserted.
pub(crate) fn add_source(&mut self, source_config: SourceConfig) -> MetastoreResult<()> {
    // `index_uid()` hands out a borrow of `self`, so take an owned copy before mutating.
    let index_uid = self.index_uid().clone();
    let source_id = source_config.source_id.clone();

    self.metadata.add_source(source_config)?;

    self.per_source_shards
        .insert(source_id.clone(), Shards::empty(index_uid, source_id));
    Ok(())
}

Expand Down Expand Up @@ -543,41 +544,105 @@ impl FileBackedIndex {

pub(crate) fn open_shards(
&mut self,
subrequest: OpenShardsSubrequest,
) -> MetastoreResult<MutationOccurred<OpenShardsSubresponse>> {
self.get_shards_for_source_mut(&subrequest.source_id)?
.open_shards(subrequest)
subrequests: Vec<OpenShardsSubrequest>,
) -> MetastoreResult<MutationOccurred<Vec<OpenShardsSubresponse>>> {
let mut mutation_occurred = false;
let mut subresponses = Vec::with_capacity(subrequests.len());

for subrequest in subrequests {
let subresponse = match self
.get_shards_for_source_mut(&subrequest.source_id)?
.open_shards(subrequest)?
{
MutationOccurred::Yes(subresponse) => {
mutation_occurred = true;
subresponse
}
MutationOccurred::No(subresponse) => subresponse,
};
subresponses.push(subresponse);
}
if mutation_occurred {
Ok(MutationOccurred::Yes(subresponses))
} else {
Ok(MutationOccurred::No(subresponses))
}
}

pub(crate) fn acquire_shards(
&mut self,
subrequest: AcquireShardsSubrequest,
) -> MetastoreResult<MutationOccurred<AcquireShardsSubresponse>> {
self.get_shards_for_source_mut(&subrequest.source_id)?
.acquire_shards(subrequest)
subrequests: Vec<AcquireShardsSubrequest>,
) -> MetastoreResult<MutationOccurred<Vec<AcquireShardsSubresponse>>> {
let mut mutation_occurred = false;
let mut subresponses = Vec::with_capacity(subrequests.len());

for subrequest in subrequests {
let subresponse = match self
.get_shards_for_source_mut(&subrequest.source_id)?
.acquire_shards(subrequest)?
{
MutationOccurred::Yes(subresponse) => {
mutation_occurred = true;
subresponse
}
MutationOccurred::No(subresponse) => subresponse,
};
subresponses.push(subresponse);
}
if mutation_occurred {
Ok(MutationOccurred::Yes(subresponses))
} else {
Ok(MutationOccurred::No(subresponses))
}
}

pub(crate) fn close_shards(
&mut self,
subrequest: CloseShardsSubrequest,
) -> MetastoreResult<MutationOccurred<Either<CloseShardsSuccess, CloseShardsFailure>>> {
self.get_shards_for_source_mut(&subrequest.source_id)?
.close_shards(subrequest)
subrequests: Vec<CloseShardsSubrequest>,
) -> MetastoreResult<MutationOccurred<Vec<Either<CloseShardsSuccess, CloseShardsFailure>>>>
{
let mut mutation_occurred = false;
let mut subresponses = Vec::with_capacity(subrequests.len());

for subrequest in subrequests {
let subresponse = match self
.get_shards_for_source_mut(&subrequest.source_id)?
.close_shards(subrequest)?
{
MutationOccurred::Yes(subresponse) => {
mutation_occurred = true;
subresponse
}
MutationOccurred::No(subresponse) => subresponse,
};
subresponses.push(subresponse);
}
if mutation_occurred {
Ok(MutationOccurred::Yes(subresponses))
} else {
Ok(MutationOccurred::No(subresponses))
}
}

/// Applies a batch of delete-shards subrequests to this index.
///
/// Each subrequest is routed to the shard set of its `source_id`, in the order given, and
/// `force` is forwarded unchanged to every per-source `delete_shards` call. The result is
/// `MutationOccurred::Yes(())` if at least one subrequest reported a mutation, and
/// `MutationOccurred::No(())` otherwise (including for an empty batch).
///
/// # Errors
///
/// Returns the first error raised either by the source lookup or by the per-source
/// `delete_shards` call; the remaining subrequests are not processed.
pub(crate) fn delete_shards(
    &mut self,
    subrequests: Vec<DeleteShardsSubrequest>,
    force: bool,
) -> MetastoreResult<MutationOccurred<()>> {
    // The flag is sticky: once any subrequest has mutated the index it must stay set.
    // Overwriting it with the outcome of each subrequest would report `No` whenever the
    // last subrequest happens to be a no-op, even though earlier ones changed state.
    let mut mutation_occurred = false;

    for subrequest in subrequests {
        let outcome = self
            .get_shards_for_source_mut(&subrequest.source_id)?
            .delete_shards(subrequest, force)?;

        if matches!(outcome, MutationOccurred::Yes(())) {
            mutation_occurred = true;
        }
    }
    if mutation_occurred {
        Ok(MutationOccurred::Yes(()))
    } else {
        Ok(MutationOccurred::No(()))
    }
}

pub(crate) fn list_shards(
&mut self,
&self,
subrequest: ListShardsSubrequest,
) -> MetastoreResult<MutationOccurred<ListShardsSubresponse>> {
) -> MetastoreResult<ListShardsSubresponse> {
self.get_shards_for_source(&subrequest.source_id)?
.list_shards(subrequest)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,25 +287,21 @@ impl Shards {
return Err(MetastoreError::InvalidArgument { message });
}
}
if mutation_occurred {
Ok(MutationOccurred::Yes(()))
} else {
Ok(MutationOccurred::No(()))
}
Ok(MutationOccurred::from(mutation_occurred))
}

pub(super) fn list_shards(
&self,
subrequest: ListShardsSubrequest,
) -> MetastoreResult<MutationOccurred<ListShardsSubresponse>> {
) -> MetastoreResult<ListShardsSubresponse> {
let shards = self.list_shards_inner(subrequest.shard_state);
let response = ListShardsSubresponse {
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shards,
next_shard_id: self.next_shard_id,
};
Ok(MutationOccurred::No(response))
Ok(response)
}

pub(super) fn try_apply_delta(
Expand Down Expand Up @@ -465,9 +461,7 @@ mod tests {
source_id: source_id.clone(),
shard_state: None,
};
let MutationOccurred::No(subresponse) = shards.list_shards(subrequest).unwrap() else {
panic!("Expected `MutationOccured::No`");
};
let subresponse = shards.list_shards(subrequest).unwrap();
assert_eq!(subresponse.index_uid, index_uid.as_str());
assert_eq!(subresponse.source_id, source_id);
assert_eq!(subresponse.shards.len(), 0);
Expand All @@ -494,9 +488,7 @@ mod tests {
source_id: source_id.clone(),
shard_state: None,
};
let MutationOccurred::No(mut subresponse) = shards.list_shards(subrequest).unwrap() else {
panic!("Expected `MutationOccured::No`");
};
let mut subresponse = shards.list_shards(subrequest).unwrap();
subresponse
.shards
.sort_unstable_by_key(|shard| shard.shard_id);
Expand All @@ -509,9 +501,7 @@ mod tests {
source_id,
shard_state: Some(ShardState::Closed as i32),
};
let MutationOccurred::No(subresponse) = shards.list_shards(subrequest).unwrap() else {
panic!("Expected `MutationOccured::No`");
};
let subresponse = shards.list_shards(subrequest).unwrap();
assert_eq!(subresponse.shards.len(), 1);
assert_eq!(subresponse.shards[0].shard_id, 1);
}
Expand Down
110 changes: 66 additions & 44 deletions quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ use itertools::{Either, Itertools};
use quickwit_common::uri::Uri;
use quickwit_config::{validate_index_id_pattern, IndexConfig, SourceConfig};
use quickwit_proto::metastore::{
AcquireShardsRequest, AcquireShardsResponse, CloseShardsRequest, CloseShardsResponse,
DeleteQuery, DeleteShardsRequest, DeleteShardsResponse, DeleteTask, EntityKind,
ListShardsRequest, ListShardsResponse, MetastoreError, MetastoreResult, OpenShardsRequest,
OpenShardsResponse,
AcquireShardsRequest, AcquireShardsResponse, AcquireShardsSubrequest, CloseShardsRequest,
CloseShardsResponse, CloseShardsSubrequest, DeleteQuery, DeleteShardsRequest,
DeleteShardsResponse, DeleteShardsSubrequest, DeleteTask, EntityKind, ListShardsRequest,
ListShardsResponse, MetastoreError, MetastoreResult, OpenShardsRequest, OpenShardsResponse,
OpenShardsSubrequest,
};
use quickwit_proto::{IndexUid, PublishToken};
use quickwit_storage::Storage;
Expand Down Expand Up @@ -178,7 +179,7 @@ impl FileBackedMetastore {
) -> MetastoreResult<T> {
let index_id = index_uid.index_id();
let mut locked_index = self.get_locked_index(index_id).await?;
if locked_index.index_uid() != index_uid {
if *locked_index.index_uid() != index_uid {
return Err(MetastoreError::NotFound(EntityKind::Index {
index_id: index_id.to_string(),
}));
Expand Down Expand Up @@ -225,7 +226,7 @@ impl FileBackedMetastore {
where F: FnOnce(&FileBackedIndex) -> MetastoreResult<T> {
let index_id = index_uid.index_id();
let locked_index = self.get_locked_index(index_id).await?;
if locked_index.index_uid() == index_uid {
if *locked_index.index_uid() == index_uid {
view(&locked_index)
} else {
Err(MetastoreError::NotFound(EntityKind::Index {
Expand Down Expand Up @@ -654,62 +655,74 @@ impl Metastore for FileBackedMetastore {
// Shard API

async fn open_shards(&self, request: OpenShardsRequest) -> MetastoreResult<OpenShardsResponse> {
let mut subresponses = Vec::with_capacity(request.subrequests.len());
let mut response = OpenShardsResponse {
subresponses: Vec::with_capacity(request.subrequests.len()),
};
// We must group the subrequests by `index_uid` to mutate each index only once, since each
// mutation triggers an IO.
let grouped_subrequests: HashMap<IndexUid, Vec<OpenShardsSubrequest>> = request
.subrequests
.into_iter()
.into_group_map_by(|subrequest| IndexUid::new(subrequest.index_uid.clone()));

for subrequest in request.subrequests {
let index_uid: IndexUid = subrequest.index_uid.clone().into();
let subresponse = self
.mutate(index_uid, |index| index.open_shards(subrequest))
for (index_uid, subrequests) in grouped_subrequests {
let subresponses = self
.mutate(index_uid, |index| index.open_shards(subrequests))
.await?;
subresponses.push(subresponse);
response.subresponses.extend(subresponses);
}
let response = OpenShardsResponse { subresponses };
Ok(response)
}

async fn acquire_shards(
&self,
request: AcquireShardsRequest,
) -> MetastoreResult<AcquireShardsResponse> {
let mut subresponses = Vec::with_capacity(request.subrequests.len());
let mut response = AcquireShardsResponse {
subresponses: Vec::with_capacity(request.subrequests.len()),
};
// We must group the subrequests by `index_uid` to mutate each index only once, since each
// mutation triggers an IO.
let grouped_subrequests: HashMap<IndexUid, Vec<AcquireShardsSubrequest>> = request
.subrequests
.into_iter()
.into_group_map_by(|subrequest| IndexUid::new(subrequest.index_uid.clone()));

for subrequest in request.subrequests {
let index_uid: IndexUid = subrequest.index_uid.clone().into();
let subresponse = self
.mutate(index_uid, |index| index.acquire_shards(subrequest))
for (index_uid, subrequests) in grouped_subrequests {
let subresponses = self
.mutate(index_uid, |index| index.acquire_shards(subrequests))
.await?;
subresponses.push(subresponse);
response.subresponses.extend(subresponses);
}
let response = AcquireShardsResponse { subresponses };
Ok(response)
}

async fn close_shards(
&self,
request: CloseShardsRequest,
) -> MetastoreResult<CloseShardsResponse> {
let mut successes = Vec::with_capacity(request.subrequests.len());
let mut failures = Vec::new();
let mut response = CloseShardsResponse {
successes: Vec::with_capacity(request.subrequests.len()),
failures: Vec::new(),
};
// We must group the subrequests by `index_uid` to mutate each index only once, since each
// mutation triggers an IO.
let grouped_subrequests: HashMap<IndexUid, Vec<CloseShardsSubrequest>> = request
.subrequests
.into_iter()
.into_group_map_by(|subrequest| IndexUid::new(subrequest.index_uid.clone()));

for subrequest in request.subrequests {
let index_uid: IndexUid = subrequest.index_uid.clone().into();
match self
.mutate(index_uid, |index| index.close_shards(subrequest))
.await
{
Ok(Either::Left(success)) => {
successes.push(success);
}
Ok(Either::Right(failure)) => {
failures.push(failure);
for (index_uid, subrequests) in grouped_subrequests {
let subresponses = self
.mutate(index_uid, |index| index.close_shards(subrequests))
.await?;
for subresponse in subresponses {
match subresponse {
Either::Left(success) => response.successes.push(success),
Either::Right(failure) => response.failures.push(failure),
}
Err(error) => return Err(error),
}
}
let response = CloseShardsResponse {
successes,
failures,
};
Ok(response)
}

Expand All @@ -719,11 +732,17 @@ impl Metastore for FileBackedMetastore {
) -> MetastoreResult<DeleteShardsResponse> {
let mut subresponses = Vec::with_capacity(request.subrequests.len());

for subrequest in request.subrequests {
let index_uid: IndexUid = subrequest.index_uid.clone().into();
// We must group the subrequests by `index_uid` to mutate each index only once, since each
// mutation triggers an IO.
let grouped_subrequests: HashMap<IndexUid, Vec<DeleteShardsSubrequest>> = request
.subrequests
.into_iter()
.into_group_map_by(|subrequest| IndexUid::new(subrequest.index_uid.clone()));

for (index_uid, subrequests) in grouped_subrequests {
let subresponse = self
.mutate(index_uid, |index| {
index.delete_shards(subrequest, request.force)
index.delete_shards(subrequests, request.force)
})
.await?;
subresponses.push(subresponse);
Expand All @@ -738,7 +757,7 @@ impl Metastore for FileBackedMetastore {
for subrequest in request.subrequests {
let index_uid: IndexUid = subrequest.index_uid.clone().into();
let subresponse = self
.mutate(index_uid, |index| index.list_shards(subrequest))
.read(index_uid, |index| index.list_shards(subrequest))
.await?;
subresponses.push(subresponse);
}
Expand Down Expand Up @@ -1569,9 +1588,12 @@ mod tests {
assert_eq!(indexes_metadatas.len(), 2);

// Let's delete indexes.
metastore.delete_index(index_uid_alive).await.unwrap();
metastore
.delete_index(index_uid_unregistered)
.delete_index(index_uid_alive.clone())
.await
.unwrap();
metastore
.delete_index(index_uid_unregistered.clone())
.await
.unwrap();
let no_more_indexes = metastore
Expand Down

0 comments on commit 9e798b3

Please sign in to comment.