diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index 395cced5229..c6b5d141126 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -41,6 +41,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
@@ -107,7 +108,7 @@ public final void prepare(Map conf, TopologyContext topologyContext, OutputColle
             throw new IllegalStateException("File system URL must be specified.");
         }
 
-        writers = new WritersMap(this.maxOpenFiles);
+        writers = new LinkedHashMap<>(this.maxOpenFiles, 0.75f, true);
 
         this.collector = collector;
         this.fileNameFormat.prepare(conf, topologyContext);
@@ -202,18 +203,6 @@ public final void execute(Tuple tuple) {
         }
     }
 
-    private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
-        AbstractHDFSWriter writer;
-
-        writer = writers.get(writerKey);
-        if (writer == null) {
-            Path pathForNextFile = getBasePathForNextFile(tuple);
-            writer = makeNewWriter(pathForNextFile, tuple);
-            writers.put(writerKey, writer);
-        }
-        return writer;
-    }
-
     /**
      * A tuple must be mapped to a writer based on two factors:
      *  - bolt specific logic that must separate tuples into different files in the same directory (see the avro bolt
@@ -308,17 +297,31 @@ protected Path getBasePathForNextFile(Tuple tuple) {
 
     abstract protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException;
 
-    static class WritersMap extends LinkedHashMap<String, AbstractHDFSWriter> {
-        final long maxWriters;
+    private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException {
+        AbstractHDFSWriter writer = writers.get(writerKey);
+        if (writer == null) {
+            LOG.debug("Creating writer for writerKey: {}", writerKey);
+            Path pathForNextFile = getBasePathForNextFile(tuple);
+            writer = makeNewWriter(pathForNextFile, tuple);
 
-        public WritersMap(long maxWriters) {
-            super((int)maxWriters, 0.75f, true);
-            this.maxWriters = maxWriters;
+            if (writers.size() > (this.maxOpenFiles - 1)) {
+                LOG.info("Cached writers size {} exceeded maxOpenFiles {}", writers.size(), this.maxOpenFiles);
+                retireEldestWriter();
+            }
+            this.writers.put(writerKey, writer);
         }
+        return writer;
+    }
 
-        @Override
-        protected boolean removeEldestEntry(Map.Entry<String, AbstractHDFSWriter> eldest) {
-            return this.size() > this.maxWriters;
+    /**
+     * Locate the writer that has not been used for the longest time and retire it.
+     */
+    private void retireEldestWriter() throws IOException {
+        LOG.info("Attempting to close eldest writer");
+        Iterator<String> iterator = this.writers.keySet().iterator();
+        if (iterator.hasNext()) {
+            String eldestKey = iterator.next();
+            doRotationAndRemoveWriter(eldestKey, this.writers.get(eldestKey));
         }
     }
 }
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
index e8f07023fc1..2ff4e16b618 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
@@ -214,9 +214,35 @@ public void testTickTuples() throws IOException
         Assert.assertEquals(1, countNonZeroLengthFiles(testRoot));
     }
 
-    public void createBaseDirectory(FileSystem passedFs, String path) throws IOException {
-        Path p = new Path(path);
-        passedFs.mkdirs(p);
+    @Test
+    public void testWriterRetirement() throws IOException {
+        // We do not want the count sync policy to fire, so set it to one greater than the number of tuples we will write (10)
+        HdfsBolt bolt = makeHdfsBolt(hdfsURI, 11, 1000f);
+
+        // Use the id field as the partition path to ensure that each tuple gets written to its own directory
+        Partitioner partitioner = new Partitioner() {
+            @Override
+            public String getPartitionPath(Tuple tuple) {
+                return Path.SEPARATOR + tuple.getIntegerByField("id");
+            }
+        };
+
+        bolt.prepare(new Config(), topologyContext, collector);
+        bolt.withPartitioner(partitioner);
+        bolt.withMaxOpenFiles(9);
+
+        // Write 10 tuples. The 10th tuple should force the least recently used writer (0) to be retired, since we only allow 9 open writers.
+        for (int i = 0; i < 10; i++) {
+            bolt.execute(generateTestTuple(i, null, null, null));
+        }
+
+        // Verify that the writers for 1-9 have not been closed yet
+        for (int i = 1; i < 10; i++) {
+            Assert.assertEquals(1, countZeroLengthFiles(testRoot + Path.SEPARATOR + i));
+        }
+
+        // Verify that the writer for 0 has been closed
+        Assert.assertEquals(1, countNonZeroLengthFiles(testRoot + Path.SEPARATOR + "0"));
     }
 
     private HdfsBolt makeHdfsBolt(String nameNodeAddr, int countSync, float rotationSizeMB) {
@@ -282,3 +308,4 @@ private int countZeroLengthFiles(String path) throws IOException {
         return zeroLength;
     }
 }
+
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
deleted file mode 100644
index fd99efe8301..00000000000
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.hdfs.bolt;
-
-import org.apache.storm.hdfs.common.AbstractHDFSWriter;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mock;
-
-public class TestWritersMap {
-
-    AbstractHdfsBolt.WritersMap map = new AbstractHdfsBolt.WritersMap(2);
-    @Mock AbstractHDFSWriter foo;
-    @Mock AbstractHDFSWriter bar;
-    @Mock AbstractHDFSWriter baz;
-
-    @Test public void testLRUBehavior()
-    {
-        map.put("FOO", foo);
-        map.put("BAR", bar);
-
-        //Access foo to make it most recently used
-        map.get("FOO");
-
-        //Add an element and bar should drop out
-        map.put("BAZ", baz);
-
-        Assert.assertTrue(map.keySet().contains("FOO"));
-        Assert.assertTrue(map.keySet().contains("BAZ"));
-
-        Assert.assertFalse(map.keySet().contains("BAR"));
-    }
-}
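
Note on the approach (not part of the patch): the new retireEldestWriter() relies on the access-order mode of LinkedHashMap, which the bolt now constructs with accessOrder=true. In that mode iteration runs from least recently used to most recently used, so writers.keySet().iterator().next() always names the eldest writer. The standalone sketch below illustrates just that behavior; the class name and the keys are made up for the example.

// Standalone sketch, not from the patch: shows why the first key returned by
// keySet().iterator() on an access-ordered LinkedHashMap is the least recently
// used one, which is the key retireEldestWriter() chooses to close.
import java.util.LinkedHashMap;
import java.util.Map;

public class AccessOrderSketch {
    public static void main(String[] args) {
        // Same constructor shape as the bolt: (initialCapacity, loadFactor, accessOrder)
        Map<String, String> writers = new LinkedHashMap<>(16, 0.75f, true);
        writers.put("partition-a", "writerA");
        writers.put("partition-b", "writerB");
        writers.put("partition-c", "writerC");

        // Touching "partition-a" moves it to the most-recently-used end
        writers.get("partition-a");

        // "partition-b" is now the least recently used entry and would be the
        // one retired once the writer count exceeds maxOpenFiles
        String eldestKey = writers.keySet().iterator().next();
        System.out.println(eldestKey); // prints "partition-b"
    }
}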