Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 280: Enable Stream tags for Python bindings. #281

Merged
merged 17 commits into from
Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/python_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ jobs:
java-version: '11' # The JDK version to make available on the path.
- name: Download and Run Pravega standalone
run: |
wget https://github.com/Tristan1900/pravega/releases/download/ip-fix/pravega-0.10.0-ip-fix.tgz
tar -xzvf pravega-0.10.0-ip-fix.tgz
pravega-0.10.0-2866.172245f-SNAPSHOT/bin/pravega-standalone > pravega.log 2>&1 &
wget https://github.com/shrids/pravega/releases/download/stream-tags/pravega-0.10.0-2906.6a34e64-SNAPSHOT.tgz
tar -xzvf pravega-0.10.0-2906.6a34e64-SNAPSHOT.tgz
pravega-0.10.0-2906.6a34e64-SNAPSHOT/bin/pravega-standalone > pravega.log 2>&1 &
sleep 120 && echo "Started standalone"
- name: Set up Python
uses: actions/setup-python@v2
Expand Down
6 changes: 2 additions & 4 deletions benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,8 @@ async fn create_scope_stream(
scale_factor: 0,
min_num_segments: segment_number,
},
retention: Retention {
retention_type: RetentionType::None,
retention_param: 0,
},
retention: Default::default(),
tags: None,
};
controller_client
.create_stream(&request)
Expand Down
5 changes: 1 addition & 4 deletions controller-client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,7 @@ fn main() {
scale_factor: 0,
min_num_segments: segment_count,
},
retention: Retention {
retention_type: RetentionType::None,
retention_param: 0,
},
retention: Default::default(),
tags: if tags.is_empty() { None } else { Some(tags) },
};
let result = rt.block_on(controller_client.create_stream(&stream_cfg));
Expand Down
5 changes: 1 addition & 4 deletions controller-client/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ fn test_create_stream_error() {
scale_factor: 0,
min_num_segments: 1,
},
retention: Retention {
retention_type: RetentionType::None,
retention_param: 0,
},
retention: Default::default(),
};
let create_stream_result = rt.block_on(client.create_stream(&request));
assert!(create_stream_result.is_err());
Expand Down
3 changes: 3 additions & 0 deletions python_binding/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ cfg_if! {
if #[cfg(feature = "python_binding")] {
use pyo3::prelude::*;
use stream_manager::StreamManager;
use stream_manager::{StreamScalingPolicy, StreamRetentionPolicy};
#[macro_use]
extern crate derive_new;
use stream_writer::StreamWriter;
Expand Down Expand Up @@ -49,6 +50,8 @@ fn pravega_client(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<StreamTransaction>()?;
m.add_class::<StreamReader>()?;
m.add_class::<StreamReaderGroup>()?;
m.add_class::<StreamScalingPolicy>()?;
m.add_class::<StreamRetentionPolicy>()?;
let txn_exception = py.get_type::<TxnFailedException>();
txn_exception.setattr("__doc__", TXNFAILED_EXCEPTION_DOCSTRING)?;
m.add("TxnFailedException", txn_exception)?;
Expand Down
222 changes: 207 additions & 15 deletions python_binding/src/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,115 @@ pub(crate) struct StreamManager {
config: ClientConfig,
}

#[cfg(feature = "python_binding")]
#[pyclass]
#[derive(Clone)]
pub(crate) struct StreamRetentionPolicy {
retention: Retention,
}

impl Default for StreamRetentionPolicy {
fn default() -> Self {
StreamRetentionPolicy {
retention: Default::default(),
}
}
}

#[cfg(feature = "python_binding")]
#[pymethods]
impl StreamRetentionPolicy {
#[staticmethod]
pub fn none() -> StreamRetentionPolicy {
StreamRetentionPolicy {
retention: Default::default(),
}
}

#[staticmethod]
pub fn by_size(size_in_bytes: i64) -> StreamRetentionPolicy {
StreamRetentionPolicy {
retention: Retention {
retention_type: RetentionType::Size,
retention_param: size_in_bytes,
},
}
}

#[staticmethod]
pub fn by_time(time_in_millis: i64) -> StreamRetentionPolicy {
StreamRetentionPolicy {
retention: Retention {
retention_type: RetentionType::Size,
retention_param: time_in_millis,
},
}
}
}

#[cfg(feature = "python_binding")]
#[pyclass]
#[derive(Clone)]
pub(crate) struct StreamScalingPolicy {
scaling: Scaling,
}

impl Default for StreamScalingPolicy {
fn default() -> Self {
StreamScalingPolicy {
scaling: Default::default(),
}
}
}

#[cfg(feature = "python_binding")]
#[pymethods]
impl StreamScalingPolicy {
#[staticmethod]
pub fn fixed_scaling_policy(initial_segments: i32) -> StreamScalingPolicy {
StreamScalingPolicy {
scaling: Scaling {
scale_type: ScaleType::FixedNumSegments,
target_rate: 0,
scale_factor: 0,
min_num_segments: initial_segments,
},
}
}

#[staticmethod]
pub fn auto_scaling_policy_by_data_rate(
target_rate_kbytes_per_sec: i32,
scale_factor: i32,
initial_segments: i32,
) -> StreamScalingPolicy {
StreamScalingPolicy {
scaling: Scaling {
scale_type: ScaleType::ByRateInKbytesPerSec,
target_rate: target_rate_kbytes_per_sec,
scale_factor,
min_num_segments: initial_segments,
},
}
}

#[staticmethod]
pub fn auto_scaling_policy_by_event_rate(
target_events_per_sec: i32,
scale_factor: i32,
initial_segments: i32,
) -> StreamScalingPolicy {
StreamScalingPolicy {
scaling: Scaling {
scale_type: ScaleType::ByRateInEventsPerSec,
target_rate: target_events_per_sec,
scale_factor,
min_num_segments: initial_segments,
},
}
}
}

///
/// Create a StreamManager by providing a controller uri.
/// ```
Expand Down Expand Up @@ -121,6 +230,47 @@ impl StreamManager {
}
}

///
/// Create a Stream in Pravega
///
#[text_signature = "($self, scope_name, stream_name, scaling_policy, retention_policy, tags)"]
#[args(
scaling_policy = "Default::default()",
retention_policy = "Default::default()",
tags = "None"
)]
pub fn create_stream_with_policy(
&self,
scope_name: &str,
stream_name: &str,
scaling_policy: StreamScalingPolicy,
retention_policy: StreamRetentionPolicy,
tags: Option<Vec<String>>,
) -> PyResult<bool> {
let handle = self.cf.runtime();
info!(
"creating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}",
stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags
);
let stream_cfg = StreamConfiguration {
scoped_stream: ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
},
scaling: scaling_policy.scaling,
retention: retention_policy.retention,
tags,
};
let controller = self.cf.controller_client();

