From 92aa0554faea955e70fa752e19828729334c4033 Mon Sep 17 00:00:00 2001 From: choi se Date: Thu, 5 Mar 2020 00:21:08 +0900 Subject: [PATCH] Improve concurrency for needed parts. (#3107) * Change concurrent Map * Change concurrent Map * HashMap changes for unneeded parts. * HashMap changes for unneeded parts. * Review changes * Changes HashMap for unneeded parts. * Improve concurrency for needed parts. * Remove unused imports. * Remove unused imports. * Remove unused imports. * Fix NPE (cherry picked from commit 545d3814b315c29d3e396309a2ededaad193ec32) * Fix WhitespaceAround * Add dummy Object * Fix ConstantName (cherry picked from commit 8d6d5067072e92d6e276f93e18297ddedc647c6c) --- .../api/metric/MultiAssignableMetric.java | 3 ++- .../heron/api/metric/MultiCountMetric.java | 3 ++- .../heron/api/metric/MultiReducedMetric.java | 3 ++- .../org/apache/heron/api/tuple/Fields.java | 4 ++-- .../heron/common/network/HeronClient.java | 18 ++++++++++-------- .../heron/common/network/HeronServer.java | 8 ++++---- .../common/utils/metrics/MetricsCollector.java | 6 +++--- .../common/utils/misc/PhysicalPlanHelper.java | 3 ++- .../topology/GeneralTopologyContextImpl.java | 5 +++-- .../storm/metric/api/MultiCountMetric.java | 3 ++- .../storm/metric/api/MultiReducedMetric.java | 3 ++- .../storm/metric/api/MultiCountMetric.java | 3 ++- .../storm/metric/api/MultiReducedMetric.java | 3 ++- 13 files changed, 38 insertions(+), 27 deletions(-) diff --git a/heron/api/src/java/org/apache/heron/api/metric/MultiAssignableMetric.java b/heron/api/src/java/org/apache/heron/api/metric/MultiAssignableMetric.java index d00a93ccc94..c34bc289f11 100644 --- a/heron/api/src/java/org/apache/heron/api/metric/MultiAssignableMetric.java +++ b/heron/api/src/java/org/apache/heron/api/metric/MultiAssignableMetric.java @@ -21,9 +21,10 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class MultiAssignableMetric implements IMetric> { - private final Map> value = new HashMap<>(); + private final Map> value = new ConcurrentHashMap<>(); private T initialValue; public MultiAssignableMetric(T initialValue) { diff --git a/heron/api/src/java/org/apache/heron/api/metric/MultiCountMetric.java b/heron/api/src/java/org/apache/heron/api/metric/MultiCountMetric.java index 2f2fee04750..979037a8602 100644 --- a/heron/api/src/java/org/apache/heron/api/metric/MultiCountMetric.java +++ b/heron/api/src/java/org/apache/heron/api/metric/MultiCountMetric.java @@ -21,9 +21,10 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class MultiCountMetric implements IMetric> { - private Map value = new HashMap<>(); + private Map value = new ConcurrentHashMap<>(); public MultiCountMetric() { } diff --git a/heron/api/src/java/org/apache/heron/api/metric/MultiReducedMetric.java b/heron/api/src/java/org/apache/heron/api/metric/MultiReducedMetric.java index 951279366fa..39ed0909a63 100644 --- a/heron/api/src/java/org/apache/heron/api/metric/MultiReducedMetric.java +++ b/heron/api/src/java/org/apache/heron/api/metric/MultiReducedMetric.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /* * A reduce metric that can hold multiple scoped values. @@ -29,7 +30,7 @@ * @param type of reduced value */ public class MultiReducedMetric implements IMetric> { - private Map> value = new HashMap<>(); + private Map> value = new ConcurrentHashMap<>(); private IReducer reducer; public MultiReducedMetric(IReducer reducer) { diff --git a/heron/api/src/java/org/apache/heron/api/tuple/Fields.java b/heron/api/src/java/org/apache/heron/api/tuple/Fields.java index 50e99055b92..8526c14c5a5 100644 --- a/heron/api/src/java/org/apache/heron/api/tuple/Fields.java +++ b/heron/api/src/java/org/apache/heron/api/tuple/Fields.java @@ -22,16 +22,16 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class Fields implements Iterable, Serializable { private static final long serialVersionUID = -1045737418722082345L; private List fields; - private Map mIndex = new HashMap(); + private Map mIndex = new ConcurrentHashMap(); public Fields(String... pFields) { this(Arrays.asList(pFields)); diff --git a/heron/common/src/java/org/apache/heron/common/network/HeronClient.java b/heron/common/src/java/org/apache/heron/common/network/HeronClient.java index ff30b21adad..58d2d42970e 100644 --- a/heron/common/src/java/org/apache/heron/common/network/HeronClient.java +++ b/heron/common/src/java/org/apache/heron/common/network/HeronClient.java @@ -24,9 +24,10 @@ import java.nio.channels.SelectableChannel; import java.nio.channels.SocketChannel; import java.time.Duration; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; @@ -57,6 +58,7 @@ */ public abstract class HeronClient implements ISelectHandler { private static final Logger LOG = Logger.getLogger(HeronClient.class.getName()); + private static final Object DUMMY = new Object(); // When we send a request, we need to: // record the the context for this particular RID, and prepare the response for that RID @@ -99,9 +101,9 @@ public HeronClient(NIOLooper s, String host, int port, HeronSocketOptions option socketOptions = options; isConnected = false; - contextMap = new HashMap(); - responseMessageMap = new HashMap(); - messageMap = new HashMap(); + contextMap = new ConcurrentHashMap(); + responseMessageMap = new ConcurrentHashMap(); + messageMap = new ConcurrentHashMap(); } // Register the protobuf Message's name with protobuf Message @@ -193,7 +195,7 @@ public void sendRequest(Message request, Object context, Message.Builder respons Duration timeout) { // Pack it as a no-timeout request and send it! final REQID rid = REQID.generate(); - contextMap.put(rid, context); + contextMap.put(rid, Objects.nonNull(context) ? context : DUMMY); // Fix NPE responseMessageMap.put(rid, responseBuilder); // Add timeout for this request if necessary @@ -402,15 +404,15 @@ public void forceFlushWithBestEffort() { // Following protected methods are just used for testing ///////////////////////////////////////////////////////// protected Map getMessageMap() { - return new HashMap(messageMap); + return new ConcurrentHashMap(messageMap); } protected Map getResponseMessageMap() { - return new HashMap(responseMessageMap); + return new ConcurrentHashMap(responseMessageMap); } protected Map getContextMap() { - return new HashMap(contextMap); + return new ConcurrentHashMap<>(contextMap); } protected SocketChannelHelper getSocketChannelHelper() { diff --git a/heron/common/src/java/org/apache/heron/common/network/HeronServer.java b/heron/common/src/java/org/apache/heron/common/network/HeronServer.java index 4ee87f9ca63..8589f58da48 100644 --- a/heron/common/src/java/org/apache/heron/common/network/HeronServer.java +++ b/heron/common/src/java/org/apache/heron/common/network/HeronServer.java @@ -26,9 +26,9 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.time.Duration; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; @@ -71,9 +71,9 @@ public HeronServer(NIOLooper s, String host, int port, HeronSocketOptions option nioLooper = s; endpoint = new InetSocketAddress(host, port); socketOptions = options; - requestMap = new HashMap(); - messageMap = new HashMap(); - activeConnections = new HashMap(); + requestMap = new ConcurrentHashMap(); + messageMap = new ConcurrentHashMap(); + activeConnections = new ConcurrentHashMap(); } public InetSocketAddress getEndpoint() { diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java index 0a2bfc9d37b..f0b1166348e 100644 --- a/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java +++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java @@ -22,10 +22,10 @@ import java.time.Duration; import java.util.Collection; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Logger; import org.apache.heron.api.metric.CumulativeCountMetric; @@ -53,8 +53,8 @@ public class MetricsCollector implements IMetricsRegister { public MetricsCollector(WakeableLooper runnableToGatherMetrics, Communicator queue) { - metrics = new HashMap<>(); - timeBucketToMetricNames = new HashMap<>(); + metrics = new ConcurrentHashMap<>(); + timeBucketToMetricNames = new ConcurrentHashMap<>(); this.queue = queue; this.runnableToGatherMetrics = runnableToGatherMetrics; metricCollectionCount = new CumulativeCountMetric(); diff --git a/heron/common/src/java/org/apache/heron/common/utils/misc/PhysicalPlanHelper.java b/heron/common/src/java/org/apache/heron/common/utils/misc/PhysicalPlanHelper.java index 485e821eeb5..6b4d504fa5d 100644 --- a/heron/common/src/java/org/apache/heron/common/utils/misc/PhysicalPlanHelper.java +++ b/heron/common/src/java/org/apache/heron/common/utils/misc/PhysicalPlanHelper.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Logger; import org.apache.heron.api.Config; @@ -101,7 +102,7 @@ public PhysicalPlanHelper(PhysicalPlans.PhysicalPlan pplan, String instanceId) { } // setup outputSchema - outputSchema = new HashMap(); + outputSchema = new ConcurrentHashMap(); List outputs; if (mySpout != null) { outputs = mySpout.getOutputsList(); diff --git a/heron/common/src/java/org/apache/heron/common/utils/topology/GeneralTopologyContextImpl.java b/heron/common/src/java/org/apache/heron/common/utils/topology/GeneralTopologyContextImpl.java index 94328719ff3..e06c18c8eff 100644 --- a/heron/common/src/java/org/apache/heron/common/utils/topology/GeneralTopologyContextImpl.java +++ b/heron/common/src/java/org/apache/heron/common/utils/topology/GeneralTopologyContextImpl.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.heron.api.Config; import org.apache.heron.api.generated.TopologyAPI; @@ -63,8 +64,8 @@ public GeneralTopologyContextImpl(Map clusterConfig, this.topology = topology; this.topologyConfig = new HashMap<>(clusterConfig); this.taskToComponentMap = taskToComponentMap; - this.inputs = new HashMap<>(); - this.outputs = new HashMap<>(); + this.inputs = new ConcurrentHashMap<>(); + this.outputs = new ConcurrentHashMap<>(); this.componentsOutputFields = new HashMap<>(); for (int i = 0; i < this.topology.getSpoutsCount(); ++i) { diff --git a/storm-compatibility/src/java/backtype/storm/metric/api/MultiCountMetric.java b/storm-compatibility/src/java/backtype/storm/metric/api/MultiCountMetric.java index 6e126fe6b09..7f2abf8d192 100644 --- a/storm-compatibility/src/java/backtype/storm/metric/api/MultiCountMetric.java +++ b/storm-compatibility/src/java/backtype/storm/metric/api/MultiCountMetric.java @@ -20,9 +20,10 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class MultiCountMetric implements IMetric { - private Map value = new HashMap<>(); + private Map value = new ConcurrentHashMap<>(); public MultiCountMetric() { } diff --git a/storm-compatibility/src/java/backtype/storm/metric/api/MultiReducedMetric.java b/storm-compatibility/src/java/backtype/storm/metric/api/MultiReducedMetric.java index 91c1eeae703..4f0f85510f1 100644 --- a/storm-compatibility/src/java/backtype/storm/metric/api/MultiReducedMetric.java +++ b/storm-compatibility/src/java/backtype/storm/metric/api/MultiReducedMetric.java @@ -20,10 +20,11 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; @SuppressWarnings("rawtypes") public class MultiReducedMetric implements IMetric { - private Map value = new HashMap<>(); + private Map value = new ConcurrentHashMap<>(); private IReducer reducer; public MultiReducedMetric(IReducer reducer) { diff --git a/storm-compatibility/src/java/org/apache/storm/metric/api/MultiCountMetric.java b/storm-compatibility/src/java/org/apache/storm/metric/api/MultiCountMetric.java index f55eab532ca..c9119475afa 100644 --- a/storm-compatibility/src/java/org/apache/storm/metric/api/MultiCountMetric.java +++ b/storm-compatibility/src/java/org/apache/storm/metric/api/MultiCountMetric.java @@ -20,9 +20,10 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class MultiCountMetric implements IMetric { - private Map value = new HashMap<>(); + private Map value = new ConcurrentHashMap<>(); public MultiCountMetric() { } diff --git a/storm-compatibility/src/java/org/apache/storm/metric/api/MultiReducedMetric.java b/storm-compatibility/src/java/org/apache/storm/metric/api/MultiReducedMetric.java index 15e05a24049..0866773175d 100644 --- a/storm-compatibility/src/java/org/apache/storm/metric/api/MultiReducedMetric.java +++ b/storm-compatibility/src/java/org/apache/storm/metric/api/MultiReducedMetric.java @@ -20,10 +20,11 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; @SuppressWarnings("rawtypes") public class MultiReducedMetric implements IMetric { - private Map value = new HashMap<>(); + private Map value = new ConcurrentHashMap<>(); private IReducer reducer; public MultiReducedMetric(IReducer reducer) {