diff --git a/Cargo.lock b/Cargo.lock index 079e8df09..f79f2cb92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1997,7 +1997,7 @@ dependencies = [ [[package]] name = "iggy" -version = "0.6.19" +version = "0.6.20" dependencies = [ "aes-gcm", "ahash 0.8.11", diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index b4cc0061d..aac194dfd 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iggy" -version = "0.6.19" +version = "0.6.20" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2021" license = "MIT" diff --git a/sdk/src/clients/consumer.rs b/sdk/src/clients/consumer.rs index a7b1a36b8..f59644da3 100644 --- a/sdk/src/clients/consumer.rs +++ b/sdk/src/clients/consumer.rs @@ -169,6 +169,29 @@ impl IggyConsumer { &self.topic_id } + /// Stores the consumer offset on the server either for the current partition or the provided partition ID. + pub async fn store_offset( + &self, + offset: u64, + partition_id: Option, + ) -> Result<(), IggyError> { + let partition_id = if let Some(partition_id) = partition_id { + partition_id + } else { + self.current_partition_id.load(ORDERING) + }; + Self::store_consumer_offset( + &self.client, + &self.consumer, + &self.stream_id, + &self.topic_id, + partition_id, + offset, + &self.last_stored_offsets, + ) + .await + } + /// Initializes the consumer by subscribing to diagnostic events, initializing the consumer group if needed, storing the offsets in the background etc. pub async fn init(&mut self) -> Result<(), IggyError> { if self.initialized { @@ -208,46 +231,22 @@ impl IggyConsumer { let consumer = self.consumer.clone(); let stream_id = self.stream_id.clone(); let topic_id = self.topic_id.clone(); - let last_stored_offset = self.last_stored_offsets.clone(); + let last_stored_offsets = self.last_stored_offsets.clone(); let (store_offset_sender, store_offset_receiver) = flume::unbounded(); self.store_offset_sender = store_offset_sender; tokio::spawn(async move { while let Ok((partition_id, offset)) = store_offset_receiver.recv_async().await { - trace!("Received offset to store: {offset}, partition ID: {partition_id}"); - let stored_offset; - if let Some(offset_entry) = last_stored_offset.get(&partition_id) { - stored_offset = offset_entry.value().load(ORDERING); - } else { - stored_offset = 0; - last_stored_offset.insert(partition_id, AtomicU64::new(0)); - } - - if offset <= stored_offset && offset >= 1 { - trace!("Offset: {offset} is less than or equal to the last stored offset: {stored_offset}. Skipping storing the offset."); - continue; - } - - let client = client.read().await; - if let Err(error) = client - .store_consumer_offset( - &consumer, - &stream_id, - &topic_id, - Some(partition_id), - offset, - ) - .await - { - error!("Failed to store offset: {offset}, error: {error}"); - continue; - } - trace!("Stored offset: {offset}"); - if let Some(last_offset_entry) = last_stored_offset.get(&partition_id) { - last_offset_entry.value().store(offset, ORDERING); - } else { - last_stored_offset.insert(partition_id, AtomicU64::new(offset)); - } + trace!("Received offset to store: {offset}, partition ID: {partition_id}, stream: {stream_id}, topic: {topic_id}"); + _ = Self::store_consumer_offset( + &client, + &consumer, + &stream_id, + &topic_id, + partition_id, + offset, + &last_stored_offsets, + ) } }); @@ -255,10 +254,44 @@ impl IggyConsumer { Ok(()) } - fn store_offset(&self, partition_id: u32, offset: u64) { - if let Err(error) = self.store_offset_sender.send((partition_id, offset)) { - error!("Failed to send offset to store: {error}"); + async fn store_consumer_offset( + client: &IggySharedMut>, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: u32, + offset: u64, + last_stored_offsets: &DashMap, + ) -> Result<(), IggyError> { + trace!("Storing offset: {offset} for consumer: {consumer}, partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}..."); + let stored_offset; + if let Some(offset_entry) = last_stored_offsets.get(&partition_id) { + stored_offset = offset_entry.load(ORDERING); + } else { + stored_offset = 0; + last_stored_offsets.insert(partition_id, AtomicU64::new(0)); } + + if offset <= stored_offset && offset >= 1 { + trace!("Offset: {offset} is less than or equal to the last stored offset: {stored_offset} for consumer: {consumer}, partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}. Skipping storing the offset."); + return Ok(()); + } + + let client = client.read().await; + if let Err(error) = client + .store_consumer_offset(consumer, stream_id, topic_id, Some(partition_id), offset) + .await + { + error!("Failed to store offset: {offset} for consumer: {consumer}, partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}. {error}"); + return Err(error); + } + trace!("Stored offset: {offset} for consumer: {consumer}, partition ID: {partition_id}, topic: {topic_id}, stream: {stream_id}."); + if let Some(last_offset_entry) = last_stored_offsets.get(&partition_id) { + last_offset_entry.store(offset, ORDERING); + } else { + last_stored_offsets.insert(partition_id, AtomicU64::new(offset)); + } + Ok(()) } fn store_offsets_in_background(&self, interval: IggyDuration) { @@ -266,53 +299,35 @@ impl IggyConsumer { let consumer = self.consumer.clone(); let stream_id = self.stream_id.clone(); let topic_id = self.topic_id.clone(); - let last_consumed_offset = self.last_consumed_offsets.clone(); - let last_stored_offset = self.last_stored_offsets.clone(); + let last_consumed_offsets = self.last_consumed_offsets.clone(); + let last_stored_offsets = self.last_stored_offsets.clone(); tokio::spawn(async move { loop { sleep(interval.get_duration()).await; - for entry in last_consumed_offset.iter() { + for entry in last_consumed_offsets.iter() { let partition_id = *entry.key(); - let consumed_offset = entry.value().load(ORDERING); - let stored_offset = last_stored_offset - .get(&partition_id) - .map_or(0, |offset| offset.load(ORDERING)); - trace!( - "Trying to store the offset: {consumed_offset}, last stored offset: {stored_offset} for partition ID: {partition_id}" - ); - - if consumed_offset <= stored_offset && consumed_offset >= 1 { - trace!("Offset: {consumed_offset} is less than or equal to the last stored offset: {stored_offset} for partition ID: {partition_id}. Skipping storing the offset in the background."); - continue; - } - - let client = client.read().await; - if let Err(error) = client - .store_consumer_offset( - &consumer, - &stream_id, - &topic_id, - Some(partition_id), - consumed_offset, - ) - .await - { - error!( - "Failed to store offset: {consumed_offset} for partition ID: {partition_id} in the background, error: {error}" - ); - continue; - } - trace!("Stored offset: {consumed_offset} for partition ID: {partition_id} in the background."); - if let Some(stored_offset) = last_stored_offset.get(&partition_id) { - stored_offset.store(consumed_offset, ORDERING); - } else { - last_stored_offset.insert(partition_id, AtomicU64::new(consumed_offset)); - } + let consumed_offset = entry.load(ORDERING); + _ = Self::store_consumer_offset( + &client, + &consumer, + &stream_id, + &topic_id, + partition_id, + consumed_offset, + &last_stored_offsets, + ) + .await; } } }); } + fn send_store_offset(&self, partition_id: u32, offset: u64) { + if let Err(error) = self.store_offset_sender.send((partition_id, offset)) { + error!("Failed to send offset to store: {error}"); + } + } + async fn init_consumer_group(&self) -> Result<(), IggyError> { if !self.is_consumer_group { return Ok(()); @@ -486,7 +501,7 @@ impl IggyConsumer { let has_consumed_offset; if let Some(offset_entry) = last_consumed_offset.get(&partition_id) { has_consumed_offset = true; - consumed_offset = offset_entry.value().load(ORDERING); + consumed_offset = offset_entry.load(ORDERING); } else { consumed_offset = 0; has_consumed_offset = false; @@ -507,7 +522,7 @@ impl IggyConsumer { stored_offset_entry.store(consumed_offset, ORDERING); stored_offset = consumed_offset; } else { - stored_offset = stored_offset_entry.value().load(ORDERING); + stored_offset = stored_offset_entry.load(ORDERING); } } else { if auto_commit_after_polling { @@ -677,7 +692,7 @@ impl Stream for IggyConsumer { && message.offset % self.store_after_every_nth_message == 0) || self.store_offset_after_each_message { - self.store_offset(partition_id, message.offset); + self.send_store_offset(partition_id, message.offset); } } @@ -687,7 +702,7 @@ impl Stream for IggyConsumer { } if self.store_offset_after_all_messages { - self.store_offset(partition_id, message.offset); + self.send_store_offset(partition_id, message.offset); } } @@ -766,7 +781,7 @@ impl Stream for IggyConsumer { || (self.store_offset_after_all_messages && self.buffered_messages.is_empty()) { - self.store_offset(polled_messages.partition_id, message.offset); + self.send_store_offset(polled_messages.partition_id, message.offset); } self.poll_future = None;