Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into HEAD
Browse files Browse the repository at this point in the history
Change-Id: I3624d258090838ce250d30275082fd9165824cf1
  • Loading branch information
swamirishi committed Dec 13, 2024
2 parents 3a29fea + 16ba289 commit efa38b0
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public ContainerInfo allocateContainer(
// Acquire pipeline manager lock, to avoid any updates to pipeline
// while allocate container happens. This is to avoid scenario like
// mentioned in HDDS-5655.
pipelineManager.acquireReadLock(replicationConfig);
pipelineManager.acquireReadLock();
lock.lock();
List<Pipeline> pipelines;
Pipeline pipeline;
Expand All @@ -196,7 +196,7 @@ public ContainerInfo allocateContainer(
}
} finally {
lock.unlock();
pipelineManager.releaseReadLock(replicationConfig);
pipelineManager.releaseReadLock();
}

if (pipelines.isEmpty()) {
Expand All @@ -209,7 +209,7 @@ public ContainerInfo allocateContainer(
" matching pipeline for replicationConfig: " + replicationConfig
+ ", State:PipelineState.OPEN", e);
}
pipelineManager.acquireReadLock(replicationConfig);
pipelineManager.acquireReadLock();
lock.lock();
try {
pipelines = pipelineManager
Expand All @@ -224,7 +224,7 @@ public ContainerInfo allocateContainer(
}
} finally {
lock.unlock();
pipelineManager.releaseReadLock(replicationConfig);
pipelineManager.releaseReadLock();
}
}
return containerInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,20 +207,20 @@ void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
/**
* Acquire read lock.
*/
void acquireReadLock(ReplicationConfig replicationConfig);
void acquireReadLock();

/**
* Release read lock.
*/
void releaseReadLock(ReplicationConfig replicationConfig);
void releaseReadLock();

/**
* Acquire write lock.
*/
void acquireWriteLock(ReplicationConfig replicationConfig);
void acquireWriteLock();

