Skip to content

Commit

Permalink
Issue 280: Enable Stream tags for Python bindings. (#281)
Browse files Browse the repository at this point in the history
Enable Stream tags for Python bindings.

Signed-off-by: Sandeep <sandeep.shridhar@emc.com>
  • Loading branch information
shrids authored Jul 1, 2021
1 parent e29d97f commit 8954c04
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 32 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/python_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ jobs:
java-version: '11' # The JDK version to make available on the path.
- name: Download and Run Pravega standalone
run: |
wget https://github.com/Tristan1900/pravega/releases/download/ip-fix/pravega-0.10.0-ip-fix.tgz
tar -xzvf pravega-0.10.0-ip-fix.tgz
pravega-0.10.0-2866.172245f-SNAPSHOT/bin/pravega-standalone > pravega.log 2>&1 &
wget https://github.com/shrids/pravega/releases/download/stream-tags/pravega-0.10.0-2906.6a34e64-SNAPSHOT.tgz
tar -xzvf pravega-0.10.0-2906.6a34e64-SNAPSHOT.tgz
pravega-0.10.0-2906.6a34e64-SNAPSHOT/bin/pravega-standalone > pravega.log 2>&1 &
sleep 120 && echo "Started standalone"
- name: Set up Python
uses: actions/setup-python@v2
Expand Down
6 changes: 2 additions & 4 deletions benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,8 @@ async fn create_scope_stream(
scale_factor: 0,
min_num_segments: segment_number,
},
retention: Retention {
retention_type: RetentionType::None,
retention_param: 0,
},
retention: Default::default(),
tags: None,
};
controller_client
.create_stream(&request)
Expand Down
5 changes: 1 addition & 4 deletions controller-client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,7 @@ fn main() {
scale_factor: 0,
min_num_segments: segment_count,
},
retention: Retention {
retention_type: RetentionType::None,
retention_param: 0,
},
retention: Default::default(),
tags: if tags.is_empty() { None } else { Some(tags) },
};
let result = rt.block_on(controller_client.create_stream(&stream_cfg));
Expand Down
5 changes: 1 addition & 4 deletions controller-client/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ fn test_create_stream_error() {
scale_factor: 0,
min_num_segments: 1,
},
retention: Retention {
retention_type: RetentionType::None,
retention_param: 0,
},
retention: Default::default(),
};
let create_stream_result = rt.block_on(client.create_stream(&request));
assert!(create_stream_result.is_err());
Expand Down
3 changes: 3 additions & 0 deletions python_binding/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ cfg_if! {
if #[cfg(feature = "python_binding")] {
use pyo3::prelude::*;
use stream_manager::StreamManager;
use stream_manager::{StreamScalingPolicy, StreamRetentionPolicy};
#[macro_use]
extern crate derive_new;
use stream_writer::StreamWriter;
Expand Down Expand Up @@ -49,6 +50,8 @@ fn pravega_client(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<StreamTransaction>()?;
m.add_class::<StreamReader>()?;
m.add_class::<StreamReaderGroup>()?;
m.add_class::<StreamScalingPolicy>()?;
m.add_class::<StreamRetentionPolicy>()?;
let txn_exception = py.get_type::<TxnFailedException>();
txn_exception.setattr("__doc__", TXNFAILED_EXCEPTION_DOCSTRING)?;
m.add("TxnFailedException", txn_exception)?;
Expand Down
222 changes: 207 additions & 15 deletions python_binding/src/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,115 @@ pub(crate) struct StreamManager {
config: ClientConfig,
}

#[cfg(feature = "python_binding")]
#[pyclass]
#[derive(Clone)]
pub(crate) struct StreamRetentionPolicy {
retention: Retention,
}

impl Default for StreamRetentionPolicy {
fn default() -> Self {
StreamRetentionPolicy {
retention: Default::default(),
}
}
}

#[cfg(feature = "python_binding")]
#[pymethods]
impl StreamRetentionPolicy {
#[staticmethod]
pub fn none() -> StreamRetentionPolicy {
StreamRetentionPolicy {
retention: Default::default(),
}
}

#[staticmethod]
pub fn by_size(size_in_bytes: i64) -> StreamRetentionPolicy {
StreamRetentionPolicy {
retention: Retention {
retention_type: RetentionType::Size,
retention_param: size_in_bytes,
},
}
}

#[staticmethod]
pub fn by_time(time_in_millis: i64) -> StreamRetentionPolicy {
StreamRetentionPolicy {
retention: Retention {
retention_type: RetentionType::Size,
retention_param: time_in_millis,
},
}
}
}

#[cfg(feature = "python_binding")]
#[pyclass]
#[derive(Clone)]
pub(crate) struct StreamScalingPolicy {
scaling: Scaling,
}

impl Default for StreamScalingPolicy {
fn default() -> Self {
StreamScalingPolicy {
scaling: Default::default(),
}
}
}

#[cfg(feature = "python_binding")]
#[pymethods]
impl StreamScalingPolicy {
#[staticmethod]
pub fn fixed_scaling_policy(initial_segments: i32) -> StreamScalingPolicy {
StreamScalingPolicy {
scaling: Scaling {
scale_type: ScaleType::FixedNumSegments,
target_rate: 0,
scale_factor: 0,
min_num_segments: initial_segments,
},
}
}

#[staticmethod]
pub fn auto_scaling_policy_by_data_rate(
target_rate_kbytes_per_sec: i32,
scale_factor: i32,
initial_segments: i32,
) -> StreamScalingPolicy {
StreamScalingPolicy {
scaling: Scaling {
scale_type: ScaleType::ByRateInKbytesPerSec,
target_rate: target_rate_kbytes_per_sec,
scale_factor,
min_num_segments: initial_segments,
},
}
}

#[staticmethod]
pub fn auto_scaling_policy_by_event_rate(
target_events_per_sec: i32,
scale_factor: i32,
initial_segments: i32,
) -> StreamScalingPolicy {
StreamScalingPolicy {
scaling: Scaling {
scale_type: ScaleType::ByRateInEventsPerSec,
target_rate: target_events_per_sec,
scale_factor,
min_num_segments: initial_segments,
},
}
}
}

///
/// Create a StreamManager by providing a controller uri.
/// ```
Expand Down Expand Up @@ -121,6 +230,47 @@ impl StreamManager {
}
}

///
/// Create a Stream in Pravega
///
#[text_signature = "($self, scope_name, stream_name, scaling_policy, retention_policy, tags)"]
#[args(
scaling_policy = "Default::default()",
retention_policy = "Default::default()",
tags = "None"
)]
pub fn create_stream_with_policy(
&self,
scope_name: &str,
stream_name: &str,
scaling_policy: StreamScalingPolicy,
retention_policy: StreamRetentionPolicy,
tags: Option<Vec<String>>,
) -> PyResult<bool> {
let handle = self.cf.runtime();
info!(
"creating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}",
stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags
);
let stream_cfg = StreamConfiguration {
scoped_stream: ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
},
scaling: scaling_policy.scaling,
retention: retention_policy.retention,
tags,
};
let controller = self.cf.controller_client();

