diff --git a/integration_test/src/byte_reader_writer_tests.rs b/integration_test/src/byte_reader_writer_tests.rs index b2f7ede41..b89f65740 100644 --- a/integration_test/src/byte_reader_writer_tests.rs +++ b/integration_test/src/byte_reader_writer_tests.rs @@ -47,7 +47,7 @@ pub fn test_byte_stream(config: PravegaStandaloneServiceConfig) { client_factory.controller_client(), &scope_name, &stream_name, - 1, + 2, )); let scoped_segment = ScopedSegment { @@ -70,19 +70,16 @@ pub fn test_byte_stream(config: PravegaStandaloneServiceConfig) { client_factory.controller_client(), &scope_name, &stream_name, - 4, + 1, )); - - for i in 0..4 { - let scoped_segment = ScopedSegment { - scope: scope_name.clone(), - stream: stream_name.clone(), - segment: Segment::from(i), - }; - let mut writer = client_factory.create_byte_writer(scoped_segment.clone()); - let mut reader = client_factory.create_byte_reader(scoped_segment); - test_write_and_read_with_workload(&mut writer, &mut reader); - } + let scoped_segment = ScopedSegment { + scope: scope_name, + stream: stream_name, + segment: Segment::from(0), + }; + let mut writer = client_factory.create_byte_writer(scoped_segment.clone()); + let mut reader = client_factory.create_byte_reader(scoped_segment); + test_write_and_read_with_workload(&mut writer, &mut reader); let scope_name = Scope::from("testScopeByteStreamConditionalAppend".to_owned()); let stream_name = Stream::from("testStreamByteStreamConditionalAppend".to_owned()); @@ -98,6 +95,20 @@ pub fn test_byte_stream(config: PravegaStandaloneServiceConfig) { 1, )); test_multiple_writers_conditional_append(&client_factory, segment); + let scope_name = Scope::from("testScopeAsyncByteStream".to_owned()); + let stream_name = Stream::from("testStreamAsyncByteStream".to_owned()); + let segment = ScopedSegment { + scope: scope_name.clone(), + stream: stream_name.clone(), + segment: Segment::from(0), + }; + handle.block_on(utils::create_scope_stream( + client_factory.controller_client(), + 
&scope_name, + &stream_name, + 1, + )); + handle.block_on(test_async_write_and_read(&client_factory, segment)); } fn test_simple_write_and_read(writer: &mut ByteWriter, reader: &mut ByteReader) { @@ -108,7 +119,6 @@ fn test_simple_write_and_read(writer: &mut ByteWriter, reader: &mut ByteReader) let size1 = writer.write(&payload1).expect("write payload1 to byte stream"); assert_eq!(size1, 4); writer.flush().expect("flush byte stream writer"); - writer.seek_to_tail(); assert_eq!(writer.current_write_offset(), 4); let size2 = writer.write(&payload2).expect("write payload2 to byte stream"); @@ -166,7 +176,7 @@ fn test_truncation(writer: &mut ByteWriter, reader: &mut ByteReader, rt: &mut Ru assert!(result.is_err()); // get current head - let head = reader.current_head().expect("get current head"); + let head = rt.block_on(reader.current_head()).expect("get current head"); assert_eq!(head, 4); // read from current head @@ -253,3 +263,31 @@ fn test_multiple_writers_conditional_append(factory: &ClientFactory, segment: Sc assert_eq!(writer1.current_write_offset(), 3072); info!("test byte stream multiple writers concurrent append passed"); } + +async fn test_async_write_and_read(factory: &ClientFactory, segment: ScopedSegment) { + info!("test byte stream async write and read"); + let mut writer = factory.create_byte_writer_async(segment.clone()).await; + for _i in 0i32..10 { + for _j in 0i32..1000 { + let buf = vec![1; 1024]; + let size = writer.write_async(&buf).await; + assert_eq!(size, 1024); + } + writer.flush_async().await.expect("flush"); + writer.seek_to_tail_async().await; + } + + let mut reader = factory.create_byte_reader_async(segment.clone()).await; + assert!(reader.seek_async(SeekFrom::Start(0)).await.is_ok()); + let mut read = 0; + loop { + let mut buf = vec![0; 1024]; + let size = reader.read_async(&mut buf).await.expect("read from byte stream"); + read += size; + if read == 1024 * 1000 * 10 { + break; + } + } + assert_eq!(reader.available(), 0); + info!("test 
async write and read passed"); +} diff --git a/integration_test/src/lib.rs b/integration_test/src/lib.rs index 3727eb414..586caea96 100644 --- a/integration_test/src/lib.rs +++ b/integration_test/src/lib.rs @@ -98,7 +98,7 @@ mod test { let config = PravegaStandaloneServiceConfig::new(false, true, true); run_tests(config); - let config = PravegaStandaloneServiceConfig::new(false, false, false); + let config = PravegaStandaloneServiceConfig::new(true, false, false); run_tests(config); // disconnection test will start its own Pravega Standalone. diff --git a/python_binding/Cargo.toml b/python_binding/Cargo.toml index 1f261a153..d6597798e 100644 --- a/python_binding/Cargo.toml +++ b/python_binding/Cargo.toml @@ -47,4 +47,4 @@ derive-new = "0.5" pyo3 = { features = ["extension-module"], optional = true, version = "0.13" } #WASM bindings wasm-bindgen = { version = "0.2.63", optional = true } -cfg-if = "0.1.10" +cfg-if = "0.1.10" \ No newline at end of file diff --git a/shared/src/lib.rs b/shared/src/lib.rs index c6d909ca5..aeac66d97 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -348,6 +348,7 @@ impl From<&str> for ScopedSegment { let mut tokens = NameUtils::extract_segment_tokens(qualified_name.to_owned()); let segment_id = tokens.pop().expect("get segment id from tokens"); let stream_name = tokens.pop().expect("get stream name from tokens"); + if tokens.is_empty() { // scope not present ScopedSegment { diff --git a/src/byte/reader.rs b/src/byte/reader.rs index 3a48878c9..52775f555 100644 --- a/src/byte/reader.rs +++ b/src/byte/reader.rs @@ -79,18 +79,24 @@ impl Read for ByteReader { impl ByteReader { pub(crate) fn new(segment: ScopedSegment, factory: ClientFactory, buffer_size: usize) -> Self { - let async_reader = factory + factory .runtime() - .block_on(factory.create_async_event_reader(segment.clone())); + .block_on(ByteReader::new_async(segment, factory.clone(), buffer_size)) + } + + pub(crate) async fn new_async( + segment: ScopedSegment, + factory: 
ClientFactory, + buffer_size: usize, + ) -> Self { + let async_reader = factory.create_async_event_reader(segment.clone()).await; let async_reader_wrapper = PrefetchingAsyncSegmentReader::new( factory.runtime().handle().clone(), Arc::new(Box::new(async_reader)), 0, buffer_size, ); - let metadata_client = factory - .runtime() - .block_on(factory.create_segment_metadata_client(segment)); + let metadata_client = factory.create_segment_metadata_client(segment).await; ByteReader { reader_id: Uuid::new_v4(), reader: Some(async_reader_wrapper), @@ -100,19 +106,49 @@ impl ByteReader { } } + + /// Read data asynchronously. + /// + /// ```ignore + /// let mut byte_reader = client_factory.create_byte_reader_async(segment).await; + /// let mut buf: Vec<u8> = vec![0; 4]; + /// let size = byte_reader.read_async(&mut buf).await.expect("read"); + /// ``` + pub async fn read_async(&mut self, buf: &mut [u8]) -> Result<usize, Error> { + self.reader + .as_mut() + .unwrap() + .read(buf) + .await + .map_err(|e| Error::new(ErrorKind::Other, format!("Error: {:?}", e))) + } + + /// Return the head of current readable data in the segment. + /// + /// The ByteReader is initialized to read from the segment at offset 0. However, it might + /// encounter the SegmentIsTruncated error due to the segment has been truncated. In this case, + /// application should call this method to get the current readable head and read from it. 
/// ```ignore - /// let mut byte_reader = client_factory.create_byte_reader(segment); - /// let offset = byte_reader.current_head().expect("get current head offset"); + /// let mut byte_reader = client_factory.create_byte_reader_async(segment).await; + /// let offset = byte_reader.current_head().await.expect("get current head offset"); /// ``` - pub fn current_head(&self) -> std::io::Result<u64> { - self.factory - .runtime() - .block_on(self.metadata_client.fetch_current_starting_head()) + pub async fn current_head(&self) -> std::io::Result<u64> { + self.metadata_client + .fetch_current_starting_head() + .await + .map(|i| i as u64) + .map_err(|e| Error::new(ErrorKind::Other, format!("{:?}", e))) + } + + /// Return the tail offset of the segment. + /// + /// ```ignore + /// let mut byte_reader = client_factory.create_byte_reader_async(segment).await; + /// let offset = byte_reader.current_tail().await.expect("get current tail offset"); + /// ``` + pub async fn current_tail(&self) -> std::io::Result<u64> { + self.metadata_client + .fetch_current_segment_length() + .await .map(|i| i as u64) .map_err(|e| Error::new(ErrorKind::Other, format!("{:?}", e))) } @@ -123,8 +159,8 @@ impl ByteReader { /// let mut byte_reader = client_factory.create_byte_reader(segment); /// let offset = byte_reader.current_offset(); /// ``` - pub fn current_offset(&self) -> i64 { - self.reader.as_ref().unwrap().offset + pub fn current_offset(&self) -> u64 { + self.reader.as_ref().unwrap().offset as u64 } /// Return the bytes that are available to read instantly without fetching from server. 
@@ -138,23 +174,8 @@ impl ByteReader { self.reader.as_ref().unwrap().available() } - fn recreate_reader_wrapper(&mut self, offset: i64) { - let internal_reader = self.reader.take().unwrap().extract_reader(); - let new_reader_wrapper = PrefetchingAsyncSegmentReader::new( - self.factory.runtime().handle().clone(), - internal_reader, - offset, - self.reader_buffer_size, - ); - self.reader = Some(new_reader_wrapper); - } -} - -/// The Seek implementation for ByteStreamReader allows seeking to a byte offset from the beginning -/// of the stream or a byte offset relative to the current position in the stream. -/// If the stream has been truncated, the byte offset will be relative to the original beginning of the stream. -impl Seek for ByteReader { - fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> { + /// Seek to an offset asynchronously. + pub async fn seek_async(&mut self, pos: SeekFrom) -> std::io::Result<u64> { match pos { SeekFrom::Start(offset) => { let offset = offset.try_into().map_err(|e| { @@ -180,9 +201,9 @@ } SeekFrom::End(offset) => { let tail = self - .factory - .runtime() - .block_on(self.metadata_client.fetch_current_segment_length()) + .metadata_client + .fetch_current_segment_length() + .await .map_err(|e| Error::new(ErrorKind::Other, format!("{:?}", e)))?; if tail + offset < 0 { Err(Error::new( @@ -197,6 +218,27 @@ } } } + + fn recreate_reader_wrapper(&mut self, offset: i64) { + let internal_reader = self.reader.take().unwrap().extract_reader(); + let new_reader_wrapper = PrefetchingAsyncSegmentReader::new( + self.factory.runtime().handle().clone(), + internal_reader, + offset, + self.reader_buffer_size, + ); + self.reader = Some(new_reader_wrapper); + } +} + +/// The Seek implementation for ByteReader allows seeking to a byte offset from the beginning +/// of the stream or a byte offset relative to the current position in the stream. 
+/// If the stream has been truncated, the byte offset will be relative to the original beginning of the stream. +impl Seek for ByteReader { + fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> { + let factory = self.factory.clone(); + factory.runtime().block_on(self.seek_async(pos)) + } } #[cfg(test)] @@ -284,7 +326,7 @@ mod test { assert!(reader.read(&mut buf).is_err()); // read from current head - let offset = reader.current_head().expect("get current head"); + let offset = rt.block_on(reader.current_head()).expect("get current head"); reader.seek(SeekFrom::Start(offset)).expect("seek to new head"); let mut buf = vec![0; 100]; assert!(reader.read(&mut buf).is_ok()); diff --git a/src/byte/writer.rs b/src/byte/writer.rs index db3ae7e59..28d6b3882 100644 --- a/src/byte/writer.rs +++ b/src/byte/writer.rs @@ -133,9 +133,15 @@ impl ByteWriter { const CHANNEL_CAPACITY: usize = 16 * 1024 * 1024; pub(crate) fn new(segment: ScopedSegment, factory: ClientFactory) -> Self { + factory + .runtime() + .block_on(ByteWriter::new_async(segment, factory.clone())) + } + + pub(crate) async fn new_async(segment: ScopedSegment, factory: ClientFactory) -> Self { let rt = factory.runtime(); let (sender, receiver) = create_channel(Self::CHANNEL_CAPACITY); - let metadata_client = rt.block_on(factory.create_segment_metadata_client(segment.clone())); + let metadata_client = factory.create_segment_metadata_client(segment.clone()).await; let writer_id = WriterId(get_random_u128()); let stream = ScopedStream::from(&segment); let span = info_span!("Reactor", byte_stream_writer = %writer_id); @@ -152,11 +158,46 @@ } } + /// Write data asynchronously. 
+ /// + /// # Examples + /// ```ignore + /// let mut byte_writer = client_factory.create_byte_writer_async(segment).await; + /// let payload = vec![0; 8]; + /// let size = byte_writer.write_async(&payload).await; + /// assert_eq!(size, 8); + /// ``` + pub async fn write_async(&mut self, buf: &[u8]) -> usize { + let bytes_to_write = std::cmp::min(buf.len(), EventWriter::MAX_EVENT_SIZE); + let payload = buf[0..bytes_to_write].to_vec(); + let event_handle = self.write_internal(self.sender.clone(), payload).await; + self.write_offset += bytes_to_write as i64; + self.event_handle = Some(event_handle); + bytes_to_write + } + + /// Flush data asynchronously. + /// + /// # Examples + /// ```ignore + /// let mut byte_writer = client_factory.create_byte_writer_async(segment).await; + /// let payload = vec![0; 8]; + /// let size = byte_writer.write_async(&payload).await; + /// byte_writer.flush_async().await; + /// ``` + pub async fn flush_async(&mut self) -> Result<(), Error> { + if let Some(event_handle) = self.event_handle.take() { + self.flush_internal(event_handle).await + } else { + Ok(()) + } + } + /// Seal the segment and no further writes are allowed. /// /// # Examples /// ```ignore - /// let mut byte_writer = client_factory.create_byte_writer(segment); + /// let mut byte_writer = client_factory.create_byte_writer_async(segment).await; /// byte_writer.seal().await.expect("seal segment"); /// ``` pub async fn seal(&mut self) -> Result<(), Error> { @@ -174,7 +215,7 @@ /// /// # Examples /// ```ignore - /// let byte_writer = client_factory.create_byte_writer(segment); + /// let byte_writer = client_factory.create_byte_writer_async(segment).await; /// byte_writer.truncate_data_before(1024).await.expect("truncate segment"); /// ``` pub async fn truncate_data_before(&self, offset: i64) -> Result<(), Error> { @@ -200,7 +241,7 @@ /// This method is useful for tail reads. 
/// # Examples /// ```ignore - /// let mut byte_writer = client_factory.create_byte_writer(segment); + /// let mut byte_writer = client_factory.create_byte_writer_async(segment).await; /// byte_writer.seek_to_tail(); /// ``` pub fn seek_to_tail(&mut self) { @@ -212,6 +253,23 @@ self.write_offset = segment_info.write_offset; } + /// Seek to the tail of the segment. + /// + /// This method is useful for tail reads. + /// # Examples + /// ```ignore + /// let mut byte_writer = client_factory.create_byte_writer_async(segment).await; + /// byte_writer.seek_to_tail_async().await; + /// ``` + pub async fn seek_to_tail_async(&mut self) { + let segment_info = self + .metadata_client + .get_segment_info() + .await + .expect("failed to get segment info"); + self.write_offset = segment_info.write_offset; + } + async fn write_internal( &self, sender: ChannelSender<Incoming>, diff --git a/src/client_factory.rs b/src/client_factory.rs index 3ba4b457c..111ad06d0 100644 --- a/src/client_factory.rs +++ b/src/client_factory.rs @@ -137,10 +137,18 @@ impl ClientFactory { ByteWriter::new(segment, self.clone()) } + pub async fn create_byte_writer_async(&self, segment: ScopedSegment) -> ByteWriter { + ByteWriter::new_async(segment, self.clone()).await + } + pub fn create_byte_reader(&self, segment: ScopedSegment) -> ByteReader { ByteReader::new(segment, self.clone(), self.config().reader_wrapper_buffer_size()) } + pub async fn create_byte_reader_async(&self, segment: ScopedSegment) -> ByteReader { + ByteReader::new_async(segment, self.clone(), self.config().reader_wrapper_buffer_size()).await + } + pub async fn create_table(&self, scope: Scope, name: String) -> Table { Table::new(scope, name, self.clone()) .await