Skip to content

Commit

Permalink
[Hotfix][Connector-V2][kafka] code review
Browse files Browse the repository at this point in the history
  • Loading branch information
fcb-xiaobo committed Oct 21, 2024
1 parent 1f1c6a3 commit e63cee9
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public KafkaTransactionSender(String transactionPrefix, Properties kafkaProperti
@Override
public void send(ProducerRecord<K, V> producerRecord) {
kafkaProducer.send(producerRecord);
recordNumInTransaction++;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,11 +770,22 @@ public void testKafkaProtobufToAssert(TestContainer container)
}

@TestTemplate
public void testKafkaExactlyOnce(TestContainer container) throws Exception {
public void testKafkaToKafkaExactlyOnce(TestContainer container) throws Exception {
TextSerializationSchema serializer =
TextSerializationSchema.builder()
.seaTunnelRowType(SEATUNNEL_ROW_TYPE)
.delimiter(",")
.build();
generateTestData(
row ->
new ProducerRecord<>(
"kafka_topic_exactly_once", null, serializer.serialize(row)),
0,
10);
container.executeJob("/kafka/fake_to_kafka_exactly_once.conf");
String topicName = "kafka_topic_exactly_once";
Map<String, String> data = getKafkaConsumerData(topicName);
Assertions.assertEquals(4, data.size());
Assertions.assertEquals(10, data.size());
}

public static String getTestConfigFile(String configFile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,20 @@
#
env {
execution.parallelism = 1
job.mode = "BATCH"
job.mode = "STREAMING"
}

source {
FakeSource {
parallelism = 1
result_table_name = "fake"
split.read-interval = 30000
split.num = 2
row.num = 4
schema = {
fields {
name = "string"
age = "int"
}
}
}
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "kafka_topic_exactly_once"
result_table_name = "kafka_topic_exactly_once"
# The default format is json, which is optional
format = json
start_mode = earliest
}

}
transform {}


Expand Down

0 comments on commit e63cee9

Please sign in to comment.