Fixup correct nvme controller and make child add v1 idempotent #695

Merged: 3 commits, Dec 6, 2023
@@ -204,22 +204,39 @@ impl crate::controller::io_engine::NexusChildApi<Nexus, Nexus, ()> for super::Rp
err
)]
async fn add_child(&self, request: &AddNexusChild) -> Result<Nexus, SvcError> {
let rpc_nexus = self
let result = self
.nexus()
.add_child_nexus(request.to_rpc())
.await
.context(GrpcRequestError {
resource: ResourceKind::Child,
request: "add_child_nexus",
})?;
match rpc_nexus.into_inner().nexus {
None => Err(SvcError::Internal {
details: format!(
"resource: {}, request: {}, err: {}",
"Nexus", "add_child", "no nexus returned"
),
}),
Some(nexus) => Ok(rpc_nexus_to_agent(&nexus, &request.node)?),
});
match result {
Ok(rpc_nexus) => match rpc_nexus.into_inner().nexus {
None => Err(SvcError::Internal {
details: format!(
"resource: {}, request: {}, err: {}",
"Nexus", "add_child", "no nexus returned"
),
}),
Some(nexus) => Ok(rpc_nexus_to_agent(&nexus, &request.node)?),
},
Err(error) if error.tonic_code() == tonic::Code::AlreadyExists => {
let nexus = self.fetch_nexus(&request.nexus).await?;
if let Some(child) = nexus.child(request.uri.as_str()) {
// todo: Should we do anything here depending on the state?
tracing::warn!(
?child,
nexus=%request.nexus,
"Child is already part of the nexus"
);
Ok(nexus)
} else {
Err(error)
}
}
Err(error) => Err(error),
}
}

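The hunk above is the core of the idempotency change: a tonic AlreadyExists status from the io-engine is no longer surfaced as a failure; instead the nexus is re-fetched and the call succeeds when the child is already attached. Below is a minimal, self-contained sketch of that pattern, using hypothetical NexusClient, Nexus and Error stand-ins rather than the project's real types:

use tonic::Code;

// Hypothetical stand-ins for the real control-plane types.
struct Nexus;
impl Nexus {
    fn has_child(&self, _uri: &str) -> bool {
        true
    }
}
struct Error {
    code: Code,
}
struct NexusClient;
impl NexusClient {
    async fn add_child(&self, _nexus: &str, _uri: &str) -> Result<Nexus, Error> {
        Ok(Nexus)
    }
    async fn get_nexus(&self, _nexus: &str) -> Result<Nexus, Error> {
        Ok(Nexus)
    }
}

// Idempotent child add: AlreadyExists is swallowed only after confirming the
// child really is part of the nexus; any other error is still returned.
async fn add_child_idempotent(
    client: &NexusClient,
    nexus_id: &str,
    child_uri: &str,
) -> Result<Nexus, Error> {
    match client.add_child(nexus_id, child_uri).await {
        Ok(nexus) => Ok(nexus),
        Err(error) if error.code == Code::AlreadyExists => {
            let nexus = client.get_nexus(nexus_id).await?;
            if nexus.has_child(child_uri) {
                Ok(nexus)
            } else {
                Err(error)
            }
        }
        Err(error) => Err(error),
    }
}

Returning the original error when the child is not actually present keeps genuinely conflicting state visible to the caller instead of masking it.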
26 changes: 19 additions & 7 deletions control-plane/agents/src/bin/core/tests/nexus/mod.rs
@@ -410,6 +410,7 @@ async fn nexus_child_transaction() {
.with_agents(vec!["core"])
.with_req_timeouts(grpc_timeout, grpc_timeout)
.with_grpc_timeouts(grpc_timeout_opts())
.with_reconcile_period(Duration::from_secs(100), Duration::from_secs(100))
.build()
.await
.unwrap();
@@ -489,12 +490,12 @@ async fn nexus_child_transaction() {
// unpause io_engine
cluster.composer().thaw(io_engine.as_str()).await.unwrap();

// now it should be shared successfully
let uri = nexus_client
// now it should be added successfully
let child = nexus_client
.add_nexus_child(&add_child, None)
.await
.unwrap();
println!("Share uri: {uri:?}");
println!("Child: {child:?}");

cluster.composer().pause(io_engine.as_str()).await.unwrap();

@@ -520,13 +521,23 @@
.len(),
1
);

let mut io_engine = cluster.grpc_handle(&cluster.node(0)).await.unwrap();
io_engine
.add_child(add_child.nexus.as_str(), add_child.uri.as_str(), true)
.await
.unwrap();

// now it should be added successfully
let child = nexus_client
.add_nexus_child(&add_child, None)
.await
.unwrap();
println!("Child: {child:?}");
}

/// Tests child add and remove operations when the store is temporarily down
/// TODO: these tests don't work anymore because the io_engine also writes child healthy states
/// to etcd so we can't simply pause etcd anymore..
/// Tests child add and remove operations when the store is temporarily down.
#[tokio::test]
#[ignore]
async fn nexus_child_transaction_store() {
let store_timeout = Duration::from_millis(250);
let reconcile_period = Duration::from_millis(250);
@@ -539,6 +550,7 @@ async fn nexus_child_transaction_store() {
.with_reconcile_period(reconcile_period, reconcile_period)
.with_store_timeout(store_timeout)
.with_grpc_timeouts(grpc_timeout_opts())
.with_options(|b| b.with_io_engine_no_pstor(true))
.build()
.await
.unwrap();
34 changes: 33 additions & 1 deletion control-plane/csi-driver/src/bin/node/config.rs
@@ -18,6 +18,10 @@ pub fn nvme_keep_alive_tmo() -> String {
pub fn nvme_ctrl_loss_tmo() -> String {
Parameters::NvmeCtrlLossTmo.as_ref().to_kebab_case()
}
/// Command line arg name for `Parameters::NvmeIoTimeout`.
pub fn nvme_io_tmo() -> String {
Parameters::NvmeIoTimeout.as_ref().to_kebab_case()
}

/// Global configuration parameters.
#[derive(Debug, Default)]
@@ -42,17 +46,21 @@ pub(crate) struct NvmeConfig {
/// Default value for `ctrl_loss_tmo` when not specified via the volume parameters (sc).
ctrl_loss_tmo: Option<u32>,
keep_alive_tmo: Option<u32>,
/// Default value for `io_tmo` when not specified via the volume parameters (sc).
io_tmo: Option<humantime::Duration>,
}
impl NvmeConfig {
fn new(
nr_io_queues: Option<u32>,
ctrl_loss_tmo: Option<u32>,
keep_alive_tmo: Option<u32>,
io_tmo: Option<humantime::Duration>,
) -> Self {
Self {
nr_io_queues,
ctrl_loss_tmo,
keep_alive_tmo,
io_tmo,
}
}
/// Number of IO Queues.
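A quick usage sketch of the widened constructor (the values are illustrative, not defaults from this PR): humantime::Duration parses strings such as "30s" or "1m30s" and dereferences to a std::time::Duration, which is why io_tmo.as_secs() works further down.

// Illustrative only: build the config with a 30s io timeout and read it back.
fn nvme_config_example() {
    let io_tmo: humantime::Duration = "30s".parse().expect("valid humantime duration");
    let cfg = NvmeConfig::new(
        Some(4),      // nr_io_queues
        Some(30),     // ctrl_loss_tmo, in seconds
        Some(10),     // keep_alive_tmo, in seconds
        Some(io_tmo), // io_tmo, parsed from a humantime string
    );
    assert_eq!(cfg.io_tmo().map(|d| d.as_secs()), Some(30));
}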
@@ -68,6 +76,10 @@ impl NvmeConfig {
pub(crate) fn keep_alive_tmo(&self) -> Option<u32> {
self.keep_alive_tmo
}
/// The io timeout.
pub(crate) fn io_tmo(&self) -> Option<humantime::Duration> {
self.io_tmo
}
}

/// Get a mutex guard over the `Config`.
@@ -112,7 +124,22 @@ impl TryFrom<NvmeArgValues> for NvmeConfig {
error
)
})?;
Ok(Self::new(nvme_nr_ioq, ctrl_loss_tmo, keep_alive_tmo))
let nvme_io_tmo = Parameters::nvme_io_timeout(
src.0.get(Parameters::NvmeIoTimeout.as_ref()),
)
.map_err(|error| {
anyhow::anyhow!(
"Invalid value for {}, error = {}",
Parameters::NvmeIoTimeout.as_ref(),
error
)
})?;
Ok(Self::new(
nvme_nr_ioq,
ctrl_loss_tmo,
keep_alive_tmo,
nvme_io_tmo,
))
}
}
/// Nvme Arguments taken from the CSI volume calls (storage class parameters).
@@ -155,6 +182,11 @@ impl TryFrom<&ArgMatches> for NvmeArgValues {
map.0
.insert(Parameters::NvmeKeepAliveTmo.to_string(), value.to_string());
}

if let Some(value) = matches.get_one::<String>(&nvme_io_tmo()) {
map.0
.insert(Parameters::NvmeIoTimeout.to_string(), value.to_string());
}
Ok(map)
}
}
112 changes: 61 additions & 51 deletions control-plane/csi-driver/src/bin/node/dev/nvmf.rs
@@ -13,7 +13,6 @@ use csi_driver::PublishParams;
use glob::glob;
use nvmeadm::nvmf_subsystem::Subsystem;
use regex::Regex;
use tracing::debug;
use udev::{Device, Enumerator};
use url::Url;
use uuid::Uuid;
@@ -35,7 +34,7 @@ pub(super) struct NvmfAttach {
port: u16,
uuid: Uuid,
nqn: String,
io_timeout: Option<u32>,
io_tmo: Option<u32>,
nr_io_queues: Option<u32>,
ctrl_loss_tmo: Option<u32>,
keep_alive_tmo: Option<u32>,
@@ -50,6 +49,7 @@ impl NvmfAttach {
uuid: Uuid,
nqn: String,
nr_io_queues: Option<u32>,
io_tmo: Option<humantime::Duration>,
ctrl_loss_tmo: Option<u32>,
keep_alive_tmo: Option<u32>,
hostnqn: Option<String>,
@@ -59,7 +59,7 @@
port,
uuid,
nqn,
io_timeout: None,
io_tmo: io_tmo.map(|io_tmo| io_tmo.as_secs().try_into().unwrap_or(u32::MAX)),
nr_io_queues,
ctrl_loss_tmo,
keep_alive_tmo,
@@ -104,6 +104,7 @@ impl TryFrom<&Url> for NvmfAttach {
let nr_io_queues = config().nvme().nr_io_queues();
let ctrl_loss_tmo = config().nvme().ctrl_loss_tmo();
let keep_alive_tmo = config().nvme().keep_alive_tmo();
let io_tmo = config().nvme().io_tmo();

let hash_query: HashMap<_, _> = url.query_pairs().collect();
let hostnqn = hash_query.get("hostnqn").map(ToString::to_string);
@@ -114,6 +115,7 @@
uuid,
segments[0].to_string(),
nr_io_queues,
io_tmo,
ctrl_loss_tmo,
keep_alive_tmo,
hostnqn,
@@ -130,9 +132,6 @@ impl Attach for NvmfAttach {
let publish_context = PublishParams::try_from(context)
.map_err(|error| DeviceError::new(&error.to_string()))?;

if let Some(val) = publish_context.io_timeout() {
self.io_timeout = Some(*val);
}
if let Some(val) = publish_context.ctrl_loss_tmo() {
self.ctrl_loss_tmo = Some(*val);
}
@@ -159,7 +158,7 @@ impl Attach for NvmfAttach {
Err(NvmeError::SubsystemNotFound { .. }) => {
// The default reconnect delay in linux kernel is set to 10s. Use the
// same default value unless the timeout is less or equal to 10.
let reconnect_delay = match self.io_timeout {
let reconnect_delay = match self.io_tmo {
Some(io_timeout) => {
if io_timeout <= 10 {
Some(1)
Expand Down Expand Up @@ -200,47 +199,59 @@ impl Attach for NvmfAttach {
}

async fn fixup(&self) -> Result<(), DeviceError> {
if let Some(io_timeout) = self.io_timeout {
let device = self
.get_device()?
.ok_or_else(|| DeviceError::new("NVMe device not found"))?;
let dev_name = device.sysname().to_str().unwrap();
let major = DEVICE_REGEX
.captures(dev_name)
.ok_or_else(|| {
DeviceError::new(&format!(
"NVMe device \"{}\" does not match \"{}\"",
dev_name, *DEVICE_REGEX,
))
})?
.get(1)
.unwrap()
.as_str();
let pattern = format!("/sys/class/nvme/nvme{major}/nvme*n1/queue");
let path = glob(&pattern)
.unwrap()
.next()
.ok_or_else(|| {
DeviceError::new(&format!(
"failed to look up sysfs device directory \"{pattern}\"",
))
})?
.map_err(|_| {
DeviceError::new(&format!(
"IO error when reading device directory \"{pattern}\""
))
})?;
// If the timeout was higher than nexus's timeout then IOs could
// error out earlier than they should. Therefore we should make sure
// that timeouts in the nexus are set to a very high value.
debug!(
"Setting IO timeout on \"{}\" to {}s",
path.to_string_lossy(),
io_timeout
);
sysfs::write_value(&path, "io_timeout", 1000 * io_timeout)?;
let Some(io_timeout) = self.io_tmo else {
return Ok(());
};

let device = self
.get_device()?
.ok_or_else(|| DeviceError::new("NVMe device not found"))?;
let dev_name = device.sysname().to_str().unwrap();
let major = DEVICE_REGEX
.captures(dev_name)
.ok_or_else(|| {
DeviceError::new(&format!(
"NVMe device \"{}\" does not match \"{}\"",
dev_name, *DEVICE_REGEX,
))
})?
.get(1)
.unwrap()
.as_str();
let pattern = format!("/sys/class/block/nvme{major}c*n1/queue");
let glob = glob(&pattern).unwrap();
let result = glob
.into_iter()
.map(|glob_result| {
match glob_result {
Ok(path) => {
let path_str = path.display();
// If the timeout was higher than nexus's timeout then IOs could
// error out earlier than they should. Therefore we should make sure
// that timeouts in the nexus are set to a very high value.
tracing::debug!("Setting IO timeout on \"{path_str}\" to {io_timeout}s",);
sysfs::write_value(&path, "io_timeout", 1000 * io_timeout).map_err(
|error| {
tracing::error!(%error, path=%path_str, "Failed to set io_timeout to {io_timeout}s");
error.into()
},
)
}
Err(error) => {
// This should never happen as we should always have permissions to list.
tracing::error!(%error, "Unable to collect sysfs for /dev/nvme{major}");
Err(DeviceError::new(error.to_string().as_str()))
}
}
})
.collect::<Result<Vec<()>, DeviceError>>();
match result {
Ok(r) if r.is_empty() => Err(DeviceError::new(&format!(
"look up of sysfs device directory \"{pattern}\" found 0 entries",
))),
Ok(_) => Ok(()),
Err(error) => Err(error),
}
Ok(())
}
}
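The rewritten fixup above targets the controller-specific namespace path (nvme<major>c*n1) under /sys/class/block instead of the old /sys/class/nvme layout, and writes the timeout to every matching queue instead of only the first glob hit, while still erroring if nothing matches. Below is a stripped-down sketch of the same glob-and-write flow, with plain std::fs in place of the project's sysfs helper and a simplified error type (assumptions for illustration, not the PR's code):

use glob::glob;

// Apply the timeout to every matching controller-specific namespace queue.
// The block-queue io_timeout attribute is expressed in milliseconds, hence
// the multiplication by 1000 of the per-volume value given in seconds.
fn set_io_timeout_all(major: &str, io_timeout_secs: u32) -> Result<(), String> {
    let pattern = format!("/sys/class/block/nvme{major}c*n1/queue");
    let mut matched = 0;
    for entry in glob(&pattern).map_err(|e| e.to_string())? {
        let path = entry.map_err(|e| e.to_string())?;
        matched += 1;
        std::fs::write(
            path.join("io_timeout"),
            (1000 * io_timeout_secs).to_string(),
        )
        .map_err(|e| e.to_string())?;
    }
    if matched == 0 {
        return Err(format!("no sysfs entries matched \"{pattern}\""));
    }
    Ok(())
}

Writing to every match matters when a namespace is reachable through more than one controller path; taking only the first match, as the old code did, could leave the other paths at their default timeout, which is presumably why the PR iterates over all glob results.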

@@ -284,10 +295,9 @@ pub(crate) fn check_nvme_tcp_module() -> Result<(), std::io::Error> {
/// (note, this is a system-wide parameter)
pub(crate) fn set_nvmecore_iotimeout(io_timeout_secs: u32) -> Result<(), std::io::Error> {
let path = Path::new("/sys/module/nvme_core/parameters");
debug!(
"Setting nvme_core IO timeout on \"{}\" to {}s",
path.to_string_lossy(),
io_timeout_secs
tracing::debug!(
"Setting nvme_core IO timeout on \"{path}\" to {io_timeout_secs}s",
path = path.to_string_lossy(),
);
sysfs::write_value(path, "io_timeout", io_timeout_secs)?;
Ok(())