diff --git a/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java b/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java
index 88d2dc5fb..fc0e971dd 100644
--- a/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java
+++ b/src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java
@@ -157,6 +157,18 @@ public class HdfsSinkConnectorConfig extends StorageSinkConnectorConfig {
   private static final String KERBEROS_TICKET_RENEW_PERIOD_MS_DISPLAY = "Kerberos Ticket Renew "
       + "Period (ms)";
 
+  public static final String RETRY_TIMEOUT_CONFIG = "retry.timeout.ms";
+  public static final long RETRY_TIMEOUT_DEFAULT = -1;
+  public static final String RETRY_TIMEOUT_DISPLAY = "Retry Timeout (ms)";
+  public static final String RETRY_TIMEOUT_DOC =
+      "The retry timeout in milliseconds. This config sets a timeout for retrying and "
+          + "recovering from write failures. If writes keep failing beyond the timeout, "
+          + "reads are reset and writes are retried from the last committed offset. This "
+          + "can be useful when the write pipeline targets an unhealthy node and recovery "
+          + "would take a long time: the sink may succeed via a new write pipeline that "
+          + "possibly targets different nodes in the cluster. The default value ``-1`` "
+          + "indicates that this feature is disabled and retries will continue indefinitely.";
+
   private static final Pattern SUBSTITUTION_PATTERN = Pattern.compile("\\$\\{(\\d+)}");
   private static final Pattern INVALID_SUB_PATTERN = Pattern.compile("\\$\\{.*}");
 
@@ -366,6 +378,18 @@ public static ConfigDef newConfigDef() {
         TOPIC_CAPTURE_GROUPS_REGEX_DISPLAY
     );
 
+    configDef
+        .define(
+            RETRY_TIMEOUT_CONFIG,
+            Type.LONG,
+            RETRY_TIMEOUT_DEFAULT,
+            Importance.LOW,
+            RETRY_TIMEOUT_DOC,
+            group,
+            ++lastOrder,
+            Width.LONG,
+            RETRY_TIMEOUT_DISPLAY);
+
     return configDef;
   }
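
Reviewer note: for orientation, here is a minimal sketch of connector properties that would exercise the new option together with the existing `retry.backoff.ms` (`RETRY_BACKOFF_CONFIG`). The helper class and the values are illustrative assumptions, not part of this patch:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical example props: "retry.timeout.ms" is the option added in this patch,
// "retry.backoff.ms" is the pre-existing backoff between individual retries.
class RetryTimeoutConfigExample {
  static Map<String, String> connectorProps() {
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.confluent.connect.hdfs.HdfsSinkConnector");
    props.put("retry.backoff.ms", "1000");   // wait 1s between retry attempts
    props.put("retry.timeout.ms", "30000");  // after 30s of consecutive failures,
                                             // reset reads to the last committed offset
    return props;
  }
}
```
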
topic partition {}: ", tp, e); failureTime = time.milliseconds(); - setRetryTimeout(timeoutMs); + if (failureStartTimeMs == -1L) { + failureStartTimeMs = failureTime; + } + if (retryTimeoutMs > 0 && (failureTime - failureStartTimeMs) > retryTimeoutMs) { + log.warn("Resetting reads for topic partition {} because write retries timed out", tp); + resetReads(); + } else { + setRetryTimeout(timeoutMs); + } break; } } @@ -785,23 +798,40 @@ private void closeTempFile() { // at least one tmp file did not close properly therefore will try to recreate the tmp and // delete all buffered records + tmp files and start over because otherwise there will be // duplicates, since there is no way to reclaim the records in the tmp file. - for (String encodedPartition : tempFiles.keySet()) { - try { - deleteTempFile(encodedPartition); - } catch (ConnectException e) { - log.error("Failed to delete tmp file {}", tempFiles.get(encodedPartition), e); - } - startOffsets.remove(encodedPartition); - endOffsets.remove(encodedPartition); - buffer.clear(); + deleteTempFileAndResetOffsets(); + throw connectException; + } + } + + private void resetReads() { + // silently close all temp files + for (String encodedPartition : tempFiles.keySet()) { + try { + closeTempFile(encodedPartition); + } catch (ConnectException e) { + log.debug("Failed to close temporary file for partition {}.", encodedPartition); } + } - log.debug("Resetting offset for {} to {}", tp, offset); - context.offset(tp, offset); + deleteTempFileAndResetOffsets(); + } - recordCounter = 0; - throw connectException; + private void deleteTempFileAndResetOffsets() { + for (String encodedPartition : tempFiles.keySet()) { + try { + deleteTempFile(encodedPartition); + } catch (ConnectException e) { + log.error("Failed to delete tmp file {}", tempFiles.get(encodedPartition), e); + } + startOffsets.remove(encodedPartition); + endOffsets.remove(encodedPartition); + buffer.clear(); } + + log.debug("Resetting offset for {} to {}", tp, offset); + context.offset(tp, offset); + + recordCounter = 0; } private void appendToWAL(String encodedPartition) { diff --git a/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java b/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java index 2732b73b4..1a03b8b7a 100644 --- a/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java +++ b/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java @@ -15,6 +15,7 @@ package io.confluent.connect.hdfs; +import io.confluent.connect.storage.format.RecordWriter; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -29,6 +30,7 @@ import org.junit.Test; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,6 +41,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import io.confluent.common.utils.MockTime; import io.confluent.connect.hdfs.avro.AvroDataFileReader; @@ -63,6 +66,8 @@ import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG; import static org.apache.kafka.common.utils.Time.SYSTEM; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class TopicPartitionWriterTest extends TestWithMiniDFSCluster { @@ -783,6 
diff --git a/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java b/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java
index 2732b73b4..1a03b8b7a 100644
--- a/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java
+++ b/src/test/java/io/confluent/connect/hdfs/TopicPartitionWriterTest.java
@@ -15,6 +15,7 @@
 
 package io.confluent.connect.hdfs;
 
+import io.confluent.connect.storage.format.RecordWriter;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -29,6 +30,7 @@
 import org.junit.Test;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -39,6 +41,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.confluent.common.utils.MockTime;
 import io.confluent.connect.hdfs.avro.AvroDataFileReader;
@@ -63,6 +66,8 @@
 import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG;
 import static org.apache.kafka.common.utils.Time.SYSTEM;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class TopicPartitionWriterTest extends TestWithMiniDFSCluster {
@@ -783,6 +788,131 @@ public void testWriteRecordTimeBasedPartitionWallclockMockedWithScheduleRotation
     verify(expectedFiles, 3, records, schema);
   }
 
+  @Test
+  public void testWriteFailureResetReadsOnRetryTimeout() throws Exception {
+    long retryTimeout = 30000;
+    long retryBackoff = 1000;
+    localProps.put(HdfsSinkConnectorConfig.RETRY_TIMEOUT_CONFIG, String.valueOf(retryTimeout));
+    localProps.put(HdfsSinkConnectorConfig.RETRY_BACKOFF_CONFIG, String.valueOf(retryBackoff));
+    setUp();
+
+    partitioner = new DataWriter.PartitionerWrapper(
+        new io.confluent.connect.storage.partitioner.TimeBasedPartitioner<>()
+    );
+    parsedConfig.put(PartitionerConfig.PARTITION_DURATION_MS_CONFIG, TimeUnit.DAYS.toMillis(1));
+    parsedConfig.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockedWallclockTimestampExtractor.class.getName());
+    partitioner.configure(parsedConfig);
+    MockedWallclockTimestampExtractor.TIME.sleep(SYSTEM.milliseconds());
+
+    AtomicBoolean breakWritesSignal = new AtomicBoolean(false);
+    newWriterProvider = createFailingWriterFactory(breakWritesSignal);
+
+    TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
+        TOPIC_PARTITION,
+        storage,
+        writerProvider,
+        newWriterProvider,
+        partitioner,
+        connectorConfig,
+        context,
+        avroData,
+        time
+    );
+
+    Schema schema = createSchema();
+    SinkRecord record = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, "key",
+        schema, createRecord(schema, 16, 12.2f), 3);
+
+    // 1. successful write
+    topicPartitionWriter.buffer(record);
+    topicPartitionWriter.write();
+
+    assertEquals(-1L, context.timeout());
+    assertTrue(context.offsets().isEmpty());
+    assertFalse(topicPartitionWriter.getWriters().isEmpty());
+
+    // 2. fail write
+    breakWritesSignal.set(true);
+    topicPartitionWriter.buffer(record);
+    topicPartitionWriter.write();
+
+    assertEquals(retryBackoff, context.timeout());
+    assertTrue(context.offsets().isEmpty());
+
+    // 3. fail write again - timeout not triggered yet
+    time.sleep(retryTimeout - retryBackoff);
+    topicPartitionWriter.write();
+
+    assertEquals(retryBackoff, context.timeout());
+    assertTrue(context.offsets().isEmpty());
+
+    // 4. reset offsets due to retry timeout
+    time.sleep(retryBackoff + 1);
+    topicPartitionWriter.write();
+
+    assertNotNull(context.offsets().get(new TopicPartition(TOPIC, PARTITION)));
+    assertTrue(topicPartitionWriter.getWriters().isEmpty());
+  }
+
+  @Test
+  public void testWriteFailureRetryForever() throws Exception {
+    long retryBackoff = 1000;
+    localProps.put(HdfsSinkConnectorConfig.RETRY_TIMEOUT_CONFIG, "-1");
+    localProps.put(HdfsSinkConnectorConfig.RETRY_BACKOFF_CONFIG, String.valueOf(retryBackoff));
+    setUp();
+
+    partitioner = new DataWriter.PartitionerWrapper(
+        new io.confluent.connect.storage.partitioner.TimeBasedPartitioner<>()
+    );
+    parsedConfig.put(PartitionerConfig.PARTITION_DURATION_MS_CONFIG, TimeUnit.DAYS.toMillis(1));
+    parsedConfig.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockedWallclockTimestampExtractor.class.getName());
+    partitioner.configure(parsedConfig);
+    MockedWallclockTimestampExtractor.TIME.sleep(SYSTEM.milliseconds());
+
+    AtomicBoolean breakWritesSignal = new AtomicBoolean(false);
+    newWriterProvider = createFailingWriterFactory(breakWritesSignal);
+
+    TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
+        TOPIC_PARTITION,
+        storage,
+        writerProvider,
+        newWriterProvider,
+        partitioner,
+        connectorConfig,
+        context,
+        avroData,
+        time
+    );
+
+    Schema schema = createSchema();
+    SinkRecord record = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, "key",
+        schema, createRecord(schema, 16, 12.2f), 3);
+
+    // 1. successful write
+    topicPartitionWriter.buffer(record);
+    topicPartitionWriter.write();
+
+    assertEquals(-1L, context.timeout());
+    assertTrue(context.offsets().isEmpty());
+
+    // 2. fail write
+    breakWritesSignal.set(true);
+    topicPartitionWriter.buffer(record);
+    topicPartitionWriter.write();
+
+    assertEquals(retryBackoff, context.timeout());
+    assertTrue(context.offsets().isEmpty());
+
+    // 3. failing again even after a very long time should still just back off
+    time.sleep(Duration.ofDays(999).toMillis());
+    topicPartitionWriter.write();
+
+    assertEquals(retryBackoff, context.timeout());
+    assertTrue(context.offsets().isEmpty());
+  }
+
   private String getTimebasedEncodedPartition(long timestamp) {
     long partitionDurationMs = (Long) parsedConfig.get(PartitionerConfig.PARTITION_DURATION_MS_CONFIG);
     String pathFormat = (String) parsedConfig.get(PartitionerConfig.PATH_FORMAT_CONFIG);
@@ -821,6 +951,38 @@ private void verify(Set<String> expectedFiles, int expectedSize, List<SinkRecord> records)
     }
   }
 
+  private io.confluent.connect.storage.format.RecordWriterProvider<HdfsSinkConnectorConfig>
+      createFailingWriterFactory(AtomicBoolean breakWritesSignal) {
+    return new io.confluent.connect.storage.format.RecordWriterProvider<HdfsSinkConnectorConfig>() {
+      @Override
+      public String getExtension() {
+        return ".dat";
+      }
+
+      @Override
+      public RecordWriter getRecordWriter(HdfsSinkConnectorConfig hdfsSinkConnectorConfig, String s) {
+        return new RecordWriter() {
+          @Override
+          public void write(SinkRecord sinkRecord) {
+            if (breakWritesSignal.get()) {
+              throw new ConnectException("Write failed");
+            }
+            // otherwise a no-op: records are accepted without being persisted
+          }
+
+          @Override
+          public void close() {
+          }
+
+          @Override
+          public void commit() {
+          }
+        };
+      }
+    };
+  }
+
   public static class MockedWallclockTimestampExtractor extends io.confluent.connect.storage.partitioner.TimeBasedPartitioner.WallclockTimestampExtractor {
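
A note on the semantics these tests rely on: resetting reads trades duplicate work for liveness, not correctness. Dropping the temp files and rewinding via `context.offset(...)` causes the Connect framework to re-deliver everything after the last committed offset, preserving at-least-once delivery. Below is a minimal sketch of that rewind call using the standard Kafka Connect `SinkTaskContext` API; the helper class itself is hypothetical:

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;

// Illustrative helper, not part of the patch: rewinding consumption is just a
// request to the Connect framework to re-deliver records from a given offset.
final class RewindSketch {
  static void rewindToLastCommitted(SinkTaskContext context, TopicPartition tp, long committedOffset) {
    // Records from committedOffset onward are sent to put() again, so anything
    // buffered in abandoned temp files is rewritten rather than lost.
    context.offset(tp, committedOffset);
  }
}
```
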