Skip to content

Commit

Permalink
Add table for kafka exactly-once state
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Jun 11, 2024
1 parent 2a6be6c commit 6b8b7c1
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions crates/arroyo-connectors/src/kafka/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;

use arroyo_rpc::grpc::TableConfig;
use arroyo_rpc::grpc::{GlobalKeyedTableConfig, TableConfig, TableEnum};
use arroyo_rpc::{CheckpointEvent, ControlMessage, ControlResp};
use arroyo_types::*;
use std::collections::HashMap;
Expand All @@ -18,6 +18,7 @@ use arroyo_operator::context::ArrowContext;
use arroyo_operator::operator::ArrowOperator;
use arroyo_types::CheckpointBarrier;
use async_trait::async_trait;
use prost::Message;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use std::time::{Duration, SystemTime};

Expand Down Expand Up @@ -157,7 +158,18 @@ impl ArrowOperator for KafkaSinkFunc {

fn tables(&self) -> HashMap<String, TableConfig> {
if self.is_committing() {
todo!("implement committing state")
single_item_hash_map(
"i".to_string(),
TableConfig {
table_type: TableEnum::GlobalKeyValue.into(),
config: GlobalKeyedTableConfig {
table_name: "i".to_string(),
description: "index for transactional ids".to_string(),
uses_two_phase_commit: true,
}
.encode_to_vec(),
},
)
} else {
HashMap::new()
}
Expand Down

0 comments on commit 6b8b7c1

Please sign in to comment.