Skip to content

Commit

Permalink
test: test
Browse files Browse the repository at this point in the history
Signed-off-by: Abhinandan Purkait <purkaitabhinandan@gmail.com>
  • Loading branch information
Abhinandan-Purkait committed Dec 3, 2024
1 parent 2c0635c commit 2111c90
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 185 deletions.
75 changes: 5 additions & 70 deletions control-plane/csi-driver/src/bin/controller/server.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,11 @@
use crate::{controller::CsiControllerSvc, identity::CsiIdentitySvc};
use rpc::csi::{controller_server::ControllerServer, identity_server::IdentityServer};

use futures::TryFutureExt;
use std::{
fs,
io::ErrorKind,
ops::Add,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::UnixListener,
};
use tonic::transport::{server::Connected, Server};
use std::{fs, io::ErrorKind, ops::Add};
use tokio::net::UnixListener;
use tonic::{codegen::tokio_stream::wrappers::UnixListenerStream, transport::Server};
use tracing::{debug, error, info};

#[derive(Debug)]
struct UnixStream(tokio::net::UnixStream);

impl Connected for UnixStream {
type ConnectInfo = UdsConnectInfo;

fn connect_info(&self) -> Self::ConnectInfo {
UdsConnectInfo {
peer_addr: self.0.peer_addr().ok().map(Arc::new),
peer_cred: self.0.peer_cred().ok(),
}
}
}

// Not sure why we need the inner fields, probably worth checking if we can remove them.
#[derive(Clone, Debug)]
#[allow(unused)]
struct UdsConnectInfo {
peer_addr: Option<Arc<tokio::net::unix::SocketAddr>>,
peer_cred: Option<tokio::net::unix::UCred>,
}

impl AsyncRead for UnixStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}

impl AsyncWrite for UnixStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_flush(cx)
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_shutdown(cx)
}
}

pub(super) struct CsiServer {}
impl CsiServer {
/// Runs the CSI Server identity and controller services.
Expand All @@ -91,6 +30,7 @@ impl CsiServer {

let incoming = {
let uds = UnixListener::bind(csi_socket)?;
info!("CSI plugin bound to {}", csi_socket);

// Change permissions on CSI socket to allow non-privileged clients to access it
// to simplify testing.
Expand All @@ -103,12 +43,7 @@ impl CsiServer {
debug!("Successfully changed file permissions for CSI socket");
}

async_stream::stream! {
loop {
let item = uds.accept().map_ok(|(st, _)| UnixStream(st)).await;
yield item;
}
}
UnixListenerStream::new(uds)
};

let cfg = crate::CsiControllerConfig::get_config();
Expand Down
35 changes: 22 additions & 13 deletions control-plane/csi-driver/src/bin/node/dev/nvmf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use csi_driver::PublishParams;
use glob::glob;
use nvmeadm::nvmf_subsystem::Subsystem;
use regex::Regex;
use tokio::task::spawn_blocking;
use tracing::warn;
use udev::{Device, Enumerator};
use url::Url;
Expand Down Expand Up @@ -192,12 +193,17 @@ impl Attach for NvmfAttach {
.hostnqn(self.hostnqn.clone())
.keep_alive_tmo(self.keep_alive_tmo)
.build()?;
match ca.connect() {
// Should we remove this arm?
Err(NvmeError::ConnectInProgress) => Ok(()),
Err(err) => Err(err.into()),
Ok(_) => Ok(()),
}

spawn_blocking(move || {
match ca.connect() {
// Should we remove this arm?
Err(NvmeError::ConnectInProgress) => Ok(()),
Err(err) => Err(err.into()),
Ok(_) => Ok(()),
}
})
.await
.map_err(|error| DeviceError::new(&error.to_string()))?
}
Err(err) => Err(err.into()),
}
Expand Down Expand Up @@ -283,14 +289,17 @@ impl NvmfDetach {
#[tonic::async_trait]
impl Detach for NvmfDetach {
async fn detach(&self) -> Result<(), DeviceError> {
if disconnect(&self.nqn)? == 0 {
return Err(DeviceError::from(format!(
let nqn = self.nqn.clone();
spawn_blocking(move || match disconnect(&nqn) {
Ok(0) => Err(DeviceError::from(format!(
"nvmf disconnect {} failed: no device found",
self.nqn
)));
}

Ok(())
nqn
))),
Err(error) => Err(error.into()),
Ok(_) => Ok(()),
})
.await
.map_err(|error| DeviceError::from(error.to_string()))?
}

fn devname(&self) -> DeviceName {
Expand Down
69 changes: 6 additions & 63 deletions control-plane/csi-driver/src/bin/node/main_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,71 +33,12 @@ use std::{
future::Future,
io::ErrorKind,
net::{IpAddr, SocketAddr},
pin::Pin,
str::FromStr,
sync::Arc,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::UnixListener,
};
use tonic::{
codegen::tokio_stream::wrappers::UnixListenerStream,
transport::{server::Connected, Server},
};
use tokio::net::UnixListener;
use tonic::{codegen::tokio_stream::wrappers::UnixListenerStream, transport::Server};
use tracing::{debug, error, info};

#[derive(Clone, Debug)]
pub struct UdsConnectInfo {
#[allow(dead_code)]
pub peer_addr: Option<Arc<tokio::net::unix::SocketAddr>>,
#[allow(dead_code)]
pub peer_cred: Option<tokio::net::unix::UCred>,
}

#[derive(Debug)]
struct UnixStream(tokio::net::UnixStream);

impl Connected for UnixStream {
type ConnectInfo = UdsConnectInfo;

fn connect_info(&self) -> Self::ConnectInfo {
UdsConnectInfo {
peer_addr: self.0.peer_addr().ok().map(Arc::new),
peer_cred: self.0.peer_cred().ok(),
}
}
}

impl AsyncRead for UnixStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}

impl AsyncWrite for UnixStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_flush(cx)
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_shutdown(cx)
}
}

const GRPC_PORT: u16 = 50051;

pub(super) async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -500,6 +441,7 @@ pub(super) async fn main() -> anyhow::Result<()> {
registration_enabled
)
);
tracing::info!("Exiting main process");
vec![csi, grpc, registration].into_iter().collect()
}

