diff --git a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java index 907f3d2d661..51fba620b76 100644 --- a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java +++ b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java @@ -97,6 +97,8 @@ public class MetricsManager { private final String topologyName; private final String metricsmgrId; + private final long mainThreadId; + /** * Metrics manager constructor */ @@ -123,6 +125,8 @@ public MetricsManager(String topologyName, String serverHost, this.metricsCollector = new MetricsCollector(metricsManagerServerLoop, metricsQueue); this.heronMetricsExportIntervalSec = systemConfig.getHeronMetricsExportIntervalSec(); + this.mainThreadId = Thread.currentThread().getId(); + // Set up the internal Metrics Export routine setupInternalMetricsExport(); @@ -192,8 +196,8 @@ public static void main(String[] args) throws IOException { if (args.length != 6) { throw new RuntimeException( "Invalid arguments; Usage: java com.twitter.heron.metricsmgr.MetricsManager " - + " " - + ""); + + " " + + ""); } String metricsmgrId = args[0]; @@ -221,7 +225,7 @@ public static void main(String[] args) throws IOException { LoggingHelper.addLoggingHandler(new ErrorReportLoggingHandler()); LOG.info(String.format("Starting Metrics Manager for topology %s with topologyId %s with " - + "Metrics Manager Id %s, Merics Manager Port: %d.", + + "Metrics Manager Id %s, Merics Manager Port: %d.", topologyName, topologyId, metricsmgrId, metricsPort)); LOG.info("System Config: " + systemConfig); @@ -328,26 +332,46 @@ public class DefaultExceptionHandler implements Thread.UncaughtExceptionHandler * Handler for uncaughtException */ public void uncaughtException(Thread thread, Throwable exception) { - String threadName = thread.getName(); + // We would fail fast when errors occur + if (exception instanceof Error) { + LOG.log(Level.SEVERE, + "Error caught in thread: " + thread.getName() + + " with thread id: " + thread.getId() + ". Process halting...", + exception); + Runtime.getRuntime().halt(1); + } + + // We would fail fast when exceptions happen in main thread + if (thread.getId() == mainThreadId) { + LOG.log(Level.SEVERE, + "Exception caught in main thread. Process halting...", + exception); + Runtime.getRuntime().halt(1); + } + LOG.log(Level.SEVERE, - "Exception caught in thread: " + threadName + " with thread id: " + thread.getId(), + "Exception caught in thread: " + thread.getName() + " with thread id: " + thread.getId(), exception); - String sinkId = threadName; - Integer thisSinkRetryAttempts = sinksRetryAttempts.remove(sinkId); - - // Remove the old sink executor - SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId); + String sinkId = null; + Integer thisSinkRetryAttempts = 0; - if (oldSinkExecutor != null) { + // We enforced the name of thread running particular IMetricsSink equal to its sink-id + // If the thread name is a key of SinkExecutors, then it is a thread running IMetricsSink + if (sinkExecutors.containsKey(thread.getName())) { + sinkId = thread.getName(); + // Remove the old sink executor + SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId); // Remove the unneeded Communicator bind with Metrics Manager Server metricsManagerServer.removeSinkCommunicator(oldSinkExecutor.getCommunicator()); - } - // Close the sink - SysUtils.closeIgnoringExceptions(oldSinkExecutor); + // Close the sink + SysUtils.closeIgnoringExceptions(oldSinkExecutor); - if (oldSinkExecutor != null && thisSinkRetryAttempts != 0) { + thisSinkRetryAttempts = sinksRetryAttempts.remove(sinkId); + } + + if (sinkId != null && thisSinkRetryAttempts != 0) { LOG.info(String.format("Restarting IMetricsSink: %s with %d available retries", sinkId, thisSinkRetryAttempts)); @@ -363,24 +387,23 @@ public void uncaughtException(Thread thread, Throwable exception) { } sinksRetryAttempts.put(sinkId, thisSinkRetryAttempts); - // Update the list of Communicator in Metrics Manager Server metricsManagerServer.addSinkCommunicator(newSinkExecutor.getCommunicator()); // Restart it executors.execute(newSinkExecutor); - } else if (oldSinkExecutor != null + } else if (sinkId != null && thisSinkRetryAttempts == 0 && sinkExecutors.size() > 0) { // If the dead executor is the only one executor and it is removed, // e.g. sinkExecutors.size() == 0, we would exit the process directly - LOG.severe("Could not recover from exceptions for IMetricsSink: " + sinkId); + LOG.severe("Failed to recover from exceptions for IMetricsSink: " + sinkId); LOG.info(sinkId + " would close and keep running rest sinks: " + sinkExecutors.keySet()); } else { - // We met metrics manager itself exceptions or we have retried too many times + // It is not recoverable (retried too many times, or not an exception from IMetricsSink) // So we would do basic cleaning and exit - LOG.info("Could not recover from exceptions; Metrics Manager Exiting"); + LOG.info("Failed to recover from exceptions; Metrics Manager Exiting"); for (Handler handler : java.util.logging.Logger.getLogger("").getHandlers()) { handler.close(); } @@ -388,7 +411,6 @@ public void uncaughtException(Thread thread, Throwable exception) { // thread in the pool. Threads may implement a clean Interrupt logic. executors.shutdownNow(); - // TODO : It is not clear if this signal should be sent to all the threads // (including threads not owned by HeronInstance). To be safe, not sending these // interrupts. Runtime.getRuntime().halt(1);