Skip to content

Commit

Permalink
Merge branch 'master' into stream-tag
Browse files Browse the repository at this point in the history
  • Loading branch information
shrids committed Jun 25, 2021
2 parents 1520467 + 530b080 commit 4bc7ea6
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 55 deletions.
68 changes: 53 additions & 15 deletions integration_test/src/byte_reader_writer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub fn test_byte_stream(config: PravegaStandaloneServiceConfig) {
client_factory.controller_client(),
&scope_name,
&stream_name,
1,
2,
));

let scoped_segment = ScopedSegment {
Expand All @@ -70,19 +70,16 @@ pub fn test_byte_stream(config: PravegaStandaloneServiceConfig) {
client_factory.controller_client(),
&scope_name,
&stream_name,
4,
1,
));

for i in 0..4 {
let scoped_segment = ScopedSegment {
scope: scope_name.clone(),
stream: stream_name.clone(),
segment: Segment::from(i),
};
let mut writer = client_factory.create_byte_writer(scoped_segment.clone());
let mut reader = client_factory.create_byte_reader(scoped_segment);
test_write_and_read_with_workload(&mut writer, &mut reader);
}
let scoped_segment = ScopedSegment {
scope: scope_name,
stream: stream_name,
segment: Segment::from(0),
};
let mut writer = client_factory.create_byte_writer(scoped_segment.clone());
let mut reader = client_factory.create_byte_reader(scoped_segment);
test_write_and_read_with_workload(&mut writer, &mut reader);

let scope_name = Scope::from("testScopeByteStreamConditionalAppend".to_owned());
let stream_name = Stream::from("testStreamByteStreamConditionalAppend".to_owned());
Expand All @@ -98,6 +95,20 @@ pub fn test_byte_stream(config: PravegaStandaloneServiceConfig) {
1,
));
test_multiple_writers_conditional_append(&client_factory, segment);
let scope_name = Scope::from("testScopeAsyncByteStream".to_owned());
let stream_name = Stream::from("testStreamAsyncByteStream".to_owned());
let segment = ScopedSegment {
scope: scope_name.clone(),
stream: stream_name.clone(),
segment: Segment::from(0),
};
handle.block_on(utils::create_scope_stream(
client_factory.controller_client(),
&scope_name,
&stream_name,
1,
));
handle.block_on(test_async_write_and_read(&client_factory, segment));
}

