From 21554f43bb2a23df9536a74a057f4c494847c8f4 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 1 Jun 2021 17:49:29 +0530 Subject: [PATCH 01/12] First cut to enable stream tag changes on RUST client. Signed-off-by: Sandeep --- controller-client/Cargo.toml | 3 + controller-client/proto/Controller.proto | 213 +++++++++++++++++- controller-client/src/cli.rs | 1 + controller-client/src/lib.rs | 132 ++++++++++- controller-client/src/mock_controller.rs | 65 ++++++ controller-client/src/model_helper.rs | 35 +++ controller-client/src/paginator.rs | 62 +++++ integration_test/src/controller_tests.rs | 122 ++++++++-- integration_test/src/disconnection_tests.rs | 1 + integration_test/src/event_reader_tests.rs | 1 + .../src/transactional_event_writer_tests.rs | 1 + integration_test/src/utils.rs | 1 + integration_test/src/wirecommand_tests.rs | 1 + python_binding/src/lib.rs | 1 - python_binding/src/stream_manager.rs | 1 + shared/Cargo.toml | 2 + shared/src/lib.rs | 8 +- src/util/mod.rs | 1 + 18 files changed, 628 insertions(+), 23 deletions(-) diff --git a/controller-client/Cargo.toml b/controller-client/Cargo.toml index 9a58be6b0..b19d11eec 100644 --- a/controller-client/Cargo.toml +++ b/controller-client/Cargo.toml @@ -33,6 +33,9 @@ tracing = "0.1" jsonwebtoken = "7" serde = {version = "1.0", features = ["derive"] } futures = "0.3" +num = "0.4" +num-derive = "0.3" +num-traits = "0.2" [build-dependencies] tonic-build = "0.4" diff --git a/controller-client/proto/Controller.proto b/controller-client/proto/Controller.proto index 35424d298..c0c7e0069 100644 --- a/controller-client/proto/Controller.proto +++ b/controller-client/proto/Controller.proto @@ -1,11 +1,17 @@ /** - * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * Copyright Pravega Authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ syntax = "proto3"; package io.pravega.controller.stream.api.grpc.v1; @@ -40,11 +46,26 @@ service ControllerService { rpc pingTransaction(PingTxnRequest) returns (PingTxnStatus); rpc checkTransactionState(TxnRequest) returns (TxnState); rpc createScope(ScopeInfo) returns (CreateScopeStatus); + rpc listScopes(ScopesRequest) returns (ScopesResponse); + rpc checkScopeExists(ScopeInfo) returns (ExistsResponse); + rpc checkStreamExists(StreamInfo) returns (ExistsResponse); rpc listStreamsInScope(StreamsInScopeRequest) returns (StreamsInScopeResponse); rpc deleteScope(ScopeInfo) returns (DeleteScopeStatus); rpc getDelegationToken(StreamInfo) returns (DelegationToken); rpc removeWriter(RemoveWriterRequest) returns (RemoveWriterResponse); rpc noteTimestampFromWriter(TimestampFromWriter) returns (TimestampResponse); + rpc createKeyValueTable(KeyValueTableConfig) returns (CreateKeyValueTableStatus); + rpc getCurrentSegmentsKeyValueTable(KeyValueTableInfo) returns (SegmentRanges); + rpc listKeyValueTablesInScope(KVTablesInScopeRequest) returns (KVTablesInScopeResponse); + rpc deleteKeyValueTable(KeyValueTableInfo) returns (DeleteKVTableStatus); + rpc listSubscribers(StreamInfo) returns (SubscribersResponse); + rpc updateSubscriberStreamCut(SubscriberStreamCut) returns (UpdateSubscriberStatus); + rpc createReaderGroup(ReaderGroupConfiguration) returns (CreateReaderGroupResponse); + rpc getReaderGroupConfig(ReaderGroupInfo) returns (ReaderGroupConfigResponse); + rpc deleteReaderGroup(ReaderGroupInfo) returns (DeleteReaderGroupStatus); + rpc updateReaderGroup(ReaderGroupConfiguration) returns (UpdateReaderGroupResponse); + rpc getStreamConfiguration(StreamInfo) returns (StreamConfig); + rpc listStreamsInScopeForTag(StreamsInScopeWithTagRequest) returns (StreamsInScopeResponse); } message ServerRequest { @@ -54,6 +75,119 @@ message ServerResponse { repeated NodeUri nodeURI = 1; } +message ReaderGroupConfiguration { + enum RetentionType { + NONE = 0; + MANUAL = 1; + AUTOMATIC = 2; + } + string scope = 1; + string readerGroupName = 2; + int64 groupRefreshTimeMillis = 3; + int64 automaticCheckpointIntervalMillis = 4; + int32 maxOutstandingCheckpointRequest = 5; + int32 retentionType = 6; + int64 generation = 7; + string readerGroupId = 8; + repeated StreamCut startingStreamCuts = 9; + repeated StreamCut endingStreamCuts = 10; +} + +message ReaderGroupConfigResponse { + ReaderGroupConfiguration config = 1; + enum Status { + SUCCESS = 0; + FAILURE = 1; + RG_NOT_FOUND = 2; + } + Status status = 2; +} + +message ReaderGroupInfo { + string scope = 1; + string readerGroup = 2; + string readerGroupId = 3; + int64 generation = 4; +} + +message CreateReaderGroupResponse { + enum Status { + SUCCESS = 0; + FAILURE = 1; + SCOPE_NOT_FOUND = 2; + INVALID_RG_NAME = 3; + } + Status status = 1; + ReaderGroupConfiguration config = 2; +} + +message DeleteReaderGroupStatus { + enum Status { + SUCCESS = 0; + FAILURE = 1; + RG_NOT_FOUND = 2; + } + Status status = 1; +} + +message UpdateReaderGroupResponse { + enum Status { + SUCCESS = 0; + FAILURE = 1; + RG_NOT_FOUND = 2; + INVALID_CONFIG = 3; + } + Status status = 1; + int64 generation = 2; +} + +message CreateKeyValueTableStatus { + enum Status { + SUCCESS = 0; + FAILURE = 1; + TABLE_EXISTS = 2; + SCOPE_NOT_FOUND = 3; + INVALID_TABLE_NAME = 4; + } + Status status = 1; +} + +message KeyValueTableConfig { + string scope = 1; + string kvtName = 2; + int32 partitionCount = 3; +} + +message KeyValueTableInfo { + string scope = 1; + string kvtName = 2; +} + +message KVTablesInScopeRequest { + ScopeInfo scope = 1; + ContinuationToken continuationToken = 2; +} + +message KVTablesInScopeResponse { + repeated KeyValueTableInfo kvtables = 1; + ContinuationToken continuationToken = 2; + enum Status { + SUCCESS = 0; + FAILURE = 1; + SCOPE_NOT_FOUND = 2; + } + Status status = 3; +} + +message DeleteKVTableStatus { + enum Status { + SUCCESS = 0; + FAILURE = 1; + TABLE_NOT_FOUND = 2; + } + Status status = 1; +} + message CreateStreamStatus { enum Status { SUCCESS = 0; @@ -75,6 +209,42 @@ message UpdateStreamStatus { Status status = 1; } +message UpdateSubscriberStatus { + enum Status { + SUCCESS = 0; + FAILURE = 1; + STREAM_NOT_FOUND = 2; + SUBSCRIBER_NOT_FOUND = 3; + STREAM_CUT_NOT_VALID = 4; + GENERATION_MISMATCH = 5; + } + Status status = 1; +} + +message StreamSubscriberInfo { + string scope = 1; + string stream = 2; + string subscriber = 3; + int64 operationGeneration = 4; +} + +message SubscriberStreamCut { + string subscriber = 1; + int64 generation = 2; + string readerGroupId = 3; + StreamCut streamCut = 4; +} + +message SubscribersResponse { + repeated string subscribers = 1; + enum Status { + SUCCESS = 0; + FAILURE = 1; + STREAM_NOT_FOUND = 2; + } + Status status = 2; +} + message DeleteStreamStatus { enum Status { SUCCESS = 0; @@ -117,10 +287,10 @@ message TxnStatus { message PingTxnStatus { enum Status { - reserved 3; OK = 0; LEASE_TOO_LARGE = 1; MAX_EXECUTION_TIME_EXCEEDED = 2; + SCALE_GRACE_TIME_EXCEEDED = 3 [deprecated=true]; DISCONNECTED = 4; COMMITTED = 5; ABORTED = 6; @@ -141,7 +311,7 @@ message TxnState { State state = 1; } -message ScopeInfo { + message ScopeInfo { string scope = 1; } @@ -154,6 +324,12 @@ message ContinuationToken { ContinuationToken continuationToken = 2; } +message StreamsInScopeWithTagRequest { + ScopeInfo scope = 1; + string tag = 2; + ContinuationToken continuationToken =3; +} + message StreamsInScopeResponse { repeated StreamInfo streams = 1; ContinuationToken continuationToken = 2; @@ -168,6 +344,15 @@ message ContinuationToken { message StreamInfo { string scope = 1; string stream = 2; + enum AccessOperation { + UNSPECIFIED = 0; + NONE = 1; + ANY = 2; + READ = 3; + WRITE = 4; + READ_WRITE = 5; + } + AccessOperation accessOperation = 3; } message ScalingPolicy { @@ -190,12 +375,18 @@ message RetentionPolicy { } RetentionPolicyType retentionType = 1; int64 retentionParam = 2; + int64 retentionMax = 3; } message StreamConfig { StreamInfo streamInfo = 1; ScalingPolicy scalingPolicy = 2; RetentionPolicy retentionPolicy = 3; + Tags tags = 4; +} + +message Tags { + repeated string tag = 1; } message StreamCut { @@ -240,9 +431,9 @@ message TxnId { } message CreateTxnRequest { - reserved 3; StreamInfo streamInfo = 1; int64 lease = 2; + int64 scaleGracePeriod = 3 [deprecated=true]; } message CreateTxnResponse { @@ -373,3 +564,17 @@ message TimestampResponse { } Status result = 1; } + +message ScopesResponse { + repeated string scopes = 1; + ContinuationToken continuationToken = 2; +} + +message ScopesRequest { + ContinuationToken continuationToken = 1; +} + +message ExistsResponse { + bool exists = 1; +} + diff --git a/controller-client/src/cli.rs b/controller-client/src/cli.rs index 6c9814b22..51cc0d9b5 100644 --- a/controller-client/src/cli.rs +++ b/controller-client/src/cli.rs @@ -109,6 +109,7 @@ fn main() { retention_type: RetentionType::None, retention_param: 0, }, + tags: None, }; let result = rt.block_on(controller_client.create_stream(&stream_cfg)); println!("Stream creation status {:?}", result); diff --git a/controller-client/src/lib.rs b/controller-client/src/lib.rs index 500d887f2..fa9dc2692 100644 --- a/controller-client/src/lib.rs +++ b/controller-client/src/lib.rs @@ -26,7 +26,6 @@ #![allow(clippy::multiple_crate_versions)] #![allow(dead_code)] #![allow(clippy::similar_names)] -#![allow(clippy::upper_case_acronyms)] use std::result::Result as StdResult; use std::time::{Duration, Instant}; @@ -40,7 +39,8 @@ use controller::{ DeleteStreamStatus, GetEpochSegmentsRequest, GetSegmentsRequest, NodeUri, PingTxnRequest, PingTxnStatus, ScaleRequest, ScaleResponse, ScaleStatusRequest, ScaleStatusResponse, ScopeInfo, SegmentId, SegmentRanges, SegmentsAtTime, StreamConfig, StreamInfo, StreamsInScopeRequest, StreamsInScopeResponse, - SuccessorResponse, TxnId, TxnRequest, TxnState, TxnStatus, UpdateStreamStatus, + StreamsInScopeWithTagRequest, SuccessorResponse, TxnId, TxnRequest, TxnState, TxnStatus, + UpdateStreamStatus, }; use im::{HashMap as ImHashMap, OrdMap}; use ordered_float::OrderedFloat; @@ -145,6 +145,17 @@ pub trait ControllerClient: Send + Sync { token: &CToken, ) -> ResultRetry, CToken)>>; + /** + * API to list streams associated with the given tag under a given scope and continuation token. + * Use the pravega_controller_client::paginator::list_streams to paginate over all the streams. + */ + async fn list_streams_for_tag( + &self, + scope: &Scope, + tag: &str, + token: &CToken, + ) -> ResultRetry, CToken)>>; + /** * API to delete a scope. Note that a scope can only be deleted in the case is it empty. If * the scope contains at least one stream, then the delete request will fail. @@ -164,6 +175,16 @@ pub trait ControllerClient: Send + Sync { */ async fn update_stream(&self, stream_config: &StreamConfiguration) -> ResultRetry; + /** + * API to fetch the Stream Configuration of a Stream. + */ + async fn get_stream_configuration(&self, stream: &ScopedStream) -> ResultRetry; + + /** + * API to fetch the Tags for a Stream. + */ + async fn get_stream_tags(&self, stream: &ScopedStream) -> ResultRetry>>; + /** * API to Truncate stream. This api takes a stream cut point which corresponds to a cut in * the stream segments which is consistent and covers the entire key range space. @@ -352,6 +373,18 @@ impl ControllerClient for ControllerClientImpl { ) } + async fn list_streams_for_tag( + &self, + scope: &Scope, + tag: &str, + token: &CToken, + ) -> ResultRetry, CToken)>> { + wrap_with_async_retry!( + self.config.retry_policy.max_tries(MAX_RETRIES), + self.call_list_streams_for_tag(scope, tag, token) + ) + } + async fn delete_scope(&self, scope: &Scope) -> ResultRetry { wrap_with_async_retry!( self.config.retry_policy.max_tries(MAX_RETRIES), @@ -373,6 +406,20 @@ impl ControllerClient for ControllerClientImpl { ) } + async fn get_stream_configuration(&self, stream: &ScopedStream) -> ResultRetry { + wrap_with_async_retry!( + self.config.retry_policy.max_tries(MAX_RETRIES), + self.call_get_stream_configuration(stream) + ) + } + + async fn get_stream_tags(&self, stream: &ScopedStream) -> ResultRetry>> { + wrap_with_async_retry!( + self.config.retry_policy.max_tries(MAX_RETRIES), + self.call_get_stream_tags(stream) + ) + } + async fn truncate_stream(&self, stream_cut: &StreamCut) -> ResultRetry { wrap_with_async_retry!( self.config.retry_policy.max_tries(MAX_RETRIES), @@ -704,6 +751,64 @@ impl ControllerClientImpl { } } + async fn call_list_streams_for_tag( + &self, + scope: &Scope, + tag: &str, + token: &CToken, + ) -> Result, CToken)>> { + let operation_name = "ListStreamsForTag"; + let request: StreamsInScopeWithTagRequest = StreamsInScopeWithTagRequest { + scope: Some(ScopeInfo::from(scope)), + tag: tag.to_string(), + continuation_token: Some(ContinuationToken::from(token)), + }; + debug!( + "Triggering a request to the controller to list streams with tag {} under scope {}", + tag, scope + ); + + let op_status: StdResult, tonic::Status> = self + .get_controller_client() + .await + .list_streams_in_scope_for_tag(request) + .await; + match op_status { + Ok(streams_with_token) => { + let result = streams_with_token.into_inner(); + let mut t: Vec = result.streams; + if t.is_empty() { + // Empty result from the controller implies no further streams present. + Ok(None) + } else { + // update state with the new set of streams. + let stream_list: Vec = t.drain(..).map(|i| i.into()).collect(); + let token: Option = result.continuation_token; + match token.map(|t| t.token) { + None => { + warn!( + "None returned for continuation token list streams API for scope {}", + scope + ); + Err(ControllerError::InvalidResponse { + can_retry: false, + error_msg: "No continuation token received from Controller".to_string(), + }) + } + Some(ct) => { + debug!("Returned token {} for list streams API under scope {}", ct, scope); + Ok(Some((stream_list, CToken::from(ct.as_str())))) + } + } + } + } + Err(status) => { + debug!("Error {} while listing streams under scope {}", status, scope); + Err(self.map_grpc_error(operation_name, status).await) + } + } + } + async fn call_create_scope(&self, scope: &Scope) -> Result { use create_scope_status::Status; let operation_name = "CreateScope"; @@ -822,6 +927,27 @@ impl ControllerClientImpl { } } + async fn call_get_stream_configuration(&self, stream: &ScopedStream) -> Result { + let request: StreamInfo = StreamInfo::from(stream); + let op_status: StdResult, tonic::Status> = self + .get_controller_client() + .await + .get_stream_configuration(tonic::Request::new(request)) + .await; + let operation_name = "get_stream_configuration"; + + match op_status { + Ok(config) => Ok(StreamConfiguration::from(config.into_inner())), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), + } + } + + async fn call_get_stream_tags(&self, stream: &ScopedStream) -> Result>> { + self.call_get_stream_configuration(stream) + .await + .map(|cfg| cfg.tags) + } + async fn call_truncate_stream(&self, stream_cut: &StreamCut) -> Result { use update_stream_status::Status; @@ -978,6 +1104,7 @@ impl ControllerClientImpl { let request = CreateTxnRequest { stream_info: Some(StreamInfo::from(stream)), lease: lease.as_millis() as i64, + scale_grace_period: 0, }; let op_status: StdResult, tonic::Status> = self .get_controller_client() @@ -1369,6 +1496,7 @@ mod test { retention_type: RetentionType::None, retention_param: 0, }, + tags: None, }; let res = rt .block_on(controller.create_stream(&stream_config)) diff --git a/controller-client/src/mock_controller.rs b/controller-client/src/mock_controller.rs index 2f406b4db..1d8bc8bde 100644 --- a/controller-client/src/mock_controller.rs +++ b/controller-client/src/mock_controller.rs @@ -99,6 +99,47 @@ impl ControllerClient for MockController { Ok(Some((result, CToken::from("mock_token")))) } + async fn list_streams_for_tag( + &self, + scope: &Scope, + tag: &str, + _token: &CToken, + ) -> Result, CToken)>, RetryError> { + let scope_gaurd = self.created_scopes.read().await; + let stream_gaurd = self.created_streams.read().await; + + let streams_set = scope_gaurd.get(&scope.name).ok_or(RetryError { + error: ControllerError::OperationError { + can_retry: false, + operation: "listStreams".into(), + error_msg: "Scope not exist".into(), + }, + total_delay: Duration::from_millis(1), + tries: 0, + })?; + let mut result = Vec::new(); + for stream in streams_set { + let cfg = stream_gaurd.get(stream).ok_or(RetryError { + error: ControllerError::OperationError { + can_retry: false, + operation: "listStreamsForTag".into(), + error_msg: "Stream does not exist".into(), + }, + total_delay: Duration::from_millis(1), + tries: 0, + })?; + match &cfg.tags { + None => {} + Some(tag_list) => { + if tag_list.contains(&tag.to_string()) { + result.push(stream.clone()) + } + } + }; + } + Ok(Some((result, CToken::from("mock_token")))) + } + async fn delete_scope(&self, scope: &Scope) -> Result> { let scope_name = scope.name.clone(); if self.created_scopes.read().await.get(&scope_name).is_none() { @@ -181,6 +222,30 @@ impl ControllerClient for MockController { }) } + async fn get_stream_configuration(&self, _stream: &ScopedStream) -> ResultRetry { + Err(RetryError { + error: ControllerError::OperationError { + can_retry: false, + operation: "get stream configuration".into(), + error_msg: "unsupported operation.".into(), + }, + total_delay: Duration::from_millis(1), + tries: 0, + }) + } + + async fn get_stream_tags(&self, _stream: &ScopedStream) -> ResultRetry>> { + Err(RetryError { + error: ControllerError::OperationError { + can_retry: false, + operation: "get stream tags".into(), + error_msg: "unsupported operation.".into(), + }, + total_delay: Duration::from_millis(1), + tries: 0, + }) + } + async fn truncate_stream(&self, _stream_cut: &StreamCut) -> Result> { Err(RetryError { error: ControllerError::OperationError { diff --git a/controller-client/src/model_helper.rs b/controller-client/src/model_helper.rs index f2ce458ad..a9532fc94 100644 --- a/controller-client/src/model_helper.rs +++ b/controller-client/src/model_helper.rs @@ -12,6 +12,7 @@ use crate::controller::*; use ordered_float::OrderedFloat; use pravega_client_shared::*; use std::collections::{BTreeMap, HashMap}; +use stream_info::AccessOperation; impl From for PravegaNodeUri { fn from(value: NodeUri) -> PravegaNodeUri { @@ -36,6 +37,7 @@ impl From for SegmentId { stream_info: Some(StreamInfo { scope: segment.scope.name, stream: segment.stream.name, + access_operation: AccessOperation::Unspecified as i32, }), segment_id: segment.segment.number, } @@ -48,6 +50,7 @@ impl<'a> From<&'a ScopedSegment> for SegmentId { stream_info: Some(StreamInfo { scope: value.scope.name.to_owned(), stream: value.stream.name.to_owned(), + access_operation: AccessOperation::Unspecified as i32, }), segment_id: value.segment.number, } @@ -58,6 +61,7 @@ impl From for StreamInfo { StreamInfo { scope: stream.scope.name, stream: stream.stream.name, + access_operation: AccessOperation::Unspecified as i32, } } } @@ -67,6 +71,7 @@ impl<'a> From<&'a ScopedStream> for StreamInfo { StreamInfo { scope: value.scope.name.to_owned(), stream: value.stream.name.to_owned(), + access_operation: AccessOperation::Unspecified as i32, } } } @@ -109,7 +114,9 @@ impl<'a> From<&'a StreamConfiguration> for StreamConfig { retention_policy: Some(RetentionPolicy { retention_type: value.retention.retention_type.to_owned() as i32, retention_param: value.retention.retention_param, + retention_max: i64::MAX, }), + tags: value.tags.as_ref().map(|tags| Tags { tag: tags.to_owned() }), } } } @@ -126,7 +133,35 @@ impl From for StreamConfig { retention_policy: Some(RetentionPolicy { retention_type: config.retention.retention_type as i32, retention_param: config.retention.retention_param, + retention_max: i64::MAX, }), + tags: config.tags.map(|tags| Tags { tag: tags }), + } + } +} +impl From for StreamConfiguration { + fn from(config: StreamConfig) -> StreamConfiguration { + // StreamInfo is mandatory, panic if not present. + let info: StreamInfo = config.stream_info.unwrap(); + // Scaling policy is mandatory, panic if not present. + let scaling_policy = config.scaling_policy.unwrap(); + + StreamConfiguration { + scoped_stream: ScopedStream::from(info), + scaling: Scaling { + scale_type: num::FromPrimitive::from_i32(scaling_policy.scale_type).unwrap(), + target_rate: scaling_policy.target_rate, + scale_factor: scaling_policy.scale_factor, + min_num_segments: scaling_policy.min_num_segments, + }, + retention: config + .retention_policy + .map(|ret| Retention { + retention_type: num::FromPrimitive::from_i32(ret.retention_type).unwrap(), + retention_param: ret.retention_param, + }) + .unwrap_or_default(), + tags: config.tags.map(|tags| tags.tag), } } } diff --git a/controller-client/src/paginator.rs b/controller-client/src/paginator.rs index 21d1dbd51..6c8c27af1 100644 --- a/controller-client/src/paginator.rs +++ b/controller-client/src/paginator.rs @@ -129,3 +129,65 @@ pub fn list_streams( get_next_stream_async, ) } + +pub fn list_streams_for_tag( + scope: Scope, + tag: String, + client: &dyn ControllerClient, +) -> impl Stream>> + '_ { + struct State { + streams: IntoIter, + scope: Scope, + tag: String, + token: CToken, + } + + // Initial state with an empty Continuation token. + let get_next_stream_async = move |mut state: State| async move { + if let Some(element) = state.streams.next() { + Some((Ok(element), state)) + } else { + // execute a request to the controller. + info!( + "Fetch the next set of streams with tag {} under scope {} using the provided token", + state.tag, state.scope + ); + let res: ResultRetry, CToken)>> = client + .list_streams_for_tag(&state.scope, &state.tag, &state.token) + .await; + match res { + Ok(None) => None, + Ok(Some((list, ct))) => { + // create a consuming iterator + let mut stream_iter = list.into_iter(); + Some(( + Ok(stream_iter.next()?), + State { + streams: stream_iter, + scope: state.scope.clone(), + tag: state.tag.clone(), + token: ct, + }, + )) + } + Err(e) => { + //log an error and return None to indicate end of stream. + error!( + "Error while attempting to list streams with tag {} under scope {}. Error: {:?}", + state.tag, state.scope, e + ); + None + } + } + } + }; + stream::unfold( + State { + streams: Vec::new().into_iter(), + scope, + tag, + token: CToken::empty(), + }, + get_next_stream_async, + ) +} diff --git a/integration_test/src/controller_tests.rs b/integration_test/src/controller_tests.rs index 0911eef39..74bb165d8 100644 --- a/integration_test/src/controller_tests.rs +++ b/integration_test/src/controller_tests.rs @@ -12,10 +12,13 @@ use crate::pravega_service::PravegaStandaloneServiceConfig; use pravega_client::client_factory::ClientFactory; use pravega_client_config::{ClientConfigBuilder, MOCK_CONTROLLER_URI}; use pravega_client_shared::*; +use pravega_controller_client::paginator::{list_streams, list_streams_for_tag}; use pravega_controller_client::ControllerClient; use std::sync::Arc; use tracing::info; +const SCOPE: &str = "testScope123"; + pub fn test_controller_apis(config: PravegaStandaloneServiceConfig) { let config = ClientConfigBuilder::default() .controller_uri(MOCK_CONTROLLER_URI) @@ -26,41 +29,132 @@ pub fn test_controller_apis(config: PravegaStandaloneServiceConfig) { let client_factory = ClientFactory::new(config); let controller = client_factory.controller_client(); - let scope_name = Scope::from("testScope123".to_owned()); - let stream_name = Stream::from("testStream".to_owned()); let handle = client_factory.runtime(); - - let scope_result = handle.block_on(controller.create_scope(&scope_name)); + // Create a Scope that is used by all the tests. + let scope_result = handle.block_on(controller.create_scope(&Scope::from(SCOPE.to_owned()))); info!("Response for create_scope is {:?}", scope_result); + // Inovke the tests. + handle.block_on(test_stream_tags(controller)); + //handle.block_on(test_scale_stream(controller)); +} + +pub async fn test_stream_tags(controller: &dyn ControllerClient) { + let scope_name = Scope::from(SCOPE.to_string()); + let stream_name = Stream::from("testTags".to_owned()); + let scoped_stream1 = ScopedStream { + scope: scope_name.clone(), + stream: stream_name, + }; let stream_cfg = StreamConfiguration { - scoped_stream: ScopedStream { - scope: scope_name, - stream: stream_name, - }, + scoped_stream: scoped_stream1.clone(), scaling: Scaling { scale_type: ScaleType::FixedNumSegments, target_rate: 0, scale_factor: 0, min_num_segments: 1, }, + retention: Default::default(), + tags: Some(vec!["tag1".to_string(), "tag2".to_string()]), + }; + + let stream_result = controller.create_stream(&stream_cfg).await; + info!("Response for create_stream is {:?}", stream_result); + let config_result = controller + .get_stream_configuration(&scoped_stream1) + .await + .unwrap(); + assert_eq!(config_result, stream_cfg); + info!("Response of get Stream Configuration is {:?}", config_result); + + let tags = controller.get_stream_tags(&scoped_stream1).await.unwrap(); + assert_eq!(tags, stream_cfg.tags); + info!("Response for getTags is {:?}", tags); + + let scoped_stream2 = ScopedStream { + scope: scope_name.clone(), + stream: Stream { + name: "testTags2".to_string(), + }, + }; + let stream_cfg = StreamConfiguration { + scoped_stream: scoped_stream2.clone(), + scaling: Scaling { + scale_type: ScaleType::ByRateInEventsPerSec, + target_rate: 10, + scale_factor: 2, + min_num_segments: 1, + }, retention: Retention { - retention_type: RetentionType::None, - retention_param: 0, + retention_type: RetentionType::Size, + retention_param: 1024 * 1024, }, + tags: Some(vec!["tag2".to_string(), "tag3".to_string()]), }; - - let stream_result = handle.block_on(controller.create_stream(&stream_cfg)); + let stream_result = controller.create_stream(&stream_cfg).await; info!("Response for create_stream is {:?}", stream_result); + let config_result = controller + .get_stream_configuration(&scoped_stream2) + .await + .unwrap(); + assert_eq!(config_result, stream_cfg); + info!("Response of get Stream Configuration is {:?}", config_result); + + let tags = controller.get_stream_tags(&scoped_stream2).await.unwrap(); + assert_eq!(tags, stream_cfg.tags); + info!("Response for getTags is {:?}", tags); + + // Verify listStreams for the specified tag. + let stream_list = get_all_streams_for_tag(controller, &scope_name, "tag2").await; + + assert_eq!(2, stream_list.len()); + assert!(stream_list.contains(&scoped_stream1)); + assert!(stream_list.contains(&scoped_stream2)); + + let stream_list = get_all_streams_for_tag(controller, &scope_name, "tag1").await; + + assert_eq!(1, stream_list.len()); + assert!(stream_list.contains(&scoped_stream1)); +} + +// Helper method to fetch all the streams for a tag. +async fn get_all_streams_for_tag( + controller: &dyn ControllerClient, + scope_name: &Scope, + tag: &str, +) -> Vec { + let mut result: Vec = Vec::new(); - handle.block_on(test_scale_stream(controller)); + let mut token = CToken::empty(); + while let Some((mut res, next_token)) = controller + .list_streams_for_tag(&scope_name, tag, &token) + .await + .unwrap() + { + result.append(&mut res); + token = next_token; + } + result } pub async fn test_scale_stream(controller: &dyn ControllerClient) { let scoped_stream = ScopedStream { scope: Scope::from("testScope123".to_owned()), - stream: Stream::from("testStream".to_owned()), + stream: Stream::from("testStreamScale".to_owned()), + }; + let stream_cfg = StreamConfiguration { + scoped_stream: scoped_stream.clone(), + scaling: Scaling { + scale_type: ScaleType::FixedNumSegments, + target_rate: 0, + scale_factor: 0, + min_num_segments: 1, + }, + retention: Default::default(), + tags: Some(vec!["tag1".to_string(), "tag2".to_string()]), }; + let stream_result = controller.create_stream(&stream_cfg).await; + info!("Response of create stream is {:?}", stream_result); let current_segments_result = controller.get_current_segments(&scoped_stream).await; info!( diff --git a/integration_test/src/disconnection_tests.rs b/integration_test/src/disconnection_tests.rs index 1a9d9c11b..fb8a294a1 100644 --- a/integration_test/src/disconnection_tests.rs +++ b/integration_test/src/disconnection_tests.rs @@ -124,6 +124,7 @@ async fn create_scope_stream(controller_client: &dyn ControllerClient) { retention_type: RetentionType::None, retention_param: 0, }, + tags: None, }; let retry_policy = RetryWithBackoff::default().max_tries(10); let result = retry_async(retry_policy, || async { diff --git a/integration_test/src/event_reader_tests.rs b/integration_test/src/event_reader_tests.rs index 5d6a28d0e..e3b27c55b 100644 --- a/integration_test/src/event_reader_tests.rs +++ b/integration_test/src/event_reader_tests.rs @@ -727,6 +727,7 @@ async fn create_scope_stream( retention_type: RetentionType::None, retention_param: 0, }, + tags: None, }; controller_client .create_stream(&request) diff --git a/integration_test/src/transactional_event_writer_tests.rs b/integration_test/src/transactional_event_writer_tests.rs index ffeb69b27..33fd8abf0 100644 --- a/integration_test/src/transactional_event_writer_tests.rs +++ b/integration_test/src/transactional_event_writer_tests.rs @@ -196,6 +196,7 @@ async fn setup_test(scope_name: &Scope, stream_name: &Stream, controller_client: retention_type: RetentionType::None, retention_param: 0, }, + tags: None, }; controller_client .create_stream(&request) diff --git a/integration_test/src/utils.rs b/integration_test/src/utils.rs index 60e08fbe4..a01b6bf67 100644 --- a/integration_test/src/utils.rs +++ b/integration_test/src/utils.rs @@ -40,6 +40,7 @@ pub(crate) async fn create_scope_stream( retention_type: RetentionType::None, retention_param: 0, }, + tags: None, }; controller_client .create_stream(&request) diff --git a/integration_test/src/wirecommand_tests.rs b/integration_test/src/wirecommand_tests.rs index 983974d99..f96af784b 100644 --- a/integration_test/src/wirecommand_tests.rs +++ b/integration_test/src/wirecommand_tests.rs @@ -135,6 +135,7 @@ async fn test_hello(factory: &ClientFactory) { retention_type: RetentionType::None, retention_param: 0, }, + tags: None, }; controller_client .create_stream(&request) diff --git a/python_binding/src/lib.rs b/python_binding/src/lib.rs index 4e3a50378..76503e24a 100644 --- a/python_binding/src/lib.rs +++ b/python_binding/src/lib.rs @@ -7,7 +7,6 @@ // // http://www.apache.org/licenses/LICENSE-2.0 // -#![allow(clippy::from_over_into)] #[macro_use] extern crate cfg_if; diff --git a/python_binding/src/stream_manager.rs b/python_binding/src/stream_manager.rs index d7b2d528a..4f334e553 100644 --- a/python_binding/src/stream_manager.rs +++ b/python_binding/src/stream_manager.rs @@ -140,6 +140,7 @@ impl StreamManager { retention_type: RetentionType::None, retention_param: 0, }, + tags: None, }; let controller = self.cf.controller_client(); diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 2e84ffe43..4f2743a95 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -25,3 +25,5 @@ murmurhash3 = "0.0.5" encoding_rs = "0.8" derive_more = "0.99.9" tokio = { version = "1.1", features = ["full"] } +num-traits = "0.2" +num-derive = "0.3" diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 1363c1d42..fbfe469bf 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -50,6 +50,9 @@ extern crate shrinkwraprs; #[macro_use] extern crate derive_new; +#[macro_use] +extern crate num_derive; + #[derive(From, Shrinkwrap, Debug, Clone, Hash, PartialEq, Eq)] pub struct PravegaNodeUri(pub String); @@ -327,7 +330,7 @@ impl Display for WriterId { } } -#[derive(Debug, Clone, Hash, PartialEq, Eq)] +#[derive(Debug, Clone, Hash, PartialEq, Eq, FromPrimitive)] pub enum ScaleType { FixedNumSegments = 0, ByRateInKbytesPerSec = 1, @@ -353,7 +356,7 @@ impl Default for Scaling { } } -#[derive(Debug, Clone, Hash, PartialEq, Eq)] +#[derive(Debug, Clone, Hash, PartialEq, Eq, FromPrimitive)] pub enum RetentionType { None = 0, Time = 1, @@ -396,6 +399,7 @@ pub struct StreamConfiguration { pub scoped_stream: ScopedStream, pub scaling: Scaling, pub retention: Retention, + pub tags: Option>, } #[derive(new, Debug, Clone)] diff --git a/src/util/mod.rs b/src/util/mod.rs index 67e2f6b90..ce2529ad9 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -82,6 +82,7 @@ pub(crate) async fn create_stream(factory: &ClientFactory, scope: &str, stream: retention_type: RetentionType::None, retention_param: 0, }, + tags: None, }) .await .unwrap(); From a5efd3f343d8e9e9d221e4b8322003f6e0fc54d1 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Wed, 2 Jun 2021 17:31:55 +0530 Subject: [PATCH 02/12] Remove deprecated field in proto file. Enable CLI for tags. Signed-off-by: Sandeep --- controller-client/proto/Controller.proto | 1 - controller-client/src/cli.rs | 29 +++++++++++- controller-client/src/lib.rs | 3 +- controller-client/src/paginator.rs | 58 ++++++++++++++++++++++++ integration_test/src/controller_tests.rs | 11 +++++ 5 files changed, 98 insertions(+), 4 deletions(-) diff --git a/controller-client/proto/Controller.proto b/controller-client/proto/Controller.proto index c0c7e0069..f40de879e 100644 --- a/controller-client/proto/Controller.proto +++ b/controller-client/proto/Controller.proto @@ -433,7 +433,6 @@ message TxnId { message CreateTxnRequest { StreamInfo streamInfo = 1; int64 lease = 2; - int64 scaleGracePeriod = 3 [deprecated=true]; } message CreateTxnResponse { diff --git a/controller-client/src/cli.rs b/controller-client/src/cli.rs index 51cc0d9b5..e79991782 100644 --- a/controller-client/src/cli.rs +++ b/controller-client/src/cli.rs @@ -33,6 +33,8 @@ enum Command { stream_name: String, #[structopt(help = "Segment Count")] segment_count: i32, + #[structopt(help = "tag", value_name = "Tag,", use_delimiter = true, min_values = 0)] + tags: Vec, }, /// Seal a Stream. SealStream { @@ -53,6 +55,13 @@ enum Command { #[structopt(help = "Scope Name")] scope_name: String, }, + /// List Streams for a tag, under a scope + ListStreamsForTag { + #[structopt(help = "Scope Name")] + scope_name: String, + #[structopt(help = "Tag Name")] + tag: String, + }, } #[derive(StructOpt, Debug)] @@ -93,6 +102,7 @@ fn main() { scope_name, stream_name, segment_count, + tags, } => { let stream_cfg = StreamConfiguration { scoped_stream: ScopedStream { @@ -109,7 +119,7 @@ fn main() { retention_type: RetentionType::None, retention_param: 0, }, - tags: None, + tags: if tags.is_empty() { None } else { Some(tags) }, }; let result = rt.block_on(controller_client.create_stream(&stream_cfg)); println!("Stream creation status {:?}", result); @@ -147,5 +157,22 @@ fn main() { future::ready(()) })); } + Command::ListStreamsForTag { scope_name, tag } => { + use futures::future; + use futures::stream::StreamExt; + use pravega_controller_client::paginator::list_streams_for_tag; + + let scope = Scope::from(scope_name.clone()); + let stream = list_streams_for_tag(scope, tag.clone(), &controller_client); + println!("Listing streams with tag {:?} under scope {:?}", tag, scope_name); + rt.block_on(stream.for_each(|stream| { + if stream.is_ok() { + println!("{:?}", stream.unwrap()); + } else { + println!("Error while fetching data from Controller. Details: {:?}", stream); + } + future::ready(()) + })); + } } } diff --git a/controller-client/src/lib.rs b/controller-client/src/lib.rs index 2090c8077..8a1e200a6 100644 --- a/controller-client/src/lib.rs +++ b/controller-client/src/lib.rs @@ -149,7 +149,7 @@ pub trait ControllerClient: Send + Sync { /** * API to list streams associated with the given tag under a given scope and continuation token. - * Use the pravega_controller_client::paginator::list_streams to paginate over all the streams. + * Use the pravega_controller_client::paginator::list_streams_for_tag to paginate over all the streams. */ async fn list_streams_for_tag( &self, @@ -1122,7 +1122,6 @@ impl ControllerClientImpl { let request = CreateTxnRequest { stream_info: Some(StreamInfo::from(stream)), lease: lease.as_millis() as i64, - scale_grace_period: 0, }; let op_status: StdResult, tonic::Status> = self .get_controller_client() diff --git a/controller-client/src/paginator.rs b/controller-client/src/paginator.rs index 6c8c27af1..5fb026b3e 100644 --- a/controller-client/src/paginator.rs +++ b/controller-client/src/paginator.rs @@ -130,6 +130,64 @@ pub fn list_streams( ) } +/// +///Helper method to iterated over the all the Pravega streams under the provided Scope. +///This method returns a stream of values,Pravega streams, produced asynchronously. +/// +/// The below snippets show case the example uses. +/// Sample 1: +///```ignore +/// # futures::executor::block_on(async { +/// use pravega_client_shared::Scope; +/// use pravega_client_shared::ScopedStream; +/// use pravega_controller_client::paginator::list_streams_for_tag; +/// use pravega_client::client_factory::ClientFactory; +/// use pravega_client_config::ClientConfigBuilder; +/// use pravega_client_config::MOCK_CONTROLLER_URI; +/// let config = ClientConfigBuilder::default() +/// .controller_uri(MOCK_CONTROLLER_URI) +/// .build() +/// .expect("creating config"); +/// let controller_client = ClientFactory::new(config).get_controller_client(); +/// let stream = list_streams_for_tag( +/// Scope { +/// name: "testScope".to_string(), +/// }, +/// "tagx".to_string(), +/// controller_client, +/// ); +/// // collect all the Streams in a single vector +/// let stream_list:Vec = stream.map(|str| str.unwrap()).collect::>().await; +/// # }); +/// ``` +/// +/// Sample 2: +/// ```ignore +/// # futures::executor::block_on(async { +/// use pravega_client_shared::Scope; +/// use pravega_controller_client::paginator::list_streams_for_tag; +/// use pravega_client::client_factory::ClientFactory; +/// use pravega_client_config::ClientConfigBuilder; +/// use pravega_client_config::MOCK_CONTROLLER_URI; +/// use futures::StreamExt; +/// let config = ClientConfigBuilder::default() +/// .controller_uri(MOCK_CONTROLLER_URI) +/// .build() +/// .expect("creating config"); +/// let controller_client = ClientFactory::new(config).get_controller_client(); +/// let mut stream = list_streams_for_tag( +/// Scope { +/// name: "testScope".to_string(), +/// }, +/// "tagx".to_string(), +/// controller_client, +/// ); +/// let pravega_stream_1 = stream.next().await; +/// let pravega_stream_2 = stream.next().await; +/// // A None is returned at the end of the stream. +/// # }); +/// ``` +/// pub fn list_streams_for_tag( scope: Scope, tag: String, diff --git a/integration_test/src/controller_tests.rs b/integration_test/src/controller_tests.rs index 74bb165d8..19a9436f5 100644 --- a/integration_test/src/controller_tests.rs +++ b/integration_test/src/controller_tests.rs @@ -115,6 +115,17 @@ pub async fn test_stream_tags(controller: &dyn ControllerClient) { assert_eq!(1, stream_list.len()); assert!(stream_list.contains(&scoped_stream1)); + + use futures::StreamExt; + let stream = list_streams_for_tag(scope_name, "tag2".to_string(), controller); + futures::pin_mut!(stream); + let stream_list: Vec = stream + .map(|str| str.unwrap()) + .collect::>() + .await; + assert_eq!(2, stream_list.len()); + assert!(stream_list.contains(&scoped_stream1)); + assert!(stream_list.contains(&scoped_stream2)); } // Helper method to fetch all the streams for a tag. From d724f50345e3aaa93376fdd1f841d27676c6577f Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 22 Jun 2021 17:05:02 +0530 Subject: [PATCH 03/12] Clippy and mock controller fixes. Signed-off-by: Sandeep --- controller-client/src/lib.rs | 98 ++++++++++++++++++++++++++++++++++++ shared/src/lib.rs | 6 +-- 2 files changed, 100 insertions(+), 4 deletions(-) diff --git a/controller-client/src/lib.rs b/controller-client/src/lib.rs index 8a1e200a6..8d0be1585 100644 --- a/controller-client/src/lib.rs +++ b/controller-client/src/lib.rs @@ -1954,6 +1954,104 @@ mod test { }; Ok(Response::new(reply)) } + async fn list_scopes( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not Implemented")) + } + + async fn check_scope_exists( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not Implemented")) + } + async fn check_stream_exists( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not Implemented")) + } + + async fn create_key_value_table( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + Err(tonic::Status::unimplemented("Not Implemented")) + } + async fn get_current_segments_key_value_table( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not Implemented")) + } + async fn list_key_value_tables_in_scope( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + Err(tonic::Status::unimplemented("Not Implemented")) + } + async fn delete_key_value_table( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not Implemented")) + } + async fn list_subscribers( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not Implemented")) + } + async fn update_subscriber_stream_cut( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not Implemented")) + } + async fn create_reader_group( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + Err(tonic::Status::unimplemented("Not Implemented")) + } + async fn get_reader_group_config( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + Err(tonic::Status::unimplemented("Not Implemented")) + } + async fn delete_reader_group( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + Err(tonic::Status::unimplemented("Not Implemented")) + } + async fn update_reader_group( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + Err(tonic::Status::unimplemented("Not Implemented")) + } + async fn get_stream_configuration( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not Implemented")) + } + async fn list_streams_in_scope_for_tag( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not Implemented")) + } } async fn run_server() { diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 82b0f24e8..f4a073d15 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -254,10 +254,10 @@ impl From<&str> for ScopedSegment { ScopedSegment::from(original_segment_name) } else { let mut tokens = NameUtils::extract_segment_tokens(qualified_name.to_owned()); + let segment_id = tokens.pop().expect("get segment id from tokens"); + let stream_name = tokens.pop().expect("get stream name from tokens"); if tokens.len() == 2 { // scope not present - let segment_id = tokens.pop().expect("get segment id from tokens"); - let stream_name = tokens.pop().expect("get stream name from tokens"); ScopedSegment { scope: Scope { name: String::from(""), @@ -269,8 +269,6 @@ impl From<&str> for ScopedSegment { }, } } else { - let segment_id = tokens.pop().expect("get segment id from tokens"); - let stream_name = tokens.pop().expect("get stream name from tokens"); let scope = tokens.pop().expect("get scope from tokens"); ScopedSegment { scope: Scope { name: scope }, From 9c4a79d222a68a367fe1cf90e282a31c60ad0c51 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 22 Jun 2021 17:26:01 +0530 Subject: [PATCH 04/12] Clippy fixes. Signed-off-by: Sandeep --- examples/event_write_and_read.rs | 1 + src/client_factory.rs | 6 ++---- src/event/reader.rs | 2 +- src/segment/metadata.rs | 12 +++--------- src/sync/synchronizer.rs | 9 +++------ 5 files changed, 10 insertions(+), 20 deletions(-) diff --git a/examples/event_write_and_read.rs b/examples/event_write_and_read.rs index a6246ae80..cf203462a 100644 --- a/examples/event_write_and_read.rs +++ b/examples/event_write_and_read.rs @@ -53,6 +53,7 @@ fn main() { retention_type: RetentionType::None, retention_param: 0, }, + tags: None, }; controller_client .create_stream(&stream_config) diff --git a/src/client_factory.rs b/src/client_factory.rs index 8011bb3fd..3ba4b457c 100644 --- a/src/client_factory.rs +++ b/src/client_factory.rs @@ -226,13 +226,11 @@ impl ClientFactoryInternal { stream: ScopedStream, ) -> DelegationTokenProvider { let token_provider = DelegationTokenProvider::new(stream); - if self.config.is_auth_enabled { - token_provider - } else { + if !self.config.is_auth_enabled { let empty_token = DelegationToken::new("".to_string(), None); token_provider.populate(empty_token).await; - token_provider } + token_provider } pub(crate) fn get_connection_pool(&self) -> &ConnectionPool { diff --git a/src/event/reader.rs b/src/event/reader.rs index 4f39a6214..efa021cc0 100644 --- a/src/event/reader.rs +++ b/src/event/reader.rs @@ -486,7 +486,7 @@ impl EventReader { } else { //None is sent if the the segment is released from the reader. debug!("ignore the received data since None was returned"); - return None; + None } } Err((e, offset)) => { diff --git a/src/segment/metadata.rs b/src/segment/metadata.rs index 00afd8170..3a28c9852 100644 --- a/src/segment/metadata.rs +++ b/src/segment/metadata.rs @@ -98,10 +98,8 @@ impl SegmentMetadataClient { Err(e) => { if e.is_token_expired() { self.delegation_token_provider.signal_token_expiry(); - RetryResult::Retry(e.to_string()) - } else { - RetryResult::Retry(e.to_string()) } + RetryResult::Retry(e.to_string()) } } }) @@ -159,10 +157,8 @@ impl SegmentMetadataClient { Err(e) => { if e.is_token_expired() { self.delegation_token_provider.signal_token_expiry(); - RetryResult::Retry(e.to_string()) - } else { - RetryResult::Retry(e.to_string()) } + RetryResult::Retry(e.to_string()) } } }) @@ -208,10 +204,8 @@ impl SegmentMetadataClient { Err(e) => { if e.is_token_expired() { self.delegation_token_provider.signal_token_expiry(); - RetryResult::Retry(e.to_string()) - } else { - RetryResult::Retry(e.to_string()) } + RetryResult::Retry(e.to_string()) } } }) diff --git a/src/sync/synchronizer.rs b/src/sync/synchronizer.rs index de8f40a13..b346fa956 100644 --- a/src/sync/synchronizer.rs +++ b/src/sync/synchronizer.rs @@ -350,19 +350,16 @@ impl InternalKey { let outer_name_length: usize = self.key[..PREFIX_LENGTH].parse().expect("parse prefix length"); assert!(self.key.len() >= PREFIX_LENGTH + outer_name_length); + let outer = self.key[PREFIX_LENGTH..PREFIX_LENGTH + outer_name_length] + .parse::() + .expect("parse outer key"); if self.key.len() > PREFIX_LENGTH + outer_name_length { - let outer = self.key[PREFIX_LENGTH..PREFIX_LENGTH + outer_name_length] - .parse::() - .expect("parse outer key"); // there is a slash separating outer_key and_inner key let inner = self.key[PREFIX_LENGTH + outer_name_length + 1..] .parse::() .expect("parse inner key"); (outer, Some(inner)) } else { - let outer = self.key[PREFIX_LENGTH..PREFIX_LENGTH + outer_name_length] - .parse::() - .expect("parse outer key"); (outer, None) } } From eec2a1d1a73a14ae10acd168f6bf7d9c26daf34a Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 22 Jun 2021 17:44:25 +0530 Subject: [PATCH 05/12] Clippy test fixes. Signed-off-by: Sandeep --- integration_test/src/event_writer_tests.rs | 10 +++++----- integration_test/src/table_tests.rs | 6 +++--- src/segment/selector.rs | 1 + 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/integration_test/src/event_writer_tests.rs b/integration_test/src/event_writer_tests.rs index 60b47d349..3f9d65039 100644 --- a/integration_test/src/event_writer_tests.rs +++ b/integration_test/src/event_writer_tests.rs @@ -110,7 +110,7 @@ async fn test_simple_write(writer: &mut EventWriter) { for rx in receivers { let reply: Result<(), Error> = rx.await.expect("wait for result from oneshot"); - assert_eq!(reply.is_ok(), true); + assert!(reply.is_ok()); } info!("test simple write passed"); } @@ -150,7 +150,7 @@ async fn test_segment_scaling_up(writer: &mut EventWriter, factory: &ClientFacto for rx in receivers { let reply: Result<(), Error> = rx.await.expect("wait for result from oneshot"); - assert_eq!(reply.is_ok(), true); + assert!(reply.is_ok()); } info!("test event stream writer with segment scaled up passed"); @@ -189,7 +189,7 @@ async fn test_segment_scaling_down(writer: &mut EventWriter, factory: &ClientFac for rx in receivers { let reply: Result<(), Error> = rx.await.expect("wait for result from oneshot"); - assert_eq!(reply.is_ok(), true); + assert!(reply.is_ok()); } info!("test event stream writer with segment sealed passed"); } @@ -198,7 +198,7 @@ async fn test_write_correctness(writer: &mut EventWriter, factory: &ClientFactor info!("test read and write"); let rx = writer.write_event(String::from("event0").into_bytes()).await; let reply: Result<(), Error> = rx.await.expect("wait for result from oneshot"); - assert_eq!(reply.is_ok(), true); + assert!(reply.is_ok()); let scope_name = Scope::from("testScopeWriter2".to_owned()); let stream_name = Stream::from("testStreamWriter2".to_owned()); let segment_name = ScopedSegment { @@ -274,7 +274,7 @@ async fn test_write_correctness_while_scaling(writer: &mut EventWriter, factory: // the data should write successfully. for rx in receivers { let reply: Result<(), Error> = rx.await.expect("wait for result from oneshot"); - assert_eq!(reply.is_ok(), true); + assert!(reply.is_ok()); } let segment_name = ScopedSegment { diff --git a/integration_test/src/table_tests.rs b/integration_test/src/table_tests.rs index 706e4c9c3..ea728775f 100644 --- a/integration_test/src/table_tests.rs +++ b/integration_test/src/table_tests.rs @@ -298,7 +298,7 @@ async fn test_iterators(client_factory: &ClientFactory) { Ok(t) => { let k: String = t.0; info!("key {:?} version {:?}", k, t.1); - assert_eq!(false, k.is_empty()); + assert!(!k.is_empty()); key_count += 1; } _ => panic!("Failed fetch keys."), @@ -317,8 +317,8 @@ async fn test_iterators(client_factory: &ClientFactory) { let v: String = t.1; info!("key {:?} value {:?} version {:?}", k, v, t.2); - assert_eq!(false, k.is_empty()); - assert_eq!(false, v.is_empty()); + assert!(!k.is_empty()); + assert!(!v.is_empty()); entry_count += 1; } _ => panic!("Failed fetch entries."), diff --git a/src/segment/selector.rs b/src/segment/selector.rs index 3bdcc65da..1fecb48d9 100644 --- a/src/segment/selector.rs +++ b/src/segment/selector.rs @@ -278,6 +278,7 @@ pub(crate) mod test { retention_type: RetentionType::None, retention_param: 0, }, + tags: None, }) .await .unwrap(); From d10c30ea2924ac5d0b5d6990a010f53adc105923 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 22 Jun 2021 17:52:00 +0530 Subject: [PATCH 06/12] Clippy test fixes. Signed-off-by: Sandeep --- integration_test/src/event_writer_tests.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/integration_test/src/event_writer_tests.rs b/integration_test/src/event_writer_tests.rs index 3f9d65039..1179c23f4 100644 --- a/integration_test/src/event_writer_tests.rs +++ b/integration_test/src/event_writer_tests.rs @@ -253,8 +253,8 @@ async fn test_write_correctness_while_scaling(writer: &mut EventWriter, factory: assert_eq!(2, current_segments_result.unwrap().key_segment_map.len()); } + let data = format!("event{}", i); if i % 2 == 0 { - let data = format!("event{}", i); // Routing key "even" and "odd" works is because their hashed value are in // 0~0.5 and 0.5~1.0 respectively. If the routing key hashing algorithm changed, this // test might fail. @@ -263,7 +263,6 @@ async fn test_write_correctness_while_scaling(writer: &mut EventWriter, factory: .await; receivers.push(rx); } else { - let data = format!("event{}", i); let rx = writer .write_event_by_routing_key(String::from("odd"), data.into_bytes()) .await; @@ -351,14 +350,13 @@ async fn test_write_correctness_with_routing_key(writer: &mut EventWriter, facto let stream_name = Stream::from("testStreamWriter3".to_owned()); let mut receivers = vec![]; while i < count { + let data = format!("event{}", i); if i % 2 == 0 { - let data = format!("event{}", i); let rx = writer .write_event_by_routing_key(String::from("even"), data.into_bytes()) .await; receivers.push(rx); } else { - let data = format!("event{}", i); let rx = writer .write_event_by_routing_key(String::from("odd"), data.into_bytes()) .await; From 3edf0756be84dfa81926f65a13767071817adcca Mon Sep 17 00:00:00 2001 From: Sandeep Date: Wed, 23 Jun 2021 16:04:07 +0530 Subject: [PATCH 07/12] Fix Pravega version for standalone. Signed-off-by: Sandeep --- integration_test/build.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integration_test/build.rs b/integration_test/build.rs index b5a0f7213..64f82a866 100644 --- a/integration_test/build.rs +++ b/integration_test/build.rs @@ -16,8 +16,8 @@ use tar::Archive; use tracing::info; const LIBRARY: &str = "pravega"; -const VERSION: &str = "0.10.0-2876.1c3854e-SNAPSHOT"; -const TAG: &str = "refresh-cert"; +const VERSION: &str = "0.10.0-2906.6a34e64-SNAPSHOT"; +const TAG: &str = "stream-tags"; const BASE: &str = "./"; fn main() { @@ -47,7 +47,7 @@ fn remove_suffix(value: &mut String, suffix: &str) { /// Downloads and unpacks a prebuilt binary. Only works for certain platforms. fn install_prebuilt() { let url = format!( - "https://github.com/Tristan1900/pravega/releases/download/{}/pravega-{}.tgz", + "https://github.com/shrids/pravega/releases/download/{}/pravega-{}.tgz", TAG, VERSION ); let short_file_name = url.split('/').last().unwrap(); From af8fab4f02d7f7e816b6c5297ae0e8d2e59bba06 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Wed, 23 Jun 2021 16:39:45 +0530 Subject: [PATCH 08/12] Bug fix. Signed-off-by: Sandeep --- shared/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shared/src/lib.rs b/shared/src/lib.rs index f4a073d15..d59029d02 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -256,7 +256,7 @@ impl From<&str> for ScopedSegment { let mut tokens = NameUtils::extract_segment_tokens(qualified_name.to_owned()); let segment_id = tokens.pop().expect("get segment id from tokens"); let stream_name = tokens.pop().expect("get stream name from tokens"); - if tokens.len() == 2 { + if tokens.is_empty() { // scope not present ScopedSegment { scope: Scope { From 3bc08d9dd5d2ffb6d978bc8c3dcfd971e4682010 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Thu, 24 Jun 2021 12:14:09 +0530 Subject: [PATCH 09/12] CR Changes: improve rust doc check. Signed-off-by: Sandeep --- controller-client/src/paginator.rs | 143 ++++++++++++++--------------- 1 file changed, 67 insertions(+), 76 deletions(-) diff --git a/controller-client/src/paginator.rs b/controller-client/src/paginator.rs index 5fb026b3e..6402c9814 100644 --- a/controller-client/src/paginator.rs +++ b/controller-client/src/paginator.rs @@ -23,54 +23,49 @@ use tracing::info; /// /// The below snippets show case the example uses. /// Sample 1: -///```ignore -/// # futures::executor::block_on(async { +///``` +/// # use tonic::transport::Channel; +/// # use pravega_controller_client::controller::controller_service_client::ControllerServiceClient; +/// use pravega_controller_client::ControllerClient; +/// # async fn call_list_stream(controller_client: &dyn ControllerClient) { /// use pravega_client_shared::Scope; /// use pravega_client_shared::ScopedStream; +/// use futures::future; +/// use futures::stream::StreamExt; /// use pravega_controller_client::paginator::list_streams; -/// use pravega_client::client_factory::ClientFactory; -/// use pravega_client_config::ClientConfigBuilder; -/// use pravega_client_config::MOCK_CONTROLLER_URI; -/// let config = ClientConfigBuilder::default() -/// .controller_uri(MOCK_CONTROLLER_URI) -/// .build() -/// .expect("creating config"); -/// let controller_client = ClientFactory::new(config).get_controller_client(); -/// let stream = list_streams( -/// Scope { -/// name: "testScope".to_string(), -/// }, -/// controller_client, -/// ); -/// // collect all the Streams in a single vector -/// let stream_list:Vec = stream.map(|str| str.unwrap()).collect::>().await; -/// # }); +/// let stream = list_streams( +/// Scope { +/// name: "testScope".to_string(), +/// }, +/// controller_client, +/// ); +/// // collect all the Streams in a single vector +/// let stream_list:Vec = stream.map(|str| str.unwrap()).collect::>().await; +/// # } /// ``` /// /// Sample 2: -/// ```ignore -/// # futures::executor::block_on(async { +/// ``` +/// # use tonic::transport::Channel; +/// # use pravega_controller_client::controller::controller_service_client::ControllerServiceClient; +/// use pravega_controller_client::ControllerClient; +/// # async fn call_list_stream(controller_client: &dyn ControllerClient) { /// use pravega_client_shared::Scope; +/// use pravega_client_shared::ScopedStream; +/// use futures::future; +/// use futures::stream::StreamExt; /// use pravega_controller_client::paginator::list_streams; -/// use pravega_client::client_factory::ClientFactory; -/// use pravega_client_config::ClientConfigBuilder; -/// use pravega_client_config::MOCK_CONTROLLER_URI; -/// use futures::StreamExt; -/// let config = ClientConfigBuilder::default() -/// .controller_uri(MOCK_CONTROLLER_URI) -/// .build() -/// .expect("creating config"); -/// let controller_client = ClientFactory::new(config).get_controller_client(); -/// let mut stream = list_streams( -/// Scope { -/// name: "testScope".to_string(), -/// }, -/// controller_client, -/// ); +/// let stream = list_streams( +/// Scope { +/// name: "testScope".to_string(), +/// }, +/// controller_client, +/// ); +/// futures::pin_mut!(stream); /// let pravega_stream_1 = stream.next().await; /// let pravega_stream_2 = stream.next().await; /// // A None is returned at the end of the stream. -/// # }); +/// # } /// ``` /// pub fn list_streams( @@ -135,57 +130,53 @@ pub fn list_streams( ///This method returns a stream of values,Pravega streams, produced asynchronously. /// /// The below snippets show case the example uses. +/// /// Sample 1: -///```ignore -/// # futures::executor::block_on(async { +/// ``` +/// # use tonic::transport::Channel; +/// # use pravega_controller_client::controller::controller_service_client::ControllerServiceClient; +/// use pravega_controller_client::ControllerClient; +/// # async fn call_list_stream(controller_client: &dyn ControllerClient) { /// use pravega_client_shared::Scope; /// use pravega_client_shared::ScopedStream; +/// use futures::future; +/// use futures::stream::StreamExt; /// use pravega_controller_client::paginator::list_streams_for_tag; -/// use pravega_client::client_factory::ClientFactory; -/// use pravega_client_config::ClientConfigBuilder; -/// use pravega_client_config::MOCK_CONTROLLER_URI; -/// let config = ClientConfigBuilder::default() -/// .controller_uri(MOCK_CONTROLLER_URI) -/// .build() -/// .expect("creating config"); -/// let controller_client = ClientFactory::new(config).get_controller_client(); -/// let stream = list_streams_for_tag( -/// Scope { -/// name: "testScope".to_string(), -/// }, -/// "tagx".to_string(), -/// controller_client, -/// ); -/// // collect all the Streams in a single vector -/// let stream_list:Vec = stream.map(|str| str.unwrap()).collect::>().await; -/// # }); +/// let stream = list_streams_for_tag( +/// Scope { +/// name: "testScope".to_string(), +/// }, +/// "tagx".to_string(), +/// controller_client, +/// ); +/// // collect all the Streams in a single vector +/// let stream_list:Vec = stream.map(|str| str.unwrap()).collect::>().await; +/// # } /// ``` /// /// Sample 2: -/// ```ignore -/// # futures::executor::block_on(async { +/// ``` +/// # use tonic::transport::Channel; +/// # use pravega_controller_client::controller::controller_service_client::ControllerServiceClient; +/// use pravega_controller_client::ControllerClient; +/// # async fn call_list_stream(controller_client: &dyn ControllerClient) { /// use pravega_client_shared::Scope; +/// use pravega_client_shared::ScopedStream; +/// use futures::future; +/// use futures::stream::StreamExt; /// use pravega_controller_client::paginator::list_streams_for_tag; -/// use pravega_client::client_factory::ClientFactory; -/// use pravega_client_config::ClientConfigBuilder; -/// use pravega_client_config::MOCK_CONTROLLER_URI; -/// use futures::StreamExt; -/// let config = ClientConfigBuilder::default() -/// .controller_uri(MOCK_CONTROLLER_URI) -/// .build() -/// .expect("creating config"); -/// let controller_client = ClientFactory::new(config).get_controller_client(); -/// let mut stream = list_streams_for_tag( -/// Scope { -/// name: "testScope".to_string(), -/// }, -/// "tagx".to_string(), -/// controller_client, -/// ); +/// let stream = list_streams_for_tag( +/// Scope { +/// name: "testScope".to_string(), +/// }, +/// "tagx".to_string(), +/// controller_client, +/// ); +/// futures::pin_mut!(stream); /// let pravega_stream_1 = stream.next().await; /// let pravega_stream_2 = stream.next().await; /// // A None is returned at the end of the stream. -/// # }); +/// # } /// ``` /// pub fn list_streams_for_tag( From 2c72052890fd8948d3742a3ce5245a8d6da94e54 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Wed, 30 Jun 2021 18:31:25 +0530 Subject: [PATCH 10/12] Enable tag apis in python bindings. Signed-off-by: Sandeep --- benches/benchmark.rs | 6 +- controller-client/src/cli.rs | 5 +- controller-client/src/test.rs | 5 +- python_binding/src/lib.rs | 3 + python_binding/src/stream_manager.rs | 222 ++++++++++++++++++-- python_binding/tests/pravega_client_test.py | 37 ++++ shared/src/lib.rs | 6 +- 7 files changed, 254 insertions(+), 30 deletions(-) diff --git a/benches/benchmark.rs b/benches/benchmark.rs index 905f569cb..957305892 100644 --- a/benches/benchmark.rs +++ b/benches/benchmark.rs @@ -408,10 +408,8 @@ async fn create_scope_stream( scale_factor: 0, min_num_segments: segment_number, }, - retention: Retention { - retention_type: RetentionType::None, - retention_param: 0, - }, + retention: Default::default(), + tags: None, }; controller_client .create_stream(&request) diff --git a/controller-client/src/cli.rs b/controller-client/src/cli.rs index 3bac7476d..924bd6dad 100644 --- a/controller-client/src/cli.rs +++ b/controller-client/src/cli.rs @@ -115,10 +115,7 @@ fn main() { scale_factor: 0, min_num_segments: segment_count, }, - retention: Retention { - retention_type: RetentionType::None, - retention_param: 0, - }, + retention: Default::default(), tags: if tags.is_empty() { None } else { Some(tags) }, }; let result = rt.block_on(controller_client.create_stream(&stream_cfg)); diff --git a/controller-client/src/test.rs b/controller-client/src/test.rs index 54ac70043..bcc963b1d 100644 --- a/controller-client/src/test.rs +++ b/controller-client/src/test.rs @@ -59,10 +59,7 @@ fn test_create_stream_error() { scale_factor: 0, min_num_segments: 1, }, - retention: Retention { - retention_type: RetentionType::None, - retention_param: 0, - }, + retention: Default::default(), }; let create_stream_result = rt.block_on(client.create_stream(&request)); assert!(create_stream_result.is_err()); diff --git a/python_binding/src/lib.rs b/python_binding/src/lib.rs index 76503e24a..eba09712b 100644 --- a/python_binding/src/lib.rs +++ b/python_binding/src/lib.rs @@ -22,6 +22,7 @@ cfg_if! { if #[cfg(feature = "python_binding")] { use pyo3::prelude::*; use stream_manager::StreamManager; + use stream_manager::{StreamScalingPolicy, StreamRetentionPolicy}; #[macro_use] extern crate derive_new; use stream_writer::StreamWriter; @@ -49,6 +50,8 @@ fn pravega_client(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; let txn_exception = py.get_type::(); txn_exception.setattr("__doc__", TXNFAILED_EXCEPTION_DOCSTRING)?; m.add("TxnFailedException", txn_exception)?; diff --git a/python_binding/src/stream_manager.rs b/python_binding/src/stream_manager.rs index 13d48b89b..2594d2b0b 100644 --- a/python_binding/src/stream_manager.rs +++ b/python_binding/src/stream_manager.rs @@ -44,6 +44,115 @@ pub(crate) struct StreamManager { config: ClientConfig, } +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(Clone)] +pub(crate) struct StreamRetentionPolicy { + retention: Retention, +} + +impl Default for StreamRetentionPolicy { + fn default() -> Self { + StreamRetentionPolicy { + retention: Default::default(), + } + } +} + +#[cfg(feature = "python_binding")] +#[pymethods] +impl StreamRetentionPolicy { + #[staticmethod] + pub fn none() -> StreamRetentionPolicy { + StreamRetentionPolicy { + retention: Default::default(), + } + } + + #[staticmethod] + pub fn by_size(size_in_bytes: i64) -> StreamRetentionPolicy { + StreamRetentionPolicy { + retention: Retention { + retention_type: RetentionType::Size, + retention_param: size_in_bytes, + }, + } + } + + #[staticmethod] + pub fn by_time(time_in_millis: i64) -> StreamRetentionPolicy { + StreamRetentionPolicy { + retention: Retention { + retention_type: RetentionType::Size, + retention_param: time_in_millis, + }, + } + } +} + +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(Clone)] +pub(crate) struct StreamScalingPolicy { + scaling: Scaling, +} + +impl Default for StreamScalingPolicy { + fn default() -> Self { + StreamScalingPolicy { + scaling: Default::default(), + } + } +} + +#[cfg(feature = "python_binding")] +#[pymethods] +impl StreamScalingPolicy { + #[staticmethod] + pub fn fixed_scaling_policy(initial_segments: i32) -> StreamScalingPolicy { + StreamScalingPolicy { + scaling: Scaling { + scale_type: ScaleType::FixedNumSegments, + target_rate: 0, + scale_factor: 0, + min_num_segments: initial_segments, + }, + } + } + + #[staticmethod] + pub fn auto_scaling_policy_by_data_rate( + target_rate_kbytes_per_sec: i32, + scale_factor: i32, + initial_segments: i32, + ) -> StreamScalingPolicy { + StreamScalingPolicy { + scaling: Scaling { + scale_type: ScaleType::ByRateInKbytesPerSec, + target_rate: target_rate_kbytes_per_sec, + scale_factor, + min_num_segments: initial_segments, + }, + } + } + + #[staticmethod] + pub fn auto_scaling_policy_by_event_rate( + target_events_per_sec: i32, + scale_factor: i32, + initial_segments: i32, + ) -> StreamScalingPolicy { + StreamScalingPolicy { + scaling: Scaling { + scale_type: ScaleType::ByRateInEventsPerSec, + target_rate: target_events_per_sec, + scale_factor, + min_num_segments: initial_segments, + }, + } + } +} + /// /// Create a StreamManager by providing a controller uri. /// ``` @@ -121,6 +230,47 @@ impl StreamManager { } } + /// + /// Create a Stream in Pravega + /// + #[text_signature = "($self, scope_name, stream_name, scaling_policy, retention_policy, tags)"] + #[args( + scaling_policy = "Default::default()", + retention_policy = "Default::default()", + tags = "None" + )] + pub fn create_stream_with_policy( + &self, + scope_name: &str, + stream_name: &str, + scaling_policy: StreamScalingPolicy, + retention_policy: StreamRetentionPolicy, + tags: Option>, + ) -> PyResult { + let handle = self.cf.runtime(); + info!( + "creating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}", + stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags + ); + let stream_cfg = StreamConfiguration { + scoped_stream: ScopedStream { + scope: Scope::from(scope_name.to_string()), + stream: Stream::from(stream_name.to_string()), + }, + scaling: scaling_policy.scaling, + retention: retention_policy.retention, + tags, + }; + let controller = self.cf.controller_client(); + + let stream_result = handle.block_on(controller.create_stream(&stream_cfg)); + info!("Stream creation status {:?}", stream_result); + match stream_result { + Ok(t) => Ok(t), + Err(e) => Err(exceptions::PyValueError::new_err(format!("{:?}", e))), + } + } + /// /// Create a Stream in Pravega. /// @@ -130,39 +280,81 @@ impl StreamManager { scope_name: &str, stream_name: &str, initial_segments: i32, + ) -> PyResult { + self.create_stream_with_policy( + scope_name, + stream_name, + StreamScalingPolicy::fixed_scaling_policy(initial_segments), + Default::default(), + None, + ) + } + + /// + /// Update Stream Configuration in Pravega + /// + #[text_signature = "($self, scope_name, stream_name, scaling_policy, retention_policy, tags)"] + #[args( + scaling_policy = "Default::default()", + retention_policy = "Default::default()", + tags = "None" + )] + pub fn update_stream_with_policy( + &self, + scope_name: &str, + stream_name: &str, + scaling_policy: StreamScalingPolicy, + retention_policy: StreamRetentionPolicy, + tags: Option>, ) -> PyResult { let handle = self.cf.runtime(); info!( - "creating stream {:?} under scope {:?} with segment count {:?}", - stream_name, scope_name, initial_segments + "updating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}", + stream_name, scope_name, scaling_policy.scaling, retention_policy.retention, tags ); let stream_cfg = StreamConfiguration { scoped_stream: ScopedStream { scope: Scope::from(scope_name.to_string()), stream: Stream::from(stream_name.to_string()), }, - scaling: Scaling { - scale_type: ScaleType::FixedNumSegments, - target_rate: 0, - scale_factor: 0, - min_num_segments: initial_segments, - }, - retention: Retention { - retention_type: RetentionType::None, - retention_param: 0, - }, - tags: None, + scaling: scaling_policy.scaling, + retention: retention_policy.retention, + tags, }; let controller = self.cf.controller_client(); - let stream_result = handle.block_on(controller.create_stream(&stream_cfg)); - info!("Stream creation status {:?}", stream_result); + let stream_result = handle.block_on(controller.update_stream(&stream_cfg)); + info!("Stream updation status {:?}", stream_result); match stream_result { Ok(t) => Ok(t), Err(e) => Err(exceptions::PyValueError::new_err(format!("{:?}", e))), } } + /// + /// Get Stream tags from Pravega + /// + #[text_signature = "($self, scope_name, stream_name, scaling_policy, retention_policy, tags)"] + pub fn get_stream_tags(&self, scope_name: &str, stream_name: &str) -> PyResult>> { + let handle = self.cf.runtime(); + info!( + "fetch tags for stream {:?} under scope {:?}", + stream_name, scope_name, + ); + let stream = ScopedStream { + scope: Scope::from(scope_name.to_string()), + stream: Stream::from(stream_name.to_string()), + }; + let controller = self.cf.controller_client(); + + let stream_configuration = handle.block_on(controller.get_stream_configuration(&stream)); + info!("Stream configuration is {:?}", stream_configuration); + match stream_configuration { + Ok(t) => Ok(t.tags), + Err(e) => Err(exceptions::PyValueError::new_err(format!("{:?}", e))), + } + } + /// /// Create a Stream in Pravega. /// diff --git a/python_binding/tests/pravega_client_test.py b/python_binding/tests/pravega_client_test.py index e64205ee4..4000c91e8 100644 --- a/python_binding/tests/pravega_client_test.py +++ b/python_binding/tests/pravega_client_test.py @@ -13,9 +13,46 @@ import string import pravega_client from pravega_client import TxnFailedException +from pravega_client import StreamScalingPolicy +from pravega_client import StreamRetentionPolicy class PravegaTest(unittest.TestCase): + def test_tags(self): + scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits) + for i in range(10)) + print("Creating a Stream Manager, ensure Pravega is running") + stream_manager=pravega_client.StreamManager("tcp://127.0.0.1:9090", False, False) + + print("Creating a scope") + scope_result=stream_manager.create_scope(scope) + self.assertEqual(True, scope_result, "Scope creation status") + print("Creating a stream") + stream_result=stream_manager.create_stream(scope, "testStream", 1) + self.assertTrue(stream_result, "Stream creation status") + stream_update=stream_manager.update_stream_with_policy(scope_name=scope, stream_name="testStream", tags=["t1"]) + self.assertTrue(stream_update, "Stream update status") + tags = stream_manager.get_stream_tags(scope, "testStream") + self.assertEqual(["t1"], tags) + + # create a stream with stream scaling is enabled with data rate as 10kbps, scaling factor as 2 and initial segments as 1 + policy = StreamScalingPolicy.auto_scaling_policy_by_data_rate(10, 2, 1) + stream_result=stream_manager.create_stream_with_policy(scope_name=scope, stream_name="testStream1", scaling_policy=policy) + self.assertTrue(stream_result, "Stream creation status") + # add tags + stream_update=stream_manager.update_stream_with_policy(scope_name=scope, stream_name="testStream1", scaling_policy=policy, tags=['t1', 't2']) + self.assertTrue(stream_update, "Stream update status") + tags = stream_manager.get_stream_tags(scope, "testStream1") + self.assertEqual(['t1', 't2'], tags) + + # update retention policy + # retention policy of 10GB + retention = StreamRetentionPolicy.by_size(10*1024*1024 * 1024) + stream_update=stream_manager.update_stream_with_policy(scope, "testStream1", policy, retention, tags=["t4", "t5"]) + self.assertTrue(stream_update, "Stream update status") + tags = stream_manager.get_stream_tags(scope, "testStream1") + self.assertEqual(["t4", "t5"], tags) + def test_writeEvent(self): scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for i in range(10)) diff --git a/shared/src/lib.rs b/shared/src/lib.rs index aeac66d97..601e8f24e 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -458,8 +458,8 @@ impl Default for Scaling { Scaling { scale_type: ScaleType::FixedNumSegments, min_num_segments: 1, - scale_factor: 1, - target_rate: 1000, + scale_factor: 0, + target_rate: 0, } } } @@ -497,7 +497,7 @@ impl Default for Retention { fn default() -> Self { Retention { retention_type: RetentionType::None, - retention_param: std::i64::MAX, + retention_param: 0, } } } From 6e0bf0ae4124fa195cfc9516e4f06dd723f23250 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Wed, 30 Jun 2021 18:49:32 +0530 Subject: [PATCH 11/12] fix python test pravega image. Signed-off-by: Sandeep --- .github/workflows/python_test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/python_test.yml b/.github/workflows/python_test.yml index 28a7dbc0a..118db42ef 100644 --- a/.github/workflows/python_test.yml +++ b/.github/workflows/python_test.yml @@ -23,9 +23,9 @@ jobs: java-version: '11' # The JDK version to make available on the path. - name: Download and Run Pravega standalone run: | - wget https://github.com/Tristan1900/pravega/releases/download/ip-fix/pravega-0.10.0-ip-fix.tgz - tar -xzvf pravega-0.10.0-ip-fix.tgz - pravega-0.10.0-2866.172245f-SNAPSHOT/bin/pravega-standalone > pravega.log 2>&1 & + wget https://github.com/shrids/pravega/releases/download/stream-tags/pravega-0.10.0-2906.6a34e64-SNAPSHOT.tgz + tar -xzvf pravega-0.10.0-2906.6a34e64-SNAPSHOT.tgz + pravega-0.10.0-2906.6a34e64-SNAPSHOT/bin/pravega-standalone > pravega.log 2>&1 & sleep 120 && echo "Started standalone" - name: Set up Python uses: actions/setup-python@v2 From ecfc84454d404e404e6fbc59607b355705fd622e Mon Sep 17 00:00:00 2001 From: Sandeep Date: Thu, 1 Jul 2021 10:38:04 +0530 Subject: [PATCH 12/12] CR Changes: Improve test. Signed-off-by: Sandeep --- python_binding/tests/pravega_client_test.py | 4 ++++ shared/src/lib.rs | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/python_binding/tests/pravega_client_test.py b/python_binding/tests/pravega_client_test.py index 4000c91e8..f7a540b3a 100644 --- a/python_binding/tests/pravega_client_test.py +++ b/python_binding/tests/pravega_client_test.py @@ -28,8 +28,12 @@ def test_tags(self): scope_result=stream_manager.create_scope(scope) self.assertEqual(True, scope_result, "Scope creation status") print("Creating a stream") + # stream is created using a fixed scaling policy. stream_result=stream_manager.create_stream(scope, "testStream", 1) self.assertTrue(stream_result, "Stream creation status") + tags = stream_manager.get_stream_tags(scope, "testStream") + # verify empty tags. + self.assertTrue(len(tags)==0) stream_update=stream_manager.update_stream_with_policy(scope_name=scope, stream_name="testStream", tags=["t1"]) self.assertTrue(stream_update, "Stream update status") tags = stream_manager.get_stream_tags(scope, "testStream") diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 601e8f24e..b0f224b9c 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -458,7 +458,7 @@ impl Default for Scaling { Scaling { scale_type: ScaleType::FixedNumSegments, min_num_segments: 1, - scale_factor: 0, + scale_factor: 1, target_rate: 0, } }