Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 241: Reorg code #248

Merged
merged 14 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/cibuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace
args: --workspace --features integration-test
env: #Will be handled by clippy
RUSTFLAGS: -A warnings

Expand Down Expand Up @@ -100,7 +100,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: -p pravega-client-channel -p pravega-controller-client -p pravega-client-integration-test -p pravega-client-retry -p pravega-client-shared -p pravega-wire-protocol -p pravega-client -p pravega-client-auth -p pravega-client-config
args: --features integration-test -p pravega-client-channel -p pravega-controller-client -p pravega-client-integration-test -p pravega-client-retry -p pravega-client-shared -p pravega-wire-protocol -p pravega-client -p pravega-client-auth -p pravega-client-config
- name: Run code cov
run: |
chmod +x ./.github/workflows/codecov/codecov.sh
Expand Down Expand Up @@ -195,6 +195,6 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: clippy
args: --workspace
args: --workspace --features integration-test
env:
RUSTFLAGS: -D warnings
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ path = "src/cli.rs"
required-features = ["cli"]

[features]
default = ["cli"]
cli = ["clap", "structopt"]
default = []
cli = []
integration-test = []

[[bench]]
name = "benchmark"
Expand Down
16 changes: 8 additions & 8 deletions benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use criterion::{criterion_group, criterion_main, Criterion};
use byteorder::BigEndian;
use pravega_client::byte_stream::ByteStreamReader;
use pravega_client::client_factory::ClientFactory;
use pravega_client::error::SegmentWriterError;
use pravega_client::event_reader::EventReader;
use pravega_client::event_stream_writer::EventStreamWriter;
use pravega_client::errors::SegmentWriterError;
use pravega_client::event::event_reader::EventReader;
use pravega_client::event::writer::EventStreamWriter;
use pravega_client_config::connection_type::{ConnectionType, MockType};
use pravega_client_config::{ClientConfig, ClientConfigBuilder};
use pravega_client_shared::*;
Expand Down Expand Up @@ -345,20 +345,20 @@ async fn set_up_event_stream_writer(config: ClientConfig) -> EventStreamWriter {
let scope_name: Scope = Scope::from("testWriterPerf".to_string());
let stream_name = Stream::from("testWriterPerf".to_string());
let client_factory = ClientFactory::new(config.clone());
let controller_client = client_factory.get_controller_client();
let controller_client = client_factory.controller_client();
create_scope_stream(controller_client, &scope_name, &stream_name, 1).await;
let scoped_stream = ScopedStream {
scope: scope_name.clone(),
stream: stream_name.clone(),
};
client_factory.create_event_stream_writer(scoped_stream)
client_factory.create_event_writer(scoped_stream)
}