let stream_result = handle.block_on(controller.create_stream(&stream_cfg));
info!("Stream creation status {:?}", stream_result);
match stream_result {
Ok(t) => Ok(t),
Err(e) => Err(exceptions::PyValueError::new_err(format!("{:?}", e))),
}
}

///
/// Create a Stream in Pravega.
///
Expand All @@ -130,39 +280,81 @@ impl StreamManager {
scope_name: &str,
stream_name: &str,
initial_segments: i32,
) -> PyResult<bool> {
self.create_stream_with_policy(
scope_name,
stream_name,
StreamScalingPolicy::fixed_scaling_policy(initial_segments),
Default::default(),
None,
)
}

///
/// Update Stream Configuration in Pravega
///
#[text_signature = "($self, scope_name, stream_name, scaling_policy, retention_policy, tags)"]
#[args(
scaling_policy = "Default::default()",
retention_policy = "Default::default()",
tags = "None"
)]
pub fn update_stream_with_policy(
&self,
scope_name: &str,
stream_name: &str,
scaling_policy: StreamScalingPolicy,
retention_policy: StreamRetentionPolicy,
tags: Option<Vec<String>>,
) -> PyResult<bool> {
let handle = self.cf.runtime();
info!(
"creating stream {:?} under scope {:?} with segment count {:?}",
stream_name, scope_name, initial_segments
"updating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}",
stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags
);
let stream_cfg = StreamConfiguration {
scoped_stream: ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
},
scaling: Scaling {
scale_type: ScaleType::FixedNumSegments,
target_rate: 0,
scale_factor: 0,
min_num_segments: initial_segments,
},
retention: Retention {
retention_type: RetentionType::None,
retention_param: 0,
},
tags: None,
scaling: scaling_policy.scaling,
retention: retention_policy.retention,
tags,
};
let controller = self.cf.controller_client();

