Skip to content

Commit

Permalink
Don't require message in Consumer::store_offset
Browse files Browse the repository at this point in the history
  • Loading branch information
benesch committed Oct 16, 2021
1 parent a7f3b8e commit 27ce995
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 7 deletions.
6 changes: 6 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).

Thanks, [@djKooks].

* **Breaking change.** Change `Consumer::store_offset` to accept the topic,
partition, and offset directly ([#89], [#368]). The old API, which took a
`BorrowedMessage`, is still accessible as
`Consumer::store_offset_from_message`.

[#89]: https://github.com/fede1024/rust-rdkafka/issues/89
[#95]: https://github.com/fede1024/rust-rdkafka/issues/95
[#360]: https://github.com/fede1024/rust-rdkafka/issues/360
[#367]: https://github.com/fede1024/rust-rdkafka/issues/367
Expand Down
2 changes: 1 addition & 1 deletion examples/at_least_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ async fn main() {
.expect("Message delivery failed for some topic");
// Now that the message is completely processed, add it's position to the offset
// store. The actual offset will be committed every 5 seconds.
if let Err(e) = consumer.store_offset(&m) {
if let Err(e) = consumer.store_offset_from_message(&m) {
warn!("Error while storing offset: {}", e);
}
}
Expand Down
12 changes: 11 additions & 1 deletion src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,17 @@ where
}
}

fn store_offset(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> {
let topic = self.client.native_topic(topic)?;
let error = unsafe { rdsys::rd_kafka_offset_store(topic.ptr(), partition, offset) };
if error.is_error() {
Err(KafkaError::StoreOffset(error.into()))
} else {
Ok(())
}
}

fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
let error = unsafe {
rdsys::rd_kafka_offset_store(message.topic_ptr(), message.partition(), message.offset())
};
Expand Down
8 changes: 6 additions & 2 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,14 @@ where
/// commit every message with lower offset within the same partition.
fn commit_message(&self, message: &BorrowedMessage<'_>, mode: CommitMode) -> KafkaResult<()>;

/// Stores offset for this message to be used on the next (auto)commit. When
/// Stores offset to be used on the next (auto)commit. When
/// using this `enable.auto.offset.store` should be set to `false` in the
/// config.
fn store_offset(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>;
fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()>;

/// Like [`Consumer::store_offset`], but the offset to store is derived from
/// the provided message.
fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()>;

/// Store offsets to be used on the next (auto)commit. When using this
/// `enable.auto.offset.store` should be set to `false` in the config.
Expand Down
8 changes: 6 additions & 2 deletions src/consumer/stream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,12 @@ where
self.base.commit_message(message, mode)
}

fn store_offset(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
self.base.store_offset(message)
fn store_offset(&self, topic: &str, partition: i32, offset: i64) -> KafkaResult<()> {
self.base.store_offset(topic, partition, offset)
}

fn store_offset_from_message(&self, message: &BorrowedMessage<'_>) -> KafkaResult<()> {
self.base.store_offset_from_message(message)
}

fn store_offsets(&self, tpl: &TopicPartitionList) -> KafkaResult<()> {
Expand Down
2 changes: 1 addition & 1 deletion tests/test_high_consumers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ async fn test_consumer_store_offset_commit() {
match message {
Ok(m) => {
if m.partition() == 1 {
consumer.store_offset(&m).unwrap();
consumer.store_offset_from_message(&m).unwrap();
}
}
Err(KafkaError::PartitionEOF(_)) => {}
Expand Down

0 comments on commit 27ce995

Please sign in to comment.