let stream_result = handle.block_on(controller.create_stream(&stream_cfg));
info!("Stream creation status {:?}", stream_result);
match stream_result {
Ok(t) => Ok(t),
Err(e) => Err(exceptions::PyValueError::new_err(format!("{:?}", e))),
}
}

///
/// Create a Stream in Pravega.
///
Expand All @@ -130,39 +280,81 @@ impl StreamManager {
scope_name: &str,
stream_name: &str,
initial_segments: i32,
) -> PyResult<bool> {
self.create_stream_with_policy(
scope_name,
stream_name,
StreamScalingPolicy::fixed_scaling_policy(initial_segments),
Default::default(),
None,
)
}

///
/// Update Stream Configuration in Pravega
///
#[text_signature = "($self, scope_name, stream_name, scaling_policy, retention_policy, tags)"]
#[args(
scaling_policy = "Default::default()",
retention_policy = "Default::default()",
tags = "None"
)]
pub fn update_stream_with_policy(
&self,
scope_name: &str,
stream_name: &str,
scaling_policy: StreamScalingPolicy,
retention_policy: StreamRetentionPolicy,
tags: Option<Vec<String>>,
) -> PyResult<bool> {
let handle = self.cf.runtime();
info!(
"creating stream {:?} under scope {:?} with segment count {:?}",
stream_name, scope_name, initial_segments
"updating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}",
stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags
);
let stream_cfg = StreamConfiguration {
scoped_stream: ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
},
scaling: Scaling {
scale_type: ScaleType::FixedNumSegments,
target_rate: 0,
scale_factor: 0,
min_num_segments: initial_segments,
},
retention: Retention {
retention_type: RetentionType::None,
retention_param: 0,
},
tags: None,
scaling: scaling_policy.scaling,
retention: retention_policy.retention,
tags,
};
let controller = self.cf.controller_client();

