diff --git a/README.md b/README.md
index 516d939..de0a346 100644
--- a/README.md
+++ b/README.md
@@ -86,11 +86,9 @@ $ cat system-test-00000-000000000000.index.json | jq -M '.'
- Depending on your needs you can either limit to just the single block, or if you want to consume all records after that offset, you can consume from the offset right to the end of the file
- The range request bytes can be decompressed as a GZIP file on their own with any GZIP compatible tool, provided you limit to whole block boundaries.
-## Other Formats
+## BZip2 format
-For now we only support Block-GZIP output. This assumes that all your kafka messages can be output as newline-delimited text files.
-
-We could make the output format pluggable if others have use for this connector, but need binary serialisation formats like Avro/Thrift/Protobuf etc. Pull requests welcome.
+Works exactly the same way as the Block-GZIP output format described above, except that each block is compressed with BZip2 and the data files are named with a `.bzip2` extension instead of `.gz`.
## Build and Run
@@ -113,6 +111,7 @@ In addition to the [standard kafka-connect config options](http://kafka.apache.o
| s3.endpoint | AWS defaults per region | Mostly useful for testing. |
| s3.path_style | `false` | Force path-style access to bucket rather than subdomain. Mostly useful for tests. |
| compressed_block_size | 67108864 | How much _uncompressed_ data to write to the file before we roll to a new block/chunk. See [Block-GZIP](#user-content-block-gzip-output-format) section above. |
+| compression.type | `gzip` | The compression algorithm, either `gzip` or `bzip2`. |
Note that we use the default AWS SDK credentials provider. [Refer to their docs](http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html#id1) for the options for configuring S3 credentials.
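
As the README section above describes, a consumer can combine the index file with an S3 ranged GET to fetch a single block and decompress it on its own. The following is a rough illustrative sketch of that flow (not part of this change), assuming a recent AWS Java SDK v1 and json-simple on the classpath; the bucket name and key prefix are placeholders:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

public class RangedBlockReadExample {
    public static void main(String[] args) throws Exception {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        String bucket = "my-bucket";                                  // placeholder
        String base = "some/prefix/system-test-00000-000000000000";   // placeholder prefix + chunk file base name

        // Fetch and parse the small index file for this chunk file
        JSONObject index;
        try (S3Object idx = s3.getObject(bucket, base + ".index.json")) {
            index = (JSONObject) new JSONParser().parse(
                    new InputStreamReader(idx.getObjectContent(), "UTF-8"));
        }

        // Pick the block we want (here simply the first one)
        JSONObject chunk = (JSONObject) ((JSONArray) index.get("chunks")).get(0);
        long byteOffset = (Long) chunk.get("byte_offset");
        long byteLength = (Long) chunk.get("byte_length");

        // Ranged GET for exactly that block; the bytes form a complete GZIP stream on their own
        GetObjectRequest req = new GetObjectRequest(bucket, base + ".gz")
                .withRange(byteOffset, byteOffset + byteLength - 1);
        try (S3Object block = s3.getObject(req);
             BufferedReader lines = new BufferedReader(new InputStreamReader(
                     new GZIPInputStream(block.getObjectContent()), "UTF-8"))) {
            String line;
            while ((line = lines.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
```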
diff --git a/pom.xml b/pom.xml
index 4a97f85..49508fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
  <groupId>com.deviantart</groupId>
  <artifactId>kafka-connect-s3</artifactId>
- <version>0.0.3</version>
+ <version>0.0.4</version>
  <packaging>jar</packaging>
  <name>kafka-connect-s3</name>
@@ -83,6 +83,11 @@
      <artifactId>aws-java-sdk-s3</artifactId>
      <version>${s3.version}</version>
    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <version>1.13</version>
+    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
diff --git a/src/main/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriter.java b/src/main/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriter.java
new file mode 100644
index 0000000..c13e50d
--- /dev/null
+++ b/src/main/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriter.java
@@ -0,0 +1,69 @@
+package com.deviantart.kafka_connect_s3;
+
+import java.io.BufferedWriter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+
+/**
+ * BlockBZIP2FileWriter accumulates newline delimited UTF-8 records and writes them to an
+ * output file that is readable as a standard BZIP2 file.
+ *
+ * In fact this file is the concatenation of possibly many separate BZIP2 files corresponding to smaller chunks
+ * of the input. Alongside the output filename.bzip2 file, a file filename-index.json is written containing JSON
+ * metadata about the size and location of each block.
+ *
+ * This allows a reading class to skip to a particular line/record without decompressing the whole file by looking up
+ * the offset of the containing block, seeking to it and beginning a BZIP2 read from there.
+ *
+ * This is especially useful when the file is an archive in HTTP storage like Amazon S3 where a GET request with
+ * range headers can pull a small segment out of the overall compressed file.
+ *
+ * Note that thanks to the BZIP2 spec, the overall file is perfectly valid and will decompress as if it were a single
+ * stream with any regular BZIP2 decoding library or program.
+ */
+public class BlockBZIP2FileWriter extends BlockFileWriter {
+
+ private BZip2CompressorOutputStream bzip2Stream;
+
+ public BlockBZIP2FileWriter(String filenameBase, String path) throws FileNotFoundException, IOException {
+ this(filenameBase, path, 0, 67108864);
+ }
+
+ public BlockBZIP2FileWriter(String filenameBase, String path, long firstRecordOffset) throws FileNotFoundException, IOException {
+ this(filenameBase, path, firstRecordOffset, 67108864);
+ }
+
+ public BlockBZIP2FileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws FileNotFoundException, IOException {
+ super(filenameBase, path, firstRecordOffset, chunkThreshold);
+ }
+
+ @Override
+ protected void initChunkWriter() throws IOException, UnsupportedEncodingException {
+ bzip2Stream = new BZip2CompressorOutputStream(fileStream);
+ writer = new BufferedWriter(new OutputStreamWriter(bzip2Stream, "UTF-8"));
+ }
+
+ @Override
+ protected void finishChunk() throws IOException {
+ Chunk ch = currentChunk();
+
+ // Complete BZIP2 block without closing stream
+ writer.flush();
+ bzip2Stream.finish();
+
+ // We can now find out how long this chunk was compressed
+ long bytesWritten = fileStream.getNumBytesWritten();
+ ch.compressedByteLength = bytesWritten - ch.byteOffset;
+ }
+
+ @Override
+ public String getDataFileName() {
+ return String.format("%s-%012d.bzip2", filenameBase, super.getFirstRecordOffset());
+ }
+}
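
For illustration, the new writer is used the same way as the existing GZIP one; a minimal sketch (not part of this change; the path and record contents are placeholders):

```java
package com.deviantart.kafka_connect_s3;

public class BZip2WriterExample {
    public static void main(String[] args) throws Exception {
        // Writes /tmp/connect-buffer/topic-00000-000000000000.bzip2 and the matching
        // topic-00000-000000000000.index.json (the directory is a placeholder and must exist).
        BlockBZIP2FileWriter writer =
                new BlockBZIP2FileWriter("topic-00000", "/tmp/connect-buffer", 0, 67108864);

        writer.write("{\"offset\":0,\"value\":\"first record\"}");   // newline appended automatically
        writer.write("{\"offset\":1,\"value\":\"second record\"}\n");
        writer.close();

        // The whole data file remains a valid BZip2 stream, so e.g. `bzcat` can read it
        // end to end, while the index allows seeking to individual blocks.
    }
}
```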
diff --git a/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java b/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java
new file mode 100644
index 0000000..0f39624
--- /dev/null
+++ b/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java
@@ -0,0 +1,180 @@
+package com.deviantart.kafka_connect_s3;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import java.util.ArrayList;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+abstract class BlockFileWriter {
+ protected String filenameBase;
+ private String path;
+ protected BufferedWriter writer;
+ protected CountingOutputStream fileStream;
+
+ protected ArrayList<Chunk> chunks;
+
+ // Default each chunk is 64MB of uncompressed data
+ private long chunkThreshold;
+
+ // Offset to the first record.
+ // Set to non-zero if this file is part of a larger stream and you want
+ // record offsets in the index to reflect the global offset rather than local
+ private long firstRecordOffset;
+
+ BlockFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws IOException {
+ this.filenameBase = filenameBase;
+ this.path = path;
+ this.firstRecordOffset = firstRecordOffset;
+ this.chunkThreshold = chunkThreshold;
+
+ chunks = new ArrayList<Chunk>();
+
+ // Initialize first chunk
+ Chunk ch = new Chunk();
+ ch.firstOffset = firstRecordOffset;
+ chunks.add(ch);
+
+ // Explicitly truncate the file. On linux and OS X this appears to happen
+ // anyway when opening with FileOutputStream but that behavior is not actually documented
+ // or specified anywhere so let's be rigorous about it.
+ FileOutputStream fos = new FileOutputStream(new File(getDataFilePath()));
+ fos.getChannel().truncate(0);
+
+ // Open file for writing and setup
+ this.fileStream = new CountingOutputStream(fos);
+ initChunkWriter();
+ }
+
+ abstract protected void initChunkWriter() throws IOException;
+ abstract protected void finishChunk() throws IOException;
+ abstract protected String getDataFileName();
+
+ protected Chunk currentChunk() {
+ return chunks.get(chunks.size() - 1);
+ }
+
+ public long getFirstRecordOffset() {
+ return firstRecordOffset;
+ }
+
+ public String getIndexFileName() {
+ return String.format("%s-%012d.index.json", filenameBase, firstRecordOffset);
+ }
+
+ public String getDataFilePath() {
+ return String.format("%s/%s", path, this.getDataFileName());
+ }
+
+ public String getIndexFilePath() {
+ return String.format("%s/%s", path, this.getIndexFileName());
+ }
+
+ /**
+ * Writes string to file, assuming this is a single record
+ *
+ * If there is no newline at the end we will add one
+ */
+ public void write(String record) throws IOException {
+ Chunk ch = currentChunk();
+
+ boolean hasNewLine = record.endsWith("\n");
+
+ int rawBytesToWrite = record.length();
+ if (!hasNewLine) {
+ rawBytesToWrite += 1;
+ }
+
+ if ((ch.rawBytes + rawBytesToWrite) > chunkThreshold) {
+ finishChunk();
+ initChunkWriter();
+
+ Chunk newCh = new Chunk();
+ newCh.firstOffset = ch.firstOffset + ch.numRecords;
+ newCh.byteOffset = ch.byteOffset + ch.compressedByteLength;
+ chunks.add(newCh);
+ ch = newCh;
+ }
+
+ writer.append(record);
+ if (!hasNewLine) {
+ writer.newLine();
+ }
+ ch.rawBytes += rawBytesToWrite;
+ ch.numRecords++;
+ }
+
+ public void delete() throws IOException {
+ deleteIfExists(getDataFilePath());
+ deleteIfExists(getIndexFilePath());
+ }
+
+ private void deleteIfExists(String path) throws IOException {
+ File f = new File(path);
+ if (f.exists() && !f.isDirectory()) {
+ f.delete();
+ }
+ }
+
+ public void close() throws IOException {
+ // Flush last chunk, updating index
+ finishChunk();
+ // Now close the writer (and the whole stream stack)
+ writer.close();
+ writeIndex();
+ }
+
+ private void writeIndex() throws IOException {
+ JSONArray chunkArr = new JSONArray();
+
+ for (Chunk ch : chunks) {
+ JSONObject chunkObj = new JSONObject();
+ chunkObj.put("first_record_offset", ch.firstOffset);
+ chunkObj.put("num_records", ch.numRecords);
+ chunkObj.put("byte_offset", ch.byteOffset);
+ chunkObj.put("byte_length", ch.compressedByteLength);
+ chunkObj.put("byte_length_uncompressed", ch.rawBytes);
+ chunkArr.add(chunkObj);
+ }
+
+ JSONObject index = new JSONObject();
+ index.put("chunks", chunkArr);
+
+ try (FileWriter file = new FileWriter(getIndexFilePath())) {
+ file.write(index.toJSONString());
+ }
+ }
+
+ public int getTotalUncompressedSize() {
+ int totalBytes = 0;
+ for (Chunk ch : chunks) {
+ totalBytes += ch.rawBytes;
+ }
+ return totalBytes;
+ }
+
+ public int getNumChunks() {
+ return chunks.size();
+ }
+
+ public int getNumRecords() {
+ int totalRecords = 0;
+ for (Chunk ch : chunks) {
+ totalRecords += ch.numRecords;
+ }
+ return totalRecords;
+ }
+}
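
Each entry written by `writeIndex` records where a block's records start (`first_record_offset`, `num_records`) and where its bytes live (`byte_offset`, `byte_length`), which is exactly what a reader needs to find the block containing a given Kafka offset. An illustrative lookup helper (not part of this change), again using json-simple:

```java
import java.io.FileReader;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

public class IndexLookupExample {

    /** Returns the index entry whose records cover targetOffset, or null if none does. */
    public static JSONObject findChunk(String indexPath, long targetOffset) throws Exception {
        JSONObject index = (JSONObject) new JSONParser().parse(new FileReader(indexPath));
        JSONArray chunks = (JSONArray) index.get("chunks");
        for (Object o : chunks) {
            JSONObject chunk = (JSONObject) o;
            long first = (Long) chunk.get("first_record_offset");
            long count = (Long) chunk.get("num_records");
            if (targetOffset >= first && targetOffset < first + count) {
                // byte_offset / byte_length give the range of compressed bytes to fetch;
                // that block decompresses on its own, then skip (targetOffset - first) lines.
                return chunk;
            }
        }
        return null;
    }
}
```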
diff --git a/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java b/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java
index 1400307..185bdfb 100644
--- a/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java
+++ b/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java
@@ -27,7 +27,7 @@
* metadata about the size and location of each block.
*
* This allows a reading class to skip to particular line/record without decompressing whole file by looking up
- * the offset of the containing block, seeking to it and beginning GZIp read from there.
+ * the offset of the containing block, seeking to it and beginning GZIP read from there.
*
* This is especially useful when the file is an archive in HTTP storage like Amazon S3 where GET request with
* range headers can allow pulling a small segment from overall compressed file.
@@ -35,227 +35,43 @@
* Note that thanks to GZIP spec, the overall file is perfectly valid and will compress as if it was a single stream
* with any regular GZIP decoding library or program.
*/
-public class BlockGZIPFileWriter {
- private String filenameBase;
- private String path;
- private GZIPOutputStream gzipStream;
- private BufferedWriter writer;
- private CountingOutputStream fileStream;
+public class BlockGZIPFileWriter extends BlockFileWriter {
- private class Chunk {
- public long rawBytes = 0;
- public long byteOffset = 0;
- public long compressedByteLength = 0;
- public long firstOffset = 0;
- public long numRecords = 0;
- };
+ private GZIPOutputStream gzipStream;
- private class CountingOutputStream extends FilterOutputStream {
- private long numBytes = 0;
-
- CountingOutputStream(OutputStream out) throws IOException {
- super(out);
+ public BlockGZIPFileWriter(String filenameBase, String path) throws FileNotFoundException, IOException {
+ this(filenameBase, path, 0, 67108864);
}
- @Override
- public void write(int b) throws IOException {
- out.write(b);
- numBytes++;
- }
- @Override
- public void write(byte[] b) throws IOException {
- out.write(b);
- numBytes += b.length;
- }
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- out.write(b, off, len);
- numBytes += len;
+ public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset) throws FileNotFoundException, IOException {
+ this(filenameBase, path, firstRecordOffset, 67108864);
}
- public long getNumBytesWritten() {
- return numBytes;
+ public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws FileNotFoundException, IOException {
+ super(filenameBase, path, firstRecordOffset, chunkThreshold);
}
- };
-
- private ArrayList chunks;
-
- // Default each chunk is 64MB of uncompressed data
- private long chunkThreshold;
-
- // Offset to the first record.
- // Set to non-zero if this file is part of a larger stream and you want
- // record offsets in the index to reflect the global offset rather than local
- private long firstRecordOffset;
-
- public BlockGZIPFileWriter(String filenameBase, String path) throws FileNotFoundException, IOException {
- this(filenameBase, path, 0, 67108864);
- }
-
- public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset) throws FileNotFoundException, IOException {
- this(filenameBase, path, firstRecordOffset, 67108864);
- }
-
- public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold)
- throws FileNotFoundException, IOException
- {
- this.filenameBase = filenameBase;
- this.path = path;
- this.firstRecordOffset = firstRecordOffset;
- this.chunkThreshold = chunkThreshold;
-
- chunks = new ArrayList();
-
- // Initialize first chunk
- Chunk ch = new Chunk();
- ch.firstOffset = firstRecordOffset;
- chunks.add(ch);
-
- // Explicitly truncate the file. On linux and OS X this appears to happen
- // anyway when opening with FileOutputStream but that behavior is not actually documented
- // or specified anywhere so let's be rigorous about it.
- FileOutputStream fos = new FileOutputStream(new File(getDataFilePath()));
- fos.getChannel().truncate(0);
-
- // Open file for writing and setup
- this.fileStream = new CountingOutputStream(fos);
- initChunkWriter();
- }
-
- private void initChunkWriter() throws IOException, UnsupportedEncodingException {
- gzipStream = new GZIPOutputStream(fileStream);
- writer = new BufferedWriter(new OutputStreamWriter(gzipStream, "UTF-8"));
- }
-
- private Chunk currentChunk() {
- return chunks.get(chunks.size() - 1);
- }
-
- public long getFirstRecordOffset() {
- return firstRecordOffset;
- }
-
- public String getDataFileName() {
- return String.format("%s-%012d.gz", filenameBase, firstRecordOffset);
- }
-
- public String getIndexFileName() {
- return String.format("%s-%012d.index.json", filenameBase, firstRecordOffset);
- }
-
- public String getDataFilePath() {
- return String.format("%s/%s", path, this.getDataFileName());
- }
-
- public String getIndexFilePath() {
- return String.format("%s/%s", path, this.getIndexFileName());
- }
- /**
- * Writes string to file, assuming this is a single record
- *
- * If there is no newline at then end we will add one
- */
- public void write(String record) throws IOException {
- Chunk ch = currentChunk();
-
- boolean hasNewLine = record.endsWith("\n");
-
- int rawBytesToWrite = record.length();
- if (!hasNewLine) {
- rawBytesToWrite += 1;
- }
-
- if ((ch.rawBytes + rawBytesToWrite) > chunkThreshold) {
- finishChunk();
- initChunkWriter();
-
- Chunk newCh = new Chunk();
- newCh.firstOffset = ch.firstOffset + ch.numRecords;
- newCh.byteOffset = ch.byteOffset + ch.compressedByteLength;
- chunks.add(newCh);
- ch = newCh;
- }
-
- writer.append(record);
- if (!hasNewLine) {
- writer.newLine();
- }
- ch.rawBytes += rawBytesToWrite;
- ch.numRecords++;
- }
-
- public void delete() throws IOException {
- deleteIfExists(getDataFilePath());
- deleteIfExists(getIndexFilePath());
- }
-
- private void deleteIfExists(String path) throws IOException {
- File f = new File(path);
- if (f.exists() && !f.isDirectory()) {
- f.delete();
- }
- }
-
- private void finishChunk() throws IOException {
- Chunk ch = currentChunk();
-
- // Complete GZIP block without closing stream
- writer.flush();
- gzipStream.finish();
-
- // We can no find out how long this chunk was compressed
- long bytesWritten = fileStream.getNumBytesWritten();
- ch.compressedByteLength = bytesWritten - ch.byteOffset;
- }
-
- public void close() throws IOException {
- // Flush last chunk, updating index
- finishChunk();
- // Now close the writer (and the whole stream stack)
- writer.close();
- writeIndex();
- }
-
- private void writeIndex() throws IOException {
- JSONArray chunkArr = new JSONArray();
-
- for (Chunk ch : chunks) {
- JSONObject chunkObj = new JSONObject();
- chunkObj.put("first_record_offset", ch.firstOffset);
- chunkObj.put("num_records", ch.numRecords);
- chunkObj.put("byte_offset", ch.byteOffset);
- chunkObj.put("byte_length", ch.compressedByteLength);
- chunkObj.put("byte_length_uncompressed", ch.rawBytes);
- chunkArr.add(chunkObj);
+ @Override
+ protected void initChunkWriter() throws IOException, UnsupportedEncodingException {
+ gzipStream = new GZIPOutputStream(fileStream);
+ writer = new BufferedWriter(new OutputStreamWriter(gzipStream, "UTF-8"));
}
- JSONObject index = new JSONObject();
- index.put("chunks", chunkArr);
+ @Override
+ protected void finishChunk() throws IOException {
+ Chunk ch = currentChunk();
- try (FileWriter file = new FileWriter(getIndexFilePath())) {
- file.write(index.toJSONString());
- file.close();
- }
- }
+ // Complete GZIP block without closing stream
+ writer.flush();
+ gzipStream.finish();
- public int getTotalUncompressedSize() {
- int totalBytes = 0;
- for (Chunk ch : chunks) {
- totalBytes += ch.rawBytes;
+ // We can now find out how long this chunk was compressed
+ long bytesWritten = fileStream.getNumBytesWritten();
+ ch.compressedByteLength = bytesWritten - ch.byteOffset;
}
- return totalBytes;
- }
- public int getNumChunks() {
- return chunks.size();
- }
-
- public int getNumRecords() {
- int totalRecords = 0;
- for (Chunk ch : chunks) {
- totalRecords += ch.numRecords;
+ @Override
+ public String getDataFileName() {
+ return String.format("%s-%012d.gz", filenameBase, super.getFirstRecordOffset());
}
- return totalRecords;
- }
-}
\ No newline at end of file
+}
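
With the shared logic now in `BlockFileWriter`, supporting a further codec should mostly be a matter of overriding the three hooks. As a purely hypothetical sketch (not part of this pull request), a deflate-based writer might look like the following, assuming commons-compress's `DeflateCompressorOutputStream` and its `finish()` method behave like the GZIP/BZip2 streams; note that, unlike GZIP and BZip2, plain deflate streams do not concatenate into a single decodable file, so this only illustrates the extension point:

```java
package com.deviantart.kafka_connect_s3;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;

import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;

/** Hypothetical example only; not included in this pull request. */
public class BlockDeflateFileWriter extends BlockFileWriter {

    private DeflateCompressorOutputStream deflateStream;

    public BlockDeflateFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws IOException {
        super(filenameBase, path, firstRecordOffset, chunkThreshold);
    }

    @Override
    protected void initChunkWriter() throws IOException {
        deflateStream = new DeflateCompressorOutputStream(fileStream);
        writer = new BufferedWriter(new OutputStreamWriter(deflateStream, "UTF-8"));
    }

    @Override
    protected void finishChunk() throws IOException {
        Chunk ch = currentChunk();
        writer.flush();
        deflateStream.finish();   // end this block without closing the underlying file
        ch.compressedByteLength = fileStream.getNumBytesWritten() - ch.byteOffset;
    }

    @Override
    public String getDataFileName() {
        return String.format("%s-%012d.deflate", filenameBase, getFirstRecordOffset());
    }
}
```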
diff --git a/src/main/java/com/deviantart/kafka_connect_s3/Chunk.java b/src/main/java/com/deviantart/kafka_connect_s3/Chunk.java
new file mode 100644
index 0000000..90f029f
--- /dev/null
+++ b/src/main/java/com/deviantart/kafka_connect_s3/Chunk.java
@@ -0,0 +1,9 @@
+package com.deviantart.kafka_connect_s3;
+
+class Chunk {
+ public long rawBytes = 0;
+ public long byteOffset = 0;
+ public long compressedByteLength = 0;
+ public long firstOffset = 0;
+ public long numRecords = 0;
+};
diff --git a/src/main/java/com/deviantart/kafka_connect_s3/CountingOutputStream.java b/src/main/java/com/deviantart/kafka_connect_s3/CountingOutputStream.java
new file mode 100644
index 0000000..be3be77
--- /dev/null
+++ b/src/main/java/com/deviantart/kafka_connect_s3/CountingOutputStream.java
@@ -0,0 +1,33 @@
+package com.deviantart.kafka_connect_s3;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+class CountingOutputStream extends FilterOutputStream {
+ private long numBytes = 0;
+
+ CountingOutputStream(OutputStream out) throws IOException {
+ super(out);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ numBytes++;
+ }
+ @Override
+ public void write(byte[] b) throws IOException {
+ out.write(b);
+ numBytes += b.length;
+ }
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ out.write(b, off, len);
+ numBytes += len;
+ }
+
+ public long getNumBytesWritten() {
+ return numBytes;
+ }
+}
diff --git a/src/main/java/com/deviantart/kafka_connect_s3/S3SinkConnector.java b/src/main/java/com/deviantart/kafka_connect_s3/S3SinkConnector.java
index 649c8b3..385e18f 100644
--- a/src/main/java/com/deviantart/kafka_connect_s3/S3SinkConnector.java
+++ b/src/main/java/com/deviantart/kafka_connect_s3/S3SinkConnector.java
@@ -36,13 +36,16 @@ public class S3SinkConnector extends SinkConnector {
public static final String BUFFER_DIRECTORY_PATH_CONFIG = "local.buffer.dir";
+ public static final String COMPRESSION_TYPE = "compression.type";
+
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(MAX_BLOCK_SIZE_CONFIG, Type.LONG, DEFAULT_MAX_BLOCK_SIZE, Range.atLeast(0), Importance.LOW, "Maximum size of data chunks in bytes (before compression)")
+ .define(MAX_BLOCK_SIZE_CONFIG, Type.LONG, DEFAULT_MAX_BLOCK_SIZE, Range.atLeast(0), Importance.LOW, "Maximum size of data chunks in bytes (before compression).")
.define(S3_BUCKET_CONFIG, Type.STRING, Importance.HIGH, "Name of the S3 bucket")
- .define(S3_PREFIX_CONFIG, Type.STRING, "", Importance.HIGH, "Path prefix of files to be written to S3")
- .define(OVERRIDE_S3_ENDPOINT_CONFIG, Type.STRING, "", Importance.LOW, "Override the S3 URL endpoint")
- .define(S3_PATHSTYLE_CONFIG, Type.BOOLEAN, false, Importance.LOW, "Override the standard S3 URL style by placing the bucket name in the path instead of hostname")
- .define(BUFFER_DIRECTORY_PATH_CONFIG, Type.STRING, Importance.HIGH, "Path to directory to store data temporarily before uploading to S3")
+ .define(S3_PREFIX_CONFIG, Type.STRING, "", Importance.HIGH, "Path prefix of files to be written to S3.")
+ .define(OVERRIDE_S3_ENDPOINT_CONFIG, Type.STRING, "", Importance.LOW, "Override the S3 URL endpoint.")
+ .define(S3_PATHSTYLE_CONFIG, Type.BOOLEAN, false, Importance.LOW, "Override the standard S3 URL style by placing the bucket name in the path instead of hostname.")
+ .define(BUFFER_DIRECTORY_PATH_CONFIG, Type.STRING, Importance.HIGH, "Path to directory to store data temporarily before uploading to S3.")
+ .define(COMPRESSION_TYPE, Type.STRING, "gzip", Importance.HIGH, "The compression type to use; supported values are gzip and bzip2. Defaults to gzip.")
;
private Map<String, String> configProperties;
@@ -73,6 +76,7 @@ public List