Skip to content

Commit

Permalink
HDDS-10971. Replace ConcurrentHashMap with HashMap in PipelineStateMap (
Browse files Browse the repository at this point in the history
  • Loading branch information
Montura authored Jun 7, 2024
1 parent 0f848e4 commit 6d177d9
Showing 1 changed file with 12 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

import static java.lang.String.format;

Expand All @@ -46,24 +44,19 @@
* its state.
* Invariant: If a pipeline exists in PipelineStateMap, both pipelineMap and
* pipeline2container would have a non-null mapping for it.
*
* Concurrency consideration:
* - thread-unsafe
*/
class PipelineStateMap {
private static final Logger LOG = LoggerFactory.getLogger(PipelineStateMap.class);

private static final Logger LOG = LoggerFactory.getLogger(
PipelineStateMap.class);

private final Map<PipelineID, Pipeline> pipelineMap;
private final Map<PipelineID, NavigableSet<ContainerID>> pipeline2container;
private final Map<ReplicationConfig, List<Pipeline>> query2OpenPipelines;

PipelineStateMap() {
// TODO: Use TreeMap for range operations?
private final Map<PipelineID, Pipeline> pipelineMap = new HashMap<>();
private final Map<PipelineID, NavigableSet<ContainerID>> pipeline2container = new HashMap<>();
private final Map<ReplicationConfig, List<Pipeline>> query2OpenPipelines = new HashMap<>();

// TODO: Use TreeMap for range operations?
pipelineMap = new ConcurrentHashMap<>();
pipeline2container = new ConcurrentHashMap<>();
query2OpenPipelines = new HashMap<>();

}
PipelineStateMap() { }

/**
* Adds provided pipeline in the data structures.
Expand All @@ -87,8 +80,8 @@ void addPipeline(Pipeline pipeline) throws IOException {
}
pipeline2container.put(pipeline.getId(), new TreeSet<>());
if (pipeline.getPipelineState() == PipelineState.OPEN) {
query2OpenPipelines.computeIfAbsent(pipeline.getReplicationConfig(),
any -> new CopyOnWriteArrayList<>()).add(pipeline);
query2OpenPipelines.computeIfAbsent(pipeline.getReplicationConfig(), any -> new ArrayList<>())
.add(pipeline);
}
}

Expand Down Expand Up @@ -416,7 +409,7 @@ Pipeline updatePipelineState(PipelineID pipelineID, PipelineState state)
if (updatedPipeline.getPipelineState() == PipelineState.OPEN) {
// for transition to OPEN state add pipeline to query2OpenPipelines
if (pipelineList == null) {
pipelineList = new CopyOnWriteArrayList<>();
pipelineList = new ArrayList<>();
query2OpenPipelines.put(pipeline.getReplicationConfig(), pipelineList);
}
pipelineList.add(updatedPipeline);
Expand Down

0 comments on commit 6d177d9

Please sign in to comment.