Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

576, 585 lookup existence in aliens, separate remote & local lookup #634

Merged
merged 36 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d869f82
Support exist in aliens
lostl1ght Oct 18, 2022
0d03ebf
Add exist operations
lostl1ght Oct 18, 2022
a6a85f9
Add logic in quorum
lostl1ght Oct 18, 2022
be09090
Update changelog
lostl1ght Oct 18, 2022
f882fab
Merge branch 'master' into 576_exist_alien
lostl1ght Oct 18, 2022
0664fc9
Check len of key vecs
lostl1ght Oct 19, 2022
d6dfd88
Merge branch 'master' into 576_exist_alien
lostl1ght Nov 21, 2022
4ade39b
Fix pearl version
lostl1ght Nov 21, 2022
2bd9373
Decompose exist
lostl1ght Nov 21, 2022
2695039
Reduce function calls
lostl1ght Nov 21, 2022
e602f95
Fix pearl version
lostl1ght Dec 26, 2022
acdce8a
Merge branch 'master' into 576_exist_alien
lostl1ght Dec 26, 2022
b396861
fix loglevel
Apr 19, 2023
50e5216
update disk_controller
Apr 19, 2023
d08b9d8
add index map
Apr 20, 2023
98b0c99
add group
Apr 21, 2023
f1ffbe0
add exist func
Apr 24, 2023
a81cd3b
Merge branch 'master' into 576_exist_alien
Apr 24, 2023
a4d9264
fix typo
Apr 24, 2023
1f71358
fix build
Apr 25, 2023
90c5cb4
fixes according to tests
Apr 25, 2023
93e748b
fix changelog
Apr 25, 2023
7540bf6
style fixes
Apr 25, 2023
96c53bf
style fixes
Apr 25, 2023
d79e32b
style fixes
Apr 25, 2023
f7dac68
style fixes
Apr 25, 2023
19ae801
update changelog
Apr 28, 2023
3855888
fix discussions
Apr 28, 2023
2a5a42e
fix discussions
Apr 28, 2023
181700b
fix discussions
Apr 28, 2023
8f96555
move checks to assertions
May 5, 2023
33a753d
fix log
May 5, 2023
72cb82a
fix discussions
May 5, 2023
25c9841
fix discussions
May 8, 2023
e51327c
use expect instead of unwrap
May 8, 2023
beb7810
Merge branch 'master' into 576_exist_alien
ikopylov May 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Bob versions changelog
#### Added
- Add clusterwide delete operation (#364)
- TLS support, TLS for grpc or rest can be enabled via cluster & node config (#303)
- Lookup existence in aliens, separate local and remote lookup (#576, #585)

#### Changed
- Update rust edition to 2021 (#484)
Expand Down
9 changes: 7 additions & 2 deletions bob-backend/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::prelude::*;
use std::{
collections::HashMap,
fmt::{Display, Formatter, Result as FMTResult}, hash::Hash,
fmt::{Display, Formatter, Result as FMTResult},
hash::Hash,
};

use crate::{
Expand Down Expand Up @@ -383,7 +384,11 @@ impl Backend {
let mut exist = vec![false; keys.len()];
let keys_by_id_and_path = self.group_keys_by_operations(keys, options);
for (operation, (keys, indexes)) in keys_by_id_and_path {
let result = self.inner.exist(operation, &keys).await;
let result = if operation.is_data_alien() {
self.inner.exist_alien(operation, &keys).await
} else {
self.inner.exist(operation, &keys).await
};
if let Ok(result) = result {
for (&res, ind) in result.iter().zip(indexes) {
exist[ind] |= res;
Expand Down
60 changes: 39 additions & 21 deletions bob/src/cluster/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,23 +118,6 @@ async fn call_at_least(
(handles, errors)
}

pub(crate) fn group_keys_by_nodes(
mapper: &Virtual,
keys: &[BobKey],
) -> HashMap<Vec<Node>, (Vec<BobKey>, Vec<usize>)> {
let mut keys_by_nodes: HashMap<_, (Vec<_>, Vec<_>)> = HashMap::new();
for (ind, &key) in keys.iter().enumerate() {
keys_by_nodes
.entry(mapper.get_target_nodes_for_key(key).to_vec())
.and_modify(|(keys, indexes)| {
keys.push(key);
indexes.push(ind);
})
.or_insert_with(|| (vec![key], vec![ind]));
}
keys_by_nodes
}

pub(crate) async fn lookup_local_alien(
backend: &Backend,
key: BobKey,
Expand Down Expand Up @@ -278,11 +261,46 @@ pub(crate) async fn put_local_node(
backend.put_local(key, data, op).await
}

pub(crate) async fn delete_at_local_node(
backend: &Backend,
key: BobKey,
) -> Result<(), Error> {
pub(crate) async fn delete_at_local_node(backend: &Backend, key: BobKey) -> Result<(), Error> {
debug!("local node has vdisk replica, put local");
backend.delete(key).await?;
Ok(())
}

pub(crate) async fn exist_on_local_node(
backend: &Backend,
keys: &[BobKey],
) -> Result<Vec<bool>, Error> {
Ok(backend
.exist(keys, &BobOptions::new_get(Some(GetOptions::new_local())))
.await?)
}

pub(crate) async fn exist_on_local_alien(
backend: &Backend,
keys: &[BobKey],
) -> Result<Vec<bool>, Error> {
Ok(backend
.exist(keys, &BobOptions::new_get(Some(GetOptions::new_alien())))
.await?)
}

pub(crate) async fn exist_on_remote_nodes(
nodes: &[Node],
keys: &[BobKey],
) -> Vec<Result<NodeOutput<Vec<bool>>, NodeOutput<Error>>> {
LinkManager::call_nodes(nodes.iter(), |client| {
Box::pin(client.exist(keys.to_vec(), GetOptions::new_local()))
})
.await
}

pub(crate) async fn exist_on_remote_aliens(
nodes: &[Node],
keys: &[BobKey],
) -> Vec<Result<NodeOutput<Vec<bool>>, NodeOutput<Error>>> {
LinkManager::call_nodes(nodes.iter(), |client| {
Box::pin(client.exist(keys.to_vec(), GetOptions::new_alien()))
})
.await
}
133 changes: 121 additions & 12 deletions bob/src/cluster/quorum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use crate::prelude::*;

use super::{
operations::{
delete_at_nodes, delete_at_local_node, group_keys_by_nodes, lookup_local_alien, lookup_local_node,
delete_at_local_node, delete_at_nodes, exist_on_local_alien, exist_on_local_node,
exist_on_remote_aliens, exist_on_remote_nodes, lookup_local_alien, lookup_local_node,
lookup_remote_aliens, lookup_remote_nodes, put_at_least, put_local_all, put_local_node,
put_sup_nodes, Tasks,
},
Cluster,
};
use crate::link_manager::LinkManager;

#[derive(Clone)]
pub(crate) struct Quorum {
Expand Down Expand Up @@ -272,21 +272,130 @@ impl Cluster for Quorum {
}

async fn exist(&self, keys: &[BobKey]) -> Result<Vec<bool>, Error> {
ikopylov marked this conversation as resolved.
Show resolved Hide resolved
let keys_by_nodes = group_keys_by_nodes(&self.mapper, keys);
debug!(
"EXIST Nodes for fan out: {:?}",
&keys_by_nodes.keys().flatten().collect::<Vec<_>>()
);
let len = keys.len();
let mut exist = vec![false; len];
for (nodes, (keys, indexes)) in keys_by_nodes {
let cluster_results = LinkManager::exist_on_nodes(&nodes, &keys).await;
for result in cluster_results.into_iter().flatten() {
for (&r, &ind) in result.inner().iter().zip(&indexes) {
exist[ind] |= r;

// filter local keys
ikopylov marked this conversation as resolved.
Show resolved Hide resolved
let mut indices = Vec::new();
let local_keys = keys
.iter()
.enumerate()
.filter_map(|(idx, &key)| {
if self
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
.mapper
.get_target_nodes_for_key(key)
.iter()
.find(|node| node.name() == self.mapper.local_node_name())
.is_some()
{
indices.push(idx);
Some(key)
} else {
None
}
})
.collect::<Vec<BobKey>>();
// end
debug!("local keys {:?}", local_keys);
debug!("local indices {:?}", indices);
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
if local_keys.len() > 0 {
let result = exist_on_local_node(&self.backend, &local_keys).await?;
for (idx, r) in indices.into_iter().zip(result.into_iter()) {
exist[idx] |= r;
}
debug!("exist after local node {:?}", exist);
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
}

// filter keys that were not found
let mut local_alien = Vec::new();
for (idx, &r) in exist.iter().enumerate() {
if r == false {
local_alien.push(keys[idx]);
}
}
// end
debug!("local alien keys {:?}", local_alien);
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
if local_alien.len() > 0 {
let result = exist_on_local_alien(&self.backend, &local_alien).await?;
ikopylov marked this conversation as resolved.
Show resolved Hide resolved
let mut i = 0;
for r in exist.iter_mut() {
if *r == false {
*r |= result[i];
i += 1;
}
}
debug!("exist after local alien {:?}", exist);
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
}

// filter remote not found keys by nodes
let mut remote_keys: HashMap<_, (Vec<_>, Vec<_>)> = HashMap::new();
for (idx, &r) in exist.iter().enumerate() {
if r == false {
remote_keys
.entry(self.mapper.get_target_nodes_for_key(keys[idx]).to_vec())
.and_modify(|(closure_keys, indices)| {
closure_keys.push(keys[idx]);
indices.push(idx);
})
.or_insert_with(|| (vec![keys[idx]], vec![idx]));
}
}
// end

debug!("remote keys by nodes {:?}", remote_keys);
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
if remote_keys.len() > 0 {
for (nodes, (keys, indices)) in remote_keys {
let result = exist_on_remote_nodes(&nodes, &keys).await;
for res in result.into_iter().flatten() {
for (&r, &ind) in res.inner().iter().zip(&indices) {
exist[ind] |= r;
}
}
}
debug!("exist after remote nodes {:?}", exist);
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
}

// filter remote not found keys
let mut remote_alien = Vec::new();
for (idx, &r) in exist.iter().enumerate() {
if r == false {
remote_alien.push(keys[idx]);
}
}
// end
if remote_alien.len() > 0 {
// filter remote nodes
let remote_nodes = self
.mapper
.nodes()
.into_iter()
.filter_map(|(_, node)| {
if node.name() != self.mapper.local_node_name() {
Some(node.clone())
} else {
None
}
})
.collect::<Vec<Node>>();
// end
debug!("remote nodes {:?}", remote_nodes);
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved

let result = exist_on_remote_aliens(&remote_nodes, &remote_alien).await;
debug!("alien result {:?}", result);
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
for res in result.into_iter() {
if let Ok(inner) = res {
debug!("inner {:?}", inner);
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
let mut i = 0;
for r in exist.iter_mut() {
if *r == false {
*r |= inner.inner()[i];
i += 1;
}
}
}
}
}

Ok(exist)
}

Expand Down