diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index c7ab7ca4bde..65ff3bfe030 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -553,6 +553,7 @@ Only methods mentioned in **bold** are required.
| .setMaxOutstanding() |~~hdfsspout.max.outstanding~~ | 10000 | Limits the number of unACKed tuples by pausing tuple generation (if ACKers are used in the topology) |
| .setLockTimeoutSec() |~~hdfsspout.lock.timeout.sec~~ | 5 minutes | Duration of inactivity after which a lock file is considered to be abandoned and ready for another spout to take ownership |
| .setClocksInSync() |~~hdfsspout.clocks.insync~~ | true | Indicates whether clocks on the storm machines are in sync (using services like NTP). Used for detecting stale locks. |
+| .setInotifyEnabled() |~~hdfsspout.inotify.enabled~~ | false | Enables inotify-based monitoring of the watch directory if set to "true". Note: **root privileges on HDFS are needed** to use inotify |
| .withConfigKey() | | | Optional setting. Overrides the default key name ('hdfs.config', see below) used for specifying HDFS client configs. |
| .setHdfsClientSettings() |~~hdfs.config~~ (unless changed via withConfigKey)| | Set it to a Map of Key/value pairs indicating the HDFS settings to be used. For example, keytab and principal could be set using this. See section **Using keytabs on all worker hosts** under HDFS bolt below.|
| .withOutputStream() | | | Name of output stream. If set, the tuples will be emitted to the specified stream. Else tuples will be emitted to the default output stream |
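
For illustration, the new mode can be switched on through the fluent setter added above; a minimal sketch in the style of the existing README examples (the NameNode URI and directory paths are placeholders, and `TextFileReader.defaultFields` is just one of the bundled reader field sets):

```java
// Minimal sketch: an HdfsSpout configured to use inotify-based directory monitoring.
// Running against HDFS with root/superuser privileges is required for this mode.
HdfsSpout spout = new HdfsSpout().setReaderType("text")
        .withOutputFields(TextFileReader.defaultFields)
        .setHdfsUri("hdfs://namenodehost:8020")   // placeholder NameNode URI
        .setSourceDir("/data/in")                 // placeholder directories
        .setArchiveDir("/data/done")
        .setBadFilesDir("/data/badfiles")
        .setInotifyEnabled(true);                 // new setting; defaults to false (poll mode)
```

Everything except `setInotifyEnabled()` is the standard required spout configuration already described earlier in this README.
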
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsDirectoryMonitor.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsDirectoryMonitor.java
new file mode 100644
index 00000000000..1685bc08b70
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsDirectoryMonitor.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.common;
+
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
+import org.apache.hadoop.hdfs.inotify.MissingEventsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
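+/**
+ * Tracks new files appearing in an HDFS directory, either by re-listing the directory on
+ * every update (POLL mode) or by consuming the NameNode inotify event stream (INOTIFY mode).
+ * Newly discovered files are handed out through the Iterable<Path>/Iterator<Path> interface.
+ */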
+public class HdfsDirectoryMonitor implements Iterable<Path> {
+ public enum WatchMode { POLL, INOTIFY }
+
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsDirectoryMonitor.class);
+
+ private Path watchDir = null;
+ private WatchMode watchMode = null;
+ private FileSystem hdfs = null;
+ private DFSClient dfsClient = null;
+ private HdfsAdmin hdfsAdmin = null;
+
+ private ArrayList<Path> newFiles = new ArrayList<>();
+ private long lastTxId = -1;
+
+ private HdfsDirectoryMonitor(Path watchDir, WatchMode watchMode, FileSystem hdfs, DFSClient dfsClient,
+ HdfsAdmin hdfsAdmin) {
+ this.watchDir = watchDir;
+ this.watchMode = watchMode;
+ this.hdfs = hdfs;
+ this.dfsClient = dfsClient;
+ this.hdfsAdmin = hdfsAdmin;
+ }
+
+ public static HdfsDirectoryMonitor createPollMonitor(Path watchDir, FileSystem hdfs) {
+ LOG.info("Instantiating Poll based HDFS Directory Monitor");
+ return new HdfsDirectoryMonitor(watchDir, WatchMode.POLL, hdfs, null, null);
+ }
+
+ public static HdfsDirectoryMonitor createInotifyMonitor(Path watchDir, FileSystem hdfs, DFSClient dfsClient,
+ HdfsAdmin hdfsAdmin) throws IOException {
+ LOG.info("Instantiating inotify based HDFS Directory Monitor");
+
+ /* Initialize the last Tx Id to catch the inotify events after the monitor has been created */
+ HdfsDirectoryMonitor monitor = new HdfsDirectoryMonitor(watchDir, WatchMode.INOTIFY, hdfs, dfsClient, hdfsAdmin);
+ monitor.lastTxId = dfsClient.getNamenode().getCurrentEditLogTxid();
+
+ /* Do an initial listing of the watch dir to pick up any leftover files */
+ monitor.newFiles = HdfsUtils.listFilesByModificationTime(hdfs, watchDir, 0);
+
+ return monitor;
+ }
+
+
+ public void update() throws IOException, MissingEventsException, InterruptedException {
+ /* The poll method lists the directory contents while inotify uses incremental updates.
+ Therefore the new file list needs to be replaced in case of poll but added to existing in case of inotify.
+ */
+ if (this.watchMode == WatchMode.POLL) {
+ newFiles = HdfsUtils.listFilesByModificationTime(hdfs, watchDir, 0);
+ } else {
+ newFiles.addAll(getNewFilesFromInotify(watchDir.toString()));
+ }
+ }
+
+ private boolean filterFiles(String watchDirectory, String filename) {
+ /* include files which are located inside the watch directory */
+ return !filename.equals(watchDirectory) &&
+ filename.startsWith(watchDirectory.endsWith("/") ? watchDirectory : watchDirectory + "/");
+ }
+
+ private ArrayList<Path> getNewFilesFromInotify(String watchDirectory)
+ throws IOException, InterruptedException, MissingEventsException {
+
+ if (lastTxId == -1) {
+ lastTxId = dfsClient.getNamenode().getCurrentEditLogTxid();
+ }
+
+ LOG.trace("Reading inotify events from HDFS from tx id: " + lastTxId);
+
+ DFSInotifyEventInputStream eventStream = hdfsAdmin.getInotifyEventStream(this.lastTxId);
+
+ EventBatch batch;
+ Set<Path> modifiedFiles = new HashSet<>();
+ while ( (batch = eventStream.poll()) != null) {
+ for (Event event : batch.getEvents()) {
+ switch (event.getEventType()) {
+ case CREATE: {
+ /* inotify event's getPath() returns a string, not a real Path object.
+ We need to construct the full Path object using hdfs.makeQualified()
+ to get the same granularity (scheme + authority + path) as created
+ by HdfsUtils.listFilesByModificationTime */
+ String filename = ((Event.CreateEvent) event).getPath();
+
+ if (filterFiles(watchDirectory, filename)) {
+ Path path = hdfs.makeQualified(new Path(filename));
+ modifiedFiles.add(path);
+ }
+ }
+ break;
+
+ case UNLINK: {
+ String filename = ((Event.UnlinkEvent) event).getPath();
+
+ if (filterFiles(watchDirectory, filename)) {
+ Path path = hdfs.makeQualified(new Path(filename));
+ /* the file no longer exists, so it must not be offered to the spout */
+ modifiedFiles.remove(path);
+ }
+ }
+ break;
+
+ case RENAME: {
+ String srcFilename = ((Event.RenameEvent) event).getSrcPath();
+ String dstFilename = ((Event.RenameEvent) event).getDstPath();
+
+ if (filterFiles(watchDirectory, srcFilename) && filterFiles(watchDirectory, dstFilename)) {
+ Path srcPath = hdfs.makeQualified(new Path(srcFilename));
+ Path dstPath = hdfs.makeQualified(new Path(dstFilename));
+
+ modifiedFiles.remove(srcPath);
+ modifiedFiles.add(dstPath);
+ }
+ }
+ break;
+ }
+ }
+
+ lastTxId = batch.getTxid();
+ }
+
+ return new ArrayList<>(modifiedFiles);
+ }
+
+ @Override
+ public Iterator<Path> iterator() {
+ return new HdfsDirectoryMonitorIterator();
+ }
+
+ private class HdfsDirectoryMonitorIterator implements Iterator<Path> {
+ HdfsDirectoryMonitorIterator() {
+ }
+
+ @Override
+ public boolean hasNext() {
+ return newFiles.size() > 0;
+ }
+
+ @Override
+ public Path next() {
+ if (newFiles == null || newFiles.size() == 0) {
+ throw new NoSuchElementException();
+ }
+ return newFiles.remove(0);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Delete Not Supported");
+ }
+ }
+}
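
A rough sketch of how the class above is intended to be driven, assuming an already opened `FileSystem` and watch `Path` (the method name `scanOnce` is illustrative, not part of the patch):

```java
// Illustrative consumption pattern for HdfsDirectoryMonitor; exception handling kept minimal.
void scanOnce(FileSystem hdfs, Path watchDir) throws Exception {
    HdfsDirectoryMonitor monitor = HdfsDirectoryMonitor.createPollMonitor(watchDir, hdfs);
    monitor.update();                  // POLL mode: re-lists the directory contents
    for (Path file : monitor) {        // iteration drains the internal list, so each
        System.out.println(file);      // discovered file is handed out exactly once
    }
}
```

Because `next()` removes entries as it returns them, a caller that stops iterating early leaves the remaining files queued for the next cycle.
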
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
index 8911b3c7b85..5de5b723237 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
@@ -34,6 +34,7 @@ public class Configs {
public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec"; // inactivity duration after which locks are considered candidates for being reassigned to another spout
public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync"; // if clocks on machines in the Storm cluster are in sync
public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix"; // filenames with this suffix in archive dir will be ignored by the Spout
+ public static final String INOTIFY_ENABLED = "hdfsspout.inotify.enabled"; // enables inotify based directory watch mode if set to "true"
public static final String DEFAULT_LOCK_DIR = ".lock";
public static final int DEFAULT_COMMIT_FREQ_COUNT = 20000;
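
Besides the new setter, the flag is also honoured when passed in the topology configuration under this key (it is read in `HdfsSpout.open()` with `Boolean.parseBoolean`, as the next file's change shows); a small sketch using `org.apache.storm.Config`:

```java
// Sketch: turning on inotify-based monitoring through the topology configuration
// instead of the spout setter (the setter remains the documented route).
Config conf = new Config();
conf.put(Configs.INOTIFY_ENABLED, "true");
```
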
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index b7627f24178..4d597ae70d0 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.URI;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -30,11 +29,14 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.storm.Config;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.common.HdfsUtils;
+import org.apache.storm.hdfs.common.HdfsDirectoryMonitor;
import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +84,8 @@ public class HdfsSpout extends BaseRichSpout {
private FileSystem hdfs;
private FileReader reader;
+ private HdfsDirectoryMonitor hdfsDirectoryMonitor;
+ private boolean inotifyEnabled = false;
private SpoutOutputCollector collector;
HashMap<MessageId, List<Object>> inflight = new HashMap<>();
@@ -169,6 +173,11 @@ public HdfsSpout setIgnoreSuffix(String ignoreSuffix) {
return this;
}
+ public HdfsSpout setInotifyEnabled(boolean inotifyEnabled) {
+ this.inotifyEnabled = inotifyEnabled;
+ return this;
+ }
+
/** Output field names. Number of fields depends upon the reader type */
public HdfsSpout withOutputFields(String... fields) {
outputFields = new Fields(fields);
@@ -503,6 +512,23 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
clocksInSync = Boolean.parseBoolean(conf.get(Configs.CLOCKS_INSYNC).toString());
}
+ if ( conf.get(Configs.INOTIFY_ENABLED) != null ) {
+ this.inotifyEnabled = Boolean.parseBoolean(conf.get(Configs.INOTIFY_ENABLED).toString());
+ }
+
+ if (this.inotifyEnabled) {
+ try {
+ DFSClient dfsClient = new DFSClient(URI.create(hdfsUri), hdfsConfig);
+ HdfsAdmin hdfsAdmin = new HdfsAdmin(hdfs.getUri(), hdfsConfig);
+ this.hdfsDirectoryMonitor = HdfsDirectoryMonitor.createInotifyMonitor(sourceDirPath, hdfs, dfsClient, hdfsAdmin);
+ } catch (IOException e) {
+ LOG.error("Error creating inotify based HDFS directory monitor. Are we running as root?", e);
+ throw new RuntimeException("Error creating inotify based HDFS directory monitor. Are we running as root?", e);
+ }
+ } else {
+ this.hdfsDirectoryMonitor = HdfsDirectoryMonitor.createPollMonitor(sourceDirPath, hdfs);
+ }
+
// -- spout id
spoutId = context.getThisComponentId();
@@ -596,13 +622,16 @@ private FileReader pickNextFile() {
}
// 2) If no abandoned files, then pick oldest file in sourceDirPath, lock it and rename it
- Collection<Path> listing = HdfsUtils.listFilesByModificationTime(hdfs, sourceDirPath, 0);
-
- for (Path file : listing) {
- if (file.getName().endsWith(inprogress_suffix)) {
- continue;
- }
- if (file.getName().endsWith(ignoreSuffix)) {
+ try {
+ hdfsDirectoryMonitor.update();
+ } catch (Exception e) {
+ LOG.error("Unable to update source dir: " + sourceDirPath, e);
+ return null;
+ }
+ for (Path file : hdfsDirectoryMonitor) {
+ if (Path.getPathWithoutSchemeAndAuthority(file).toString().startsWith(lockDirPath.toString()) ||
+ file.getName().endsWith(inprogress_suffix) ||
+ file.getName().endsWith(ignoreSuffix)) {
continue;
}
lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId);
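
The checks inside the rewritten loop above can be read as one candidate filter; the lock-directory test is new since the inotify stream can report paths anywhere under the source directory, including files written into the lock subdirectory. A sketch of the equivalent predicate, using the same spout fields and a hypothetical helper name (`isCandidate`), for readability only:

```java
// Equivalent form of the per-file checks in pickNextFile(): skip anything in the lock
// directory, files still being written (in-progress suffix), and files marked to ignore.
private boolean isCandidate(Path file) {
    String bare = Path.getPathWithoutSchemeAndAuthority(file).toString();
    return !bare.startsWith(lockDirPath.toString())
            && !file.getName().endsWith(inprogress_suffix)
            && !file.getName().endsWith(ignoreSuffix);
}
```
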
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index f60cbf3e315..985e59ab83e 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -44,6 +44,10 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
import java.io.BufferedReader;
import java.io.File;
@@ -52,6 +56,7 @@
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -59,6 +64,7 @@
import org.apache.storm.hdfs.common.HdfsUtils.Pair;
+@RunWith(Parameterized.class)
public class TestHdfsSpout {
@Rule
@@ -72,6 +78,14 @@ public class TestHdfsSpout {
public TestHdfsSpout() {
}
+ @Parameter
+ public String inotifyEnabled;
+
+ @Parameters(name="inotifyEnabled={0}")
+ public static Collection