Skip to content

Commit

Permalink
remove allocations for alien processing
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivan Druzhitskiy committed Oct 25, 2023
1 parent f566e05 commit a5e1d67
Showing 1 changed file with 56 additions and 51 deletions.
107 changes: 56 additions & 51 deletions bob-backend/src/pearl/disk_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,34 +290,37 @@ impl DiskController {
Ok(groups)
}

async fn find_groups(&self, operation: &Operation) -> BackendResult<Vec<Group>> {
Ok(Self::find_in_groups(self.groups.read().await.iter(), operation))
async fn find_all_groups(&self, operation: &Operation) -> Vec<Group> {
self.groups.read().await
.iter()
.filter(|group| group.can_process_operation(operation))
.cloned()
.collect()
}

fn find_in_groups<'pearl, 'op>(
pearls: impl Iterator<Item = &'pearl Group>,
fn find_single_group<'pearl, 'op>(
mut pearls: impl Iterator<Item = &'pearl Group>,
operation: &'op Operation,
) -> Vec<Group> {
) -> Option<Group> {
pearls
.filter(|group| group.can_process_operation(operation))
.find(|group| group.can_process_operation(operation))
.cloned()
.collect()
}

async fn get_or_create_first_pearl(&self, operation: &Operation) -> BackendResult<Group> {
async fn get_or_create_pearl(&self, operation: &Operation) -> BackendResult<Group> {
trace!("try get alien pearl, operation {:?}", operation);
// block for read lock: it should be dropped before write lock is acquired (otherwise - deadlock)
{
let read_lock_groups = self.groups.read().await;
let pearl = Self::find_in_groups(read_lock_groups.iter(), operation);
if let Some(g) = pearl.first().cloned() {
let pearl = Self::find_single_group(read_lock_groups.iter(), operation);
if let Some(g) = pearl {
return Ok(g);
}
}

let mut write_lock_groups = self.groups.write().await;
let pearl = Self::find_in_groups(write_lock_groups.iter(), operation);
if let Some(g) = pearl.first().cloned() {
let pearl = Self::find_single_group(write_lock_groups.iter(), operation);
if let Some(g) = pearl {
return Ok(g);
}

Expand Down Expand Up @@ -383,7 +386,7 @@ impl DiskController {
data: &BobData,
) -> Result<(), Error> {
if *self.state.read().await == GroupsState::Ready {
let vdisk_group = self.get_or_create_first_pearl(&op).await;
let vdisk_group = self.get_or_create_pearl(&op).await;
match vdisk_group {
Ok(group) => match group.put(key, data, StartTimestampConfig::new(false)).await {
Err(e) => Err(self.process_error(e).await),
Expand Down Expand Up @@ -464,9 +467,18 @@ impl DiskController {

pub(crate) async fn get_alien(&self, op: Operation, key: BobKey) -> Result<BobData, Error> {
if *self.state.read().await == GroupsState::Ready {
let mut results = self.get_alien_results_unordered(&op, |g| async move { g.get(key).await }).await.unwrap_or(vec![]);
results.sort_by_key(|d| d.meta().timestamp());
if let Some(r) = results.into_iter().last() {
let result = self.get_alien_results(&op, |g| async move { g.get(key).await },
None,
|r, d| {
let d_ts = d.meta().timestamp();
if d_ts > r.as_ref().map(|d: &BobData| d.meta().timestamp()).unwrap_or(0) {
Some(d)
} else {
r
}
}).await;

if let Some(Some(r)) = result {
Ok(r)
} else {
Err(Error::key_not_found(key))
Expand Down Expand Up @@ -505,13 +517,13 @@ impl DiskController {
keys: &[BobKey],
) -> Result<Vec<bool>, Error> {
if *self.state.read().await == GroupsState::Ready {
let results = self.get_alien_results_unordered(&operation, |g| async move { g.exist(keys).await }).await.unwrap_or(vec![]);
Ok(results.into_iter().fold(vec![false; keys.len()], |mut s, n| {
for i in 0..n.len() {
s[i] |= n[i];
}
s
}))
Ok(self.get_alien_results(&operation, |g| async move { g.exist(keys).await },
vec![false; keys.len()], |mut s, n| {
for i in 0..n.len() {
s[i] |= n[i];
}
s
}).await.unwrap_or(vec![false; keys.len()]))
} else {
Err(Error::dc_is_not_available())
}
Expand Down Expand Up @@ -553,12 +565,14 @@ impl DiskController {
force_delete: bool,
) -> Result<u64, Error> {
if *self.state.read().await == GroupsState::Ready {
if let Some(results) = self.get_alien_results_unordered(&op, |g| async move { g.delete(key, meta, StartTimestampConfig::new(false), force_delete).await }).await {
Ok(results.into_iter().fold(0, |s, n| s + n))
if let Some(result) = self.get_alien_results(&op,
|g| async move { g.delete(key, meta, StartTimestampConfig::new(false), force_delete).await },
0, |s, n| s + n).await {
Ok(result)
} else {
if force_delete {
// If delete is forced we need to create at least one group
match self.get_or_create_first_pearl(&op).await {
match self.get_or_create_pearl(&op).await {
Ok(group) => group.delete(key, meta, StartTimestampConfig::new(false), force_delete).await,
Err(err) => {
error!(
Expand Down Expand Up @@ -699,36 +713,27 @@ impl DiskController {
.await
}

async fn get_alien_results_unordered<R, RF, F>(&self, op: &Operation, f: F) -> Option<Vec<R>>
async fn get_alien_results<R, RF, F, Res>(&self, op: &Operation, f: F,
init: Res, fold: impl Fn(Res, R) -> Res) -> Option<Res>
where
RF: Future<Output = Result<R, Error>>,
F: Fn(Group) -> RF
{
match self.find_groups(op).await {
Ok(groups) => {
if groups.is_empty() {
return None;
}
let mut result = vec![];
let mut futures = FuturesUnordered::new();
for g in groups {
futures.push(f(g));
}
while let Some(r) = futures.next().await {
match r {
Ok(r) => result.push(r),
Err(e) => {
trace!("error getting alien results for op {:?}: {:?}", op, e);
self.process_error(e).await;
},
}
}
Some(result)
}
Err(e) => {
trace!("error finding groups for op {:?}: {:?}", op, e);
None
let groups = self.find_all_groups(op).await;
if groups.is_empty() {
return None;
}
let mut result = init;
for g in groups {
let r = f(g).await;
match r {
Ok(r) => result = fold(result, r),
Err(e) => {
trace!("error getting alien results for op {:?}: {:?}", op, e);
self.process_error(e).await;
},
}
}
Some(result)
}
}

0 comments on commit a5e1d67

Please sign in to comment.