Skip to content

Commit

Permalink
refactor: use pagination to load values from pstor
Browse files Browse the repository at this point in the history
Signed-off-by: Abhinandan Purkait <purkaitabhinandan@gmail.com>
  • Loading branch information
Abhinandan-Purkait committed Feb 13, 2024
1 parent 7a7d1ce commit 30503bc
Show file tree
Hide file tree
Showing 11 changed files with 230 additions and 41 deletions.
12 changes: 10 additions & 2 deletions control-plane/agents/src/bin/core/controller/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ pub(crate) struct RegistryInner<S: Store> {
thin_args: ThinArgs,
/// Whether the HA feature is disabled.
ha_disabled: bool,
/// Etcd max page size.
etcd_max_page_size: i64,
}

impl Registry {
Expand All @@ -121,6 +123,7 @@ impl Registry {
host_acl: Vec<HostAccessControl>,
thin_args: ThinArgs,
ha_enabled: bool,
etcd_max_page_size: i64,
) -> Result<Self, SvcError> {
let store_endpoint = Self::format_store_endpoint(&store_url);
tracing::info!("Connecting to persistent store at {}", store_endpoint);
Expand Down Expand Up @@ -171,14 +174,15 @@ impl Registry {
legacy_prefix_present,
thin_args,
ha_disabled: ha_enabled,
etcd_max_page_size,
}),
};
registry.init().await?;

