Skip to content

Commit

Permalink
fix details
Browse files Browse the repository at this point in the history
  • Loading branch information
SkyFan2002 committed Apr 18, 2023
1 parent efbaba9 commit 0ca264d
Showing 1 changed file with 124 additions and 62 deletions.
186 changes: 124 additions & 62 deletions core/src/layers/madsim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

use crate::ops::{OpList, OpRead, OpScan, OpWrite};
use crate::raw::oio::Entry;
use crate::raw::AccessorInfo;
use crate::raw::{oio, Accessor, Layer, LayeredAccessor, RpList, RpRead, RpScan, RpWrite};
use crate::types::Error;
use crate::types::ErrorKind;
use async_trait::async_trait;
use bytes::Bytes;
use madsim::net::Endpoint;
Expand All @@ -31,53 +34,111 @@ use std::sync::Arc;
use std::sync::Mutex;
use std::task::{Context, Poll};

/// Add deterministic simulation for async operations, powered by [`madsim`](https://docs.rs/madsim/latest/madsim/).
///
/// # Note
///
/// - blocking operations are not supported, as [`madsim`](https://docs.rs/madsim/latest/madsim/) is async only.
///
///
/// # Examples
///
/// ```
/// use opendal::Operator;
/// use opendal::services;
/// use opendal::layers::MadsimLayer;
/// use opendal::layers::SimServer;
/// use madsim::{net::NetSim, runtime::Handle, time::sleep};
/// use std::time::Duration;
///
/// #[cfg(madsim)]
/// #[madsim::test]
/// async fn deterministic_simulation_test(){
/// let handle = Handle::current();
/// let ip1 = "10.0.0.1".parse().unwrap();
/// let ip2 = "10.0.0.2".parse().unwrap();
/// let sim_server_socket = "10.0.0.1:2379".parse().unwrap();
/// let server = handle.create_node().name("server").ip(ip1).build();
/// let client = handle.create_node().name("client").ip(ip2).build();
///
/// server.spawn(async move {
/// SimServer::serve(sim_server_socket).await.unwrap();
/// });
/// sleep(Duration::from_secs(1)).await;
///
/// let handle = client.spawn(async move {
/// let mut builder = services::Fs::default();
/// builder.root(".");
/// let op = Operator::new(builder)
/// .unwrap()
/// .layer(MadsimLayer::new(sim_server_socket))
/// .finish();
///
/// let path = "hello.txt";
/// let data = "Hello, World!";
/// op.write(path, data).await.unwrap();
/// assert_eq!(data.as_bytes(), op.read(path).await.unwrap());
/// });
/// handle.await.unwrap();
/// }
/// ```
/// To enable logging output, please set `RUSTFLAGS="--cfg madsim"`:
/// ```shell
/// RUSTFLAGS="--cfg madsim" cargo test
/// ```
#[derive(Debug, Copy, Clone)]
pub struct MadsimLayer {
sim_server_socket: SocketAddr,
addr: SocketAddr,
}

impl MadsimLayer {
pub fn new(sim_server_socket: SocketAddr) -> Self {
Self { sim_server_socket }
pub fn new(endpoint: &str) -> Self {
Self {
addr: endpoint.parse().unwrap(),
}
}
}

impl<A: Accessor> Layer<A> for MadsimLayer {
type LayeredAccessor = MadsimAccessor;

fn layer(&self, _inner: A) -> Self::LayeredAccessor {
MadsimAccessor {
sim_server_socket: self.sim_server_socket,
}
fn layer(&self, _: A) -> Self::LayeredAccessor {
MadsimAccessor { addr: self.addr }
}
}

#[derive(Debug)]
pub struct MadsimAccessor {
sim_server_socket: SocketAddr,
addr: SocketAddr,
}

#[async_trait]
impl LayeredAccessor for MadsimAccessor {
type Inner = ();
type Reader = MadsimReader;
type BlockingReader = MadsimReader;
type BlockingReader = ();
type Writer = MadsimWriter;
type BlockingWriter = MadsimWriter;
type BlockingWriter = ();
type Pager = MadsimPager;
type BlockingPager = MadsimPager;
type BlockingPager = ();

fn inner(&self) -> &Self::Inner {
&()
}

fn metadata(&self) -> AccessorInfo {
let mut info = AccessorInfo::default();
info.set_name("madsim");
info
}

async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> {
let req = Request::Read(path.to_string(), args);
let ep = Endpoint::connect(self.sim_server_socket)
let ep = Endpoint::connect(self.addr)
.await
.expect("fail to connect to sim server");
let (tx, mut rx) = ep
.connect1(self.sim_server_socket)
.connect1(self.addr)
.await
.expect("fail to connect1 to sim server");
tx.send(Box::new(req))
Expand All @@ -103,49 +164,67 @@ impl LayeredAccessor for MadsimAccessor {
MadsimWriter {
path: path.to_string(),
args,
sim_server_socket: self.sim_server_socket,
sim_server_socket: self.addr,
},
))
}

async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Pager)> {
todo!()
Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
))
}

