diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index a76931423..69cf37a7b 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -1,4 +1,4 @@
# Each line is a file pattern followed by one or more owners.
# These owners will be the default owners for everything in
# the repo.
-* @confluentinc/connect-team1
+* @confluentinc/connect
diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml
new file mode 100644
index 000000000..f8693723e
--- /dev/null
+++ b/.semaphore/semaphore.yml
@@ -0,0 +1,89 @@
+# This file is managed by ServiceBot plugin - Semaphore. The content in this file is created using a common
+# template and configurations in service.yml.
+# Any modifications made to this file will be overwritten by the generated content in nightly runs.
+# For more information, please refer to the page:
+# https://confluentinc.atlassian.net/wiki/spaces/Foundations/pages/2871296194/Add+SemaphoreCI
+version: v1.0
+name: build-test-release
+agent:
+ machine:
+ type: s1-prod-ubuntu20-04-amd64-1
+
+fail_fast:
+ cancel:
+ when: "true"
+
+execution_time_limit:
+ hours: 1
+
+queue:
+ - when: "branch != 'master' and branch !~ '[0-9]+\\.[0-9]+\\.x'"
+ processing: parallel
+
+global_job_config:
+ prologue:
+ commands:
+ - checkout
+ - sem-version java 8
+ - . cache-maven restore
+
+blocks:
+ - name: Test
+ dependencies: []
+ run:
+ # don't run the tests on non-functional changes...
+ when: "change_in('/', {exclude: ['/.deployed-versions/', '.github/']})"
+ task:
+ jobs:
+ - name: Test
+ commands:
+ - . sem-pint
+ - mvn -Dcloud -Pjenkins -U -Dmaven.wagon.http.retryHandler.count=10 --batch-mode --no-transfer-progress clean verify install dependency:analyze validate
+ - cve-scan
+ - . cache-maven store
+ epilogue:
+ always:
+ commands:
+ - . publish-test-results
+ - artifact push workflow target/test-results
+ - artifact push workflow target
+
+ - name: Release
+ dependencies: ["Test"]
+ run:
+ when: "branch = 'master' or branch =~ '[0-9]+\\.[0-9]+\\.x'"
+ task:
+ jobs:
+ - name: Release
+ commands:
+ - mvn -Dcloud -Pjenkins -U -Dmaven.wagon.http.retryHandler.count=10 --batch-mode -DaltDeploymentRepository=confluent-codeartifact-internal::default::https://confluent-519856050701.d.codeartifact.us-west-2.amazonaws.com/maven/maven-snapshots/ -DrepositoryId=confluent-codeartifact-internal deploy -DskipTests
+ - name: Release Notes
+ dependencies: []
+ run:
+ when: "branch =~ '[0-9]+\\.[0-9]+\\.x'"
+ task:
+ jobs:
+ - name: Generate Release Notes
+ commands:
+ - git clone --branch master --single-branch git@github.com:confluentinc/connect-releases.git
+ - ./connect-releases/tasks/release-connect-plugins/generate-connect-changelogs.sh
+
+after_pipeline:
+ task:
+ agent:
+ machine:
+ type: s1-prod-ubuntu20-04-arm64-0
+ jobs:
+ - name: Metrics
+ commands:
+ - emit-ci-metrics -p -a test-results
+ - name: Publish Test Results
+ commands:
+ - test-results gen-pipeline-report
+ - name: SonarQube
+ commands:
+ - checkout
+ - sem-version java 11
+ - artifact pull workflow target
+ - emit-sonarqube-data --run_only_sonar_scan
diff --git a/.trivyignore b/.trivyignore
new file mode 100644
index 000000000..91319d110
--- /dev/null
+++ b/.trivyignore
@@ -0,0 +1 @@
+# See https://aquasecurity.github.io/trivy/v0.56/docs/configuration/filtering/#trivyignore for guidance on adding exceptions for Trivy scanner
diff --git a/Jenkinsfile b/Jenkinsfile
deleted file mode 100755
index fcd106b90..000000000
--- a/Jenkinsfile
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/usr/bin/env groovy
-common {
- slackChannel = '#connect-warn'
- upstreamProjects = ['confluentinc/schema-registry','confluentinc/common']
- twistlockCveScan = true
- downStreamValidate = false
- nodeLabel = 'docker-debian-jdk8'
- disableConcurrentBuilds = true
-}
diff --git a/pom.xml b/pom.xml
index 0c2f98209..b8e316abe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
  <parent>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-connect-storage-common-parent</artifactId>
