Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Make metrics mgr fail fast when unexpected errors happen. #1473

Merged
merged 3 commits into from
Oct 6, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number | Diff line number | Diff line change
Expand Up @@ -97,6 +97,8 @@ public class MetricsManager {
private final String topologyName;
private final String metricsmgrId;

private final long mainThreadId;

/**
* Metrics manager constructor
*/
Expand All @@ -123,6 +125,8 @@ public MetricsManager(String topologyName, String serverHost,
this.metricsCollector = new MetricsCollector(metricsManagerServerLoop, metricsQueue);
this.heronMetricsExportIntervalSec = systemConfig.getHeronMetricsExportIntervalSec();

this.mainThreadId = Thread.currentThread().getId();

// Set up the internal Metrics Export routine
setupInternalMetricsExport();

Expand Down Expand Up @@ -192,8 +196,8 @@ public static void main(String[] args) throws IOException {
if (args.length != 6) {
throw new RuntimeException(
"Invalid arguments; Usage: java com.twitter.heron.metricsmgr.MetricsManager "
+ "<id> <port> <topname> <topid> <heron_internals_config_filename> "
+ "<metrics_sinks_config_filename>");
+ "<id> <port> <topname> <topid> <heron_internals_config_filename> "
+ "<metrics_sinks_config_filename>");
}

String metricsmgrId = args[0];
Expand Down Expand Up @@ -221,7 +225,7 @@ public static void main(String[] args) throws IOException {
LoggingHelper.addLoggingHandler(new ErrorReportLoggingHandler());

LOG.info(String.format("Starting Metrics Manager for topology %s with topologyId %s with "
+ "Metrics Manager Id %s, Merics Manager Port: %d.",
+ "Metrics Manager Id %s, Merics Manager Port: %d.",
topologyName, topologyId, metricsmgrId, metricsPort));

LOG.info("System Config: " + systemConfig);
Expand Down Expand Up @@ -328,26 +332,46 @@ public class DefaultExceptionHandler implements Thread.UncaughtExceptionHandler
* Handler for uncaughtException
*/
public void uncaughtException(Thread thread, Throwable exception) {
String threadName = thread.getName();
// We would fail fast when errors occur
if (exception instanceof Error) {
LOG.log(Level.SEVERE,
"Error caught in thread: " + thread.getName()
+ " with thread id: " + thread.getId() + ". Process halting...",
exception);
Runtime.getRuntime().halt(1);
}

// We would fail fast when exceptions happen in main thread
if (thread.getId() == mainThreadId) {
LOG.log(Level.SEVERE,
"Exception caught in main thread. Process halting...",
exception);
Runtime.getRuntime().halt(1);
}

LOG.log(Level.SEVERE,
"Exception caught in thread: " + threadName + " with thread id: " + thread.getId(),
"Exception caught in thread: " + thread.getName() + " with thread id: " + thread.getId(),
exception);

String sinkId = threadName;
Integer thisSinkRetryAttempts = sinksRetryAttempts.remove(sinkId);

// Remove the old sink executor
SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId);
String sinkId = null;
Integer thisSinkRetryAttempts = 0;

if (oldSinkExecutor != null) {
// We enforced the name of thread running particular IMetricsSink equal to its sink-id
// If the thread name is a key of SinkExecutors, then it is a thread running IMetricsSink
if (sinkExecutors.containsKey(thread.getName())) {
sinkId = thread.getName();
// Remove the old sink executor
SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId);
// Remove the unneeded Communicator bind with Metrics Manager Server
metricsManagerServer.removeSinkCommunicator(oldSinkExecutor.getCommunicator());
}

// Close the sink
SysUtils.closeIgnoringExceptions(oldSinkExecutor);
// Close the sink
SysUtils.closeIgnoringExceptions(oldSinkExecutor);

if (oldSinkExecutor != null && thisSinkRetryAttempts != 0) {
thisSinkRetryAttempts = sinksRetryAttempts.remove(sinkId);
}

if (sinkId != null && thisSinkRetryAttempts != 0) {
LOG.info(String.format("Restarting IMetricsSink: %s with %d available retries",
sinkId, thisSinkRetryAttempts));

Expand All @@ -363,32 +387,30 @@ public void uncaughtException(Thread thread, Throwable exception) {
}
sinksRetryAttempts.put(sinkId, thisSinkRetryAttempts);


// Update the list of Communicator in Metrics Manager Server
metricsManagerServer.addSinkCommunicator(newSinkExecutor.getCommunicator());

// Restart it
executors.execute(newSinkExecutor);
} else if (oldSinkExecutor != null
} else if (sinkId != null
&& thisSinkRetryAttempts == 0
&& sinkExecutors.size() > 0) {
// If the dead executor was the only executor and it has been removed,
// i.e. sinkExecutors.size() == 0, we would exit the process directly

LOG.severe("Could not recover from exceptions for IMetricsSink: " + sinkId);
LOG.severe("Failed to recover from exceptions for IMetricsSink: " + sinkId);
LOG.info(sinkId + " would close and keep running rest sinks: " + sinkExecutors.keySet());
} else {
// We met metrics manager itself exceptions or we have retried too many times
// It is not recoverable (retried too many times, or not an exception from IMetricsSink)
// So we would do basic cleaning and exit
LOG.info("Could not recover from exceptions; Metrics Manager Exiting");
LOG.info("Failed to recover from exceptions; Metrics Manager Exiting");
for (Handler handler : java.util.logging.Logger.getLogger("").getHandlers()) {
handler.close();
}
// Attempts to shut down all the threads in threadsPool. This will send an interrupt to every
// thread in the pool. Threads may implement clean interrupt-handling logic.
executors.shutdownNow();

// TODO : It is not clear if this signal should be sent to all the threads
// (including threads not owned by HeronInstance). To be safe, not sending these
// interrupts.
Runtime.getRuntime().halt(1);
Expand Down