Skip to content

Commit

Permalink
Merge pull request #2 from robvadai/bzip2
Browse files Browse the repository at this point in the history
Added BZip2 support
  • Loading branch information
robvadai authored Apr 3, 2017
2 parents 07ddad0 + 7642073 commit a93f7f5
Show file tree
Hide file tree
Showing 12 changed files with 768 additions and 429 deletions.
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,9 @@ $ cat system-test-00000-000000000000.index.json | jq -M '.'
- Depending on your needs you can either limit to just the single block, or if you want to consume all records after that offset, you can consume from the offset right to the end of the file
- The range request bytes can be decompressed as a GZIP file on their own with any GZIP compatible tool, provided you limit to whole block boundaries.

## Other Formats
## BZip2 format

For now we only support Block-GZIP output. This assumes that all your kafka messages can be output as newline-delimited text files.

We could make the output format pluggable if others have use for this connector, but need binary serialisation formats like Avro/Thrift/Protobuf etc. Pull requests welcome.
Works exactly the same way as the Block-GZIP output format, see above.

## Build and Run

Expand All @@ -113,6 +111,7 @@ In addition to the [standard kafka-connect config options](http://kafka.apache.o
| s3.endpoint | AWS defaults per region | Mostly useful for testing. |
| s3.path_style | `false` | Force path-style access to bucket rather than subdomain. Mostly useful for tests. |
| compressed_block_size | 67108864 | How much _uncompressed_ data to write to the file before we rol to a new block/chunk. See [Block-GZIP](#user-content-block-gzip-output-format) section above. |
| compression.type | `gzip` | The compression algorithm, either `gzip` or `bzip2`. |

Note that we use the default AWS SDK credentials provider. [Refer to their docs](http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html#id1) for the options for configuring S3 credentials.

Expand Down
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.deviantart</groupId>
<artifactId>kafka-connect-s3</artifactId>
<version>0.0.3</version>
<version>0.0.4</version>
<packaging>jar</packaging>

<name>kafka-connect-s3</name>
Expand Down Expand Up @@ -83,6 +83,11 @@
<artifactId>aws-java-sdk-s3</artifactId>
<version>${s3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.13</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.deviantart.kafka_connect_s3;

import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;

import java.util.zip.GZIPOutputStream;

import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;

/**
* BlockBZIP2FileWriter accumulates newline delimited UTF-8 records and writes them to an
* output file that is readable by BZIP2.
*
* In fact this file is the concatenation of possibly many separate BZIP2 files corresponding to smaller chunks
* of the input. Alongside the output filename.gz file, a file filename-index.json is written containing JSON
* metadata about the size and location of each block.
*
* This allows a reading class to skip to particular line/record without decompressing whole file by looking up
* the offset of the containing block, seeking to it and beginning BZIP2 read from there.
*
* This is especially useful when the file is an archive in HTTP storage like Amazon S3 where GET request with
* range headers can allow pulling a small segment from overall compressed file.
*
* Note that thanks to BZIP2 spec, the overall file is perfectly valid and will compress as if it was a single stream
* with any regular BZIP2 decoding library or program.
*/
public class BlockBZIP2FileWriter extends BlockFileWriter {

private BZip2CompressorOutputStream bzip2Stream;

public BlockBZIP2FileWriter(String filenameBase, String path) throws FileNotFoundException, IOException {
this(filenameBase, path, 0, 67108864);
}

public BlockBZIP2FileWriter(String filenameBase, String path, long firstRecordOffset) throws FileNotFoundException, IOException {
this(filenameBase, path, firstRecordOffset, 67108864);
}

public BlockBZIP2FileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws FileNotFoundException, IOException {
super(filenameBase, path, firstRecordOffset, chunkThreshold);
}

@Override
protected void initChunkWriter() throws IOException, UnsupportedEncodingException {
bzip2Stream = new BZip2CompressorOutputStream(fileStream);
writer = new BufferedWriter(new OutputStreamWriter(bzip2Stream, "UTF-8"));
}

@Override
protected void finishChunk() throws IOException {
Chunk ch = currentChunk();

// Complete GZIP block without closing stream
writer.flush();
bzip2Stream.finish();

// We can no find out how long this chunk was compressed
long bytesWritten = fileStream.getNumBytesWritten();
ch.compressedByteLength = bytesWritten - ch.byteOffset;
}

@Override
public String getDataFileName() {
return String.format("%s-%012d.bzip2", filenameBase, super.getFirstRecordOffset());
}
}
180 changes: 180 additions & 0 deletions src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package com.deviantart.kafka_connect_s3;

import java.io.BufferedWriter;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;

import java.util.ArrayList;
import java.util.zip.GZIPOutputStream;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

abstract class BlockFileWriter {
protected String filenameBase;
private String path;
protected BufferedWriter writer;
protected CountingOutputStream fileStream;

protected ArrayList<Chunk> chunks;

// Default each chunk is 64MB of uncompressed data
private long chunkThreshold;

// Offset to the first record.
// Set to non-zero if this file is part of a larger stream and you want
// record offsets in the index to reflect the global offset rather than local
private long firstRecordOffset;

BlockFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws IOException {
this.filenameBase = filenameBase;
this.path = path;
this.firstRecordOffset = firstRecordOffset;
this.chunkThreshold = chunkThreshold;

chunks = new ArrayList<Chunk>();

// Initialize first chunk
Chunk ch = new Chunk();
ch.firstOffset = firstRecordOffset;
chunks.add(ch);

// Explicitly truncate the file. On linux and OS X this appears to happen
// anyway when opening with FileOutputStream but that behavior is not actually documented
// or specified anywhere so let's be rigorous about it.
FileOutputStream fos = new FileOutputStream(new File(getDataFilePath()));
fos.getChannel().truncate(0);

// Open file for writing and setup
this.fileStream = new CountingOutputStream(fos);
initChunkWriter();
}

abstract protected void initChunkWriter() throws IOException;
abstract protected void finishChunk() throws IOException;
abstract protected String getDataFileName();

protected Chunk currentChunk() {
return chunks.get(chunks.size() - 1);
}

public long getFirstRecordOffset() {
return firstRecordOffset;
}

public String getIndexFileName() {
return String.format("%s-%012d.index.json", filenameBase, firstRecordOffset);
}

public String getDataFilePath() {
return String.format("%s/%s", path, this.getDataFileName());
}

public String getIndexFilePath() {
return String.format("%s/%s", path, this.getIndexFileName());
}

/**
* Writes string to file, assuming this is a single record
*
* If there is no newline at then end we will add one
*/
public void write(String record) throws IOException {
Chunk ch = currentChunk();

boolean hasNewLine = record.endsWith("\n");

int rawBytesToWrite = record.length();
if (!hasNewLine) {
rawBytesToWrite += 1;
}

if ((ch.rawBytes + rawBytesToWrite) > chunkThreshold) {
finishChunk();
initChunkWriter();

Chunk newCh = new Chunk();
newCh.firstOffset = ch.firstOffset + ch.numRecords;
newCh.byteOffset = ch.byteOffset + ch.compressedByteLength;
chunks.add(newCh);
ch = newCh;
}

writer.append(record);
if (!hasNewLine) {
writer.newLine();
}
ch.rawBytes += rawBytesToWrite;
ch.numRecords++;
}

public void delete() throws IOException {
deleteIfExists(getDataFilePath());
deleteIfExists(getIndexFilePath());
}

private void deleteIfExists(String path) throws IOException {
File f = new File(path);
if (f.exists() && !f.isDirectory()) {
f.delete();
}
}

public void close() throws IOException {
// Flush last chunk, updating index
finishChunk();
// Now close the writer (and the whole stream stack)
writer.close();
writeIndex();
}

private void writeIndex() throws IOException {
JSONArray chunkArr = new JSONArray();

for (Chunk ch : chunks) {
JSONObject chunkObj = new JSONObject();
chunkObj.put("first_record_offset", ch.firstOffset);
chunkObj.put("num_records", ch.numRecords);
chunkObj.put("byte_offset", ch.byteOffset);
chunkObj.put("byte_length", ch.compressedByteLength);
chunkObj.put("byte_length_uncompressed", ch.rawBytes);
chunkArr.add(chunkObj);
}

JSONObject index = new JSONObject();
index.put("chunks", chunkArr);

try (FileWriter file = new FileWriter(getIndexFilePath())) {
file.write(index.toJSONString());
file.close();
}
}

public int getTotalUncompressedSize() {
int totalBytes = 0;
for (Chunk ch : chunks) {
totalBytes += ch.rawBytes;
}
return totalBytes;
}

public int getNumChunks() {
return chunks.size();
}

public int getNumRecords() {
int totalRecords = 0;
for (Chunk ch : chunks) {
totalRecords += ch.numRecords;
}
return totalRecords;
}
}
Loading

0 comments on commit a93f7f5

Please sign in to comment.