From 8a1809cb09c1fc59052f363731bdc94bc978c015 Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Mon, 10 Apr 2023 19:13:40 +0800 Subject: [PATCH 01/18] fix typo --- core/src/docs/concepts.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/docs/concepts.rs b/core/src/docs/concepts.rs index 5e87a98855b..aa01e6e93dd 100644 --- a/core/src/docs/concepts.rs +++ b/core/src/docs/concepts.rs @@ -19,7 +19,7 @@ //! //! OpenDAL provides a unified abstraction for all storage services. //! -//! There are three core concepts in OpenDAL: +//! There are two core concepts in OpenDAL: //! //! - [`Builder`]: Build an instance of underlying services. //! - [`Operator`]: A bridge between underlying implementation detail and unified abstraction. From af806f6f3d6270e794845d299ba5adc0b7c1ff4e Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Thu, 13 Apr 2023 17:31:37 +0800 Subject: [PATCH 02/18] , --- core/Cargo.toml | 1 + core/src/layers/mod.rs | 14 ++ core/src/layers/simulation/mock.rs | 242 +++++++++++++++++++++++++++++ core/src/layers/simulation/mod.rs | 47 ++++++ core/src/layers/simulation/real.rs | 82 ++++++++++ 5 files changed, 386 insertions(+) create mode 100644 core/src/layers/simulation/mock.rs create mode 100644 core/src/layers/simulation/mod.rs create mode 100644 core/src/layers/simulation/real.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index 17b49a542e2..71ed20f8029 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -104,6 +104,7 @@ http = "0.2.5" hyper = "0.14" lazy-regex = { version = "2.5.0", optional = true } log = "0.4" +madsim = "0.2.19" md-5 = "0.10" metrics = { version = "0.20", optional = true } moka = { version = "0.10", optional = true, features = ["future"] } diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 26a949dcc05..76f10428a94 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -18,37 +18,51 @@ //! `Layer` is the mechanism to intercept operations. mod concurrent_limit; + pub use concurrent_limit::ConcurrentLimitLayer; mod immutable_index; + pub use immutable_index::ImmutableIndexLayer; mod logging; + pub use logging::LoggingLayer; #[cfg(feature = "layers-chaos")] mod chaos; + #[cfg(feature = "layers-chaos")] pub use chaos::ChaosLayer; #[cfg(feature = "layers-metrics")] mod metrics; + #[cfg(feature = "layers-metrics")] pub use self::metrics::MetricsLayer; mod retry; + pub use self::retry::RetryLayer; #[cfg(feature = "layers-tracing")] mod tracing; + #[cfg(feature = "layers-tracing")] pub use self::tracing::TracingLayer; mod type_eraser; + pub(crate) use type_eraser::TypeEraseLayer; mod error_context; + pub(crate) use error_context::ErrorContextLayer; mod complete; + pub(crate) use complete::CompleteLayer; + +mod simulation; + +pub use simulation::MadsimLayer; diff --git a/core/src/layers/simulation/mock.rs b/core/src/layers/simulation/mock.rs new file mode 100644 index 00000000000..67253bad41e --- /dev/null +++ b/core/src/layers/simulation/mock.rs @@ -0,0 +1,242 @@ +use std::collections::HashMap; +use crate::ops::{OpList, OpRead, OpScan, OpWrite}; +use crate::raw::oio::Entry; +use crate::raw::{oio, Accessor, Layer, LayeredAccessor, RpList, RpRead, RpScan, RpWrite}; +use async_trait::async_trait; +use bytes::Bytes; +use std::fmt::Debug; +use std::io::SeekFrom; +use std::task::{Context, Poll}; +use madsim::net::Endpoint; +use std::io::Result; +use std::net::SocketAddr; +use std::sync::Arc; + +const SIM_SERVER_ADDR: &str = "10.0.0.1:2379"; + +#[derive(Debug, Copy, Clone, Default)] +pub struct MadsimLayer; + +impl Layer for MadsimLayer { + type LayeredAccessor = MadsimAccessor; + + fn layer(&self, inner: A) -> Self::LayeredAccessor { + let addr = SIM_SERVER_ADDR.parse().unwrap(); + + let runtime = madsim::runtime::Runtime::new(); + std::thread::spawn(move || { + runtime.block_on(async { + SimServer::serve(addr).await.unwrap(); + Endpoint::connect(addr).await.unwrap(); + }) + }); + MadsimAccessor { inner } + } +} + +#[derive(Debug)] +pub struct MadsimAccessor { + inner: A, +} + +#[async_trait] +impl LayeredAccessor for MadsimAccessor { + type Inner = A; + type Reader = MadsimReader; + type BlockingReader = MadsimReader; + type Writer = MadsimWriter; + type BlockingWriter = MadsimWriter; + type Pager = MadsimPager; + type BlockingPager = MadsimPager; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + todo!() + } + + async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + todo!() + } + + async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Pager)> { + todo!() + } + + async fn scan(&self, path: &str, args: OpScan) -> crate::Result<(RpScan, Self::Pager)> { + todo!() + } + + fn blocking_read( + &self, + path: &str, + args: OpRead, + ) -> crate::Result<(RpRead, Self::BlockingReader)> { + todo!() + } + + fn blocking_write( + &self, + path: &str, + args: OpWrite, + ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { + todo!() + } + + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> crate::Result<(RpList, Self::BlockingPager)> { + todo!() + } + + fn blocking_scan( + &self, + path: &str, + args: OpScan, + ) -> crate::Result<(RpScan, Self::BlockingPager)> { + todo!() + } +} + +pub struct MadsimReader { + inner: R, +} + +impl oio::Read for MadsimReader { + fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + todo!() + } + + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { + todo!() + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + todo!() + } +} + +impl oio::BlockingRead for MadsimReader { + fn read(&mut self, buf: &mut [u8]) -> crate::Result { + todo!() + } + + fn seek(&mut self, pos: SeekFrom) -> crate::Result { + todo!() + } + + fn next(&mut self) -> Option> { + todo!() + } +} + +pub struct MadsimWriter { + inner: W, +} + +impl oio::BlockingWrite for MadsimWriter { + fn write(&mut self, bs: Bytes) -> crate::Result<()> { + todo!() + } + + fn append(&mut self, bs: Bytes) -> crate::Result<()> { + todo!() + } + + fn close(&mut self) -> crate::Result<()> { + todo!() + } +} + +#[async_trait] +impl oio::Write for MadsimWriter { + async fn write(&mut self, bs: Bytes) -> crate::Result<()> { + todo!() + } + + async fn append(&mut self, bs: Bytes) -> crate::Result<()> { + todo!() + } + + async fn close(&mut self) -> crate::Result<()> { + todo!() + } +} + +pub struct MadsimPager

{ + inner: P, +} + +#[async_trait] +impl oio::Page for MadsimPager

{ + async fn next(&mut self) -> crate::Result>> { + todo!() + } +} + +impl oio::BlockingPage for MadsimPager

