diff --git a/.github/workflows/python_test.yml b/.github/workflows/python_test.yml index 28a7dbc0a..118db42ef 100644 --- a/.github/workflows/python_test.yml +++ b/.github/workflows/python_test.yml @@ -23,9 +23,9 @@ jobs: java-version: '11' # The JDK version to make available on the path. - name: Download and Run Pravega standalone run: | - wget https://github.com/Tristan1900/pravega/releases/download/ip-fix/pravega-0.10.0-ip-fix.tgz - tar -xzvf pravega-0.10.0-ip-fix.tgz - pravega-0.10.0-2866.172245f-SNAPSHOT/bin/pravega-standalone > pravega.log 2>&1 & + wget https://github.com/shrids/pravega/releases/download/stream-tags/pravega-0.10.0-2906.6a34e64-SNAPSHOT.tgz + tar -xzvf pravega-0.10.0-2906.6a34e64-SNAPSHOT.tgz + pravega-0.10.0-2906.6a34e64-SNAPSHOT/bin/pravega-standalone > pravega.log 2>&1 & sleep 120 && echo "Started standalone" - name: Set up Python uses: actions/setup-python@v2 diff --git a/benches/benchmark.rs b/benches/benchmark.rs index 905f569cb..957305892 100644 --- a/benches/benchmark.rs +++ b/benches/benchmark.rs @@ -408,10 +408,8 @@ async fn create_scope_stream( scale_factor: 0, min_num_segments: segment_number, }, - retention: Retention { - retention_type: RetentionType::None, - retention_param: 0, - }, + retention: Default::default(), + tags: None, }; controller_client .create_stream(&request) diff --git a/controller-client/src/cli.rs b/controller-client/src/cli.rs index 3bac7476d..924bd6dad 100644 --- a/controller-client/src/cli.rs +++ b/controller-client/src/cli.rs @@ -115,10 +115,7 @@ fn main() { scale_factor: 0, min_num_segments: segment_count, }, - retention: Retention { - retention_type: RetentionType::None, - retention_param: 0, - }, + retention: Default::default(), tags: if tags.is_empty() { None } else { Some(tags) }, }; let result = rt.block_on(controller_client.create_stream(&stream_cfg)); diff --git a/controller-client/src/test.rs b/controller-client/src/test.rs index 54ac70043..bcc963b1d 100644 --- a/controller-client/src/test.rs +++ b/controller-client/src/test.rs @@ -59,10 +59,7 @@ fn test_create_stream_error() { scale_factor: 0, min_num_segments: 1, }, - retention: Retention { - retention_type: RetentionType::None, - retention_param: 0, - }, + retention: Default::default(), }; let create_stream_result = rt.block_on(client.create_stream(&request)); assert!(create_stream_result.is_err()); diff --git a/python_binding/src/lib.rs b/python_binding/src/lib.rs index 76503e24a..eba09712b 100644 --- a/python_binding/src/lib.rs +++ b/python_binding/src/lib.rs @@ -22,6 +22,7 @@ cfg_if! { if #[cfg(feature = "python_binding")] { use pyo3::prelude::*; use stream_manager::StreamManager; + use stream_manager::{StreamScalingPolicy, StreamRetentionPolicy}; #[macro_use] extern crate derive_new; use stream_writer::StreamWriter; @@ -49,6 +50,8 @@ fn pravega_client(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; let txn_exception = py.get_type::(); txn_exception.setattr("__doc__", TXNFAILED_EXCEPTION_DOCSTRING)?; m.add("TxnFailedException", txn_exception)?; diff --git a/python_binding/src/stream_manager.rs b/python_binding/src/stream_manager.rs index 13d48b89b..2594d2b0b 100644 --- a/python_binding/src/stream_manager.rs +++ b/python_binding/src/stream_manager.rs @@ -44,6 +44,115 @@ pub(crate) struct StreamManager { config: ClientConfig, } +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(Clone)] +pub(crate) struct StreamRetentionPolicy { + retention: Retention, +} + +impl Default for StreamRetentionPolicy { + fn default() -> Self { + StreamRetentionPolicy { + retention: Default::default(), + } + } +} + +#[cfg(feature = "python_binding")] +#[pymethods] +impl StreamRetentionPolicy { + #[staticmethod] + pub fn none() -> StreamRetentionPolicy { + StreamRetentionPolicy { + retention: Default::default(), + } + } + + #[staticmethod] + pub fn by_size(size_in_bytes: i64) -> StreamRetentionPolicy { + StreamRetentionPolicy { + retention: Retention { + retention_type: RetentionType::Size, + retention_param: size_in_bytes, + }, + } + } + + #[staticmethod] + pub fn by_time(time_in_millis: i64) -> StreamRetentionPolicy { + StreamRetentionPolicy { + retention: Retention { + retention_type: RetentionType::Size, + retention_param: time_in_millis, + }, + } + } +} + +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(Clone)] +pub(crate) struct StreamScalingPolicy { + scaling: Scaling, +} + +impl Default for StreamScalingPolicy { + fn default() -> Self { + StreamScalingPolicy { + scaling: Default::default(), + } + } +} + +#[cfg(feature = "python_binding")] +#[pymethods] +impl StreamScalingPolicy { + #[staticmethod] + pub fn fixed_scaling_policy(initial_segments: i32) -> StreamScalingPolicy { + StreamScalingPolicy { + scaling: Scaling { + scale_type: ScaleType::FixedNumSegments, + target_rate: 0, + scale_factor: 0, + min_num_segments: initial_segments, + }, + } + } + + #[staticmethod] + pub fn auto_scaling_policy_by_data_rate( + target_rate_kbytes_per_sec: i32, + scale_factor: i32, + initial_segments: i32, + ) -> StreamScalingPolicy { + StreamScalingPolicy { + scaling: Scaling { + scale_type: ScaleType::ByRateInKbytesPerSec, + target_rate: target_rate_kbytes_per_sec, + scale_factor, + min_num_segments: initial_segments, + }, + } + } + + #[staticmethod] + pub fn auto_scaling_policy_by_event_rate( + target_events_per_sec: i32, + scale_factor: i32, + initial_segments: i32, + ) -> StreamScalingPolicy { + StreamScalingPolicy { + scaling: Scaling { + scale_type: ScaleType::ByRateInEventsPerSec, + target_rate: target_events_per_sec, + scale_factor, + min_num_segments: initial_segments, + }, + } + } +} + /// /// Create a StreamManager by providing a controller uri. /// ``` @@ -121,6 +230,47 @@ impl StreamManager { } } + /// + /// Create a Stream in Pravega + /// + #[text_signature = "($self, scope_name, stream_name, scaling_policy, retention_policy, tags)"] + #[args( + scaling_policy = "Default::default()", + retention_policy = "Default::default()", + tags = "None" + )] + pub fn create_stream_with_policy( + &self, + scope_name: &str, + stream_name: &str, + scaling_policy: StreamScalingPolicy, + retention_policy: StreamRetentionPolicy, + tags: Option>, + ) -> PyResult { + let handle = self.cf.runtime(); + info!( + "creating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}", + stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags + ); + let stream_cfg = StreamConfiguration { + scoped_stream: ScopedStream { + scope: Scope::from(scope_name.to_string()), + stream: Stream::from(stream_name.to_string()), + }, + scaling: scaling_policy.scaling, + retention: retention_policy.retention, + tags, + }; + let controller = self.cf.controller_client(); + + let stream_result = handle.block_on(controller.create_stream(&stream_cfg)); + info!("Stream creation status {:?}", stream_result); + match stream_result { + Ok(t) => Ok(t), + Err(e) => Err(exceptions::PyValueError::new_err(format!("{:?}", e))), + } + } + /// /// Create a Stream in Pravega. /// @@ -130,39 +280,81 @@ impl StreamManager { scope_name: &str, stream_name: &str, initial_segments: i32, + ) -> PyResult { + self.create_stream_with_policy( + scope_name, + stream_name, + StreamScalingPolicy::fixed_scaling_policy(initial_segments), + Default::default(), + None, + ) + } + + /// + /// Update Stream Configuration in Pravega + /// + #[text_signature = "($self, scope_name, stream_name, scaling_policy, retention_policy, tags)"] + #[args( + scaling_policy = "Default::default()", + retention_policy = "Default::default()", + tags = "None" + )] + pub fn update_stream_with_policy( + &self, + scope_name: &str, + stream_name: &str, + scaling_policy: StreamScalingPolicy, + retention_policy: StreamRetentionPolicy, + tags: Option>, ) -> PyResult { let handle = self.cf.runtime(); info!( - "creating stream {:?} under scope {:?} with segment count {:?}", - stream_name, scope_name, initial_segments + "updating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}", + stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags ); let stream_cfg = StreamConfiguration { scoped_stream: ScopedStream { scope: Scope::from(scope_name.to_string()), stream: Stream::from(stream_name.to_string()), }, - scaling: Scaling { - scale_type: ScaleType::FixedNumSegments, - target_rate: 0, - scale_factor: 0, - min_num_segments: initial_segments, - }, - retention: Retention { - retention_type: RetentionType::None, - retention_param: 0, - }, - tags: None, + scaling: scaling_policy.scaling, + retention: retention_policy.retention, + tags, }; let controller = self.cf.controller_client(); - let stream_result = handle.block_on(controller.create_stream(&stream_cfg)); - info!("Stream creation status {:?}", stream_result); + let stream_result = handle.block_on(controller.update_stream(&stream_cfg)); + info!("Stream updation status {:?}", stream_result); match stream_result { Ok(t) => Ok(t), Err(e) => Err(exceptions::PyValueError::new_err(format!("{:?}", e))), } } + /// + /// Get Stream tags from Pravega + /// + #[text_signature = "($self, scope_name, stream_name, scaling_policy, retention_policy, tags)"] + pub fn get_stream_tags(&self, scope_name: &str, stream_name: &str) -> PyResult>> { + let handle = self.cf.runtime(); + info!( + "fetch tags for stream {:?} under scope {:?}", + stream_name, scope_name, + ); + let stream = ScopedStream { + scope: Scope::from(scope_name.to_string()), + stream: Stream::from(stream_name.to_string()), + }; + let controller = self.cf.controller_client(); + + let stream_configuration = handle.block_on(controller.get_stream_configuration(&stream)); + info!("Stream configuration is {:?}", stream_configuration); + match stream_configuration { + Ok(t) => Ok(t.tags), + Err(e) => Err(exceptions::PyValueError::new_err(format!("{:?}", e))), + } + } + /// /// Create a Stream in Pravega. /// diff --git a/python_binding/tests/pravega_client_test.py b/python_binding/tests/pravega_client_test.py index e64205ee4..f7a540b3a 100644 --- a/python_binding/tests/pravega_client_test.py +++ b/python_binding/tests/pravega_client_test.py @@ -13,9 +13,50 @@ import string import pravega_client from pravega_client import TxnFailedException +from pravega_client import StreamScalingPolicy +from pravega_client import StreamRetentionPolicy class PravegaTest(unittest.TestCase): + def test_tags(self): + scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits) + for i in range(10)) + print("Creating a Stream Manager, ensure Pravega is running") + stream_manager=pravega_client.StreamManager("tcp://127.0.0.1:9090", False, False) + + print("Creating a scope") + scope_result=stream_manager.create_scope(scope) + self.assertEqual(True, scope_result, "Scope creation status") + print("Creating a stream") + # stream is created using a fixed scaling policy. + stream_result=stream_manager.create_stream(scope, "testStream", 1) + self.assertTrue(stream_result, "Stream creation status") + tags = stream_manager.get_stream_tags(scope, "testStream") + # verify empty tags. + self.assertTrue(len(tags)==0) + stream_update=stream_manager.update_stream_with_policy(scope_name=scope, stream_name="testStream", tags=["t1"]) + self.assertTrue(stream_update, "Stream update status") + tags = stream_manager.get_stream_tags(scope, "testStream") + self.assertEqual(["t1"], tags) + + # create a stream with stream scaling is enabled with data rate as 10kbps, scaling factor as 2 and initial segments as 1 + policy = StreamScalingPolicy.auto_scaling_policy_by_data_rate(10, 2, 1) + stream_result=stream_manager.create_stream_with_policy(scope_name=scope, stream_name="testStream1", scaling_policy=policy) + self.assertTrue(stream_result, "Stream creation status") + # add tags + stream_update=stream_manager.update_stream_with_policy(scope_name=scope, stream_name="testStream1", scaling_policy=policy, tags=['t1', 't2']) + self.assertTrue(stream_update, "Stream update status") + tags = stream_manager.get_stream_tags(scope, "testStream1") + self.assertEqual(['t1', 't2'], tags) + + # update retention policy + # retention policy of 10GB + retention = StreamRetentionPolicy.by_size(10*1024*1024 * 1024) + stream_update=stream_manager.update_stream_with_policy(scope, "testStream1", policy, retention, tags=["t4", "t5"]) + self.assertTrue(stream_update, "Stream update status") + tags = stream_manager.get_stream_tags(scope, "testStream1") + self.assertEqual(["t4", "t5"], tags) + def test_writeEvent(self): scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for i in range(10)) diff --git a/shared/src/lib.rs b/shared/src/lib.rs index aeac66d97..b0f224b9c 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -459,7 +459,7 @@ impl Default for Scaling { scale_type: ScaleType::FixedNumSegments, min_num_segments: 1, scale_factor: 1, - target_rate: 1000, + target_rate: 0, } } } @@ -497,7 +497,7 @@ impl Default for Retention { fn default() -> Self { Retention { retention_type: RetentionType::None, - retention_param: std::i64::MAX, + retention_param: 0, } } }