Skip to content

Commit

Permalink
Issue 241: Reorg code (#248)
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Mou <wenqi.mou@dell.com>
  • Loading branch information
Wenqi Mou authored May 11, 2021
1 parent 94a4351 commit e6ed8c1
Show file tree
Hide file tree
Showing 77 changed files with 3,754 additions and 3,827 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/cibuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace
args: --workspace --features integration-test
env: #Will be handled by clippy
RUSTFLAGS: -A warnings

Expand Down Expand Up @@ -100,7 +100,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: -p pravega-client-channel -p pravega-controller-client -p pravega-client-integration-test -p pravega-client-retry -p pravega-client-shared -p pravega-wire-protocol -p pravega-client -p pravega-client-auth -p pravega-client-config
args: --features integration-test -p pravega-client-channel -p pravega-controller-client -p pravega-client-integration-test -p pravega-client-retry -p pravega-client-shared -p pravega-wire-protocol -p pravega-client -p pravega-client-auth -p pravega-client-config
- name: Run code cov
run: |
chmod +x ./.github/workflows/codecov/codecov.sh
Expand Down Expand Up @@ -195,6 +195,6 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: clippy
args: --workspace
args: --workspace --features integration-test
env:
RUSTFLAGS: -D warnings
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ path = "src/cli.rs"
required-features = ["cli"]

[features]
default = ["cli"]
cli = ["clap", "structopt"]
default = []
cli = []
integration-test = []

[[bench]]
name = "benchmark"
Expand Down
16 changes: 8 additions & 8 deletions benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use criterion::{criterion_group, criterion_main, Criterion};
use byteorder::BigEndian;
use pravega_client::byte_stream::ByteStreamReader;
use pravega_client::client_factory::ClientFactory;
use pravega_client::error::SegmentWriterError;
use pravega_client::event_reader::EventReader;
use pravega_client::event_stream_writer::EventStreamWriter;
use pravega_client::errors::SegmentWriterError;
use pravega_client::event::event_reader::EventReader;
use pravega_client::event::writer::EventStreamWriter;
use pravega_client_config::connection_type::{ConnectionType, MockType};
use pravega_client_config::{ClientConfig, ClientConfigBuilder};
use pravega_client_shared::*;
Expand Down Expand Up @@ -345,20 +345,20 @@ async fn set_up_event_stream_writer(config: ClientConfig) -> EventStreamWriter {
let scope_name: Scope = Scope::from("testWriterPerf".to_string());
let stream_name = Stream::from("testWriterPerf".to_string());
let client_factory = ClientFactory::new(config.clone());
let controller_client = client_factory.get_controller_client();
let controller_client = client_factory.controller_client();
create_scope_stream(controller_client, &scope_name, &stream_name, 1).await;
let scoped_stream = ScopedStream {
scope: scope_name.clone(),
stream: stream_name.clone(),
};
client_factory.create_event_stream_writer(scoped_stream)
client_factory.create_event_writer(scoped_stream)
}

async fn set_up_event_stream_reader(config: ClientConfig) -> EventReader {
let scope_name: Scope = Scope::from("testReaderPerf".to_string());
let stream_name = Stream::from("testReaderPerf".to_string());
let client_factory = ClientFactory::new(config.clone());
let controller_client = client_factory.get_controller_client();
let controller_client = client_factory.controller_client();
create_scope_stream(controller_client, &scope_name, &stream_name, 1).await;
let scoped_stream = ScopedStream {
scope: scope_name.clone(),
Expand All @@ -376,15 +376,15 @@ fn set_up_byte_stream_reader(config: ClientConfig, rt: &mut Runtime) -> ByteStre
let scope_name: Scope = Scope::from("testByteReaderPerf".to_string());
let stream_name = Stream::from("testByteReaderPerf".to_string());
let client_factory = ClientFactory::new(config.clone());
let controller_client = client_factory.get_controller_client();
let controller_client = client_factory.controller_client();
rt.block_on(create_scope_stream(
controller_client,
&scope_name,
&stream_name,
1,
));
let scoped_segment = ScopedSegment::from("testByteReaderPerf/testByteReaderPerf/0");
client_factory.create_byte_stream_reader(scoped_segment)
client_factory.create_byte_reader(scoped_segment)
}

async fn create_scope_stream(
Expand Down
30 changes: 15 additions & 15 deletions bindings/src/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ impl StreamManager {
///
#[text_signature = "($self, scope_name)"]
pub fn create_scope(&self, scope_name: &str) -> PyResult<bool> {
let handle = self.cf.get_runtime();
let handle = self.cf.runtime();

info!("creating scope {:?}", scope_name);

let controller = self.cf.get_controller_client();
let controller = self.cf.controller_client();
let scope_name = Scope::from(scope_name.to_string());

let scope_result = handle.block_on(controller.create_scope(&scope_name));
Expand All @@ -96,10 +96,10 @@ impl StreamManager {
///
#[text_signature = "($self, scope_name)"]
pub fn delete_scope(&self, scope_name: &str) -> PyResult<bool> {
let handle = self.cf.get_runtime();
let handle = self.cf.runtime();
info!("Delete scope {:?}", scope_name);

let controller = self.cf.get_controller_client();
let controller = self.cf.controller_client();
let scope_name = Scope::from(scope_name.to_string());

let scope_result = handle.block_on(controller.delete_scope(&scope_name));
Expand All @@ -120,7 +120,7 @@ impl StreamManager {
stream_name: &str,
initial_segments: i32,
) -> PyResult<bool> {
let handle = self.cf.get_runtime();
let handle = self.cf.runtime();
info!(
"creating stream {:?} under scope {:?} with segment count {:?}",
stream_name, scope_name, initial_segments
Expand All @@ -141,7 +141,7 @@ impl StreamManager {
retention_param: 0,
},
};
let controller = self.cf.get_controller_client();
let controller = self.cf.controller_client();

let stream_result = handle.block_on(controller.create_stream(&stream_cfg));
info!("Stream creation status {:?}", stream_result);
Expand All @@ -156,14 +156,14 @@ impl StreamManager {
///
#[text_signature = "($self, scope_name, stream_name)"]
pub fn seal_stream(&self, scope_name: &str, stream_name: &str) -> PyResult<bool> {
let handle = self.cf.get_runtime();
let handle = self.cf.runtime();
info!("Sealing stream {:?} under scope {:?} ", stream_name, scope_name);
let scoped_stream = ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
};

let controller = self.cf.get_controller_client();
let controller = self.cf.controller_client();

let stream_result = handle.block_on(controller.seal_stream(&scoped_stream));
info!("Sealing stream status {:?}", stream_result);
Expand All @@ -178,14 +178,14 @@ impl StreamManager {
///
#[text_signature = "($self, scope_name, stream_name)"]
pub fn delete_stream(&self, scope_name: &str, stream_name: &str) -> PyResult<bool> {
let handle = self.cf.get_runtime();
let handle = self.cf.runtime();
info!("Deleting stream {:?} under scope {:?} ", stream_name, scope_name);
let scoped_stream = ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
};

let controller = self.cf.get_controller_client();
let controller = self.cf.controller_client();
let stream_result = handle.block_on(controller.delete_stream(&scoped_stream));
info!("Deleting stream status {:?}", stream_result);
match stream_result {
Expand All @@ -211,7 +211,7 @@ impl StreamManager {
stream: Stream::from(stream_name.to_string()),
};
let stream_writer = StreamWriter::new(
self.cf.create_event_stream_writer(scoped_stream.clone()),
self.cf.create_event_writer(scoped_stream.clone()),
self.cf.clone(),
scoped_stream,
);
Expand Down Expand Up @@ -239,10 +239,10 @@ impl StreamManager {
scope: Scope::from(scope_name.to_owned()),
stream: Stream::from(stream_name.to_owned()),
};
let handle = self.cf.get_runtime();
let handle = self.cf.runtime();
let txn_writer = handle.block_on(
self.cf
.create_transactional_event_stream_writer(scoped_stream.clone(), WriterId(writer_id)),
.create_transactional_event_writer(scoped_stream.clone(), WriterId(writer_id)),
);
let txn_stream_writer = StreamTxnWriter::new(txn_writer, self.cf.clone(), scoped_stream);
Ok(txn_stream_writer)
Expand All @@ -255,7 +255,7 @@ impl StreamManager {
/// import pravega_client;
/// manager=pravega_client.StreamManager("127.0.0.1:9090")
/// // Create a ReaderGroup against an already created Pravega scope and Stream.
/// reader_group=manager.create_reader_group("rg1", "scope", "stream")
/// event.reader_group=manager.create_reader_group("rg1", "scope", "stream")
/// ```
///
#[text_signature = "($self, reader_group_name, scope_name, stream_name)"]
Expand All @@ -270,7 +270,7 @@ impl StreamManager {
scope: scope.clone(),
stream: Stream::from(stream_name.to_string()),
};
let handle = self.cf.get_runtime();
let handle = self.cf.runtime();
let rg = handle.block_on(self.cf.create_reader_group(
scope,
reader_group_name.to_string(),
Expand Down
10 changes: 5 additions & 5 deletions bindings/src/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@

cfg_if! {
if #[cfg(feature = "python_binding")] {
use pravega_client::event_reader::EventReader;
use pravega_client::event::reader::EventReader;
use pravega_client_shared::ScopedStream;
use pravega_client::client_factory::ClientFactory;
use pyo3::prelude::*;
use pyo3::PyResult;
use pyo3::PyObjectProtocol;
use tracing::info;
use std::sync::Arc;
use pravega_client::segment_slice::{Event, SegmentSlice};
use pravega_client::event::reader::{Event, SegmentSlice};
use pyo3::PyIterProtocol;
use tokio::sync::Mutex;
}
Expand Down Expand Up @@ -67,7 +67,7 @@ impl StreamReader {
(fut.clone_ref(py), fut, loop_event)
};
let read = self.reader.clone();
let _guard = self.factory.get_runtime().enter();
let _guard = self.factory.runtime().enter();
tokio::spawn(async move {
let slice_result = read.lock().await.acquire_segment().await;
let slice_py: Slice = Slice {
Expand All @@ -92,7 +92,7 @@ impl StreamReader {
///
#[text_signature = "($self)"]
pub fn reader_offline(&self) -> PyResult<()> {
self.factory.get_runtime().block_on(self.reader_offline_async());
self.factory.runtime().block_on(self.reader_offline_async());
Ok(())
}

Expand All @@ -103,7 +103,7 @@ impl StreamReader {
pub fn release_segment(&self, slice: &mut Slice) -> PyResult<()> {
info!("Release segment slice back");
if let Some(s) = slice.get_set_to_none() {
self.factory.get_runtime().block_on(self.release_segment_async(s));
self.factory.runtime().block_on(self.release_segment_async(s));
}
Ok(())
}
Expand Down
14 changes: 6 additions & 8 deletions bindings/src/stream_reader_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::stream_reader::StreamReader;
cfg_if! {
if #[cfg(feature = "python_binding")] {
use pravega_client_shared::ScopedStream;
use pravega_client::event_reader_group::ReaderGroup;
use pravega_client::event::reader_group::ReaderGroup;
use pravega_client::client_factory::ClientFactory;
use pyo3::prelude::*;
use pyo3::PyResult;
Expand Down Expand Up @@ -50,8 +50,8 @@ impl StreamReaderGroup {
/// import pravega_client;
/// manager=pravega_client.StreamManager("127.0.0.1:9090")
/// // lets assume the Pravega scope and stream are already created.
/// reader_group=manager.create_reader_group("rg1", "scope", "stream")
/// reader=reader_group.create_reader("reader_id");
/// event.reader_group=manager.create_reader_group("rg1", "scope", "stream")
/// reader=event.reader_group.create_reader("reader_id");
/// slice=await reader.get_segment_slice_async()
/// for event in slice:
/// print(event.data())
Expand All @@ -60,12 +60,11 @@ impl StreamReaderGroup {
pub fn create_reader(&self, reader_name: &str) -> PyResult<StreamReader> {
info!(
"Creating reader {:?} under reader group {:?}",
reader_name,
self.reader_group.get_name()
reader_name, self.reader_group.name
);
let reader = self
.factory
.get_runtime()
.runtime()
.block_on(self.reader_group.create_reader(reader_name.to_string()));
let stream_reader = StreamReader::new(
Arc::new(Mutex::new(reader)),
Expand All @@ -79,8 +78,7 @@ impl StreamReaderGroup {
fn to_str(&self) -> String {
format!(
"Stream: {:?} , ReaderGroup: {:?}",
self.stream,
self.reader_group.get_name()
self.stream, self.reader_group.name
)
}
}
Expand Down
18 changes: 9 additions & 9 deletions bindings/src/stream_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
// http://www.apache.org/licenses/LICENSE-2.0
//

use std::io::Error;
cfg_if! {
if #[cfg(feature = "python_binding")] {
use pravega_client::error::SegmentWriterError;
use pravega_client::event_stream_writer::EventStreamWriter;
use pravega_client::event::writer::EventWriter;
use pravega_client_shared::ScopedStream;
use pravega_client::client_factory::ClientFactory;
use pyo3::exceptions;
Expand All @@ -33,7 +33,7 @@ cfg_if! {
#[pyclass]
#[derive(new)]
pub(crate) struct StreamWriter {
writer: EventStreamWriter,
writer: EventWriter,
factory: ClientFactory,
stream: ScopedStream,
}
Expand Down Expand Up @@ -97,26 +97,26 @@ impl StreamWriter {
#[args(event, routing_key = "None", "*")]
pub fn write_event_bytes(&mut self, event: &[u8], routing_key: Option<&str>) -> PyResult<()> {
// to_vec creates an owned copy of the python byte array object.
let write_future: tokio::sync::oneshot::Receiver<Result<(), SegmentWriterError>> = match routing_key {
let write_future: tokio::sync::oneshot::Receiver<Result<(), Error>> = match routing_key {
Option::None => {
trace!("Writing a single event with no routing key");
self.factory
.get_runtime()
.runtime()
.block_on(self.writer.write_event(event.to_vec()))
}
Option::Some(key) => {
trace!("Writing a single event for a given routing key {:?}", key);
self.factory
.get_runtime()
.runtime()
.block_on(self.writer.write_event_by_routing_key(key.into(), event.to_vec()))
}
};

let _guard = self.factory.get_runtime().enter();
let _guard = self.factory.runtime().enter();
let timeout_fut = timeout(Duration::from_secs(TIMEOUT_IN_SECONDS), write_future);

let result: Result<Result<Result<(), SegmentWriterError>, RecvError>, _> =
self.factory.get_runtime().block_on(timeout_fut);
let result: Result<Result<Result<(), Error>, RecvError>, _> =
self.factory.runtime().block_on(timeout_fut);
match result {
Ok(t) => match t {
Ok(t1) => match t1 {
Expand Down
11 changes: 4 additions & 7 deletions bindings/src/stream_writer_transactional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

cfg_if! {
if #[cfg(feature = "python_binding")] {
use pravega_client::transaction::transactional_event_stream_writer::TransactionalEventStreamWriter;
use pravega_client::event::transactional_writer::TransactionalEventWriter;
use pravega_client::client_factory::ClientFactory;
use pyo3::exceptions;
use pyo3::prelude::*;
Expand All @@ -31,7 +31,7 @@ cfg_if! {
#[pyclass]
#[derive(new)]
pub(crate) struct StreamTxnWriter {
writer: TransactionalEventStreamWriter,
writer: TransactionalEventWriter,
factory: ClientFactory,
stream: ScopedStream,
}
Expand All @@ -46,7 +46,7 @@ impl StreamTxnWriter {
///
#[text_signature = "($self)"]
pub fn begin_txn(&mut self) -> PyResult<StreamTransaction> {
let result = self.factory.get_runtime().block_on(self.writer.begin());
let result = self.factory.runtime().block_on(self.writer.begin());
match result {
Ok(txn) => Ok(StreamTransaction::new(txn, self.factory.clone())),
Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))),
Expand All @@ -59,10 +59,7 @@ impl StreamTxnWriter {
#[text_signature = "($self, txn_id)"]
pub fn get_txn(&mut self, txn_id: u128) -> PyResult<StreamTransaction> {
debug!("Writing a single event for a given routing key");
let result = self
.factory
.get_runtime()
.block_on(self.writer.get_txn(TxId(txn_id)));
let result = self.factory.runtime().block_on(self.writer.get_txn(TxId(txn_id)));

match result {
Ok(txn) => Ok(StreamTransaction::new(txn, self.factory.clone())),
Expand Down
Loading

0 comments on commit e6ed8c1

Please sign in to comment.