Skip to content

Commit

Permalink
Improve concurrency for needed parts. (apache#3107)
Browse files Browse the repository at this point in the history
* Change concurrent Map

* Change concurrent Map

* HashMap changes for unneeded parts.

* HashMap changes for unneeded parts.

* Review changes

* Changes HashMap for unneeded parts.

* Improve concurrency for needed parts.

* Remove unused imports.

* Remove unused imports.

* Remove unused imports.

* Fix NPE

(cherry picked from commit 545d381)

* Fix WhitespaceAround

* Add dummy Object

* Fix ConstantName

(cherry picked from commit 8d6d506)
  • Loading branch information
thinker0 authored and sreev committed Apr 9, 2020
1 parent 37aca30 commit 92aa055
Show file tree
Hide file tree
Showing 13 changed files with 38 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MultiAssignableMetric<T extends Number> implements IMetric<Map<String, T>> {
private final Map<String, AssignableMetric<T>> value = new HashMap<>();
private final Map<String, AssignableMetric<T>> value = new ConcurrentHashMap<>();
private T initialValue;

public MultiAssignableMetric(T initialValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MultiCountMetric implements IMetric<Map<String, Long>> {
private Map<String, CountMetric> value = new HashMap<>();
private Map<String, CountMetric> value = new ConcurrentHashMap<>();

public MultiCountMetric() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/*
* A reduce metric that can hold multiple scoped values.
Expand All @@ -29,7 +30,7 @@
* @param <V> type of reduced value
*/
public class MultiReducedMetric<T, U, V> implements IMetric<Map<String, V>> {
private Map<String, ReducedMetric<T, U, V>> value = new HashMap<>();
private Map<String, ReducedMetric<T, U, V>> value = new ConcurrentHashMap<>();
private IReducer<T, U, V> reducer;

public MultiReducedMetric(IReducer<T, U, V> reducer) {
Expand Down
4 changes: 2 additions & 2 deletions heron/api/src/java/org/apache/heron/api/tuple/Fields.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Fields implements Iterable<String>, Serializable {
private static final long serialVersionUID = -1045737418722082345L;

private List<String> fields;
private Map<String, Integer> mIndex = new HashMap<String, Integer>();
private Map<String, Integer> mIndex = new ConcurrentHashMap<String, Integer>();

public Fields(String... pFields) {
this(Arrays.asList(pFields));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -57,6 +58,7 @@
*/
public abstract class HeronClient implements ISelectHandler {
private static final Logger LOG = Logger.getLogger(HeronClient.class.getName());
private static final Object DUMMY = new Object();

// When we send a request, we need to:
// record the the context for this particular RID, and prepare the response for that RID
Expand Down Expand Up @@ -99,9 +101,9 @@ public HeronClient(NIOLooper s, String host, int port, HeronSocketOptions option
socketOptions = options;

isConnected = false;
contextMap = new HashMap<REQID, Object>();
responseMessageMap = new HashMap<REQID, Message.Builder>();
messageMap = new HashMap<String, Message.Builder>();
contextMap = new ConcurrentHashMap<REQID, Object>();
responseMessageMap = new ConcurrentHashMap<REQID, Message.Builder>();
messageMap = new ConcurrentHashMap<String, Message.Builder>();
}

// Register the protobuf Message's name with protobuf Message
Expand Down Expand Up @@ -193,7 +195,7 @@ public void sendRequest(Message request, Object context, Message.Builder respons
Duration timeout) {
// Pack it as a no-timeout request and send it!
final REQID rid = REQID.generate();
contextMap.put(rid, context);
contextMap.put(rid, Objects.nonNull(context) ? context : DUMMY); // Fix NPE
responseMessageMap.put(rid, responseBuilder);

// Add timeout for this request if necessary
Expand Down Expand Up @@ -402,15 +404,15 @@ public void forceFlushWithBestEffort() {
// Following protected methods are just used for testing
/////////////////////////////////////////////////////////
protected Map<String, Message.Builder> getMessageMap() {
return new HashMap<String, Message.Builder>(messageMap);
return new ConcurrentHashMap<String, Message.Builder>(messageMap);
}

protected Map<REQID, Message.Builder> getResponseMessageMap() {
return new HashMap<REQID, Message.Builder>(responseMessageMap);
return new ConcurrentHashMap<REQID, Message.Builder>(responseMessageMap);
}

protected Map<REQID, Object> getContextMap() {
return new HashMap<REQID, Object>(contextMap);
return new ConcurrentHashMap<>(contextMap);
}

protected SocketChannelHelper getSocketChannelHelper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -71,9 +71,9 @@ public HeronServer(NIOLooper s, String host, int port, HeronSocketOptions option
nioLooper = s;
endpoint = new InetSocketAddress(host, port);
socketOptions = options;
requestMap = new HashMap<String, Message.Builder>();
messageMap = new HashMap<String, Message.Builder>();
activeConnections = new HashMap<SocketChannel, SocketChannelHelper>();
requestMap = new ConcurrentHashMap<String, Message.Builder>();
messageMap = new ConcurrentHashMap<String, Message.Builder>();
activeConnections = new ConcurrentHashMap<SocketChannel, SocketChannelHelper>();
}

public InetSocketAddress getEndpoint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

import org.apache.heron.api.metric.CumulativeCountMetric;
Expand Down Expand Up @@ -53,8 +53,8 @@ public class MetricsCollector implements IMetricsRegister {

public MetricsCollector(WakeableLooper runnableToGatherMetrics,
Communicator<Metrics.MetricPublisherPublishMessage> queue) {
metrics = new HashMap<>();
timeBucketToMetricNames = new HashMap<>();
metrics = new ConcurrentHashMap<>();
timeBucketToMetricNames = new ConcurrentHashMap<>();
this.queue = queue;
this.runnableToGatherMetrics = runnableToGatherMetrics;
metricCollectionCount = new CumulativeCountMetric();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

import org.apache.heron.api.Config;
Expand Down Expand Up @@ -101,7 +102,7 @@ public PhysicalPlanHelper(PhysicalPlans.PhysicalPlan pplan, String instanceId) {
}

// setup outputSchema
outputSchema = new HashMap<String, Integer>();
outputSchema = new ConcurrentHashMap<String, Integer>();
List<TopologyAPI.OutputStream> outputs;
if (mySpout != null) {
outputs = mySpout.getOutputsList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.heron.api.Config;
import org.apache.heron.api.generated.TopologyAPI;
Expand Down Expand Up @@ -63,8 +64,8 @@ public GeneralTopologyContextImpl(Map<String, Object> clusterConfig,
this.topology = topology;
this.topologyConfig = new HashMap<>(clusterConfig);
this.taskToComponentMap = taskToComponentMap;
this.inputs = new HashMap<>();
this.outputs = new HashMap<>();
this.inputs = new ConcurrentHashMap<>();
this.outputs = new ConcurrentHashMap<>();
this.componentsOutputFields = new HashMap<>();

for (int i = 0; i < this.topology.getSpoutsCount(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MultiCountMetric implements IMetric {
private Map<String, CountMetric> value = new HashMap<>();
private Map<String, CountMetric> value = new ConcurrentHashMap<>();

public MultiCountMetric() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@SuppressWarnings("rawtypes")
public class MultiReducedMetric implements IMetric {
private Map<String, ReducedMetric> value = new HashMap<>();
private Map<String, ReducedMetric> value = new ConcurrentHashMap<>();
private IReducer reducer;

public MultiReducedMetric(IReducer reducer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MultiCountMetric implements IMetric {
private Map<String, CountMetric> value = new HashMap<>();
private Map<String, CountMetric> value = new ConcurrentHashMap<>();

public MultiCountMetric() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@SuppressWarnings("rawtypes")
public class MultiReducedMetric implements IMetric {
private Map<String, ReducedMetric> value = new HashMap<>();
private Map<String, ReducedMetric> value = new ConcurrentHashMap<>();
private IReducer reducer;

public MultiReducedMetric(IReducer reducer) {
Expand Down

0 comments on commit 92aa055

Please sign in to comment.