From 719c815b01f110cab296604d1733e5ff919f96ac Mon Sep 17 00:00:00 2001 From: guohao1 Date: Thu, 5 Sep 2024 14:35:23 +0800 Subject: [PATCH 1/2] HDDS-11413. PipelineManagerImpl lock optimization reduces AllocateBlock latency --- .../scm/container/ContainerManagerImpl.java | 8 +- .../hdds/scm/pipeline/PipelineManager.java | 8 +- .../scm/pipeline/PipelineManagerImpl.java | 81 +++++++++++++------ .../WritableRatisContainerProvider.java | 4 +- 4 files changed, 65 insertions(+), 36 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index 00aee0f62c2..8779e2578b5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -176,7 +176,7 @@ public ContainerInfo allocateContainer( // Acquire pipeline manager lock, to avoid any updates to pipeline // while allocate container happens. This is to avoid scenario like // mentioned in HDDS-5655. 
- pipelineManager.acquireReadLock(); + pipelineManager.acquireReadLock(replicationConfig); lock.lock(); List pipelines; Pipeline pipeline; @@ -190,7 +190,7 @@ public ContainerInfo allocateContainer( } } finally { lock.unlock(); - pipelineManager.releaseReadLock(); + pipelineManager.releaseReadLock(replicationConfig); } if (pipelines.isEmpty()) { @@ -203,7 +203,7 @@ public ContainerInfo allocateContainer( " matching pipeline for replicationConfig: " + replicationConfig + ", State:PipelineState.OPEN", e); } - pipelineManager.acquireReadLock(); + pipelineManager.acquireReadLock(replicationConfig); lock.lock(); try { pipelines = pipelineManager @@ -218,7 +218,7 @@ public ContainerInfo allocateContainer( } } finally { lock.unlock(); - pipelineManager.releaseReadLock(); + pipelineManager.releaseReadLock(replicationConfig); } } return containerInfo; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 15b0f408c56..77353adc7b8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -207,20 +207,20 @@ void reinitialize(Table pipelineStore) /** * Acquire read lock. */ - void acquireReadLock(); + void acquireReadLock(ReplicationConfig replicationConfig); /** * Release read lock. */ - void releaseReadLock(); + void releaseReadLock(ReplicationConfig replicationConfig); /** * Acquire write lock. */ - void acquireWriteLock(); + void acquireWriteLock(ReplicationConfig replicationConfig); /** * Release write lock. 
*/ - void releaseWriteLock(); + void releaseWriteLock(ReplicationConfig replicationConfig); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index 000d3e73633..d3dd42cdca7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -77,6 +77,7 @@ public class PipelineManagerImpl implements PipelineManager { // Limit the number of on-going ratis operation to be 1. private final ReentrantReadWriteLock lock; + private final ReentrantReadWriteLock ecPipelineLock; private PipelineFactory pipelineFactory; private PipelineStateManager stateManager; private BackgroundPipelineCreator backgroundPipelineCreator; @@ -105,6 +106,7 @@ protected PipelineManagerImpl(ConfigurationSource conf, SCMContext scmContext, Clock clock) { this.lock = new ReentrantReadWriteLock(); + this.ecPipelineLock = new ReentrantReadWriteLock(); this.pipelineFactory = pipelineFactory; this.stateManager = pipelineStateManager; this.conf = conf; @@ -248,7 +250,7 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig, throws IOException { checkIfPipelineCreationIsAllowed(replicationConfig); - acquireWriteLock(); + acquireWriteLock(replicationConfig); final Pipeline pipeline; try { try { @@ -261,7 +263,7 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig, addPipelineToManager(pipeline); return pipeline; } finally { - releaseWriteLock(); + releaseWriteLock(replicationConfig); } } @@ -286,7 +288,8 @@ private void addPipelineToManager(Pipeline pipeline) throws IOException { HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage( ClientVersion.CURRENT_VERSION); - acquireWriteLock(); + ReplicationConfig replicationConfig = 
pipeline.getReplicationConfig(); + acquireWriteLock(replicationConfig); try { stateManager.addPipeline(pipelineProto); } catch (IOException ex) { @@ -294,7 +297,7 @@ private void addPipelineToManager(Pipeline pipeline) metrics.incNumPipelineCreationFailed(); throw ex; } finally { - releaseWriteLock(); + releaseWriteLock(replicationConfig); } recordMetricsForPipeline(pipeline); } @@ -419,19 +422,23 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException { public void openPipeline(PipelineID pipelineId) throws IOException { HddsProtos.PipelineID pipelineIdProtobuf = pipelineId.getProtobuf(); - acquireWriteLock(); - final Pipeline pipeline; + + final Pipeline pipeline = getPipeline(pipelineId); + ReplicationConfig replicationConfig = null; try { - pipeline = stateManager.getPipeline(pipelineId); if (pipeline.isClosed()) { throw new IOException("Closed pipeline can not be opened"); } + replicationConfig = pipeline.getReplicationConfig(); + acquireWriteLock(replicationConfig); if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) { stateManager.updatePipelineState(pipelineIdProtobuf, HddsProtos.PipelineState.PIPELINE_OPEN); } } finally { - releaseWriteLock(); + if (replicationConfig != null) { + releaseWriteLock(replicationConfig); + } } metrics.incNumPipelineCreated(); metrics.createPerPipelineMetrics(pipeline); @@ -447,14 +454,15 @@ protected void removePipeline(Pipeline pipeline) throws IOException { pipelineFactory.close(pipeline.getType(), pipeline); HddsProtos.PipelineID pipelineID = pipeline.getId().getProtobuf(); - acquireWriteLock(); + ReplicationConfig replicationConfig = pipeline.getReplicationConfig(); + acquireWriteLock(replicationConfig); try { stateManager.removePipeline(pipelineID); } catch (IOException ex) { metrics.incNumPipelineDestroyFailed(); throw ex; } finally { - releaseWriteLock(); + releaseWriteLock(replicationConfig); } LOG.info("Pipeline {} removed.", pipeline); metrics.incNumPipelineDestroyed(); @@ 
-507,19 +515,20 @@ public void closePipeline(PipelineID pipelineID) throws IOException { HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf(); // close containers. closeContainersForPipeline(pipelineID); - if (!getPipeline(pipelineID).isClosed()) { - acquireWriteLock(); + Pipeline pipeline = getPipeline(pipelineID); + if (!pipeline.isClosed()) { + ReplicationConfig replicationConfig = pipeline.getReplicationConfig(); + acquireWriteLock(replicationConfig); try { stateManager.updatePipelineState(pipelineIDProtobuf, HddsProtos.PipelineState.PIPELINE_CLOSED); } finally { - releaseWriteLock(); + releaseWriteLock(replicationConfig); } LOG.info("Pipeline {} moved to CLOSED state", pipelineID); } metrics.removePipelineMetrics(pipelineID); - } /** @@ -684,12 +693,14 @@ public int minPipelineLimit(Pipeline pipeline) { public void activatePipeline(PipelineID pipelineID) throws IOException { HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf(); - acquireWriteLock(); + Pipeline pipeline = getPipeline(pipelineID); + ReplicationConfig replicationConfig = pipeline.getReplicationConfig(); + acquireWriteLock(replicationConfig); try { stateManager.updatePipelineState(pipelineIDProtobuf, HddsProtos.PipelineState.PIPELINE_OPEN); } finally { - releaseWriteLock(); + releaseWriteLock(replicationConfig); } } @@ -703,12 +714,14 @@ public void activatePipeline(PipelineID pipelineID) public void deactivatePipeline(PipelineID pipelineID) throws IOException { HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf(); - acquireWriteLock(); + Pipeline pipeline = getPipeline(pipelineID); + ReplicationConfig replicationConfig = pipeline.getReplicationConfig(); + acquireWriteLock(replicationConfig); try { stateManager.updatePipelineState(pipelineIDProtobuf, HddsProtos.PipelineState.PIPELINE_DORMANT); } finally { - releaseWriteLock(); + releaseWriteLock(replicationConfig); } } @@ -931,22 +944,38 @@ private void recordMetricsForPipeline(Pipeline pipeline) { } 
@Override - public void acquireReadLock() { - lock.readLock().lock(); + public void acquireReadLock(ReplicationConfig replicationConfig) { + if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) { + ecPipelineLock.readLock().lock(); + } else { + lock.readLock().lock(); + } } @Override - public void releaseReadLock() { - lock.readLock().unlock(); + public void releaseReadLock(ReplicationConfig replicationConfig) { + if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) { + ecPipelineLock.readLock().unlock(); + } else { + lock.readLock().unlock(); + } } @Override - public void acquireWriteLock() { - lock.writeLock().lock(); + public void acquireWriteLock(ReplicationConfig replicationConfig) { + if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) { + ecPipelineLock.writeLock().lock(); + } else { + lock.writeLock().lock(); + } } @Override - public void releaseWriteLock() { - lock.writeLock().unlock(); + public void releaseWriteLock(ReplicationConfig replicationConfig) { + if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) { + ecPipelineLock.writeLock().unlock(); + } else { + lock.writeLock().unlock(); + } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java index 99a58f690c2..a1b2a28493b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableRatisContainerProvider.java @@ -156,13 +156,13 @@ private ContainerInfo getContainer(ReplicationConfig repConfig, String owner, // Acquire pipeline manager lock, to avoid any updates to pipeline // while allocate container happens. This is to avoid scenario like // mentioned in HDDS-5655. 
- pipelineManager.acquireReadLock(); + pipelineManager.acquireReadLock(repConfig); try { List availablePipelines = findPipelinesByState(repConfig, excludeList, Pipeline.PipelineState.OPEN); return selectContainer(availablePipelines, req, owner, excludeList); } finally { - pipelineManager.releaseReadLock(); + pipelineManager.releaseReadLock(repConfig); } } From 4ba30fc39bdfe0dfba3b07628747ba9ee19f5f59 Mon Sep 17 00:00:00 2001 From: guohao1 Date: Tue, 10 Sep 2024 17:34:42 +0800 Subject: [PATCH 2/2] HDDS-11413. Apply per-ReplicationConfig pipeline locking in ReconPipelineManager and MockPipelineManager --- .../scm/pipeline/MockPipelineManager.java | 8 +-- .../ozone/recon/scm/ReconPipelineManager.java | 65 ++++++++++++------- 2 files changed, 46 insertions(+), 27 deletions(-) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java index 6ece2ecb88f..952dc1f010f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java @@ -332,22 +332,22 @@ public Map getPipelineInfo() { } @Override - public void acquireReadLock() { + public void acquireReadLock(ReplicationConfig replicationConfig) { } @Override - public void releaseReadLock() { + public void releaseReadLock(ReplicationConfig replicationConfig) { } @Override - public void acquireWriteLock() { + public void acquireWriteLock(ReplicationConfig replicationConfig) { } @Override - public void releaseWriteLock() { + public void releaseWriteLock(ReplicationConfig replicationConfig) { } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java index 6bc54f3e6ee..8e5e514f8c9 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java 
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.stream.Collectors; +import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -98,29 +99,46 @@ public static ReconPipelineManager newReconPipelineManager( */ void initializePipelines(List pipelinesFromScm) throws IOException { - - acquireWriteLock(); - try { - List pipelinesInHouse = getPipelines(); - LOG.info("Recon has {} pipelines in house.", pipelinesInHouse.size()); - for (Pipeline pipeline : pipelinesFromScm) { - // New pipeline got from SCM. Validate If it doesn't exist at Recon, try adding it. - if (addPipeline(pipeline)) { - LOG.info("Added new pipeline {} from SCM.", pipeline.getId()); - } else { - LOG.warn("Pipeline {} already exists in Recon pipeline metadata.", pipeline.getId()); - // Recon already has this pipeline. Just update state and creation - // time. 
- getStateManager().updatePipelineState( - pipeline.getId().getProtobuf(), - Pipeline.PipelineState.getProtobuf(pipeline.getPipelineState())); - getPipeline(pipeline.getId()).setCreationTimestamp( - pipeline.getCreationTimestamp()); + HddsProtos.ReplicationType[] replicationTypes = + HddsProtos.ReplicationType.values(); + for (HddsProtos.ReplicationType replicationType : replicationTypes) { + List pipelines = pipelinesFromScm.stream().filter( + p -> p.getReplicationConfig().getReplicationType() + .equals(replicationType)).collect(Collectors.toList()); + + if (!pipelines.isEmpty()) { + ReplicationConfig replicationConfig = + pipelines.iterator().next().getReplicationConfig(); + + acquireWriteLock(replicationConfig); + try { + List pipelinesInHouse = getPipelines().stream().filter( + p -> p.getReplicationConfig().getReplicationType() + .equals(replicationType)).collect(Collectors.toList()); + + LOG.info("Recon has {} pipelines in house.", pipelinesInHouse.size()); + for (Pipeline pipeline : pipelines) { + // New pipeline got from SCM. Validate If it doesn't exist at Recon, try adding it. + if (addPipeline(pipeline)) { + LOG.info("Added new pipeline {} from SCM.", pipeline.getId()); + } else { + LOG.warn("Pipeline {} already exists in Recon pipeline metadata.", + pipeline.getId()); + // Recon already has this pipeline. Just update state and creation + // time. 
+ getStateManager().updatePipelineState( + pipeline.getId().getProtobuf(), + Pipeline.PipelineState.getProtobuf( + pipeline.getPipelineState())); + getPipeline(pipeline.getId()).setCreationTimestamp( + pipeline.getCreationTimestamp()); + } + } + removeInvalidPipelines(pipelines); + } finally { + releaseWriteLock(replicationConfig); } } - removeInvalidPipelines(pipelinesFromScm); - } finally { - releaseWriteLock(); } } @@ -163,7 +181,8 @@ public void removeInvalidPipelines(List pipelinesFromScm) { */ @VisibleForTesting public boolean addPipeline(Pipeline pipeline) throws IOException { - acquireWriteLock(); + ReplicationConfig replicationConfig = pipeline.getReplicationConfig(); + acquireWriteLock(replicationConfig); try { // Check if the pipeline already exists if (containsPipeline(pipeline.getId())) { @@ -172,7 +191,7 @@ public boolean addPipeline(Pipeline pipeline) throws IOException { getStateManager().addPipeline(pipeline.getProtobufMessage(ClientVersion.CURRENT_VERSION)); return true; } finally { - releaseWriteLock(); + releaseWriteLock(replicationConfig); } }