chore(bors): merge pull request #695
695: Fixup correct nvme controller and make child add v1 idempotent r=tiagolobocastro a=tiagolobocastro

    feat(csi/node/timeout): add io-engine timeout and parse humantime
    
    Adds a new parameter, "--io-engine-io-timeout".
    This is used as the base for the nvme_core io timeout: a slack of 10s is
    added to it, allowing the backend to fail I/O first.
    Also parses "--nvme-core-io-timeout" as humantime.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
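
For illustration, a minimal sketch of the derivation described above, assuming the humantime crate; the flag name and the 10s slack come from the commit message, while the helper name and error handling are made up:

    use std::str::FromStr;

    /// Derive the nvme_core io_timeout (in seconds) from the io-engine timeout,
    /// adding a 10s slack so the io-engine gets the chance to fail I/O first.
    fn derive_nvme_core_timeout(io_engine_timeout: &str) -> anyhow::Result<u32> {
        let timeout = humantime::Duration::from_str(io_engine_timeout)
            .map_err(|error| anyhow::anyhow!("failed to parse io-engine-io-timeout: {error}"))?;
        Ok(timeout.as_secs() as u32 + 10)
    }

    // e.g. "30s" or "1m" parse to 30s/60s, yielding 40/70 for nvme_core io_timeout.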

---

    fix(nexus/add-child/v1): make add child v1 idempotent
    
    When the v1 nexus add-child call was added, it was not made idempotent.
    This is not an issue per se, since the child eventually gets GC'd and
    re-added, but it can cause confusing logging.
    TODO: should we have different behaviour depending on the child state?
    For example, if the child is faulted, should we remove and re-add it?
    Bonus: fixes an old test which stopped working a long time ago, when
    pstor was enabled for the data-plane, by disabling pstor for that
    particular test only.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
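
Condensed, the idempotency handling added in the diff below amounts to roughly the following sketch (the types and helpers such as fetch_nexus and tonic_code are the repo's own, shown here with simplified, assumed signatures):

    async fn add_child_idempotent(
        client: &RpcClient,
        request: &AddNexusChild,
    ) -> Result<Nexus, SvcError> {
        match client.add_child(request).await {
            Ok(nexus) => Ok(nexus),
            // The io-engine already knows this child: confirm it is attached and
            // return the current nexus instead of failing.
            Err(error) if error.tonic_code() == tonic::Code::AlreadyExists => {
                let nexus = client.fetch_nexus(&request.nexus).await?;
                match nexus.child(request.uri.as_str()) {
                    Some(_) => Ok(nexus),
                    None => Err(error),
                }
            }
            Err(error) => Err(error),
        }
    }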

---

    fix(csi-node/nvmf/fixup): fixup correct nvme controller
    
    When we replace an existing path, the new path has a different controller
    number, so the controller number and the device number now mismatch and we
    can no longer safely dereference
    /sys/class/nvme/nvme{major}
    Instead, we can simply dereference
    /sys/class/block/nvme{major}c*n1/queue
    The major ensures we use the original device number, and the glob ensures
    we modify the timeout for all controllers.
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
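
A minimal sketch of the sysfs fix described above, assuming the glob crate; the real change goes through the repo's sysfs::write_value helper, so std::fs::write here is only for illustration:

    use std::path::PathBuf;

    /// Set the block-queue io_timeout (sysfs expects milliseconds) for every
    /// controller path belonging to the original device number, e.g. both
    /// /sys/class/block/nvme1c0n1/queue and /sys/class/block/nvme1c3n1/queue.
    fn set_io_timeout_all_controllers(major: u32, io_timeout_secs: u32) -> std::io::Result<()> {
        let pattern = format!("/sys/class/block/nvme{major}c*n1/queue");
        for entry in glob::glob(&pattern).expect("valid glob pattern") {
            let queue_dir: PathBuf = entry.map_err(glob::GlobError::into_error)?;
            std::fs::write(
                queue_dir.join("io_timeout"),
                (io_timeout_secs * 1000).to_string(),
            )?;
        }
        Ok(())
    }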


Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com>
mayastor-bors and tiagolobocastro committed Dec 6, 2023
2 parents baa101c + 94a222c commit 6cf3f16
Showing 7 changed files with 142 additions and 69 deletions.
37 changes: 27 additions & 10 deletions control-plane/agents/src/bin/core/controller/io_engine/v1/nexus.rs
@@ -204,22 +204,39 @@ impl crate::controller::io_engine::NexusChildApi<Nexus, Nexus, ()> for super::Rpc
err
)]
async fn add_child(&self, request: &AddNexusChild) -> Result<Nexus, SvcError> {
let rpc_nexus = self
let result = self
.nexus()
.add_child_nexus(request.to_rpc())
.await
.context(GrpcRequestError {
resource: ResourceKind::Child,
request: "add_child_nexus",
})?;
match rpc_nexus.into_inner().nexus {
None => Err(SvcError::Internal {
details: format!(
"resource: {}, request: {}, err: {}",
"Nexus", "add_child", "no nexus returned"
),
}),
Some(nexus) => Ok(rpc_nexus_to_agent(&nexus, &request.node)?),
});
match result {
Ok(rpc_nexus) => match rpc_nexus.into_inner().nexus {
None => Err(SvcError::Internal {
details: format!(
"resource: {}, request: {}, err: {}",
"Nexus", "add_child", "no nexus returned"
),
}),
Some(nexus) => Ok(rpc_nexus_to_agent(&nexus, &request.node)?),
},
Err(error) if error.tonic_code() == tonic::Code::AlreadyExists => {
let nexus = self.fetch_nexus(&request.nexus).await?;
if let Some(child) = nexus.child(request.uri.as_str()) {
// todo: Should we do anything here depending on the state?
tracing::warn!(
?child,
nexus=%request.nexus,
"Child is already part of the nexus"
);
Ok(nexus)
} else {
Err(error)
}
}
Err(error) => Err(error),
}
}

26 changes: 19 additions & 7 deletions control-plane/agents/src/bin/core/tests/nexus/mod.rs
@@ -410,6 +410,7 @@ async fn nexus_child_transaction() {
.with_agents(vec!["core"])
.with_req_timeouts(grpc_timeout, grpc_timeout)
.with_grpc_timeouts(grpc_timeout_opts())
.with_reconcile_period(Duration::from_secs(100), Duration::from_secs(100))
.build()
.await
.unwrap();
@@ -489,12 +490,12 @@ async fn nexus_child_transaction() {
// unpause io_engine
cluster.composer().thaw(io_engine.as_str()).await.unwrap();

// now it should be shared successfully
let uri = nexus_client
// now it should be added successfully
let child = nexus_client
.add_nexus_child(&add_child, None)
.await
.unwrap();
println!("Share uri: {uri:?}");
println!("Child: {child:?}");

cluster.composer().pause(io_engine.as_str()).await.unwrap();

@@ -520,13 +521,23 @@
.len(),
1
);

let mut io_engine = cluster.grpc_handle(&cluster.node(0)).await.unwrap();
io_engine
.add_child(add_child.nexus.as_str(), add_child.uri.as_str(), true)
.await
.unwrap();

// now it should be added successfully
let child = nexus_client
.add_nexus_child(&add_child, None)
.await
.unwrap();
println!("Child: {child:?}");
}

/// Tests child add and remove operations when the store is temporarily down
/// TODO: these tests don't work anymore because the io_engine also writes child healthy states
/// to etcd so we can't simply pause etcd anymore..
/// Tests child add and remove operations when the store is temporarily down.
#[tokio::test]
#[ignore]
async fn nexus_child_transaction_store() {
let store_timeout = Duration::from_millis(250);
let reconcile_period = Duration::from_millis(250);
@@ -539,6 +550,7 @@
.with_reconcile_period(reconcile_period, reconcile_period)
.with_store_timeout(store_timeout)
.with_grpc_timeouts(grpc_timeout_opts())
.with_options(|b| b.with_io_engine_no_pstor(true))
.build()
.await
.unwrap();
100 changes: 55 additions & 45 deletions control-plane/csi-driver/src/bin/node/dev/nvmf.rs
@@ -13,7 +13,6 @@ use csi_driver::PublishParams;
use glob::glob;
use nvmeadm::nvmf_subsystem::Subsystem;
use regex::Regex;
use tracing::debug;
use udev::{Device, Enumerator};
use url::Url;
use uuid::Uuid;
@@ -200,47 +199,59 @@ impl Attach for NvmfAttach {
}

async fn fixup(&self) -> Result<(), DeviceError> {
if let Some(io_timeout) = self.io_timeout {
let device = self
.get_device()?
.ok_or_else(|| DeviceError::new("NVMe device not found"))?;
let dev_name = device.sysname().to_str().unwrap();
let major = DEVICE_REGEX
.captures(dev_name)
.ok_or_else(|| {
DeviceError::new(&format!(
"NVMe device \"{}\" does not match \"{}\"",
dev_name, *DEVICE_REGEX,
))
})?
.get(1)
.unwrap()
.as_str();
let pattern = format!("/sys/class/nvme/nvme{major}/nvme*n1/queue");
let path = glob(&pattern)
.unwrap()
.next()
.ok_or_else(|| {
DeviceError::new(&format!(
"failed to look up sysfs device directory \"{pattern}\"",
))
})?
.map_err(|_| {
DeviceError::new(&format!(
"IO error when reading device directory \"{pattern}\""
))
})?;
// If the timeout was higher than nexus's timeout then IOs could
// error out earlier than they should. Therefore we should make sure
// that timeouts in the nexus are set to a very high value.
debug!(
"Setting IO timeout on \"{}\" to {}s",
path.to_string_lossy(),
io_timeout
);
sysfs::write_value(&path, "io_timeout", 1000 * io_timeout)?;
let Some(io_timeout) = self.io_timeout else {
return Ok(());
};

let device = self
.get_device()?
.ok_or_else(|| DeviceError::new("NVMe device not found"))?;
let dev_name = device.sysname().to_str().unwrap();
let major = DEVICE_REGEX
.captures(dev_name)
.ok_or_else(|| {
DeviceError::new(&format!(
"NVMe device \"{}\" does not match \"{}\"",
dev_name, *DEVICE_REGEX,
))
})?
.get(1)
.unwrap()
.as_str();
let pattern = format!("/sys/class/block/nvme{major}c*n1/queue");
let glob = glob(&pattern).unwrap();
let result = glob
.into_iter()
.map(|glob_result| {
match glob_result {
Ok(path) => {
let path_str = path.display();
// If the timeout was higher than nexus's timeout then IOs could
// error out earlier than they should. Therefore we should make sure
// that timeouts in the nexus are set to a very high value.
tracing::debug!("Setting IO timeout on \"{path_str}\" to {io_timeout}s",);
sysfs::write_value(&path, "io_timeout", 1000 * io_timeout).map_err(
|error| {
tracing::error!(%error, path=%path_str, "Failed to set io_timeout to {io_timeout}s");
error.into()
},
)
}
Err(error) => {
// This should never happen as we should always have permissions to list.
tracing::error!(%error, "Unable to collect sysfs for /dev/nvme{major}");
Err(DeviceError::new(error.to_string().as_str()))
}
}
})
.collect::<Result<Vec<()>, DeviceError>>();
match result {
Ok(r) if r.is_empty() => Err(DeviceError::new(&format!(
"look up of sysfs device directory \"{pattern}\" found 0 entries",
))),
Ok(_) => Ok(()),
Err(error) => Err(error),
}
Ok(())
}
}

@@ -284,10 +295,9 @@ pub(crate) fn check_nvme_tcp_module() -> Result<(), std::io::Error> {
/// (note, this is a system-wide parameter)
pub(crate) fn set_nvmecore_iotimeout(io_timeout_secs: u32) -> Result<(), std::io::Error> {
let path = Path::new("/sys/module/nvme_core/parameters");
debug!(
"Setting nvme_core IO timeout on \"{}\" to {}s",
path.to_string_lossy(),
io_timeout_secs
tracing::debug!(
"Setting nvme_core IO timeout on \"{path}\" to {io_timeout_secs}s",
path = path.to_string_lossy(),
);
sysfs::write_value(path, "io_timeout", io_timeout_secs)?;
Ok(())
35 changes: 30 additions & 5 deletions control-plane/csi-driver/src/bin/node/main_.rs
@@ -26,6 +26,7 @@ use std::{
future::Future,
io::ErrorKind,
pin::Pin,
str::FromStr,
sync::Arc,
task::{Context, Poll},
};
@@ -124,7 +125,13 @@ pub(super) async fn main() -> anyhow::Result<()> {
.long("nvme-core-io-timeout")
.value_name("TIMEOUT")
.required(false)
.help("Sets the global nvme_core module io_timeout, in seconds")
.help("Sets the global nvme_core module io_timeout, in seconds or humantime")
)
.arg(
Arg::new("io-engine-io-timeout")
.long("io-engine-io-timeout")
.required(false)
.help("Derives and sets the nvme_core module io_timeout based on the io-engine timeout")
)
.arg(
Arg::new(crate::config::nvme_nr_io_queues())
@@ -162,6 +169,7 @@ pub(super) async fn main() -> anyhow::Result<()> {
.get_matches();

utils::print_package_info!();
println!("{:?}", env::args().collect::<Vec<String>>());

let endpoint = matches.get_one::<String>("grpc-endpoint").unwrap();
let csi_socket = matches
@@ -183,10 +191,27 @@
check_ana_and_label_node(matches.get_one::<String>("node-name").expect("required")).await?;
}

if let Some(nvme_io_timeout_secs) = matches.get_one::<String>("nvme-core-io-timeout") {
let io_timeout_secs: u32 = nvme_io_timeout_secs.parse().expect(
"nvme_core io_timeout should be an integer number, representing the timeout in seconds",
);
if let Some(nvme_io_timeout) = matches.get_one::<String>("io-engine-io-timeout") {
let io_timeout = humantime::Duration::from_str(nvme_io_timeout).map_err(|error| {
anyhow::format_err!("Failed to parse 'io-engine-io-timeout': {error}")
})?;
// Add slack of 10s allowing the io-engine to fail first.
let io_timeout_secs = io_timeout.as_secs() as u32 + 10;

if let Err(error) = crate::dev::nvmf::set_nvmecore_iotimeout(io_timeout_secs) {
anyhow::bail!("Failed to set nvme_core io_timeout: {}", error);
}
} else if let Some(nvme_io_timeout) = matches.get_one::<String>("nvme-core-io-timeout") {
let io_timeout_secs = match humantime::Duration::from_str(nvme_io_timeout) {
Ok(human_time) => {
human_time.as_secs() as u32
}
Err(_) => {
nvme_io_timeout.parse().expect(
"nvme_core io_timeout should be in humantime or an integer number, representing the timeout in seconds",
)
}
};

if let Err(error) = crate::dev::nvmf::set_nvmecore_iotimeout(io_timeout_secs) {
anyhow::bail!("Failed to set nvme_core io_timeout: {}", error);
2 changes: 1 addition & 1 deletion deployer/src/infra/io_engine.rs
@@ -78,7 +78,7 @@ impl ComponentAction for IoEngine {
spec = spec.with_env("DEVELOPER_DELAYED", "1");
}

if !options.no_etcd {
if !options.no_etcd && !options.io_engine_no_pstor {
let etcd = format!("etcd.{}:2379", options.cluster_label.name());
spec = spec.with_args(vec!["-p", &etcd]);
}
9 changes: 9 additions & 0 deletions deployer/src/lib.rs
@@ -183,6 +183,10 @@ pub struct StartOptions {
)]
io_engine_api_versions: Vec<IoEngineApiVersion>,

/// Don't configure the persistent store with the io-engine.
#[clap(long)]
io_engine_no_pstor: bool,

/// Set the developer delayed env flag of the io_engine reactor.
#[clap(short, long)]
pub developer_delayed: bool,
@@ -438,6 +442,11 @@ impl StartOptions {
self
}
#[must_use]
pub fn with_io_engine_no_pstor(mut self, no_pstor: bool) -> Self {
self.io_engine_no_pstor = no_pstor;
self
}
#[must_use]
pub fn with_io_engine_cores(mut self, cores: u32) -> Self {
self.io_engine_cores = cores;
self
2 changes: 1 addition & 1 deletion utils/deployer-cluster/src/lib.rs
@@ -288,7 +288,7 @@ impl Cluster {
Ok(CsiNodeClient { csi, internal })
}

/// restart the core agent
/// Restart the core agent.
pub async fn restart_core(&self) {
self.remove_store_lock(ControlPlaneService::CoreAgent).await;
self.composer.restart("core").await.unwrap();
