@@ -402,7 +402,7 @@ protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws I
402402 break ;
403403 }
404404 final FileInfo fileToRecover ;
405- final FileSession fileSession ;
405+ final FileSession prevFileSession ;
406406 synchronized (mutex ) {
407407 if (inFlightRequests .isEmpty () && remainingFiles .isEmpty ()) {
408408 break ;
@@ -418,15 +418,17 @@ protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws I
418418 }
419419 final Map .Entry <FileInfo , FileSession > minEntry =
420420 inFlightRequests .entrySet ().stream ().min (Comparator .comparingLong (e -> e .getValue ().lastTrackedSeqNo )).get ();
421- fileSession = minEntry .getValue ();
421+ prevFileSession = minEntry .getValue ();
422422 fileToRecover = minEntry .getKey ();
423423 }
424424 try {
425- requestSeqIdTracker .waitForOpsToComplete (fileSession .lastTrackedSeqNo );
425+ requestSeqIdTracker .waitForOpsToComplete (prevFileSession .lastTrackedSeqNo );
426+ final FileSession fileSession ;
426427 synchronized (mutex ) {
428+ fileSession = inFlightRequests .get (fileToRecover );
427429 // if file has been removed in the mean-while, it means that restore of this file completed, so start working
428430 // on the next one
429- if (inFlightRequests . containsKey ( fileToRecover ) == false ) {
431+ if (fileSession == null ) {
430432 continue ;
431433 }
432434 }
@@ -439,7 +441,8 @@ protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws I
439441 fileToRecover .length () - fileSession .lastOffset ));
440442 final GetCcrRestoreFileChunkRequest request =
441443 new GetCcrRestoreFileChunkRequest (node , sessionUUID , fileToRecover .name (), bytesRequested );
442- logger .trace ("[{}] [{}] fetching chunk for file [{}]" , shardId , snapshotId , fileToRecover .name ());
444+ logger .trace ("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}" , shardId , snapshotId ,
445+ fileToRecover .name (), fileSession .lastOffset , bytesRequested );
443446
444447 remoteClient .execute (GetCcrRestoreFileChunkAction .INSTANCE , request ,
445448 ActionListener .wrap (
@@ -453,23 +456,32 @@ public void onFailure(Exception e) {
453456 @ Override
454457 protected void doRun () throws Exception {
455458 final int actualChunkSize = r .getChunk ().length ();
459+ logger .trace ("[{}] [{}] got response for file [{}], offset: {}, length: {}" , shardId ,
460+ snapshotId , fileToRecover .name (), r .getOffset (), actualChunkSize );
456461 final long nanosPaused = ccrSettings .getRateLimiter ().maybePause (actualChunkSize );
457462 throttleListener .accept (nanosPaused );
458463 final long newOffset = r .getOffset () + actualChunkSize ;
464+
465+ assert r .getOffset () == fileSession .lastOffset ;
466+ assert actualChunkSize == bytesRequested ;
459467 assert newOffset <= fileToRecover .length ();
460468 final boolean lastChunk = newOffset >= fileToRecover .length ();
461469 multiFileWriter .writeFileChunk (fileToRecover .metadata (), r .getOffset (), r .getChunk (),
462470 lastChunk );
463471 if (lastChunk ) {
464472 synchronized (mutex ) {
465- final FileSession session = inFlightRequests .remove (fileToRecover );
466- assert session != null : "session disappeared for " + fileToRecover .name ();
473+ final FileSession removed = inFlightRequests .remove (fileToRecover );
474+ assert removed != null : "session disappeared for " + fileToRecover .name ();
475+ assert removed .lastTrackedSeqNo == requestSeqId ;
476+ assert removed .lastOffset == fileSession .lastOffset ;
467477 }
468478 } else {
469479 synchronized (mutex ) {
470480 final FileSession replaced = inFlightRequests .replace (fileToRecover ,
471481 new FileSession (requestSeqId , newOffset ));
472482 assert replaced != null : "session disappeared for " + fileToRecover .name ();
483+ assert replaced .lastTrackedSeqNo == requestSeqId ;
484+ assert replaced .lastOffset == fileSession .lastOffset ;
473485 }
474486 }
475487 requestSeqIdTracker .markSeqNoAsCompleted (requestSeqId );
0 commit comments