Skip to content

Commit

Permalink
HDDS-2210. ContainerStateMachine should not be marked unhealthy if ap…
Browse files Browse the repository at this point in the history
…plyTransaction fails with closed container exception(apache#1552).
  • Loading branch information
bshashikant authored and fapifta committed Oct 2, 2019
1 parent 7077586 commit 4f211db
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public class ContainerStateMachine extends BaseStateMachine {
private final Cache<Long, ByteString> stateMachineDataCache;
private final boolean isBlockTokenEnabled;
private final TokenVerifier tokenVerifier;
private final AtomicBoolean isStateMachineHealthy;
private final AtomicBoolean stateMachineHealthy;

private final Semaphore applyTransactionSemaphore;
/**
Expand Down Expand Up @@ -190,7 +190,7 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
ScmConfigKeys.
DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
isStateMachineHealthy = new AtomicBoolean(true);
stateMachineHealthy = new AtomicBoolean(true);
this.executors = new ExecutorService[numContainerOpExecutors];
for (int i = 0; i < numContainerOpExecutors; i++) {
final int index = i;
Expand Down Expand Up @@ -271,11 +271,15 @@ public void persistContainerSet(OutputStream out) throws IOException {
IOUtils.write(builder.build().toByteArray(), out);
}

public boolean isStateMachineHealthy() {
return stateMachineHealthy.get();
}

@Override
public long takeSnapshot() throws IOException {
TermIndex ti = getLastAppliedTermIndex();
long startTime = Time.monotonicNow();
if (!isStateMachineHealthy.get()) {
if (!isStateMachineHealthy()) {
String msg =
"Failed to take snapshot " + " for " + gid + " as the stateMachine"
+ " is unhealthy. The last applied index is at " + ti;
Expand Down Expand Up @@ -731,7 +735,11 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
metrics.incPipelineLatency(cmdType,
Time.monotonicNowNanos() - startTime);
}
if (r.getResult() != ContainerProtos.Result.SUCCESS) {
// ignore close container exception while marking the stateMachine
// unhealthy
if (r.getResult() != ContainerProtos.Result.SUCCESS
&& r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(
Expand All @@ -744,7 +752,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
// caught in stateMachineUpdater in Ratis and ratis server will
// shutdown.
applyTransactionFuture.completeExceptionally(sce);
isStateMachineHealthy.compareAndSet(true, false);
stateMachineHealthy.compareAndSet(true, false);
ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
} else {
LOG.debug(
Expand All @@ -759,7 +767,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
// add the entry to the applyTransactionCompletionMap only if the
// stateMachine is healthy i.e, there has been no applyTransaction
// failures before.
if (isStateMachineHealthy.get()) {
if (isStateMachineHealthy()) {
final Long previous = applyTransactionCompletionMap
.put(index, trx.getLogEntry().getTerm());
Preconditions.checkState(previous == null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,71 @@ public void testApplyTransactionFailure() throws Exception {
Assert.assertTrue(snapshot.getPath().equals(latestSnapshot.getPath()));
}

@Test
public void testApplyTransactionIdempotencyWithClosedContainer()
throws Exception {
OzoneOutputStream key =
objectStore.getVolume(volumeName).getBucket(bucketName)
.createKey("ratis", 1024, ReplicationType.RATIS,
ReplicationFactor.ONE, new HashMap<>());
// First write and flush creates a container in the datanode
key.write("ratis".getBytes());
key.flush();
key.write("ratis".getBytes());
KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
List<OmKeyLocationInfo> locationInfoList =
groupOutputStream.getLocationInfoList();
Assert.assertEquals(1, locationInfoList.size());
OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
ContainerData containerData =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet()
.getContainer(omKeyLocationInfo.getContainerID())
.getContainerData();
Assert.assertTrue(containerData instanceof KeyValueContainerData);
key.close();
ContainerStateMachine stateMachine =
(ContainerStateMachine) ContainerTestHelper.getStateMachine(cluster);
SimpleStateMachineStorage storage =
(SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
Path parentPath = storage.findLatestSnapshot().getFile().getPath();
// Since the snapshot threshold is set to 1, since there are
// applyTransactions, we should see snapshots
Assert.assertTrue(parentPath.getParent().toFile().listFiles().length > 0);
FileInfo snapshot = storage.findLatestSnapshot().getFile();
Assert.assertNotNull(snapshot);
long containerID = omKeyLocationInfo.getContainerID();
Pipeline pipeline = cluster.getStorageContainerLocationClient()
.getContainerWithPipeline(containerID).getPipeline();
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(pipeline);
ContainerProtos.ContainerCommandRequestProto.Builder request =
ContainerProtos.ContainerCommandRequestProto.newBuilder();
request.setDatanodeUuid(pipeline.getFirstNode().getUuidString());
request.setCmdType(ContainerProtos.Type.CloseContainer);
request.setContainerID(containerID);
request.setCloseContainer(
ContainerProtos.CloseContainerRequestProto.getDefaultInstance());
try {
xceiverClient.sendCommand(request.build());
} catch (IOException e) {
Assert.fail("Exception should not be thrown");
}
Assert.assertTrue(
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet().getContainer(containerID)
.getContainerState()
== ContainerProtos.ContainerDataProto.State.CLOSED);
Assert.assertTrue(stateMachine.isStateMachineHealthy());
try {
stateMachine.takeSnapshot();
} catch (IOException ioe) {
Assert.fail("Exception should not be thrown");
}
FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
Assert.assertFalse(snapshot.getPath().equals(latestSnapshot.getPath()));
}

@Test
public void testValidateBCSIDOnDnRestart() throws Exception {
OzoneOutputStream key =
Expand Down

0 comments on commit 4f211db

Please sign in to comment.