Skip to content

Commit

Permalink
Initial attempt at discovery plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
johann8384 committed Mar 3, 2016
1 parent e5b8be4 commit c9b8ac7
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 4 deletions.
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ tsdb_SRC := \
src/tools/MetaPurge.java \
src/tools/MetaSync.java \
src/tools/Search.java \
src/tools/StartupPlugin.java \
src/tools/TSDMain.java \
src/tools/TextImporter.java \
src/tools/TreeSync.java \
Expand Down
28 changes: 25 additions & 3 deletions src/core/TSDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import net.opentsdb.query.filter.TagVFilter;
import net.opentsdb.search.SearchPlugin;
import net.opentsdb.search.SearchQuery;
import net.opentsdb.tools.StartupPlugin;
import net.opentsdb.stats.Histogram;
import net.opentsdb.stats.QueryStats;
import net.opentsdb.stats.StatsCollector;
Expand Down Expand Up @@ -117,7 +118,10 @@ public final class TSDB {

/** Search indexer to use if configure */
private SearchPlugin search = null;


/** Optional Startup Plugin to use if configured */
private StartupPlugin startup = null;

/** Optional real time pulblisher plugin to use if configured */
private RTPublisher rt_publisher = null;

Expand Down Expand Up @@ -343,8 +347,21 @@ public void initializePlugins(final boolean init_rpcs) {
public final HBaseClient getClient() {
return this.client;
}

/**

/**
* Sets the startup plugin so that it can be shutdown properly.
* @param startup
* @since 2.3
*/
public final void setStartup(StartupPlugin startup) { this.startup = startup; }
/**
* Getter that returns the startup plugin object
* @return The StartupPlugin object
* @since 2.3
*/
public final StartupPlugin getStartup() { return this.startup; }

/**
* Getter that returns the configuration object
* @return The configuration object
* @since 2.0
Expand Down Expand Up @@ -964,6 +981,11 @@ public Object call(ArrayList<Object> compactions) throws Exception {
LOG.info("Flushing compaction queue");
deferreds.add(compactionq.flush().addCallback(new CompactCB()));
}
if (startup != null) {
LOG.info("Shutting down startup plugin: " +
startup.getClass().getCanonicalName());
deferreds.add(startup.shutdown());
}
if (search != null) {
LOG.info("Shutting down search plugin: " +
search.getClass().getCanonicalName());
Expand Down
4 changes: 4 additions & 0 deletions src/opentsdb.conf
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,7 @@ tsd.http.cachedir =

# Compaction flush speed multiplier, default 2
# tsd.storage.compaction.flush_speed = 2

tsd.core.plugin_path = '/Users/jcreasy/code/opentsdb/build/plugins'
tsd.startup.enable = true
tsd.startup.plugin = 'io.tsdb.opentsdb.DiscoveryPlugins.CuratorPlugin'
78 changes: 78 additions & 0 deletions src/tools/StartupPlugin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// This file is part of OpenTSDB.
// Copyright (C) 2013 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.tools;

import com.stumbleupon.async.Deferred;

import net.opentsdb.utils.Config;
import net.opentsdb.stats.StatsCollector;

/**
* The StartupPlugin allows users to interact with the OpenTSDB configuration
* as soon as it is completely parsed, just before OpenTSDB begins to use it.
* <p>
* <b>Note:</b> Implementations must have a parameterless constructor. The
* {@link #initialize(TSDB)} method will be called immediately after the plugin is
* instantiated and before any other methods are called.
* @since 2.3
*/
public abstract class StartupPlugin {

/**
* Called by TSDB to initialize the plugin
* Implementations are responsible for setting up any IO they need as well
* as starting any required background threads.
* <b>Note:</b> Implementations should throw exceptions if they can't start
* up properly. The TSD will then shutdown so the operator can fix the
* problem. Please use IllegalArgumentException for configuration issues.
* @param tsdb The parent TSDB object
* @throws IllegalArgumentException if required configuration parameters are
* missing
* @throws Exception if something else goes wrong
*/
public abstract void initialize(final Config config) throws IllegalArgumentException, Exception;

/**
* Called to gracefully shutdown the plugin. Implementations should close
* any IO they have open
* @return A deferred object that indicates the completion of the request.
* The {@link Object} has not special meaning and can be {@code null}
* (think of it as {@code Deferred<Void>}).
*/
public abstract Deferred<Object> shutdown();

/**
* Should return the version of this plugin in the format:
* MAJOR.MINOR.MAINT, e.g. "2.0.1". The MAJOR version should match the major
* version of OpenTSDB the plugin is meant to work with.
* @return A version string used to log the loaded version
*/
public abstract String version();

/**
* Should return the version of this plugin in the format:
* MAJOR.MINOR.MAINT, e.g. "2.0.1". The MAJOR version should match the major
* version of OpenTSDB the plugin is meant to work with.
* @return A version string used to log the loaded version
*/
public abstract String getType();

/**
* Called by the TSD when a request for statistics collection has come in. The
* implementation may provide one or more statistics. If no statistics are
* available for the implementation, simply stub the method.
* @param collector The collector used for emitting statistics
*/
public abstract void collectStats(final StatsCollector collector);

}
55 changes: 54 additions & 1 deletion src/tools/TSDMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@
package net.opentsdb.tools;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
Expand All @@ -34,6 +41,8 @@
import net.opentsdb.tsd.RpcManager;
import net.opentsdb.utils.Config;
import net.opentsdb.utils.FileSystem;
import net.opentsdb.utils.Pair;
import net.opentsdb.utils.PluginLoader;
import net.opentsdb.utils.Threads;

/**
Expand All @@ -53,6 +62,11 @@ static void usage(final ArgP argp, final String errmsg, final int retval) {
System.exit(retval);
}

/** A map of configured filters for use in querying */
private static Map<String, Pair<Class<?>, Constructor<? extends StartupPlugin>>>
startupPlugin_filter_map = new HashMap<String,
Pair<Class<?>, Constructor<? extends StartupPlugin>>>();

private static final short DEFAULT_FLUSH_INTERVAL = 1000;

private static TSDB tsdb = null;
Expand Down Expand Up @@ -145,9 +159,21 @@ public static void main(String[] args) throws IOException {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
new Threads.PrependThreadNamer());
}


StartupPlugin startup = null;
try {
startup = loadStartupPlugins(config);
} catch (IllegalArgumentException e) {
usage(argp, e.getMessage(), 3);
} catch (Exception e) {
throw new RuntimeException("Initialization failed", e);
}

try {
tsdb = new TSDB(config);
if (startup != null) {
tsdb.setStartup(startup);
}
tsdb.initializePlugins(true);
if (config.getBoolean("tsd.storage.hbase.prefetch_meta")) {
tsdb.preFetchHBaseMeta();
Expand Down Expand Up @@ -198,6 +224,33 @@ public static void main(String[] args) throws IOException {
// The server is now running in separate threads, we can exit main.
}

private static StartupPlugin loadStartupPlugins(Config config) {
Logger log = LoggerFactory.getLogger(TSDMain.class);
// load the startup plugin if enabled
StartupPlugin startup = null;

if (config.getBoolean("tsd.startup.enable")) {
startup = PluginLoader.loadSpecificPlugin(
config.getString("tsd.startup.plugin"), StartupPlugin.class);
if (startup == null) {
throw new IllegalArgumentException("Unable to locate startup plugin: " +
config.getString("tsd.startup.plugin"));
}
try {
startup.initialize(config);
} catch (Exception e) {
throw new RuntimeException("Failed to initialize startup plugin", e);
}
log.info("Successfully initialized startup plugin [" +
startup.getClass().getCanonicalName() + "] version: "
+ startup.version());
} else {
startup = null;
}

return startup;
}

private static void registerShutdownHook() {
final class TSDBShutdown extends Thread {
public TSDBShutdown() {
Expand Down

0 comments on commit c9b8ac7

Please sign in to comment.