let stream_result = handle.block_on(controller.create_stream(&stream_cfg));
info!("Stream creation status {:?}", stream_result);
let stream_result = handle.block_on(controller.update_stream(&stream_cfg));
info!("Stream updation status {:?}", stream_result);
match stream_result {
Ok(t) => Ok(t),
Err(e) => Err(exceptions::PyValueError::new_err(format!("{:?}", e))),
}
}

///
/// Get Stream tags from Pravega
///
#[text_signature = "($self, scope_name, stream_name, scaling_policy, retention_policy, tags)"]
pub fn get_stream_tags(&self, scope_name: &str, stream_name: &str) -> PyResult<Option<Vec<String>>> {
let handle = self.cf.runtime();
info!(
"fetch tags for stream {:?} under scope {:?}",
stream_name, scope_name,
);
let stream = ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
};
let controller = self.cf.controller_client();

let stream_configuration = handle.block_on(controller.get_stream_configuration(&stream));
info!("Stream configuration is {:?}", stream_configuration);
match stream_configuration {
Ok(t) => Ok(t.tags),
Err(e) => Err(exceptions::PyValueError::new_err(format!("{:?}", e))),
}
}

///
/// Create a Stream in Pravega.
///
Expand Down
41 changes: 41 additions & 0 deletions python_binding/tests/pravega_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,50 @@
import string
import pravega_client
from pravega_client import TxnFailedException
from pravega_client import StreamScalingPolicy
from pravega_client import StreamRetentionPolicy

class PravegaTest(unittest.TestCase):

def test_tags(self):
scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits)
for i in range(10))
print("Creating a Stream Manager, ensure Pravega is running")
stream_manager=pravega_client.StreamManager("tcp://127.0.0.1:9090", False, False)

print("Creating a scope")
scope_result=stream_manager.create_scope(scope)
self.assertEqual(True, scope_result, "Scope creation status")
print("Creating a stream")
# stream is created using a fixed scaling policy.
stream_result=stream_manager.create_stream(scope, "testStream", 1)
self.assertTrue(stream_result, "Stream creation status")
tags = stream_manager.get_stream_tags(scope, "testStream")
# verify empty tags.
self.assertTrue(len(tags)==0)
stream_update=stream_manager.update_stream_with_policy(scope_name=scope, stream_name="testStream", tags=["t1"])
self.assertTrue(stream_update, "Stream update status")
tags = stream_manager.get_stream_tags(scope, "testStream")
self.assertEqual(["t1"], tags)

# create a stream with stream scaling is enabled with data rate as 10kbps, scaling factor as 2 and initial segments as 1
policy = StreamScalingPolicy.auto_scaling_policy_by_data_rate(10, 2, 1)
stream_result=stream_manager.create_stream_with_policy(scope_name=scope, stream_name="testStream1", scaling_policy=policy)
self.assertTrue(stream_result, "Stream creation status")
# add tags
stream_update=stream_manager.update_stream_with_policy(scope_name=scope, stream_name="testStream1", scaling_policy=policy, tags=['t1', 't2'])
self.assertTrue(stream_update, "Stream update status")
tags = stream_manager.get_stream_tags(scope, "testStream1")
self.assertEqual(['t1', 't2'], tags)

# update retention policy
# retention policy of 10GB
retention = StreamRetentionPolicy.by_size(10*1024*1024 * 1024)
stream_update=stream_manager.update_stream_with_policy(scope, "testStream1", policy, retention, tags=["t4", "t5"])
self.assertTrue(stream_update, "Stream update status")
tags = stream_manager.get_stream_tags(scope, "testStream1")
self.assertEqual(["t4", "t5"], tags)

def test_writeEvent(self):
scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits)
for i in range(10))
Expand Down
4 changes: 2 additions & 2 deletions shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ impl Default for Scaling {
scale_type: ScaleType::FixedNumSegments,
min_num_segments: 1,
scale_factor: 1,
target_rate: 1000,
target_rate: 0,
}
}
}
Expand Down Expand Up @@ -497,7 +497,7 @@ impl Default for Retention {
fn default() -> Self {
Retention {
retention_type: RetentionType::None,
retention_param: std::i64::MAX,
retention_param: 0,
}
}
}
Expand Down

0 comments on commit 8954c04

Please sign in to comment.