{ + fn next(&mut self) -> crate::Result>> { + todo!() + } +} + + +/// A simulated server. +#[derive(Default, Clone)] +pub struct SimServer; + +impl SimServer { + pub async fn serve(addr: SocketAddr) -> Result<()> { + let ep = Endpoint::bind(addr).await?; + let service = Arc::new(SimService::default()); + loop { + let (tx, mut rx, _) = ep.accept1().await?; + let service = service.clone(); + madsim::task::spawn(async move { + let request = *rx.recv().await?.downcast::().unwrap(); + let response = match request { + Request::Read(path, args) => Box::new(SimServerResponse::Read(service.read(&path, args).await)), + Request::Write(path, args) => Box::new(SimServerResponse::Write(service.write(&path, args).await)), + }; + tx.send(response).await?; + Ok(()) as Result<()> + }); + } + } +} + +enum Request { + Read(String, OpRead), + Write(String, OpWrite), +} + + +#[derive(Default)] +pub struct SimService { + data: HashMap>, +} + +impl SimService { + async fn read(&self, path: &str, args: OpRead) -> ReadResponse { + todo!() + } + + async fn write(&self, path: &str, args: OpWrite) -> WriteResponse { + todo!() + } +} + +struct ReadResponse { + data: Option>, +} + +struct WriteResponse {} + +enum SimServerResponse { + Read(ReadResponse), + Write(WriteResponse), +} \ No newline at end of file diff --git a/core/src/layers/simulation/mod.rs b/core/src/layers/simulation/mod.rs new file mode 100644 index 00000000000..b5bb0482ee2 --- /dev/null +++ b/core/src/layers/simulation/mod.rs @@ -0,0 +1,47 @@ +#[cfg(madsim)] +mod mock; +#[cfg(not(madsim))] +mod real; + +#[cfg(madsim)] +pub use mock::MadsimLayer; +#[cfg(not(madsim))] +pub use real::MadsimLayer; + +#[cfg(test)] +mod test { + use super::*; + use crate::{services, EntryMode, Operator}; + + #[tokio::test] + async fn test_madsim_layer() { + let mut builder = services::Fs::default(); + builder.root("."); + + // Init an operator + let op = Operator::new(builder) + .unwrap() + // Init with logging layer enabled. + .layer(MadsimLayer::default()) + .finish(); + + let path = "hello.txt"; + let data = "Hello, World!"; + + // Write data + op.write("hello.txt", "Hello, World!").await.unwrap(); + + // Read data + let bs = op.read(path).await.unwrap(); + assert_eq!(bs, data.as_bytes()); + + // Fetch metadata + let meta = op.stat("hello.txt").await.unwrap(); + let mode = meta.mode(); + let length = meta.content_length(); + assert_eq!(mode, EntryMode::FILE); + assert_eq!(length, data.len() as u64); + // Delete + op.delete("hello.txt").await.unwrap(); + } +} diff --git a/core/src/layers/simulation/real.rs b/core/src/layers/simulation/real.rs new file mode 100644 index 00000000000..14d14b93845 --- /dev/null +++ b/core/src/layers/simulation/real.rs @@ -0,0 +1,82 @@ +use crate::ops::{OpList, OpRead, OpScan, OpWrite}; +use crate::raw::{Accessor, Layer, LayeredAccessor, RpList, RpRead, RpScan, RpWrite}; +use async_trait::async_trait; + +#[derive(Debug, Copy, Clone, Default)] +pub struct MadsimLayer; + +impl Layer for MadsimLayer { + type LayeredAccessor = MadsimAccessor; + + fn layer(&self, inner: A) -> Self::LayeredAccessor { + MadsimAccessor { inner } + } +} + +#[derive(Debug)] +pub struct MadsimAccessor { + inner: A, +} + +#[async_trait] +impl LayeredAccessor for MadsimAccessor { + type Inner = A; + type Reader = A::Reader; + type BlockingReader = A::BlockingReader; + type Writer = A::Writer; + type BlockingWriter = A::BlockingWriter; + type Pager = A::Pager; + type BlockingPager = A::BlockingPager; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + self.inner.read(path, args).await + } + + async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + self.inner.write(path, args).await + } + + async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Pager)> { + self.inner.list(path, args).await + } + + async fn scan(&self, path: &str, args: OpScan) -> crate::Result<(RpScan, Self::Pager)> { + self.inner.scan(path, args).await + } + + fn blocking_read( + &self, + path: &str, + args: OpRead, + ) -> crate::Result<(RpRead, Self::BlockingReader)> { + self.inner.blocking_read(path, args) + } + + fn blocking_write( + &self, + path: &str, + args: OpWrite, + ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { + self.inner.blocking_write(path, args) + } + + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> crate::Result<(RpList, Self::BlockingPager)> { + self.inner.blocking_list(path, args) + } + + fn blocking_scan( + &self, + path: &str, + args: OpScan, + ) -> crate::Result<(RpScan, Self::BlockingPager)> { + self.inner.blocking_scan(path, args) + } +} From ae80fa0cdc3a800254fa8710d17327ae87cab69a Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Thu, 13 Apr 2023 23:32:33 +0800 Subject: [PATCH 03/18] archive --- core/src/layers/simulation/mock.rs | 37 +++++++++++++++++++----------- core/src/layers/simulation/mod.rs | 27 +++++++++++----------- 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/core/src/layers/simulation/mock.rs b/core/src/layers/simulation/mock.rs index 67253bad41e..134c9ed7023 100644 --- a/core/src/layers/simulation/mock.rs +++ b/core/src/layers/simulation/mock.rs @@ -4,7 +4,7 @@ use crate::raw::oio::Entry; use crate::raw::{oio, Accessor, Layer, LayeredAccessor, RpList, RpRead, RpScan, RpWrite}; use async_trait::async_trait; use bytes::Bytes; -use std::fmt::Debug; +use std::fmt::{Debug, Formatter}; use std::io::SeekFrom; use std::task::{Context, Poll}; use madsim::net::Endpoint; @@ -12,7 +12,8 @@ use std::io::Result; use std::net::SocketAddr; use std::sync::Arc; -const SIM_SERVER_ADDR: &str = "10.0.0.1:2379"; +const SIM_SERVER_ADDR: &str = "10.0.0.1:1"; +const SIM_CLIENT_ADDR: &str = "10.0.0.2:1"; #[derive(Debug, Copy, Clone, Default)] pub struct MadsimLayer; @@ -21,22 +22,27 @@ impl Layer for MadsimLayer { type LayeredAccessor = MadsimAccessor; fn layer(&self, inner: A) -> Self::LayeredAccessor { - let addr = SIM_SERVER_ADDR.parse().unwrap(); - let runtime = madsim::runtime::Runtime::new(); - std::thread::spawn(move || { - runtime.block_on(async { - SimServer::serve(addr).await.unwrap(); - Endpoint::connect(addr).await.unwrap(); - }) - }); - MadsimAccessor { inner } + let sim_server_adder = "10.0.0.1:1".parse::().unwrap(); + let sim_client_addr = "10.0.0.2:1".parse::().unwrap(); + let ep = runtime.block_on(async { + madsim::task::spawn(SimServer::serve(sim_server_adder)); + Endpoint::bind(sim_client_addr).await + } + ).unwrap(); + MadsimAccessor { inner, ep } } } -#[derive(Debug)] pub struct MadsimAccessor { inner: A, + ep: Endpoint, +} + +impl Debug for MadsimAccessor { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } } #[async_trait] @@ -58,7 +64,12 @@ impl LayeredAccessor for MadsimAccessor { } async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { - todo!() + let req = Request::Write(path.to_string(), args); + let sim_server_addr = SIM_SERVER_ADDR.parse::().unwrap(); + let (tx, mut rx) = self.ep.connect1(sim_server_addr).await.unwrap(); + tx.send(Box::new(req)).await.unwrap(); + let resp = *rx.recv().await.unwrap().downcast().unwrap(); + Ok(resp) } async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Pager)> { diff --git a/core/src/layers/simulation/mod.rs b/core/src/layers/simulation/mod.rs index b5bb0482ee2..fe2402ffd19 100644 --- a/core/src/layers/simulation/mod.rs +++ b/core/src/layers/simulation/mod.rs @@ -13,7 +13,7 @@ mod test { use super::*; use crate::{services, EntryMode, Operator}; - #[tokio::test] + #[madsim::test] async fn test_madsim_layer() { let mut builder = services::Fs::default(); builder.root("."); @@ -30,18 +30,17 @@ mod test { // Write data op.write("hello.txt", "Hello, World!").await.unwrap(); - - // Read data - let bs = op.read(path).await.unwrap(); - assert_eq!(bs, data.as_bytes()); - - // Fetch metadata - let meta = op.stat("hello.txt").await.unwrap(); - let mode = meta.mode(); - let length = meta.content_length(); - assert_eq!(mode, EntryMode::FILE); - assert_eq!(length, data.len() as u64); - // Delete - op.delete("hello.txt").await.unwrap(); } + // // Read data + // let bs = op.read(path).await.unwrap(); + // assert_eq!(bs, data.as_bytes()); + // + // // Fetch metadata + // let meta = op.stat("hello.txt").await.unwrap(); + // let mode = meta.mode(); + // let length = meta.content_length(); + // assert_eq!(mode, EntryMode::FILE); + // assert_eq!(length, data.len() as u64); + // // Delete + // op.delete("hello.txt").await.unwrap(); } From b87cb065c9d1d27d32d96adc14d6c0ef2e8c5d58 Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Sat, 15 Apr 2023 20:43:12 +0800 Subject: [PATCH 04/18] skeleton --- core/src/layers/mod.rs | 2 + core/src/layers/simulation/mock.rs | 187 +++++++++++++++++------------ core/src/layers/simulation/mod.rs | 53 ++++---- 3 files changed, 141 insertions(+), 101 deletions(-) diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 76f10428a94..3110b889461 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -66,3 +66,5 @@ pub(crate) use complete::CompleteLayer; mod simulation; pub use simulation::MadsimLayer; +#[cfg(madsim)] +pub use simulation::SimServer; diff --git a/core/src/layers/simulation/mock.rs b/core/src/layers/simulation/mock.rs index 134c9ed7023..8b379b104f3 100644 --- a/core/src/layers/simulation/mock.rs +++ b/core/src/layers/simulation/mock.rs @@ -1,75 +1,83 @@ -use std::collections::HashMap; use crate::ops::{OpList, OpRead, OpScan, OpWrite}; use crate::raw::oio::Entry; use crate::raw::{oio, Accessor, Layer, LayeredAccessor, RpList, RpRead, RpScan, RpWrite}; +use crate::EntryMode; use async_trait::async_trait; use bytes::Bytes; -use std::fmt::{Debug, Formatter}; -use std::io::SeekFrom; -use std::task::{Context, Poll}; use madsim::net::Endpoint; +use madsim::net::Payload; +use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; use std::io::Result; +use std::io::SeekFrom; use std::net::SocketAddr; use std::sync::Arc; +use std::sync::Mutex; +use std::task::{Context, Poll}; -const SIM_SERVER_ADDR: &str = "10.0.0.1:1"; -const SIM_CLIENT_ADDR: &str = "10.0.0.2:1"; +#[derive(Debug, Copy, Clone)] +pub struct MadsimLayer { + sim_server_socket: SocketAddr, +} -#[derive(Debug, Copy, Clone, Default)] -pub struct MadsimLayer; +impl MadsimLayer { + pub fn new(sim_server_socket: SocketAddr) -> Self { + Self { sim_server_socket } + } +} impl Layer for MadsimLayer { - type LayeredAccessor = MadsimAccessor; - - fn layer(&self, inner: A) -> Self::LayeredAccessor { - let runtime = madsim::runtime::Runtime::new(); - let sim_server_adder = "10.0.0.1:1".parse::().unwrap(); - let sim_client_addr = "10.0.0.2:1".parse::().unwrap(); - let ep = runtime.block_on(async { - madsim::task::spawn(SimServer::serve(sim_server_adder)); - Endpoint::bind(sim_client_addr).await + type LayeredAccessor = MadsimAccessor; + + fn layer(&self, _inner: A) -> Self::LayeredAccessor { + MadsimAccessor { + sim_server_socket: self.sim_server_socket, } - ).unwrap(); - MadsimAccessor { inner, ep } } } -pub struct MadsimAccessor { - inner: A, - ep: Endpoint, -} - -impl Debug for MadsimAccessor { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - todo!() - } +#[derive(Debug)] +pub struct MadsimAccessor { + sim_server_socket: SocketAddr, } #[async_trait] -impl LayeredAccessor for MadsimAccessor { - type Inner = A; - type Reader = MadsimReader; - type BlockingReader = MadsimReader; - type Writer = MadsimWriter; - type BlockingWriter = MadsimWriter; - type Pager = MadsimPager; - type BlockingPager = MadsimPager; +impl LayeredAccessor for MadsimAccessor { + type Inner = (); + type Reader = MadsimReader; + type BlockingReader = MadsimReader; + type Writer = MadsimWriter; + type BlockingWriter = MadsimWriter; + type Pager = MadsimPager; + type BlockingPager = MadsimPager; fn inner(&self) -> &Self::Inner { - &self.inner + &() } async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { - todo!() + let req = Request::Read(path.to_string(), args); + let ep = Endpoint::connect(self.sim_server_socket).await.expect("fail to connect to sim server"); + let (tx, mut rx) = ep.connect1(self.sim_server_socket).await.expect("fail to connect1 to sim server"); + tx.send(Box::new(req)).await.expect("fail to send request to sim server"); + let resp = rx.recv().await.expect("fail to recv response from sim server"); + let resp = resp.downcast::().expect("fail to downcast response to ReadResponse"); + let content_length = resp.data.as_ref().map(|b| b.len()).unwrap_or(0); + Ok(( + RpRead::new(content_length as u64), + MadsimReader { data: resp.data }, + )) } async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { - let req = Request::Write(path.to_string(), args); - let sim_server_addr = SIM_SERVER_ADDR.parse::().unwrap(); - let (tx, mut rx) = self.ep.connect1(sim_server_addr).await.unwrap(); - tx.send(Box::new(req)).await.unwrap(); - let resp = *rx.recv().await.unwrap().downcast().unwrap(); - Ok(resp) + Ok(( + RpWrite::default(), + MadsimWriter { + path: path.to_string(), + args, + sim_server_socket: self.sim_server_socket, + }, + )) } async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Pager)> { @@ -113,13 +121,19 @@ impl LayeredAccessor for MadsimAccessor { } } -pub struct MadsimReader { - inner: R, +pub struct MadsimReader { + data: Option, } -impl oio::Read for MadsimReader { - fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - todo!() +impl oio::Read for MadsimReader { + fn poll_read(&mut self, _cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + if let Some(ref data) = self.data { + let len = data.len(); + buf[..len].copy_from_slice(&data); + Poll::Ready(Ok(len)) + } else { + Poll::Ready(Ok(0)) + } } fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { @@ -131,7 +145,7 @@ impl oio::Read for MadsimReader { } } -impl oio::BlockingRead for MadsimReader { +impl oio::BlockingRead for MadsimReader { fn read(&mut self, buf: &mut [u8]) -> crate::Result { todo!() } @@ -145,11 +159,13 @@ impl oio::BlockingRead for MadsimReader { } } -pub struct MadsimWriter { - inner: W, +pub struct MadsimWriter { + path: String, + args: OpWrite, + sim_server_socket: SocketAddr, } -impl oio::BlockingWrite for MadsimWriter { +impl oio::BlockingWrite for MadsimWriter { fn write(&mut self, bs: Bytes) -> crate::Result<()> { todo!() } @@ -164,9 +180,14 @@ impl oio::BlockingWrite for MadsimWriter { } #[async_trait] -impl oio::Write for MadsimWriter { +impl oio::Write for MadsimWriter { async fn write(&mut self, bs: Bytes) -> crate::Result<()> { - todo!() + let req = Request::Write(self.path.to_string(), bs); + let ep = Endpoint::connect(self.sim_server_socket).await.expect("fail to connect to sim server"); + let (tx, mut rx) = ep.connect1(self.sim_server_socket).await.expect("fail to connect1 to sim server"); + tx.send(Box::new(req)).await.expect("fail to send request to sim server"); + rx.recv().await.expect("fail to recv response from sim server"); + Ok(()) } async fn append(&mut self, bs: Bytes) -> crate::Result<()> { @@ -174,28 +195,25 @@ impl oio::Write for MadsimWriter { } async fn close(&mut self) -> crate::Result<()> { - todo!() + Ok(()) } } -pub struct MadsimPager

