From 7a04023b63ab61434ad3f39d8e29028bb9d7cd05 Mon Sep 17 00:00:00 2001 From: R Vadai Date: Wed, 29 Mar 2017 19:05:33 +0100 Subject: [PATCH 1/6] Refactored block gzip file writer --- .../kafka_connect_s3/BlockFileWriter.java | 183 ++++++++++++++ .../kafka_connect_s3/BlockGZIPFileWriter.java | 233 ++---------------- .../deviantart/kafka_connect_s3/Chunk.java | 9 + .../CountingOutputStream.java | 33 +++ 4 files changed, 247 insertions(+), 211 deletions(-) create mode 100644 src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java create mode 100644 src/main/java/com/deviantart/kafka_connect_s3/Chunk.java create mode 100644 src/main/java/com/deviantart/kafka_connect_s3/CountingOutputStream.java diff --git a/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java b/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java new file mode 100644 index 0000000..cacf92e --- /dev/null +++ b/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java @@ -0,0 +1,183 @@ +package com.deviantart.kafka_connect_s3; + +import java.io.BufferedWriter; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.UnsupportedEncodingException; + +import java.util.ArrayList; +import java.util.zip.GZIPOutputStream; + +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; + +abstract class BlockFileWriter { + private String filenameBase; + private String path; + protected BufferedWriter writer; + protected CountingOutputStream fileStream; + + protected ArrayList chunks; + + // Default each chunk is 64MB of uncompressed data + private long chunkThreshold; + + // Offset to the first record. + // Set to non-zero if this file is part of a larger stream and you want + // record offsets in the index to reflect the global offset rather than local + private long firstRecordOffset; + + BlockFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws IOException { + this.filenameBase = filenameBase; + this.path = path; + this.firstRecordOffset = firstRecordOffset; + this.chunkThreshold = chunkThreshold; + + chunks = new ArrayList(); + + // Initialize first chunk + Chunk ch = new Chunk(); + ch.firstOffset = firstRecordOffset; + chunks.add(ch); + + // Explicitly truncate the file. On linux and OS X this appears to happen + // anyway when opening with FileOutputStream but that behavior is not actually documented + // or specified anywhere so let's be rigorous about it. 
+ FileOutputStream fos = new FileOutputStream(new File(getDataFilePath())); + fos.getChannel().truncate(0); + + // Open file for writing and setup + this.fileStream = new CountingOutputStream(fos); + initChunkWriter(); + } + + abstract protected void initChunkWriter() throws IOException; + abstract protected void finishChunk() throws IOException; + + protected Chunk currentChunk() { + return chunks.get(chunks.size() - 1); + } + + public long getFirstRecordOffset() { + return firstRecordOffset; + } + + public String getDataFileName() { + return String.format("%s-%012d.gz", filenameBase, firstRecordOffset); + } + + public String getIndexFileName() { + return String.format("%s-%012d.index.json", filenameBase, firstRecordOffset); + } + + public String getDataFilePath() { + return String.format("%s/%s", path, this.getDataFileName()); + } + + public String getIndexFilePath() { + return String.format("%s/%s", path, this.getIndexFileName()); + } + + /** + * Writes string to file, assuming this is a single record + * + * If there is no newline at then end we will add one + */ + public void write(String record) throws IOException { + Chunk ch = currentChunk(); + + boolean hasNewLine = record.endsWith("\n"); + + int rawBytesToWrite = record.length(); + if (!hasNewLine) { + rawBytesToWrite += 1; + } + + if ((ch.rawBytes + rawBytesToWrite) > chunkThreshold) { + finishChunk(); + initChunkWriter(); + + Chunk newCh = new Chunk(); + newCh.firstOffset = ch.firstOffset + ch.numRecords; + newCh.byteOffset = ch.byteOffset + ch.compressedByteLength; + chunks.add(newCh); + ch = newCh; + } + + writer.append(record); + if (!hasNewLine) { + writer.newLine(); + } + ch.rawBytes += rawBytesToWrite; + ch.numRecords++; + } + + public void delete() throws IOException { + deleteIfExists(getDataFilePath()); + deleteIfExists(getIndexFilePath()); + } + + private void deleteIfExists(String path) throws IOException { + File f = new File(path); + if (f.exists() && !f.isDirectory()) { + f.delete(); + } + } + + public void close() throws IOException { + // Flush last chunk, updating index + finishChunk(); + // Now close the writer (and the whole stream stack) + writer.close(); + writeIndex(); + } + + private void writeIndex() throws IOException { + JSONArray chunkArr = new JSONArray(); + + for (Chunk ch : chunks) { + JSONObject chunkObj = new JSONObject(); + chunkObj.put("first_record_offset", ch.firstOffset); + chunkObj.put("num_records", ch.numRecords); + chunkObj.put("byte_offset", ch.byteOffset); + chunkObj.put("byte_length", ch.compressedByteLength); + chunkObj.put("byte_length_uncompressed", ch.rawBytes); + chunkArr.add(chunkObj); + } + + JSONObject index = new JSONObject(); + index.put("chunks", chunkArr); + + try (FileWriter file = new FileWriter(getIndexFilePath())) { + file.write(index.toJSONString()); + file.close(); + } + } + + public int getTotalUncompressedSize() { + int totalBytes = 0; + for (Chunk ch : chunks) { + totalBytes += ch.rawBytes; + } + return totalBytes; + } + + public int getNumChunks() { + return chunks.size(); + } + + public int getNumRecords() { + int totalRecords = 0; + for (Chunk ch : chunks) { + totalRecords += ch.numRecords; + } + return totalRecords; + } +} diff --git a/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java b/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java index 1400307..8bd0c22 100644 --- a/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java +++ b/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java @@ 
-35,227 +35,38 @@ * Note that thanks to GZIP spec, the overall file is perfectly valid and will compress as if it was a single stream * with any regular GZIP decoding library or program. */ -public class BlockGZIPFileWriter { - private String filenameBase; - private String path; - private GZIPOutputStream gzipStream; - private BufferedWriter writer; - private CountingOutputStream fileStream; +public class BlockGZIPFileWriter extends BlockFileWriter { - private class Chunk { - public long rawBytes = 0; - public long byteOffset = 0; - public long compressedByteLength = 0; - public long firstOffset = 0; - public long numRecords = 0; - }; + private GZIPOutputStream gzipStream; - private class CountingOutputStream extends FilterOutputStream { - private long numBytes = 0; - - CountingOutputStream(OutputStream out) throws IOException { - super(out); - } - - @Override - public void write(int b) throws IOException { - out.write(b); - numBytes++; - } - @Override - public void write(byte[] b) throws IOException { - out.write(b); - numBytes += b.length; - } - @Override - public void write(byte[] b, int off, int len) throws IOException { - out.write(b, off, len); - numBytes += len; - } - - public long getNumBytesWritten() { - return numBytes; - } - }; - - private ArrayList chunks; - - // Default each chunk is 64MB of uncompressed data - private long chunkThreshold; - - // Offset to the first record. - // Set to non-zero if this file is part of a larger stream and you want - // record offsets in the index to reflect the global offset rather than local - private long firstRecordOffset; - - public BlockGZIPFileWriter(String filenameBase, String path) throws FileNotFoundException, IOException { - this(filenameBase, path, 0, 67108864); - } - - public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset) throws FileNotFoundException, IOException { - this(filenameBase, path, firstRecordOffset, 67108864); - } - - public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) - throws FileNotFoundException, IOException - { - this.filenameBase = filenameBase; - this.path = path; - this.firstRecordOffset = firstRecordOffset; - this.chunkThreshold = chunkThreshold; - - chunks = new ArrayList(); - - // Initialize first chunk - Chunk ch = new Chunk(); - ch.firstOffset = firstRecordOffset; - chunks.add(ch); - - // Explicitly truncate the file. On linux and OS X this appears to happen - // anyway when opening with FileOutputStream but that behavior is not actually documented - // or specified anywhere so let's be rigorous about it. 
- FileOutputStream fos = new FileOutputStream(new File(getDataFilePath())); - fos.getChannel().truncate(0); - - // Open file for writing and setup - this.fileStream = new CountingOutputStream(fos); - initChunkWriter(); - } - - private void initChunkWriter() throws IOException, UnsupportedEncodingException { - gzipStream = new GZIPOutputStream(fileStream); - writer = new BufferedWriter(new OutputStreamWriter(gzipStream, "UTF-8")); - } - - private Chunk currentChunk() { - return chunks.get(chunks.size() - 1); - } - - public long getFirstRecordOffset() { - return firstRecordOffset; - } - - public String getDataFileName() { - return String.format("%s-%012d.gz", filenameBase, firstRecordOffset); - } - - public String getIndexFileName() { - return String.format("%s-%012d.index.json", filenameBase, firstRecordOffset); - } - - public String getDataFilePath() { - return String.format("%s/%s", path, this.getDataFileName()); - } - - public String getIndexFilePath() { - return String.format("%s/%s", path, this.getIndexFileName()); - } - - /** - * Writes string to file, assuming this is a single record - * - * If there is no newline at then end we will add one - */ - public void write(String record) throws IOException { - Chunk ch = currentChunk(); - - boolean hasNewLine = record.endsWith("\n"); - - int rawBytesToWrite = record.length(); - if (!hasNewLine) { - rawBytesToWrite += 1; - } - - if ((ch.rawBytes + rawBytesToWrite) > chunkThreshold) { - finishChunk(); - initChunkWriter(); - - Chunk newCh = new Chunk(); - newCh.firstOffset = ch.firstOffset + ch.numRecords; - newCh.byteOffset = ch.byteOffset + ch.compressedByteLength; - chunks.add(newCh); - ch = newCh; + public BlockGZIPFileWriter(String filenameBase, String path) throws FileNotFoundException, IOException { + this(filenameBase, path, 0, 67108864); } - writer.append(record); - if (!hasNewLine) { - writer.newLine(); + public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset) throws FileNotFoundException, IOException { + this(filenameBase, path, firstRecordOffset, 67108864); } - ch.rawBytes += rawBytesToWrite; - ch.numRecords++; - } - - public void delete() throws IOException { - deleteIfExists(getDataFilePath()); - deleteIfExists(getIndexFilePath()); - } - private void deleteIfExists(String path) throws IOException { - File f = new File(path); - if (f.exists() && !f.isDirectory()) { - f.delete(); + public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws FileNotFoundException, IOException { + super(filenameBase, path, firstRecordOffset, chunkThreshold); } - } - private void finishChunk() throws IOException { - Chunk ch = currentChunk(); - - // Complete GZIP block without closing stream - writer.flush(); - gzipStream.finish(); - - // We can no find out how long this chunk was compressed - long bytesWritten = fileStream.getNumBytesWritten(); - ch.compressedByteLength = bytesWritten - ch.byteOffset; - } - - public void close() throws IOException { - // Flush last chunk, updating index - finishChunk(); - // Now close the writer (and the whole stream stack) - writer.close(); - writeIndex(); - } - - private void writeIndex() throws IOException { - JSONArray chunkArr = new JSONArray(); - - for (Chunk ch : chunks) { - JSONObject chunkObj = new JSONObject(); - chunkObj.put("first_record_offset", ch.firstOffset); - chunkObj.put("num_records", ch.numRecords); - chunkObj.put("byte_offset", ch.byteOffset); - chunkObj.put("byte_length", ch.compressedByteLength); - 
chunkObj.put("byte_length_uncompressed", ch.rawBytes); - chunkArr.add(chunkObj); - } - - JSONObject index = new JSONObject(); - index.put("chunks", chunkArr); - - try (FileWriter file = new FileWriter(getIndexFilePath())) { - file.write(index.toJSONString()); - file.close(); + @Override + protected void initChunkWriter() throws IOException, UnsupportedEncodingException { + gzipStream = new GZIPOutputStream(fileStream); + writer = new BufferedWriter(new OutputStreamWriter(gzipStream, "UTF-8")); } - } - public int getTotalUncompressedSize() { - int totalBytes = 0; - for (Chunk ch : chunks) { - totalBytes += ch.rawBytes; - } - return totalBytes; - } + @Override + protected void finishChunk() throws IOException { + Chunk ch = currentChunk(); - public int getNumChunks() { - return chunks.size(); - } + // Complete GZIP block without closing stream + writer.flush(); + gzipStream.finish(); - public int getNumRecords() { - int totalRecords = 0; - for (Chunk ch : chunks) { - totalRecords += ch.numRecords; + // We can no find out how long this chunk was compressed + long bytesWritten = fileStream.getNumBytesWritten(); + ch.compressedByteLength = bytesWritten - ch.byteOffset; } - return totalRecords; - } -} \ No newline at end of file +} diff --git a/src/main/java/com/deviantart/kafka_connect_s3/Chunk.java b/src/main/java/com/deviantart/kafka_connect_s3/Chunk.java new file mode 100644 index 0000000..90f029f --- /dev/null +++ b/src/main/java/com/deviantart/kafka_connect_s3/Chunk.java @@ -0,0 +1,9 @@ +package com.deviantart.kafka_connect_s3; + +class Chunk { + public long rawBytes = 0; + public long byteOffset = 0; + public long compressedByteLength = 0; + public long firstOffset = 0; + public long numRecords = 0; +}; diff --git a/src/main/java/com/deviantart/kafka_connect_s3/CountingOutputStream.java b/src/main/java/com/deviantart/kafka_connect_s3/CountingOutputStream.java new file mode 100644 index 0000000..be3be77 --- /dev/null +++ b/src/main/java/com/deviantart/kafka_connect_s3/CountingOutputStream.java @@ -0,0 +1,33 @@ +package com.deviantart.kafka_connect_s3; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +class CountingOutputStream extends FilterOutputStream { + private long numBytes = 0; + + CountingOutputStream(OutputStream out) throws IOException { + super(out); + } + + @Override + public void write(int b) throws IOException { + out.write(b); + numBytes++; + } + @Override + public void write(byte[] b) throws IOException { + out.write(b); + numBytes += b.length; + } + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + numBytes += len; + } + + public long getNumBytesWritten() { + return numBytes; + } +} From dcd16e55493bfbe27390eafeda3e07025b024dd2 Mon Sep 17 00:00:00 2001 From: R Vadai Date: Mon, 3 Apr 2017 11:37:05 +0100 Subject: [PATCH 2/6] Added bzip2 file handling --- pom.xml | 5 + .../BlockBZIP2FileWriter.java | 69 ++++ .../kafka_connect_s3/BlockFileWriter.java | 7 +- .../kafka_connect_s3/BlockGZIPFileWriter.java | 7 +- .../BlockBZIP2FileWriterTest.java | 235 ++++++++++++ .../BlockFileWriterTestCommon.java | 21 ++ .../BlockGZIPFileWriterTest.java | 351 +++++++++--------- 7 files changed, 509 insertions(+), 186 deletions(-) create mode 100644 src/main/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriter.java create mode 100644 src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java create mode 100644 
src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java diff --git a/pom.xml b/pom.xml index 4a97f85..ec5715e 100644 --- a/pom.xml +++ b/pom.xml @@ -83,6 +83,11 @@ aws-java-sdk-s3 ${s3.version} + + org.apache.commons + commons-compress + 1.13 + junit junit diff --git a/src/main/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriter.java b/src/main/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriter.java new file mode 100644 index 0000000..c13e50d --- /dev/null +++ b/src/main/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriter.java @@ -0,0 +1,69 @@ +package com.deviantart.kafka_connect_s3; + +import java.io.BufferedWriter; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.UnsupportedEncodingException; + +import java.util.zip.GZIPOutputStream; + +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; + +/** + * BlockBZIP2FileWriter accumulates newline delimited UTF-8 records and writes them to an + * output file that is readable by BZIP2. + * + * In fact this file is the concatenation of possibly many separate BZIP2 files corresponding to smaller chunks + * of the input. Alongside the output filename.gz file, a file filename-index.json is written containing JSON + * metadata about the size and location of each block. + * + * This allows a reading class to skip to particular line/record without decompressing whole file by looking up + * the offset of the containing block, seeking to it and beginning BZIP2 read from there. + * + * This is especially useful when the file is an archive in HTTP storage like Amazon S3 where GET request with + * range headers can allow pulling a small segment from overall compressed file. + * + * Note that thanks to BZIP2 spec, the overall file is perfectly valid and will compress as if it was a single stream + * with any regular BZIP2 decoding library or program. 
+ */ +public class BlockBZIP2FileWriter extends BlockFileWriter { + + private BZip2CompressorOutputStream bzip2Stream; + + public BlockBZIP2FileWriter(String filenameBase, String path) throws FileNotFoundException, IOException { + this(filenameBase, path, 0, 67108864); + } + + public BlockBZIP2FileWriter(String filenameBase, String path, long firstRecordOffset) throws FileNotFoundException, IOException { + this(filenameBase, path, firstRecordOffset, 67108864); + } + + public BlockBZIP2FileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws FileNotFoundException, IOException { + super(filenameBase, path, firstRecordOffset, chunkThreshold); + } + + @Override + protected void initChunkWriter() throws IOException, UnsupportedEncodingException { + bzip2Stream = new BZip2CompressorOutputStream(fileStream); + writer = new BufferedWriter(new OutputStreamWriter(bzip2Stream, "UTF-8")); + } + + @Override + protected void finishChunk() throws IOException { + Chunk ch = currentChunk(); + + // Complete GZIP block without closing stream + writer.flush(); + bzip2Stream.finish(); + + // We can no find out how long this chunk was compressed + long bytesWritten = fileStream.getNumBytesWritten(); + ch.compressedByteLength = bytesWritten - ch.byteOffset; + } + + @Override + public String getDataFileName() { + return String.format("%s-%012d.bzip2", filenameBase, super.getFirstRecordOffset()); + } +} diff --git a/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java b/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java index cacf92e..0f39624 100644 --- a/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java +++ b/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java @@ -19,7 +19,7 @@ import org.json.simple.JSONObject; abstract class BlockFileWriter { - private String filenameBase; + protected String filenameBase; private String path; protected BufferedWriter writer; protected CountingOutputStream fileStream; @@ -60,6 +60,7 @@ abstract class BlockFileWriter { abstract protected void initChunkWriter() throws IOException; abstract protected void finishChunk() throws IOException; + abstract protected String getDataFileName(); protected Chunk currentChunk() { return chunks.get(chunks.size() - 1); @@ -69,10 +70,6 @@ public long getFirstRecordOffset() { return firstRecordOffset; } - public String getDataFileName() { - return String.format("%s-%012d.gz", filenameBase, firstRecordOffset); - } - public String getIndexFileName() { return String.format("%s-%012d.index.json", filenameBase, firstRecordOffset); } diff --git a/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java b/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java index 8bd0c22..185bdfb 100644 --- a/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java +++ b/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java @@ -27,7 +27,7 @@ * metadata about the size and location of each block. * * This allows a reading class to skip to particular line/record without decompressing whole file by looking up - * the offset of the containing block, seeking to it and beginning GZIp read from there. + * the offset of the containing block, seeking to it and beginning GZIP read from there. * * This is especially useful when the file is an archive in HTTP storage like Amazon S3 where GET request with * range headers can allow pulling a small segment from overall compressed file. 
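
For reference, the index file written by BlockFileWriter.writeIndex() has the shape {"chunks": [{"first_record_offset": ..., "num_records": ..., "byte_offset": ..., "byte_length": ..., "byte_length_uncompressed": ...}, ...]}. The sketch below is not part of this patch series; it is a minimal, hypothetical local-file reader showing how a consumer can use that index to locate and decompress a single chunk, built only from classes the tests already exercise (json-simple, RandomAccessFile, GZIPInputStream). Against S3, the same byte_offset/byte_length pair would drive a ranged GET instead of the local seek.

    package com.deviantart.kafka_connect_s3;

    import java.io.BufferedReader;
    import java.io.ByteArrayInputStream;
    import java.io.FileReader;
    import java.io.InputStreamReader;
    import java.io.RandomAccessFile;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.zip.GZIPInputStream;

    import org.json.simple.JSONArray;
    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;

    // Hypothetical helper (not part of the patches): finds the chunk containing
    // wantedRecordOffset via the index file and decompresses only that chunk.
    class BlockChunkReader {
        static List<String> readChunk(String dataFilePath, String indexFilePath,
                                      long wantedRecordOffset) throws Exception {
            JSONObject index;
            try (FileReader reader = new FileReader(indexFilePath)) {
                index = (JSONObject) new JSONParser().parse(reader);
            }
            JSONArray chunks = (JSONArray) index.get("chunks");

            for (Object o : chunks) {
                JSONObject chunk = (JSONObject) o;
                long firstOffset = (long) chunk.get("first_record_offset");
                long numRecords = (long) chunk.get("num_records");
                if (wantedRecordOffset < firstOffset || wantedRecordOffset >= firstOffset + numRecords) {
                    continue;
                }

                long byteOffset = (long) chunk.get("byte_offset");
                int byteLength = (int) (long) chunk.get("byte_length");

                // Locally we seek and read; against S3 this byte range would be
                // requested with a "Range: bytes=..." header instead.
                byte[] buffer = new byte[byteLength];
                try (RandomAccessFile file = new RandomAccessFile(dataFilePath, "r")) {
                    file.seek(byteOffset);
                    file.readFully(buffer);
                }

                // Each chunk is a complete GZIP member, so it decompresses on its own.
                BufferedReader r = new BufferedReader(new InputStreamReader(
                        new GZIPInputStream(new ByteArrayInputStream(buffer)), "UTF-8"));
                List<String> records = new ArrayList<String>();
                String line;
                while ((line = r.readLine()) != null) {
                    records.add(line);
                }
                return records;
            }
            return null; // offset not covered by this file
        }
    }
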
@@ -69,4 +69,9 @@ protected void finishChunk() throws IOException { long bytesWritten = fileStream.getNumBytesWritten(); ch.compressedByteLength = bytesWritten - ch.byteOffset; } + + @Override + public String getDataFileName() { + return String.format("%s-%012d.gz", filenameBase, super.getFirstRecordOffset()); + } } diff --git a/src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java b/src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java new file mode 100644 index 0000000..e0c1853 --- /dev/null +++ b/src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java @@ -0,0 +1,235 @@ +package com.deviantart.kafka_connect_s3; + +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.InputStreamReader; +import java.io.RandomAccessFile; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class BlockBZIP2FileWriterTest extends BlockFileWriterTestCommon { + + private static final String tmpDirPrefix = "BlockBZIP2FileWriterTest"; + + @BeforeClass + public static void oneTimeSetUp() { + + String tempDir = System.getProperty("java.io.tmpdir"); + tmpDir = new File(tempDir, tmpDirPrefix).toString(); + + System.out.println("Temp dir for writer test is: " + tmpDir); + } + + @Test + public void testPaths() throws Exception { + BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("foo", tmpDir); + assertEquals(tmpDir + "/foo-000000000000.bzip2", w.getDataFilePath()); + assertEquals(tmpDir + "/foo-000000000000.index.json", w.getIndexFilePath()); + + + BlockBZIP2FileWriter w2 = new BlockBZIP2FileWriter("foo", tmpDir, 123456); + assertEquals(tmpDir + "/foo-000000123456.bzip2", w2.getDataFilePath()); + assertEquals(tmpDir + "/foo-000000123456.index.json", w2.getIndexFilePath()); + } + + @Test + public void testWrite() throws Exception { + // Very compressible 200 byte padding string to prepend to our unique line prefix + String pad = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; + + // Make a writer with artificially small chunk threshold of 1kb + BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("write-test", tmpDir, 987654321, 1000); + + int totalUncompressedBytes = 0; + String[] expectedLines = new String[50]; + // 50 records * 200 bytes padding should be at least 10 chunks worth + for (int i = 0; i < 50; i++) { + String line = String.format("Record %d %s", i, pad); + // Plus one for newline + totalUncompressedBytes += line.length() + 1; + // Expect to read without newlines... 
+ expectedLines[i] = line; + // But add newlines to half the input to verify writer adds them only if needed + if (i % 2 == 0) { + line += "\n"; + } + w.write(line); + } + + assertEquals(totalUncompressedBytes, w.getTotalUncompressedSize()); + assertEquals(50, w.getNumRecords()); + assertTrue("Should be at least 10 chunks in output file", w.getNumChunks() >= 10); + + w.close(); + + verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines); + verifyIndexFile(w, 987654321, expectedLines); + } + + private void verifyOutputIsSaneBZIP2File(String filename, String[] expectedRecords) throws Exception { + BZip2CompressorInputStream zip = new BZip2CompressorInputStream(new FileInputStream(filename)); + BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); + + String line; + int i = 0; + while ((line = r.readLine()) != null) { + assertTrue(String.format("Output file has more lines than expected. Expected %d lines", expectedRecords.length) + , i < expectedRecords.length); + + String expectedLine = expectedRecords[i]; + assertEquals(String.format("Output file doesn't match, first difference on line %d", i), expectedLine, line); + i++; + } + } + + private void verifyIndexFile(BlockBZIP2FileWriter w, int startOffset, String[] expectedRecords) throws Exception { + JSONParser parser = new JSONParser(); + + Object obj = parser.parse(new FileReader(w.getIndexFilePath())); + JSONObject index = (JSONObject) obj; + JSONArray chunks = (JSONArray) index.get("chunks"); + + assertEquals(w.getNumChunks(), chunks.size()); + + RandomAccessFile file = new RandomAccessFile(w.getDataFilePath(), "r"); + + // Check we can read all the chunks as individual bzip2 segments + int expectedStartOffset = startOffset; + int recordIndex = 0; + int totalBytes = 0; + int chunkIndex = 0; + for (Object chunk : chunks) { + JSONObject chunkObj = (JSONObject) chunk; + int firstOffset = (int) (long) chunkObj.get("first_record_offset"); + int numRecords = (int) (long) chunkObj.get("num_records"); + int byteOffset = (int) (long) chunkObj.get("byte_offset"); + int byteLength = (int) (long) chunkObj.get("byte_length"); + + assertEquals(expectedStartOffset, firstOffset); + assertTrue(byteLength > 0); + assertTrue(byteOffset >= 0); + + // Read just that segment of the file into byte array and attempt to parse BZIP2 + byte[] buffer = new byte[byteLength]; + file.seek(byteOffset); + int numBytesRead = file.read(buffer); + + assertEquals(buffer.length, numBytesRead); + + BZip2CompressorInputStream zip = new BZip2CompressorInputStream(new ByteArrayInputStream(buffer)); + BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); + + int numRecordsActuallyInChunk = 0; + String line; + while ((line = r.readLine()) != null) { + assertEquals(expectedRecords[recordIndex], line); + recordIndex++; + numRecordsActuallyInChunk++; + } + + assertEquals(numRecordsActuallyInChunk, numRecords); + + totalBytes += byteLength; + + expectedStartOffset = firstOffset + numRecords; + + chunkIndex++; + } + + assertEquals("All chunks should cover all bytes in the file", totalBytes, file.length()); + } + + // Hmm this test is actually not very conclusive - on OS X and most linux file systems + // it passes anyway due to nature of filesystems. Not sure how to write something more robust + // though to validate that we definitiely truncate the files even if we write less data + + @Test + public void testShouldOverwrite() throws Exception { + // Make writer and write to it a bit. 
+ { + BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("overwrite-test", tmpDir); + + // Write at least a few 4k blocks to disk so we can be sure that we don't + // only overwrite the first block. + String[] expectedLines = new String[5000]; + for (int i = 0; i < 5000; i++) { + String line = String.format("Record %d", i); + w.write(line); + expectedLines[i] = line; + } + + assertEquals(5000, w.getNumRecords()); + + w.close(); + + // Just check it actually write to disk + verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines); + verifyIndexFile(w, 0, expectedLines); + + } + + { + // Now make a whole new writer for same chunk + BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("overwrite-test", tmpDir); + + // Only write a few lines + String[] expectedLines2 = new String[10]; + for (int i = 0; i < 10; i++) { + String line = String.format("Overwrite record %d", i); + w.write(line); + expectedLines2[i] = line; + } + + assertEquals(10, w.getNumRecords()); + + w.close(); + + // No check output is only the 10 lines we just wrote + verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines2); + verifyIndexFile(w, 0, expectedLines2); + } + } + + @Test + public void testDelete() throws Exception { + // Make writer and write to it a bit. + BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("overwrite-test", tmpDir); + + String[] expectedLines = new String[5000]; + for (int i = 0; i < 5000; i++) { + String line = String.format("Record %d", i); + w.write(line); + expectedLines[i] = line; + } + + assertEquals(5000, w.getNumRecords()); + + w.close(); + + // Just check it actually write to disk + verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines); + verifyIndexFile(w, 0, expectedLines); + + // Now remove it + w.delete(); + + File dataF = new File(w.getDataFilePath()); + File idxF = new File(w.getIndexFilePath()); + + assertFalse("Data file should not exist after delete", dataF.exists()); + assertFalse("Index file should not exist after delete", idxF.exists()); + } +} diff --git a/src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java b/src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java new file mode 100644 index 0000000..cf04aee --- /dev/null +++ b/src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java @@ -0,0 +1,21 @@ +package com.deviantart.kafka_connect_s3; + +import org.junit.Before; + +import java.io.File; + +public abstract class BlockFileWriterTestCommon { + + protected Class compressedFileWriterClass; + + protected static String tmpDir; + + @Before + public void setUp() throws Exception { + File f = new File(tmpDir); + + if (!f.exists()) { + f.mkdir(); + } + } +} diff --git a/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java b/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java index 55f4389..ca54181 100644 --- a/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java +++ b/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java @@ -1,9 +1,5 @@ package com.deviantart.kafka_connect_s3; -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; - import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.File; @@ -16,230 +12,225 @@ import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; -public class BlockGZIPFileWriterTest 
extends TestCase { - - private String tmpDirPrefix = "BlockGZIPFileWriterTest"; - private String tmpDir; - - public BlockGZIPFileWriterTest(String testName) { - super(testName); +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; - String tempDir = System.getProperty("java.io.tmpdir"); - this.tmpDir = new File(tempDir, tmpDirPrefix).toString(); +public class BlockGZIPFileWriterTest extends BlockFileWriterTestCommon { - System.out.println("Temp dir for writer test is: " + tmpDir); - } + private static final String tmpDirPrefix = "BlockGZIPFileWriterTest"; - /** - * @return the suite of tests being tested - */ - public static Test suite() { - return new TestSuite(BlockGZIPFileWriterTest.class); - } + @BeforeClass + public static void oneTimeSetUp() { - @Override - protected void setUp() throws Exception { - File f = new File(tmpDir); + String tempDir = System.getProperty("java.io.tmpdir"); + tmpDir = new File(tempDir, tmpDirPrefix).toString(); - if (!f.exists()) { - f.mkdir(); - } - } - - public void testPaths() throws Exception { - BlockGZIPFileWriter w = new BlockGZIPFileWriter("foo", tmpDir); - assertEquals(tmpDir + "/foo-000000000000.gz", w.getDataFilePath()); - assertEquals(tmpDir + "/foo-000000000000.index.json", w.getIndexFilePath()); - - - BlockGZIPFileWriter w2 = new BlockGZIPFileWriter("foo", tmpDir, 123456); - assertEquals(tmpDir + "/foo-000000123456.gz", w2.getDataFilePath()); - assertEquals(tmpDir + "/foo-000000123456.index.json", w2.getIndexFilePath()); - } - - public void testWrite() throws Exception { - // Very compressible 200 byte padding string to prepend to our unique line prefix - String pad = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" - + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; - - // Make a writer with artificially small chunk threshold of 1kb - BlockGZIPFileWriter w = new BlockGZIPFileWriter("write-test", tmpDir, 987654321, 1000); - - int totalUncompressedBytes = 0; - String[] expectedLines = new String[50]; - // 50 records * 200 bytes padding should be at least 10 chunks worth - for (int i = 0; i < 50; i++) { - String line = String.format("Record %d %s", i, pad); - // Plus one for newline - totalUncompressedBytes += line.length() + 1; - // Expect to read without newlines... 
- expectedLines[i] = line; - // But add newlines to half the input to verify writer adds them only if needed - if (i % 2 == 0) { - line += "\n"; - } - w.write(line); + System.out.println("Temp dir for writer test is: " + tmpDir); } - assertEquals(totalUncompressedBytes, w.getTotalUncompressedSize()); - assertEquals(50, w.getNumRecords()); - assertTrue("Should be at least 10 chunks in output file", w.getNumChunks() >= 10); + @Test + public void testPaths() throws Exception { + BlockGZIPFileWriter w = new BlockGZIPFileWriter("foo", tmpDir); + assertEquals(tmpDir + "/foo-000000000000.gz", w.getDataFilePath()); + assertEquals(tmpDir + "/foo-000000000000.index.json", w.getIndexFilePath()); - w.close(); - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); - verifyIndexFile(w, 987654321, expectedLines); - } + BlockGZIPFileWriter w2 = new BlockGZIPFileWriter("foo", tmpDir, 123456); + assertEquals(tmpDir + "/foo-000000123456.gz", w2.getDataFilePath()); + assertEquals(tmpDir + "/foo-000000123456.index.json", w2.getIndexFilePath()); + } - private void verifyOutputIsSaneGZIPFile(String filename, String[] expectedRecords) throws Exception { - GZIPInputStream zip = new GZIPInputStream(new FileInputStream(filename)); - BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); + @Test + public void testWrite() throws Exception { + // Very compressible 200 byte padding string to prepend to our unique line prefix + String pad = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; + + // Make a writer with artificially small chunk threshold of 1kb + BlockGZIPFileWriter w = new BlockGZIPFileWriter("write-test", tmpDir, 987654321, 1000); + + int totalUncompressedBytes = 0; + String[] expectedLines = new String[50]; + // 50 records * 200 bytes padding should be at least 10 chunks worth + for (int i = 0; i < 50; i++) { + String line = String.format("Record %d %s", i, pad); + // Plus one for newline + totalUncompressedBytes += line.length() + 1; + // Expect to read without newlines... + expectedLines[i] = line; + // But add newlines to half the input to verify writer adds them only if needed + if (i % 2 == 0) { + line += "\n"; + } + w.write(line); + } + + assertEquals(totalUncompressedBytes, w.getTotalUncompressedSize()); + assertEquals(50, w.getNumRecords()); + assertTrue("Should be at least 10 chunks in output file", w.getNumChunks() >= 10); + + w.close(); + + verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); + verifyIndexFile(w, 987654321, expectedLines); + } - String line; - int i = 0; - while ((line = r.readLine()) != null) { - assertTrue( String.format("Output file has more lines than expected. Expected %d lines", expectedRecords.length) - , i < expectedRecords.length); + private void verifyOutputIsSaneGZIPFile(String filename, String[] expectedRecords) throws Exception { + GZIPInputStream zip = new GZIPInputStream(new FileInputStream(filename)); + BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); - String expectedLine = expectedRecords[i]; - assertEquals(String.format("Output file doesn't match, first difference on line %d", i), expectedLine, line); - i++; + String line; + int i = 0; + while ((line = r.readLine()) != null) { + assertTrue(String.format("Output file has more lines than expected. 
Expected %d lines", expectedRecords.length) + , i < expectedRecords.length); + + String expectedLine = expectedRecords[i]; + assertEquals(String.format("Output file doesn't match, first difference on line %d", i), expectedLine, line); + i++; + } } - } - private void verifyIndexFile(BlockGZIPFileWriter w, int startOffset, String[] expectedRecords) throws Exception { - JSONParser parser = new JSONParser(); + private void verifyIndexFile(BlockGZIPFileWriter w, int startOffset, String[] expectedRecords) throws Exception { + JSONParser parser = new JSONParser(); + + Object obj = parser.parse(new FileReader(w.getIndexFilePath())); + JSONObject index = (JSONObject) obj; + JSONArray chunks = (JSONArray) index.get("chunks"); - Object obj = parser.parse(new FileReader(w.getIndexFilePath())); - JSONObject index = (JSONObject) obj; - JSONArray chunks = (JSONArray) index.get("chunks"); + assertEquals(w.getNumChunks(), chunks.size()); - assertEquals(w.getNumChunks(), chunks.size()); + RandomAccessFile file = new RandomAccessFile(w.getDataFilePath(), "r"); - RandomAccessFile file = new RandomAccessFile(w.getDataFilePath(), "r"); + // Check we can read all the chunks as individual gzip segments + int expectedStartOffset = startOffset; + int recordIndex = 0; + int totalBytes = 0; + int chunkIndex = 0; + for (Object chunk : chunks) { + JSONObject chunkObj = (JSONObject) chunk; + int firstOffset = (int) (long) chunkObj.get("first_record_offset"); + int numRecords = (int) (long) chunkObj.get("num_records"); + int byteOffset = (int) (long) chunkObj.get("byte_offset"); + int byteLength = (int) (long) chunkObj.get("byte_length"); - // Check we can read all the chunks as individual gzip segments - int expectedStartOffset = startOffset; - int recordIndex = 0; - int totalBytes = 0; - int chunkIndex = 0; - for (Object chunk : chunks) { - JSONObject chunkObj = (JSONObject) chunk; - int firstOffset = (int)(long) chunkObj.get("first_record_offset"); - int numRecords = (int)(long) chunkObj.get("num_records"); - int byteOffset = (int)(long) chunkObj.get("byte_offset"); - int byteLength = (int)(long) chunkObj.get("byte_length"); + assertEquals(expectedStartOffset, firstOffset); + assertTrue(byteLength > 0); + assertTrue(byteOffset >= 0); - assertEquals(expectedStartOffset, firstOffset); - assertTrue(byteLength > 0); - assertTrue(byteOffset >= 0); + // Read just that segment of the file into byte array and attempt to parse GZIP + byte[] buffer = new byte[byteLength]; + file.seek(byteOffset); + int numBytesRead = file.read(buffer); - // Read just that segment of the file into byte array and attempt to parse GZIP - byte[] buffer = new byte[byteLength]; - file.seek(byteOffset); - int numBytesRead = file.read(buffer); + assertEquals(buffer.length, numBytesRead); - assertEquals(buffer.length, numBytesRead); + GZIPInputStream zip = new GZIPInputStream(new ByteArrayInputStream(buffer)); + BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); - GZIPInputStream zip = new GZIPInputStream(new ByteArrayInputStream(buffer)); - BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); + int numRecordsActuallyInChunk = 0; + String line; + while ((line = r.readLine()) != null) { + assertEquals(expectedRecords[recordIndex], line); + recordIndex++; + numRecordsActuallyInChunk++; + } - int numRecordsActuallyInChunk = 0; - String line; - while ((line = r.readLine()) != null) { - assertEquals(expectedRecords[recordIndex], line); - recordIndex++; - numRecordsActuallyInChunk++; - } + 
assertEquals(numRecordsActuallyInChunk, numRecords); - assertEquals(numRecordsActuallyInChunk, numRecords); + totalBytes += byteLength; - totalBytes += byteLength; + expectedStartOffset = firstOffset + numRecords; - expectedStartOffset = firstOffset + numRecords; + chunkIndex++; + } - chunkIndex++; + assertEquals("All chunks should cover all bytes in the file", totalBytes, file.length()); } - assertEquals("All chunks should cover all bytes in the file", totalBytes, file.length()); - } + // Hmm this test is actually not very conclusive - on OS X and most linux file systems + // it passes anyway due to nature of filesystems. Not sure how to write something more robust + // though to validate that we definitiely truncate the files even if we write less data - // Hmm this test is actually not very conclusive - on OS X and most linux file systems - // it passes anyway due to nature of filesystems. Not sure how to write something more robust - // though to validate that we definitiely truncate the files even if we write less data - public void testShouldOverwrite() throws Exception { - // Make writer and write to it a bit. - { - BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); + @Test + public void testShouldOverwrite() throws Exception { + // Make writer and write to it a bit. + { + BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); - // Write at least a few 4k blocks to disk so we can be sure that we don't - // only overwrite the first block. - String[] expectedLines = new String[5000]; - for (int i = 0; i < 5000; i++) { - String line = String.format("Record %d", i); - w.write(line); - expectedLines[i] = line; - } + // Write at least a few 4k blocks to disk so we can be sure that we don't + // only overwrite the first block. 
+ String[] expectedLines = new String[5000]; + for (int i = 0; i < 5000; i++) { + String line = String.format("Record %d", i); + w.write(line); + expectedLines[i] = line; + } - assertEquals(5000, w.getNumRecords()); + assertEquals(5000, w.getNumRecords()); - w.close(); + w.close(); - // Just check it actually write to disk - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); - verifyIndexFile(w, 0, expectedLines); + // Just check it actually write to disk + verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); + verifyIndexFile(w, 0, expectedLines); - } + } - { - // Now make a whole new writer for same chunk - BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); + { + // Now make a whole new writer for same chunk + BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); - // Only write a few lines - String[] expectedLines2 = new String[10]; - for (int i = 0; i < 10; i++) { - String line = String.format("Overwrite record %d", i); - w.write(line); - expectedLines2[i] = line; - } + // Only write a few lines + String[] expectedLines2 = new String[10]; + for (int i = 0; i < 10; i++) { + String line = String.format("Overwrite record %d", i); + w.write(line); + expectedLines2[i] = line; + } - assertEquals(10, w.getNumRecords()); + assertEquals(10, w.getNumRecords()); - w.close(); + w.close(); - // No check output is only the 10 lines we just wrote - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines2); - verifyIndexFile(w, 0, expectedLines2); + // No check output is only the 10 lines we just wrote + verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines2); + verifyIndexFile(w, 0, expectedLines2); + } } - } - public void testDelete() throws Exception { - // Make writer and write to it a bit. - BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); + @Test + public void testDelete() throws Exception { + // Make writer and write to it a bit. 
+ BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); - String[] expectedLines = new String[5000]; - for (int i = 0; i < 5000; i++) { - String line = String.format("Record %d", i); - w.write(line); - expectedLines[i] = line; - } + String[] expectedLines = new String[5000]; + for (int i = 0; i < 5000; i++) { + String line = String.format("Record %d", i); + w.write(line); + expectedLines[i] = line; + } - assertEquals(5000, w.getNumRecords()); + assertEquals(5000, w.getNumRecords()); - w.close(); + w.close(); - // Just check it actually write to disk - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); - verifyIndexFile(w, 0, expectedLines); + // Just check it actually write to disk + verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); + verifyIndexFile(w, 0, expectedLines); - // Now remove it - w.delete(); + // Now remove it + w.delete(); - File dataF = new File(w.getDataFilePath()); - File idxF = new File(w.getIndexFilePath()); + File dataF = new File(w.getDataFilePath()); + File idxF = new File(w.getIndexFilePath()); - assertFalse("Data file should not exist after delete", dataF.exists()); - assertFalse("Index file should not exist after delete", idxF.exists()); - } + assertFalse("Data file should not exist after delete", dataF.exists()); + assertFalse("Index file should not exist after delete", idxF.exists()); + } } From 4fa1a654bad38fcdf9d68c57666efe2e9da33ed5 Mon Sep 17 00:00:00 2001 From: R Vadai Date: Mon, 3 Apr 2017 12:01:13 +0100 Subject: [PATCH 3/6] Refactored tests --- .../BlockBZIP2FileWriterTest.java | 119 ++++------------ .../BlockFileWriterTestCommon.java | 94 ++++++++++++- .../BlockGZIPFileWriterTest.java | 130 +++++------------- 3 files changed, 156 insertions(+), 187 deletions(-) diff --git a/src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java b/src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java index e0c1853..263d94d 100644 --- a/src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java +++ b/src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java @@ -1,19 +1,12 @@ package com.deviantart.kafka_connect_s3; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; import org.junit.BeforeClass; import org.junit.Test; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.FileReader; -import java.io.InputStreamReader; -import java.io.RandomAccessFile; +import java.io.IOException; +import java.io.InputStream; import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertEquals; @@ -32,14 +25,33 @@ public static void oneTimeSetUp() { System.out.println("Temp dir for writer test is: " + tmpDir); } + @Override + protected BlockFileWriter newBlockFileWriter(String filenameBase, String path) throws IOException { + return new BlockBZIP2FileWriter(filenameBase, path); + } + + @Override + protected BlockFileWriter newBlockFileWriter(String filenameBase, String path, long firstRecordOffset) throws IOException { + return new BlockBZIP2FileWriter(filenameBase, path, firstRecordOffset); + } + + @Override + protected BlockFileWriter newBlockFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws IOException { + return new BlockBZIP2FileWriter(filenameBase, path, 
firstRecordOffset, chunkThreshold); + } + + protected InputStream newCompressorInputStream(InputStream in) throws IOException { + return new BZip2CompressorInputStream(in); + } + @Test public void testPaths() throws Exception { - BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("foo", tmpDir); + BlockFileWriter w = newBlockFileWriter("foo", tmpDir); assertEquals(tmpDir + "/foo-000000000000.bzip2", w.getDataFilePath()); assertEquals(tmpDir + "/foo-000000000000.index.json", w.getIndexFilePath()); - BlockBZIP2FileWriter w2 = new BlockBZIP2FileWriter("foo", tmpDir, 123456); + BlockFileWriter w2 = newBlockFileWriter("foo", tmpDir, 123456); assertEquals(tmpDir + "/foo-000000123456.bzip2", w2.getDataFilePath()); assertEquals(tmpDir + "/foo-000000123456.index.json", w2.getIndexFilePath()); } @@ -51,7 +63,7 @@ public void testWrite() throws Exception { + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; // Make a writer with artificially small chunk threshold of 1kb - BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("write-test", tmpDir, 987654321, 1000); + BlockFileWriter w = newBlockFileWriter("write-test", tmpDir, 987654321, 1000); int totalUncompressedBytes = 0; String[] expectedLines = new String[50]; @@ -75,82 +87,11 @@ public void testWrite() throws Exception { w.close(); - verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines); verifyIndexFile(w, 987654321, expectedLines); } - private void verifyOutputIsSaneBZIP2File(String filename, String[] expectedRecords) throws Exception { - BZip2CompressorInputStream zip = new BZip2CompressorInputStream(new FileInputStream(filename)); - BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); - - String line; - int i = 0; - while ((line = r.readLine()) != null) { - assertTrue(String.format("Output file has more lines than expected. 
Expected %d lines", expectedRecords.length) - , i < expectedRecords.length); - String expectedLine = expectedRecords[i]; - assertEquals(String.format("Output file doesn't match, first difference on line %d", i), expectedLine, line); - i++; - } - } - - private void verifyIndexFile(BlockBZIP2FileWriter w, int startOffset, String[] expectedRecords) throws Exception { - JSONParser parser = new JSONParser(); - - Object obj = parser.parse(new FileReader(w.getIndexFilePath())); - JSONObject index = (JSONObject) obj; - JSONArray chunks = (JSONArray) index.get("chunks"); - - assertEquals(w.getNumChunks(), chunks.size()); - - RandomAccessFile file = new RandomAccessFile(w.getDataFilePath(), "r"); - - // Check we can read all the chunks as individual bzip2 segments - int expectedStartOffset = startOffset; - int recordIndex = 0; - int totalBytes = 0; - int chunkIndex = 0; - for (Object chunk : chunks) { - JSONObject chunkObj = (JSONObject) chunk; - int firstOffset = (int) (long) chunkObj.get("first_record_offset"); - int numRecords = (int) (long) chunkObj.get("num_records"); - int byteOffset = (int) (long) chunkObj.get("byte_offset"); - int byteLength = (int) (long) chunkObj.get("byte_length"); - - assertEquals(expectedStartOffset, firstOffset); - assertTrue(byteLength > 0); - assertTrue(byteOffset >= 0); - - // Read just that segment of the file into byte array and attempt to parse BZIP2 - byte[] buffer = new byte[byteLength]; - file.seek(byteOffset); - int numBytesRead = file.read(buffer); - - assertEquals(buffer.length, numBytesRead); - - BZip2CompressorInputStream zip = new BZip2CompressorInputStream(new ByteArrayInputStream(buffer)); - BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); - - int numRecordsActuallyInChunk = 0; - String line; - while ((line = r.readLine()) != null) { - assertEquals(expectedRecords[recordIndex], line); - recordIndex++; - numRecordsActuallyInChunk++; - } - - assertEquals(numRecordsActuallyInChunk, numRecords); - - totalBytes += byteLength; - - expectedStartOffset = firstOffset + numRecords; - - chunkIndex++; - } - - assertEquals("All chunks should cover all bytes in the file", totalBytes, file.length()); - } // Hmm this test is actually not very conclusive - on OS X and most linux file systems // it passes anyway due to nature of filesystems. Not sure how to write something more robust @@ -176,14 +117,14 @@ public void testShouldOverwrite() throws Exception { w.close(); // Just check it actually write to disk - verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines); verifyIndexFile(w, 0, expectedLines); } { // Now make a whole new writer for same chunk - BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("overwrite-test", tmpDir); + BlockFileWriter w = newBlockFileWriter("overwrite-test", tmpDir); // Only write a few lines String[] expectedLines2 = new String[10]; @@ -198,7 +139,7 @@ public void testShouldOverwrite() throws Exception { w.close(); // No check output is only the 10 lines we just wrote - verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines2); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines2); verifyIndexFile(w, 0, expectedLines2); } } @@ -206,7 +147,7 @@ public void testShouldOverwrite() throws Exception { @Test public void testDelete() throws Exception { // Make writer and write to it a bit. 
- BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("overwrite-test", tmpDir); + BlockFileWriter w = newBlockFileWriter("overwrite-test", tmpDir); String[] expectedLines = new String[5000]; for (int i = 0; i < 5000; i++) { @@ -220,7 +161,7 @@ public void testDelete() throws Exception { w.close(); // Just check it actually write to disk - verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines); verifyIndexFile(w, 0, expectedLines); // Now remove it diff --git a/src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java b/src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java index cf04aee..ffb0950 100644 --- a/src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java +++ b/src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java @@ -1,12 +1,24 @@ package com.deviantart.kafka_connect_s3; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; import org.junit.Before; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.RandomAccessFile; -public abstract class BlockFileWriterTestCommon { +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; - protected Class compressedFileWriterClass; +public abstract class BlockFileWriterTestCommon { protected static String tmpDir; @@ -18,4 +30,82 @@ public void setUp() throws Exception { f.mkdir(); } } + + abstract protected BlockFileWriter newBlockFileWriter(String filenameBase, String path) throws IOException; + abstract protected BlockFileWriter newBlockFileWriter(String filenameBase, String path, long firstRecordOffset) throws IOException; + abstract protected BlockFileWriter newBlockFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws IOException; + abstract protected InputStream newCompressorInputStream(InputStream in) throws IOException; + + protected void verifyOutputIsSaneCompressedFile(String filename, String[] expectedRecords) throws Exception { + InputStream zip = newCompressorInputStream(new FileInputStream(filename)); + BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); + + String line; + int i = 0; + while ((line = r.readLine()) != null) { + assertTrue(String.format("Output file has more lines than expected. 
Expected %d lines", expectedRecords.length) + , i < expectedRecords.length); + + String expectedLine = expectedRecords[i]; + assertEquals(String.format("Output file doesn't match, first difference on line %d", i), expectedLine, line); + i++; + } + } + + protected void verifyIndexFile(BlockFileWriter w, int startOffset, String[] expectedRecords) throws Exception { + JSONParser parser = new JSONParser(); + + Object obj = parser.parse(new FileReader(w.getIndexFilePath())); + JSONObject index = (JSONObject) obj; + JSONArray chunks = (JSONArray) index.get("chunks"); + + assertEquals(w.getNumChunks(), chunks.size()); + + RandomAccessFile file = new RandomAccessFile(w.getDataFilePath(), "r"); + + // Check we can read all the chunks as individual bzip2 segments + int expectedStartOffset = startOffset; + int recordIndex = 0; + int totalBytes = 0; + int chunkIndex = 0; + for (Object chunk : chunks) { + JSONObject chunkObj = (JSONObject) chunk; + int firstOffset = (int) (long) chunkObj.get("first_record_offset"); + int numRecords = (int) (long) chunkObj.get("num_records"); + int byteOffset = (int) (long) chunkObj.get("byte_offset"); + int byteLength = (int) (long) chunkObj.get("byte_length"); + + assertEquals(expectedStartOffset, firstOffset); + assertTrue(byteLength > 0); + assertTrue(byteOffset >= 0); + + // Read just that segment of the file into byte array and attempt to parse BZIP2 + byte[] buffer = new byte[byteLength]; + file.seek(byteOffset); + int numBytesRead = file.read(buffer); + + assertEquals(buffer.length, numBytesRead); + + InputStream zip = newCompressorInputStream(new ByteArrayInputStream(buffer)); + BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); + + int numRecordsActuallyInChunk = 0; + String line; + while ((line = r.readLine()) != null) { + assertEquals(expectedRecords[recordIndex], line); + recordIndex++; + numRecordsActuallyInChunk++; + } + + assertEquals(numRecordsActuallyInChunk, numRecords); + + totalBytes += byteLength; + + expectedStartOffset = firstOffset + numRecords; + + chunkIndex++; + } + + assertEquals("All chunks should cover all bytes in the file", totalBytes, file.length()); + } } diff --git a/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java b/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java index ca54181..28820c8 100644 --- a/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java +++ b/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java @@ -1,21 +1,13 @@ package com.deviantart.kafka_connect_s3; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileReader; -import java.io.InputStreamReader; -import java.io.RandomAccessFile; -import java.util.zip.GZIPInputStream; - -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.GZIPInputStream; + import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -33,14 +25,33 @@ public static void oneTimeSetUp() { System.out.println("Temp dir for writer test is: " + tmpDir); } + @Override + protected BlockFileWriter newBlockFileWriter(String filenameBase, String path) throws IOException { + return new 
BlockGZIPFileWriter(filenameBase, path); + } + + @Override + protected BlockFileWriter newBlockFileWriter(String filenameBase, String path, long firstRecordOffset) throws IOException { + return new BlockGZIPFileWriter(filenameBase, path, firstRecordOffset); + } + + @Override + protected BlockFileWriter newBlockFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws IOException { + return new BlockGZIPFileWriter(filenameBase, path, firstRecordOffset, chunkThreshold); + } + + protected InputStream newCompressorInputStream(InputStream in) throws IOException { + return new GZIPInputStream(in); + } + @Test public void testPaths() throws Exception { - BlockGZIPFileWriter w = new BlockGZIPFileWriter("foo", tmpDir); + BlockFileWriter w = newBlockFileWriter("foo", tmpDir); assertEquals(tmpDir + "/foo-000000000000.gz", w.getDataFilePath()); assertEquals(tmpDir + "/foo-000000000000.index.json", w.getIndexFilePath()); - BlockGZIPFileWriter w2 = new BlockGZIPFileWriter("foo", tmpDir, 123456); + BlockFileWriter w2 = newBlockFileWriter("foo", tmpDir, 123456); assertEquals(tmpDir + "/foo-000000123456.gz", w2.getDataFilePath()); assertEquals(tmpDir + "/foo-000000123456.index.json", w2.getIndexFilePath()); } @@ -52,7 +63,7 @@ public void testWrite() throws Exception { + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; // Make a writer with artificially small chunk threshold of 1kb - BlockGZIPFileWriter w = new BlockGZIPFileWriter("write-test", tmpDir, 987654321, 1000); + BlockFileWriter w = newBlockFileWriter("write-test", tmpDir, 987654321, 1000); int totalUncompressedBytes = 0; String[] expectedLines = new String[50]; @@ -76,83 +87,10 @@ public void testWrite() throws Exception { w.close(); - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines); verifyIndexFile(w, 987654321, expectedLines); } - private void verifyOutputIsSaneGZIPFile(String filename, String[] expectedRecords) throws Exception { - GZIPInputStream zip = new GZIPInputStream(new FileInputStream(filename)); - BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); - - String line; - int i = 0; - while ((line = r.readLine()) != null) { - assertTrue(String.format("Output file has more lines than expected. 
Expected %d lines", expectedRecords.length) - , i < expectedRecords.length); - - String expectedLine = expectedRecords[i]; - assertEquals(String.format("Output file doesn't match, first difference on line %d", i), expectedLine, line); - i++; - } - } - - private void verifyIndexFile(BlockGZIPFileWriter w, int startOffset, String[] expectedRecords) throws Exception { - JSONParser parser = new JSONParser(); - - Object obj = parser.parse(new FileReader(w.getIndexFilePath())); - JSONObject index = (JSONObject) obj; - JSONArray chunks = (JSONArray) index.get("chunks"); - - assertEquals(w.getNumChunks(), chunks.size()); - - RandomAccessFile file = new RandomAccessFile(w.getDataFilePath(), "r"); - - // Check we can read all the chunks as individual gzip segments - int expectedStartOffset = startOffset; - int recordIndex = 0; - int totalBytes = 0; - int chunkIndex = 0; - for (Object chunk : chunks) { - JSONObject chunkObj = (JSONObject) chunk; - int firstOffset = (int) (long) chunkObj.get("first_record_offset"); - int numRecords = (int) (long) chunkObj.get("num_records"); - int byteOffset = (int) (long) chunkObj.get("byte_offset"); - int byteLength = (int) (long) chunkObj.get("byte_length"); - - assertEquals(expectedStartOffset, firstOffset); - assertTrue(byteLength > 0); - assertTrue(byteOffset >= 0); - - // Read just that segment of the file into byte array and attempt to parse GZIP - byte[] buffer = new byte[byteLength]; - file.seek(byteOffset); - int numBytesRead = file.read(buffer); - - assertEquals(buffer.length, numBytesRead); - - GZIPInputStream zip = new GZIPInputStream(new ByteArrayInputStream(buffer)); - BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); - - int numRecordsActuallyInChunk = 0; - String line; - while ((line = r.readLine()) != null) { - assertEquals(expectedRecords[recordIndex], line); - recordIndex++; - numRecordsActuallyInChunk++; - } - - assertEquals(numRecordsActuallyInChunk, numRecords); - - totalBytes += byteLength; - - expectedStartOffset = firstOffset + numRecords; - - chunkIndex++; - } - - assertEquals("All chunks should cover all bytes in the file", totalBytes, file.length()); - } - // Hmm this test is actually not very conclusive - on OS X and most linux file systems // it passes anyway due to nature of filesystems. Not sure how to write something more robust // though to validate that we definitiely truncate the files even if we write less data @@ -161,7 +99,7 @@ private void verifyIndexFile(BlockGZIPFileWriter w, int startOffset, String[] ex public void testShouldOverwrite() throws Exception { // Make writer and write to it a bit. { - BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); + BlockFileWriter w = newBlockFileWriter("overwrite-test", tmpDir); // Write at least a few 4k blocks to disk so we can be sure that we don't // only overwrite the first block. 
@@ -177,14 +115,14 @@ public void testShouldOverwrite() throws Exception { w.close(); // Just check it actually write to disk - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines); verifyIndexFile(w, 0, expectedLines); } { // Now make a whole new writer for same chunk - BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); + BlockFileWriter w = newBlockFileWriter("overwrite-test", tmpDir); // Only write a few lines String[] expectedLines2 = new String[10]; @@ -199,7 +137,7 @@ public void testShouldOverwrite() throws Exception { w.close(); // No check output is only the 10 lines we just wrote - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines2); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines2); verifyIndexFile(w, 0, expectedLines2); } } @@ -207,7 +145,7 @@ public void testShouldOverwrite() throws Exception { @Test public void testDelete() throws Exception { // Make writer and write to it a bit. - BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); + BlockFileWriter w = newBlockFileWriter("overwrite-test", tmpDir); String[] expectedLines = new String[5000]; for (int i = 0; i < 5000; i++) { @@ -221,7 +159,7 @@ public void testDelete() throws Exception { w.close(); // Just check it actually write to disk - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines); verifyIndexFile(w, 0, expectedLines); // Now remove it From fbc4803d45983ad3191814cf264db84520f1c856 Mon Sep 17 00:00:00 2001 From: R Vadai Date: Mon, 3 Apr 2017 12:12:05 +0100 Subject: [PATCH 4/6] Added compression type config --- .../kafka_connect_s3/S3SinkConnector.java | 14 +++++++---- .../kafka_connect_s3/S3SinkTask.java | 24 ++++++++++++------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/deviantart/kafka_connect_s3/S3SinkConnector.java b/src/main/java/com/deviantart/kafka_connect_s3/S3SinkConnector.java index 649c8b3..385e18f 100644 --- a/src/main/java/com/deviantart/kafka_connect_s3/S3SinkConnector.java +++ b/src/main/java/com/deviantart/kafka_connect_s3/S3SinkConnector.java @@ -36,13 +36,16 @@ public class S3SinkConnector extends SinkConnector { public static final String BUFFER_DIRECTORY_PATH_CONFIG = "local.buffer.dir"; + public static final String COMPRESSION_TYPE = "compression.type"; + public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(MAX_BLOCK_SIZE_CONFIG, Type.LONG, DEFAULT_MAX_BLOCK_SIZE, Range.atLeast(0), Importance.LOW, "Maximum size of data chunks in bytes (before compression)") + .define(MAX_BLOCK_SIZE_CONFIG, Type.LONG, DEFAULT_MAX_BLOCK_SIZE, Range.atLeast(0), Importance.LOW, "Maximum size of data chunks in bytes (before compression).") .define(S3_BUCKET_CONFIG, Type.STRING, Importance.HIGH, "Name of the S3 bucket") - .define(S3_PREFIX_CONFIG, Type.STRING, "", Importance.HIGH, "Path prefix of files to be written to S3") - .define(OVERRIDE_S3_ENDPOINT_CONFIG, Type.STRING, "", Importance.LOW, "Override the S3 URL endpoint") - .define(S3_PATHSTYLE_CONFIG, Type.BOOLEAN, false, Importance.LOW, "Override the standard S3 URL style by placing the bucket name in the path instead of hostname") - .define(BUFFER_DIRECTORY_PATH_CONFIG, Type.STRING, Importance.HIGH, "Path to directory to store data temporarily before uploading to S3") + .define(S3_PREFIX_CONFIG, Type.STRING, "", Importance.HIGH, "Path prefix of files to be written to 
S3.") + .define(OVERRIDE_S3_ENDPOINT_CONFIG, Type.STRING, "", Importance.LOW, "Override the S3 URL endpoint.") + .define(S3_PATHSTYLE_CONFIG, Type.BOOLEAN, false, Importance.LOW, "Override the standard S3 URL style by placing the bucket name in the path instead of hostname.") + .define(BUFFER_DIRECTORY_PATH_CONFIG, Type.STRING, Importance.HIGH, "Path to directory to store data temporarily before uploading to S3.") + .define(COMPRESSION_TYPE, Type.STRING, "gzip", Importance.HIGH, "The compression type to use, gzip and bzip2 supported. Defaults to gzip.") ; private Map configProperties; @@ -73,6 +76,7 @@ public List> taskConfigs(int maxTasks) { props.put(OVERRIDE_S3_ENDPOINT_CONFIG, configProperties.get(OVERRIDE_S3_ENDPOINT_CONFIG).toString()); props.put(S3_PATHSTYLE_CONFIG, configProperties.get(S3_PATHSTYLE_CONFIG).toString()); props.put(BUFFER_DIRECTORY_PATH_CONFIG, configProperties.get(BUFFER_DIRECTORY_PATH_CONFIG).toString()); + props.put(COMPRESSION_TYPE, configProperties.get(COMPRESSION_TYPE).toString()); configs.add(props); } return configs; diff --git a/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java b/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java index e3010c1..fca67a2 100644 --- a/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java +++ b/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java @@ -39,7 +39,9 @@ public class S3SinkTask extends SinkTask { private String bufferDirectoryPath; - private Map tmpFiles; + private String compressionType; + + private Map tmpFiles; private Long maxBlockSize; @@ -63,6 +65,7 @@ private void readConfig(Map props) { overrideS3Endpoint = (String)config.get(S3SinkConnector.OVERRIDE_S3_ENDPOINT_CONFIG); s3PathStyle = (Boolean)config.get(S3SinkConnector.S3_PATHSTYLE_CONFIG); bufferDirectoryPath = (String)config.get(S3SinkConnector.BUFFER_DIRECTORY_PATH_CONFIG); + compressionType = (String)config.get(S3SinkConnector.COMPRESSION_TYPE); } @Override @@ -102,7 +105,7 @@ public void put(Collection records) throws ConnectException { String topic = record.topic(); int partition = record.kafkaPartition(); TopicPartition tp = new TopicPartition(topic, partition); - BlockGZIPFileWriter buffer = tmpFiles.get(tp); + BlockFileWriter buffer = tmpFiles.get(tp); if (buffer == null) { log.error("Trying to put {} records to partition {} which doesn't exist yet", records.size(), tp); throw new ConnectException("Trying to put records for a topic partition that has not be assigned"); @@ -121,9 +124,9 @@ public void flush(Map offsets) throws Connect // https://twitter.com/mr_paul_banks/status/702493772983177218 // Instead iterate over the writers we do have and get the offsets directly from them. 
- for (Map.Entry<TopicPartition, BlockGZIPFileWriter> entry : tmpFiles.entrySet()) { + for (Map.Entry<TopicPartition, BlockFileWriter> entry : tmpFiles.entrySet()) { TopicPartition tp = entry.getKey(); - BlockGZIPFileWriter writer = entry.getValue(); + BlockFileWriter writer = entry.getValue(); if (writer.getNumRecords() == 0) { // Not done anything yet log.info("No new records for partition {}", tp); @@ -145,9 +148,14 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) throws Connect } } - private BlockGZIPFileWriter createNextBlockWriter(TopicPartition tp, long nextOffset) throws ConnectException, IOException { + private BlockFileWriter createNextBlockWriter(TopicPartition tp, long nextOffset) throws ConnectException, IOException { String name = String.format("%s-%05d", tp.topic(), tp.partition()); - return new BlockGZIPFileWriter(name, bufferDirectoryPath, nextOffset, maxBlockSize); + + if (compressionType == "bzip2") { + return new BlockBZIP2FileWriter(name, bufferDirectoryPath, nextOffset, maxBlockSize); + } else { + return new BlockGZIPFileWriter(name, bufferDirectoryPath, nextOffset, maxBlockSize); + } } @Override @@ -159,7 +167,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) throws C public void onPartitionsRevoked(Collection<TopicPartition> partitions) throws ConnectException { for (TopicPartition tp : partitions) { // See if this is a new assignment - BlockGZIPFileWriter writer = this.tmpFiles.remove(tp); + BlockFileWriter writer = this.tmpFiles.remove(tp); if (writer != null) { log.info("Revoked partition {} deleting buffer", tp); try { @@ -194,7 +202,7 @@ private void recoverPartition(TopicPartition tp) throws IOException { log.info("Recovering partition {} from offset {}", tp, offset); - BlockGZIPFileWriter w = createNextBlockWriter(tp, offset); + BlockFileWriter w = createNextBlockWriter(tp, offset); tmpFiles.put(tp, w); this.context.offset(tp, offset); From 74f84171d2e19b3fe0a9c668f99c0bf4033f02b5 Mon Sep 17 00:00:00 2001 From: R Vadai Date: Mon, 3 Apr 2017 17:12:56 +0100 Subject: [PATCH 5/6] Fixed handling bzip2 writer --- src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java b/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java index fca67a2..74b566d 100644 --- a/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java +++ b/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java @@ -151,7 +151,7 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) throws Connect private BlockFileWriter createNextBlockWriter(TopicPartition tp, long nextOffset) throws ConnectException, IOException { String name = String.format("%s-%05d", tp.topic(), tp.partition()); - if (compressionType == "bzip2") { + if (compressionType.equals("bzip2")) { return new BlockBZIP2FileWriter(name, bufferDirectoryPath, nextOffset, maxBlockSize); } else { return new BlockGZIPFileWriter(name, bufferDirectoryPath, nextOffset, maxBlockSize); From 76420730cfed5eb8dbac00252794a2d1ed468208 Mon Sep 17 00:00:00 2001 From: R Vadai Date: Mon, 3 Apr 2017 17:15:58 +0100 Subject: [PATCH 6/6] Updated README and version --- README.md | 7 +++---- pom.xml | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 516d939..de0a346 100644 --- a/README.md +++ b/README.md @@ -86,11 +86,9 @@ $ cat system-test-00000-000000000000.index.json | jq -M '.' 
- Depending on your needs you can either limit to just the single block, or if you want to consume all records after that offset, you can consume from the offset right to the end of the file - The range request bytes can be decompressed as a GZIP file on their own with any GZIP compatible tool, provided you limit to whole block boundaries. -## Other Formats +## BZip2 format -For now we only support Block-GZIP output. This assumes that all your kafka messages can be output as newline-delimited text files. - -We could make the output format pluggable if others have use for this connector, but need binary serialisation formats like Avro/Thrift/Protobuf etc. Pull requests welcome. +Works exactly the same way as the Block-GZIP output format, see above. ## Build and Run @@ -113,6 +111,7 @@ In addition to the [standard kafka-connect config options](http://kafka.apache.o | s3.endpoint | AWS defaults per region | Mostly useful for testing. | | s3.path_style | `false` | Force path-style access to bucket rather than subdomain. Mostly useful for tests. | | compressed_block_size | 67108864 | How much _uncompressed_ data to write to the file before we rol to a new block/chunk. See [Block-GZIP](#user-content-block-gzip-output-format) section above. | +| compression.type | `gzip` | The compression algorithm, either `gzip` or `bzip2`. | Note that we use the default AWS SDK credentials provider. [Refer to their docs](http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html#id1) for the options for configuring S3 credentials. diff --git a/pom.xml b/pom.xml index ec5715e..49508fa 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.deviantart kafka-connect-s3 - 0.0.3 + 0.0.4 jar kafka-connect-s3
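For context on how the refactor above is meant to be consumed: the BZip2 side of the test suite plugs into the shared BlockFileWriterTestCommon base in the same way as BlockGZIPFileWriterTest, by supplying the writer constructors and a matching decompressor stream while inheriting every assertion. The sketch below is illustrative only and is not taken verbatim from the series; it assumes Apache Commons Compress is on the test classpath, that BlockBZIP2FileWriter mirrors BlockGZIPFileWriter's two-, three- and four-argument constructors, and that the temp-directory name is hypothetical.

package com.deviantart.kafka_connect_s3;

import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.junit.BeforeClass;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;

// Illustrative sketch: a BZip2 counterpart to BlockGZIPFileWriterTest that reuses
// the shared assertions in BlockFileWriterTestCommon. Constructor shapes and the
// temp-directory name are assumptions, not taken from the patch series.
public class BlockBZIP2FileWriterTest extends BlockFileWriterTestCommon {

    @BeforeClass
    public static void oneTimeSetUp() {
        // Hypothetical location; kept separate from the GZIP test's directory.
        tmpDir = new File(System.getProperty("java.io.tmpdir"), "bzip2-file-writer-test").getPath();
        System.out.println("Temp dir for writer test is: " + tmpDir);
    }

    @Override
    protected BlockFileWriter newBlockFileWriter(String filenameBase, String path) throws IOException {
        return new BlockBZIP2FileWriter(filenameBase, path);
    }

    @Override
    protected BlockFileWriter newBlockFileWriter(String filenameBase, String path, long firstRecordOffset) throws IOException {
        return new BlockBZIP2FileWriter(filenameBase, path, firstRecordOffset);
    }

    @Override
    protected BlockFileWriter newBlockFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws IOException {
        return new BlockBZIP2FileWriter(filenameBase, path, firstRecordOffset, chunkThreshold);
    }

    @Override
    protected InputStream newCompressorInputStream(InputStream in) throws IOException {
        // Each chunk, and the data file as a whole, should decompress as plain BZip2.
        return new BZip2CompressorInputStream(in);
    }
}

Keeping the format-specific pieces behind the newBlockFileWriter and newCompressorInputStream hooks means a further compression codec only needs a writer subclass plus a small test like this one; the chunk-index and overwrite assertions stay in the common base.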