// Disable v1 compat if nexus_info keys are migrated.
if registry.config().mayastor_compat_v1() && registry.nexus_info_v1_migrated().await? {
// Delete the v1 nexus_info keys by brute force.
delete_all_v1_nexus_info(&mut store)
delete_all_v1_nexus_info(&mut store, etcd_max_page_size)
.await
.map_err(|error| StoreError::Generic {
source: Box::new(error),
Expand Down Expand Up @@ -406,7 +410,11 @@ impl Registry {
async fn init(&self) -> Result<(), SvcError> {
let mut store = self.store.lock().await;
self.specs
.init(store.deref_mut(), self.legacy_prefix_present)
.init(
store.deref_mut(),
self.legacy_prefix_present,
self.etcd_max_page_size,
)
.await?;
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ pub const MSP_OPERATOR: &str = "msp-operator";
pub(crate) async fn migrate_product_v1_to_v2<S: Store>(
store: &mut S,
spec_type: StorableObjectType,
etcd_max_page_size: i64,
) -> Result<(), StoreError> {
info!("Migrating {spec_type:?} from v1 to v2 key space");
let prefix = &product_v1_key_prefix_obj(spec_type);
let store_entries = store.get_values_prefix(prefix).await?;
let store_entries = store
.get_values_paged_all(prefix, etcd_max_page_size)
.await?;
for (k, v) in store_entries {
let id = k
.split('/')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use crate::controller::{
};
use agents::errors::SvcError;
use stor_port::{
transport_api::ResourceKind,
pstor::{product_v1_key_prefix, API_VERSION},
transport_api::{ErrorChain, ResourceKind},
types::v0::{
openapi::apis::Uuid,
store::{
Expand All @@ -18,10 +19,11 @@ use stor_port::{
node::NodeSpec,
pool::PoolSpec,
replica::ReplicaSpec,
volume::{AffinityGroupSpec, VolumeSpec},
snapshots::volume::VolumeSnapshot,
volume::{AffinityGroupSpec, VolumeContentSource, VolumeSpec},
AsOperationSequencer, OperationMode, OperationSequence, SpecStatus, SpecTransaction,
},
transport::{NexusId, NodeId, PoolId, ReplicaId, VolumeId},
transport::{NexusId, NodeId, PoolId, ReplicaId, SnapshotId, VolumeId},
},
};

Expand All @@ -30,14 +32,6 @@ use parking_lot::RwLock;
use serde::de::DeserializeOwned;
use snafu::{ResultExt, Snafu};
use std::{fmt::Debug, ops::Deref, sync::Arc};
use stor_port::{
pstor::{product_v1_key_prefix, API_VERSION},
transport_api::ErrorChain,
types::v0::{
store::{snapshots::volume::VolumeSnapshot, volume::VolumeContentSource},
transport::SnapshotId,
},
};

#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
Expand Down Expand Up @@ -909,6 +903,7 @@ impl ResourceSpecsLocked {
&self,
store: &mut S,
legacy_prefix_present: bool,
etcd_max_page_size: i64,
) -> Result<(), SvcError> {
let spec_types = [
StorableObjectType::VolumeSpec,
Expand All @@ -919,7 +914,7 @@ impl ResourceSpecsLocked {
StorableObjectType::VolumeSnapshot,
];
for spec in &spec_types {
self.populate_specs(store, *spec, legacy_prefix_present)
self.populate_specs(store, *spec, legacy_prefix_present, etcd_max_page_size)
.await
.map_err(|error| SvcError::Internal {
details: error.full_string(),
Expand Down Expand Up @@ -996,22 +991,22 @@ impl ResourceSpecsLocked {
store: &mut S,
spec_type: StorableObjectType,
legacy_prefix_present: bool,
etcd_max_page_size: i64,
) -> Result<(), SpecError> {
if legacy_prefix_present {
migrate_product_v1_to_v2(store, spec_type)
migrate_product_v1_to_v2(store, spec_type, etcd_max_page_size)
.await
.map_err(|e| SpecError::StoreMigrate {
source: Box::new(e),
})?;
}
let prefix = key_prefix_obj(spec_type, API_VERSION);
let store_entries =
store
.get_values_prefix(&prefix)
.await
.map_err(|e| SpecError::StoreGet {
source: Box::new(e),
})?;
let store_entries = store
.get_values_paged_all(&prefix, etcd_max_page_size)
.await
.map_err(|e| SpecError::StoreGet {
source: Box::new(e),
})?;
let store_values = store_entries.iter().map(|e| e.1.clone()).collect();

let mut resource_specs = self.0.write();
Expand Down
7 changes: 6 additions & 1 deletion control-plane/agents/src/bin/core/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub(crate) mod watch;
use clap::Parser;
use controller::registry::NumRebuilds;
use std::{net::SocketAddr, num::ParseIntError};
use utils::{version_info_str, DEFAULT_GRPC_SERVER_ADDR};
use utils::{version_info_str, DEFAULT_GRPC_SERVER_ADDR, ETCD_MAX_PAGE_LIMIT};

use stor_port::HostAccessControl;
use utils::tracing_telemetry::{trace::TracerProvider, KeyValue};
Expand Down Expand Up @@ -108,6 +108,10 @@ pub(crate) struct CliArgs {
/// This is useful when the frontend nodes do not support the NVMe ANA feature.
#[clap(long, env = "HA_DISABLED")]
pub(crate) disable_ha: bool,

/// Etcd Pagination Limit.
#[clap(long, default_value = ETCD_MAX_PAGE_LIMIT)]
pub(crate) etcd_page_limit: u32,
}
impl CliArgs {
fn args() -> Self {
Expand Down Expand Up @@ -178,6 +182,7 @@ async fn server(cli_args: CliArgs) -> anyhow::Result<()> {
},
cli_args.thin_args,
cli_args.disable_ha,
cli_args.etcd_page_limit as i64,
)
.await?;

Expand Down
114 changes: 113 additions & 1 deletion control-plane/agents/src/bin/core/tests/controller/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use deployer_cluster::{etcd_client::Client, *};
use itertools::Itertools;
use serde_json::Value;
use std::str::FromStr;
use stor_port::{
pstor::{etcd::Etcd, StoreObj},
pstor::{etcd::Etcd, StoreKv, StoreObj},
types::v0::{
openapi::models,
store::registry::{ControlPlaneService, StoreLeaseOwner, StoreLeaseOwnerKey},
transport,
},
};
use uuid::Uuid;

/// Test that the content of the registry is correctly loaded from the persistent store on start up.
#[tokio::test]
Expand Down Expand Up @@ -212,3 +216,111 @@ async fn core_agent_lease_lock() {
tracing::info!("core: {:?}", core.state);
assert_eq!(Some(false), core.state.unwrap().running);
}

const OLD_VOLUME_PREFIX: &str = "/namespace/default/control-plane/VolumeSpec";

#[tokio::test]
async fn etcd_pagination() {
    // Bring up a minimal cluster: no io-engines, no REST, no jaeger, and a 2s store lease ttl.
    let cluster = ClusterBuilder::builder()
        .with_io_engines(0)
        .with_rest(false)
        .with_jaeger(false)
        .with_store_lease_ttl(std::time::Duration::from_secs(2))
        .build()
        .await
        .unwrap();

    let mut etcd = Etcd::new("0.0.0.0:2379").await.unwrap();

    // The current key space embeds the cluster id, so look it up before building the prefixes.
    let cluster_id = get_cluster_id(&mut etcd).await;
    let node_prefix = format!(
        "/openebs.io/mayastor/apis/v0/clusters/{}/namespaces/default/NodeSpec",
        cluster_id
    );
    let volume_prefix = format!(
        "/openebs.io/mayastor/apis/v0/clusters/{}/namespaces/default/VolumeSpec",
        cluster_id
    );

    // Seed 10 node specs (node1 ..= node10).
    for node_index in 1 ..= 10 {
        let node_key = format!("{}/node{}", node_prefix, node_index);
        let node_json = format!(
            r#"{{"id":"mayastor-node{}","endpoint":"136.144.51.107:10124","labels":{{}}}}"#,
            node_index
        );
        let node_spec = Value::from_str(&node_json).unwrap();
        etcd.put_kv(&node_key, &node_spec).await.unwrap();
    }

    // The same volume spec payload is stored under every volume key, so parse it only once.
    let volume_spec = Value::from_str(
        r#"{"uuid":"456122b1-7e19-4148-a890-579ca785a119","size":2147483648,"labels":{"local":"true"},"num_replicas":3,"status":{"Created":"Online"},"target":{"node":"mayastor-node4","nexus":"d6ccbb97-d13e-4ffb-91a0-c7607bb01f8f","protocol":"nvmf"},"policy":{"self_heal":true},"topology":{"node":{"Explicit":{"allowed_nodes":["mayastor-node2","mayastor-master","mayastor-node3","mayastor-node1","mayastor-node4"],"preferred_nodes":["mayastor-node2","mayastor-node3","mayastor-node4","mayastor-master","mayastor-node1"]}},"pool":{"Labelled":{"exclusion":{},"inclusion":{"openebs.io/created-by":"msp-operator"}}}},"last_nexus_id":"d6ccbb97-d13e-4ffb-91a0-c7607bb01f8f","operation":null}"#,
    )
    .unwrap();

    // Seed 5 volumes in the new key space first, then 5 more in the old key space.
    for prefix in [volume_prefix.as_str(), OLD_VOLUME_PREFIX] {
        for _ in 0 .. 5 {
            let volume_key = format!("{}/{}", prefix, Uuid::new_v4());
            etcd.put_kv(&volume_key, &volume_spec).await.unwrap();
        }
    }

    // Seed 5 nexus info entries in the old key space, where the key is just a bare uuid.
    let nexus_info = Value::from_str(
        r#"{"children":[{"healthy":true,"uuid":"82779efa-a0c7-4652-a37b-83eefd894714"},{"healthy":true,"uuid":"2d98fa96-ac12-40be-acdc-e3559c0b1530"},{"healthy":true,"uuid":"620ff519-419a-48d6-97a8-c1ba3260d87e"}],"clean_shutdown":false}"#,
    )
    .unwrap();
    for _ in 0 .. 5 {
        let nexus_key = Uuid::new_v4().to_string();
        etcd.put_kv(&nexus_key, &nexus_info).await.unwrap();
    }

    // Reading with a page size of 3 must still return every seeded entry.
    let nodes = etcd.get_values_paged_all(&node_prefix, 3).await.unwrap();
    assert_eq!(nodes.len(), 10);

    let new_volumes = etcd.get_values_paged_all(&volume_prefix, 3).await.unwrap();
    assert_eq!(new_volumes.len(), 5);

    let old_volumes = etcd
        .get_values_paged_all(OLD_VOLUME_PREFIX, 3)
        .await
        .unwrap();
    assert_eq!(old_volumes.len(), 5);

    // Restart the core agent and wait for the volume service to come back up.
    cluster.restart_core().await;
    cluster
        .volume_service_liveness(None)
        .await
        .expect("Should have restarted by now");

    // After the migration the new volume key space must hold exactly 10 volumes.
    let migrated_volumes = etcd.get_values_paged_all(&volume_prefix, 3).await.unwrap();
    assert_eq!(migrated_volumes.len(), 10);

    // Total number of entries expected in the store after the restart.
    let everything = etcd.get_values_paged_all("", 3).await.unwrap();
    assert_eq!(everything.len(), 28);
}

// Helper Methods

// Extract the cluster id from the first key returned by etcd: it is the path segment which
// immediately follows the "clusters" segment. Panics if no such key or segment is found.
async fn get_cluster_id(etcd: &mut Etcd) -> String {
    let first_page = etcd.get_values_paged("", 3, "").await.unwrap();
    let segments: Vec<&str> = first_page[0].0.split('/').collect();
    let (clusters_index, _) = segments
        .iter()
        .find_position(|segment| **segment == "clusters")
        .unwrap();
    segments[clusters_index + 1].to_string()
}
10 changes: 5 additions & 5 deletions control-plane/stor-port/src/types/v0/store/nexus_persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ use std::fmt::Debug;
use tracing::info;
use uuid::Uuid;

/// ETCD Pagination limit.
const ETCD_PAGED_LIMIT: i64 = 1000;

/// Definition of the nexus information that gets saved in the persistent
/// store.
#[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -139,12 +136,15 @@ impl StorableObject for NexusInfo {

/// Deletes all v1 nexus_info entries by fetching all keys, parsing each key as a UUID and
/// deleting the key when the parsing succeeds.
pub async fn delete_all_v1_nexus_info<S: Store>(store: &mut S) -> Result<(), StoreError> {
pub async fn delete_all_v1_nexus_info<S: Store>(
store: &mut S,
etcd_page_limit: i64,
) -> Result<(), StoreError> {
let mut prefix: &str = "";
let mut first = true;
let mut kvs;
loop {
kvs = store.get_values_paged(prefix, ETCD_PAGED_LIMIT).await?;
kvs = store.get_values_paged(prefix, etcd_page_limit, "").await?;
if !first && kvs.get(0).is_some() {
kvs.remove(0);
}
Expand Down
8 changes: 8 additions & 0 deletions utils/pstor/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ pub trait StoreKv: Sync + Send + Clone {
&mut self,
key_prefix: &str,
limit: i64,
range_end: &str,
) -> Result<Vec<(String, Value)>, Error>;
/// Returns a vector of tuples, each representing a key-value pair. Pages through all the
/// values under the prefix, using `limit` as the page size.
async fn get_values_paged_all(
&mut self,
key_prefix: &str,
limit: i64,
) -> Result<Vec<(String, Value)>, Error>;
/// Deletes all key values from a given prefix.
async fn delete_values_prefix(&mut self, key_prefix: &str) -> Result<(), Error>;
Expand Down
2 changes: 2 additions & 0 deletions utils/pstor/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,6 @@ pub enum Error {
source: Box<Error>,
description: String,
},
#[snafu(display("Failed to parse range end for start key: '{}'", start_key))]
RangeEnd { start_key: String },
}
Loading

0 comments on commit 30503bc

Please sign in to comment.