async fn scan(&self, path: &str, args: OpScan) -> crate::Result<(RpScan, Self::Pager)> {
todo!()
Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
))
}

fn blocking_read(
&self,
path: &str,
args: OpRead,
) -> crate::Result<(RpRead, Self::BlockingReader)> {
panic!("blocking_read is not supported in MadsimLayer");
Err(Error::new(
ErrorKind::Unsupported,
"will not be supported in MadsimLayer",
))
}

fn blocking_write(
&self,
path: &str,
args: OpWrite,
) -> crate::Result<(RpWrite, Self::BlockingWriter)> {
panic!("blocking_write is not supported in MadsimLayer");
Err(Error::new(
ErrorKind::Unsupported,
"will not be supported in MadsimLayer",
))
}

fn blocking_list(
&self,
path: &str,
args: OpList,
) -> crate::Result<(RpList, Self::BlockingPager)> {
panic!("blocking_list is not supported in MadsimLayer");
Err(Error::new(
ErrorKind::Unsupported,
"will not be supported in MadsimLayer",
))
}

fn blocking_scan(
&self,
path: &str,
args: OpScan,
) -> crate::Result<(RpScan, Self::BlockingPager)> {
panic!("blocking_scan is not supported in MadsimLayer");
Err(Error::new(
ErrorKind::Unsupported,
"will not be supported in MadsimLayer",
))
}
}

Expand All @@ -165,25 +244,17 @@ impl oio::Read for MadsimReader {
}

fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<crate::Result<u64>> {
todo!()
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
)))
}

fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
todo!()
}
}

impl oio::BlockingRead for MadsimReader {
fn read(&mut self, buf: &mut [u8]) -> crate::Result<usize> {
panic!("blocking_read is not supported in MadsimLayer");
}

fn seek(&mut self, pos: SeekFrom) -> crate::Result<u64> {
panic!("blocking_read is not supported in MadsimLayer");
}

fn next(&mut self) -> Option<crate::Result<Bytes>> {
panic!("blocking_read is not supported in MadsimLayer");
Poll::Ready(Some(Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
))))
}
}

Expand All @@ -193,20 +264,6 @@ pub struct MadsimWriter {
sim_server_socket: SocketAddr,
}

impl oio::BlockingWrite for MadsimWriter {
fn write(&mut self, bs: Bytes) -> crate::Result<()> {
panic!("blocking_write is not supported in MadsimLayer");
}

fn append(&mut self, bs: Bytes) -> crate::Result<()> {
panic!("blocking_write is not supported in MadsimLayer");
}

fn close(&mut self) -> crate::Result<()> {
panic!("blocking_write is not supported in MadsimLayer");
}
}

#[async_trait]
impl oio::Write for MadsimWriter {
async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
Expand All @@ -228,11 +285,17 @@ impl oio::Write for MadsimWriter {
}

async fn append(&mut self, bs: Bytes) -> crate::Result<()> {
todo!()
Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
))
}

async fn abort(&mut self) -> crate::Result<()> {
todo!()
Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
))
}

async fn close(&mut self) -> crate::Result<()> {
Expand All @@ -245,17 +308,14 @@ pub struct MadsimPager {}
#[async_trait]
impl oio::Page for MadsimPager {
async fn next(&mut self) -> crate::Result<Option<Vec<Entry>>> {
todo!()
}
}

impl oio::BlockingPage for MadsimPager {
fn next(&mut self) -> crate::Result<Option<Vec<Entry>>> {
panic!("blocking_page is not supported in MadsimLayer");
Err(Error::new(
ErrorKind::Unsupported,
"will be supported in the future",
))
}
}

/// A simulated server.
/// A simulated server.This an experimental feature, docs are not ready yet.
#[derive(Default, Clone)]
pub struct SimServer;

Expand Down Expand Up @@ -331,12 +391,14 @@ mod test {
let handle = Handle::current();
let ip1 = "10.0.0.1".parse().unwrap();
let ip2 = "10.0.0.2".parse().unwrap();
let sim_server_socket = "10.0.0.1:2379".parse().unwrap();
let sim_server_socket = "10.0.0.1:2379";
let server = handle.create_node().name("server").ip(ip1).build();
let client = handle.create_node().name("client").ip(ip2).build();

server.spawn(async move {
SimServer::serve(sim_server_socket).await.unwrap();
SimServer::serve(sim_server_socket.parse().unwrap())
.await
.unwrap();
});
sleep(Duration::from_secs(1)).await;

Expand Down

0 comments on commit 0ca264d

Please sign in to comment.