Skip to content

Commit

Permalink
Add purge stream feature (#444)
Browse files Browse the repository at this point in the history
Close #441
  • Loading branch information
spetz authored Jan 3, 2024
1 parent 10b2b91 commit d53f726
Show file tree
Hide file tree
Showing 26 changed files with 367 additions and 37 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ Send another message 'lorem ipsum' (ID 2) to the same stream, topic and partitio

Poll messages by a regular consumer `c` (`g` for consumer group) with ID 0 from the stream `dev` (ID 1) for topic `sample` (ID 1) and partition with ID 1, starting with offset (`o`) 0, messages count 2, without auto commit (`n`) (storing consumer offset on server) and using string format `s` to render messages payload:

`message.poll|c|0|1|1|1|o|0|2|n|s`
`message.poll|c|1|1|1|1|o|0|2|n|s`

Finally, restart the server to see it is able to load the persisted data.

Expand Down
1 change: 1 addition & 0 deletions cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub async fn handle(input: &str, client: &IggyClient) -> Result<(), ClientError>
Command::CreateStream(payload) => streams::create_stream(&payload, client).await,
Command::DeleteStream(payload) => streams::delete_stream(&payload, client).await,
Command::UpdateStream(payload) => streams::update_stream(&payload, client).await,
Command::PurgeStream(payload) => streams::purge_stream(&payload, client).await,
Command::GetTopic(payload) => topics::get_topic(&payload, client).await,
Command::GetTopics(payload) => topics::get_topics(&payload, client).await,
Command::CreateTopic(payload) => topics::create_topic(&payload, client).await,
Expand Down
6 changes: 6 additions & 0 deletions cli/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use iggy::streams::create_stream::CreateStream;
use iggy::streams::delete_stream::DeleteStream;
use iggy::streams::get_stream::GetStream;
use iggy::streams::get_streams::GetStreams;
use iggy::streams::purge_stream::PurgeStream;
use iggy::streams::update_stream::UpdateStream;
use tracing::info;

Expand Down Expand Up @@ -38,3 +39,8 @@ pub async fn update_stream(command: &UpdateStream, client: &dyn Client) -> Resul
client.update_stream(command).await?;
Ok(())
}

pub async fn purge_stream(command: &PurgeStream, client: &dyn Client) -> Result<(), ClientError> {
client.purge_stream(command).await?;
Ok(())
}
2 changes: 1 addition & 1 deletion iggy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.1.3"
version = "0.1.4"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
12 changes: 11 additions & 1 deletion iggy/src/binary/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ use crate::binary::binary_client::BinaryClient;
use crate::binary::{fail_if_not_authenticated, mapper};
use crate::bytes_serializable::BytesSerializable;
use crate::command::{
CREATE_STREAM_CODE, DELETE_STREAM_CODE, GET_STREAMS_CODE, GET_STREAM_CODE, UPDATE_STREAM_CODE,
CREATE_STREAM_CODE, DELETE_STREAM_CODE, GET_STREAMS_CODE, GET_STREAM_CODE, PURGE_STREAM_CODE,
UPDATE_STREAM_CODE,
};
use crate::error::Error;
use crate::models::stream::{Stream, StreamDetails};
use crate::streams::create_stream::CreateStream;
use crate::streams::delete_stream::DeleteStream;
use crate::streams::get_stream::GetStream;
use crate::streams::get_streams::GetStreams;
use crate::streams::purge_stream::PurgeStream;
use crate::streams::update_stream::UpdateStream;

pub async fn get_stream(
Expand Down Expand Up @@ -57,3 +59,11 @@ pub async fn update_stream(client: &dyn BinaryClient, command: &UpdateStream) ->
.await?;
Ok(())
}

pub async fn purge_stream(client: &dyn BinaryClient, command: &PurgeStream) -> Result<(), Error> {
fail_if_not_authenticated(client).await?;
client
.send_with_response(PURGE_STREAM_CODE, &command.as_bytes())
.await?;
Ok(())
}
5 changes: 5 additions & 0 deletions iggy/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::streams::create_stream::CreateStream;
use crate::streams::delete_stream::DeleteStream;
use crate::streams::get_stream::GetStream;
use crate::streams::get_streams::GetStreams;
use crate::streams::purge_stream::PurgeStream;
use crate::streams::update_stream::UpdateStream;
use crate::system::get_client::GetClient;
use crate::system::get_clients::GetClients;
Expand Down Expand Up @@ -187,6 +188,10 @@ pub trait StreamClient {
///
/// Authentication is required, and the permission to manage the streams.
async fn delete_stream(&self, command: &DeleteStream) -> Result<(), Error>;
/// Purge a stream by unique ID or name.
///
/// Authentication is required, and the permission to manage the streams.
async fn purge_stream(&self, command: &PurgeStream) -> Result<(), Error>;
}

