diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index b48e52dafe6..d61f9ee366b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -182,7 +182,7 @@ public ContainerInfo allocateContainer(
     // Acquire pipeline manager lock, to avoid any updates to pipeline
     // while allocate container happens. This is to avoid scenario like
     // mentioned in HDDS-5655.
-    pipelineManager.acquireReadLock(replicationConfig);
+    pipelineManager.acquireReadLock();
     lock.lock();
     List<Pipeline> pipelines;
     Pipeline pipeline;
@@ -196,7 +196,7 @@ public ContainerInfo allocateContainer(
       }
     } finally {
       lock.unlock();
-      pipelineManager.releaseReadLock(replicationConfig);
+      pipelineManager.releaseReadLock();
     }

     if (pipelines.isEmpty()) {
@@ -209,7 +209,7 @@ public ContainerInfo allocateContainer(
             " matching pipeline for replicationConfig: " + replicationConfig
             + ", State:PipelineState.OPEN", e);
       }
-      pipelineManager.acquireReadLock(replicationConfig);
+      pipelineManager.acquireReadLock();
       lock.lock();
       try {
         pipelines = pipelineManager
@@ -224,7 +224,7 @@ public ContainerInfo allocateContainer(
         }
       } finally {
         lock.unlock();
-        pipelineManager.releaseReadLock(replicationConfig);
+        pipelineManager.releaseReadLock();
       }
     }
     return containerInfo;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 77353adc7b8..15b0f408c56 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -207,20 +207,20 @@ void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
   /**
    * Acquire read lock.
    */
-  void acquireReadLock(ReplicationConfig replicationConfig);
+  void acquireReadLock();

   /**
    * Release read lock.
    */
-  void releaseReadLock(ReplicationConfig replicationConfig);
+  void releaseReadLock();

   /**
    * Acquire write lock.
    */
-  void acquireWriteLock(ReplicationConfig replicationConfig);
+  void acquireWriteLock();

   /**
    * Release write lock.
    */
-  void releaseWriteLock(ReplicationConfig replicationConfig);
+  void releaseWriteLock();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index d3dd42cdca7..000d3e73633 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -77,7 +77,6 @@ public class PipelineManagerImpl implements PipelineManager {

   // Limit the number of on-going ratis operation to be 1.
   private final ReentrantReadWriteLock lock;
-  private final ReentrantReadWriteLock ecPipelineLock;
   private PipelineFactory pipelineFactory;
   private PipelineStateManager stateManager;
   private BackgroundPipelineCreator backgroundPipelineCreator;
@@ -106,7 +105,6 @@ protected PipelineManagerImpl(ConfigurationSource conf,
       SCMContext scmContext,
       Clock clock) {
     this.lock = new ReentrantReadWriteLock();
-    this.ecPipelineLock = new ReentrantReadWriteLock();
     this.pipelineFactory = pipelineFactory;
     this.stateManager = pipelineStateManager;
     this.conf = conf;
@@ -250,7 +248,7 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig,
       throws IOException {
     checkIfPipelineCreationIsAllowed(replicationConfig);

-    acquireWriteLock(replicationConfig);
+    acquireWriteLock();
     final Pipeline pipeline;
     try {
       try {
@@ -263,7 +261,7 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig,

       addPipelineToManager(pipeline);
       return pipeline;
     } finally {
-      releaseWriteLock(replicationConfig);
+      releaseWriteLock();
     }
   }
@@ -288,8 +286,7 @@ private void addPipelineToManager(Pipeline pipeline)
       throws IOException {
     HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
         ClientVersion.CURRENT_VERSION);
-    ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
-    acquireWriteLock(replicationConfig);
+    acquireWriteLock();
     try {
       stateManager.addPipeline(pipelineProto);
     } catch (IOException ex) {
@@ -297,7 +294,7 @@ private void addPipelineToManager(Pipeline pipeline)
       metrics.incNumPipelineCreationFailed();
       throw ex;
     } finally {
-      releaseWriteLock(replicationConfig);
+      releaseWriteLock();
     }
     recordMetricsForPipeline(pipeline);
   }
@@ -422,23 +419,19 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException {

   public void openPipeline(PipelineID pipelineId) throws IOException {
     HddsProtos.PipelineID pipelineIdProtobuf = pipelineId.getProtobuf();
-
-    final Pipeline pipeline = getPipeline(pipelineId);
-    ReplicationConfig replicationConfig = null;
+    acquireWriteLock();
+    final Pipeline pipeline;
     try {
+      pipeline = stateManager.getPipeline(pipelineId);
       if (pipeline.isClosed()) {
         throw new IOException("Closed pipeline can not be opened");
       }
-      replicationConfig = pipeline.getReplicationConfig();
-      acquireWriteLock(replicationConfig);
       if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
         stateManager.updatePipelineState(pipelineIdProtobuf,
             HddsProtos.PipelineState.PIPELINE_OPEN);
       }
     } finally {
-      if (replicationConfig != null) {
-        releaseWriteLock(replicationConfig);
-      }
+      releaseWriteLock();
     }
     metrics.incNumPipelineCreated();
     metrics.createPerPipelineMetrics(pipeline);
@@ -454,15 +447,14 @@ protected void removePipeline(Pipeline pipeline) throws IOException {

     pipelineFactory.close(pipeline.getType(), pipeline);
     HddsProtos.PipelineID pipelineID = pipeline.getId().getProtobuf();
-    ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
-    acquireWriteLock(replicationConfig);
+    acquireWriteLock();
     try {
       stateManager.removePipeline(pipelineID);
     } catch (IOException ex) {
       metrics.incNumPipelineDestroyFailed();
       throw ex;
     } finally {
-      releaseWriteLock(replicationConfig);
+      releaseWriteLock();
     }
     LOG.info("Pipeline {} removed.", pipeline);
     metrics.incNumPipelineDestroyed();
@@ -515,20 +507,19 @@ public void closePipeline(PipelineID pipelineID) throws IOException {

     HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
     // close containers.
     closeContainersForPipeline(pipelineID);
-    Pipeline pipeline = getPipeline(pipelineID);
-    if (!pipeline.isClosed()) {
-      ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
-      acquireWriteLock(replicationConfig);
+    if (!getPipeline(pipelineID).isClosed()) {
+      acquireWriteLock();
       try {
         stateManager.updatePipelineState(pipelineIDProtobuf,
             HddsProtos.PipelineState.PIPELINE_CLOSED);
       } finally {
-        releaseWriteLock(replicationConfig);
+        releaseWriteLock();
       }
       LOG.info("Pipeline {} moved to CLOSED state", pipelineID);
     }
     metrics.removePipelineMetrics(pipelineID);
+
   }

   /**
@@ -693,14 +684,12 @@ public int minPipelineLimit(Pipeline pipeline) {

   public void activatePipeline(PipelineID pipelineID)
       throws IOException {
     HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
-    Pipeline pipeline = getPipeline(pipelineID);
-    ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
-    acquireWriteLock(replicationConfig);
+    acquireWriteLock();
     try {
       stateManager.updatePipelineState(pipelineIDProtobuf,
           HddsProtos.PipelineState.PIPELINE_OPEN);
     } finally {
-      releaseWriteLock(replicationConfig);
+      releaseWriteLock();
     }
   }
@@ -714,14 +703,12 @@ public void activatePipeline(PipelineID pipelineID)

   public void deactivatePipeline(PipelineID pipelineID)
       throws IOException {
     HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
-    Pipeline pipeline = getPipeline(pipelineID);
-    ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
-    acquireWriteLock(replicationConfig);
+    acquireWriteLock();
     try {
       stateManager.updatePipelineState(pipelineIDProtobuf,
           HddsProtos.PipelineState.PIPELINE_DORMANT);
     } finally {
-      releaseWriteLock(replicationConfig);
+      releaseWriteLock();
     }
   }
@@ -944,38 +931,22 @@ private void recordMetricsForPipeline(Pipeline pipeline) {
   }

   @Override
-  public void acquireReadLock(ReplicationConfig replicationConfig) {
-    if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
-      ecPipelineLock.readLock().lock();
-    } else {
-      lock.readLock().lock();
-    }
+  public void acquireReadLock() {
+    lock.readLock().lock();
   }

   @Override
-  public void releaseReadLock(ReplicationConfig replicationConfig) {
-    if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
-      ecPipelineLock.readLock().unlock();
-    } else {
-      lock.readLock().unlock();
-    }
+  public void releaseReadLock() {
+    lock.readLock().unlock();
   }

   @Override
-  public void acquireWriteLock(ReplicationConfig replicationConfig) {
-    if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
-      ecPipelineLock.writeLock().lock();
-    } else {
-      lock.writeLock().lock();
-    }
+  public void acquireWriteLock() {
+    lock.writeLock().lock();
   }

   @Override
-  public void releaseWriteLock(ReplicationConfig replicationConfig) {
-    if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
-      ecPipelineLock.writeLock().unlock();
-    } else {
-      lock.writeLock().unlock();
-    }
+  public void releaseWriteLock() {
+    lock.writeLock().unlock();
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
index a1b2a28493b..99a58f690c2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java
@@ -156,13 +156,13 @@ private ContainerInfo getContainer(ReplicationConfig repConfig, String owner,

     // Acquire pipeline manager lock, to avoid any updates to pipeline
     // while allocate container happens. This is to avoid scenario like
     // mentioned in HDDS-5655.
-    pipelineManager.acquireReadLock(repConfig);
+    pipelineManager.acquireReadLock();
     try {
       List<Pipeline> availablePipelines = findPipelinesByState(repConfig,
           excludeList, Pipeline.PipelineState.OPEN);
       return selectContainer(availablePipelines, req, owner, excludeList);
     } finally {
-      pipelineManager.releaseReadLock(repConfig);
+      pipelineManager.releaseReadLock();
     }
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 952dc1f010f..6ece2ecb88f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -332,22 +332,22 @@ public Map<String, Integer> getPipelineInfo() {
   }

   @Override
-  public void acquireReadLock(ReplicationConfig replicationConfig) {
+  public void acquireReadLock() {

   }

   @Override
-  public void releaseReadLock(ReplicationConfig replicationConfig) {
+  public void releaseReadLock() {

   }

   @Override
-  public void acquireWriteLock(ReplicationConfig replicationConfig) {
+  public void acquireWriteLock() {

   }

   @Override
-  public void releaseWriteLock(ReplicationConfig replicationConfig) {
+  public void releaseWriteLock() {

   }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index 8e5e514f8c9..6bc54f3e6ee 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -24,7 +24,6 @@
 import java.util.List;
 import java.util.stream.Collectors;

-import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -99,46 +98,29 @@ public static ReconPipelineManager newReconPipelineManager(
    */
   void initializePipelines(List<Pipeline> pipelinesFromScm)
       throws IOException {
-    HddsProtos.ReplicationType[] replicationTypes =
-        HddsProtos.ReplicationType.values();
-    for (HddsProtos.ReplicationType replicationType : replicationTypes) {
-      List<Pipeline> pipelines = pipelinesFromScm.stream().filter(
-          p -> p.getReplicationConfig().getReplicationType()
-              .equals(replicationType)).collect(Collectors.toList());
-
-      if (!pipelines.isEmpty()) {
-        ReplicationConfig replicationConfig =
-            pipelines.iterator().next().getReplicationConfig();
-
-        acquireWriteLock(replicationConfig);
-        try {
-          List<Pipeline> pipelinesInHouse = getPipelines().stream().filter(
-              p -> p.getReplicationConfig().getReplicationType()
-                  .equals(replicationType)).collect(Collectors.toList());
-
-          LOG.info("Recon has {} pipelines in house.", pipelinesInHouse.size());
-          for (Pipeline pipeline : pipelines) {
-            // New pipeline got from SCM. Validate If it doesn't exist at Recon, try adding it.
-            if (addPipeline(pipeline)) {
-              LOG.info("Added new pipeline {} from SCM.", pipeline.getId());
-            } else {
-              LOG.warn("Pipeline {} already exists in Recon pipeline metadata.",
-                  pipeline.getId());
-              // Recon already has this pipeline. Just update state and creation
-              // time.
-              getStateManager().updatePipelineState(
-                  pipeline.getId().getProtobuf(),
-                  Pipeline.PipelineState.getProtobuf(
-                      pipeline.getPipelineState()));
-              getPipeline(pipeline.getId()).setCreationTimestamp(
-                  pipeline.getCreationTimestamp());
-            }
-          }
-          removeInvalidPipelines(pipelines);
-        } finally {
-          releaseWriteLock(replicationConfig);
+
+    acquireWriteLock();
+    try {
+      List<Pipeline> pipelinesInHouse = getPipelines();
+      LOG.info("Recon has {} pipelines in house.", pipelinesInHouse.size());
+      for (Pipeline pipeline : pipelinesFromScm) {
+        // New pipeline from SCM. If it doesn't exist in Recon, try adding it.
+        if (addPipeline(pipeline)) {
+          LOG.info("Added new pipeline {} from SCM.", pipeline.getId());
+        } else {
+          LOG.warn("Pipeline {} already exists in Recon pipeline metadata.", pipeline.getId());
+          // Recon already has this pipeline. Just update state and creation
+          // time.
+          getStateManager().updatePipelineState(
+              pipeline.getId().getProtobuf(),
+              Pipeline.PipelineState.getProtobuf(pipeline.getPipelineState()));
+          getPipeline(pipeline.getId()).setCreationTimestamp(
+              pipeline.getCreationTimestamp());
         }
       }
+      removeInvalidPipelines(pipelinesFromScm);
+    } finally {
+      releaseWriteLock();
     }
   }

@@ -181,8 +163,7 @@ public void removeInvalidPipelines(List<Pipeline> pipelinesFromScm) {
    */
   @VisibleForTesting
   public boolean addPipeline(Pipeline pipeline) throws IOException {
-    ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
-    acquireWriteLock(replicationConfig);
+    acquireWriteLock();
     try {
       // Check if the pipeline already exists
       if (containsPipeline(pipeline.getId())) {
@@ -191,7 +172,7 @@ public boolean addPipeline(Pipeline pipeline) throws IOException {
       getStateManager().addPipeline(
           pipeline.getProtobufMessage(ClientVersion.CURRENT_VERSION));
       return true;
     } finally {
-      releaseWriteLock(replicationConfig);
+      releaseWriteLock();
     }
   }
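
Note: the core locking change above, reduced to a self-contained Java sketch. This is illustrative only; the two classes below are stand-ins rather than the real PipelineManagerImpl, and only ReentrantReadWriteLock and the acquire/release method names mirror the patch.

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Before the patch: the lock chosen depended on the replication type, so EC
// and Ratis pipeline updates could proceed concurrently under different locks.
class PerTypeLocking {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final ReentrantReadWriteLock ecPipelineLock =
      new ReentrantReadWriteLock();

  void acquireWriteLock(boolean isEcPipeline) {
    if (isEcPipeline) {
      ecPipelineLock.writeLock().lock();
    } else {
      lock.writeLock().lock();
    }
  }

  void releaseWriteLock(boolean isEcPipeline) {
    if (isEcPipeline) {
      ecPipelineLock.writeLock().unlock();
    } else {
      lock.writeLock().unlock();
    }
  }
}

// After the patch: a single lock guards all pipeline state, and callers no
// longer need a ReplicationConfig just to pick the lock.
class UnifiedLocking {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  void acquireWriteLock() {
    lock.writeLock().lock();
  }

  void releaseWriteLock() {
    lock.writeLock().unlock();
  }

  public static void main(String[] args) {
    UnifiedLocking manager = new UnifiedLocking();
    // The caller-side pattern used throughout the patch.
    manager.acquireWriteLock();
    try {
      System.out.println("pipeline state updated under the single lock");
    } finally {
      manager.releaseWriteLock();
    }
  }
}

A related effect visible in openPipeline(): the pipeline is now read via stateManager.getPipeline() after the write lock is taken, rather than before any lock, which removes the window in which the pipeline could change between the read and the state update.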
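Similarly, the Recon reconciliation pass collapses from one locked loop per replication type into a single locked add-or-update pass. A minimal sketch of the resulting shape, under stated assumptions: the Pipeline record and the HashMap standing in for Recon's pipeline table are hypothetical, not the real Ozone types.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class ReconReconcileSketch {
  // Hypothetical stand-in for org.apache.hadoop.hdds.scm.pipeline.Pipeline.
  record Pipeline(String id, String state) { }

  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Map<String, Pipeline> inHouse = new HashMap<>();

  void initializePipelines(List<Pipeline> pipelinesFromScm) {
    // One write lock around the whole pass, regardless of replication type.
    lock.writeLock().lock();
    try {
      for (Pipeline p : pipelinesFromScm) {
        // put() models addPipeline(): a non-null previous value means Recon
        // already knew the pipeline, so only its state gets refreshed.
        Pipeline previous = inHouse.put(p.id(), p);
        if (previous != null) {
          System.out.println("Pipeline " + p.id() + " already known; updated.");
        }
      }
      // Models removeInvalidPipelines(): drop anything SCM no longer reports.
      inHouse.values().removeIf(known ->
          pipelinesFromScm.stream().noneMatch(s -> s.id().equals(known.id())));
    } finally {
      lock.writeLock().unlock();
    }
  }
}

The coarser lock trades some theoretical concurrency between EC and Ratis updates for a simpler invariant: every reader or writer of pipeline state contends on the same ReentrantReadWriteLock.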