async fn set_up_event_stream_reader(config: ClientConfig) -> EventReader {
let scope_name: Scope = Scope::from("testReaderPerf".to_string());
let stream_name = Stream::from("testReaderPerf".to_string());
let client_factory = ClientFactory::new(config.clone());
let controller_client = client_factory.get_controller_client();
let controller_client = client_factory.controller_client();
create_scope_stream(controller_client, &scope_name, &stream_name, 1).await;
let scoped_stream = ScopedStream {
scope: scope_name.clone(),
Expand All @@ -376,15 +376,15 @@ fn set_up_byte_stream_reader(config: ClientConfig, rt: &mut Runtime) -> ByteStre
let scope_name: Scope = Scope::from("testByteReaderPerf".to_string());
let stream_name = Stream::from("testByteReaderPerf".to_string());
let client_factory = ClientFactory::new(config.clone());
let controller_client = client_factory.get_controller_client();
let controller_client = client_factory.controller_client();
rt.block_on(create_scope_stream(
controller_client,
&scope_name,
&stream_name,
1,
));
let scoped_segment = ScopedSegment::from("testByteReaderPerf/testByteReaderPerf/0");
client_factory.create_byte_stream_reader(scoped_segment)
client_factory.create_byte_reader(scoped_segment)
}

async fn create_scope_stream(
Expand Down
30 changes: 15 additions & 15 deletions bindings/src/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ impl StreamManager {
///
#[text_signature = "($self, scope_name)"]
pub fn create_scope(&self, scope_name: &str) -> PyResult<bool> {
let handle = self.cf.get_runtime();
let handle = self.cf.runtime();

info!("creating scope {:?}", scope_name);

let controller = self.cf.get_controller_client();
let controller = self.cf.controller_client();
let scope_name = Scope::from(scope_name.to_string());

let scope_result = handle.block_on(controller.create_scope(&scope_name));
Expand All @@ -96,10 +96,10 @@ impl StreamManager {
///
#[text_signature = "($self, scope_name)"]
pub fn delete_scope(&self, scope_name: &str) -> PyResult<bool> {
let handle = self.cf.get_runtime();
let handle = self.cf.runtime();
info!("Delete scope {:?}", scope_name);

let controller = self.cf.get_controller_client();
let controller = self.cf.controller_client();
let scope_name = Scope::from(scope_name.to_string());

let scope_result = handle.block_on(controller.delete_scope(&scope_name));
Expand All @@ -120,7 +120,7 @@ impl StreamManager {
stream_name: &str,
initial_segments: i32,
) -> PyResult<bool> {
let handle = self.cf.get_runtime();
let handle = self.cf.runtime();
info!(
"creating stream {:?} under scope {:?} with segment count {:?}",
stream_name, scope_name, initial_segments
Expand All @@ -141,7 +141,7 @@ impl StreamManager {
retention_param: 0,
},
};
let controller = self.cf.get_controller_client();
let controller = self.cf.controller_client();

let stream_result = handle.block_on(controller.create_stream(&stream_cfg));
info!("Stream creation status {:?}", stream_result);
Expand All @@ -156,14 +156,14 @@ impl StreamManager {
///
#[text_signature = "($self, scope_name, stream_name)"]
pub fn seal_stream(&self, scope_name: &str, stream_name: &str) -> PyResult<bool> {
let handle = self.cf.get_runtime();
let handle = self.cf.runtime();
info!("Sealing stream {:?} under scope {:?} ", stream_name, scope_name);
let scoped_stream = ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
};

let controller = self.cf.get_controller_client();
let controller = self.cf.controller_client();

let stream_result = handle.block_on(controller.seal_stream(&scoped_stream));
info!("Sealing stream status {:?}", stream_result);
Expand All @@ -178,14 +178,14 @@ impl StreamManager {
///
#[text_signature = "($self, scope_name, stream_name)"]
pub fn delete_stream(&self, scope_name: &str, stream_name: &str) -> PyResult<bool> {
let handle = self.cf.get_runtime();
let handle = self.cf.runtime();
info!("Deleting stream {:?} under scope {:?} ", stream_name, scope_name);
let scoped_stream = ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
};

let controller = self.cf.get_controller_client();
let controller = self.cf.controller_client();
let stream_result = handle.block_on(controller.delete_stream(&scoped_stream));
info!("Deleting stream status {:?}", stream_result);
match stream_result {
Expand All @@ -211,7 +211,7 @@ impl StreamManager {
stream: Stream::from(stream_name.to_string()),
};
let stream_writer = StreamWriter::new(
self.cf.create_event_stream_writer(scoped_stream.clone()),
self.cf.create_event_writer(scoped_stream.clone()),
self.cf.clone(),
scoped_stream,
);
Expand Down Expand Up @@ -239,10 +239,10 @@ impl StreamManager {
scope: Scope::from(scope_name.to_owned()),
stream: Stream::from(stream_name.to_owned()),
};
let handle = self.cf.get_runtime();
let handle = self.cf.runtime();
let txn_writer = handle.block_on(
self.cf
.create_transactional_event_stream_writer(scoped_stream.clone(), WriterId(writer_id)),
.create_transactional_event_writer(scoped_stream.clone(), WriterId(writer_id)),
);
let txn_stream_writer = StreamTxnWriter::new(txn_writer, self.cf.clone(), scoped_stream);
Ok(txn_stream_writer)
Expand All @@ -255,7 +255,7 @@ impl StreamManager {
/// import pravega_client;
/// manager=pravega_client.StreamManager("127.0.0.1:9090")
/// // Create a ReaderGroup against an already created Pravega scope and Stream.
/// reader_group=manager.create_reader_group("rg1", "scope", "stream")
/// event.reader_group=manager.create_reader_group("rg1", "scope", "stream")
/// ```
///
#[text_signature = "($self, reader_group_name, scope_name, stream_name)"]
Expand All @@ -270,7 +270,7 @@ impl StreamManager {
scope: scope.clone(),
stream: Stream::from(stream_name.to_string()),
};
let handle = self.cf.get_runtime();
let handle = self.cf.runtime();
let rg = handle.block_on(self.cf.create_reader_group(
scope,
reader_group_name.to_string(),
Expand Down
10 changes: 5 additions & 5 deletions bindings/src/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@

cfg_if! {
if #[cfg(feature = "python_binding")] {
use pravega_client::event_reader::EventReader;
use pravega_client::event::reader::EventReader;
use pravega_client_shared::ScopedStream;
use pravega_client::client_factory::ClientFactory;
use pyo3::prelude::*;
use pyo3::PyResult;
use pyo3::PyObjectProtocol;
use tracing::info;
use std::sync::Arc;
use pravega_client::segment_slice::{Event, SegmentSlice};
use pravega_client::event::reader::{Event, SegmentSlice};
use pyo3::PyIterProtocol;
use tokio::sync::Mutex;
}
Expand Down Expand Up @@ -67,7 +67,7 @@ impl StreamReader {
(fut.clone_ref(py), fut, loop_event)
};
let read = self.reader.clone();
let _guard = self.factory.get_runtime().enter();
let _guard = self.factory.runtime().enter();
tokio::spawn(async move {
let slice_result = read.lock().await.acquire_segment().await;
let slice_py: Slice = Slice {
Expand All @@ -92,7 +92,7 @@ impl StreamReader {
///
#[text_signature = "($self)"]
pub fn reader_offline(&self) -> PyResult<()> {
self.factory.get_runtime().block_on(self.reader_offline_async());
self.factory.runtime().block_on(self.reader_offline_async());
Ok(())
}

Expand All @@ -103,7 +103,7 @@ impl StreamReader {
pub fn release_segment(&self, slice: &mut Slice) -> PyResult<()> {
info!("Release segment slice back");
if let Some(s) = slice.get_set_to_none() {
self.factory.get_runtime().block_on(self.release_segment_async(s));
self.factory.runtime().block_on(self.release_segment_async(s));
}
Ok(())
}
Expand Down
14 changes: 6 additions & 8 deletions bindings/src/stream_reader_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::stream_reader::StreamReader;
cfg_if! {
if #[cfg(feature = "python_binding")] {
use pravega_client_shared::ScopedStream;
use pravega_client::event_reader_group::ReaderGroup;
use pravega_client::event::reader_group::ReaderGroup;
use pravega_client::client_factory::ClientFactory;
use pyo3::prelude::*;
use pyo3::PyResult;
Expand Down Expand Up @@ -50,8 +50,8 @@ impl StreamReaderGroup {
/// import pravega_client;
/// manager=pravega_client.StreamManager("127.0.0.1:9090")
/// // lets assume the Pravega scope and stream are already created.
/// reader_group=manager.create_reader_group("rg1", "scope", "stream")
/// reader=reader_group.create_reader("reader_id");
/// event.reader_group=manager.create_reader_group("rg1", "scope", "stream")
/// reader=event.reader_group.create_reader("reader_id");
/// slice=await reader.get_segment_slice_async()
/// for event in slice:
/// print(event.data())
Expand All @@ -60,12 +60,11 @@ impl StreamReaderGroup {
pub fn create_reader(&self, reader_name: &str) -> PyResult<StreamReader> {
info!(
"Creating reader {:?} under reader group {:?}",
reader_name,
self.reader_group.get_name()
reader_name, self.reader_group.name
);
let reader = self
.factory
.get_runtime()
.runtime()
.block_on(self.reader_group.create_reader(reader_name.to_string()));
let stream_reader = StreamReader::new(
Arc::new(Mutex::new(reader)),
Expand All @@ -79,8 +78,7 @@ impl StreamReaderGroup {
fn to_str(&self) -> String {
format!(
"Stream: {:?} , ReaderGroup: {:?}",
self.stream,
self.reader_group.get_name()
self.stream, self.reader_group.name
)
}
}
Expand Down
18 changes: 9 additions & 9 deletions bindings/src/stream_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
// http://www.apache.org/licenses/LICENSE-2.0
//

use std::io::Error;
cfg_if! {
if #[cfg(feature = "python_binding")] {
use pravega_client::error::SegmentWriterError;
use pravega_client::event_stream_writer::EventStreamWriter;
use pravega_client::event::writer::EventWriter;
use pravega_client_shared::ScopedStream;
use pravega_client::client_factory::ClientFactory;
use pyo3::exceptions;
Expand All @@ -33,7 +33,7 @@ cfg_if! {
#[pyclass]
#[derive(new)]
pub(crate) struct StreamWriter {
writer: EventStreamWriter,
writer: EventWriter,
factory: ClientFactory,
stream: ScopedStream,
}
Expand Down Expand Up @@ -97,26 +97,26 @@ impl StreamWriter {
#[args(event, routing_key = "None", "*")]
pub fn write_event_bytes(&mut self, event: &[u8], routing_key: Option<&str>) -> PyResult<()> {
// to_vec creates an owned copy of the python byte array object.
let write_future: tokio::sync::oneshot::Receiver<Result<(), SegmentWriterError>> = match routing_key {
let write_future: tokio::sync::oneshot::Receiver<Result<(), Error>> = match routing_key {
Option::None => {
trace!("Writing a single event with no routing key");
self.factory
.get_runtime()
.runtime()
.block_on(self.writer.write_event(event.to_vec()))
}
Option::Some(key) => {
trace!("Writing a single event for a given routing key {:?}", key);
self.factory
.get_runtime()
.runtime()
.block_on(self.writer.write_event_by_routing_key(key.into(), event.to_vec()))
}
};

let _guard = self.factory.get_runtime().enter();
let _guard = self.factory.runtime().enter();
let timeout_fut = timeout(Duration::from_secs(TIMEOUT_IN_SECONDS), write_future);

let result: Result<Result<Result<(), SegmentWriterError>, RecvError>, _> =
self.factory.get_runtime().block_on(timeout_fut);
let result: Result<Result<Result<(), Error>, RecvError>, _> =
self.factory.runtime().block_on(timeout_fut);
match result {
Ok(t) => match t {
Ok(t1) => match t1 {
Expand Down
11 changes: 4 additions & 7 deletions bindings/src/stream_writer_transactional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

cfg_if! {
if #[cfg(feature = "python_binding")] {
use pravega_client::transaction::transactional_event_stream_writer::TransactionalEventStreamWriter;
use pravega_client::event::transactional_writer::TransactionalEventWriter;
use pravega_client::client_factory::ClientFactory;
use pyo3::exceptions;
use pyo3::prelude::*;
Expand All @@ -31,7 +31,7 @@ cfg_if! {
#[pyclass]
#[derive(new)]
pub(crate) struct StreamTxnWriter {
writer: TransactionalEventStreamWriter,
writer: TransactionalEventWriter,
factory: ClientFactory,
stream: ScopedStream,
}
Expand All @@ -46,7 +46,7 @@ impl StreamTxnWriter {
///
#[text_signature = "($self)"]
pub fn begin_txn(&mut self) -> PyResult<StreamTransaction> {
let result = self.factory.get_runtime().block_on(self.writer.begin());
let result = self.factory.runtime().block_on(self.writer.begin());
match result {
Ok(txn) => Ok(StreamTransaction::new(txn, self.factory.clone())),
Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))),
Expand All @@ -59,10 +59,7 @@ impl StreamTxnWriter {
#[text_signature = "($self, txn_id)"]
pub fn get_txn(&mut self, txn_id: u128) -> PyResult<StreamTransaction> {
debug!("Writing a single event for a given routing key");
let result = self
.factory
.get_runtime()
.block_on(self.writer.get_txn(TxId(txn_id)));
let result = self.factory.runtime().block_on(self.writer.get_txn(TxId(txn_id)));

match result {
Ok(txn) => Ok(StreamTransaction::new(txn, self.factory.clone())),
Expand Down
Loading