diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 9975d477dae7..b6094aff4d3d 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -1052,7 +1052,7 @@ jobs:
 
   kafka-connector-it:
     needs: [ changes, sanity-check ]
-    if: needs.changes.outputs.api == 'true'
+    if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-kafka-e2e')
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
@@ -1068,7 +1068,6 @@
           distribution: 'temurin'
           cache: 'maven'
       - name: run kafka connector integration test
-        if: needs.changes.outputs.api == 'true'
         run: |
           ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kafka-e2e -am -Pci
         env:
@@ -1076,7 +1075,7 @@
 
   rocketmq-connector-it:
     needs: [ changes, sanity-check ]
-    if: needs.changes.outputs.api == 'true'
+    if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-rocketmq-e2e')
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
@@ -1092,7 +1091,6 @@
           distribution: 'temurin'
           cache: 'maven'
       - name: run rocket connector integration test
-        if: needs.changes.outputs.api == 'true'
         run: |
           ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-rocketmq-e2e -am -Pci
         env:
diff --git a/docs/en/connector-v2/formats/avro.md b/docs/en/connector-v2/formats/avro.md
index 638657b34567..8fef411fb58e 100644
--- a/docs/en/connector-v2/formats/avro.md
+++ b/docs/en/connector-v2/formats/avro.md
@@ -77,7 +77,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_avro_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format = avro
     format_error_handle_way = skip
     schema = {
diff --git a/docs/zh/connector-v2/formats/avro.md b/docs/zh/connector-v2/formats/avro.md
index 4e19ea4b9823..7176f4e507fb 100644
--- a/docs/zh/connector-v2/formats/avro.md
+++ b/docs/zh/connector-v2/formats/avro.md
@@ -77,7 +77,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_avro_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format = avro
     format_error_handle_way = skip
     schema = {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index d136fabc4034..02c2a9007e1b 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -45,6 +45,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -103,7 +104,7 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
             return;
         }
 
-        while (pendingPartitionsQueue.size() != 0) {
+        while (!pendingPartitionsQueue.isEmpty()) {
             sourceSplits.add(pendingPartitionsQueue.poll());
         }
         sourceSplits.forEach(
@@ -120,9 +121,10 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
                                     executorService.submit(thread);
                                     return thread;
                                 }));
+        List<KafkaSourceSplit> finishedSplits = new CopyOnWriteArrayList<>();
         sourceSplits.forEach(
                 sourceSplit -> {
-                    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                    CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
                     TablePath tablePath = sourceSplit.getTablePath();
                     DeserializationSchema<SeaTunnelRow> deserializationSchema =
                             tablePathMetadataMap.get(tablePath).getDeserializationSchema();
@@ -148,9 +150,14 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
                                         for (TopicPartition partition : partitions) {
                                             List<ConsumerRecord<byte[], byte[]>> recordList =
                                                     records.records(partition);
+                                            if (Boundedness.BOUNDED.equals(
+                                                            context.getBoundedness())
+                                                    && recordList.isEmpty()) {
+                                                completableFuture.complete(true);
+                                                return;
+                                            }
                                             for (ConsumerRecord<byte[], byte[]> record :
                                                     recordList) {
-
                                                 try {
                                                     if (deserializationSchema
                                                             instanceof
@@ -180,7 +187,8 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
                                                             && record.offset()
                                                                     >= sourceSplit
                                                                             .getEndOffset()) {
-                                                        break;
+                                                        completableFuture.complete(true);
+                                                        return;
                                                     }
                                                 }
                                                 long lastOffset = -1;
@@ -199,18 +207,21 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
                                             } catch (Exception e) {
                                                 completableFuture.completeExceptionally(e);
                                             }
-                                            completableFuture.complete(null);
+                                            completableFuture.complete(false);
                                         });
-                    } catch (InterruptedException e) {
+                        if (completableFuture.get()) {
+                            finishedSplits.add(sourceSplit);
+                        }
+                    } catch (Exception e) {
                         throw new KafkaConnectorException(
                                 KafkaConnectorErrorCode.CONSUME_DATA_FAILED, e);
                     }
-                    completableFuture.join();
                 });
-
         if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
-            // signal to the source that we have reached the end of the data.
-            context.signalNoMoreElement();
+            finishedSplits.forEach(sourceSplits::remove);
+            if (sourceSplits.isEmpty()) {
+                context.signalNoMoreElement();
+            }
         }
     }
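
Reading the reader change as a whole: `pollNext` used to call `context.signalNoMoreElement()` unconditionally after a single pass over the splits, so a bounded job could stop before slow or throttled partitions (for example with `max.poll.records = 1`) were drained. Each split now reports completion through a `CompletableFuture<Boolean>` — `true` on an empty poll in bounded mode or once the split's end offset is reached — exhausted splits are dropped, and end-of-data is signalled only when no splits remain. The following is a minimal sketch of that control flow; `BoundedPollSketch`, `pollOnce` and `signalNoMoreElement` are placeholder names, not the connector's real API:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Illustrative stand-in for the new bounded-mode flow in KafkaSourceReader.pollNext. */
public class BoundedPollSketch {

    private final boolean bounded = true; // stands in for Boundedness.BOUNDED
    private final Set<String> sourceSplits = ConcurrentHashMap.newKeySet();

    public void pollNext() throws Exception {
        List<String> finishedSplits = new CopyOnWriteArrayList<>();
        for (String split : sourceSplits) {
            CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
            // The real reader completes this future on a consumer thread: true when the split
            // is exhausted (empty poll in bounded mode, or the end offset was reached),
            // false when more data may still arrive.
            completableFuture.complete(pollOnce(split));
            if (completableFuture.get()) {
                finishedSplits.add(split);
            }
        }
        if (bounded) {
            // Drop exhausted splits; only a fully drained reader tells the engine to stop.
            finishedSplits.forEach(sourceSplits::remove);
            if (sourceSplits.isEmpty()) {
                signalNoMoreElement();
            }
        }
    }

    private boolean pollOnce(String split) {
        return true; // placeholder for the per-partition Kafka poll loop
    }

    private void signalNoMoreElement() {
        System.out.println("all splits finished; the bounded read can stop");
    }
}
```

The thread-safe `CopyOnWriteArrayList` for `finishedSplits` reads as a defensive choice, since the futures it depends on are completed from separate consumer threads.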
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 2f1c92048e00..d4629851e79b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -212,6 +212,27 @@ public void testSourceKafkaTextToConsole(TestContainer container)
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }
 
+    @TestTemplate
+    public void testSourceKafkaToAssertWithMaxPollRecords1(TestContainer container)
+            throws IOException, InterruptedException {
+        TextSerializationSchema serializer =
+                TextSerializationSchema.builder()
+                        .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+                        .delimiter(",")
+                        .build();
+        generateTestData(
+                row ->
+                        new ProducerRecord<>(
+                                "test_topic_text_max_poll_records_1",
+                                null,
+                                serializer.serialize(row)),
+                0,
+                100);
+        Container.ExecResult execResult =
+                container.executeJob("/kafka/kafka_source_to_assert_with_max_poll_records_1.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
     @TestTemplate
     public void testSourceKafkaTextToConsoleAssertCatalogTable(TestContainer container)
             throws IOException, InterruptedException {
@@ -538,29 +559,34 @@ private Properties kafkaByteConsumerConfig() {
     }
 
     private void generateTestData(ProducerRecordConverter converter, int start, int end) {
-        for (int i = start; i < end; i++) {
-            SeaTunnelRow row =
-                    new SeaTunnelRow(
-                            new Object[] {
-                                Long.valueOf(i),
-                                Collections.singletonMap("key", Short.parseShort("1")),
-                                new Byte[] {Byte.parseByte("1")},
-                                "string",
-                                Boolean.FALSE,
-                                Byte.parseByte("1"),
-                                Short.parseShort("1"),
-                                Integer.parseInt("1"),
-                                Long.parseLong("1"),
-                                Float.parseFloat("1.1"),
-                                Double.parseDouble("1.1"),
-                                BigDecimal.valueOf(11, 1),
-                                "test".getBytes(),
-                                LocalDate.of(2024, 1, 1),
-                                LocalDateTime.of(2024, 1, 1, 12, 59, 23)
-                            });
-            ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
-            producer.send(producerRecord);
+        try {
+            for (int i = start; i < end; i++) {
+                SeaTunnelRow row =
+                        new SeaTunnelRow(
+                                new Object[] {
+                                    Long.valueOf(i),
+                                    Collections.singletonMap("key", Short.parseShort("1")),
+                                    new Byte[] {Byte.parseByte("1")},
+                                    "string",
+                                    Boolean.FALSE,
+                                    Byte.parseByte("1"),
+                                    Short.parseShort("1"),
+                                    Integer.parseInt("1"),
+                                    Long.parseLong("1"),
+                                    Float.parseFloat("1.1"),
+                                    Double.parseDouble("1.1"),
+                                    BigDecimal.valueOf(11, 1),
+                                    "test".getBytes(),
+                                    LocalDate.of(2024, 1, 1),
+                                    LocalDateTime.of(2024, 1, 1, 12, 59, 23)
+                                });
+                ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
+                producer.send(producerRecord).get();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
+        producer.flush();
     }
 
     private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
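
The test-data helper now blocks on every `producer.send(...).get()` and flushes before returning, so the 100 rows for `test_topic_text_max_poll_records_1` are guaranteed to be in the topic before `kafka_source_to_assert_with_max_poll_records_1.conf` runs. For readers without the rest of `KafkaIT` open, the converter parameter is presumably a small functional interface along these lines (shape inferred from the call sites above, not copied from the file):

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

// Inferred shape: generateTestData(...) calls converter.convert(row) and hands the result
// straight to a KafkaProducer<byte[], byte[]>#send, so the interface is roughly:
@FunctionalInterface
interface ProducerRecordConverter {
    ProducerRecord<byte[], byte[]> convert(SeaTunnelRow row);
}
```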
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
index 31fe77a3e24d..755a9a2b8d50 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf
@@ -32,7 +32,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_avro_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format = avro
     format_error_handle_way = skip
     schema = {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
index f9a41e7987dd..3657390602e5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
@@ -32,7 +32,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_topic_json"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format_error_handle_way = skip
     schema = {
       fields {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf
new file mode 100644
index 000000000000..787858e229f1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_source_to_assert_with_max_poll_records_1.conf
@@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_text_max_poll_records_1"
+    result_table_name = "kafka_table"
+    start_mode = "earliest"
+    format_error_handle_way = fail
+    kafka.config = {
+      max.poll.records = 1
+    }
+    schema = {
+      columns = [
+        {
+          name = id
+          type = bigint
+        }
+        {
+          name = c_map
+          type = "map<string, smallint>"
+        }
+        {
+          name = c_array
+          type = "array<tinyint>"
+        }
+        {
+          name = c_string
+          type = "string"
+        }
+        {
+          name = c_boolean
+          type = "boolean"
+        }
+        {
+          name = c_tinyint
+          type = "tinyint"
+        }
+        {
+          name = c_smallint
+          type = "smallint"
+        }
+        {
+          name = c_int
+          type = "int"
+        }
+        {
+          name = c_bigint
+          type = "bigint"
+        }
+        {
+          name = c_float
+          type = "float"
+        }
+        {
+          name = c_double
+          type = "double"
+        }
+        {
+          name = c_decimal
+          type = "decimal(2, 1)"
+        }
+        {
+          name = c_bytes
+          type = "bytes"
+        }
+        {
+          name = c_date
+          type = "date"
+        }
+        {
+          name = c_timestamp
+          type = "timestamp"
+        }
+      ]
+      primaryKey = {
+        name = "primary key"
+        columnNames = ["id"]
+      }
+      constraintKeys = [
+        {
+          constraintName = "unique_c_string"
+          constraintType = UNIQUE_KEY
+          constraintColumns = [
+            {
+              columnName = "c_string"
+              sortType = ASC
+            }
+          ]
+        }
+      ]
+    }
+    format = text
+    field_delimiter = ","
+  }
+}
+
+sink {
+  console {
+    source_table_name = "kafka_table"
+  }
+  Assert {
+    source_table_name = "kafka_table"
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = bigint
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              },
+              {
+                rule_type = MIN
+                rule_value = 0
+              },
+              {
+                rule_type = MAX
+                rule_value = 99
+              }
+            ]
+          }
+        ]
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 100
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
index b6db50989a36..d2a0f05354d6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf
@@ -37,7 +37,6 @@ source {
     result_table_name = "kafka_table"
     start_mode = "earliest"
     format_error_handle_way = fail
-    # kafka.auto.offset.reset = "earliest"
     schema = {
       fields {
         id = bigint
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
index 45b29d19154d..88b6098b5e5f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf
@@ -37,7 +37,6 @@ source {
     result_table_name = "kafka_table"
     start_mode = "earliest"
     format_error_handle_way = skip
-    # kafka.auto.offset.reset = "earliest"
     schema = {
       fields {
         id = bigint
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
index 36f01c0337ce..3ce077bd589c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
@@ -32,7 +32,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_topic_text"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format_error_handle_way = fail
     schema = {
       fields {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
index d7f875272b09..132829e32442 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console_assert_catalog_table.conf
@@ -32,7 +32,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_topic_text"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format_error_handle_way = fail
     schema = {
       columns = [
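
One theme across the docs and e2e configs above: the starting position is now expressed with the connector's own `start_mode` option rather than by passing `kafka.auto.offset.reset` through to the Kafka client. A minimal source block in the style of the updated examples — the broker address and topic name are placeholders, and the `kafka.config` override mirrors the new max-poll-records test:

```hocon
source {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"   # placeholder broker
    topic = "example_topic"                   # placeholder topic
    result_table_name = "kafka_table"
    start_mode = "earliest"                   # replaces kafka.auto.offset.reset
    # client-level consumer properties still go through kafka.config
    kafka.config = {
      max.poll.records = 1
    }
  }
}
```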