[Hotfix][Connector-V2][kafka] fix kafka sink config exactly-once exception #7857

Merged · 22 commits · Dec 9, 2024
KafkaInternalProducer.java
@@ -55,11 +55,17 @@ public void initTransactions() {

@Override
public void beginTransaction() throws ProducerFencedException {
if (log.isDebugEnabled()) {
log.debug("KafkaInternalProducer.beginTransaction. " + this.transactionalId);
}
super.beginTransaction();
}

@Override
public void commitTransaction() throws ProducerFencedException {
if (log.isDebugEnabled()) {
log.debug("KafkaInternalProducer.commitTransaction." + this.transactionalId);
}
super.commitTransaction();
}

@@ -69,7 +75,18 @@ public void abortTransaction() throws ProducerFencedException {
}

public void setTransactionalId(String transactionalId) {
if (log.isDebugEnabled()) {
log.debug(
"KafkaInternalProducer.setTransactionalId. Target transactionalId="
+ transactionalId);
}
if (!transactionalId.equals(this.transactionalId)) {
if (log.isDebugEnabled()) {
log.debug(
"KafkaInternalProducer.setTransactionalId. Current transactionalId={} does not match target transactionalId={}",
this.transactionalId,
transactionalId);
}
Object transactionManager = getTransactionManager();
synchronized (transactionManager) {
ReflectionUtils.setField(transactionManager, "transactionalId", transactionalId);
@@ -97,7 +114,7 @@ public long getProducerId() {
return (long) ReflectionUtils.getField(producerIdAndEpoch, "producerId").get();
}

-public void resumeTransaction(long producerId, short epoch) {
+public void resumeTransaction(long producerId, short epoch, boolean txnStarted) {

log.info(
"Attempting to resume transaction {} with producerId {} and epoch {}",
@@ -125,10 +142,15 @@ public void resumeTransaction(long producerId, short epoch) {
transitionTransactionManagerStateTo(transactionManager, "READY");

transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION");
-ReflectionUtils.setField(transactionManager, "transactionStarted", true);
+ReflectionUtils.setField(transactionManager, "transactionStarted", txnStarted);
}
}

public boolean isTxnStarted() {
Object transactionManager = getTransactionManager();
return (boolean) ReflectionUtils.getField(transactionManager, "transactionStarted").get();
}
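
For context on the transactionStarted flag this patch now threads through: inside Kafka's TransactionManager, the flag only flips to true once the producer actually registers a partition with the transaction, i.e. on the first send(), not on beginTransaction(). A minimal sketch of that behavior with a plain transactional producer (bootstrap address, transactional id, and topic name are illustrative):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
        producer.initTransactions();
        producer.beginTransaction();
        // No record sent yet: the broker has no partitions registered for this
        // transaction and transactionStarted is still false ("empty transaction").
        producer.send(new ProducerRecord<>("demo_topic", "value".getBytes()));
        // The first send registers the partition; transactionStarted flips to true
        // and commitTransaction() now has real broker-side state to finalize.
        producer.commitTransaction();
    }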

private static Object createProducerIdAndEpoch(long producerId, short epoch) {
try {
Field field =
KafkaSinkCommitter.java
@@ -48,7 +48,7 @@ public List<KafkaCommitInfo> commit(List<KafkaCommitInfo> commitInfos) {
for (KafkaCommitInfo commitInfo : commitInfos) {
String transactionId = commitInfo.getTransactionId();
if (log.isDebugEnabled()) {
log.debug("Committing transaction {}", transactionId);
log.debug("Committing transaction {}, commitInfo {}", transactionId, commitInfo);
}
KafkaProducer<?, ?> producer = getProducer(commitInfo);
producer.commitTransaction();
@@ -87,7 +87,8 @@ public void abort(List<KafkaCommitInfo> commitInfos) {
new KafkaInternalProducer<>(
commitInfo.getKafkaProperties(), commitInfo.getTransactionId());
}
-kafkaProducer.resumeTransaction(commitInfo.getProducerId(), commitInfo.getEpoch());
+kafkaProducer.resumeTransaction(
+        commitInfo.getProducerId(), commitInfo.getEpoch(), commitInfo.isTxnStarted());
return kafkaProducer;
}
}
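
Putting the pieces together, the committer-side recovery now works roughly as sketched below (commitInfo stands in for a checkpointed KafkaCommitInfo; this mirrors the getProducer change above):

    // Rebuild a producer bound to the checkpointed transaction id.
    KafkaInternalProducer<byte[], byte[]> producer =
            new KafkaInternalProducer<>(
                    commitInfo.getKafkaProperties(), commitInfo.getTransactionId());
    // Restore producerId/epoch plus the real transactionStarted value instead of a
    // hard-coded true. Resuming a transaction that never sent a record as if it
    // had started can make the broker reject the commit with an exception.
    producer.resumeTransaction(
            commitInfo.getProducerId(), commitInfo.getEpoch(), commitInfo.isTxnStarted());
    producer.commitTransaction();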
KafkaTransactionSender.java
@@ -26,6 +26,7 @@
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
@@ -45,6 +46,7 @@ public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
private String transactionId;
private final String transactionPrefix;
private final Properties kafkaProperties;
private int recordNumInTransaction = 0;

public KafkaTransactionSender(String transactionPrefix, Properties kafkaProperties) {
this.transactionPrefix = transactionPrefix;
@@ -54,13 +56,15 @@ public KafkaTransactionSender(String transactionPrefix, Properties kafkaProperties) {
@Override
public void send(ProducerRecord<K, V> producerRecord) {
kafkaProducer.send(producerRecord);
recordNumInTransaction++;
}

@Override
public void beginTransaction(String transactionId) {
this.transactionId = transactionId;
this.kafkaProducer = getTransactionProducer(kafkaProperties, transactionId);
kafkaProducer.beginTransaction();
recordNumInTransaction = 0;
}

@Override
@@ -70,7 +74,8 @@ public Optional<KafkaCommitInfo> prepareCommit() {
transactionId,
kafkaProperties,
this.kafkaProducer.getProducerId(),
-this.kafkaProducer.getEpoch());
+this.kafkaProducer.getEpoch(),
+this.kafkaProducer.isTxnStarted());
return Optional.of(kafkaCommitInfo);
}

@@ -107,6 +112,10 @@ public void abortTransaction(long checkpointId) {

@Override
public List<KafkaSinkState> snapshotState(long checkpointId) {
if (recordNumInTransaction == 0) {

[Review comment (Member)] It seems like it's always equal to 0

// KafkaSinkCommitter does not support empty transactions, so we commit the empty transaction here.
kafkaProducer.commitTransaction();
}
return Lists.newArrayList(
new KafkaSinkState(
transactionId, transactionPrefix, checkpointId, kafkaProperties));
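
The recordNumInTransaction guard above is the sender-side half of the fix: an empty transaction is committed eagerly at snapshot time, so the committer is never asked to resume and commit a transaction that has no pending records. One checkpoint cycle, sketched (the properties and transaction id are illustrative):

    Properties kafkaProps = new Properties(); // illustrative broker/serializer config
    KafkaTransactionSender<byte[], byte[]> sender =
            new KafkaTransactionSender<>("txn-prefix", kafkaProps);
    sender.beginTransaction("txn-prefix-0-1"); // resets recordNumInTransaction to 0
    // ...the checkpoint barrier arrives before any send()...
    sender.snapshotState(1L);
    // recordNumInTransaction is still 0, so the empty transaction is committed
    // right here and never reaches KafkaSinkCommitter after recovery.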
@@ -116,7 +125,9 @@ public List<KafkaSinkState> snapshotState(long checkpointId) {
public void close() {
if (kafkaProducer != null) {
kafkaProducer.flush();
-kafkaProducer.close();
+// kafkaProducer will abort the open transaction if close() is called without a
+// duration arg, which would cause an exception when the Committer later commits
+// the transaction.
+kafkaProducer.close(Duration.ZERO);

[Review comment (Member)] I suggest we move this into KafkaInternalProducer

[Review comment (Member)] @fcb-xiaobo Is it missing here?

}
}

KafkaCommitInfo.java
@@ -31,4 +31,5 @@ public class KafkaCommitInfo implements Serializable {
private final Properties kafkaProperties;
private final long producerId;
private final short epoch;
private final boolean txnStarted;
}
KafkaIT.java
@@ -60,6 +60,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -97,11 +98,14 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.await;

@Slf4j
public class KafkaIT extends TestSuiteBase implements TestResource {
private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9";
@@ -752,6 +756,94 @@ public void testKafkaProtobufToAssert(TestContainer container)
});
}

@TestTemplate
@DisabledOnContainer(
type = EngineType.SPARK,
value = {})
public void testKafkaToKafkaExactlyOnceOnStreaming(TestContainer container)
throws InterruptedException {
String producerTopic = "kafka_topic_exactly_once_1";
String consumerTopic = "kafka_topic_exactly_once_2";
String sourceData = "Seatunnel Exactly Once Example";
for (int i = 0; i < 10; i++) {
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(producerTopic, null, sourceData.getBytes());
producer.send(record);
producer.flush();
}
Long endOffset = 0L;
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
consumer.subscribe(Arrays.asList(producerTopic));
Map<TopicPartition, Long> offsets =
consumer.endOffsets(Arrays.asList(new TopicPartition(producerTopic, 0)));
endOffset = offsets.entrySet().iterator().next().getValue();
}
// execute the streaming job asynchronously
CompletableFuture.supplyAsync(
() -> {
try {
container.executeJob("/kafka/kafka_to_kafka_exactly_once_streaming.conf");
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
return null;
});
TimeUnit.MINUTES.sleep(5);
// wait for data written to kafka
Long finalEndOffset = endOffset;
await().atMost(5, TimeUnit.MINUTES)
.pollInterval(5000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
checkData(consumerTopic, finalEndOffset, sourceData)));
}

@TestTemplate
public void testKafkaToKafkaExactlyOnceOnBatch(TestContainer container)
throws InterruptedException, IOException {
String producerTopic = "kafka_topic_exactly_once_1";
String consumerTopic = "kafka_topic_exactly_once_2";
String sourceData = "Seatunnel Exactly Once Example";
for (int i = 0; i < 10; i++) {
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(producerTopic, null, sourceData.getBytes());
producer.send(record);
producer.flush();
}
Long endOffset;
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
consumer.subscribe(Arrays.asList(producerTopic));
Map<TopicPartition, Long> offsets =
consumer.endOffsets(Arrays.asList(new TopicPartition(producerTopic, 0)));
endOffset = offsets.entrySet().iterator().next().getValue();
}
Container.ExecResult execResult =
container.executeJob("/kafka/kafka_to_kafka_exactly_once_batch.conf");
Assertions.assertEquals(0, execResult.getExitCode());
// check the data written to kafka
Assertions.assertTrue(checkData(consumerTopic, endOffset, sourceData));
}

// Compare the record values consumed from the topic against the expected data
private boolean checkData(String topicName, long endOffset, String data) {
List<String> listData = getKafkaConsumerListData(topicName, endOffset);
if (listData.isEmpty() || listData.size() != endOffset) {
log.error(
"testKafkaToKafkaExactlyOnce data size is not as expected: end offset {}, consumer data size {}",
endOffset,
listData.size());
return false;
}
for (String value : listData) {
if (!data.equals(value)) {
log.error("testKafkaToKafkaExactlyOnce get data value is not expect");
return false;
}
}
return true;
}

private @NotNull DefaultSeaTunnelRowSerializer getDefaultSeaTunnelRowSerializer(
String topic, SeaTunnelRowType seaTunnelRowType, ReadonlyConfig readonlyConfig) {
// Create serializer
@@ -934,6 +1026,10 @@ private Properties kafkaConsumerConfig() {
props.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.toString().toLowerCase());
// verifying exactly-once semantics requires isolation.level=read_committed
props.put(
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.name().toLowerCase());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
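
The isolation level matters for the exactly-once assertions: with Kafka's default read_uncommitted, a consumer also sees records from aborted or still-open transactions, so rolled-back writes would not be caught by checkData. For contrast, the default the test must avoid would look like this (shown for illustration only; the test keeps READ_COMMITTED):

    // Default behavior, NOT used by the test: records from aborted
    // transactions are visible to the consumer.
    props.put(
            ConsumerConfig.ISOLATION_LEVEL_CONFIG,
            IsolationLevel.READ_UNCOMMITTED.name().toLowerCase());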
@@ -1067,6 +1163,24 @@ private List<String> getKafkaConsumerListData(String topicName) {
return data;
}

private List<String> getKafkaConsumerListData(String topicName, long endOffset) {
List<String> data = new ArrayList<>();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
consumer.subscribe(Arrays.asList(topicName));
Long lastProcessedOffset = -1L;
do {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (lastProcessedOffset < record.offset()) {
data.add(record.value());
}
lastProcessedOffset = record.offset();
}
} while (lastProcessedOffset < endOffset - 1);
}
return data;
}

private List<SeaTunnelRow> getKafkaSTRow(String topicName, ConsumerRecordConverter converter) {
List<SeaTunnelRow> data = new ArrayList<>();
try (KafkaConsumer<byte[], byte[]> consumer =
kafka_to_kafka_exactly_once_batch.conf
@@ -0,0 +1,42 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
env {
parallelism = 1
job.mode = "BATCH"
}

source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "kafka_topic_exactly_once_1"
# Optional; the format defaults to json
format = text
start_mode = earliest
}

}
transform {}


sink {
kafka {
format = text
topic = "kafka_topic_exactly_once_2"
bootstrap.servers = "kafkaCluster:9092"
semantics = EXACTLY_ONCE
}
}
kafka_to_kafka_exactly_once_streaming.conf
@@ -0,0 +1,44 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
checkpoint.timeout = 60000
}

source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "kafka_topic_exactly_once_1"
# Optional; the format defaults to json
format = text
start_mode = earliest
}

}
transform {}


sink {
kafka {
format = text
topic = "kafka_topic_exactly_once_2"
bootstrap.servers = "kafkaCluster:9092"
semantics = EXACTLY_ONCE
}
}