3434import org .apache .hadoop .util .Time ;
3535import org .apache .ratis .proto .RaftProtos .RaftPeerRole ;
3636import org .apache .ratis .protocol .RaftGroupId ;
37+ import org .apache .ratis .protocol .StateMachineException ;
3738import org .apache .ratis .server .RaftServer ;
3839import org .apache .ratis .server .impl .RaftServerProxy ;
3940import org .apache .ratis .server .protocol .TermIndex ;
8384import java .util .concurrent .Semaphore ;
8485import java .util .concurrent .TimeUnit ;
8586import java .util .concurrent .ExecutionException ;
87+ import java .util .concurrent .atomic .AtomicBoolean ;
8688import java .util .stream .Collectors ;
8789import java .util .Set ;
8890import java .util .concurrent .ConcurrentSkipListSet ;
@@ -147,6 +149,7 @@ public class ContainerStateMachine extends BaseStateMachine {
147149 private final Cache <Long , ByteString > stateMachineDataCache ;
148150 private final boolean isBlockTokenEnabled ;
149151 private final TokenVerifier tokenVerifier ;
152+ private final AtomicBoolean isStateMachineHealthy ;
150153
151154 private final Semaphore applyTransactionSemaphore ;
152155 /**
@@ -184,6 +187,7 @@ public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
184187 ScmConfigKeys .
185188 DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT );
186189 applyTransactionSemaphore = new Semaphore (maxPendingApplyTransactions );
190+ isStateMachineHealthy = new AtomicBoolean (true );
187191 this .executors = new ExecutorService [numContainerOpExecutors ];
188192 for (int i = 0 ; i < numContainerOpExecutors ; i ++) {
189193 final int index = i ;
@@ -265,6 +269,14 @@ public void persistContainerSet(OutputStream out) throws IOException {
265269 public long takeSnapshot () throws IOException {
266270 TermIndex ti = getLastAppliedTermIndex ();
267271 long startTime = Time .monotonicNow ();
272+ if (!isStateMachineHealthy .get ()) {
273+ String msg =
274+ "Failed to take snapshot " + " for " + gid + " as the stateMachine"
275+ + " is unhealthy. The last applied index is at " + ti ;
276+ StateMachineException sme = new StateMachineException (msg );
277+ LOG .error (msg );
278+ throw sme ;
279+ }
268280 if (ti != null && ti .getIndex () != RaftLog .INVALID_LOG_INDEX ) {
269281 final File snapshotFile =
270282 storage .getSnapshotFile (ti .getTerm (), ti .getIndex ());
@@ -275,12 +287,12 @@ public long takeSnapshot() throws IOException {
275287 // make sure the snapshot file is synced
276288 fos .getFD ().sync ();
277289 } catch (IOException ioe ) {
278- LOG .info ("{}: Failed to write snapshot at:{} file {}" , gid , ti ,
290+ LOG .error ("{}: Failed to write snapshot at:{} file {}" , gid , ti ,
279291 snapshotFile );
280292 throw ioe ;
281293 }
282- LOG .info ("{}: Finished taking a snapshot at:{} file:{} time:{}" ,
283- gid , ti , snapshotFile , (Time .monotonicNow () - startTime ));
294+ LOG .info ("{}: Finished taking a snapshot at:{} file:{} time:{}" , gid , ti ,
295+ snapshotFile , (Time .monotonicNow () - startTime ));
284296 return ti .getIndex ();
285297 }
286298 return -1 ;
@@ -385,17 +397,12 @@ private ContainerCommandResponseProto dispatchCommand(
385397 return response ;
386398 }
387399
388- private ContainerCommandResponseProto runCommandGetResponse (
400+ private ContainerCommandResponseProto runCommand (
389401 ContainerCommandRequestProto requestProto ,
390402 DispatcherContext context ) {
391403 return dispatchCommand (requestProto , context );
392404 }
393405
394- private Message runCommand (ContainerCommandRequestProto requestProto ,
395- DispatcherContext context ) {
396- return runCommandGetResponse (requestProto , context )::toByteString ;
397- }
398-
399406 private ExecutorService getCommandExecutor (
400407 ContainerCommandRequestProto requestProto ) {
401408 int executorId = (int )(requestProto .getContainerID () % executors .length );
@@ -425,7 +432,7 @@ private CompletableFuture<Message> handleWriteChunk(
425432 // thread.
426433 CompletableFuture <ContainerCommandResponseProto > writeChunkFuture =
427434 CompletableFuture .supplyAsync (() ->
428- runCommandGetResponse (requestProto , context ), chunkExecutor );
435+ runCommand (requestProto , context ), chunkExecutor );
429436
430437 CompletableFuture <Message > raftFuture = new CompletableFuture <>();
431438
@@ -502,7 +509,8 @@ public CompletableFuture<Message> query(Message request) {
502509 metrics .incNumQueryStateMachineOps ();
503510 final ContainerCommandRequestProto requestProto =
504511 getContainerCommandRequestProto (request .getContent ());
505- return CompletableFuture .completedFuture (runCommand (requestProto , null ));
512+ return CompletableFuture
513+ .completedFuture (runCommand (requestProto , null )::toByteString );
506514 } catch (IOException e ) {
507515 metrics .incNumQueryStateMachineFails ();
508516 return completeExceptionally (e );
@@ -674,30 +682,58 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
674682 if (cmdType == Type .WriteChunk || cmdType ==Type .PutSmallFile ) {
675683 builder .setCreateContainerSet (createContainerSet );
676684 }
685+ CompletableFuture <Message > applyTransactionFuture =
686+ new CompletableFuture <>();
677687 // Ensure the command gets executed in a thread separate from the
678688 // stateMachineUpdater thread, which is calling applyTransaction here.
679- CompletableFuture <Message > future = CompletableFuture
680- .supplyAsync (() -> runCommand (requestProto , builder .build ()),
689+ CompletableFuture <ContainerCommandResponseProto > future =
690+ CompletableFuture .supplyAsync (
691+ () -> runCommand (requestProto , builder .build ()),
681692 getCommandExecutor (requestProto ));
682-
683- future .thenAccept (m -> {
693+ future .thenApply (r -> {
684694 if (trx .getServerRole () == RaftPeerRole .LEADER ) {
685695 long startTime = (long ) trx .getStateMachineContext ();
686696 metrics .incPipelineLatency (cmdType ,
687697 Time .monotonicNowNanos () - startTime );
688698 }
689-
690- final Long previous =
691- applyTransactionCompletionMap
699+ if (r .getResult () != ContainerProtos .Result .SUCCESS ) {
700+ StorageContainerException sce =
701+ new StorageContainerException (r .getMessage (), r .getResult ());
702+ LOG .error (
703+ "gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : "
704+ + "{} Container Result: {}" , gid , r .getCmdType (), index ,
705+ r .getMessage (), r .getResult ());
706+ metrics .incNumApplyTransactionsFails ();
707+ // Since applyTransaction has now completed exceptionally, the
708+ // exception will be caught by the stateMachineUpdater in Ratis
709+ // before any further snapshot is taken, and the Ratis server will
710+ // shut down.
711+ applyTransactionFuture .completeExceptionally (sce );
712+ isStateMachineHealthy .compareAndSet (true , false );
713+ ratisServer .handleApplyTransactionFailure (gid , trx .getServerRole ());
714+ } else {
715+ LOG .debug (
716+ "gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : "
717+ + "{} Container Result: {}" , gid , r .getCmdType (), index ,
718+ r .getMessage (), r .getResult ());
719+ applyTransactionFuture .complete (r ::toByteString );
720+ if (cmdType == Type .WriteChunk || cmdType == Type .PutSmallFile ) {
721+ metrics .incNumBytesCommittedCount (
722+ requestProto .getWriteChunk ().getChunkData ().getLen ());
723+ }
724+ // Add the entry to the applyTransactionCompletionMap only if the
725+ // stateMachine is healthy, i.e., there have been no applyTransaction
726+ // failures before.
727+ if (isStateMachineHealthy .get ()) {
728+ final Long previous = applyTransactionCompletionMap
692729 .put (index , trx .getLogEntry ().getTerm ());
693- Preconditions .checkState (previous == null );
694- if (cmdType == Type .WriteChunk || cmdType == Type .PutSmallFile ) {
695- metrics .incNumBytesCommittedCount (
696- requestProto .getWriteChunk ().getChunkData ().getLen ());
730+ Preconditions .checkState (previous == null );
731+ updateLastApplied ();
732+ }
697733 }
698- updateLastApplied () ;
734+ return applyTransactionFuture ;
699735 }).whenComplete ((r , t ) -> applyTransactionSemaphore .release ());
700- return future ;
736+ return applyTransactionFuture ;
701737 } catch (IOException | InterruptedException e ) {
702738 metrics .incNumApplyTransactionsFails ();
703739 return completeExceptionally (e );
0 commit comments