Skip to content
This repository has been archived by the owner on Mar 31, 2024. It is now read-only.

Commit

Permalink
If no hosts are configured ESExporter should use the local node's Htt…
Browse files Browse the repository at this point in the history
…pServer to figure out binding address

Closes elastic#66
  • Loading branch information
bleskes committed Jun 16, 2014
1 parent 759116f commit 9400863
Showing 1 changed file with 66 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,28 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.smile.SmileXContent;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.marvel.agent.Utils;
import org.elasticsearch.marvel.agent.event.Event;
import org.elasticsearch.node.settings.NodeSettingsService;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Map;
Expand All @@ -63,14 +69,18 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
public static final String SETTINGS_INDEX_PREFIX = SETTINGS_PREFIX + "index.prefix";
public static final String SETTINGS_INDEX_TIME_FORMAT = SETTINGS_PREFIX + "index.timeformat";
public static final String SETTINGS_TIMEOUT = SETTINGS_PREFIX + "timeout";
public static final String SETTINGS_READ_TIMEOUT = SETTINGS_PREFIX + "read_timeout";

volatile String[] hosts;
volatile boolean boundToLocalNode = false;
final String indexPrefix;
final DateTimeFormatter indexTimeFormatter;
volatile int timeoutInMillis;
volatile int readTimeoutInMillis;

final ClusterService clusterService;
final ClusterName clusterName;
HttpServer httpServer;

public final static DateTimeFormatter defaultDatePrinter = Joda.forPattern("date_time").printer();

Expand All @@ -95,12 +105,13 @@ public ESExporter(Settings settings, ClusterService clusterService, ClusterName

this.clusterName = clusterName;

hosts = settings.getAsArray(SETTINGS_HOSTS, new String[]{"localhost:9200"});
hosts = settings.getAsArray(SETTINGS_HOSTS, Strings.EMPTY_ARRAY);
indexPrefix = settings.get(SETTINGS_INDEX_PREFIX, ".marvel");
String indexTimeFormat = settings.get(SETTINGS_INDEX_TIME_FORMAT, "YYYY.MM.dd");
indexTimeFormatter = DateTimeFormat.forPattern(indexTimeFormat).withZoneUTC();

timeoutInMillis = (int) settings.getAsTime(SETTINGS_TIMEOUT, new TimeValue(6000)).millis();
readTimeoutInMillis = (int) settings.getAsTime(SETTINGS_READ_TIMEOUT, new TimeValue(timeoutInMillis * 10)).millis();

nodeStatsRenderer = new NodeStatsRenderer();
shardStatsRenderer = new ShardStatsRenderer();
Expand All @@ -113,6 +124,7 @@ public ESExporter(Settings settings, ClusterService clusterService, ClusterName

dynamicSettings.addDynamicSetting(SETTINGS_HOSTS + ".*");
dynamicSettings.addDynamicSetting(SETTINGS_TIMEOUT);
dynamicSettings.addDynamicSetting(SETTINGS_READ_TIMEOUT);
nodeSettingsService.addListener(this);

logger.debug("initialized with targets: {}, index prefix [{}], index time format [{}]", hosts, indexPrefix, indexTimeFormat);
Expand All @@ -123,6 +135,11 @@ public String name() {
return "es_exporter";
}

@Inject
public void setHttpServer(HttpServer httpServer) {
this.httpServer = httpServer;
}


@Override
public void exportNodeStats(NodeStats nodeStats) {
Expand Down Expand Up @@ -180,9 +197,7 @@ private HttpURLConnection openExportingConnection() {

logger.trace("setting up an export connection");
HttpURLConnection conn = openConnection("POST", "_bulk", XContentType.SMILE.restContentType());
if (conn == null) {
logger.error("could not connect to any configured elasticsearch instances: [{}]", hosts);
} else if (keepAliveThread == null || !keepAliveThread.isAlive()) {
if (conn != null && (keepAliveThread == null || !keepAliveThread.isAlive())) {
// start keep alive upon successful connection if not there.
initKeepAliveThread();
}
Expand Down Expand Up @@ -296,6 +311,44 @@ private HttpURLConnection openConnection(String method, String path) {

private HttpURLConnection openConnection(String method, String path, String contentType) {
int hostIndex = 0;
if (hosts.length == 0) {
if (httpServer == null) {
logger.debug("local http server is not yet injected. can't connect.");
return null;
}
logger.debug("deriving host setting from httpServer");
BoundTransportAddress boundAddress = httpServer.info().address();
if (httpServer.lifecycleState() != Lifecycle.State.STARTED || boundAddress == null || boundAddress.boundAddress() == null) {
logger.debug("local http server is not yet started. can't connect");
return null;
}
if (boundAddress.boundAddress().uniqueAddressTypeId() != 1) {
logger.error("local node is not bound via the http transport. can't connect");
return null;
}
InetSocketTransportAddress address = (InetSocketTransportAddress) boundAddress.boundAddress();
InetSocketAddress inetSocketAddress = address.address();
InetAddress inetAddress = inetSocketAddress.getAddress();
if (inetAddress == null) {
logger.error("failed to extract the ip address of current node.");
return null;
}

String host = inetAddress.getHostAddress();
if (host.indexOf(":") >= 0) {
// ipv6
host = "[" + host + "]";
}

hosts = new String[]{host + ":" + inetSocketAddress.getPort()};
boundToLocalNode = true;
}

if (boundToLocalNode && httpServer.lifecycleState() != Lifecycle.State.STARTED) {
logger.debug("local node http server is not started. can't connect");
return null;
}

try {
for (; hostIndex < hosts.length; hostIndex++) {
String host = hosts[hostIndex];
Expand All @@ -310,6 +363,7 @@ private HttpURLConnection openConnection(String method, String path, String cont

conn.setRequestMethod(method);
conn.setConnectTimeout(timeoutInMillis);
conn.setReadTimeout(readTimeoutInMillis);
if (contentType != null) {
conn.setRequestProperty("Content-Type", XContentType.SMILE.restContentType());
}
Expand Down Expand Up @@ -339,6 +393,8 @@ private HttpURLConnection openConnection(String method, String path, String cont
}
}

logger.error("could not connect to any configured elasticsearch instances: [{}]", hosts);

return null;
}

Expand Down Expand Up @@ -382,7 +438,6 @@ private boolean checkAndUploadIndexTemplate() {

HttpURLConnection conn = openConnection("GET", "_template/marvel");
if (conn == null) {
logger.error("could not connect to any configured elasticsearch instances: [{}]", hosts);
return false;
}

Expand Down Expand Up @@ -448,6 +503,12 @@ public void onRefreshSettings(Settings settings) {
timeoutInMillis = (int) newTimeout.millis();
}

newTimeout = settings.getAsTime(SETTINGS_READ_TIMEOUT, null);
if (newTimeout != null) {
logger.info("connection read timeout set to [{}]", newTimeout);
readTimeoutInMillis = (int) newTimeout.millis();
}

String[] newHosts = settings.getAsArray(SETTINGS_HOSTS, null);
if (newHosts != null) {
logger.info("hosts set to [{}]", Strings.arrayToCommaDelimitedString(newHosts));
Expand Down

0 comments on commit 9400863

Please sign in to comment.