let stream_result = handle.block_on(controller.create_stream(&stream_cfg));
info!("Stream creation status {:?}", stream_result);
let stream_result = handle.block_on(controller.update_stream(&stream_cfg));
info!("Stream updation status {:?}", stream_result);
match stream_result {
Ok(t) => Ok(t),
Err(e) => Err(exceptions::PyValueError::new_err(format!("{:?}", e))),
}
}

///
/// Get Stream tags from Pravega
///
#[text_signature = "($self, scope_name, stream_name, scaling_policy, retention_policy, tags)"]
pub fn get_stream_tags(&self, scope_name: &str, stream_name: &str) -> PyResult<Option<Vec<String>>> {
let handle = self.cf.runtime();
info!(
"fetch tags for stream {:?} under scope {:?}",
stream_name, scope_name,
);
let stream = ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
};
let controller = self.cf.controller_client();

let stream_configuration = handle.block_on(controller.get_stream_configuration(&stream));
info!("Stream configuration is {:?}", stream_configuration);
match stream_configuration {
Ok(t) => Ok(t.tags),
Err(e) => Err(exceptions::PyValueError::new_err(format!("{:?}", e))),
}
}

///
/// Create a Stream in Pravega.
///
Expand Down
37 changes: 37 additions & 0 deletions python_binding/tests/pravega_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,46 @@
import string
import pravega_client
from pravega_client import TxnFailedException
from pravega_client import StreamScalingPolicy
from pravega_client import StreamRetentionPolicy

class PravegaTest(unittest.TestCase):

def test_tags(self):
shrids marked this conversation as resolved.
Show resolved Hide resolved
scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits)
for i in range(10))
print("Creating a Stream Manager, ensure Pravega is running")
stream_manager=pravega_client.StreamManager("tcp://127.0.0.1:9090", False, False)

print("Creating a scope")
scope_result=stream_manager.create_scope(scope)
self.assertEqual(True, scope_result, "Scope creation status")
print("Creating a stream")
stream_result=stream_manager.create_stream(scope, "testStream", 1)
self.assertTrue(stream_result, "Stream creation status")
stream_update=stream_manager.update_stream_with_policy(scope_name=scope, stream_name="testStream", tags=["t1"])
self.assertTrue(stream_update, "Stream update status")
tags = stream_manager.get_stream_tags(scope, "testStream")
self.assertEqual(["t1"], tags)

# create a stream with stream scaling is enabled with data rate as 10kbps, scaling factor as 2 and initial segments as 1
policy = StreamScalingPolicy.auto_scaling_policy_by_data_rate(10, 2, 1)
stream_result=stream_manager.create_stream_with_policy(scope_name=scope, stream_name="testStream1", scaling_policy=policy)
self.assertTrue(stream_result, "Stream creation status")
# add tags
stream_update=stream_manager.update_stream_with_policy(scope_name=scope, stream_name="testStream1", scaling_policy=policy, tags=['t1', 't2'])
self.assertTrue(stream_update, "Stream update status")
tags = stream_manager.get_stream_tags(scope, "testStream1")
self.assertEqual(['t1', 't2'], tags)

# update retention policy
# retention policy of 10GB
retention = StreamRetentionPolicy.by_size(10*1024*1024 * 1024)
stream_update=stream_manager.update_stream_with_policy(scope, "testStream1", policy, retention, tags=["t4", "t5"])
self.assertTrue(stream_update, "Stream update status")
tags = stream_manager.get_stream_tags(scope, "testStream1")
self.assertEqual(["t4", "t5"], tags)

def test_writeEvent(self):
scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits)
for i in range(10))
Expand Down
6 changes: 3 additions & 3 deletions shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,8 @@ impl Default for Scaling {
Scaling {
scale_type: ScaleType::FixedNumSegments,
min_num_segments: 1,
scale_factor: 1,
target_rate: 1000,
scale_factor: 0,
shrids marked this conversation as resolved.
Show resolved Hide resolved
target_rate: 0,
}
}
}
Expand Down Expand Up @@ -497,7 +497,7 @@ impl Default for Retention {
fn default() -> Self {
Retention {
retention_type: RetentionType::None,
retention_param: std::i64::MAX,
retention_param: 0,
}
}
}
Expand Down