From d2990d06c2161ab7a879b21b0804607aa9ab9865 Mon Sep 17 00:00:00 2001 From: Maosong Fu Date: Wed, 5 Oct 2016 13:27:36 -0700 Subject: [PATCH 1/3] Make stream mgr fail fast when unexpected errors happen. --- .../heron/metricsmgr/MetricsManager.java | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java index 907f3d2d661..bf3b3dce862 100644 --- a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java +++ b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java @@ -97,6 +97,8 @@ public class MetricsManager { private final String topologyName; private final String metricsmgrId; + private final long mainThreadId; + /** * Metrics manager constructor */ @@ -123,6 +125,8 @@ public MetricsManager(String topologyName, String serverHost, this.metricsCollector = new MetricsCollector(metricsManagerServerLoop, metricsQueue); this.heronMetricsExportIntervalSec = systemConfig.getHeronMetricsExportIntervalSec(); + this.mainThreadId = Thread.currentThread().getId(); + // Set up the internal Metrics Export routine setupInternalMetricsExport(); @@ -192,8 +196,8 @@ public static void main(String[] args) throws IOException { if (args.length != 6) { throw new RuntimeException( "Invalid arguments; Usage: java com.twitter.heron.metricsmgr.MetricsManager " - + " " - + ""); + + " " + + ""); } String metricsmgrId = args[0]; @@ -221,7 +225,7 @@ public static void main(String[] args) throws IOException { LoggingHelper.addLoggingHandler(new ErrorReportLoggingHandler()); LOG.info(String.format("Starting Metrics Manager for topology %s with topologyId %s with " - + "Metrics Manager Id %s, Merics Manager Port: %d.", + + "Metrics Manager Id %s, Merics Manager Port: %d.", topologyName, topologyId, metricsmgrId, metricsPort)); LOG.info("System Config: " + systemConfig); @@ -328,13 +332,16 @@ public class DefaultExceptionHandler implements Thread.UncaughtExceptionHandler * Handler for uncaughtException */ public void uncaughtException(Thread thread, Throwable exception) { - String threadName = thread.getName(); LOG.log(Level.SEVERE, - "Exception caught in thread: " + threadName + " with thread id: " + thread.getId(), + "Exception caught in thread: " + thread.getName() + " with thread id: " + thread.getId(), exception); - String sinkId = threadName; - Integer thisSinkRetryAttempts = sinksRetryAttempts.remove(sinkId); + if (exception instanceof Error || thread.getId() == mainThreadId) { + LOG.severe("Would not recover from error. Metrics Manager halts immediately"); + Runtime.getRuntime().halt(1); + } + + String sinkId = thread.getName(); // Remove the old sink executor SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId); @@ -342,10 +349,15 @@ public void uncaughtException(Thread thread, Throwable exception) { if (oldSinkExecutor != null) { // Remove the unneeded Communicator bind with Metrics Manager Server metricsManagerServer.removeSinkCommunicator(oldSinkExecutor.getCommunicator()); + + // Close the sink + SysUtils.closeIgnoringExceptions(oldSinkExecutor); } - // Close the sink - SysUtils.closeIgnoringExceptions(oldSinkExecutor); + Integer thisSinkRetryAttempts = 0; + if (sinksRetryAttempts.containsKey(sinkId)) { + thisSinkRetryAttempts = sinksRetryAttempts.remove(sinkId); + } if (oldSinkExecutor != null && thisSinkRetryAttempts != 0) { LOG.info(String.format("Restarting IMetricsSink: %s with %d available retries", From 23e194fdda5f8f344a476457ead43200b1a9fb2e Mon Sep 17 00:00:00 2001 From: Maosong Fu Date: Wed, 5 Oct 2016 13:28:18 -0700 Subject: [PATCH 2/3] Make stream mgr fail fast when unexpected errors happen. --- .../src/java/com/twitter/heron/metricsmgr/MetricsManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java index bf3b3dce862..b6c972e1382 100644 --- a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java +++ b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java @@ -375,7 +375,6 @@ public void uncaughtException(Thread thread, Throwable exception) { } sinksRetryAttempts.put(sinkId, thisSinkRetryAttempts); - // Update the list of Communicator in Metrics Manager Server metricsManagerServer.addSinkCommunicator(newSinkExecutor.getCommunicator()); From fff073642e8481cd976656972fb6a2e1c2c1f4d3 Mon Sep 17 00:00:00 2001 From: Maosong Fu Date: Thu, 6 Oct 2016 15:15:16 -0700 Subject: [PATCH 3/3] Address Bill's feedback --- .../heron/metricsmgr/MetricsManager.java | 47 ++++++++++++------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java index b6c972e1382..51fba620b76 100644 --- a/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java +++ b/heron/metricsmgr/src/java/com/twitter/heron/metricsmgr/MetricsManager.java @@ -332,34 +332,46 @@ public class DefaultExceptionHandler implements Thread.UncaughtExceptionHandler * Handler for uncaughtException */ public void uncaughtException(Thread thread, Throwable exception) { - LOG.log(Level.SEVERE, - "Exception caught in thread: " + thread.getName() + " with thread id: " + thread.getId(), - exception); + // We would fail fast when errors occur + if (exception instanceof Error) { + LOG.log(Level.SEVERE, + "Error caught in thread: " + thread.getName() + + " with thread id: " + thread.getId() + ". Process halting...", + exception); + Runtime.getRuntime().halt(1); + } - if (exception instanceof Error || thread.getId() == mainThreadId) { - LOG.severe("Would not recover from error. Metrics Manager halts immediately"); + // We would fail fast when exceptions happen in main thread + if (thread.getId() == mainThreadId) { + LOG.log(Level.SEVERE, + "Exception caught in main thread. Process halting...", + exception); Runtime.getRuntime().halt(1); } - String sinkId = thread.getName(); + LOG.log(Level.SEVERE, + "Exception caught in thread: " + thread.getName() + " with thread id: " + thread.getId(), + exception); - // Remove the old sink executor - SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId); + String sinkId = null; + Integer thisSinkRetryAttempts = 0; - if (oldSinkExecutor != null) { + // We enforced the name of thread running particular IMetricsSink equal to its sink-id + // If the thread name is a key of SinkExecutors, then it is a thread running IMetricsSink + if (sinkExecutors.containsKey(thread.getName())) { + sinkId = thread.getName(); + // Remove the old sink executor + SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId); // Remove the unneeded Communicator bind with Metrics Manager Server metricsManagerServer.removeSinkCommunicator(oldSinkExecutor.getCommunicator()); // Close the sink SysUtils.closeIgnoringExceptions(oldSinkExecutor); - } - Integer thisSinkRetryAttempts = 0; - if (sinksRetryAttempts.containsKey(sinkId)) { thisSinkRetryAttempts = sinksRetryAttempts.remove(sinkId); } - if (oldSinkExecutor != null && thisSinkRetryAttempts != 0) { + if (sinkId != null && thisSinkRetryAttempts != 0) { LOG.info(String.format("Restarting IMetricsSink: %s with %d available retries", sinkId, thisSinkRetryAttempts)); @@ -380,18 +392,18 @@ public void uncaughtException(Thread thread, Throwable exception) { // Restart it executors.execute(newSinkExecutor); - } else if (oldSinkExecutor != null + } else if (sinkId != null && thisSinkRetryAttempts == 0 && sinkExecutors.size() > 0) { // If the dead executor is the only one executor and it is removed, // e.g. sinkExecutors.size() == 0, we would exit the process directly - LOG.severe("Could not recover from exceptions for IMetricsSink: " + sinkId); + LOG.severe("Failed to recover from exceptions for IMetricsSink: " + sinkId); LOG.info(sinkId + " would close and keep running rest sinks: " + sinkExecutors.keySet()); } else { - // We met metrics manager itself exceptions or we have retried too many times + // It is not recoverable (retried too many times, or not an exception from IMetricsSink) // So we would do basic cleaning and exit - LOG.info("Could not recover from exceptions; Metrics Manager Exiting"); + LOG.info("Failed to recover from exceptions; Metrics Manager Exiting"); for (Handler handler : java.util.logging.Logger.getLogger("").getHandlers()) { handler.close(); } @@ -399,7 +411,6 @@ public void uncaughtException(Thread thread, Throwable exception) { // thread in the pool. Threads may implement a clean Interrupt logic. executors.shutdownNow(); - // TODO : It is not clear if this signal should be sent to all the threads // (including threads not owned by HeronInstance). To be safe, not sending these // interrupts. Runtime.getRuntime().halt(1);