@@ -41,6 +41,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
@@ -107,7 +108,7 @@ public final void prepare(Map conf, TopologyContext topologyContext, OutputColle
throw new IllegalStateException("File system URL must be specified.");
}

writers = new WritersMap(this.maxOpenFiles);
writers = new LinkedHashMap<>(this.maxOpenFiles, 0.75f, true);

this.collector = collector;
this.fileNameFormat.prepare(conf, topologyContext);
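
The third constructor argument here (accessOrder = true) is what turns the writer map into an LRU cache: each successful get() moves the accessed entry to the tail of the iteration order, so the first key an iterator returns is always the writer that has gone longest without being used. A minimal, self-contained sketch of that behaviour (the class and keys below are illustrative, not part of the bolt code):

import java.util.LinkedHashMap;
import java.util.Map;

public class AccessOrderSketch {
    public static void main(String[] args) {
        // accessOrder = true: iteration runs from least recently used to most recently used
        Map<String, String> writers = new LinkedHashMap<>(16, 0.75f, true);
        writers.put("a", "writer-a");
        writers.put("b", "writer-b");
        writers.put("c", "writer-c");

        writers.get("a"); // touching "a" moves it to the tail

        // Prints "b": the entry that has gone longest without being accessed
        System.out.println(writers.keySet().iterator().next());
    }
}
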
@@ -202,18 +203,6 @@ public final void execute(Tuple tuple) {
}
}

private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
AbstractHDFSWriter writer;

writer = writers.get(writerKey);
if (writer == null) {
Path pathForNextFile = getBasePathForNextFile(tuple);
writer = makeNewWriter(pathForNextFile, tuple);
writers.put(writerKey, writer);
}
return writer;
}

/**
* A tuple must be mapped to a writer based on two factors:
* - bolt specific logic that must separate tuples into different files in the same directory (see the avro bolt
@@ -308,17 +297,31 @@ protected Path getBasePathForNextFile(Tuple tuple) {

abstract protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException;

static class WritersMap extends LinkedHashMap<String, AbstractHDFSWriter> {
final long maxWriters;

public WritersMap(long maxWriters) {
super((int)maxWriters, 0.75f, true);
this.maxWriters = maxWriters;
}

@Override
protected boolean removeEldestEntry(Map.Entry<String, AbstractHDFSWriter> eldest) {
return this.size() > this.maxWriters;
}
}

private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
AbstractHDFSWriter writer = writers.get(writerKey);
if (writer == null) {
LOG.debug("Creating Writer for writerKey : " + writerKey);
Path pathForNextFile = getBasePathForNextFile(tuple);
writer = makeNewWriter(pathForNextFile, tuple);

if (writers.size() > (this.maxOpenFiles - 1)) {
LOG.info("cached writers size {} exceeded maxOpenFiles {} ", writers.size(), this.maxOpenFiles);
retireEldestWriter();
}
this.writers.put(writerKey, writer);
}
return writer;
}

/**
* Locate writer that has not been used for longest time and retire it
*/
private void retireEldestWriter() throws IOException {
LOG.info("Attempting to close eldest writer");
Iterator<String> iterator = this.writers.keySet().iterator();
if (iterator.hasNext()) {
String eldestKey = iterator.next();
doRotationAndRemoveWriter(eldestKey, this.writers.get(eldestKey));
}
}
}
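
The behavioural difference between the two versions above: the removed WritersMap override simply returned true from removeEldestEntry, so the evicted writer was dropped from the cache without being rotated or closed at that point, whereas the new getOrCreateWriter retires the least recently used writer explicitly through doRotationAndRemoveWriter before adding a new one. A standalone sketch of that distinction, using a stand-in Closeable value rather than the real AbstractHDFSWriter:

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class EvictionSketch {
    // Stand-in for AbstractHDFSWriter: a value that must be closed explicitly
    static class FakeWriter implements Closeable {
        boolean closed;
        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) throws IOException {
        // Silent eviction: removeEldestEntry drops the eldest mapping,
        // but nothing here ever calls close() on the evicted value.
        Map<String, FakeWriter> silent = new LinkedHashMap<String, FakeWriter>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, FakeWriter> eldest) {
                return size() > 2;
            }
        };
        silent.put("a", new FakeWriter());
        silent.put("b", new FakeWriter());
        silent.put("c", new FakeWriter()); // "a" is evicted here, still unclosed

        // Explicit retirement: close the least recently used value before inserting a new one.
        Map<String, FakeWriter> explicit = new LinkedHashMap<>(16, 0.75f, true);
        explicit.put("a", new FakeWriter());
        explicit.put("b", new FakeWriter());
        if (explicit.size() > 1) { // plays the role of maxOpenFiles - 1
            Iterator<String> it = explicit.keySet().iterator();
            FakeWriter eldest = explicit.remove(it.next());
            eldest.close(); // the bolt does the equivalent via doRotationAndRemoveWriter
        }
        explicit.put("c", new FakeWriter());
    }
}
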
@@ -214,9 +214,35 @@ public void testTickTuples() throws IOException
Assert.assertEquals(1, countNonZeroLengthFiles(testRoot));
}

public void createBaseDirectory(FileSystem passedFs, String path) throws IOException {
Path p = new Path(path);
passedFs.mkdirs(p);
@Test
public void testWriterRetirement() throws IOException {
// We do not want the count sync policy to fire, so set it to one greater than the number of tuples we will write (10)
HdfsBolt bolt = makeHdfsBolt(hdfsURI, 11, 1000f);

// Use the id field as the partition path to ensure that each tuple gets written to its own directory
Partitioner partitioner = new Partitioner() {
@Override
public String getPartitionPath(Tuple tuple) {
return Path.SEPARATOR + tuple.getIntegerByField("id");
}
};

bolt.withPartitioner(partitioner);
bolt.withMaxOpenFiles(9);
bolt.prepare(new Config(), topologyContext, collector);

// Write 10 tuples. The 10th tuple should force the least recently used writer (0) to be retired since we are only allowing 9 open writers.
for (int i = 0; i < 10; i++) {
bolt.execute(generateTestTuple(i, null, null, null));
}

// Verify that writers for 1-9 have not been closed yet
for (int i = 1; i < 10; i++) {
Assert.assertEquals(1, countZeroLengthFiles(testRoot + Path.SEPARATOR + i));
}

// Verify that the writer for 0 has been closed
Assert.assertEquals(1, countNonZeroLengthFiles(testRoot + Path.SEPARATOR + "0"));
}
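
The assertions above use file length as a proxy for writer state: a file that is still held open by a writer typically reports a length of zero in HDFS until it is synced or closed, while a retired writer's file has been rotated and reports its real size. The helpers countZeroLengthFiles and countNonZeroLengthFiles are defined further down in this class; a rough sketch of what such a check does, assuming a FileSystem handle fs (the name and signature here are illustrative, not the test's actual helper):

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class FileLengthChecks {
    // Count files under the given directory that still report a length of zero.
    static int countZeroLengthFiles(FileSystem fs, String path) throws IOException {
        int zeroLength = 0;
        for (FileStatus status : fs.listStatus(new Path(path))) {
            if (status.isFile() && status.getLen() == 0) {
                zeroLength++;
            }
        }
        return zeroLength;
    }
}
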

private HdfsBolt makeHdfsBolt(String nameNodeAddr, int countSync, float rotationSizeMB) {
@@ -282,3 +308,4 @@ private int countZeroLengthFiles(String path) throws IOException {
return zeroLength;
}
}

This file was deleted.