Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

576, 585 lookup existence in aliens, separate remote & local lookup #634

Merged
merged 36 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d869f82
Support exist in aliens
lostl1ght Oct 18, 2022
0d03ebf
Add exist operations
lostl1ght Oct 18, 2022
a6a85f9
Add logic in quorum
lostl1ght Oct 18, 2022
be09090
Update changelog
lostl1ght Oct 18, 2022
f882fab
Merge branch 'master' into 576_exist_alien
lostl1ght Oct 18, 2022
0664fc9
Check len of key vecs
lostl1ght Oct 19, 2022
d6dfd88
Merge branch 'master' into 576_exist_alien
lostl1ght Nov 21, 2022
4ade39b
Fix pearl version
lostl1ght Nov 21, 2022
2bd9373
Decompose exist
lostl1ght Nov 21, 2022
2695039
Reduce function calls
lostl1ght Nov 21, 2022
e602f95
Fix pearl version
lostl1ght Dec 26, 2022
acdce8a
Merge branch 'master' into 576_exist_alien
lostl1ght Dec 26, 2022
b396861
fix loglevel
Apr 19, 2023
50e5216
update disk_controller
Apr 19, 2023
d08b9d8
add index map
Apr 20, 2023
98b0c99
add group
Apr 21, 2023
f1ffbe0
add exist func
Apr 24, 2023
a81cd3b
Merge branch 'master' into 576_exist_alien
Apr 24, 2023
a4d9264
fix typo
Apr 24, 2023
1f71358
fix build
Apr 25, 2023
90c5cb4
fixes according to tests
Apr 25, 2023
93e748b
fix changelog
Apr 25, 2023
7540bf6
style fixes
Apr 25, 2023
96c53bf
style fixes
Apr 25, 2023
d79e32b
style fixes
Apr 25, 2023
f7dac68
style fixes
Apr 25, 2023
19ae801
update changelog
Apr 28, 2023
3855888
fix discussions
Apr 28, 2023
2a5a42e
fix discussions
Apr 28, 2023
181700b
fix discussions
Apr 28, 2023
8f96555
move checks to assertions
May 5, 2023
33a753d
fix log
May 5, 2023
72cb82a
fix discussions
May 5, 2023
25c9841
fix discussions
May 8, 2023
e51327c
use expect instead of unwrap
May 8, 2023
beb7810
Merge branch 'master' into 576_exist_alien
ikopylov May 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ Bob versions changelog
- Exist test on doubled range of keys for integration tests (#764)
- Used swap and bob virtual memory metrics added to hardware metrics (#771)
- Added validate_data_checksum_in_index_regen field to pearl config (#607)
- Lookup existence in aliens (#576)
- Separate local and remote lookup (#585)
- Exist test for alien integration tests (#726)

#### Changed
Expand Down
6 changes: 5 additions & 1 deletion bob-backend/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,11 @@ impl Backend {
let mut exist = vec![false; keys.len()];
let keys_by_id_and_path = self.group_keys_by_operations(keys, options);
for (operation, (keys, indexes)) in keys_by_id_and_path {
let result = self.inner.exist(operation, &keys).await;
let result = if operation.is_data_alien() {
self.inner.exist_alien(operation, &keys).await
} else {
self.inner.exist(operation, &keys).await
};
if let Ok(result) = result {
for (&res, ind) in result.iter().zip(indexes) {
exist[ind] |= res;
Expand Down
4 changes: 3 additions & 1 deletion bob-backend/src/pearl/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ impl BackendStorage for Pearl {

async fn exist_alien(&self, operation: Operation, keys: &[BobKey]) -> BackendResult<Vec<bool>> {
if self.alien_disk_controller.can_process_operation(&operation) {
self.alien_disk_controller.exist(operation, keys).await
self.alien_disk_controller
.exist_alien(operation, keys)
.await
} else {
Err(Error::dc_is_not_available())
}
Expand Down
23 changes: 22 additions & 1 deletion bob-backend/src/pearl/disk_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,28 @@ impl DiskController {
if let Some(group) = group_option {
group.exist(keys).await
} else {
Err(Error::internal())
Err(Error::vdisk_not_found(operation.vdisk_id()))
}
} else {
Err(Error::dc_is_not_available())
}
}

pub(crate) async fn exist_alien(
&self,
operation: Operation,
keys: &[BobKey],
) -> Result<Vec<bool>, Error> {
if *self.state.read().await == GroupsState::Ready {
let vdisk_group = self.find_group(&operation).await;
if let Ok(group) = vdisk_group {
group.exist(keys).await
} else {
trace!(
"EXIST[alien] No alien group has been created for vdisk #{}",
operation.vdisk_id()
);
Ok(vec![false; keys.len()])
}
} else {
Err(Error::dc_is_not_available())
Expand Down
61 changes: 41 additions & 20 deletions bob/src/cluster/operations.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::link_manager::LinkManager;
use crate::prelude::*;
use super::support_types::{RemoteDeleteError};
use super::support_types::RemoteDeleteError;

pub(crate) type Tasks<Err> = FuturesUnordered<JoinHandle<Result<NodeOutput<()>, NodeOutput<Err>>>>;

Expand Down Expand Up @@ -85,25 +85,6 @@ async fn call_all<TOp, TErr: Debug>(
(handles_len, errors)
}

// ======================= EXIST =================

pub(crate) fn group_keys_by_nodes(
mapper: &Virtual,
keys: &[BobKey],
) -> HashMap<Vec<Node>, (Vec<BobKey>, Vec<usize>)> {
let mut keys_by_nodes: HashMap<_, (Vec<_>, Vec<_>)> = HashMap::new();
for (ind, &key) in keys.iter().enumerate() {
keys_by_nodes
.entry(mapper.get_target_nodes_for_key(key).to_vec())
.and_modify(|(keys, indexes)| {
keys.push(key);
indexes.push(ind);
})
.or_insert_with(|| (vec![key], vec![ind]));
}
keys_by_nodes
}

// ======================== GET ==========================

pub(crate) async fn get_any(
Expand Down Expand Up @@ -295,7 +276,47 @@ pub(crate) async fn put_local_node(
backend.put_local(key, data, op).await
}

// =================== EXIST ==================

pub(crate) async fn exist_on_local_node(
backend: &Backend,
keys: &[BobKey],
) -> Result<Vec<bool>, Error> {
Ok(backend
.exist(keys, &BobGetOptions::new_get(Some(GetOptions::new_local())))
.await?)
}

pub(crate) async fn exist_on_local_alien(
backend: &Backend,
keys: &[BobKey],
) -> Result<Vec<bool>, Error> {
Ok(backend
.exist(keys, &BobGetOptions::new_get(Some(GetOptions::new_alien())))
.await?)
}

pub(crate) async fn exist_on_remote_nodes(
keys_by_node: &HashMap<String, (Node, Vec<BobKey>)>,
) -> Vec<Result<NodeOutput<Vec<bool>>, NodeOutput<Error>>> {
LinkManager::call_nodes(keys_by_node.values().map(|(n, _)| n), |client| {
Box::pin(client.exist(
keys_by_node.get(client.node().name()).expect("map is based on nodes from values").1.clone(),
GetOptions::new_local(),
))
})
.await
}

pub(crate) async fn exist_on_remote_aliens(
nodes: &[&Node],
keys: &[BobKey],
) -> Vec<Result<NodeOutput<Vec<bool>>, NodeOutput<Error>>> {
LinkManager::call_nodes(nodes.iter().map(|n| *n), |client| {
Box::pin(client.exist(keys.to_vec(), GetOptions::new_alien()))
})
.await
}

// =================== DELETE =================

Expand Down
181 changes: 162 additions & 19 deletions bob/src/cluster/quorum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use crate::prelude::*;

use super::{
operations::{
delete_on_local_node, delete_on_local_aliens, delete_on_remote_nodes, delete_on_remote_nodes_with_options,
group_keys_by_nodes, lookup_local_alien, lookup_local_node, lookup_remote_aliens, finish_at_least_handles,
lookup_remote_nodes, put_at_least, put_local_all, put_local_node, put_sup_nodes, Tasks,
delete_on_local_aliens, delete_on_local_node, delete_on_remote_nodes,
delete_on_remote_nodes_with_options, exist_on_local_alien, exist_on_local_node,
exist_on_remote_aliens, exist_on_remote_nodes, finish_at_least_handles, lookup_local_alien,
lookup_local_node, lookup_remote_aliens, lookup_remote_nodes, put_at_least, put_local_all,
put_local_node, put_sup_nodes, Tasks,
},
support_types::{ RemoteDeleteError, HashSetExt },
support_types::{HashSetExt, RemoteDeleteError, IndexMap},
Cluster,
};
use crate::link_manager::LinkManager;

#[derive(Clone)]
pub(crate) struct Quorum {
Expand Down Expand Up @@ -292,11 +293,11 @@ impl Quorum {
.filter(|n| !sup_nodes_set.contains(n.name()) && n.name() != local_node_name)
.map(|n| (n, DeleteOptions::new_alien(vec![])))
.collect();

trace!("DELETE[{}] normal alien deletion requests: {:?}", key, all_other_nodes_queries);
if let Err(sup_nodes_errors) = delete_on_remote_nodes(key, meta, all_other_nodes_queries.into_iter()).await {
debug!("delete on aliens nodes errors: {:?}", sup_nodes_errors);
};
};


// Delete on local node
Expand All @@ -318,6 +319,93 @@ impl Quorum {
}
Ok(())
}

async fn collect_remote_exists(
result: &mut [bool],
keys: &[BobKey],
indexes_by_node: &mut HashMap<Node, IndexMap>,
) {
if indexes_by_node.is_empty() {
return;
}

let mut node_keys_by_node_name = HashMap::new();
for (node, node_map) in indexes_by_node.iter_mut() {
node_map.retain_not_existed(&result);
if !node_map.is_empty() {
node_keys_by_node_name.insert(node.name().to_owned(), (node.clone(), node_map.collect(keys)));
}
}

if !node_keys_by_node_name.is_empty() {
let remote_results = exist_on_remote_nodes(&node_keys_by_node_name).await;
for remote_result in remote_results.into_iter() {
match remote_result {
Ok(remote_result) => {
let node = &node_keys_by_node_name.get(remote_result.node_name())
.expect("result should be from known node").0;
debug_assert!(node.name() == remote_result.node_name());
indexes_by_node
.get(&node)
.expect("node should exist")
.update_existence(result, remote_result.inner());
trace!("Check existence on node {}: found {}/{} keys",
node.name(), remote_result.inner().iter().filter(|f| **f).count(), remote_result.inner().len());
}
Err(e) => {
debug!("Failed to check existence on node {}: {:?}", e.node_name(), e);
}
}
}
}
}

fn group_by_nodes(
keys: &[BobKey],
mapper: &Virtual,
) -> (
Option<IndexMap>,
HashMap<Node, IndexMap>,
HashMap<Node, IndexMap>,
) {
let mut local = IndexMap::new();
let mut primary = HashMap::new();
let mut secondary = HashMap::new();

let local_node = mapper.local_node_name();

for (index, &key) in keys.iter().enumerate() {
let target_nodes = mapper.get_target_nodes_for_key(key);

if !target_nodes
.iter()
.any(|n| n.name() == local_node || primary.contains_key(n))
{
if let Some(node) = target_nodes.iter().find(|n| !secondary.contains_key(*n)) {
primary.insert(node.clone(), IndexMap::new());
}
}

for node in target_nodes {
if node.name() == local_node {
local.push(index);
} else if let Some(map) = primary.get_mut(node) {
map.push(index)
} else {
secondary
.entry(node.clone())
.or_insert(IndexMap::new())
.push(index);
}
}
}

return (
if local.is_empty() { None } else { Some(local) },
primary,
secondary,
);
}
}

#[async_trait]
Expand Down Expand Up @@ -351,22 +439,77 @@ impl Cluster for Quorum {
}

async fn exist(&self, keys: &[BobKey]) -> Result<Vec<bool>, Error> {
ikopylov marked this conversation as resolved.
Show resolved Hide resolved
let keys_by_nodes = group_keys_by_nodes(&self.mapper, keys);
debug!(
"EXIST Nodes for fan out: {:?}",
&keys_by_nodes.keys().flatten().collect::<Vec<_>>()
);
let len = keys.len();
let mut exist = vec![false; len];
for (nodes, (keys, indexes)) in keys_by_nodes {
let cluster_results = LinkManager::exist_on_nodes(&nodes, &keys).await;
for result in cluster_results.into_iter().flatten() {
for (&r, &ind) in result.inner().iter().zip(&indexes) {
exist[ind] |= r;
debug!("EXIST {} keys", len);

let mut result = vec![false; len];

let (local, mut primary, mut secondary) = Self::group_by_nodes(keys, &self.mapper);

if let Some(local) = local {
if !local.is_empty() {
trace!("EXIST {} keys check local node", len);
let local_keys = local.collect(keys);
match exist_on_local_node(&self.backend, &local_keys).await {
Ok(local_exist) => {
trace!("EXIST {} keys check local node: found {}/{} keys",
len, local_exist.iter().filter(|v| **v).count(), local.len());
local.update_existence(&mut result, &local_exist);
},
Err(e) => warn!("EXIST {} check local node failed: {:?}", len, e)
};
}
}

trace!("EXIST {} keys check primary nodes", len);
Self::collect_remote_exists(&mut result, keys, &mut primary).await;
trace!("EXIST {} keys check primary nodes finished", len);

trace!("EXIST {} keys check secondary nodes", len);
Self::collect_remote_exists(&mut result, keys, &mut secondary).await;
trace!("EXIST {} keys check secondary nodes finished", len);

let mut alien_index_map = IndexMap::where_not_exists(&result);

if !alien_index_map.is_empty() {
trace!("EXIST {} keys check local alien", len);
match exist_on_local_alien(&self.backend, &alien_index_map.collect(keys)).await {
Ok(local_alien_result) => {
trace!("EXIST {} keys check local alien finished: found {}/{} keys",
len, local_alien_result.iter().filter(|v| **v).count(), alien_index_map.len());
alien_index_map.update_existence(&mut result, &local_alien_result);
alien_index_map.retain_not_existed(&result)
},
Err(e) => warn!("EXIST {} keys check local alien failed: {:?}", len, e)
}
}

if !alien_index_map.is_empty() {
trace!("EXIST {} keys check remote alien", len);
let all_remote_nodes: Vec<_> = self
.mapper
.nodes()
.iter()
.filter(|n| n.name() != self.mapper.local_node_name())
.collect();
let remote_nodes_aliens_exist =
exist_on_remote_aliens(&all_remote_nodes, &alien_index_map.collect(keys)).await;
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
trace!("EXIST {} keys check remote alien finished", len);
for remote_alien_result in remote_nodes_aliens_exist {
match remote_alien_result {
Ok(remote_alien_result) => {
alien_index_map.update_existence(&mut result, remote_alien_result.inner());
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
trace!("Check existence in alien on node {}: found {}/{} keys",
remote_alien_result.node_name(), remote_alien_result.inner().iter().filter(|f| **f).count(),
remote_alien_result.inner().len());
}
Err(e) => debug!("EXIST {} keys check remote alien failed on node {}: {:?}",
len, e.node_name(), e)
}
}
}
Ok(exist)

Ok(result)
}

async fn delete(&self, key: BobKey, meta: &BobMeta) -> Result<(), Error> {
Expand Down
Loading