From d53f72691be2437e3038f9f65109927b3cd6cd27 Mon Sep 17 00:00:00 2001 From: Piotr Gankiewicz Date: Wed, 3 Jan 2024 20:12:35 +0100 Subject: [PATCH] Add purge stream feature (#444) Close #441 --- Cargo.lock | 4 +- README.md | 2 +- cli/src/command.rs | 1 + cli/src/streams.rs | 6 + iggy/Cargo.toml | 2 +- iggy/src/binary/streams.rs | 12 +- iggy/src/client.rs | 5 + iggy/src/clients/client.rs | 5 + iggy/src/command.rs | 18 +++ iggy/src/http/streams.rs | 7 ++ iggy/src/quic/streams.rs | 5 + iggy/src/streams/mod.rs | 1 + iggy/src/streams/purge_stream.rs | 107 ++++++++++++++++++ iggy/src/tcp/streams.rs | 5 + iggy/src/topics/purge_topic.rs | 4 +- .../tests/server/scenarios/system_scenario.rs | 82 +++++++++----- integration/tests/streaming/stream.rs | 63 +++++++++++ server/Cargo.toml | 2 +- server/server.http | 8 ++ server/src/binary/command.rs | 3 + server/src/binary/handlers/streams/mod.rs | 1 + .../handlers/streams/purge_stream_handler.rs | 20 ++++ server/src/http/streams.rs | 19 +++- server/src/streaming/streams/persistence.rs | 7 ++ server/src/streaming/systems/streams.rs | 11 ++ .../users/permissioner_rules/streams.rs | 4 + 26 files changed, 367 insertions(+), 37 deletions(-) create mode 100644 iggy/src/streams/purge_stream.rs create mode 100644 server/src/binary/handlers/streams/purge_stream_handler.rs diff --git a/Cargo.lock b/Cargo.lock index e532964db..85af83f8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1781,7 +1781,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.1.3" +version = "0.1.4" dependencies = [ "aes-gcm", "anyhow", @@ -3373,7 +3373,7 @@ dependencies = [ [[package]] name = "server" -version = "0.1.6" +version = "0.1.7" dependencies = [ "aes-gcm", "anyhow", diff --git a/README.md b/README.md index ed07418c7..85299e37e 100644 --- a/README.md +++ b/README.md @@ -178,7 +178,7 @@ Send another message 'lorem ipsum' (ID 2) to the same stream, topic and partitio Poll messages by a regular consumer `c` (`g` for consumer group) with ID 0 from the stream `dev` (ID 1) for topic `sample` (ID 1) and partition with ID 1, starting with offset (`o`) 0, messages count 2, without auto commit (`n`) (storing consumer offset on server) and using string format `s` to render messages payload: -`message.poll|c|0|1|1|1|o|0|2|n|s` +`message.poll|c|1|1|1|1|o|0|2|n|s` Finally, restart the server to see it is able to load the persisted data. diff --git a/cli/src/command.rs b/cli/src/command.rs index 04a8fa8e5..3b6772597 100644 --- a/cli/src/command.rs +++ b/cli/src/command.rs @@ -63,6 +63,7 @@ pub async fn handle(input: &str, client: &IggyClient) -> Result<(), ClientError> Command::CreateStream(payload) => streams::create_stream(&payload, client).await, Command::DeleteStream(payload) => streams::delete_stream(&payload, client).await, Command::UpdateStream(payload) => streams::update_stream(&payload, client).await, + Command::PurgeStream(payload) => streams::purge_stream(&payload, client).await, Command::GetTopic(payload) => topics::get_topic(&payload, client).await, Command::GetTopics(payload) => topics::get_topics(&payload, client).await, Command::CreateTopic(payload) => topics::create_topic(&payload, client).await, diff --git a/cli/src/streams.rs b/cli/src/streams.rs index 4b3a66eda..33046a4f0 100644 --- a/cli/src/streams.rs +++ b/cli/src/streams.rs @@ -4,6 +4,7 @@ use iggy::streams::create_stream::CreateStream; use iggy::streams::delete_stream::DeleteStream; use iggy::streams::get_stream::GetStream; use iggy::streams::get_streams::GetStreams; +use iggy::streams::purge_stream::PurgeStream; use iggy::streams::update_stream::UpdateStream; use tracing::info; @@ -38,3 +39,8 @@ pub async fn update_stream(command: &UpdateStream, client: &dyn Client) -> Resul client.update_stream(command).await?; Ok(()) } + +pub async fn purge_stream(command: &PurgeStream, client: &dyn Client) -> Result<(), ClientError> { + client.purge_stream(command).await?; + Ok(()) +} diff --git a/iggy/Cargo.toml b/iggy/Cargo.toml index cb767d021..c24b67b78 100644 --- a/iggy/Cargo.toml +++ b/iggy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy" -version = "0.1.3" +version = "0.1.4" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2021" license = "MIT" diff --git a/iggy/src/binary/streams.rs b/iggy/src/binary/streams.rs index 8a96174d7..b34929e52 100644 --- a/iggy/src/binary/streams.rs +++ b/iggy/src/binary/streams.rs @@ -2,7 +2,8 @@ use crate::binary::binary_client::BinaryClient; use crate::binary::{fail_if_not_authenticated, mapper}; use crate::bytes_serializable::BytesSerializable; use crate::command::{ - CREATE_STREAM_CODE, DELETE_STREAM_CODE, GET_STREAMS_CODE, GET_STREAM_CODE, UPDATE_STREAM_CODE, + CREATE_STREAM_CODE, DELETE_STREAM_CODE, GET_STREAMS_CODE, GET_STREAM_CODE, PURGE_STREAM_CODE, + UPDATE_STREAM_CODE, }; use crate::error::Error; use crate::models::stream::{Stream, StreamDetails}; @@ -10,6 +11,7 @@ use crate::streams::create_stream::CreateStream; use crate::streams::delete_stream::DeleteStream; use crate::streams::get_stream::GetStream; use crate::streams::get_streams::GetStreams; +use crate::streams::purge_stream::PurgeStream; use crate::streams::update_stream::UpdateStream; pub async fn get_stream( @@ -57,3 +59,11 @@ pub async fn update_stream(client: &dyn BinaryClient, command: &UpdateStream) -> .await?; Ok(()) } + +pub async fn purge_stream(client: &dyn BinaryClient, command: &PurgeStream) -> Result<(), Error> { + fail_if_not_authenticated(client).await?; + client + .send_with_response(PURGE_STREAM_CODE, &command.as_bytes()) + .await?; + Ok(()) +} diff --git a/iggy/src/client.rs b/iggy/src/client.rs index e561a56cc..7aae4ea18 100644 --- a/iggy/src/client.rs +++ b/iggy/src/client.rs @@ -29,6 +29,7 @@ use crate::streams::create_stream::CreateStream; use crate::streams::delete_stream::DeleteStream; use crate::streams::get_stream::GetStream; use crate::streams::get_streams::GetStreams; +use crate::streams::purge_stream::PurgeStream; use crate::streams::update_stream::UpdateStream; use crate::system::get_client::GetClient; use crate::system::get_clients::GetClients; @@ -187,6 +188,10 @@ pub trait StreamClient { /// /// Authentication is required, and the permission to manage the streams. async fn delete_stream(&self, command: &DeleteStream) -> Result<(), Error>; + /// Purge a stream by unique ID or name. + /// + /// Authentication is required, and the permission to manage the streams. + async fn purge_stream(&self, command: &PurgeStream) -> Result<(), Error>; } /// This trait defines the methods to interact with the topic module. diff --git a/iggy/src/clients/client.rs b/iggy/src/clients/client.rs index 6f44ed2bf..fd75bcf00 100644 --- a/iggy/src/clients/client.rs +++ b/iggy/src/clients/client.rs @@ -37,6 +37,7 @@ use crate::streams::create_stream::CreateStream; use crate::streams::delete_stream::DeleteStream; use crate::streams::get_stream::GetStream; use crate::streams::get_streams::GetStreams; +use crate::streams::purge_stream::PurgeStream; use crate::streams::update_stream::UpdateStream; use crate::system::get_client::GetClient; use crate::system::get_clients::GetClients; @@ -632,6 +633,10 @@ impl StreamClient for IggyClient { async fn delete_stream(&self, command: &DeleteStream) -> Result<(), Error> { self.client.read().await.delete_stream(command).await } + + async fn purge_stream(&self, command: &PurgeStream) -> Result<(), Error> { + self.client.read().await.purge_stream(command).await + } } #[async_trait] diff --git a/iggy/src/command.rs b/iggy/src/command.rs index 6e81504b4..e32e17f48 100644 --- a/iggy/src/command.rs +++ b/iggy/src/command.rs @@ -20,6 +20,7 @@ use crate::streams::create_stream::CreateStream; use crate::streams::delete_stream::DeleteStream; use crate::streams::get_stream::GetStream; use crate::streams::get_streams::GetStreams; +use crate::streams::purge_stream::PurgeStream; use crate::streams::update_stream::UpdateStream; use crate::system::get_client::GetClient; use crate::system::get_clients::GetClients; @@ -99,6 +100,8 @@ pub const DELETE_STREAM: &str = "stream.delete"; pub const DELETE_STREAM_CODE: u32 = 203; pub const UPDATE_STREAM: &str = "stream.update"; pub const UPDATE_STREAM_CODE: u32 = 204; +pub const PURGE_STREAM: &str = "stream.purge"; +pub const PURGE_STREAM_CODE: u32 = 205; pub const GET_TOPIC: &str = "topic.get"; pub const GET_TOPIC_CODE: u32 = 300; pub const GET_TOPICS: &str = "topic.list"; @@ -157,6 +160,7 @@ pub enum Command { CreateStream(CreateStream), DeleteStream(DeleteStream), UpdateStream(UpdateStream), + PurgeStream(PurgeStream), GetTopic(GetTopic), GetTopics(GetTopics), CreateTopic(CreateTopic), @@ -220,6 +224,7 @@ impl BytesSerializable for Command { Command::CreateStream(payload) => as_bytes(CREATE_STREAM_CODE, &payload.as_bytes()), Command::DeleteStream(payload) => as_bytes(DELETE_STREAM_CODE, &payload.as_bytes()), Command::UpdateStream(payload) => as_bytes(UPDATE_STREAM_CODE, &payload.as_bytes()), + Command::PurgeStream(payload) => as_bytes(PURGE_STREAM_CODE, &payload.as_bytes()), Command::GetTopic(payload) => as_bytes(GET_TOPIC_CODE, &payload.as_bytes()), Command::GetTopics(payload) => as_bytes(GET_TOPICS_CODE, &payload.as_bytes()), Command::CreateTopic(payload) => as_bytes(CREATE_TOPIC_CODE, &payload.as_bytes()), @@ -300,6 +305,7 @@ impl BytesSerializable for Command { CREATE_STREAM_CODE => Ok(Command::CreateStream(CreateStream::from_bytes(payload)?)), DELETE_STREAM_CODE => Ok(Command::DeleteStream(DeleteStream::from_bytes(payload)?)), UPDATE_STREAM_CODE => Ok(Command::UpdateStream(UpdateStream::from_bytes(payload)?)), + PURGE_STREAM_CODE => Ok(Command::PurgeStream(PurgeStream::from_bytes(payload)?)), GET_TOPIC_CODE => Ok(Command::GetTopic(GetTopic::from_bytes(payload)?)), GET_TOPICS_CODE => Ok(Command::GetTopics(GetTopics::from_bytes(payload)?)), CREATE_TOPIC_CODE => Ok(Command::CreateTopic(CreateTopic::from_bytes(payload)?)), @@ -388,6 +394,7 @@ impl FromStr for Command { CREATE_STREAM => Ok(Command::CreateStream(CreateStream::from_str(payload)?)), DELETE_STREAM => Ok(Command::DeleteStream(DeleteStream::from_str(payload)?)), UPDATE_STREAM => Ok(Command::UpdateStream(UpdateStream::from_str(payload)?)), + PURGE_STREAM => Ok(Command::PurgeStream(PurgeStream::from_str(payload)?)), GET_TOPIC => Ok(Command::GetTopic(GetTopic::from_str(payload)?)), GET_TOPICS => Ok(Command::GetTopics(GetTopics::from_str(payload)?)), CREATE_TOPIC => Ok(Command::CreateTopic(CreateTopic::from_str(payload)?)), @@ -461,6 +468,7 @@ impl Display for Command { Command::CreateStream(payload) => write!(formatter, "{CREATE_STREAM}|{payload}"), Command::DeleteStream(payload) => write!(formatter, "{DELETE_STREAM}|{payload}"), Command::UpdateStream(payload) => write!(formatter, "{UPDATE_STREAM}|{payload}"), + Command::PurgeStream(payload) => write!(formatter, "{PURGE_STREAM}|{payload}"), Command::GetTopic(payload) => write!(formatter, "{GET_TOPIC}|{payload}"), Command::GetTopics(payload) => write!(formatter, "{GET_TOPICS}|{payload}"), Command::CreateTopic(payload) => write!(formatter, "{CREATE_TOPIC}|{payload}"), @@ -644,6 +652,11 @@ mod tests { UPDATE_STREAM_CODE, &UpdateStream::default(), ); + assert_serialized_as_bytes_and_deserialized_from_bytes( + &Command::PurgeStream(PurgeStream::default()), + PURGE_STREAM_CODE, + &PurgeStream::default(), + ); assert_serialized_as_bytes_and_deserialized_from_bytes( &Command::GetTopic(GetTopic::default()), GET_TOPIC_CODE, @@ -845,6 +858,11 @@ mod tests { UPDATE_STREAM, &UpdateStream::default(), ); + assert_read_from_string( + &Command::PurgeStream(PurgeStream::default()), + PURGE_STREAM, + &PurgeStream::default(), + ); assert_read_from_string( &Command::GetTopic(GetTopic::default()), GET_TOPIC, diff --git a/iggy/src/http/streams.rs b/iggy/src/http/streams.rs index 9b2db90e7..071d58d48 100644 --- a/iggy/src/http/streams.rs +++ b/iggy/src/http/streams.rs @@ -6,6 +6,7 @@ use crate::streams::create_stream::CreateStream; use crate::streams::delete_stream::DeleteStream; use crate::streams::get_stream::GetStream; use crate::streams::get_streams::GetStreams; +use crate::streams::purge_stream::PurgeStream; use crate::streams::update_stream::UpdateStream; use async_trait::async_trait; @@ -43,6 +44,12 @@ impl StreamClient for HttpClient { self.delete(&path).await?; Ok(()) } + + async fn purge_stream(&self, command: &PurgeStream) -> Result<(), Error> { + let path = format!("{}/{}/purge", PATH, command.stream_id.as_string()); + self.delete(&path).await?; + Ok(()) + } } fn get_details_path(stream_id: &str) -> String { diff --git a/iggy/src/quic/streams.rs b/iggy/src/quic/streams.rs index 3f244c470..4b41b052e 100644 --- a/iggy/src/quic/streams.rs +++ b/iggy/src/quic/streams.rs @@ -7,6 +7,7 @@ use crate::streams::create_stream::CreateStream; use crate::streams::delete_stream::DeleteStream; use crate::streams::get_stream::GetStream; use crate::streams::get_streams::GetStreams; +use crate::streams::purge_stream::PurgeStream; use crate::streams::update_stream::UpdateStream; use async_trait::async_trait; @@ -31,4 +32,8 @@ impl StreamClient for QuicClient { async fn delete_stream(&self, command: &DeleteStream) -> Result<(), Error> { binary::streams::delete_stream(self, command).await } + + async fn purge_stream(&self, command: &PurgeStream) -> Result<(), Error> { + binary::streams::purge_stream(self, command).await + } } diff --git a/iggy/src/streams/mod.rs b/iggy/src/streams/mod.rs index cdd385527..766063100 100644 --- a/iggy/src/streams/mod.rs +++ b/iggy/src/streams/mod.rs @@ -2,6 +2,7 @@ pub mod create_stream; pub mod delete_stream; pub mod get_stream; pub mod get_streams; +pub mod purge_stream; pub mod update_stream; const MAX_NAME_LENGTH: usize = 255; diff --git a/iggy/src/streams/purge_stream.rs b/iggy/src/streams/purge_stream.rs new file mode 100644 index 000000000..c5c9dfdb2 --- /dev/null +++ b/iggy/src/streams/purge_stream.rs @@ -0,0 +1,107 @@ +use crate::bytes_serializable::BytesSerializable; +use crate::command::CommandPayload; +use crate::error::Error; +use crate::identifier::Identifier; +use crate::validatable::Validatable; +use serde::{Deserialize, Serialize}; +use std::fmt::Display; +use std::str::FromStr; + +/// `PurgeStream` command is used to purge stream data (all the messages from its topics). +/// It has additional payload: +/// - `stream_id` - unique stream ID (numeric or name). +#[derive(Debug, Serialize, Deserialize, PartialEq, Default)] +pub struct PurgeStream { + /// Unique stream ID (numeric or name). + #[serde(skip)] + pub stream_id: Identifier, +} + +impl CommandPayload for PurgeStream {} + +impl Validatable for PurgeStream { + fn validate(&self) -> Result<(), Error> { + Ok(()) + } +} + +impl FromStr for PurgeStream { + type Err = Error; + fn from_str(input: &str) -> Result { + let parts = input.split('|').collect::>(); + if parts.len() != 1 { + return Err(Error::InvalidCommand); + } + + let stream_id = parts[0].parse::()?; + let command = PurgeStream { stream_id }; + command.validate()?; + Ok(command) + } +} + +impl BytesSerializable for PurgeStream { + fn as_bytes(&self) -> Vec { + let stream_id_bytes = self.stream_id.as_bytes(); + let mut bytes = Vec::with_capacity(stream_id_bytes.len()); + bytes.extend(stream_id_bytes); + bytes + } + + fn from_bytes(bytes: &[u8]) -> Result { + if bytes.len() < 5 { + return Err(Error::InvalidCommand); + } + + let stream_id = Identifier::from_bytes(bytes)?; + let command = PurgeStream { stream_id }; + command.validate()?; + Ok(command) + } +} + +impl Display for PurgeStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.stream_id) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn should_be_serialized_as_bytes() { + let command = PurgeStream { + stream_id: Identifier::numeric(1).unwrap(), + }; + + let bytes = command.as_bytes(); + let stream_id = Identifier::from_bytes(&bytes).unwrap(); + + assert!(!bytes.is_empty()); + assert_eq!(stream_id, command.stream_id); + } + + #[test] + fn should_be_deserialized_from_bytes() { + let stream_id = Identifier::numeric(1).unwrap(); + let bytes = stream_id.as_bytes(); + let command = PurgeStream::from_bytes(&bytes); + assert!(command.is_ok()); + + let command = command.unwrap(); + assert_eq!(command.stream_id, stream_id); + } + + #[test] + fn should_be_read_from_string() { + let stream_id = Identifier::numeric(1).unwrap(); + let input = format!("{stream_id}"); + let command = PurgeStream::from_str(&input); + assert!(command.is_ok()); + + let command = command.unwrap(); + assert_eq!(command.stream_id, stream_id); + } +} diff --git a/iggy/src/tcp/streams.rs b/iggy/src/tcp/streams.rs index 2c94c8e20..4bb5edb2c 100644 --- a/iggy/src/tcp/streams.rs +++ b/iggy/src/tcp/streams.rs @@ -6,6 +6,7 @@ use crate::streams::create_stream::CreateStream; use crate::streams::delete_stream::DeleteStream; use crate::streams::get_stream::GetStream; use crate::streams::get_streams::GetStreams; +use crate::streams::purge_stream::PurgeStream; use crate::streams::update_stream::UpdateStream; use crate::tcp::client::TcpClient; use async_trait::async_trait; @@ -31,4 +32,8 @@ impl StreamClient for TcpClient { async fn delete_stream(&self, command: &DeleteStream) -> Result<(), Error> { binary::streams::delete_stream(self, command).await } + + async fn purge_stream(&self, command: &PurgeStream) -> Result<(), Error> { + binary::streams::purge_stream(self, command).await + } } diff --git a/iggy/src/topics/purge_topic.rs b/iggy/src/topics/purge_topic.rs index 5170e4a9b..88a670a2c 100644 --- a/iggy/src/topics/purge_topic.rs +++ b/iggy/src/topics/purge_topic.rs @@ -31,7 +31,7 @@ impl Validatable for PurgeTopic { impl FromStr for PurgeTopic { type Err = Error; - fn from_str(input: &str) -> std::result::Result { + fn from_str(input: &str) -> Result { let parts = input.split('|').collect::>(); if parts.len() != 2 { return Err(Error::InvalidCommand); @@ -58,7 +58,7 @@ impl BytesSerializable for PurgeTopic { bytes } - fn from_bytes(bytes: &[u8]) -> std::result::Result { + fn from_bytes(bytes: &[u8]) -> Result { if bytes.len() < 10 { return Err(Error::InvalidCommand); } diff --git a/integration/tests/server/scenarios/system_scenario.rs b/integration/tests/server/scenarios/system_scenario.rs index 3242771a8..91deb204b 100644 --- a/integration/tests/server/scenarios/system_scenario.rs +++ b/integration/tests/server/scenarios/system_scenario.rs @@ -23,6 +23,7 @@ use iggy::streams::create_stream::CreateStream; use iggy::streams::delete_stream::DeleteStream; use iggy::streams::get_stream::GetStream; use iggy::streams::get_streams::GetStreams; +use iggy::streams::purge_stream::PurgeStream; use iggy::streams::update_stream::UpdateStream; use iggy::system::get_clients::GetClients; use iggy::system::get_me::GetMe; @@ -215,18 +216,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert!(create_topic_result.is_err()); // 17. Send messages to the specific topic and partition - let mut messages = Vec::new(); - for offset in 0..MESSAGES_COUNT { - let id = (offset + 1) as u128; - let payload = get_message_payload(offset as u64); - messages.push(Message { - id, - length: payload.len() as u32, - payload, - headers: None, - }); - } - + let messages = create_messages(); let mut send_messages = SendMessages { stream_id: Identifier::numeric(STREAM_ID).unwrap(), topic_id: Identifier::numeric(TOPIC_ID).unwrap(), @@ -626,43 +616,64 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(polled_messages.current_offset, 0); assert!(polled_messages.messages.is_empty()); - // 38. Delete the existing topic and ensure it doesn't exist anymore + // 38. Update the existing stream and ensure it's updated + let updated_stream_name = format!("{}-updated", STREAM_NAME); + client - .delete_topic(&DeleteTopic { + .update_stream(&UpdateStream { stream_id: Identifier::numeric(STREAM_ID).unwrap(), - topic_id: Identifier::numeric(TOPIC_ID).unwrap(), + name: updated_stream_name.clone(), }) .await .unwrap(); - let topics = client - .get_topics(&GetTopics { + + let updated_stream = client + .get_stream(&GetStream { stream_id: Identifier::numeric(STREAM_ID).unwrap(), }) .await .unwrap(); - assert!(topics.is_empty()); - // 39. Update the existing stream and ensure it's updated - let updated_stream_name = format!("{}-updated", STREAM_NAME); + assert_eq!(updated_stream.name, updated_stream_name); + + // 39. Purge the existing stream and ensure it has no messages + let messages = create_messages(); + let mut send_messages = SendMessages { + stream_id: Identifier::numeric(STREAM_ID).unwrap(), + topic_id: Identifier::numeric(TOPIC_ID).unwrap(), + partitioning: Partitioning::partition_id(PARTITION_ID), + messages, + }; + client.send_messages(&mut send_messages).await.unwrap(); client - .update_stream(&UpdateStream { + .purge_stream(&PurgeStream { stream_id: Identifier::numeric(STREAM_ID).unwrap(), - name: updated_stream_name.clone(), }) .await .unwrap(); - let updated_stream = client - .get_stream(&GetStream { + let polled_messages = client.poll_messages(&poll_messages).await.unwrap(); + assert_eq!(polled_messages.current_offset, 0); + assert!(polled_messages.messages.is_empty()); + + // 40. Delete the existing topic and ensure it doesn't exist anymore + client + .delete_topic(&DeleteTopic { stream_id: Identifier::numeric(STREAM_ID).unwrap(), + topic_id: Identifier::numeric(TOPIC_ID).unwrap(), }) .await .unwrap(); + let topics = client + .get_topics(&GetTopics { + stream_id: Identifier::numeric(STREAM_ID).unwrap(), + }) + .await + .unwrap(); + assert!(topics.is_empty()); - assert_eq!(updated_stream.name, updated_stream_name); - - // 40. Delete the existing stream and ensure it doesn't exist anymore + // 41. Delete the existing stream and ensure it doesn't exist anymore client .delete_stream(&DeleteStream { stream_id: Identifier::numeric(STREAM_ID).unwrap(), @@ -672,7 +683,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { let streams = client.get_streams(&GetStreams {}).await.unwrap(); assert!(streams.is_empty()); - // 41. Get clients and ensure that there's 0 (HTTP) or 1 (TCP, QUIC) client + // 42. Get clients and ensure that there's 0 (HTTP) or 1 (TCP, QUIC) client let clients = client.get_clients(&GetClients {}).await.unwrap(); assert!(clients.len() <= 1); @@ -687,6 +698,21 @@ fn assert_message(message: &iggy::models::messages::Message, offset: u64) { assert_eq!(message.payload, expected_payload); } +fn create_messages() -> Vec { + let mut messages = Vec::new(); + for offset in 0..MESSAGES_COUNT { + let id = (offset + 1) as u128; + let payload = get_message_payload(offset as u64); + messages.push(Message { + id, + length: payload.len() as u32, + payload, + headers: None, + }); + } + messages +} + fn get_message_payload(offset: u64) -> Bytes { Bytes::from(format!("message {}", offset)) } diff --git a/integration/tests/streaming/stream.rs b/integration/tests/streaming/stream.rs index 86bad7f2e..8973cdf16 100644 --- a/integration/tests/streaming/stream.rs +++ b/integration/tests/streaming/stream.rs @@ -1,4 +1,9 @@ use crate::streaming::common::test_setup::TestSetup; +use crate::streaming::create_messages; +use iggy::identifier::Identifier; +use iggy::messages::poll_messages::PollingStrategy; +use iggy::messages::send_messages::Partitioning; +use server::streaming::polling_consumer::PollingConsumer; use server::streaming::streams::stream::Stream; use tokio::fs; @@ -71,6 +76,64 @@ async fn should_delete_existing_stream_from_disk() { } } +#[tokio::test] +async fn should_purge_existing_stream_on_disk() { + let setup = TestSetup::init().await; + setup.create_streams_directory().await; + let stream_ids = get_stream_ids(); + for stream_id in stream_ids { + let name = format!("test-{}", stream_id); + let mut stream = Stream::create( + stream_id, + &name, + setup.config.clone(), + setup.storage.clone(), + ); + stream.persist().await.unwrap(); + assert_persisted_stream(&stream.path, &setup.config.topic.path).await; + + let topic_id = 1; + stream + .create_topic(topic_id, "test", 1, None) + .await + .unwrap(); + + let messages = create_messages(); + let messages_count = messages.len(); + let topic = stream + .get_topic(&Identifier::numeric(topic_id).unwrap()) + .unwrap(); + topic + .append_messages(&Partitioning::partition_id(1), messages) + .await + .unwrap(); + let loaded_messages = topic + .get_messages( + PollingConsumer::Consumer(1, 1), + 1, + PollingStrategy::offset(0), + 100, + ) + .await + .unwrap(); + + assert_eq!(loaded_messages.messages.len(), messages_count); + + stream.purge().await.unwrap(); + let loaded_messages = topic + .get_messages( + PollingConsumer::Consumer(1, 1), + 1, + PollingStrategy::offset(0), + 100, + ) + .await + .unwrap(); + assert_eq!(loaded_messages.current_offset, 0); + assert!(loaded_messages.messages.is_empty()); + } +} + async fn assert_persisted_stream(stream_path: &str, topics_directory: &str) { let stream_metadata = fs::metadata(stream_path).await.unwrap(); assert!(stream_metadata.is_dir()); diff --git a/server/Cargo.toml b/server/Cargo.toml index 045a03f92..b28ef23bd 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.1.6" +version = "0.1.7" edition = "2021" build = "src/build.rs" diff --git a/server/server.http b/server/server.http index 5123352e9..a0e336dc4 100644 --- a/server/server.http +++ b/server/server.http @@ -210,6 +210,10 @@ Content-Type: application/json DELETE {{url}}/streams/{{stream_id}} Authorization: Bearer {{access_token}} +### +DELETE {{url}}/streams/{{stream_id}}/purge +Authorization: Bearer {{access_token}} + ### GET {{url}}/streams/{{stream_id}}/topics Authorization: Bearer {{access_token}} @@ -244,6 +248,10 @@ Content-Type: application/json DELETE {{url}}/streams/{{stream_id}}/topics/{{topic_id}} Authorization: Bearer {{access_token}} +### +DELETE {{url}}/streams/{{stream_id}}/topics/{{topic_id}}/purge +Authorization: Bearer {{access_token}} + ### POST {{url}}/streams/{{stream_id}}/topics/{{topic_id}}/partitions Authorization: Bearer {{access_token}} diff --git a/server/src/binary/command.rs b/server/src/binary/command.rs index 46b567f77..27b33b19b 100644 --- a/server/src/binary/command.rs +++ b/server/src/binary/command.rs @@ -127,6 +127,9 @@ async fn try_handle( Command::UpdateStream(command) => { update_stream_handler::handle(command, sender, session, system).await } + Command::PurgeStream(command) => { + purge_stream_handler::handle(command, sender, session, system).await + } Command::GetTopic(command) => { get_topic_handler::handle(command, sender, session, system).await } diff --git a/server/src/binary/handlers/streams/mod.rs b/server/src/binary/handlers/streams/mod.rs index dfc0bebbb..ee804d6bf 100644 --- a/server/src/binary/handlers/streams/mod.rs +++ b/server/src/binary/handlers/streams/mod.rs @@ -2,4 +2,5 @@ pub mod create_stream_handler; pub mod delete_stream_handler; pub mod get_stream_handler; pub mod get_streams_handler; +pub mod purge_stream_handler; pub mod update_stream_handler; diff --git a/server/src/binary/handlers/streams/purge_stream_handler.rs b/server/src/binary/handlers/streams/purge_stream_handler.rs new file mode 100644 index 000000000..00aefc371 --- /dev/null +++ b/server/src/binary/handlers/streams/purge_stream_handler.rs @@ -0,0 +1,20 @@ +use crate::binary::sender::Sender; +use crate::streaming::session::Session; +use crate::streaming::systems::system::SharedSystem; +use anyhow::Result; +use iggy::error::Error; +use iggy::streams::purge_stream::PurgeStream; +use tracing::debug; + +pub async fn handle( + command: &PurgeStream, + sender: &mut dyn Sender, + session: &Session, + system: &SharedSystem, +) -> Result<(), Error> { + debug!("session: {session}, command: {command}"); + let system = system.read(); + system.purge_stream(session, &command.stream_id).await?; + sender.send_empty_ok_response().await?; + Ok(()) +} diff --git a/server/src/http/streams.rs b/server/src/http/streams.rs index 32d6c7a52..dee34c465 100644 --- a/server/src/http/streams.rs +++ b/server/src/http/streams.rs @@ -5,7 +5,7 @@ use crate::http::shared::AppState; use crate::streaming::session::Session; use axum::extract::{Path, State}; use axum::http::StatusCode; -use axum::routing::get; +use axum::routing::{delete, get}; use axum::{Extension, Json, Router}; use iggy::identifier::Identifier; use iggy::models::stream::{Stream, StreamDetails}; @@ -21,6 +21,7 @@ pub fn router(state: Arc) -> Router { "/streams/:stream_id", get(get_stream).put(update_stream).delete(delete_stream), ) + .route("/streams/:stream_id/purge", delete(purge_stream)) .with_state(state) } @@ -101,3 +102,19 @@ async fn delete_stream( .await?; Ok(StatusCode::NO_CONTENT) } + +async fn purge_stream( + State(state): State>, + Extension(identity): Extension, + Path(stream_id): Path, +) -> Result { + let stream_id = Identifier::from_str_value(&stream_id)?; + let system = state.system.read(); + system + .purge_stream( + &Session::stateless(identity.user_id, identity.ip_address), + &stream_id, + ) + .await?; + Ok(StatusCode::NO_CONTENT) +} diff --git a/server/src/streaming/streams/persistence.rs b/server/src/streaming/streams/persistence.rs index 1b458fb23..88cf5a63d 100644 --- a/server/src/streaming/streams/persistence.rs +++ b/server/src/streaming/streams/persistence.rs @@ -28,4 +28,11 @@ impl Stream { Ok(()) } + + pub async fn purge(&self) -> Result<(), Error> { + for topic in self.get_topics() { + topic.purge().await?; + } + Ok(()) + } } diff --git a/server/src/streaming/systems/streams.rs b/server/src/streaming/systems/streams.rs index 4ed002bb8..603cfe03f 100644 --- a/server/src/streaming/systems/streams.rs +++ b/server/src/streaming/systems/streams.rs @@ -264,6 +264,17 @@ impl System { .await; Ok(stream_id) } + + pub async fn purge_stream( + &self, + session: &Session, + stream_id: &Identifier, + ) -> Result<(), Error> { + let stream = self.get_stream(stream_id)?; + self.permissioner + .purge_stream(session.user_id, stream.stream_id)?; + stream.purge().await + } } #[cfg(test)] diff --git a/server/src/streaming/users/permissioner_rules/streams.rs b/server/src/streaming/users/permissioner_rules/streams.rs index ea016a73b..189d4a4f3 100644 --- a/server/src/streaming/users/permissioner_rules/streams.rs +++ b/server/src/streaming/users/permissioner_rules/streams.rs @@ -47,6 +47,10 @@ impl Permissioner { self.manage_stream(user_id, stream_id) } + pub fn purge_stream(&self, user_id: u32, stream_id: u32) -> Result<(), Error> { + self.manage_stream(user_id, stream_id) + } + fn manage_stream(&self, user_id: u32, stream_id: u32) -> Result<(), Error> { if let Some(global_permissions) = self.users_permissions.get(&user_id) { if global_permissions.manage_streams {