[Fix][Connector-V2] Fix kafka batch mode can not read all message (#7135)
Hisoka-X authored Jul 9, 2024
1 parent bb9fd51 commit 1784c01
Showing 12 changed files with 237 additions and 44 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/backend.yml
@@ -1052,7 +1052,7 @@ jobs:

kafka-connector-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-kafka-e2e')
runs-on: ${{ matrix.os }}
strategy:
matrix:
@@ -1068,15 +1068,14 @@ jobs:
distribution: 'temurin'
cache: 'maven'
- name: run kafka connector integration test
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kafka-e2e -am -Pci
env:
MAVEN_OPTS: -Xmx4096m

rocketmq-connector-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true'
if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-rocketmq-e2e')
runs-on: ${{ matrix.os }}
strategy:
matrix:
@@ -1092,7 +1091,6 @@ jobs:
distribution: 'temurin'
cache: 'maven'
- name: run rocket connector integration test
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-rocketmq-e2e -am -Pci
env:
2 changes: 1 addition & 1 deletion docs/en/connector-v2/formats/avro.md
@@ -77,7 +77,7 @@ source {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_avro_topic"
result_table_name = "kafka_table"
kafka.auto.offset.reset = "earliest"
start_mode = "earliest"
format = avro
format_error_handle_way = skip
schema = {
2 changes: 1 addition & 1 deletion docs/zh/connector-v2/formats/avro.md
@@ -77,7 +77,7 @@ source {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_avro_topic"
result_table_name = "kafka_table"
kafka.auto.offset.reset = "earliest"
start_mode = "earliest"
format = avro
format_error_handle_way = skip
schema = {
@@ -45,6 +45,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -103,7 +104,7 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
return;
}

while (pendingPartitionsQueue.size() != 0) {
while (!pendingPartitionsQueue.isEmpty()) {
sourceSplits.add(pendingPartitionsQueue.poll());
}
sourceSplits.forEach(
@@ -120,9 +121,10 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
executorService.submit(thread);
return thread;
}));
List<KafkaSourceSplit> finishedSplits = new CopyOnWriteArrayList<>();
sourceSplits.forEach(
sourceSplit -> {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
TablePath tablePath = sourceSplit.getTablePath();
DeserializationSchema<SeaTunnelRow> deserializationSchema =
tablePathMetadataMap.get(tablePath).getDeserializationSchema();
@@ -148,9 +150,14 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
for (TopicPartition partition : partitions) {
List<ConsumerRecord<byte[], byte[]>>
recordList = records.records(partition);
if (Boundedness.BOUNDED.equals(
context.getBoundedness())
&& recordList.isEmpty()) {
completableFuture.complete(true);
return;
}
for (ConsumerRecord<byte[], byte[]> record :
recordList) {

try {
if (deserializationSchema
instanceof
@@ -180,7 +187,8 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
&& record.offset()
>= sourceSplit
.getEndOffset()) {
break;
completableFuture.complete(true);
return;
}
}
long lastOffset = -1;
@@ -199,18 +207,21 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
completableFuture.complete(null);
completableFuture.complete(false);
});
} catch (InterruptedException e) {
if (completableFuture.get()) {
finishedSplits.add(sourceSplit);
}
} catch (Exception e) {
throw new KafkaConnectorException(
KafkaConnectorErrorCode.CONSUME_DATA_FAILED, e);
}
completableFuture.join();
});

if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the data.
context.signalNoMoreElement();
finishedSplits.forEach(sourceSplits::remove);
if (sourceSplits.isEmpty()) {
context.signalNoMoreElement();
}
}
}
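The hunks above are the heart of the fix: a split now completes its future with true only when it is genuinely exhausted (an empty poll in bounded mode, or reaching the split's end offset), finished splits are removed from sourceSplits, and signalNoMoreElement() fires only once no splits remain. Previously a break plus an unconditional signalNoMoreElement() could end a bounded job after the first poll round even though unread records were left. A minimal standalone sketch of that completion pattern, with illustrative names (BoundedSplitTracker is not part of the connector's API):

// Hypothetical sketch of the bounded-read pattern this commit introduces:
// each split's poll round reports true once the split is exhausted, finished
// splits are removed, and "no more elements" is signaled only when none remain.
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Function;

public class BoundedSplitTracker<S> {
    private final Set<S> pendingSplits = new CopyOnWriteArraySet<>();

    public void addSplit(S split) {
        pendingSplits.add(split);
    }

    /** One poll round; readSplit completes with true only when the split is exhausted. */
    public boolean pollOnce(Function<S, CompletableFuture<Boolean>> readSplit) throws Exception {
        List<S> finishedSplits = new CopyOnWriteArrayList<>();
        for (S split : pendingSplits) {
            // Blocks until this split's poll round has finished, mirroring
            // completableFuture.get() in the patched pollNext.
            if (readSplit.apply(split).get()) {
                finishedSplits.add(split);
            }
        }
        finishedSplits.forEach(pendingSplits::remove);
        // Signal end-of-data only when every split has been fully consumed.
        return pendingSplits.isEmpty();
    }
}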

@@ -212,6 +212,27 @@ public void testSourceKafkaTextToConsole(TestContainer container)
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}

@TestTemplate
public void testSourceKafkaToAssertWithMaxPollRecords1(TestContainer container)
throws IOException, InterruptedException {
TextSerializationSchema serializer =
TextSerializationSchema.builder()
.seaTunnelRowType(SEATUNNEL_ROW_TYPE)
.delimiter(",")
.build();
generateTestData(
row ->
new ProducerRecord<>(
"test_topic_text_max_poll_records_1",
null,
serializer.serialize(row)),
0,
100);
Container.ExecResult execResult =
container.executeJob("/kafka/kafka_source_to_assert_with_max_poll_records_1.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}

@TestTemplate
public void testSourceKafkaTextToConsoleAssertCatalogTable(TestContainer container)
throws IOException, InterruptedException {
@@ -538,29 +559,34 @@ private Properties kafkaByteConsumerConfig() {
}

private void generateTestData(ProducerRecordConverter converter, int start, int end) {
for (int i = start; i < end; i++) {
SeaTunnelRow row =
new SeaTunnelRow(
new Object[] {
Long.valueOf(i),
Collections.singletonMap("key", Short.parseShort("1")),
new Byte[] {Byte.parseByte("1")},
"string",
Boolean.FALSE,
Byte.parseByte("1"),
Short.parseShort("1"),
Integer.parseInt("1"),
Long.parseLong("1"),
Float.parseFloat("1.1"),
Double.parseDouble("1.1"),
BigDecimal.valueOf(11, 1),
"test".getBytes(),
LocalDate.of(2024, 1, 1),
LocalDateTime.of(2024, 1, 1, 12, 59, 23)
});
ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
producer.send(producerRecord);
try {
for (int i = start; i < end; i++) {
SeaTunnelRow row =
new SeaTunnelRow(
new Object[] {
Long.valueOf(i),
Collections.singletonMap("key", Short.parseShort("1")),
new Byte[] {Byte.parseByte("1")},
"string",
Boolean.FALSE,
Byte.parseByte("1"),
Short.parseShort("1"),
Integer.parseInt("1"),
Long.parseLong("1"),
Float.parseFloat("1.1"),
Double.parseDouble("1.1"),
BigDecimal.valueOf(11, 1),
"test".getBytes(),
LocalDate.of(2024, 1, 1),
LocalDateTime.of(2024, 1, 1, 12, 59, 23)
});
ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
producer.send(producerRecord).get();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
producer.flush();
}

private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
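generateTestData now blocks on each send future, so a produce failure fails the test immediately and all 100 records are acknowledged by the broker before the job under test starts reading. A minimal standalone sketch of that synchronous-produce pattern, using the broker address and topic from the test; the serializer setup and payload here are illustrative:

// Sketch only: synchronous produce so test data is fully acknowledged before
// the assertion job runs (payload format is illustrative, not the test's rows).
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SyncProduceSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaCluster:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                ProducerRecord<byte[], byte[]> record =
                        new ProducerRecord<>("test_topic_text_max_poll_records_1",
                                null, ("row-" + i).getBytes());
                // Blocking on the future makes the send synchronous: a failure
                // throws here instead of being silently dropped before the test runs.
                producer.send(record).get();
            }
            producer.flush();
        }
    }
}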
@@ -32,7 +32,7 @@ source {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_avro_topic"
result_table_name = "kafka_table"
kafka.auto.offset.reset = "earliest"
start_mode = "earliest"
format = avro
format_error_handle_way = skip
schema = {
@@ -32,7 +32,7 @@ source {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_topic_json"
result_table_name = "kafka_table"
kafka.auto.offset.reset = "earliest"
start_mode = "earliest"
format_error_handle_way = skip
schema = {
fields {
@@ -0,0 +1,160 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"

# You can set spark configuration here
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_topic_text_max_poll_records_1"
result_table_name = "kafka_table"
start_mode = "earliest"
format_error_handle_way = fail
kafka.config = {
max.poll.records = 1
}
schema = {
columns = [
{
name = id
type = bigint
}
{
name = c_map
type = "map<string, smallint>"
}
{
name = c_array
type = "array<tinyint>"
}
{
name = c_string
type = "string"
}
{
name = c_boolean
type = "boolean"
}
{
name = c_tinyint
type = "tinyint"
}
{
name = c_smallint
type = "smallint"
}
{
name = c_int
type = "int"
}
{
name = c_bigint
type = "bigint"
}
{
name = c_float
type = "float"
}
{
name = c_double
type = "double"
}
{
name = c_decimal
type = "decimal(2, 1)"
}
{
name = c_bytes
type = "bytes"
}
{
name = c_date
type = "date"
}
{
name = c_timestamp
type = "timestamp"
}
]
primaryKey = {
name = "primary key"
columnNames = ["id"]
}
constraintKeys = [
{
constraintName = "unique_c_string"
constraintType = UNIQUE_KEY
constraintColumns = [
{
columnName = "c_string"
sortType = ASC
}
]
}
]
}
format = text
field_delimiter = ","
}
}

sink {
console {
source_table_name = "kafka_table"
}
Assert {
source_table_name = "kafka_table"
rules =
{
field_rules = [
{
field_name = id
field_type = bigint
field_value = [
{
rule_type = NOT_NULL
},
{
rule_type = MIN
rule_value = 0
},
{
rule_type = MAX
rule_value = 99
}
]
}
]
row_rules = [
{
rule_type = MIN_ROW
rule_value = 100
}
]
}
}
}
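The new job sets max.poll.records = 1, so each consumer poll returns at most one of the 100 produced rows; the Assert sink's MIN_ROW = 100 rule then only passes if the bounded reader keeps polling until the topic is drained instead of stopping after the first small batch. A standalone consumer sketch of the same idea, with an assumed group id (not part of the connector config):

// Sketch only: demonstrates why a bounded read must loop until an empty poll
// when max.poll.records caps how much a single poll can return.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MaxPollRecordsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaCluster:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "max-poll-records-demo"); // assumed group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); // at most one record per poll
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        long total = 0;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test_topic_text_max_poll_records_1"));
            ConsumerRecords<byte[], byte[]> records;
            // Keep polling until an empty poll marks the end of the bounded read.
            while (!(records = consumer.poll(Duration.ofSeconds(1))).isEmpty()) {
                total += records.count();
            }
        }
        System.out.println("consumed " + total + " records");
    }
}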