Skip to content

Commit

Permalink
Issue 319: Enable delete readergroup (#342)
Browse files Browse the repository at this point in the history
Enable delete ReaderGroup on the RUST and Python clients.

Signed-off-by: Luis Liu <luis_liu@dell.com>
Co-authored-by: Luis Liu <luis_liu@dell.com>
Co-authored-by: Sandeep <sandeep.shridhar@emc.com>
Co-authored-by: Tom Kaitchuck <tkaitchuck@users.noreply.github.com>
Co-authored-by: Luis Liu <vangork@live.com>
Co-authored-by: Luis Liu <luis_liu@dell.com>
  • Loading branch information
5 people authored Mar 22, 2022
1 parent 6f67ec1 commit b57b2ea
Show file tree
Hide file tree
Showing 16 changed files with 303 additions and 96 deletions.
2 changes: 1 addition & 1 deletion config/src/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Credentials {

pub fn keycloak(path: &str, disable_cert_verification: bool) -> Self {
// read keycloak json
let file = File::open(path.to_string()).expect("open keycloak.json");
let file = File::open(path).expect("open keycloak.json");
let mut buf_reader = BufReader::new(file);
let mut buffer = Vec::new();
buf_reader.read_to_end(&mut buffer).expect("read to the end");
Expand Down
2 changes: 1 addition & 1 deletion config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl ClientConfigBuilder {
Err(format!(
"is_tls_enabled option {} does not match scheme in uri {}",
is_tls_enabled,
self.controller_uri.as_ref().unwrap().to_string()
**self.controller_uri.as_ref().unwrap()
))
} else {
Ok(())
Expand Down
23 changes: 19 additions & 4 deletions integration_test/src/event_reader_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use pravega_client_shared::{
};
use pravega_controller_client::ControllerClient;

use pravega_wire_protocol::commands::NoSuchSegmentCommand;
use pravega_wire_protocol::wire_commands::Replies;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
Expand Down Expand Up @@ -399,7 +401,7 @@ async fn test_read_api(client_factory: &ClientFactoryAsync) {
let stream_name = Stream::from("testReaderStream".to_owned());

const NUM_EVENTS: usize = 10;
const EVNET_SIZE: usize = 10;
const EVENT_SIZE: usize = 10;

let new_stream =
create_scope_stream(client_factory.controller_client(), &scope_name, &stream_name, 4).await;
Expand All @@ -411,7 +413,7 @@ async fn test_read_api(client_factory: &ClientFactoryAsync) {
stream_name.clone(),
client_factory.clone(),
NUM_EVENTS,
EVNET_SIZE,
EVENT_SIZE,
)
.await;
}
Expand All @@ -432,7 +434,7 @@ async fn test_read_api(client_factory: &ClientFactoryAsync) {
loop {
if let Some(event) = slice.next() {
assert_eq!(
vec![1; EVNET_SIZE],
vec![1; EVENT_SIZE],
event.value.as_slice(),
"Corrupted event read"
);
Expand All @@ -450,7 +452,20 @@ async fn test_read_api(client_factory: &ClientFactoryAsync) {
break;
}
}
info!("test event stream reader read api passed");
info!("read all events from the stream");
client_factory
.delete_reader_group(scope_name, "rg-read-api".to_string())
.await
.expect("Deletion of ReaderGroup failed");
// Attempt acquiring a segment of a deleted ReaderGroup.
let acquire_segment_result = reader.acquire_segment().await;
//Verify the operation fails with EventReaderError
assert!(
acquire_segment_result.is_err(),
"After reader group deletion acquire_segment API should return an error"
);
let reader_offline_result = reader.reader_offline().await;
assert!(reader_offline_result.is_ok(), "The reader is already offline");
}

fn test_multiple_readers(client_factory: &ClientFactoryAsync) {
Expand Down
7 changes: 3 additions & 4 deletions integration_test/src/pravega_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,10 @@ impl PravegaService for PravegaStandaloneService {
drop(src); // Close the file early

// Run the replace operation in memory
let new_data: String;
if enable {
new_data = data.replace("INFO", "DEBUG");
let new_data = if enable {
data.replace("INFO", "DEBUG")
} else {
new_data = data.replace("DEBUG", "INFO");
data.replace("DEBUG", "INFO")
};

// Recreate the file and dump the processed contents to it
Expand Down
4 changes: 2 additions & 2 deletions integration_test/src/transactional_event_writer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async fn test_multiple_transactions(factory: &ClientFactory, stream: ScopedStrea
for mut transaction in transactions {
let event = vec![1; 100];
transaction.write_event(None, event).await.expect("write event");
let timestamp = Timestamp { 0: 0 };
let timestamp = Timestamp(0);
transaction.commit(timestamp).await.expect("commit");
}

Expand All @@ -208,7 +208,7 @@ async fn test_multiple_transactions(factory: &ClientFactory, stream: ScopedStrea
for mut transaction in transactions {
let event = vec![1; 100];
transaction.write_event(None, event).await.expect("write event");
let timestamp = Timestamp { 0: 0 };
let timestamp = Timestamp(0);
transaction.commit(timestamp).await.expect("commit");
}
}
Expand Down
26 changes: 26 additions & 0 deletions python/src/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,32 @@ impl StreamManager {
Ok(reader_group)
}

///
/// Delete a ReaderGroup.
///
/// ```
/// import pravega_client;
/// manager=pravega_client.StreamManager("tcp://127.0.0.1:9090")
/// // Delete a ReaderGroup against an already created Pravega scope..
/// manager.delete_reader_group_with_config("rg1", "scope", rg_config)
///
/// ```
///
#[pyo3(text_signature = "($self, reader_group_name, scope_name)")]
pub fn delete_reader_group(&self, reader_group_name: &str, scope_name: &str) -> PyResult<()> {
let scope = Scope::from(scope_name.to_string());

let handle = self.cf.runtime_handle();

let delete_result =
handle.block_on(self.cf.delete_reader_group(scope, reader_group_name.to_string()));
info!("Delete ReaderGroup {:?}", delete_result);
match delete_result {
Ok(_) => Ok(()),
Err(e) => Err(exceptions::PyValueError::new_err(format!("{:?}", e))),
}
}

///
/// Create a Binary I/O representation of a Pravega Stream. This ByteStream implements the
/// APIs provided by [io.IOBase](https://docs.python.org/3/library/io.html#io.IOBase)
Expand Down
6 changes: 3 additions & 3 deletions shared/src/naming_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ impl NameUtils {
);
if let Some(transaction_id) = tx_id {
format!(
"{}{}{}{}",
"{}{}{:016x}{:016x}",
segment_name,
TRANSACTION_DELIMITER,
format!("{:016x}", (transaction_id.0 >> 64) as i64),
format!("{:016x}", transaction_id.0 as i64)
(transaction_id.0 >> 64) as i64,
transaction_id.0 as i64,
)
} else {
segment_name
Expand Down
30 changes: 29 additions & 1 deletion src/client_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::segment::metadata::SegmentMetadataClient;
use crate::segment::raw_client::RawClientImpl;
use crate::segment::reader::AsyncSegmentReaderImpl;
use crate::sync::synchronizer::Synchronizer;
use crate::sync::table::Table;
use crate::sync::table::{Table, TableError};
cfg_if::cfg_if! {
if #[cfg(feature = "integration-test")] {
use crate::test_utils::{RawClientWrapper, SegmentReaderWrapper};
Expand Down Expand Up @@ -155,6 +155,23 @@ impl ClientFactory {
.await
}

///
/// Delete a ReaderGroup.
///
pub async fn delete_reader_group(
&self,
scope: Scope,
reader_group_name: String,
) -> Result<(), TableError> {
info!(
"Deleting reader group {:?} under scope {:?}",
reader_group_name, scope
);
self.client_factory_async
.delete_reader_group(scope, reader_group_name)
.await
}

pub async fn create_transactional_event_writer(
&self,
stream: ScopedStream,
Expand Down Expand Up @@ -316,6 +333,17 @@ impl ClientFactoryAsync {
ReaderGroup::create(scope, reader_group_name, rg_config, self.clone()).await
}

///
/// Delete a ReaderGroup given for a given scope.
///
pub async fn delete_reader_group(
&self,
scope: Scope,
reader_group_name: String,
) -> Result<(), TableError> {
ReaderGroup::delete(scope, reader_group_name, self.clone()).await
}

pub async fn create_transactional_event_writer(
&self,
stream: ScopedStream,
Expand Down
59 changes: 48 additions & 11 deletions src/event/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ impl EventReader {
.lock()
.await
.compute_segments_to_acquire_or_release(&reader)
.await;
.await
.expect("should compute segments");
// attempt acquiring the desired number of segments.
if new_segments_to_acquire > 0 {
for _ in 0..new_segments_to_acquire {
Expand Down Expand Up @@ -267,9 +268,20 @@ impl EventReader {
/// is assumed dead.
pub async fn release_segment(&mut self, mut slice: SegmentSlice) -> Result<(), EventReaderError> {
info!(
"releasing segment slice {} from reader {}",
"releasing segment slice {} from reader {:?}",
slice.meta.scoped_segment, self.id
);
// check if the reader is already offline.
if self.meta.reader_offline {
return Err(EventReaderError::StateError {
source: ReaderGroupStateError::ReaderAlreadyOfflineError {
error_msg: format!("Reader already marked offline {:?}", self.id),
source: SynchronizerError::SyncPreconditionError {
error_msg: String::from("Precondition failure"),
},
},
});
}
//update meta data.
let scoped_segment = ScopedSegment::from(slice.meta.scoped_segment.clone().as_str());
self.meta.add_slices(slice.meta.clone());
Expand Down Expand Up @@ -322,6 +334,16 @@ impl EventReader {
slice.meta.end_offset >= offset,
"the offset where the segment slice is released should be less than the end offset"
);
if self.meta.reader_offline {
return Err(EventReaderError::StateError {
source: ReaderGroupStateError::ReaderAlreadyOfflineError {
error_msg: format!("Reader already marked offline {:?}", self.id),
source: SynchronizerError::SyncPreconditionError {
error_msg: String::from("Precondition failure"),
},
},
});
}
let segment = ScopedSegment::from(slice.meta.scoped_segment.as_str());
if slice.meta.read_offset != offset {
self.meta.stop_reading(&segment);
Expand Down Expand Up @@ -361,7 +383,7 @@ impl EventReader {
/// that another thread removes this reader from the ReaderGroup probably due to the host of this reader
/// is assumed dead.
pub async fn reader_offline(&mut self) -> Result<(), EventReaderError> {
if !self.meta.reader_offline {
if !self.meta.reader_offline && self.rg_state.lock().await.check_online(&self.id).await {
info!("Putting reader {:?} offline", self.id);
// stop reading from all the segments.
self.meta.stop_reading_all();
Expand Down Expand Up @@ -411,12 +433,23 @@ impl EventReader {
mut slice: SegmentSlice,
read_offset: i64,
) -> Result<(), EventReaderError> {
if self.meta.reader_offline {
return Err(EventReaderError::StateError {
source: ReaderGroupStateError::ReaderAlreadyOfflineError {
error_msg: format!("Reader already marked offline {:?}", self.id),
source: SynchronizerError::SyncPreconditionError {
error_msg: String::from("Precondition failure"),
},
},
});
}
let new_segments_to_release = self
.rg_state
.lock()
.await
.compute_segments_to_acquire_or_release(&self.id)
.await;
.await
.map_err(|err| EventReaderError::StateError { source: err })?;
let segment = ScopedSegment::from(slice.meta.scoped_segment.as_str());
// check if segments needs to be released from the reader
if new_segments_to_release < 0 {
Expand Down Expand Up @@ -462,7 +495,10 @@ impl EventReader {
if self.meta.reader_offline || !self.rg_state.lock().await.check_online(&self.id).await {
return Err(EventReaderError::StateError {
source: ReaderGroupStateError::ReaderAlreadyOfflineError {
error_msg: format!("Reader already marked offline {:?}", self.id),
error_msg: format!(
"Reader already marked offline {:?} or the ReaderGroup is deleted",
self.id
),
source: SynchronizerError::SyncPreconditionError {
error_msg: String::from("Precondition failure"),
},
Expand Down Expand Up @@ -666,7 +702,8 @@ impl EventReader {
.lock()
.await
.compute_segments_to_acquire_or_release(&self.id)
.await;
.await
.expect("should compute segments");
if new_segments_to_acquire <= 0 {
Ok(None)
} else {
Expand Down Expand Up @@ -1288,7 +1325,7 @@ mod tests {
rg_mock.expect_check_online().return_const(true);
rg_mock
.expect_compute_segments_to_acquire_or_release()
.return_const(0 as isize);
.return_once(move |_| Ok(0 as isize));
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
// create a new Event Reader with the segment slice data.
let mut reader = EventReader::init_event_reader(
Expand Down Expand Up @@ -1356,7 +1393,7 @@ mod tests {
rg_mock
.expect_compute_segments_to_acquire_or_release()
.with(predicate::eq(Reader::from("r1".to_string())))
.return_const(1 as isize);
.return_once(move |_| Ok(1 as isize));
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
rg_mock.expect_check_online().return_const(true);

Expand Down Expand Up @@ -1460,7 +1497,7 @@ mod tests {
let mut rg_mock: ReaderGroupState = ReaderGroupState::default();
rg_mock
.expect_compute_segments_to_acquire_or_release()
.return_const(0 as isize);
.return_once(move |_| Ok(0 as isize));
rg_mock.expect_check_online().return_const(true);
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
// create a new Event Reader with the segment slice data.
Expand Down Expand Up @@ -1535,7 +1572,7 @@ mod tests {
rg_mock.expect_check_online().return_const(true);
rg_mock
.expect_compute_segments_to_acquire_or_release()
.return_const(0 as isize);
.return_once(move |_| Ok(0 as isize));
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
// create a new Event Reader with the segment slice data.
let mut reader = EventReader::init_event_reader(
Expand Down Expand Up @@ -1619,7 +1656,7 @@ mod tests {
rg_mock.expect_check_online().return_const(true);
rg_mock
.expect_compute_segments_to_acquire_or_release()
.return_const(0 as isize);
.return_once(move |_| Ok(0 as isize));
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
// create a new Event Reader with the segment slice data.
let mut reader = EventReader::init_event_reader(
Expand Down
11 changes: 11 additions & 0 deletions src/event/reader_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use crate::event::reader_group_state::{Offset, ReaderGroupStateError};

use pravega_client_shared::{Reader, Scope, ScopedSegment, ScopedStream};

use crate::sync::table::TableError;
use crate::sync::Table;
use serde::{Deserialize, Serialize};
use serde_cbor::Error as CborError;
use serde_cbor::{from_slice, to_vec};
Expand Down Expand Up @@ -143,6 +145,15 @@ impl ReaderGroup {
}
}

/// Delete a reader group.
pub(crate) async fn delete(
scope: Scope,
name: String,
client_factory: ClientFactoryAsync,
) -> Result<(), TableError> {
Table::delete(scope, name, client_factory).await
}

/// Create a new EventReader under the ReaderGroup. This method panics if the reader is
/// already part of the reader group.
///
Expand Down
Loading

0 comments on commit b57b2ea

Please sign in to comment.