-    <version>11.1.4</version>
+    <version>11.2.20</version>
  </parent>

  <artifactId>kafka-connect-hdfs</artifactId>
@@ -55,16 +55,23 @@
2.0.0-M2
1.2.17-cp8
0.11.1
+ 0.65
+ 0.59
+ 0.63
+ 0.67
+ 0.53
+ 0.66
2.5.3
- 11.1.4
+ 11.2.20
3.2.2
3.9.0
- 2.5.1
+ 2.5.2
2.4.10
- 0.13.0
+ 0.14.0
2.17.1
1.5.4
2.0
+ 1.1.10.4
6.5.0
@@ -110,7 +117,7 @@
      <dependency>
        <groupId>org.apache.mina</groupId>
        <artifactId>mina-core</artifactId>
-        <version>2.0.23</version>
+        <version>2.0.27</version>
      </dependency>
      <dependency>
        <groupId>com.jamesmurty.utils</groupId>
@@ -122,6 +129,18 @@
        <artifactId>snakeyaml</artifactId>
        <version>${snakeyaml.version}</version>
      </dependency>
+
+      <dependency>
+        <groupId>org.apache.velocity</groupId>
+        <artifactId>velocity-engine-core</artifactId>
+        <version>2.4</version>
+      </dependency>
+
+      <dependency>
+        <groupId>commons-io</groupId>
+        <artifactId>commons-io</artifactId>
+        <version>2.14.0</version>
+      </dependency>
      <dependency>
        <groupId>xerces</groupId>
@@ -134,6 +153,58 @@
        <artifactId>slider-core</artifactId>
        <version>0.92.0-incubating</version>
      </dependency>