fn test_simple_write_and_read(writer: &mut ByteWriter, reader: &mut ByteReader) {
Expand All @@ -108,7 +119,6 @@ fn test_simple_write_and_read(writer: &mut ByteWriter, reader: &mut ByteReader)
let size1 = writer.write(&payload1).expect("write payload1 to byte stream");
assert_eq!(size1, 4);
writer.flush().expect("flush byte stream writer");
writer.seek_to_tail();
assert_eq!(writer.current_write_offset(), 4);

let size2 = writer.write(&payload2).expect("write payload2 to byte stream");
Expand Down Expand Up @@ -166,7 +176,7 @@ fn test_truncation(writer: &mut ByteWriter, reader: &mut ByteReader, rt: &mut Ru
assert!(result.is_err());

// get current head
let head = reader.current_head().expect("get current head");
let head = rt.block_on(reader.current_head()).expect("get current head");
assert_eq!(head, 4);

// read from current head
Expand Down Expand Up @@ -253,3 +263,31 @@ fn test_multiple_writers_conditional_append(factory: &ClientFactory, segment: Sc
assert_eq!(writer1.current_write_offset(), 3072);
info!("test byte stream multiple writers concurrent append passed");
}

/// Exercise the async byte writer/reader pair end to end: write ten flushed
/// batches of 1000 x 1 KB buffers (10 MB total), then read everything back
/// starting from offset 0.
async fn test_async_write_and_read(factory: &ClientFactory, segment: ScopedSegment) {
info!("test byte stream async write and read");
let mut writer = factory.create_byte_writer_async(segment.clone()).await;
// Write 10 batches; each batch is flushed before the next begins.
for _i in 0i32..10 {
for _j in 0i32..1000 {
let buf = vec![1; 1024];
let size = writer.write_async(&buf).await;
// The writer is expected to accept the whole 1 KB buffer in one call.
assert_eq!(size, 1024);
}
writer.flush_async().await.expect("flush");
// NOTE(review): presumably repositions the writer at the segment tail
// after the flushed batch — confirm against the ByteWriter API docs.
writer.seek_to_tail_async().await;
}

let mut reader = factory.create_byte_reader_async(segment.clone()).await;
// Read back from the very beginning of the segment.
assert!(reader.seek_async(SeekFrom::Start(0)).await.is_ok());
let mut read = 0;
// Accumulate reads until exactly the 10 MB written above is consumed.
loop {
let mut buf = vec![0; 1024];
let size = reader.read_async(&mut buf).await.expect("read from byte stream");
read += size;
if read == 1024 * 1000 * 10 {
break;
}
}
// After consuming all written data, no prefetched bytes should remain.
assert_eq!(reader.available(), 0);
info!("test async write and read passed");
}
2 changes: 1 addition & 1 deletion integration_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ mod test {
let config = PravegaStandaloneServiceConfig::new(false, true, true);
run_tests(config);

let config = PravegaStandaloneServiceConfig::new(false, false, false);
let config = PravegaStandaloneServiceConfig::new(true, false, false);
run_tests(config);

// disconnection test will start its own Pravega Standalone.
Expand Down
2 changes: 1 addition & 1 deletion python_binding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ derive-new = "0.5"
pyo3 = { features = ["extension-module"], optional = true, version = "0.13" }
#WASM bindings
wasm-bindgen = { version = "0.2.63", optional = true }
cfg-if = "0.1.10"
cfg-if = "0.1.10"
1 change: 1 addition & 0 deletions shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ impl From<&str> for ScopedSegment {
let mut tokens = NameUtils::extract_segment_tokens(qualified_name.to_owned());
let segment_id = tokens.pop().expect("get segment id from tokens");
let stream_name = tokens.pop().expect("get stream name from tokens");

if tokens.is_empty() {
// scope not present
ScopedSegment {
Expand Down
110 changes: 76 additions & 34 deletions src/byte/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,24 @@ impl Read for ByteReader {

impl ByteReader {
pub(crate) fn new(segment: ScopedSegment, factory: ClientFactory, buffer_size: usize) -> Self {
let async_reader = factory
factory
.runtime()
.block_on(factory.create_async_event_reader(segment.clone()));
.block_on(ByteReader::new_async(segment, factory.clone(), buffer_size))
}

pub(crate) async fn new_async(
segment: ScopedSegment,
factory: ClientFactory,
buffer_size: usize,
) -> Self {
let async_reader = factory.create_async_event_reader(segment.clone()).await;
let async_reader_wrapper = PrefetchingAsyncSegmentReader::new(
factory.runtime().handle().clone(),
Arc::new(Box::new(async_reader)),
0,
buffer_size,
);
let metadata_client = factory
.runtime()
.block_on(factory.create_segment_metadata_client(segment));
let metadata_client = factory.create_segment_metadata_client(segment).await;
ByteReader {
reader_id: Uuid::new_v4(),
reader: Some(async_reader_wrapper),
Expand All @@ -100,19 +106,49 @@ impl ByteReader {
}
}

/// Read data asynchronously into `buf`, returning the number of bytes read.
///
/// Any error from the underlying prefetching reader is converted into a
/// `std::io::Error` of kind `Other`.
///
/// ```ignore
/// let mut byte_reader = client_factory.create_byte_reader_async(segment).await;
/// let mut buf: Vec<u8> = vec![0; 4];
/// let size = byte_reader.read_async(&mut buf).await.expect("read");
/// ```
pub async fn read_async(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
// `self.reader` is only taken out transiently while the prefetching
// wrapper is recreated, so it is expected to be Some here; the unwrap
// guards that invariant rather than handling a recoverable error.
self.reader
.as_mut()
.unwrap()
.read(buf)
.await
.map_err(|e| Error::new(ErrorKind::Other, format!("Error: {:?}", e)))
}

/// Return the head of current readable data in the segment.
///
/// The ByteReader is initialized to read from the segment at offset 0. However, it might
/// encounter the SegmentIsTruncated error because the segment has been truncated. In this case,
/// application should call this method to get the current readable head and read from it.
/// ```ignore
/// let mut byte_reader = client_factory.create_byte_reader(segment);
/// let offset = byte_reader.current_head().expect("get current head offset");
/// let mut byte_reader = client_factory.create_byte_reader_async(segment).await;
/// let offset = byte_reader.current_head().await.expect("get current head offset");
/// ```
pub fn current_head(&self) -> std::io::Result<u64> {
self.factory
.runtime()
.block_on(self.metadata_client.fetch_current_starting_head())
pub async fn current_head(&self) -> std::io::Result<u64> {
self.metadata_client
.fetch_current_starting_head()
.await
.map(|i| i as u64)
.map_err(|e| Error::new(ErrorKind::Other, format!("{:?}", e)))
}

/// Return the tail offset of the segment.
///
/// The tail is the current length of the segment as reported by the
/// segment metadata client.
///
/// ```ignore
/// let mut byte_reader = client_factory.create_byte_reader_async(segment).await;
/// let offset = byte_reader.current_tail().await.expect("get current tail offset");
/// ```
pub async fn current_tail(&self) -> std::io::Result<u64> {
// Translate the client-side error into a std::io::Error so callers
// only ever deal with the standard I/O error type.
match self.metadata_client.fetch_current_segment_length().await {
Ok(length) => Ok(length as u64),
Err(e) => Err(Error::new(ErrorKind::Other, format!("{:?}", e))),
}
}
Expand All @@ -123,8 +159,8 @@ impl ByteReader {
/// let mut byte_reader = client_factory.create_byte_reader(segment);
/// let offset = byte_reader.current_offset();
/// ```
pub fn current_offset(&self) -> i64 {
self.reader.as_ref().unwrap().offset
pub fn current_offset(&self) -> u64 {
self.reader.as_ref().unwrap().offset as u64
}

/// Return the bytes that are available to read instantly without fetching from server.
Expand All @@ -138,23 +174,8 @@ impl ByteReader {
self.reader.as_ref().unwrap().available()
}

// Rebuild the prefetching wrapper so that subsequent reads resume from
// `offset` (used after a seek invalidates the current prefetch buffer).
fn recreate_reader_wrapper(&mut self, offset: i64) {
// Take the wrapper out of the Option and recover the underlying async
// segment reader; `self.reader` is restored to Some below, so it is
// only ever None for the duration of this function.
let internal_reader = self.reader.take().unwrap().extract_reader();
let new_reader_wrapper = PrefetchingAsyncSegmentReader::new(
self.factory.runtime().handle().clone(),
internal_reader,
offset,
self.reader_buffer_size,
);
self.reader = Some(new_reader_wrapper);
}
}

/// The Seek implementation for ByteStreamReader allows seeking to a byte offset from the beginning
/// of the stream or a byte offset relative to the current position in the stream.
/// If the stream has been truncated, the byte offset will be relative to the original beginning of the stream.
impl Seek for ByteReader {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
/// Seek to an offset asynchronously.
pub async fn seek_async(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
match pos {
SeekFrom::Start(offset) => {
let offset = offset.try_into().map_err(|e| {
Expand All @@ -180,9 +201,9 @@ impl Seek for ByteReader {
}
SeekFrom::End(offset) => {
let tail = self
.factory
.runtime()
.block_on(self.metadata_client.fetch_current_segment_length())
.metadata_client
.fetch_current_segment_length()
.await
.map_err(|e| Error::new(ErrorKind::Other, format!("{:?}", e)))?;
if tail + offset < 0 {
Err(Error::new(
Expand All @@ -197,6 +218,27 @@ impl Seek for ByteReader {
}
}
}

// Rebuild the prefetching wrapper so that subsequent reads resume from
// `offset` (used after a seek invalidates the current prefetch buffer).
fn recreate_reader_wrapper(&mut self, offset: i64) {
// Take the wrapper out of the Option and recover the underlying async
// segment reader; `self.reader` is restored to Some below, so it is
// only ever None for the duration of this function.
let internal_reader = self.reader.take().unwrap().extract_reader();
let new_reader_wrapper = PrefetchingAsyncSegmentReader::new(
self.factory.runtime().handle().clone(),
internal_reader,
offset,
self.reader_buffer_size,
);
self.reader = Some(new_reader_wrapper);
}
}

/// Blocking `Seek` implementation for `ByteReader`.
///
/// Supports seeking to an absolute offset, an offset from the end, or an
/// offset relative to the current position. If the stream has been
/// truncated, offsets remain relative to the stream's original beginning.
impl Seek for ByteReader {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
// Clone the factory handle up front so the runtime borrow does not
// overlap the `&mut self` borrow taken by `seek_async`.
let owned_factory = self.factory.clone();
let seek_future = self.seek_async(pos);
owned_factory.runtime().block_on(seek_future)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -284,7 +326,7 @@ mod test {
assert!(reader.read(&mut buf).is_err());

// read from current head
let offset = reader.current_head().expect("get current head");
let offset = rt.block_on(reader.current_head()).expect("get current head");
reader.seek(SeekFrom::Start(offset)).expect("seek to new head");
let mut buf = vec![0; 100];
assert!(reader.read(&mut buf).is_ok());
Expand Down
Loading

0 comments on commit 4bc7ea6

Please sign in to comment.