Adds flush.file.size to allow file-size-based rotation #671

Open · wants to merge 1 commit into master
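For context, a minimal configuration sketch (not taken from this PR) showing how the new property is meant to sit alongside the existing settings. The property values and the Map-based HdfsSinkConnectorConfig constructor are assumptions; with the validation added below, flush.size may be 0 as long as flush.file.size is greater than 0.

import java.util.HashMap;
import java.util.Map;

import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;

public class FlushFileSizeConfigExample {
  public static void main(String[] args) {
    // Hypothetical configuration sketch: rotate by file size instead of record count.
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.confluent.connect.hdfs.HdfsSinkConnector");
    props.put("topics", "logs");
    props.put("hdfs.url", "hdfs://namenode:8020");
    props.put("format.class", "io.confluent.connect.hdfs.avro.AvroFormat");
    props.put("flush.size", "0");              // record-count rotation disabled
    props.put("flush.file.size", "134217728"); // rotate once a file reaches 128 MiB

    // Passes validateFlushSizes() because flush.file.size > 0 even though flush.size is 0.
    HdfsSinkConnectorConfig config = new HdfsSinkConnectorConfig(props); // assumes the Map-based constructor
    System.out.println(config.getFlushFileSize());
  }
}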
New file: FileSizeAwareRecordWriter.java
@@ -0,0 +1,22 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.hdfs;

import io.confluent.connect.storage.format.RecordWriter;

public interface FileSizeAwareRecordWriter extends RecordWriter {
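/** Returns the number of bytes written so far to the file backing this writer. */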
long getFileSize();
}
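Any Format whose writers should work with flush.file.size > 0 must hand back writers that implement this interface (the Avro provider below is adapted accordingly in this PR). As a rough, hypothetical sketch under that assumption, a writer for some other stream-backed format could simply count the bytes it writes; the class name and the line-per-record serialization are illustrative only, not part of this PR.

package io.confluent.connect.hdfs;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical sketch: a writer that counts the bytes it writes instead of asking
// the underlying stream for its position.
public class ByteCountingRecordWriter implements FileSizeAwareRecordWriter {
  private final OutputStream out;
  private long bytesWritten;

  public ByteCountingRecordWriter(OutputStream out) {
    this.out = out;
  }

  @Override
  public void write(SinkRecord record) {
    // A real format would serialize the record properly; this sketch just writes the
    // value's string form, one record per line.
    byte[] bytes = (String.valueOf(record.value()) + "\n").getBytes(StandardCharsets.UTF_8);
    try {
      out.write(bytes);
      bytesWritten += bytes.length;
    } catch (IOException e) {
      throw new DataException(e);
    }
  }

  @Override
  public void close() {
    try {
      out.close();
    } catch (IOException e) {
      throw new DataException(e);
    }
  }

  @Override
  public void commit() {}

  @Override
  public long getFileSize() {
    return bytesWritten;
  }
}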
File: HdfsSinkConnectorConfig.java
@@ -157,6 +157,12 @@ public class HdfsSinkConnectorConfig extends StorageSinkConnectorConfig {
private static final String KERBEROS_TICKET_RENEW_PERIOD_MS_DISPLAY = "Kerberos Ticket Renew "
+ "Period (ms)";

public static final String FLUSH_FILE_SIZE_CONFIG = "flush.file.size";
private static final long FLUSH_FILE_SIZE_DEFAULT = 0;
private static final String FLUSH_FILE_SIZE_DOC = "Number of bytes written to a single file "
+ "before invoking commits for all files in the current partition. A value of 0 (the "
+ "default) disables file size based rotation.";
private static final String FLUSH_FILE_SIZE_DISPLAY = "Flush File Size";

private static final Pattern SUBSTITUTION_PATTERN = Pattern.compile("\\$\\{(\\d+)}");
private static final Pattern INVALID_SUB_PATTERN = Pattern.compile("\\$\\{.*}");

@@ -366,6 +372,18 @@ public static ConfigDef newConfigDef() {
TOPIC_CAPTURE_GROUPS_REGEX_DISPLAY
);

configDef.define(
FLUSH_FILE_SIZE_CONFIG,
Type.LONG,
FLUSH_FILE_SIZE_DEFAULT,
Importance.MEDIUM,
FLUSH_FILE_SIZE_DOC,
"Connector",
1,
Width.LONG,
FLUSH_FILE_SIZE_DISPLAY
);

return configDef;
}

@@ -374,6 +392,7 @@ public static ConfigDef newConfigDef() {
private final HiveConfig hiveConfig;
private final PartitionerConfig partitionerConfig;
private final Pattern topicRegexCaptureGroup;
private final long flushFileSize;
private final Map<String, ComposableConfig> propertyToConfig = new HashMap<>();
private final Set<AbstractConfig> allConfigs = new HashSet<>();
private Configuration hadoopConfig;
@@ -400,6 +419,7 @@ protected HdfsSinkConnectorConfig(ConfigDef configDef, Map<String, String> props
addToGlobal(commonConfig);
addToGlobal(this);
this.url = extractUrl();
this.flushFileSize = Long.parseLong(props.getOrDefault(FLUSH_FILE_SIZE_CONFIG, "0"));
try {
String topicRegex = getString(TOPIC_CAPTURE_GROUPS_REGEX_CONFIG);
this.topicRegexCaptureGroup = topicRegex != null ? Pattern.compile(topicRegex) : null;
@@ -415,6 +435,7 @@ protected HdfsSinkConnectorConfig(ConfigDef configDef, Map<String, String> props

validateDirsAndRegex();
validateTimezone();
validateFlushSizes();
}

/**
@@ -435,6 +456,17 @@ private void validateTimezone() {
}
}

private void validateFlushSizes() {
if (getInt(FLUSH_SIZE_CONFIG) <= 0 && flushFileSize <= 0) {
String message = String.format(
"%s and %s",
HdfsSinkConnectorConfig.FLUSH_SIZE_CONFIG,
HdfsSinkConnectorConfig.FLUSH_FILE_SIZE_CONFIG
);
throw new ConfigException(message, 0, "At least one of these properties must be greater than 0");
}
}

public static Map<String, String> addDefaults(Map<String, String> props) {
ConcurrentMap<String, String> propsCopy = new ConcurrentHashMap<>(props);
propsCopy.putIfAbsent(STORAGE_CLASS_CONFIG, HdfsStorage.class.getName());
@@ -724,6 +756,10 @@ private void validateReplacements(String config) {
}
}

public long getFlushFileSize() {
return flushFileSize;
}

private static class BooleanParentRecommender implements ConfigDef.Recommender {

protected String parentConfigName;
File: TopicPartitionWriter.java
@@ -18,6 +18,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.IllegalWorkerStateException;
@@ -84,6 +85,7 @@ public class TopicPartitionWriter {
private final SinkTaskContext context;
private int recordCounter;
private final int flushSize;
private final long flushFileSize;
private final long rotateIntervalMs;
private Long lastRotate;
private final long rotateScheduleIntervalMs;
@@ -193,6 +195,7 @@ public TopicPartitionWriter(

topicsDir = config.getTopicsDirFromTopic(tp.topic());
flushSize = config.getInt(HdfsSinkConnectorConfig.FLUSH_SIZE_CONFIG);
flushFileSize = config.getFlushFileSize();
rotateIntervalMs = config.getLong(HdfsSinkConnectorConfig.ROTATE_INTERVAL_MS_CONFIG);
rotateScheduleIntervalMs = config.getLong(HdfsSinkConnectorConfig
.ROTATE_SCHEDULE_INTERVAL_MS_CONFIG);
@@ -568,12 +571,28 @@ private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long
lastRotate = lastRotate == null ? currentTimestamp : lastRotate;
}

Long fileSize = null;
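// Look up (or create) the writer for this record's encoded partition; only a writer that
// implements FileSizeAwareRecordWriter can report how many bytes it has written so far.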
if (currentRecord != null && flushFileSize > 0) {
io.confluent.connect.storage.format.RecordWriter writer = getWriter(
currentRecord,
partitioner.encodePartition(currentRecord)
);
if (!(writer instanceof FileSizeAwareRecordWriter)) {
throw new ConfigException("The RecordWriter returned by the configured Format does not "
+ "implement FileSizeAwareRecordWriter, so it cannot be used with flush.file.size > 0.");
}
fileSize = ((FileSizeAwareRecordWriter) writer).getFileSize();
}

boolean periodicRotation = rotateIntervalMs > 0
&& currentTimestamp != null
&& lastRotate != null
&& currentTimestamp - lastRotate >= rotateIntervalMs;
boolean scheduledRotation = rotateScheduleIntervalMs > 0 && now >= nextScheduledRotate;
boolean messageSizeRotation = recordCounter >= flushSize;
boolean messageSizeRotation = flushSize > 0 && recordCounter >= flushSize;
boolean fileSizeRotation = flushFileSize > 0
&& fileSize != null
&& fileSize >= flushFileSize;

log.trace(
"Should apply periodic time-based rotation (rotateIntervalMs: '{}', lastRotate: "
@@ -600,7 +619,14 @@ private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long
messageSizeRotation
);

return periodicRotation || scheduledRotation || messageSizeRotation;
log.trace(
"Should apply file size-based rotation (file size {} >= flush file size {})? {}",
fileSize,
flushFileSize,
fileSizeRotation
);

return periodicRotation || scheduledRotation || messageSizeRotation || fileSizeRotation;
}

/**
File: AvroRecordWriterProvider.java
@@ -15,12 +15,12 @@

package io.confluent.connect.hdfs.avro;

import io.confluent.connect.hdfs.FileSizeAwareRecordWriter;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.storage.format.RecordWriter;
import java.io.OutputStream;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
@@ -52,9 +52,12 @@ public String getExtension() {
}

@Override
public RecordWriter getRecordWriter(HdfsSinkConnectorConfig conf, String filename) {
return new RecordWriter() {
final DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>());
public FileSizeAwareRecordWriter getRecordWriter(HdfsSinkConnectorConfig conf, String filename) {
return new FileSizeAwareRecordWriter() {
private long fileSize;
final TransparentDataFileWriter<Object> writer = new TransparentDataFileWriter<>(
new DataFileWriter<>(new GenericDatumWriter<>())
);
Schema schema;

@Override
Expand All @@ -63,7 +66,7 @@ public void write(SinkRecord record) {
schema = record.valueSchema();
try {
log.info("Opening record writer for: {}", filename);
final OutputStream out = storage.create(filename, true);
final FSDataOutputStream out = storage.create(filename, true);
org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(schema);
writer.setCodec(CodecFactory.fromString(conf.getAvroCodec()));
writer.create(avroSchema, out);
@@ -81,6 +84,7 @@ public void write(SinkRecord record) {
} else {
writer.append(value);
}
fileSize = writer.getInnerFileStream().getPos();
} catch (IOException e) {
throw new DataException(e);
}
@@ -90,13 +94,21 @@ public void write(SinkRecord record) {
public void close() {
try {
writer.close();
if (writer.getInnerFileStream() != null) {
fileSize = writer.getInnerFileStream().getPos();
}
} catch (IOException e) {
throw new DataException(e);
}
}

@Override
public void commit() {}

@Override
public long getFileSize() {
return fileSize;
}
};
}
}
New file: TransparentDataFileWriter.java
@@ -0,0 +1,141 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.hdfs.avro;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableInput;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.fs.FSDataOutputStream;

import java.io.Closeable;
import java.io.File;
import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
* A wrapper around {@link DataFileWriter} that exposes the underlying {@link FSDataOutputStream},
* so callers can monitor the size of the file being written (see the usage sketch after this
* class).
*/
public class TransparentDataFileWriter<D> implements Closeable, Flushable {
private final DataFileWriter<D> dataFileWriter;

private FSDataOutputStream innerFileStream;

public TransparentDataFileWriter(DataFileWriter<D> dataFileWriter) {
this.dataFileWriter = dataFileWriter;
}

public DataFileWriter<D> create(Schema schema, File file) throws IOException {
throw new NotImplementedException();
}

public DataFileWriter<D> create(Schema schema, FSDataOutputStream outs) throws IOException {
this.innerFileStream = outs;
return dataFileWriter.create(schema, outs);
}

public DataFileWriter<D> create(
Schema schema,
FSDataOutputStream outs,
byte[] sync
) throws IOException {
this.innerFileStream = outs;
return dataFileWriter.create(schema, outs, sync);
}

// Note: this OutputStream overload does not capture the stream, so getInnerFileStream()
// will return null and no file size can be reported for writers created through it.
public DataFileWriter<D> create(
Schema schema,
OutputStream outs,
byte[] sync
) throws IOException {
return dataFileWriter.create(schema, outs, sync);
}

public DataFileWriter<D> setCodec(CodecFactory c) {
return dataFileWriter.setCodec(c);
}

public DataFileWriter<D> setSyncInterval(int syncInterval) {
return dataFileWriter.setSyncInterval(syncInterval);
}

public void setFlushOnEveryBlock(boolean flushOnEveryBlock) {
dataFileWriter.setFlushOnEveryBlock(flushOnEveryBlock);
}

public boolean isFlushOnEveryBlock() {
return dataFileWriter.isFlushOnEveryBlock();
}

public DataFileWriter<D> appendTo(File file) throws IOException {
return dataFileWriter.appendTo(file);
}

public DataFileWriter<D> appendTo(SeekableInput in, OutputStream out) throws IOException {
return dataFileWriter.appendTo(in, out);
}

public DataFileWriter<D> setMeta(String key, byte[] value) {
return dataFileWriter.setMeta(key, value);
}

public DataFileWriter<D> setMeta(String key, String value) {
return dataFileWriter.setMeta(key, value);
}

public DataFileWriter<D> setMeta(String key, long value) {
return dataFileWriter.setMeta(key, value);
}

public static boolean isReservedMeta(String key) {
return DataFileWriter.isReservedMeta(key);
}

public void append(D datum) throws IOException {
dataFileWriter.append(datum);
}

public void appendEncoded(ByteBuffer datum) throws IOException {
dataFileWriter.appendEncoded(datum);
}

public void appendAllFrom(DataFileStream<D> otherFile, boolean recompress) throws IOException {
dataFileWriter.appendAllFrom(otherFile, recompress);
}

public long sync() throws IOException {
return dataFileWriter.sync();
}

@Override
public void flush() throws IOException {
dataFileWriter.flush();
}

@Override
public void close() throws IOException {
dataFileWriter.close();
}

public FSDataOutputStream getInnerFileStream() {
return innerFileStream;
}
}
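A rough usage sketch for the wrapper, mirroring what the Avro writer above does. The helper method, its class, and its inputs (storage, filename, avroSchema, datum) are hypothetical stand-ins, not part of this PR.

package io.confluent.connect.hdfs.avro;

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.hadoop.fs.FSDataOutputStream;

import io.confluent.connect.hdfs.storage.HdfsStorage;

public class TransparentDataFileWriterExample {
  // Hypothetical helper: write a single datum and report how large the file is so far.
  static long writeAndMeasure(HdfsStorage storage, String filename, Schema avroSchema,
      Object datum) throws IOException {
    TransparentDataFileWriter<Object> writer = new TransparentDataFileWriter<>(
        new DataFileWriter<>(new GenericDatumWriter<>()));
    FSDataOutputStream out = storage.create(filename, true); // same call the Avro writer uses
    writer.create(avroSchema, out);                          // captures the inner stream
    writer.append(datum);
    writer.flush();
    long size = writer.getInnerFileStream().getPos();        // bytes written to HDFS so far
    writer.close();
    return size;
  }
}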