/**
* Release write lock.
*/
void releaseWriteLock(ReplicationConfig replicationConfig);
void releaseWriteLock();
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public class PipelineManagerImpl implements PipelineManager {

// Limit the number of on-going ratis operation to be 1.
private final ReentrantReadWriteLock lock;
private final ReentrantReadWriteLock ecPipelineLock;
private PipelineFactory pipelineFactory;
private PipelineStateManager stateManager;
private BackgroundPipelineCreator backgroundPipelineCreator;
Expand Down Expand Up @@ -106,7 +105,6 @@ protected PipelineManagerImpl(ConfigurationSource conf,
SCMContext scmContext,
Clock clock) {
this.lock = new ReentrantReadWriteLock();
this.ecPipelineLock = new ReentrantReadWriteLock();
this.pipelineFactory = pipelineFactory;
this.stateManager = pipelineStateManager;
this.conf = conf;
Expand Down Expand Up @@ -250,7 +248,7 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig,
throws IOException {
checkIfPipelineCreationIsAllowed(replicationConfig);

acquireWriteLock(replicationConfig);
acquireWriteLock();
final Pipeline pipeline;
try {
try {
Expand All @@ -263,7 +261,7 @@ public Pipeline createPipeline(ReplicationConfig replicationConfig,
addPipelineToManager(pipeline);
return pipeline;
} finally {
releaseWriteLock(replicationConfig);
releaseWriteLock();
}
}

Expand All @@ -288,16 +286,15 @@ private void addPipelineToManager(Pipeline pipeline)
throws IOException {
HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
ClientVersion.CURRENT_VERSION);
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
acquireWriteLock();
try {
stateManager.addPipeline(pipelineProto);
} catch (IOException ex) {
LOG.debug("Failed to add pipeline {}.", pipeline, ex);
metrics.incNumPipelineCreationFailed();
throw ex;
} finally {
releaseWriteLock(replicationConfig);
releaseWriteLock();
}
recordMetricsForPipeline(pipeline);
}
Expand Down Expand Up @@ -422,23 +419,19 @@ public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
public void openPipeline(PipelineID pipelineId)
throws IOException {
HddsProtos.PipelineID pipelineIdProtobuf = pipelineId.getProtobuf();

final Pipeline pipeline = getPipeline(pipelineId);
ReplicationConfig replicationConfig = null;
acquireWriteLock();
final Pipeline pipeline;
try {
pipeline = stateManager.getPipeline(pipelineId);
if (pipeline.isClosed()) {
throw new IOException("Closed pipeline can not be opened");
}
replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
stateManager.updatePipelineState(pipelineIdProtobuf,
HddsProtos.PipelineState.PIPELINE_OPEN);
}
} finally {
if (replicationConfig != null) {
releaseWriteLock(replicationConfig);
}
releaseWriteLock();
}
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
Expand All @@ -454,15 +447,14 @@ protected void removePipeline(Pipeline pipeline)
throws IOException {
pipelineFactory.close(pipeline.getType(), pipeline);
HddsProtos.PipelineID pipelineID = pipeline.getId().getProtobuf();
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
acquireWriteLock();
try {
stateManager.removePipeline(pipelineID);
} catch (IOException ex) {
metrics.incNumPipelineDestroyFailed();
throw ex;
} finally {
releaseWriteLock(replicationConfig);
releaseWriteLock();
}
LOG.info("Pipeline {} removed.", pipeline);
metrics.incNumPipelineDestroyed();
Expand Down Expand Up @@ -515,20 +507,19 @@ public void closePipeline(PipelineID pipelineID) throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
// close containers.
closeContainersForPipeline(pipelineID);
Pipeline pipeline = getPipeline(pipelineID);
if (!pipeline.isClosed()) {
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
if (!getPipeline(pipelineID).isClosed()) {
acquireWriteLock();
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_CLOSED);
} finally {
releaseWriteLock(replicationConfig);
releaseWriteLock();
}
LOG.info("Pipeline {} moved to CLOSED state", pipelineID);
}

metrics.removePipelineMetrics(pipelineID);

}

/**
Expand Down Expand Up @@ -693,14 +684,12 @@ public int minPipelineLimit(Pipeline pipeline) {
public void activatePipeline(PipelineID pipelineID)
throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
Pipeline pipeline = getPipeline(pipelineID);
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
acquireWriteLock();
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_OPEN);
} finally {
releaseWriteLock(replicationConfig);
releaseWriteLock();
}
}

Expand All @@ -714,14 +703,12 @@ public void activatePipeline(PipelineID pipelineID)
public void deactivatePipeline(PipelineID pipelineID)
throws IOException {
HddsProtos.PipelineID pipelineIDProtobuf = pipelineID.getProtobuf();
Pipeline pipeline = getPipeline(pipelineID);
ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
acquireWriteLock(replicationConfig);
acquireWriteLock();
try {
stateManager.updatePipelineState(pipelineIDProtobuf,
HddsProtos.PipelineState.PIPELINE_DORMANT);
} finally {
releaseWriteLock(replicationConfig);
releaseWriteLock();
}
}

Expand Down Expand Up @@ -944,38 +931,22 @@ private void recordMetricsForPipeline(Pipeline pipeline) {
}

@Override
public void acquireReadLock(ReplicationConfig replicationConfig) {
if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
ecPipelineLock.readLock().lock();
} else {
lock.readLock().lock();
}
public void acquireReadLock() {
lock.readLock().lock();
}

@Override
public void releaseReadLock(ReplicationConfig replicationConfig) {
if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
ecPipelineLock.readLock().unlock();
} else {
lock.readLock().unlock();
}
public void releaseReadLock() {
lock.readLock().unlock();
}

@Override
public void acquireWriteLock(ReplicationConfig replicationConfig) {
if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
ecPipelineLock.writeLock().lock();
} else {
lock.writeLock().lock();
}
public void acquireWriteLock() {
lock.writeLock().lock();
}

@Override
public void releaseWriteLock(ReplicationConfig replicationConfig) {
if (replicationConfig.getReplicationType().equals(ReplicationType.EC)) {
ecPipelineLock.writeLock().unlock();
} else {
lock.writeLock().unlock();
}
public void releaseWriteLock() {
lock.writeLock().unlock();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,13 @@ private ContainerInfo getContainer(ReplicationConfig repConfig, String owner,
// Acquire pipeline manager lock, to avoid any updates to pipeline
// while allocate container happens. This is to avoid scenario like
// mentioned in HDDS-5655.
pipelineManager.acquireReadLock(repConfig);
pipelineManager.acquireReadLock();
try {
List<Pipeline> availablePipelines = findPipelinesByState(repConfig,
excludeList, Pipeline.PipelineState.OPEN);
return selectContainer(availablePipelines, req, owner, excludeList);
} finally {
pipelineManager.releaseReadLock(repConfig);
pipelineManager.releaseReadLock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,22 +332,22 @@ public Map<String, Integer> getPipelineInfo() {
}

@Override
public void acquireReadLock(ReplicationConfig replicationConfig) {
public void acquireReadLock() {

}

@Override
public void releaseReadLock(ReplicationConfig replicationConfig) {
public void releaseReadLock() {

}

@Override
public void acquireWriteLock(ReplicationConfig replicationConfig) {
public void acquireWriteLock() {

}

@Override
public void releaseWriteLock(ReplicationConfig replicationConfig) {
public void releaseWriteLock() {

}

Expand Down
Loading

0 comments on commit efa38b0

Please sign in to comment.