Skip to content

Commit

Permalink
Add store offset function to consumer high-level sdk (#1246)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Sep 16, 2024
1 parent 673937a commit c0dc588
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 83 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy"
version = "0.6.19"
version = "0.6.20"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2021"
license = "MIT"
Expand Down
177 changes: 96 additions & 81 deletions sdk/src/clients/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,29 @@ impl IggyConsumer {
&self.topic_id
}

/// Stores the consumer offset on the server either for the current partition or the provided partition ID.
pub async fn store_offset(
&self,
offset: u64,
partition_id: Option<u32>,
) -> Result<(), IggyError> {
let partition_id = if let Some(partition_id) = partition_id {
partition_id
} else {
self.current_partition_id.load(ORDERING)
};
Self::store_consumer_offset(
&self.client,
&self.consumer,
&self.stream_id,
&self.topic_id,
partition_id,
offset,
&self.last_stored_offsets,
)
.await
}

/// Initializes the consumer by subscribing to diagnostic events, initializing the consumer group if needed, storing the offsets in the background etc.
pub async fn init(&mut self) -> Result<(), IggyError> {
if self.initialized {
Expand Down Expand Up @@ -208,111 +231,103 @@ impl IggyConsumer {
let consumer = self.consumer.clone();
let stream_id = self.stream_id.clone();
let topic_id = self.topic_id.clone();
let last_stored_offset = self.last_stored_offsets.clone();
let last_stored_offsets = self.last_stored_offsets.clone();
let (store_offset_sender, store_offset_receiver) = flume::unbounded();
self.store_offset_sender = store_offset_sender;

tokio::spawn(async move {
while let Ok((partition_id, offset)) = store_offset_receiver.recv_async().await {
trace!("Received offset to store: {offset}, partition ID: {partition_id}");
let stored_offset;
if let Some(offset_entry) = last_stored_offset.get(&partition_id) {
stored_offset = offset_entry.value().load(ORDERING);
} else {
stored_offset = 0;
last_stored_offset.insert(partition_id, AtomicU64::new(0));
}

if offset <= stored_offset && offset >= 1 {
trace!("Offset: {offset} is less than or equal to the last stored offset: {stored_offset}. Skipping storing the offset.");
continue;
}

let client = client.read().await;
if let Err(error) = client
.store_consumer_offset(
&consumer,
&stream_id,
&topic_id,
Some(partition_id),
offset,
)
.await
{
error!("Failed to store offset: {offset}, error: {error}");
continue;
}
trace!("Stored offset: {offset}");
if let Some(last_offset_entry) = last_stored_offset.get(&partition_id) {
last_offset_entry.value().store(offset, ORDERING);
} else {
last_stored_offset.insert(partition_id, AtomicU64::new(offset));
}
trace!("Received offset to store: {offset}, partition ID: {partition_id}, stream: {stream_id}, topic: {topic_id}");
_ = Self::store_consumer_offset(
&client,
&consumer,
&stream_id,
&topic_id,
partition_id,
offset,
&last_stored_offsets,
)
}
});

self.initialized = true;
Ok(())
}

fn store_offset(&self, partition_id: u32, offset: u64) {
if let Err(error) = self.store_offset_sender.send((partition_id, offset)) {
error!("Failed to send offset to store: {error}");
async fn store_consumer_offset(
client: &IggySharedMut<Box<dyn Client>>,
consumer: &Consumer,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: u32,
offset: u64,
last_stored_offsets: &DashMap<u32, AtomicU64>,
) -> Result<(), IggyError> {
trace!("Storing offset: {offset} for consumer: {consumer}, partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}...");
let stored_offset;
if let Some(offset_entry) = last_stored_offsets.get(&partition_id) {
stored_offset = offset_entry.load(ORDERING);
} else {
stored_offset = 0;
last_stored_offsets.insert(partition_id, AtomicU64::new(0));
}

if offset <= stored_offset && offset >= 1 {
trace!("Offset: {offset} is less than or equal to the last stored offset: {stored_offset} for consumer: {consumer}, partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}. Skipping storing the offset.");
return Ok(());
}

let client = client.read().await;
if let Err(error) = client
.store_consumer_offset(consumer, stream_id, topic_id, Some(partition_id), offset)
.await
{
error!("Failed to store offset: {offset} for consumer: {consumer}, partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}. {error}");
return Err(error);
}
trace!("Stored offset: {offset} for consumer: {consumer}, partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}.");
if let Some(last_offset_entry) = last_stored_offsets.get(&partition_id) {
last_offset_entry.store(offset, ORDERING);
} else {
last_stored_offsets.insert(partition_id, AtomicU64::new(offset));
}
Ok(())
}

fn store_offsets_in_background(&self, interval: IggyDuration) {
let client = self.client.clone();
let consumer = self.consumer.clone();
let stream_id = self.stream_id.clone();
let topic_id = self.topic_id.clone();
let last_consumed_offset = self.last_consumed_offsets.clone();
let last_stored_offset = self.last_stored_offsets.clone();
let last_consumed_offsets = self.last_consumed_offsets.clone();
let last_stored_offsets = self.last_stored_offsets.clone();
tokio::spawn(async move {
loop {
sleep(interval.get_duration()).await;
for entry in last_consumed_offset.iter() {
for entry in last_consumed_offsets.iter() {
let partition_id = *entry.key();
let consumed_offset = entry.value().load(ORDERING);
let stored_offset = last_stored_offset
.get(&partition_id)
.map_or(0, |offset| offset.load(ORDERING));
trace!(
"Trying to store the offset: {consumed_offset}, last stored offset: {stored_offset} for partition ID: {partition_id}"
);

if consumed_offset <= stored_offset && consumed_offset >= 1 {
trace!("Offset: {consumed_offset} is less than or equal to the last stored offset: {stored_offset} for partition ID: {partition_id}. Skipping storing the offset in the background.");
continue;
}

let client = client.read().await;
if let Err(error) = client
.store_consumer_offset(
&consumer,
&stream_id,
&topic_id,
Some(partition_id),
consumed_offset,
)
.await
{
error!(
"Failed to store offset: {consumed_offset} for partition ID: {partition_id} in the background, error: {error}"
);
continue;
}
trace!("Stored offset: {consumed_offset} for partition ID: {partition_id} in the background.");
if let Some(stored_offset) = last_stored_offset.get(&partition_id) {
stored_offset.store(consumed_offset, ORDERING);
} else {
last_stored_offset.insert(partition_id, AtomicU64::new(consumed_offset));
}
let consumed_offset = entry.load(ORDERING);
_ = Self::store_consumer_offset(
&client,
&consumer,
&stream_id,
&topic_id,
partition_id,
consumed_offset,
&last_stored_offsets,
)
.await;
}
}
});
}

fn send_store_offset(&self, partition_id: u32, offset: u64) {
if let Err(error) = self.store_offset_sender.send((partition_id, offset)) {
error!("Failed to send offset to store: {error}");
}
}

async fn init_consumer_group(&self) -> Result<(), IggyError> {
if !self.is_consumer_group {
return Ok(());
Expand Down Expand Up @@ -486,7 +501,7 @@ impl IggyConsumer {
let has_consumed_offset;
if let Some(offset_entry) = last_consumed_offset.get(&partition_id) {
has_consumed_offset = true;
consumed_offset = offset_entry.value().load(ORDERING);
consumed_offset = offset_entry.load(ORDERING);
} else {
consumed_offset = 0;
has_consumed_offset = false;
Expand All @@ -507,7 +522,7 @@ impl IggyConsumer {
stored_offset_entry.store(consumed_offset, ORDERING);
stored_offset = consumed_offset;
} else {
stored_offset = stored_offset_entry.value().load(ORDERING);
stored_offset = stored_offset_entry.load(ORDERING);
}
} else {
if auto_commit_after_polling {
Expand Down Expand Up @@ -677,7 +692,7 @@ impl Stream for IggyConsumer {
&& message.offset % self.store_after_every_nth_message == 0)
|| self.store_offset_after_each_message
{
self.store_offset(partition_id, message.offset);
self.send_store_offset(partition_id, message.offset);
}
}

Expand All @@ -687,7 +702,7 @@ impl Stream for IggyConsumer {
}

if self.store_offset_after_all_messages {
self.store_offset(partition_id, message.offset);
self.send_store_offset(partition_id, message.offset);
}
}

Expand Down Expand Up @@ -766,7 +781,7 @@ impl Stream for IggyConsumer {
|| (self.store_offset_after_all_messages
&& self.buffered_messages.is_empty())
{
self.store_offset(polled_messages.partition_id, message.offset);
self.send_store_offset(polled_messages.partition_id, message.offset);
}

self.poll_future = None;
Expand Down

0 comments on commit c0dc588

Please sign in to comment.