diff --git a/Makefile.am b/Makefile.am index 78abae4d5d..44cc5f7a07 100644 --- a/Makefile.am +++ b/Makefile.am @@ -130,6 +130,7 @@ tsdb_SRC := \ src/tools/MetaPurge.java \ src/tools/MetaSync.java \ src/tools/Search.java \ + src/tools/StartupPlugin.java \ src/tools/TSDMain.java \ src/tools/TextImporter.java \ src/tools/TreeSync.java \ diff --git a/src/core/TSDB.java b/src/core/TSDB.java index 5e3f9657f7..21e853faa1 100644 --- a/src/core/TSDB.java +++ b/src/core/TSDB.java @@ -58,6 +58,7 @@ import net.opentsdb.query.filter.TagVFilter; import net.opentsdb.search.SearchPlugin; import net.opentsdb.search.SearchQuery; +import net.opentsdb.tools.StartupPlugin; import net.opentsdb.stats.Histogram; import net.opentsdb.stats.QueryStats; import net.opentsdb.stats.StatsCollector; @@ -117,7 +118,10 @@ public final class TSDB { /** Search indexer to use if configure */ private SearchPlugin search = null; - + + /** Optional Startup Plugin to use if configured */ + private StartupPlugin startup = null; + /** Optional real time pulblisher plugin to use if configured */ private RTPublisher rt_publisher = null; @@ -229,7 +233,22 @@ public TSDB(final Config config) { public static byte[] FAMILY() { return FAMILY; } - + + /** + * Called by initializePlugins, also used to load startup plugins. + * @since 2.3 + */ + public static void loadPluginPath(final String plugin_path) throws RuntimeException { + if (plugin_path != null && !plugin_path.isEmpty()) { + try { + PluginLoader.loadJARs(plugin_path); + } catch (Exception e) { + throw new RuntimeException("Error loading plugins from plugin path: " + + plugin_path, e); + } + } + } + /** * Should be called immediately after construction to initialize plugins and * objects that rely on such. It also moves most of the potential exception @@ -242,16 +261,12 @@ public static byte[] FAMILY() { */ public void initializePlugins(final boolean init_rpcs) { final String plugin_path = config.getString("tsd.core.plugin_path"); - if (plugin_path != null && !plugin_path.isEmpty()) { - try { - PluginLoader.loadJARs(plugin_path); - } catch (Exception e) { - LOG.error("Error loading plugins from plugin path: " + plugin_path, e); - throw new RuntimeException("Error loading plugins from plugin path: " + - plugin_path, e); - } + try { + loadPluginPath(plugin_path); + } catch (Exception e) { + LOG.error("Error loading plugins from plugin path: " + plugin_path, e); } - + try { TagVFilter.initializeFilterMap(this); // @#$@%$%#$ing typed exceptions @@ -343,8 +358,21 @@ public void initializePlugins(final boolean init_rpcs) { public final HBaseClient getClient() { return this.client; } - - /** + + /** + * Sets the startup plugin so that it can be shutdown properly. + * @param startup + * @since 2.3 + */ + public final void setStartup(StartupPlugin startup) { this.startup = startup; } + /** + * Getter that returns the startup plugin object + * @return The StartupPlugin object + * @since 2.3 + */ + public final StartupPlugin getStartup() { return this.startup; } + + /** * Getter that returns the configuration object * @return The configuration object * @since 2.0 @@ -964,6 +992,11 @@ public Object call(ArrayList compactions) throws Exception { LOG.info("Flushing compaction queue"); deferreds.add(compactionq.flush().addCallback(new CompactCB())); } + if (startup != null) { + LOG.info("Shutting down startup plugin: " + + startup.getClass().getCanonicalName()); + deferreds.add(startup.shutdown()); + } if (search != null) { LOG.info("Shutting down search plugin: " + search.getClass().getCanonicalName()); diff --git a/src/tools/StartupPlugin.java b/src/tools/StartupPlugin.java new file mode 100644 index 0000000000..580caa7ed3 --- /dev/null +++ b/src/tools/StartupPlugin.java @@ -0,0 +1,78 @@ +// This file is part of OpenTSDB. +// Copyright (C) 2013 The OpenTSDB Authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 2.1 of the License, or (at your +// option) any later version. This program is distributed in the hope that it +// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty +// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. You should have received a copy +// of the GNU Lesser General Public License along with this program. If not, +// see . +package net.opentsdb.tools; + +import com.stumbleupon.async.Deferred; + +import net.opentsdb.utils.Config; +import net.opentsdb.stats.StatsCollector; + +/** + * The StartupPlugin allows users to interact with the OpenTSDB configuration + * as soon as it is completely parsed, just before OpenTSDB begins to use it. + *

+ * Note: Implementations must have a parameterless constructor. The + * {@link #initialize(TSDB)} method will be called immediately after the plugin is + * instantiated and before any other methods are called. + * @since 2.3 + */ +public abstract class StartupPlugin { + + /** + * Called by TSDB to initialize the plugin + * Implementations are responsible for setting up any IO they need as well + * as starting any required background threads. + * Note: Implementations should throw exceptions if they can't start + * up properly. The TSD will then shutdown so the operator can fix the + * problem. Please use IllegalArgumentException for configuration issues. + * @param tsdb The parent TSDB object + * @throws IllegalArgumentException if required configuration parameters are + * missing + * @throws Exception if something else goes wrong + */ + public abstract void initialize(final Config config) throws IllegalArgumentException, Exception; + + /** + * Called to gracefully shutdown the plugin. Implementations should close + * any IO they have open + * @return A deferred object that indicates the completion of the request. + * The {@link Object} has not special meaning and can be {@code null} + * (think of it as {@code Deferred}). + */ + public abstract Deferred shutdown(); + + /** + * Should return the version of this plugin in the format: + * MAJOR.MINOR.MAINT, e.g. "2.0.1". The MAJOR version should match the major + * version of OpenTSDB the plugin is meant to work with. + * @return A version string used to log the loaded version + */ + public abstract String version(); + + /** + * Should return the version of this plugin in the format: + * MAJOR.MINOR.MAINT, e.g. "2.0.1". The MAJOR version should match the major + * version of OpenTSDB the plugin is meant to work with. + * @return A version string used to log the loaded version + */ + public abstract String getType(); + + /** + * Called by the TSD when a request for statistics collection has come in. The + * implementation may provide one or more statistics. If no statistics are + * available for the implementation, simply stub the method. + * @param collector The collector used for emitting statistics + */ + public abstract void collectStats(final StatsCollector collector); + +} \ No newline at end of file diff --git a/src/tools/TSDMain.java b/src/tools/TSDMain.java index da02826a5f..ce23661335 100644 --- a/src/tools/TSDMain.java +++ b/src/tools/TSDMain.java @@ -13,10 +13,17 @@ package net.opentsdb.tools; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.socket.ServerSocketChannelFactory; @@ -34,6 +41,8 @@ import net.opentsdb.tsd.RpcManager; import net.opentsdb.utils.Config; import net.opentsdb.utils.FileSystem; +import net.opentsdb.utils.Pair; +import net.opentsdb.utils.PluginLoader; import net.opentsdb.utils.Threads; /** @@ -53,6 +62,11 @@ static void usage(final ArgP argp, final String errmsg, final int retval) { System.exit(retval); } + /** A map of configured filters for use in querying */ + private static Map, Constructor>> + startupPlugin_filter_map = new HashMap, Constructor>>(); + private static final short DEFAULT_FLUSH_INTERVAL = 1000; private static TSDB tsdb = null; @@ -145,9 +159,21 @@ public static void main(String[] args) throws IOException { Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), new Threads.PrependThreadNamer()); } - + + StartupPlugin startup = null; + try { + startup = loadStartupPlugins(config); + } catch (IllegalArgumentException e) { + usage(argp, e.getMessage(), 3); + } catch (Exception e) { + throw new RuntimeException("Initialization failed", e); + } + try { tsdb = new TSDB(config); + if (startup != null) { + tsdb.setStartup(startup); + } tsdb.initializePlugins(true); if (config.getBoolean("tsd.storage.hbase.prefetch_meta")) { tsdb.preFetchHBaseMeta(); @@ -198,6 +224,42 @@ public static void main(String[] args) throws IOException { // The server is now running in separate threads, we can exit main. } + private static StartupPlugin loadStartupPlugins(Config config) { + Logger log = LoggerFactory.getLogger(TSDMain.class); + + // load the startup plugin if enabled + StartupPlugin startup = null; + + if (config.getBoolean("tsd.startup.enable")) { + final String plugin_path = config.getString("tsd.core.plugin_path"); + + try { + TSDB.loadPluginPath(plugin_path); + } catch (Exception e) { + log.error("Error loading plugins from plugin path: " + plugin_path, e); + } + + startup = PluginLoader.loadSpecificPlugin( + config.getString("tsd.startup.plugin"), StartupPlugin.class); + if (startup == null) { + throw new IllegalArgumentException("Unable to locate startup plugin: " + + config.getString("tsd.startup.plugin")); + } + try { + startup.initialize(config); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize startup plugin", e); + } + log.info("Successfully initialized startup plugin [" + + startup.getClass().getCanonicalName() + "] version: " + + startup.version()); + } else { + startup = null; + } + + return startup; + } + private static void registerShutdownHook() { final class TSDBShutdown extends Thread { public TSDBShutdown() { diff --git a/src/utils/PluginLoader.java b/src/utils/PluginLoader.java index 8dadfc5ec3..d66c75f0c9 100644 --- a/src/utils/PluginLoader.java +++ b/src/utils/PluginLoader.java @@ -104,7 +104,7 @@ public static T loadSpecificPlugin(final String name, while(it.hasNext()) { T plugin = it.next(); - if (plugin.getClass().getName().equals(name)) { + if (plugin.getClass().getName().equals(name) || plugin.getClass().getSuperclass().getName().equals(name)) { return plugin; } }