Expand All @@ -518,6 +460,9 @@ impl CsiServer {
)?;

let incoming = {
let uds = UnixListener::bind(csi_socket)?;
info!("CSI plugin bound to {}", csi_socket);

// Change permissions on CSI socket to allow non-privileged clients to access it
// to simplify testing.
if let Err(e) = fs::set_permissions(
Expand All @@ -529,8 +474,6 @@ impl CsiServer {
debug!("Successfully changed file permissions for CSI socket");
}

let uds = UnixListener::bind(csi_socket).unwrap();
info!("CSI plugin bound to {}", csi_socket);
UnixListenerStream::new(uds)
};

Expand Down
6 changes: 5 additions & 1 deletion control-plane/csi-driver/src/bin/node/mount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,14 @@ async fn wait_file_removal(
let check_interval = std::time::Duration::from_millis(200);
let proc_str = proc.to_string_lossy().to_string();
let mut exists = proc.exists();
let mut logged = false;
while start.elapsed() < timeout && exists {
tracing::error!(proc = proc_str, "proc entry still exists");
if !logged {
tracing::error!(proc = proc_str, "proc entry still exists");
}
tokio::time::sleep(check_interval).await;
exists = proc.exists();
logged = true;
}
match exists {
false => Ok(()),
Expand Down
47 changes: 10 additions & 37 deletions control-plane/csi-driver/src/bin/node/mount_utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
pub(crate) mod bin;

use tokio::{
process::Command,
signal::unix::{signal, SignalKind},
};
use tokio::process::Command;
use tonic::Status;

const CSI_NODE_BINARY: &str = "csi-node";
Expand Down Expand Up @@ -111,32 +108,12 @@ impl MayastorMount {
}

async fn execute_command_with_signals(mut command: Command) -> Result<(), Status> {
command
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null());

unsafe {
command.pre_exec(|| {
nix::unistd::setsid()?;
Ok(())
});
}

let mut sigterm = signal(SignalKind::terminate()).map_err(|error| {
Status::aborted(format!(
"Failed to setup SIGTERM signal handlers: {}",
error
))
})?;
let mut sigint = signal(SignalKind::interrupt()).map_err(|error| {
Status::aborted(format!("Failed to setup SIGINT signal handlers: {}", error))
})?;

tokio::select! {
result = command.output() => {
let output = result.map_err(|error| Status::aborted(format!("Failed to execute command: {}", error)))?;
let child = command
.spawn()
.map_err(|_| Status::aborted("Failed to spawn"))?;

match child.wait_with_output().await {
Ok(output) => {
if output.status.success() {
return Ok(());
}
Expand All @@ -145,14 +122,10 @@ async fn execute_command_with_signals(mut command: Command) -> Result<(), Status
"Command failed: {}",
String::from_utf8(output.stderr).unwrap()
)))
},
_ = sigint.recv() => {
tracing::error!("Signalled, signal: {:?}", SignalKind::interrupt());
Err(Status::aborted(format!("Signalled, signal: {:?}", SignalKind::interrupt())))
},
_ = sigterm.recv() => {
tracing::error!("Signalled, signal: {:?}", SignalKind::terminate());
Err(Status::aborted(format!("Signalled, signal: {:?}", SignalKind::terminate())))
}
Err(error) => Err(Status::aborted(format!(
"Failed to execute command: {}",
error
))),
}
}
2 changes: 1 addition & 1 deletion control-plane/csi-driver/src/bin/node/nodeplugin_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl NodePluginGrpcServer {
endpoint
);
Server::builder()
.timeout(std::time::Duration::from_secs(10))
.timeout(std::time::Duration::from_secs(30))
.add_service(NodePluginServer::new(NodePluginSvc {}))
.serve_with_shutdown(endpoint, Shutdown::wait())
.await
Expand Down

0 comments on commit 2111c90

Please sign in to comment.