Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Improve concurrency for needed parts. #3107

Merged
merged 14 commits into from
Mar 4, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MultiAssignableMetric<T extends Number> implements IMetric<Map<String, T>> {
private final Map<String, AssignableMetric<T>> value = new HashMap<>();
private final Map<String, AssignableMetric<T>> value = new ConcurrentHashMap<>();
private T initialValue;

public MultiAssignableMetric(T initialValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MultiCountMetric implements IMetric<Map<String, Long>> {
private Map<String, CountMetric> value = new HashMap<>();
private Map<String, CountMetric> value = new ConcurrentHashMap<>();

public MultiCountMetric() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/*
* A reduce metric that can hold multiple scoped values.
Expand All @@ -29,7 +30,7 @@
* @param <V> type of reduced value
*/
public class MultiReducedMetric<T, U, V> implements IMetric<Map<String, V>> {
private Map<String, ReducedMetric<T, U, V>> value = new HashMap<>();
private Map<String, ReducedMetric<T, U, V>> value = new ConcurrentHashMap<>();
private IReducer<T, U, V> reducer;

public MultiReducedMetric(IReducer<T, U, V> reducer) {
Expand Down
4 changes: 2 additions & 2 deletions heron/api/src/java/org/apache/heron/api/tuple/Fields.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Fields implements Iterable<String>, Serializable {
private static final long serialVersionUID = -1045737418722082345L;

private List<String> fields;
private Map<String, Integer> mIndex = new HashMap<String, Integer>();
private Map<String, Integer> mIndex = new ConcurrentHashMap<String, Integer>();

public Fields(String... pFields) {
this(Arrays.asList(pFields));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -57,6 +58,7 @@
*/
public abstract class HeronClient implements ISelectHandler {
private static final Logger LOG = Logger.getLogger(HeronClient.class.getName());
private static final Object DUMMY = new Object();

// When we send a request, we need to:
// record the the context for this particular RID, and prepare the response for that RID
Expand Down Expand Up @@ -99,9 +101,9 @@ public HeronClient(NIOLooper s, String host, int port, HeronSocketOptions option
socketOptions = options;

isConnected = false;
contextMap = new HashMap<REQID, Object>();
responseMessageMap = new HashMap<REQID, Message.Builder>();
messageMap = new HashMap<String, Message.Builder>();
contextMap = new ConcurrentHashMap<REQID, Object>();
responseMessageMap = new ConcurrentHashMap<REQID, Message.Builder>();
messageMap = new ConcurrentHashMap<String, Message.Builder>();
}

// Register the protobuf Message's name with protobuf Message
Expand Down Expand Up @@ -193,7 +195,7 @@ public void sendRequest(Message request, Object context, Message.Builder respons
Duration timeout) {
// Pack it as a no-timeout request and send it!
final REQID rid = REQID.generate();
contextMap.put(rid, context);
contextMap.put(rid, Objects.nonNull(context) ? context : DUMMY); // Fix NPE
responseMessageMap.put(rid, responseBuilder);

// Add timeout for this request if necessary
Expand Down Expand Up @@ -402,15 +404,15 @@ public void forceFlushWithBestEffort() {
// Following protected methods are just used for testing
/////////////////////////////////////////////////////////
protected Map<String, Message.Builder> getMessageMap() {
return new HashMap<String, Message.Builder>(messageMap);
return new ConcurrentHashMap<String, Message.Builder>(messageMap);
}

protected Map<REQID, Message.Builder> getResponseMessageMap() {
return new HashMap<REQID, Message.Builder>(responseMessageMap);
return new ConcurrentHashMap<REQID, Message.Builder>(responseMessageMap);
}

protected Map<REQID, Object> getContextMap() {
return new HashMap<REQID, Object>(contextMap);
return new ConcurrentHashMap<>(contextMap);
}

protected SocketChannelHelper getSocketChannelHelper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -71,9 +71,9 @@ public HeronServer(NIOLooper s, String host, int port, HeronSocketOptions option
nioLooper = s;
endpoint = new InetSocketAddress(host, port);
socketOptions = options;
requestMap = new HashMap<String, Message.Builder>();
messageMap = new HashMap<String, Message.Builder>();
activeConnections = new HashMap<SocketChannel, SocketChannelHelper>();
requestMap = new ConcurrentHashMap<String, Message.Builder>();
messageMap = new ConcurrentHashMap<String, Message.Builder>();
activeConnections = new ConcurrentHashMap<SocketChannel, SocketChannelHelper>();
}

public InetSocketAddress getEndpoint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

import org.apache.heron.api.metric.CumulativeCountMetric;
Expand Down Expand Up @@ -53,8 +53,8 @@ public class MetricsCollector implements IMetricsRegister {

public MetricsCollector(WakeableLooper runnableToGatherMetrics,
Communicator<Metrics.MetricPublisherPublishMessage> queue) {
metrics = new HashMap<>();
timeBucketToMetricNames = new HashMap<>();
metrics = new ConcurrentHashMap<>();
timeBucketToMetricNames = new ConcurrentHashMap<>();
this.queue = queue;
this.runnableToGatherMetrics = runnableToGatherMetrics;
metricCollectionCount = new CumulativeCountMetric();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

import org.apache.heron.api.Config;
Expand Down Expand Up @@ -101,7 +102,7 @@ public PhysicalPlanHelper(PhysicalPlans.PhysicalPlan pplan, String instanceId) {
}

// setup outputSchema
outputSchema = new HashMap<String, Integer>();
outputSchema = new ConcurrentHashMap<String, Integer>();
List<TopologyAPI.OutputStream> outputs;
if (mySpout != null) {
outputs = mySpout.getOutputsList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.heron.api.Config;
import org.apache.heron.api.generated.TopologyAPI;
Expand Down Expand Up @@ -63,8 +64,8 @@ public GeneralTopologyContextImpl(Map<String, Object> clusterConfig,
this.topology = topology;
this.topologyConfig = new HashMap<>(clusterConfig);
this.taskToComponentMap = taskToComponentMap;
this.inputs = new HashMap<>();
this.outputs = new HashMap<>();
this.inputs = new ConcurrentHashMap<>();
this.outputs = new ConcurrentHashMap<>();
this.componentsOutputFields = new HashMap<>();

for (int i = 0; i < this.topology.getSpoutsCount(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MultiCountMetric implements IMetric {
private Map<String, CountMetric> value = new HashMap<>();
private Map<String, CountMetric> value = new ConcurrentHashMap<>();

public MultiCountMetric() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@SuppressWarnings("rawtypes")
public class MultiReducedMetric implements IMetric {
private Map<String, ReducedMetric> value = new HashMap<>();
private Map<String, ReducedMetric> value = new ConcurrentHashMap<>();
private IReducer reducer;

public MultiReducedMetric(IReducer reducer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MultiCountMetric implements IMetric {
private Map<String, CountMetric> value = new HashMap<>();
private Map<String, CountMetric> value = new ConcurrentHashMap<>();

public MultiCountMetric() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@SuppressWarnings("rawtypes")
public class MultiReducedMetric implements IMetric {
private Map<String, ReducedMetric> value = new HashMap<>();
private Map<String, ReducedMetric> value = new ConcurrentHashMap<>();
private IReducer reducer;

public MultiReducedMetric(IReducer reducer) {
Expand Down