+
+      <dependency>
+        <groupId>org.xerial.snappy</groupId>
+        <artifactId>snappy-java</artifactId>
+        <version>${snappy.java.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.zookeeper</groupId>
+        <artifactId>zookeeper</artifactId>
+        <version>3.7.2</version>
+      </dependency>
+
+      <dependency>
+        <groupId>com.nimbusds</groupId>
+        <artifactId>nimbus-jose-jwt</artifactId>
+        <version>9.37.2</version>
+      </dependency>
+
+      <dependency>
+        <groupId>com.squareup.okio</groupId>
+        <artifactId>okio</artifactId>
+        <version>3.4.0</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-server</artifactId>
+        <version>1.1.1</version>
+        <exclusions>
+          <exclusion>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
+      <dependency>
+        <groupId>io.airlift</groupId>
+        <artifactId>aircompressor</artifactId>
+        <version>0.27</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.calcite.avatica</groupId>
+        <artifactId>avatica-core</artifactId>
+        <version>1.22.0</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.tomcat.embed</groupId>
+        <artifactId>tomcat-embed-core</artifactId>
+        <version>8.5.100</version>
+      </dependency>
@@ -209,7 +280,7 @@
org.apache.logging.log4j
log4j-core
-
+
org.apache.ant
ant
@@ -230,12 +301,12 @@
      <dependency>
        <groupId>org.apache.calcite</groupId>
        <artifactId>calcite-core</artifactId>
-        <version>1.11.0</version>
+        <version>1.22.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.calcite</groupId>
        <artifactId>calcite-druid</artifactId>
-        <version>1.11.0</version>
+        <version>1.26.0</version>
      </dependency>
      <dependency>
        <groupId>commons-collections</groupId>
@@ -504,6 +575,107 @@
+        <plugin>
+          <groupId>org.jacoco</groupId>
+          <artifactId>jacoco-maven-plugin</artifactId>
+          <version>0.8.11</version>
+          <configuration>
+            <excludes>
+              <exclude>**/model/**</exclude>
+              <exclude>**/rest/**</exclude>
+            </excludes>
+          </configuration>
+          <executions>
+            <execution>
+              <id>prepare-agent</id>
+              <goals>
+                <goal>prepare-agent</goal>
+              </goals>
+            </execution>
+            <execution>
+              <id>prepare-agent-it</id>
+              <goals>
+                <goal>prepare-agent-integration</goal>
+              </goals>
+              <phase>pre-integration-test</phase>
+            </execution>
+            <execution>
+              <id>merge-coverage-reports</id>
+              <phase>verify</phase>
+              <goals>
+                <goal>merge</goal>
+              </goals>
+              <configuration>
+                <fileSets>
+                  <fileSet>
+                    <directory>${project.basedir}</directory>
+                    <includes>
+                      <include>/target/jacoco.exec</include>
+                      <include>/target/jacoco-it.exec</include>
+                    </includes>
+                  </fileSet>
+                </fileSets>
+                <destFile>${project.basedir}/target/jacoco-aggregate.exec</destFile>
+              </configuration>
+            </execution>
+            <execution>
+              <id>check</id>
+              <goals>
+                <goal>check</goal>
+              </goals>
+              <configuration>
+                <rules>
+                  <rule>
+                    <element>BUNDLE</element>
+                    <limits>
+                      <limit>
+                        <counter>INSTRUCTION</counter>
+                        <value>COVEREDRATIO</value>
+                        <minimum>${instruction.coverage.threshold}</minimum>
+                      </limit>
+                      <limit>
+                        <counter>BRANCH</counter>
+                        <value>COVEREDRATIO</value>
+                        <minimum>${branch.coverage.threshold}</minimum>
+                      </limit>
+                      <limit>
+                        <counter>COMPLEXITY</counter>
+                        <value>COVEREDRATIO</value>
+                        <minimum>${complexity.coverage.threshold}</minimum>
+                      </limit>
+                      <limit>
+                        <counter>LINE</counter>
+                        <value>COVEREDRATIO</value>
+                        <minimum>${line.coverage.threshold}</minimum>
+                      </limit>
+                      <limit>
+                        <counter>METHOD</counter>
+                        <value>COVEREDRATIO</value>
+                        <minimum>${method.coverage.threshold}</minimum>
+                      </limit>
+                      <limit>
+                        <counter>CLASS</counter>
+                        <value>COVEREDRATIO</value>
+                        <minimum>${class.coverage.threshold}</minimum>
+                      </limit>
+                    </limits>
+                  </rule>
+                </rules>
+                <dataFile>${project.basedir}/target/jacoco-aggregate.exec</dataFile>
+              </configuration>
+            </execution>
+            <execution>
+              <id>generate-code-coverage-report</id>
+              <phase>verify</phase>
+              <goals>
+                <goal>report</goal>
+              </goals>
+              <configuration>
+                <dataFile>${project.basedir}/target/jacoco-aggregate.exec</dataFile>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-release-plugin</artifactId>
diff --git a/service.yml b/service.yml
new file mode 100644
index 000000000..61999f17b
--- /dev/null
+++ b/service.yml
@@ -0,0 +1,19 @@
+name: kafka-connect-hdfs
+lang: java
+lang_version: 8
+git:
+ enable: true
+codeowners:
+ enable: true
+semaphore:
+ enable: true
+ pipeline_type: cp
+ extra_deploy_args: -Dcloud -Pjenkins
+ extra_build_args: -Dcloud -Pjenkins
+ generate_connect_changelogs: true
+ trivy_scan: true
+ run_pint_merge: true
+code_artifact:
+ enable: true
+ package_paths:
+ - maven-snapshots/maven/io.confluent/kafka-connect-hdfs
diff --git a/sonar-project.properties b/sonar-project.properties
new file mode 100644
index 000000000..4bb62e5d6
--- /dev/null
+++ b/sonar-project.properties
@@ -0,0 +1,9 @@
+### service-bot sonarqube plugin managed file
+sonar.coverage.exclusions=**/test/**/*,**/tests/**/*,**/mock/**/*,**/mocks/**/*,**/*mock*,**/*test*
+sonar.coverage.jacoco.xmlReportPaths=**/jacoco.xml
+sonar.cpd.exclusions=**/test/**/*,**/tests/**/*,**/mock/**/*,**/mocks/**/*,**/*mock*,**/*test*
+sonar.exclusions=**/*.pb.*,**/mk-include/**/*
+sonar.java.binaries=.
+sonar.language=java
+sonar.projectKey=kafka-connect-hdfs
+sonar.sources=.
diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
index dcb821d30..9bb8b9390 100644
--- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
+++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -15,6 +15,7 @@
package io.confluent.connect.hdfs;
+import io.confluent.connect.hdfs.avro.AvroIOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.TopicPartition;
@@ -249,6 +250,26 @@ public TopicPartitionWriter(
updateRotationTimers(null);
}
+ private void resetBuffers() {
+ buffer.clear();
+ writers.clear();
+ appended.clear();
+ startOffsets.clear();
+ endOffsets.clear();
+ recordCounter = 0;
+ currentSchema = null;
+ }
+
+ private void safeDeleteTempFiles() {
+ for (String encodedPartition : tempFiles.keySet()) {
+ try {
+ deleteTempFile(encodedPartition);
+ } catch (ConnectException e) {
+ log.error("Failed to delete tmp file {}", tempFiles.get(encodedPartition), e);
+ }
+ }
+ }
+
@SuppressWarnings("fallthrough")
public boolean recover() {
try {
@@ -263,6 +284,7 @@ public boolean recover() {
nextState();
case WAL_APPLIED:
log.debug("Start recovery state: Reset Offsets for topic partition {}", tp);
+ safeDeleteTempFiles();
resetOffsets();
nextState();
case OFFSET_RESET:
@@ -282,8 +304,9 @@ public boolean recover() {
tp
);
}
- } catch (ConnectException e) {
+ } catch (AvroIOException | ConnectException e) {
log.error("Recovery failed at state {}", state, e);
+ failureTime = time.milliseconds();
setRetryTimeout(timeoutMs);
return false;
}
@@ -319,6 +342,13 @@ private void updateRotationTimers(SinkRecord currentRecord) {
}
}
+ private void resetAndSetRecovery() {
+ context.offset(tp, offset);
+ resetBuffers();
+ state = State.RECOVERY_STARTED;
+ recovered = false;
+ }
+
@SuppressWarnings("fallthrough")
public void write() {
long now = time.milliseconds();
@@ -361,7 +391,7 @@ public void write() {
currentRecord = record;
Schema valueSchema = record.valueSchema();
if ((recordCounter <= 0 && currentSchema == null && valueSchema != null)
- || compatibility.shouldChangeSchema(record, null, currentSchema)) {
+ || compatibility.shouldChangeSchema(record, null, currentSchema).isInCompatible()) {
currentSchema = valueSchema;
if (hiveIntegration) {
createHiveTable();
@@ -408,10 +438,15 @@ public void write() {
}
} catch (SchemaProjectorException | IllegalWorkerStateException | HiveMetaStoreException e) {
throw new RuntimeException(e);
- } catch (ConnectException e) {
+ } catch (AvroIOException | ConnectException e) {
log.error("Exception on topic partition {}: ", tp, e);
failureTime = time.milliseconds();
setRetryTimeout(timeoutMs);
+ if (e instanceof AvroIOException) {
+ log.error("Encountered AVRO IO exception, resetting this topic partition {} "
+ + "to offset {}", tp, offset);
+ resetAndSetRecovery();
+ }
break;
}
}
@@ -448,10 +483,15 @@ public void write() {
default:
log.error("{} is not a valid state to empty batch for topic partition {}.", state, tp);
}
- } catch (ConnectException e) {
+ } catch (AvroIOException | ConnectException e) {
log.error("Exception on topic partition {}: ", tp, e);
failureTime = time.milliseconds();
setRetryTimeout(timeoutMs);
+ if (e instanceof AvroIOException) {
+ log.error("Encountered AVRO IO exception, resetting this topic partition {} "
+ + "to offset {}", tp, offset);
+ resetAndSetRecovery();
+ }
return;
}
@@ -765,14 +805,14 @@ private void closeTempFile(String encodedPartition) {
}
private void closeTempFile() {
- ConnectException connectException = null;
+ RuntimeException exception = null;
for (String encodedPartition : tempFiles.keySet()) {
// Close the file and propagate any errors
try {
closeTempFile(encodedPartition);
- } catch (ConnectException e) {
+ } catch (RuntimeException e) {
// still want to close all of the other data writers
- connectException = e;
+ exception = e;
log.error(
"Failed to close temporary file for partition {}. The connector will attempt to"
+ " rewrite the temporary file.",
@@ -781,16 +821,12 @@ private void closeTempFile() {
}
}
- if (connectException != null) {
+ if (exception != null) {
// at least one tmp file did not close properly therefore will try to recreate the tmp and
// delete all buffered records + tmp files and start over because otherwise there will be
// duplicates, since there is no way to reclaim the records in the tmp file.
for (String encodedPartition : tempFiles.keySet()) {
- try {
- deleteTempFile(encodedPartition);
- } catch (ConnectException e) {
- log.error("Failed to delete tmp file {}", tempFiles.get(encodedPartition), e);
- }
+ safeDeleteTempFiles();
startOffsets.remove(encodedPartition);
endOffsets.remove(encodedPartition);
buffer.clear();
@@ -800,7 +836,7 @@ private void closeTempFile() {
context.offset(tp, offset);
recordCounter = 0;
- throw connectException;
+ throw exception;
}
}
diff --git a/src/main/java/io/confluent/connect/hdfs/avro/AvroIOException.java b/src/main/java/io/confluent/connect/hdfs/avro/AvroIOException.java
new file mode 100644
index 000000000..2a2956e6d
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/avro/AvroIOException.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+
+package io.confluent.connect.hdfs.avro;
+
+import java.io.IOException;
+
+@SuppressWarnings("serial")
+public class AvroIOException extends RuntimeException {
+ public AvroIOException(IOException e) {
+ super(e);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
index f64940d31..c3a22c620 100644
--- a/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
@@ -22,8 +22,6 @@
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +66,7 @@ public void write(SinkRecord record) {
writer.setCodec(CodecFactory.fromString(conf.getAvroCodec()));
writer.create(avroSchema, out);
} catch (IOException e) {
- throw new ConnectException(e);
+ throw new AvroIOException(e);
}
}
@@ -82,7 +80,7 @@ public void write(SinkRecord record) {
writer.append(value);
}
} catch (IOException e) {
- throw new DataException(e);
+ throw new AvroIOException(e);
}
}
@@ -91,7 +89,7 @@ public void close() {
try {
writer.close();
} catch (IOException e) {
- throw new DataException(e);
+ throw new AvroIOException(e);
}
}
diff --git a/src/test/java/io/confluent/connect/hdfs/avro/DataWriterAvroTest.java b/src/test/java/io/confluent/connect/hdfs/avro/DataWriterAvroTest.java
index 57060b16d..568ed05ec 100644
--- a/src/test/java/io/confluent/connect/hdfs/avro/DataWriterAvroTest.java
+++ b/src/test/java/io/confluent/connect/hdfs/avro/DataWriterAvroTest.java
@@ -15,12 +15,14 @@
package io.confluent.connect.hdfs.avro;
+import io.confluent.connect.hdfs.partitioner.DefaultPartitioner;
import io.confluent.connect.hdfs.wal.FSWAL;
import io.confluent.connect.hdfs.wal.WALFile.Writer;
import io.confluent.connect.hdfs.wal.WALFileTest;
import io.confluent.connect.hdfs.wal.WALFileTest.CorruptWriter;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -458,6 +460,82 @@ public void testFlushPartialFile() throws Exception {
hdfsWriter.stop();
}
+ @Test
+ public void testRecoveryAfterFailedFlush() throws Exception {
+ // Define constants
+ String FLUSH_SIZE_CONFIG = "60";
+ int FLUSH_SIZE = Integer.valueOf(FLUSH_SIZE_CONFIG);
+ int NUMBER_OF_RECORDS = FLUSH_SIZE*2;
+ long SLEEP_TIME = 10000L;
+
+ // Create connector configs
+ Map<String, String> props = createProps();
+ props.put(HdfsSinkConnectorConfig.FLUSH_SIZE_CONFIG, FLUSH_SIZE_CONFIG);
+ props.put(
+ PartitionerConfig.PARTITIONER_CLASS_CONFIG,
+ DefaultPartitioner.class.getName()
+ );
+ HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
+
+ // Initialize data writer
+ context.assignment().clear();
+ context.assignment().add(TOPIC_PARTITION);
+ Time time = TopicPartitionWriterTest.MockedWallclockTimestampExtractor.TIME;
+ DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData, time);
+ hdfsWriter.open(context.assignment());
+ partitioner = hdfsWriter.getPartitioner();
+
+ List<SinkRecord> sinkRecords = createSinkRecords(NUMBER_OF_RECORDS);
+
+ // Write initial batch of records
+ hdfsWriter.write(sinkRecords.subList(0, FLUSH_SIZE-2));
+
+ // Stop all datanodes so that the next flush to HDFS fails
+ int NUM_DATANODES = 3;
+ ArrayList<MiniDFSCluster.DataNodeProperties> dnProps = new ArrayList<>();
+ for (int j = 0; j < NUM_DATANODES; j++) {
+ dnProps.add(cluster.stopDataNode(0));
+ }
+
+ // Write the remaining records while HDFS is down; the flush fails, the writer resets
+ // to the last committed offset and schedules a retry
+ hdfsWriter.write(sinkRecords.subList(FLUSH_SIZE-2, NUMBER_OF_RECORDS));
+ time.sleep(SLEEP_TIME);
+
+ // Restart the datanodes and wait for them to come up
+ for (int j = 0; j < NUM_DATANODES; j++) {
+ cluster.restartDataNode(dnProps.get(j));
+ }
+ cluster.waitActive();
+
+ // Re-deliver the records from the reset offset and flush with an empty batch
+ hdfsWriter.write(sinkRecords);
+ hdfsWriter.write(new ArrayList<SinkRecord>());
+
+ // Assert that all records have been committed
+ Map<TopicPartition, Long> committedOffsets = hdfsWriter.getCommittedOffsets();
+ assertTrue(committedOffsets.containsKey(TOPIC_PARTITION));
+ long nextOffset = committedOffsets.get(TOPIC_PARTITION);
+ assertEquals(NUMBER_OF_RECORDS, nextOffset);
+
+ hdfsWriter.close();
+ hdfsWriter.stop();
+
+ // Assert that there are no zero-length data files
+ long[] validOffsets = {0, FLUSH_SIZE, FLUSH_SIZE*2};
+ verify(sinkRecords, validOffsets, Collections.singleton(TOPIC_PARTITION), false);
+ }
+
@Test
public void testAvroCompression() throws Exception {
//set compression codec to Snappy