Add timeout for failed writes retry #663 #664

Open · wants to merge 1 commit into base: master
@@ -157,6 +157,18 @@ public class HdfsSinkConnectorConfig extends StorageSinkConnectorConfig {
private static final String KERBEROS_TICKET_RENEW_PERIOD_MS_DISPLAY = "Kerberos Ticket Renew "
+ "Period (ms)";

public static final String RETRY_TIMEOUT_CONFIG = "retry.timeout.ms";
public static final long RETRY_TIMEOUT_DEFAULT = -1;
public static final String RETRY_TIMEOUT_DISPLAY = "Retry Timeout (ms)";
public static final String RETRY_TIMEOUT_DOC =
"The retry timeout in milliseconds. This config sets a timeout for retrying and "
+ "recovering from write failures. If writes keep failing beyond the timeout, reads "
+ "are reset and writes are retried from the last committed offset. This can be "
+ "useful when the write pipeline targets an unhealthy node and recovery would take "
+ "a long time; the sink may then succeed via a new write pipeline that targets "
+ "different nodes in the cluster. The default value ``-1`` disables this feature, "
+ "and retries will continue indefinitely.";

private static final Pattern SUBSTITUTION_PATTERN = Pattern.compile("\\$\\{(\\d+)}");
private static final Pattern INVALID_SUB_PATTERN = Pattern.compile("\\$\\{.*}");

@@ -366,6 +378,18 @@ public static ConfigDef newConfigDef() {
TOPIC_CAPTURE_GROUPS_REGEX_DISPLAY
);

configDef
.define(
RETRY_TIMEOUT_CONFIG,
Type.LONG,
RETRY_TIMEOUT_DEFAULT,
Importance.LOW,
RETRY_TIMEOUT_DOC,
group,
++lastOrder,
Width.LONG,
RETRY_TIMEOUT_DISPLAY);

return configDef;
}
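
For context, a rough usage sketch of the new option. Only ``retry.timeout.ms`` and its ``-1`` default come from this change; the other property names, the values, and the ``HdfsSinkConnectorConfig(Map)`` construction are assumed from the existing connector and are illustrative only:

import java.util.HashMap;
import java.util.Map;

Map<String, String> props = new HashMap<>();
props.put("hdfs.url", "hdfs://namenode:8020"); // existing connector setting, illustrative value
props.put("flush.size", "1000");               // existing connector setting, illustrative value
props.put("retry.backoff.ms", "5000");         // existing setting: wait 5s between retries of a failed write
props.put("retry.timeout.ms", "300000");       // new in this PR: reset reads after 5 minutes of continuous failures
HdfsSinkConnectorConfig config = new HdfsSinkConnectorConfig(props);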

58 changes: 44 additions & 14 deletions src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -101,6 +101,8 @@ public class TopicPartitionWriter {
private final Map<String, Long> startOffsets;
private final Map<String, Long> endOffsets;
private final long timeoutMs;
private final long retryTimeoutMs;
private long failureStartTimeMs;
private long failureTime;
private final StorageSchemaCompatibility compatibility;
private Schema currentSchema;
@@ -197,6 +199,7 @@ public TopicPartitionWriter(
rotateScheduleIntervalMs = config.getLong(HdfsSinkConnectorConfig
.ROTATE_SCHEDULE_INTERVAL_MS_CONFIG);
timeoutMs = config.getLong(HdfsSinkConnectorConfig.RETRY_BACKOFF_CONFIG);
retryTimeoutMs = config.getLong(HdfsSinkConnectorConfig.RETRY_TIMEOUT_CONFIG);
compatibility = StorageSchemaCompatibility.getCompatibility(
config.getString(StorageSinkConnectorConfig.SCHEMA_COMPATIBILITY_CONFIG));

@@ -210,6 +213,7 @@
startOffsets = new HashMap<>();
endOffsets = new HashMap<>();
state = State.RECOVERY_STARTED;
failureStartTimeMs = -1L;
failureTime = -1L;
// The next offset to consume after the last commit (one more than last offset written to HDFS)
offset = -1L;
@@ -386,6 +390,7 @@ public void write() {
} else {
SinkRecord projectedRecord = compatibility.project(record, null, currentSchema);
writeRecord(projectedRecord);
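// A successful write ends the current failure streak, so clear the tracking timestamp.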
failureStartTimeMs = -1L;
buffer.poll();
break;
}
@@ -411,7 +416,15 @@
} catch (ConnectException e) {
log.error("Exception on topic partition {}: ", tp, e);
failureTime = time.milliseconds();
setRetryTimeout(timeoutMs);
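// Remember when this run of consecutive failures started; cleared again after a successful write.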
if (failureStartTimeMs == -1L) {
failureStartTimeMs = failureTime;
}
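// If failures have persisted longer than retry.timeout.ms, reset reads back to the
// last committed offset instead of scheduling another backoff.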
if (retryTimeoutMs > 0 && (failureTime - failureStartTimeMs) > retryTimeoutMs) {
log.warn("Resetting reads for topic partition {} because write retries timed out", tp);
resetReads();
} else {
setRetryTimeout(timeoutMs);
}
break;
}
}
@@ -785,23 +798,40 @@ private void closeTempFile() {
// at least one tmp file did not close properly therefore will try to recreate the tmp and
// delete all buffered records + tmp files and start over because otherwise there will be
// duplicates, since there is no way to reclaim the records in the tmp file.
for (String encodedPartition : tempFiles.keySet()) {
try {
deleteTempFile(encodedPartition);
} catch (ConnectException e) {
log.error("Failed to delete tmp file {}", tempFiles.get(encodedPartition), e);
}
startOffsets.remove(encodedPartition);
endOffsets.remove(encodedPartition);
buffer.clear();
deleteTempFileAndResetOffsets();
throw connectException;
}
}

private void resetReads() {
// silently close all temp files
for (String encodedPartition : tempFiles.keySet()) {
try {
closeTempFile(encodedPartition);
} catch (ConnectException e) {
log.debug("Failed to close temporary file for partition {}.", encodedPartition);
}
}

log.debug("Resetting offset for {} to {}", tp, offset);
context.offset(tp, offset);
deleteTempFileAndResetOffsets();
}

recordCounter = 0;
throw connectException;
private void deleteTempFileAndResetOffsets() {
for (String encodedPartition : tempFiles.keySet()) {
try {
deleteTempFile(encodedPartition);
} catch (ConnectException e) {
log.error("Failed to delete tmp file {}", tempFiles.get(encodedPartition), e);
}
startOffsets.remove(encodedPartition);
endOffsets.remove(encodedPartition);
buffer.clear();
}

log.debug("Resetting offset for {} to {}", tp, offset);
context.offset(tp, offset);

recordCounter = 0;
}

private void appendToWAL(String encodedPartition) {
@@ -15,6 +15,7 @@

package io.confluent.connect.hdfs;

import io.confluent.connect.storage.format.RecordWriter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -29,6 +30,7 @@
import org.junit.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -39,6 +41,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.confluent.common.utils.MockTime;
import io.confluent.connect.hdfs.avro.AvroDataFileReader;
@@ -63,6 +66,8 @@
import static io.confluent.connect.storage.StorageSinkConnectorConfig.FLUSH_SIZE_CONFIG;
import static org.apache.kafka.common.utils.Time.SYSTEM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class TopicPartitionWriterTest extends TestWithMiniDFSCluster {
@@ -783,6 +788,131 @@ public void testWriteRecordTimeBasedPartitionWallclockMockedWithScheduleRotation
verify(expectedFiles, 3, records, schema);
}

@Test
public void testWriteFailureResetReadsOnRetryTimeout() throws Exception {
long retryTimeout = 30000;
long retryBackoff = 1000;
localProps.put(HdfsSinkConnectorConfig.RETRY_TIMEOUT_CONFIG, String.valueOf(retryTimeout));
localProps.put(HdfsSinkConnectorConfig.RETRY_BACKOFF_CONFIG, String.valueOf(retryBackoff));
setUp();

partitioner = new DataWriter.PartitionerWrapper(
new io.confluent.connect.storage.partitioner.TimeBasedPartitioner<>()
);
parsedConfig.put(PartitionerConfig.PARTITION_DURATION_MS_CONFIG, TimeUnit.DAYS.toMillis(1));
parsedConfig.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockedWallclockTimestampExtractor.class.getName());
partitioner.configure(parsedConfig);
MockedWallclockTimestampExtractor.TIME.sleep(SYSTEM.milliseconds());

AtomicBoolean breakWritesSignal = new AtomicBoolean(false);
newWriterProvider = createFailingWriterFactory(breakWritesSignal);

TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
TOPIC_PARTITION,
storage,
writerProvider,
newWriterProvider,
partitioner,
connectorConfig,
context,
avroData,
time
);

Schema schema = createSchema();
SinkRecord record = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, "key",
schema, createRecord(schema, 16, 12.2f), 3);


// 1. successful write
topicPartitionWriter.buffer(record);
topicPartitionWriter.write();

assertEquals(-1L, context.timeout());
assertTrue(context.offsets().isEmpty());
assertFalse(topicPartitionWriter.getWriters().isEmpty());

// 2. fail write
breakWritesSignal.set(true);
topicPartitionWriter.buffer(record);
topicPartitionWriter.write();

assertEquals(retryBackoff, context.timeout());
assertTrue(context.offsets().isEmpty());

// 3. fail write again - timeout not triggered yet
time.sleep(retryTimeout - retryBackoff);
topicPartitionWriter.write();

assertEquals(retryBackoff, context.timeout());
assertTrue(context.offsets().isEmpty());

// 4. reset offsets due to retry timeout
time.sleep(retryBackoff + 1);
topicPartitionWriter.write();

assertNotNull(context.offsets().get(new TopicPartition(TOPIC, PARTITION)));
assertTrue(topicPartitionWriter.getWriters().isEmpty());
}

@Test
public void testWriteFailureRetryForever() throws Exception {
long retryBackoff = 1000;
localProps.put(HdfsSinkConnectorConfig.RETRY_TIMEOUT_CONFIG, "-1");
localProps.put(HdfsSinkConnectorConfig.RETRY_BACKOFF_CONFIG, String.valueOf(retryBackoff));
setUp();

partitioner = new DataWriter.PartitionerWrapper(
new io.confluent.connect.storage.partitioner.TimeBasedPartitioner<>()
);
parsedConfig.put(PartitionerConfig.PARTITION_DURATION_MS_CONFIG, TimeUnit.DAYS.toMillis(1));
parsedConfig.put(PartitionerConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockedWallclockTimestampExtractor.class.getName());
partitioner.configure(parsedConfig);
MockedWallclockTimestampExtractor.TIME.sleep(SYSTEM.milliseconds());

AtomicBoolean breakWritesSignal = new AtomicBoolean(false);
newWriterProvider = createFailingWriterFactory(breakWritesSignal);

TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
TOPIC_PARTITION,
storage,
writerProvider,
newWriterProvider,
partitioner,
connectorConfig,
context,
avroData,
time
);

Schema schema = createSchema();
SinkRecord record = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, "key",
schema, createRecord(schema, 16, 12.2f), 3);


// 1. successful write
topicPartitionWriter.buffer(record);
topicPartitionWriter.write();

assertEquals(-1L, context.timeout());
assertTrue(context.offsets().isEmpty());

// 2. fail write
breakWritesSignal.set(true);
topicPartitionWriter.buffer(record);
topicPartitionWriter.write();

assertEquals(retryBackoff, context.timeout());
assertTrue(context.offsets().isEmpty());

// 3. fail write again after a long time - should just back off
time.sleep(Duration.ofDays(999).toMillis());
topicPartitionWriter.write();

assertEquals(retryBackoff, context.timeout());
assertTrue(context.offsets().isEmpty());
}

private String getTimebasedEncodedPartition(long timestamp) {
long partitionDurationMs = (Long) parsedConfig.get(PartitionerConfig.PARTITION_DURATION_MS_CONFIG);
String pathFormat = (String) parsedConfig.get(PartitionerConfig.PATH_FORMAT_CONFIG);
@@ -821,6 +951,38 @@ private void verify(Set<Path> expectedFiles, int expectedSize, List<Struct> reco
}
}

private io.confluent.connect.storage.format.RecordWriterProvider<HdfsSinkConnectorConfig>
createFailingWriterFactory(AtomicBoolean breakWritesSignal) {
return new io.confluent.connect.storage.format.RecordWriterProvider<HdfsSinkConnectorConfig>() {
@Override
public String getExtension() {
return ".dat";
}

@Override
public RecordWriter getRecordWriter(HdfsSinkConnectorConfig hdfsSinkConnectorConfig, String s) {
return new RecordWriter() {
@Override
public void write(SinkRecord sinkRecord) {
if (breakWritesSignal.get()) {
throw new ConnectException("Write failed");
} else {
// nothing to do
}
}

@Override
public void close() {
}

@Override
public void commit() {
}
};
}
};
}

public static class MockedWallclockTimestampExtractor
extends
io.confluent.connect.storage.partitioner.TimeBasedPartitioner.WallclockTimestampExtractor{