/// This trait defines the methods to interact with the topic module.
Expand Down
5 changes: 5 additions & 0 deletions iggy/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::streams::create_stream::CreateStream;
use crate::streams::delete_stream::DeleteStream;
use crate::streams::get_stream::GetStream;
use crate::streams::get_streams::GetStreams;
use crate::streams::purge_stream::PurgeStream;
use crate::streams::update_stream::UpdateStream;
use crate::system::get_client::GetClient;
use crate::system::get_clients::GetClients;
Expand Down Expand Up @@ -632,6 +633,10 @@ impl StreamClient for IggyClient {
async fn delete_stream(&self, command: &DeleteStream) -> Result<(), Error> {
self.client.read().await.delete_stream(command).await
}

async fn purge_stream(&self, command: &PurgeStream) -> Result<(), Error> {
self.client.read().await.purge_stream(command).await
}
}

#[async_trait]
Expand Down
18 changes: 18 additions & 0 deletions iggy/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::streams::create_stream::CreateStream;
use crate::streams::delete_stream::DeleteStream;
use crate::streams::get_stream::GetStream;
use crate::streams::get_streams::GetStreams;
use crate::streams::purge_stream::PurgeStream;
use crate::streams::update_stream::UpdateStream;
use crate::system::get_client::GetClient;
use crate::system::get_clients::GetClients;
Expand Down Expand Up @@ -99,6 +100,8 @@ pub const DELETE_STREAM: &str = "stream.delete";
pub const DELETE_STREAM_CODE: u32 = 203;
pub const UPDATE_STREAM: &str = "stream.update";
pub const UPDATE_STREAM_CODE: u32 = 204;
pub const PURGE_STREAM: &str = "stream.purge";
pub const PURGE_STREAM_CODE: u32 = 205;
pub const GET_TOPIC: &str = "topic.get";
pub const GET_TOPIC_CODE: u32 = 300;
pub const GET_TOPICS: &str = "topic.list";
Expand Down Expand Up @@ -157,6 +160,7 @@ pub enum Command {
CreateStream(CreateStream),
DeleteStream(DeleteStream),
UpdateStream(UpdateStream),
PurgeStream(PurgeStream),
GetTopic(GetTopic),
GetTopics(GetTopics),
CreateTopic(CreateTopic),
Expand Down Expand Up @@ -220,6 +224,7 @@ impl BytesSerializable for Command {
Command::CreateStream(payload) => as_bytes(CREATE_STREAM_CODE, &payload.as_bytes()),
Command::DeleteStream(payload) => as_bytes(DELETE_STREAM_CODE, &payload.as_bytes()),
Command::UpdateStream(payload) => as_bytes(UPDATE_STREAM_CODE, &payload.as_bytes()),
Command::PurgeStream(payload) => as_bytes(PURGE_STREAM_CODE, &payload.as_bytes()),
Command::GetTopic(payload) => as_bytes(GET_TOPIC_CODE, &payload.as_bytes()),
Command::GetTopics(payload) => as_bytes(GET_TOPICS_CODE, &payload.as_bytes()),
Command::CreateTopic(payload) => as_bytes(CREATE_TOPIC_CODE, &payload.as_bytes()),
Expand Down Expand Up @@ -300,6 +305,7 @@ impl BytesSerializable for Command {
CREATE_STREAM_CODE => Ok(Command::CreateStream(CreateStream::from_bytes(payload)?)),
DELETE_STREAM_CODE => Ok(Command::DeleteStream(DeleteStream::from_bytes(payload)?)),
UPDATE_STREAM_CODE => Ok(Command::UpdateStream(UpdateStream::from_bytes(payload)?)),
PURGE_STREAM_CODE => Ok(Command::PurgeStream(PurgeStream::from_bytes(payload)?)),
GET_TOPIC_CODE => Ok(Command::GetTopic(GetTopic::from_bytes(payload)?)),
GET_TOPICS_CODE => Ok(Command::GetTopics(GetTopics::from_bytes(payload)?)),
CREATE_TOPIC_CODE => Ok(Command::CreateTopic(CreateTopic::from_bytes(payload)?)),
Expand Down Expand Up @@ -388,6 +394,7 @@ impl FromStr for Command {
CREATE_STREAM => Ok(Command::CreateStream(CreateStream::from_str(payload)?)),
DELETE_STREAM => Ok(Command::DeleteStream(DeleteStream::from_str(payload)?)),
UPDATE_STREAM => Ok(Command::UpdateStream(UpdateStream::from_str(payload)?)),
PURGE_STREAM => Ok(Command::PurgeStream(PurgeStream::from_str(payload)?)),
GET_TOPIC => Ok(Command::GetTopic(GetTopic::from_str(payload)?)),
GET_TOPICS => Ok(Command::GetTopics(GetTopics::from_str(payload)?)),
CREATE_TOPIC => Ok(Command::CreateTopic(CreateTopic::from_str(payload)?)),
Expand Down Expand Up @@ -461,6 +468,7 @@ impl Display for Command {
Command::CreateStream(payload) => write!(formatter, "{CREATE_STREAM}|{payload}"),
Command::DeleteStream(payload) => write!(formatter, "{DELETE_STREAM}|{payload}"),
Command::UpdateStream(payload) => write!(formatter, "{UPDATE_STREAM}|{payload}"),
Command::PurgeStream(payload) => write!(formatter, "{PURGE_STREAM}|{payload}"),
Command::GetTopic(payload) => write!(formatter, "{GET_TOPIC}|{payload}"),
Command::GetTopics(payload) => write!(formatter, "{GET_TOPICS}|{payload}"),
Command::CreateTopic(payload) => write!(formatter, "{CREATE_TOPIC}|{payload}"),
Expand Down Expand Up @@ -644,6 +652,11 @@ mod tests {
UPDATE_STREAM_CODE,
&UpdateStream::default(),
);
assert_serialized_as_bytes_and_deserialized_from_bytes(
&Command::PurgeStream(PurgeStream::default()),
PURGE_STREAM_CODE,
&PurgeStream::default(),
);
assert_serialized_as_bytes_and_deserialized_from_bytes(
&Command::GetTopic(GetTopic::default()),
GET_TOPIC_CODE,
Expand Down Expand Up @@ -845,6 +858,11 @@ mod tests {
UPDATE_STREAM,
&UpdateStream::default(),
);
assert_read_from_string(
&Command::PurgeStream(PurgeStream::default()),
PURGE_STREAM,
&PurgeStream::default(),
);
assert_read_from_string(
&Command::GetTopic(GetTopic::default()),
GET_TOPIC,
Expand Down
7 changes: 7 additions & 0 deletions iggy/src/http/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::streams::create_stream::CreateStream;
use crate::streams::delete_stream::DeleteStream;
use crate::streams::get_stream::GetStream;
use crate::streams::get_streams::GetStreams;
use crate::streams::purge_stream::PurgeStream;
use crate::streams::update_stream::UpdateStream;
use async_trait::async_trait;

Expand Down Expand Up @@ -43,6 +44,12 @@ impl StreamClient for HttpClient {
self.delete(&path).await?;
Ok(())
}

async fn purge_stream(&self, command: &PurgeStream) -> Result<(), Error> {
let path = format!("{}/{}/purge", PATH, command.stream_id.as_string());
self.delete(&path).await?;
Ok(())
}
}

fn get_details_path(stream_id: &str) -> String {
Expand Down
5 changes: 5 additions & 0 deletions iggy/src/quic/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::streams::create_stream::CreateStream;
use crate::streams::delete_stream::DeleteStream;
use crate::streams::get_stream::GetStream;
use crate::streams::get_streams::GetStreams;
use crate::streams::purge_stream::PurgeStream;
use crate::streams::update_stream::UpdateStream;
use async_trait::async_trait;

Expand All @@ -31,4 +32,8 @@ impl StreamClient for QuicClient {
async fn delete_stream(&self, command: &DeleteStream) -> Result<(), Error> {
binary::streams::delete_stream(self, command).await
}

async fn purge_stream(&self, command: &PurgeStream) -> Result<(), Error> {
binary::streams::purge_stream(self, command).await
}
}
1 change: 1 addition & 0 deletions iggy/src/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod create_stream;
pub mod delete_stream;
pub mod get_stream;
pub mod get_streams;
pub mod purge_stream;
pub mod update_stream;

const MAX_NAME_LENGTH: usize = 255;
107 changes: 107 additions & 0 deletions iggy/src/streams/purge_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use crate::bytes_serializable::BytesSerializable;
use crate::command::CommandPayload;
use crate::error::Error;
use crate::identifier::Identifier;
use crate::validatable::Validatable;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::str::FromStr;

/// `PurgeStream` command is used to purge stream data (all the messages from its topics).
/// It has additional payload:
/// - `stream_id` - unique stream ID (numeric or name).
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
pub struct PurgeStream {
/// Unique stream ID (numeric or name).
#[serde(skip)]
pub stream_id: Identifier,
}

impl CommandPayload for PurgeStream {}

impl Validatable<Error> for PurgeStream {
fn validate(&self) -> Result<(), Error> {
Ok(())
}
}

impl FromStr for PurgeStream {
type Err = Error;
fn from_str(input: &str) -> Result<Self, Self::Err> {
let parts = input.split('|').collect::<Vec<&str>>();
if parts.len() != 1 {
return Err(Error::InvalidCommand);
}

let stream_id = parts[0].parse::<Identifier>()?;
let command = PurgeStream { stream_id };
command.validate()?;
Ok(command)
}
}

impl BytesSerializable for PurgeStream {
fn as_bytes(&self) -> Vec<u8> {
let stream_id_bytes = self.stream_id.as_bytes();
let mut bytes = Vec::with_capacity(stream_id_bytes.len());
bytes.extend(stream_id_bytes);
bytes
}

fn from_bytes(bytes: &[u8]) -> Result<PurgeStream, Error> {
if bytes.len() < 5 {
return Err(Error::InvalidCommand);
}

let stream_id = Identifier::from_bytes(bytes)?;
let command = PurgeStream { stream_id };
command.validate()?;
Ok(command)
}
}

impl Display for PurgeStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.stream_id)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn should_be_serialized_as_bytes() {
let command = PurgeStream {
stream_id: Identifier::numeric(1).unwrap(),
};

let bytes = command.as_bytes();
let stream_id = Identifier::from_bytes(&bytes).unwrap();

assert!(!bytes.is_empty());
assert_eq!(stream_id, command.stream_id);
}

#[test]
fn should_be_deserialized_from_bytes() {
let stream_id = Identifier::numeric(1).unwrap();
let bytes = stream_id.as_bytes();
let command = PurgeStream::from_bytes(&bytes);
assert!(command.is_ok());

let command = command.unwrap();
assert_eq!(command.stream_id, stream_id);
}

#[test]
fn should_be_read_from_string() {
let stream_id = Identifier::numeric(1).unwrap();
let input = format!("{stream_id}");
let command = PurgeStream::from_str(&input);
assert!(command.is_ok());

let command = command.unwrap();
assert_eq!(command.stream_id, stream_id);
}
}
5 changes: 5 additions & 0 deletions iggy/src/tcp/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::streams::create_stream::CreateStream;
use crate::streams::delete_stream::DeleteStream;
use crate::streams::get_stream::GetStream;
use crate::streams::get_streams::GetStreams;
use crate::streams::purge_stream::PurgeStream;
use crate::streams::update_stream::UpdateStream;
use crate::tcp::client::TcpClient;
use async_trait::async_trait;
Expand All @@ -31,4 +32,8 @@ impl StreamClient for TcpClient {
async fn delete_stream(&self, command: &DeleteStream) -> Result<(), Error> {
binary::streams::delete_stream(self, command).await
}

async fn purge_stream(&self, command: &PurgeStream) -> Result<(), Error> {
binary::streams::purge_stream(self, command).await
}
}
4 changes: 2 additions & 2 deletions iggy/src/topics/purge_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Validatable<Error> for PurgeTopic {

impl FromStr for PurgeTopic {
type Err = Error;
fn from_str(input: &str) -> std::result::Result<Self, Self::Err> {
fn from_str(input: &str) -> Result<Self, Self::Err> {
let parts = input.split('|').collect::<Vec<&str>>();
if parts.len() != 2 {
return Err(Error::InvalidCommand);
Expand All @@ -58,7 +58,7 @@ impl BytesSerializable for PurgeTopic {
bytes
}

fn from_bytes(bytes: &[u8]) -> std::result::Result<PurgeTopic, Error> {
fn from_bytes(bytes: &[u8]) -> Result<PurgeTopic, Error> {
if bytes.len() < 10 {
return Err(Error::InvalidCommand);
}
Expand Down
Loading

0 comments on commit d53f726

Please sign in to comment.