{ - inner: P, -} +pub struct MadsimPager {} #[async_trait] -impl oio::Page for MadsimPager

{ +impl oio::Page for MadsimPager { async fn next(&mut self) -> crate::Result>> { todo!() } } -impl oio::BlockingPage for MadsimPager

{ +impl oio::BlockingPage for MadsimPager { fn next(&mut self) -> crate::Result>> { todo!() } } - /// A simulated server. #[derive(Default, Clone)] pub struct SimServer; @@ -208,10 +226,14 @@ impl SimServer { let (tx, mut rx, _) = ep.accept1().await?; let service = service.clone(); madsim::task::spawn(async move { - let request = *rx.recv().await?.downcast::().unwrap(); + let request = *rx.recv().await?.downcast::().expect("invalid request"); let response = match request { - Request::Read(path, args) => Box::new(SimServerResponse::Read(service.read(&path, args).await)), - Request::Write(path, args) => Box::new(SimServerResponse::Write(service.write(&path, args).await)), + Request::Read(path, args) => { + Box::new(service.read(&path, args).await) as Payload + } + Request::Write(path, args) => { + Box::new(service.write(&path, args).await) as Payload + } }; tx.send(response).await?; Ok(()) as Result<()> @@ -220,34 +242,45 @@ impl SimServer { } } +pub struct SimClient { + ep: Endpoint, +} + +impl SimClient { + pub async fn connect(addr: SocketAddr) -> Result { + let ep = Endpoint::bind(addr).await?; + Ok(Self { ep }) + } +} + enum Request { Read(String, OpRead), - Write(String, OpWrite), + Write(String, Bytes), } - #[derive(Default)] pub struct SimService { - data: HashMap>, + inner: Mutex>, } impl SimService { async fn read(&self, path: &str, args: OpRead) -> ReadResponse { - todo!() + let mut inner = self.inner.lock().unwrap(); + let data = inner.get(path); + ReadResponse { + data: data.cloned(), + } } - async fn write(&self, path: &str, args: OpWrite) -> WriteResponse { - todo!() + async fn write(&self, path: &str, data: Bytes) -> WriteResponse { + let mut inner = self.inner.lock().unwrap(); + inner.insert(path.to_string(), data); + WriteResponse {} } } struct ReadResponse { - data: Option>, + data: Option, } struct WriteResponse {} - -enum SimServerResponse { - Read(ReadResponse), - Write(WriteResponse), -} \ No newline at end of file diff --git a/core/src/layers/simulation/mod.rs b/core/src/layers/simulation/mod.rs index fe2402ffd19..70666e1188a 100644 --- a/core/src/layers/simulation/mod.rs +++ b/core/src/layers/simulation/mod.rs @@ -5,42 +5,47 @@ mod real; #[cfg(madsim)] pub use mock::MadsimLayer; +#[cfg(madsim)] +pub use mock::SimServer; #[cfg(not(madsim))] pub use real::MadsimLayer; +#[cfg(madsim)] #[cfg(test)] mod test { use super::*; use crate::{services, EntryMode, Operator}; + use madsim::{net::NetSim, runtime::Handle, time::sleep}; + use std::time::Duration; + // RUSTFLAGS="--cfg madsim" cargo test layers::simulation::test::test_madsim_layer #[madsim::test] async fn test_madsim_layer() { - let mut builder = services::Fs::default(); - builder.root("."); + let handle = Handle::current(); + let ip1 = "10.0.0.1".parse().unwrap(); + let ip2 = "10.0.0.2".parse().unwrap(); + let sim_server_socket = "10.0.0.1:2379".parse().unwrap(); + let server = handle.create_node().name("server").ip(ip1).build(); + let client = handle.create_node().name("client").ip(ip2).build(); - // Init an operator - let op = Operator::new(builder) - .unwrap() - // Init with logging layer enabled. - .layer(MadsimLayer::default()) - .finish(); + server.spawn(async move { + SimServer::serve(sim_server_socket).await.unwrap(); + }); + sleep(Duration::from_secs(1)).await; - let path = "hello.txt"; - let data = "Hello, World!"; + let handle = client.spawn(async move { + let mut builder = services::Fs::default(); + builder.root("."); + let op = Operator::new(builder) + .unwrap() + .layer(MadsimLayer::new(sim_server_socket)) + .finish(); - // Write data - op.write("hello.txt", "Hello, World!").await.unwrap(); + let path = "hello.txt"; + let data = "Hello, World!"; + op.write(path, data).await.unwrap(); + assert_eq!(data.as_bytes(), op.read(path).await.unwrap()); + }); + handle.await.unwrap(); } - // // Read data - // let bs = op.read(path).await.unwrap(); - // assert_eq!(bs, data.as_bytes()); - // - // // Fetch metadata - // let meta = op.stat("hello.txt").await.unwrap(); - // let mode = meta.mode(); - // let length = meta.content_length(); - // assert_eq!(mode, EntryMode::FILE); - // assert_eq!(length, data.len() as u64); - // // Delete - // op.delete("hello.txt").await.unwrap(); } From 9c136faa92f351f0352f7befe18ed7521901ba3b Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Sat, 15 Apr 2023 20:50:10 +0800 Subject: [PATCH 05/18] fmt --- core/src/layers/simulation/mock.rs | 60 ++++++++++++++++++------------ 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/core/src/layers/simulation/mock.rs b/core/src/layers/simulation/mock.rs index 8b379b104f3..ed304a36045 100644 --- a/core/src/layers/simulation/mock.rs +++ b/core/src/layers/simulation/mock.rs @@ -57,11 +57,23 @@ impl LayeredAccessor for MadsimAccessor { async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { let req = Request::Read(path.to_string(), args); - let ep = Endpoint::connect(self.sim_server_socket).await.expect("fail to connect to sim server"); - let (tx, mut rx) = ep.connect1(self.sim_server_socket).await.expect("fail to connect1 to sim server"); - tx.send(Box::new(req)).await.expect("fail to send request to sim server"); - let resp = rx.recv().await.expect("fail to recv response from sim server"); - let resp = resp.downcast::().expect("fail to downcast response to ReadResponse"); + let ep = Endpoint::connect(self.sim_server_socket) + .await + .expect("fail to connect to sim server"); + let (tx, mut rx) = ep + .connect1(self.sim_server_socket) + .await + .expect("fail to connect1 to sim server"); + tx.send(Box::new(req)) + .await + .expect("fail to send request to sim server"); + let resp = rx + .recv() + .await + .expect("fail to recv response from sim server"); + let resp = resp + .downcast::() + .expect("fail to downcast response to ReadResponse"); let content_length = resp.data.as_ref().map(|b| b.len()).unwrap_or(0); Ok(( RpRead::new(content_length as u64), @@ -129,7 +141,7 @@ impl oio::Read for MadsimReader { fn poll_read(&mut self, _cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { if let Some(ref data) = self.data { let len = data.len(); - buf[..len].copy_from_slice(&data); + buf[..len].copy_from_slice(data); Poll::Ready(Ok(len)) } else { Poll::Ready(Ok(0)) @@ -183,10 +195,19 @@ impl oio::BlockingWrite for MadsimWriter { impl oio::Write for MadsimWriter { async fn write(&mut self, bs: Bytes) -> crate::Result<()> { let req = Request::Write(self.path.to_string(), bs); - let ep = Endpoint::connect(self.sim_server_socket).await.expect("fail to connect to sim server"); - let (tx, mut rx) = ep.connect1(self.sim_server_socket).await.expect("fail to connect1 to sim server"); - tx.send(Box::new(req)).await.expect("fail to send request to sim server"); - rx.recv().await.expect("fail to recv response from sim server"); + let ep = Endpoint::connect(self.sim_server_socket) + .await + .expect("fail to connect to sim server"); + let (tx, mut rx) = ep + .connect1(self.sim_server_socket) + .await + .expect("fail to connect1 to sim server"); + tx.send(Box::new(req)) + .await + .expect("fail to send request to sim server"); + rx.recv() + .await + .expect("fail to recv response from sim server"); Ok(()) } @@ -226,7 +247,11 @@ impl SimServer { let (tx, mut rx, _) = ep.accept1().await?; let service = service.clone(); madsim::task::spawn(async move { - let request = *rx.recv().await?.downcast::().expect("invalid request"); + let request = *rx + .recv() + .await? + .downcast::() + .expect("invalid request"); let response = match request { Request::Read(path, args) => { Box::new(service.read(&path, args).await) as Payload @@ -242,17 +267,6 @@ impl SimServer { } } -pub struct SimClient { - ep: Endpoint, -} - -impl SimClient { - pub async fn connect(addr: SocketAddr) -> Result { - let ep = Endpoint::bind(addr).await?; - Ok(Self { ep }) - } -} - enum Request { Read(String, OpRead), Write(String, Bytes), @@ -265,7 +279,7 @@ pub struct SimService { impl SimService { async fn read(&self, path: &str, args: OpRead) -> ReadResponse { - let mut inner = self.inner.lock().unwrap(); + let inner = self.inner.lock().unwrap(); let data = inner.get(path); ReadResponse { data: data.cloned(), From 89af65d6de81d096229f14efce0a99aaea7b0922 Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Sat, 15 Apr 2023 21:50:55 +0800 Subject: [PATCH 06/18] panic in blocking func --- core/src/layers/simulation/mock.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/layers/simulation/mock.rs b/core/src/layers/simulation/mock.rs index ed304a36045..388b637b406 100644 --- a/core/src/layers/simulation/mock.rs +++ b/core/src/layers/simulation/mock.rs @@ -105,7 +105,7 @@ impl LayeredAccessor for MadsimAccessor { path: &str, args: OpRead, ) -> crate::Result<(RpRead, Self::BlockingReader)> { - todo!() + panic!("blocking_read is not supported in MadsimLayer"); } fn blocking_write( @@ -113,7 +113,7 @@ impl LayeredAccessor for MadsimAccessor { path: &str, args: OpWrite, ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { - todo!() + panic!("blocking_write is not supported in MadsimLayer"); } fn blocking_list( @@ -121,7 +121,7 @@ impl LayeredAccessor for MadsimAccessor { path: &str, args: OpList, ) -> crate::Result<(RpList, Self::BlockingPager)> { - todo!() + panic!("blocking_list is not supported in MadsimLayer"); } fn blocking_scan( @@ -129,7 +129,7 @@ impl LayeredAccessor for MadsimAccessor { path: &str, args: OpScan, ) -> crate::Result<(RpScan, Self::BlockingPager)> { - todo!() + panic!("blocking_scan is not supported in MadsimLayer"); } } From f0d7f02eb2f2f4576d7c8daffd6e7cd10864d99e Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Sat, 15 Apr 2023 21:56:57 +0800 Subject: [PATCH 07/18] add license header --- core/src/layers/simulation/mock.rs | 17 +++++++++++++++++ core/src/layers/simulation/mod.rs | 17 +++++++++++++++++ core/src/layers/simulation/real.rs | 18 ++++++++++++++++++ 3 files changed, 52 insertions(+) diff --git a/core/src/layers/simulation/mock.rs b/core/src/layers/simulation/mock.rs index 388b637b406..3dfaed7f0de 100644 --- a/core/src/layers/simulation/mock.rs +++ b/core/src/layers/simulation/mock.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use crate::ops::{OpList, OpRead, OpScan, OpWrite}; use crate::raw::oio::Entry; use crate::raw::{oio, Accessor, Layer, LayeredAccessor, RpList, RpRead, RpScan, RpWrite}; diff --git a/core/src/layers/simulation/mod.rs b/core/src/layers/simulation/mod.rs index 70666e1188a..8c1b1d5938e 100644 --- a/core/src/layers/simulation/mod.rs +++ b/core/src/layers/simulation/mod.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + #[cfg(madsim)] mod mock; #[cfg(not(madsim))] diff --git a/core/src/layers/simulation/real.rs b/core/src/layers/simulation/real.rs index 14d14b93845..05c96c02ec3 100644 --- a/core/src/layers/simulation/real.rs +++ b/core/src/layers/simulation/real.rs @@ -1,7 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use crate::ops::{OpList, OpRead, OpScan, OpWrite}; use crate::raw::{Accessor, Layer, LayeredAccessor, RpList, RpRead, RpScan, RpWrite}; use async_trait::async_trait; + #[derive(Debug, Copy, Clone, Default)] pub struct MadsimLayer; From 3263fe58c7ba8de3c863ccf77189224676064884 Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Sat, 15 Apr 2023 21:59:35 +0800 Subject: [PATCH 08/18] fmt --- core/src/layers/simulation/real.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/layers/simulation/real.rs b/core/src/layers/simulation/real.rs index 05c96c02ec3..2e987cf3aa0 100644 --- a/core/src/layers/simulation/real.rs +++ b/core/src/layers/simulation/real.rs @@ -19,7 +19,6 @@ use crate::ops::{OpList, OpRead, OpScan, OpWrite}; use crate::raw::{Accessor, Layer, LayeredAccessor, RpList, RpRead, RpScan, RpWrite}; use async_trait::async_trait; - #[derive(Debug, Copy, Clone, Default)] pub struct MadsimLayer; From fb60208eb79c2a5ccd5e997d5807cf3d0684402a Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Sat, 15 Apr 2023 22:36:05 +0800 Subject: [PATCH 09/18] add doc --- core/src/layers/simulation/real.rs | 52 ++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/core/src/layers/simulation/real.rs b/core/src/layers/simulation/real.rs index 2e987cf3aa0..48a614b5f5f 100644 --- a/core/src/layers/simulation/real.rs +++ b/core/src/layers/simulation/real.rs @@ -19,6 +19,58 @@ use crate::ops::{OpList, OpRead, OpScan, OpWrite}; use crate::raw::{Accessor, Layer, LayeredAccessor, RpList, RpRead, RpScan, RpWrite}; use async_trait::async_trait; +/// Add deterministic simulation for async operations, powered by [`madsim`](https://docs.rs/madsim/latest/madsim/). +/// +/// # Note +/// +/// - blocking operations are not supported, as [`madsim`](https://docs.rs/madsim/latest/madsim/) is async only. +/// +/// +/// # Examples +/// +/// ``` +/// use opendal::Operator; +/// use opendal::services; +/// use opendal::layers::MadsimLayer; +/// use opendal::layers::SimServer; +/// use madsim::{net::NetSim, runtime::Handle, time::sleep}; +/// use std::time::Duration; +/// +/// #[cfg(madsim)] +/// #[madsim::test] +/// async fn deterministic_simulation_test(){ +/// let handle = Handle::current(); +/// let ip1 = "10.0.0.1".parse().unwrap(); +/// let ip2 = "10.0.0.2".parse().unwrap(); +/// let sim_server_socket = "10.0.0.1:2379".parse().unwrap(); +/// let server = handle.create_node().name("server").ip(ip1).build(); +/// let client = handle.create_node().name("client").ip(ip2).build(); +/// +/// server.spawn(async move { +/// SimServer::serve(sim_server_socket).await.unwrap(); +/// }); +/// sleep(Duration::from_secs(1)).await; +/// +/// let handle = client.spawn(async move { +/// let mut builder = services::Fs::default(); +/// builder.root("."); +/// let op = Operator::new(builder) +/// .unwrap() +/// .layer(MadsimLayer::new(sim_server_socket)) +/// .finish(); +/// +/// let path = "hello.txt"; +/// let data = "Hello, World!"; +/// op.write(path, data).await.unwrap(); +/// assert_eq!(data.as_bytes(), op.read(path).await.unwrap()); +/// }); +/// handle.await.unwrap(); +/// } +/// ``` +/// To enable logging output, please set `RUSTFLAGS="--cfg madsim"`: +/// ```shell +/// RUSTFLAGS="--cfg madsim" cargo test +/// ``` #[derive(Debug, Copy, Clone, Default)] pub struct MadsimLayer; From c131139c106ddfd244ea257637919ac622fcc6cc Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Sat, 15 Apr 2023 22:39:30 +0800 Subject: [PATCH 10/18] delete unrelated changes --- core/src/layers/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 3110b889461..753a2fd8eea 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -18,7 +18,6 @@ //! `Layer` is the mechanism to intercept operations. mod concurrent_limit; - pub use concurrent_limit::ConcurrentLimitLayer; mod immutable_index; From 71f6c9ac223afa7b1af198288d94de799aa4b589 Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Sat, 15 Apr 2023 23:27:37 +0800 Subject: [PATCH 11/18] refactor: madsim as a feature, delete real mod --- core/Cargo.toml | 6 +- .../layers/{simulation/mock.rs => madsim.rs} | 38 +++++ core/src/layers/mod.rs | 12 +- core/src/layers/simulation/mod.rs | 68 -------- core/src/layers/simulation/real.rs | 151 ------------------ 5 files changed, 51 insertions(+), 224 deletions(-) rename core/src/layers/{simulation/mock.rs => madsim.rs} (87%) delete mode 100644 core/src/layers/simulation/mod.rs delete mode 100644 core/src/layers/simulation/real.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index 71ed20f8029..a485fce1cc9 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -53,13 +53,15 @@ native-tls = ["reqwest/native-tls", "ureq/native-tls"] native-tls-vendored = ["reqwest/native-tls-vendored", "ureq/native-tls"] # Enable all layers. -layers-all = ["layers-chaos", "layers-metrics", "layers-tracing"] +layers-all = ["layers-chaos", "layers-metrics", "layers-tracing","layers-madsim"] # Enable layers chaos support layers-chaos = ["dep:rand"] # Enable layers metrics support layers-metrics = ["dep:metrics"] # Enable layers tracing support. layers-tracing = ["dep:tracing"] +# Enable layers madsim support +layers-madsim = ["dep:madsim"] # Enable services dashmap support services-dashmap = ["dep:dashmap"] @@ -104,7 +106,7 @@ http = "0.2.5" hyper = "0.14" lazy-regex = { version = "2.5.0", optional = true } log = "0.4" -madsim = "0.2.19" +madsim = {version = "0.2.19", optional = true } md-5 = "0.10" metrics = { version = "0.20", optional = true } moka = { version = "0.10", optional = true, features = ["future"] } diff --git a/core/src/layers/simulation/mock.rs b/core/src/layers/madsim.rs similarity index 87% rename from core/src/layers/simulation/mock.rs rename to core/src/layers/madsim.rs index 3dfaed7f0de..6db20b7aa31 100644 --- a/core/src/layers/simulation/mock.rs +++ b/core/src/layers/madsim.rs @@ -315,3 +315,41 @@ struct ReadResponse { } struct WriteResponse {} + +#[cfg(test)] +mod test { + use super::*; + use crate::{services, EntryMode, Operator}; + use madsim::{net::NetSim, runtime::Handle, time::sleep}; + use std::time::Duration; + + #[madsim::test] + async fn test_madsim_layer() { + let handle = Handle::current(); + let ip1 = "10.0.0.1".parse().unwrap(); + let ip2 = "10.0.0.2".parse().unwrap(); + let sim_server_socket = "10.0.0.1:2379".parse().unwrap(); + let server = handle.create_node().name("server").ip(ip1).build(); + let client = handle.create_node().name("client").ip(ip2).build(); + + server.spawn(async move { + SimServer::serve(sim_server_socket).await.unwrap(); + }); + sleep(Duration::from_secs(1)).await; + + let handle = client.spawn(async move { + let mut builder = services::Fs::default(); + builder.root("."); + let op = Operator::new(builder) + .unwrap() + .layer(MadsimLayer::new(sim_server_socket)) + .finish(); + + let path = "hello.txt"; + let data = "Hello, World!"; + op.write(path, data).await.unwrap(); + assert_eq!(data.as_bytes(), op.read(path).await.unwrap()); + }); + handle.await.unwrap(); + } +} diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 753a2fd8eea..485f02a84b9 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -62,8 +62,14 @@ mod complete; pub(crate) use complete::CompleteLayer; -mod simulation; +#[cfg(feature = "layers-madsim")] +#[cfg(madsim)] +mod madsim; + +#[cfg(feature = "layers-madsim")] +#[cfg(madsim)] +pub use self::madsim::MadsimLayer; -pub use simulation::MadsimLayer; +#[cfg(feature = "layers-madsim")] #[cfg(madsim)] -pub use simulation::SimServer; +pub use self::madsim::SimServer; diff --git a/core/src/layers/simulation/mod.rs b/core/src/layers/simulation/mod.rs deleted file mode 100644 index 8c1b1d5938e..00000000000 --- a/core/src/layers/simulation/mod.rs +++ /dev/null @@ -1,68 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#[cfg(madsim)] -mod mock; -#[cfg(not(madsim))] -mod real; - -#[cfg(madsim)] -pub use mock::MadsimLayer; -#[cfg(madsim)] -pub use mock::SimServer; -#[cfg(not(madsim))] -pub use real::MadsimLayer; - -#[cfg(madsim)] -#[cfg(test)] -mod test { - use super::*; - use crate::{services, EntryMode, Operator}; - use madsim::{net::NetSim, runtime::Handle, time::sleep}; - use std::time::Duration; - - // RUSTFLAGS="--cfg madsim" cargo test layers::simulation::test::test_madsim_layer - #[madsim::test] - async fn test_madsim_layer() { - let handle = Handle::current(); - let ip1 = "10.0.0.1".parse().unwrap(); - let ip2 = "10.0.0.2".parse().unwrap(); - let sim_server_socket = "10.0.0.1:2379".parse().unwrap(); - let server = handle.create_node().name("server").ip(ip1).build(); - let client = handle.create_node().name("client").ip(ip2).build(); - - server.spawn(async move { - SimServer::serve(sim_server_socket).await.unwrap(); - }); - sleep(Duration::from_secs(1)).await; - - let handle = client.spawn(async move { - let mut builder = services::Fs::default(); - builder.root("."); - let op = Operator::new(builder) - .unwrap() - .layer(MadsimLayer::new(sim_server_socket)) - .finish(); - - let path = "hello.txt"; - let data = "Hello, World!"; - op.write(path, data).await.unwrap(); - assert_eq!(data.as_bytes(), op.read(path).await.unwrap()); - }); - handle.await.unwrap(); - } -} diff --git a/core/src/layers/simulation/real.rs b/core/src/layers/simulation/real.rs deleted file mode 100644 index 48a614b5f5f..00000000000 --- a/core/src/layers/simulation/real.rs +++ /dev/null @@ -1,151 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::ops::{OpList, OpRead, OpScan, OpWrite}; -use crate::raw::{Accessor, Layer, LayeredAccessor, RpList, RpRead, RpScan, RpWrite}; -use async_trait::async_trait; - -/// Add deterministic simulation for async operations, powered by [`madsim`](https://docs.rs/madsim/latest/madsim/). -/// -/// # Note -/// -/// - blocking operations are not supported, as [`madsim`](https://docs.rs/madsim/latest/madsim/) is async only. -/// -/// -/// # Examples -/// -/// ``` -/// use opendal::Operator; -/// use opendal::services; -/// use opendal::layers::MadsimLayer; -/// use opendal::layers::SimServer; -/// use madsim::{net::NetSim, runtime::Handle, time::sleep}; -/// use std::time::Duration; -/// -/// #[cfg(madsim)] -/// #[madsim::test] -/// async fn deterministic_simulation_test(){ -/// let handle = Handle::current(); -/// let ip1 = "10.0.0.1".parse().unwrap(); -/// let ip2 = "10.0.0.2".parse().unwrap(); -/// let sim_server_socket = "10.0.0.1:2379".parse().unwrap(); -/// let server = handle.create_node().name("server").ip(ip1).build(); -/// let client = handle.create_node().name("client").ip(ip2).build(); -/// -/// server.spawn(async move { -/// SimServer::serve(sim_server_socket).await.unwrap(); -/// }); -/// sleep(Duration::from_secs(1)).await; -/// -/// let handle = client.spawn(async move { -/// let mut builder = services::Fs::default(); -/// builder.root("."); -/// let op = Operator::new(builder) -/// .unwrap() -/// .layer(MadsimLayer::new(sim_server_socket)) -/// .finish(); -/// -/// let path = "hello.txt"; -/// let data = "Hello, World!"; -/// op.write(path, data).await.unwrap(); -/// assert_eq!(data.as_bytes(), op.read(path).await.unwrap()); -/// }); -/// handle.await.unwrap(); -/// } -/// ``` -/// To enable logging output, please set `RUSTFLAGS="--cfg madsim"`: -/// ```shell -/// RUSTFLAGS="--cfg madsim" cargo test -/// ``` -#[derive(Debug, Copy, Clone, Default)] -pub struct MadsimLayer; - -impl Layer for MadsimLayer { - type LayeredAccessor = MadsimAccessor; - - fn layer(&self, inner: A) -> Self::LayeredAccessor { - MadsimAccessor { inner } - } -} - -#[derive(Debug)] -pub struct MadsimAccessor { - inner: A, -} - -#[async_trait] -impl LayeredAccessor for MadsimAccessor { - type Inner = A; - type Reader = A::Reader; - type BlockingReader = A::BlockingReader; - type Writer = A::Writer; - type BlockingWriter = A::BlockingWriter; - type Pager = A::Pager; - type BlockingPager = A::BlockingPager; - - fn inner(&self) -> &Self::Inner { - &self.inner - } - - async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { - self.inner.read(path, args).await - } - - async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { - self.inner.write(path, args).await - } - - async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Pager)> { - self.inner.list(path, args).await - } - - async fn scan(&self, path: &str, args: OpScan) -> crate::Result<(RpScan, Self::Pager)> { - self.inner.scan(path, args).await - } - - fn blocking_read( - &self, - path: &str, - args: OpRead, - ) -> crate::Result<(RpRead, Self::BlockingReader)> { - self.inner.blocking_read(path, args) - } - - fn blocking_write( - &self, - path: &str, - args: OpWrite, - ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { - self.inner.blocking_write(path, args) - } - - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> crate::Result<(RpList, Self::BlockingPager)> { - self.inner.blocking_list(path, args) - } - - fn blocking_scan( - &self, - path: &str, - args: OpScan, - ) -> crate::Result<(RpScan, Self::BlockingPager)> { - self.inner.blocking_scan(path, args) - } -} From 751f8dcfd7835d86005d9b2a8fef082e56ccd6c0 Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Sat, 15 Apr 2023 23:53:44 +0800 Subject: [PATCH 12/18] fix conflict --- core/src/layers/madsim.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index 6db20b7aa31..a89432f746a 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -18,13 +18,12 @@ use crate::ops::{OpList, OpRead, OpScan, OpWrite}; use crate::raw::oio::Entry; use crate::raw::{oio, Accessor, Layer, LayeredAccessor, RpList, RpRead, RpScan, RpWrite}; -use crate::EntryMode; use async_trait::async_trait; use bytes::Bytes; use madsim::net::Endpoint; use madsim::net::Payload; use std::collections::HashMap; -use std::fmt::{Debug, Formatter}; +use std::fmt::Debug; use std::io::Result; use std::io::SeekFrom; use std::net::SocketAddr; @@ -232,6 +231,10 @@ impl oio::Write for MadsimWriter { todo!() } + async fn abort(&mut self) -> crate::Result<()> { + todo!() + } + async fn close(&mut self) -> crate::Result<()> { Ok(()) } @@ -319,8 +322,8 @@ struct WriteResponse {} #[cfg(test)] mod test { use super::*; - use crate::{services, EntryMode, Operator}; - use madsim::{net::NetSim, runtime::Handle, time::sleep}; + use crate::{services, Operator}; + use madsim::{runtime::Handle, time::sleep}; use std::time::Duration; #[madsim::test] From 7472d8662be6e5c3009d16d8492107ad4115b757 Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Sat, 15 Apr 2023 23:56:08 +0800 Subject: [PATCH 13/18] delete useless modification --- core/src/layers/mod.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 287d5e758f2..6892488fbe2 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -21,22 +21,18 @@ mod concurrent_limit; pub use concurrent_limit::ConcurrentLimitLayer; mod immutable_index; - pub use immutable_index::ImmutableIndexLayer; mod logging; - pub use logging::LoggingLayer; #[cfg(feature = "layers-chaos")] mod chaos; - #[cfg(feature = "layers-chaos")] pub use chaos::ChaosLayer; #[cfg(feature = "layers-metrics")] mod metrics; - #[cfg(feature = "layers-metrics")] pub use self::metrics::MetricsLayer; @@ -46,12 +42,10 @@ mod prometheus; pub use self::prometheus::PrometheusLayer; mod retry; - pub use self::retry::RetryLayer; #[cfg(feature = "layers-tracing")] mod tracing; - #[cfg(feature = "layers-tracing")] pub use self::tracing::TracingLayer; @@ -61,15 +55,12 @@ mod minitrace; pub use self::minitrace::MinitraceLayer; mod type_eraser; - pub(crate) use type_eraser::TypeEraseLayer; mod error_context; - pub(crate) use error_context::ErrorContextLayer; mod complete; - pub(crate) use complete::CompleteLayer; #[cfg(feature = "layers-madsim")] From 072a5738999e3325fe9824cc039d4b634fa14fcc Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Tue, 18 Apr 2023 15:11:33 +0800 Subject: [PATCH 14/18] update madsim version --- core/Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 0526bdf658f..90ca60e8e43 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -82,12 +82,12 @@ layers-chaos = ["dep:rand"] layers-metrics = ["dep:metrics"] # Enable layers prometheus support layers-prometheus = ["dep:prometheus"] +# Enable layers madsim support +layers-madsim = ["dep:madsim"] # Enable layers minitrace support. layers-minitrace = ["dep:minitrace"] # Enable layers tracing support. layers-tracing = ["dep:tracing"] -# Enable layers madsim support -layers-madsim = ["dep:madsim"] services-azblob = [ "dep:reqsign", @@ -161,7 +161,7 @@ http = "0.2.5" hyper = "0.14" lazy-regex = { version = "2.5.0", optional = true } log = "0.4" -madsim = {version = "0.2.19", optional = true } +madsim = {version = "0.2.21", optional = true } md-5 = "0.10" metrics = { version = "0.20", optional = true } minitrace = { version = "0.4.0", optional = true } From efbaba98d68b1dae3a8fb5a01dd6a3fc63e4fdcd Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Tue, 18 Apr 2023 15:15:03 +0800 Subject: [PATCH 15/18] mark blocking func as not supported --- core/src/layers/madsim.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index a89432f746a..720c9fac940 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -175,15 +175,15 @@ impl oio::Read for MadsimReader { impl oio::BlockingRead for MadsimReader { fn read(&mut self, buf: &mut [u8]) -> crate::Result { - todo!() + panic!("blocking_read is not supported in MadsimLayer"); } fn seek(&mut self, pos: SeekFrom) -> crate::Result { - todo!() + panic!("blocking_read is not supported in MadsimLayer"); } fn next(&mut self) -> Option> { - todo!() + panic!("blocking_read is not supported in MadsimLayer"); } } @@ -195,15 +195,15 @@ pub struct MadsimWriter { impl oio::BlockingWrite for MadsimWriter { fn write(&mut self, bs: Bytes) -> crate::Result<()> { - todo!() + panic!("blocking_write is not supported in MadsimLayer"); } fn append(&mut self, bs: Bytes) -> crate::Result<()> { - todo!() + panic!("blocking_write is not supported in MadsimLayer"); } fn close(&mut self) -> crate::Result<()> { - todo!() + panic!("blocking_write is not supported in MadsimLayer"); } } @@ -251,7 +251,7 @@ impl oio::Page for MadsimPager { impl oio::BlockingPage for MadsimPager { fn next(&mut self) -> crate::Result>> { - todo!() + panic!("blocking_page is not supported in MadsimLayer"); } } From 0ca264d8933b01f8d5e93a2037a85576faee106e Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Tue, 18 Apr 2023 16:08:12 +0800 Subject: [PATCH 16/18] fix details --- core/src/layers/madsim.rs | 186 +++++++++++++++++++++++++------------- 1 file changed, 124 insertions(+), 62 deletions(-) diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index 720c9fac940..dc6c0acfa81 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -17,7 +17,10 @@ use crate::ops::{OpList, OpRead, OpScan, OpWrite}; use crate::raw::oio::Entry; +use crate::raw::AccessorInfo; use crate::raw::{oio, Accessor, Layer, LayeredAccessor, RpList, RpRead, RpScan, RpWrite}; +use crate::types::Error; +use crate::types::ErrorKind; use async_trait::async_trait; use bytes::Bytes; use madsim::net::Endpoint; @@ -31,53 +34,111 @@ use std::sync::Arc; use std::sync::Mutex; use std::task::{Context, Poll}; +/// Add deterministic simulation for async operations, powered by [`madsim`](https://docs.rs/madsim/latest/madsim/). +/// +/// # Note +/// +/// - blocking operations are not supported, as [`madsim`](https://docs.rs/madsim/latest/madsim/) is async only. +/// +/// +/// # Examples +/// +/// ``` +/// use opendal::Operator; +/// use opendal::services; +/// use opendal::layers::MadsimLayer; +/// use opendal::layers::SimServer; +/// use madsim::{net::NetSim, runtime::Handle, time::sleep}; +/// use std::time::Duration; +/// +/// #[cfg(madsim)] +/// #[madsim::test] +/// async fn deterministic_simulation_test(){ +/// let handle = Handle::current(); +/// let ip1 = "10.0.0.1".parse().unwrap(); +/// let ip2 = "10.0.0.2".parse().unwrap(); +/// let sim_server_socket = "10.0.0.1:2379".parse().unwrap(); +/// let server = handle.create_node().name("server").ip(ip1).build(); +/// let client = handle.create_node().name("client").ip(ip2).build(); +/// +/// server.spawn(async move { +/// SimServer::serve(sim_server_socket).await.unwrap(); +/// }); +/// sleep(Duration::from_secs(1)).await; +/// +/// let handle = client.spawn(async move { +/// let mut builder = services::Fs::default(); +/// builder.root("."); +/// let op = Operator::new(builder) +/// .unwrap() +/// .layer(MadsimLayer::new(sim_server_socket)) +/// .finish(); +/// +/// let path = "hello.txt"; +/// let data = "Hello, World!"; +/// op.write(path, data).await.unwrap(); +/// assert_eq!(data.as_bytes(), op.read(path).await.unwrap()); +/// }); +/// handle.await.unwrap(); +/// } +/// ``` +/// To enable logging output, please set `RUSTFLAGS="--cfg madsim"`: +/// ```shell +/// RUSTFLAGS="--cfg madsim" cargo test +/// ``` #[derive(Debug, Copy, Clone)] pub struct MadsimLayer { - sim_server_socket: SocketAddr, + addr: SocketAddr, } impl MadsimLayer { - pub fn new(sim_server_socket: SocketAddr) -> Self { - Self { sim_server_socket } + pub fn new(endpoint: &str) -> Self { + Self { + addr: endpoint.parse().unwrap(), + } } } impl Layer for MadsimLayer { type LayeredAccessor = MadsimAccessor; - fn layer(&self, _inner: A) -> Self::LayeredAccessor { - MadsimAccessor { - sim_server_socket: self.sim_server_socket, - } + fn layer(&self, _: A) -> Self::LayeredAccessor { + MadsimAccessor { addr: self.addr } } } #[derive(Debug)] pub struct MadsimAccessor { - sim_server_socket: SocketAddr, + addr: SocketAddr, } #[async_trait] impl LayeredAccessor for MadsimAccessor { type Inner = (); type Reader = MadsimReader; - type BlockingReader = MadsimReader; + type BlockingReader = (); type Writer = MadsimWriter; - type BlockingWriter = MadsimWriter; + type BlockingWriter = (); type Pager = MadsimPager; - type BlockingPager = MadsimPager; + type BlockingPager = (); fn inner(&self) -> &Self::Inner { &() } + fn metadata(&self) -> AccessorInfo { + let mut info = AccessorInfo::default(); + info.set_name("madsim"); + info + } + async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { let req = Request::Read(path.to_string(), args); - let ep = Endpoint::connect(self.sim_server_socket) + let ep = Endpoint::connect(self.addr) .await .expect("fail to connect to sim server"); let (tx, mut rx) = ep - .connect1(self.sim_server_socket) + .connect1(self.addr) .await .expect("fail to connect1 to sim server"); tx.send(Box::new(req)) @@ -103,17 +164,23 @@ impl LayeredAccessor for MadsimAccessor { MadsimWriter { path: path.to_string(), args, - sim_server_socket: self.sim_server_socket, + sim_server_socket: self.addr, }, )) } async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Pager)> { - todo!() + Err(Error::new( + ErrorKind::Unsupported, + "will be supported in the future", + )) } async fn scan(&self, path: &str, args: OpScan) -> crate::Result<(RpScan, Self::Pager)> { - todo!() + Err(Error::new( + ErrorKind::Unsupported, + "will be supported in the future", + )) } fn blocking_read( @@ -121,7 +188,10 @@ impl LayeredAccessor for MadsimAccessor { path: &str, args: OpRead, ) -> crate::Result<(RpRead, Self::BlockingReader)> { - panic!("blocking_read is not supported in MadsimLayer"); + Err(Error::new( + ErrorKind::Unsupported, + "will not be supported in MadsimLayer", + )) } fn blocking_write( @@ -129,7 +199,10 @@ impl LayeredAccessor for MadsimAccessor { path: &str, args: OpWrite, ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { - panic!("blocking_write is not supported in MadsimLayer"); + Err(Error::new( + ErrorKind::Unsupported, + "will not be supported in MadsimLayer", + )) } fn blocking_list( @@ -137,7 +210,10 @@ impl LayeredAccessor for MadsimAccessor { path: &str, args: OpList, ) -> crate::Result<(RpList, Self::BlockingPager)> { - panic!("blocking_list is not supported in MadsimLayer"); + Err(Error::new( + ErrorKind::Unsupported, + "will not be supported in MadsimLayer", + )) } fn blocking_scan( @@ -145,7 +221,10 @@ impl LayeredAccessor for MadsimAccessor { path: &str, args: OpScan, ) -> crate::Result<(RpScan, Self::BlockingPager)> { - panic!("blocking_scan is not supported in MadsimLayer"); + Err(Error::new( + ErrorKind::Unsupported, + "will not be supported in MadsimLayer", + )) } } @@ -165,25 +244,17 @@ impl oio::Read for MadsimReader { } fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { - todo!() + Poll::Ready(Err(Error::new( + ErrorKind::Unsupported, + "will be supported in the future", + ))) } fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - todo!() - } -} - -impl oio::BlockingRead for MadsimReader { - fn read(&mut self, buf: &mut [u8]) -> crate::Result { - panic!("blocking_read is not supported in MadsimLayer"); - } - - fn seek(&mut self, pos: SeekFrom) -> crate::Result { - panic!("blocking_read is not supported in MadsimLayer"); - } - - fn next(&mut self) -> Option> { - panic!("blocking_read is not supported in MadsimLayer"); + Poll::Ready(Some(Err(Error::new( + ErrorKind::Unsupported, + "will be supported in the future", + )))) } } @@ -193,20 +264,6 @@ pub struct MadsimWriter { sim_server_socket: SocketAddr, } -impl oio::BlockingWrite for MadsimWriter { - fn write(&mut self, bs: Bytes) -> crate::Result<()> { - panic!("blocking_write is not supported in MadsimLayer"); - } - - fn append(&mut self, bs: Bytes) -> crate::Result<()> { - panic!("blocking_write is not supported in MadsimLayer"); - } - - fn close(&mut self) -> crate::Result<()> { - panic!("blocking_write is not supported in MadsimLayer"); - } -} - #[async_trait] impl oio::Write for MadsimWriter { async fn write(&mut self, bs: Bytes) -> crate::Result<()> { @@ -228,11 +285,17 @@ impl oio::Write for MadsimWriter { } async fn append(&mut self, bs: Bytes) -> crate::Result<()> { - todo!() + Err(Error::new( + ErrorKind::Unsupported, + "will be supported in the future", + )) } async fn abort(&mut self) -> crate::Result<()> { - todo!() + Err(Error::new( + ErrorKind::Unsupported, + "will be supported in the future", + )) } async fn close(&mut self) -> crate::Result<()> { @@ -245,17 +308,14 @@ pub struct MadsimPager {} #[async_trait] impl oio::Page for MadsimPager { async fn next(&mut self) -> crate::Result>> { - todo!() - } -} - -impl oio::BlockingPage for MadsimPager { - fn next(&mut self) -> crate::Result>> { - panic!("blocking_page is not supported in MadsimLayer"); + Err(Error::new( + ErrorKind::Unsupported, + "will be supported in the future", + )) } } -/// A simulated server. +/// A simulated server.This an experimental feature, docs are not ready yet. #[derive(Default, Clone)] pub struct SimServer; @@ -331,12 +391,14 @@ mod test { let handle = Handle::current(); let ip1 = "10.0.0.1".parse().unwrap(); let ip2 = "10.0.0.2".parse().unwrap(); - let sim_server_socket = "10.0.0.1:2379".parse().unwrap(); + let sim_server_socket = "10.0.0.1:2379"; let server = handle.create_node().name("server").ip(ip1).build(); let client = handle.create_node().name("client").ip(ip2).build(); server.spawn(async move { - SimServer::serve(sim_server_socket).await.unwrap(); + SimServer::serve(sim_server_socket.parse().unwrap()) + .await + .unwrap(); }); sleep(Duration::from_secs(1)).await; From 01a3ca8baa9a4de5b7c2f3b17efbfa76fb5c773c Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Tue, 18 Apr 2023 16:10:17 +0800 Subject: [PATCH 17/18] rename struct --- core/src/layers/madsim.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index dc6c0acfa81..7f5630d7da5 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -47,7 +47,7 @@ use std::task::{Context, Poll}; /// use opendal::Operator; /// use opendal::services; /// use opendal::layers::MadsimLayer; -/// use opendal::layers::SimServer; +/// use opendal::layers::MadsimServer; /// use madsim::{net::NetSim, runtime::Handle, time::sleep}; /// use std::time::Duration; /// @@ -317,9 +317,9 @@ impl oio::Page for MadsimPager { /// A simulated server.This an experimental feature, docs are not ready yet. #[derive(Default, Clone)] -pub struct SimServer; +pub struct MadsimServer; -impl SimServer { +impl MadsimServer { pub async fn serve(addr: SocketAddr) -> Result<()> { let ep = Endpoint::bind(addr).await?; let service = Arc::new(SimService::default()); From f3e94342b74a892872e7da3a6ad7965fb69d5233 Mon Sep 17 00:00:00 2001 From: Sky <3374614481@qq.com> Date: Tue, 18 Apr 2023 16:38:37 +0800 Subject: [PATCH 18/18] throw error, rename var, set cap --- core/src/layers/madsim.rs | 41 +++++++++++++++++++-------------------- core/src/layers/mod.rs | 2 +- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index 7f5630d7da5..b76ebc94d77 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -17,6 +17,7 @@ use crate::ops::{OpList, OpRead, OpScan, OpWrite}; use crate::raw::oio::Entry; +use crate::raw::AccessorCapability; use crate::raw::AccessorInfo; use crate::raw::{oio, Accessor, Layer, LayeredAccessor, RpList, RpRead, RpScan, RpWrite}; use crate::types::Error; @@ -57,12 +58,12 @@ use std::task::{Context, Poll}; /// let handle = Handle::current(); /// let ip1 = "10.0.0.1".parse().unwrap(); /// let ip2 = "10.0.0.2".parse().unwrap(); -/// let sim_server_socket = "10.0.0.1:2379".parse().unwrap(); +/// let server_addr = "10.0.0.1:2379".parse().unwrap(); /// let server = handle.create_node().name("server").ip(ip1).build(); /// let client = handle.create_node().name("client").ip(ip2).build(); /// /// server.spawn(async move { -/// SimServer::serve(sim_server_socket).await.unwrap(); +/// SimServer::serve(server_addr).await.unwrap(); /// }); /// sleep(Duration::from_secs(1)).await; /// @@ -71,7 +72,7 @@ use std::task::{Context, Poll}; /// builder.root("."); /// let op = Operator::new(builder) /// .unwrap() -/// .layer(MadsimLayer::new(sim_server_socket)) +/// .layer(MadsimLayer::new(server_addr)) /// .finish(); /// /// let path = "hello.txt"; @@ -129,6 +130,7 @@ impl LayeredAccessor for MadsimAccessor { fn metadata(&self) -> AccessorInfo { let mut info = AccessorInfo::default(); info.set_name("madsim"); + info.set_capabilities(AccessorCapability::Read | AccessorCapability::Write); info } @@ -164,7 +166,7 @@ impl LayeredAccessor for MadsimAccessor { MadsimWriter { path: path.to_string(), args, - sim_server_socket: self.addr, + addr: self.addr, }, )) } @@ -261,26 +263,17 @@ impl oio::Read for MadsimReader { pub struct MadsimWriter { path: String, args: OpWrite, - sim_server_socket: SocketAddr, + addr: SocketAddr, } #[async_trait] impl oio::Write for MadsimWriter { async fn write(&mut self, bs: Bytes) -> crate::Result<()> { let req = Request::Write(self.path.to_string(), bs); - let ep = Endpoint::connect(self.sim_server_socket) - .await - .expect("fail to connect to sim server"); - let (tx, mut rx) = ep - .connect1(self.sim_server_socket) - .await - .expect("fail to connect1 to sim server"); - tx.send(Box::new(req)) - .await - .expect("fail to send request to sim server"); - rx.recv() - .await - .expect("fail to recv response from sim server"); + let ep = Endpoint::connect(self.addr).await?; + let (tx, mut rx) = ep.connect1(self.addr).await?; + tx.send(Box::new(req)).await?; + rx.recv().await?; Ok(()) } @@ -315,6 +308,12 @@ impl oio::Page for MadsimPager { } } +impl From for Error { + fn from(e: std::io::Error) -> Self { + Error::new(ErrorKind::Unexpected, "madsim error") + } +} + /// A simulated server.This an experimental feature, docs are not ready yet. #[derive(Default, Clone)] pub struct MadsimServer; @@ -391,12 +390,12 @@ mod test { let handle = Handle::current(); let ip1 = "10.0.0.1".parse().unwrap(); let ip2 = "10.0.0.2".parse().unwrap(); - let sim_server_socket = "10.0.0.1:2379"; + let server_addr = "10.0.0.1:2379"; let server = handle.create_node().name("server").ip(ip1).build(); let client = handle.create_node().name("client").ip(ip2).build(); server.spawn(async move { - SimServer::serve(sim_server_socket.parse().unwrap()) + MadsimServer::serve(server_addr.parse().unwrap()) .await .unwrap(); }); @@ -407,7 +406,7 @@ mod test { builder.root("."); let op = Operator::new(builder) .unwrap() - .layer(MadsimLayer::new(sim_server_socket)) + .layer(MadsimLayer::new(server_addr)) .finish(); let path = "hello.txt"; diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 6892488fbe2..3b6b7756589 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -73,4 +73,4 @@ pub use self::madsim::MadsimLayer; #[cfg(feature = "layers-madsim")] #[cfg(madsim)] -pub use self::madsim::SimServer; +pub use self::madsim::MadsimServer;