2929import org .elasticsearch .action .admin .cluster .snapshots .status .SnapshotStats ;
3030import org .elasticsearch .action .admin .cluster .snapshots .status .SnapshotStatus ;
3131import org .elasticsearch .action .admin .cluster .snapshots .status .SnapshotsStatusResponse ;
32+ import org .elasticsearch .action .admin .indices .stats .ShardStats ;
3233import org .elasticsearch .action .index .IndexRequestBuilder ;
3334import org .elasticsearch .action .support .ActiveShardCount ;
3435import org .elasticsearch .action .support .master .AcknowledgedResponse ;
4041import org .elasticsearch .cluster .NamedDiff ;
4142import org .elasticsearch .cluster .SnapshotsInProgress ;
4243import org .elasticsearch .cluster .health .ClusterHealthStatus ;
44+ import org .elasticsearch .cluster .metadata .IndexMetaData ;
4345import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
4446import org .elasticsearch .cluster .metadata .MetaData ;
4547import org .elasticsearch .cluster .routing .allocation .decider .EnableAllocationDecider ;
6163import org .elasticsearch .common .xcontent .NamedXContentRegistry ;
6264import org .elasticsearch .common .xcontent .XContentParser ;
6365import org .elasticsearch .env .Environment ;
66+ import org .elasticsearch .index .seqno .RetentionLeaseActions ;
67+ import org .elasticsearch .index .seqno .RetentionLeases ;
68+ import org .elasticsearch .index .shard .ShardId ;
6469import org .elasticsearch .indices .recovery .RecoveryState ;
6570import org .elasticsearch .node .Node ;
6671import org .elasticsearch .plugins .Plugin ;
9499import java .util .Collections ;
95100import java .util .EnumSet ;
96101import java .util .List ;
102+ import java .util .Locale ;
97103import java .util .concurrent .CountDownLatch ;
98104import java .util .concurrent .TimeUnit ;
99105import java .util .concurrent .atomic .AtomicReference ;
100106import java .util .function .Consumer ;
101107
108+ import static org .elasticsearch .index .seqno .RetentionLeaseActions .RETAIN_ALL ;
102109import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
110+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertHitCount ;
103111import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertThrows ;
104112import static org .hamcrest .Matchers .allOf ;
105113import static org .hamcrest .Matchers .containsString ;
@@ -1227,6 +1235,79 @@ public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
12271235 }, 60L , TimeUnit .SECONDS );
12281236 }
12291237
1238+ public void testRetentionLeasesClearedOnRestore () throws Exception {
1239+ final String repoName = "test-repo-retention-leases" ;
1240+ assertAcked (client ().admin ().cluster ().preparePutRepository (repoName )
1241+ .setType ("fs" )
1242+ .setSettings (Settings .builder ()
1243+ .put ("location" , randomRepoPath ())
1244+ .put ("compress" , randomBoolean ())));
1245+
1246+ final String indexName = "index-retention-leases" ;
1247+ final int shardCount = randomIntBetween (1 , 5 );
1248+ assertAcked (client ().admin ().indices ().prepareCreate (indexName )
1249+ .setSettings (Settings .builder ()
1250+ .put (IndexMetaData .SETTING_NUMBER_OF_SHARDS , shardCount )
1251+ .put (IndexMetaData .SETTING_NUMBER_OF_REPLICAS , 0 ))
1252+ .get ());
1253+ final ShardId shardId = new ShardId (resolveIndex (indexName ), randomIntBetween (0 , shardCount - 1 ));
1254+
1255+ final int snapshotDocCount = iterations (10 , 1000 );
1256+ logger .debug ("--> indexing {} docs into {}" , snapshotDocCount , indexName );
1257+ IndexRequestBuilder [] indexRequestBuilders = new IndexRequestBuilder [snapshotDocCount ];
1258+ for (int i = 0 ; i < snapshotDocCount ; i ++) {
1259+ indexRequestBuilders [i ] = client ().prepareIndex (indexName , "_doc" ).setSource ("field" , "value" );
1260+ }
1261+ indexRandom (true , indexRequestBuilders );
1262+ assertHitCount (client ().prepareSearch (indexName ).setSize (0 ).get (), snapshotDocCount );
1263+
1264+ final String leaseId = randomAlphaOfLength (randomIntBetween (1 , 10 )).toLowerCase (Locale .ROOT );
1265+ logger .debug ("--> adding retention lease with id {} to {}" , leaseId , shardId );
1266+ client ().execute (RetentionLeaseActions .Add .INSTANCE , new RetentionLeaseActions .AddRequest (
1267+ shardId , leaseId , RETAIN_ALL , "test" )).actionGet ();
1268+
1269+ final ShardStats shardStats = Arrays .stream (client ().admin ().indices ().prepareStats (indexName ).get ().getShards ())
1270+ .filter (s -> s .getShardRouting ().shardId ().equals (shardId )).findFirst ().get ();
1271+ final RetentionLeases retentionLeases = shardStats .getRetentionLeaseStats ().retentionLeases ();
1272+ assertTrue (shardStats + ": " + retentionLeases , retentionLeases .contains (leaseId ));
1273+
1274+ final String snapshotName = "snapshot-retention-leases" ;
1275+ logger .debug ("--> create snapshot {}:{}" , repoName , snapshotName );
1276+ CreateSnapshotResponse createResponse = client ().admin ().cluster ().prepareCreateSnapshot (repoName , snapshotName )
1277+ .setWaitForCompletion (true ).setIndices (indexName ).get ();
1278+ assertThat (createResponse .getSnapshotInfo ().successfulShards (), equalTo (shardCount ));
1279+ assertThat (createResponse .getSnapshotInfo ().failedShards (), equalTo (0 ));
1280+
1281+ if (randomBoolean ()) {
1282+ final int extraDocCount = iterations (10 , 1000 );
1283+ logger .debug ("--> indexing {} extra docs into {}" , extraDocCount , indexName );
1284+ indexRequestBuilders = new IndexRequestBuilder [extraDocCount ];
1285+ for (int i = 0 ; i < extraDocCount ; i ++) {
1286+ indexRequestBuilders [i ] = client ().prepareIndex (indexName , "_doc" ).setSource ("field" , "value" );
1287+ }
1288+ indexRandom (true , indexRequestBuilders );
1289+ }
1290+
1291+ // Wait for green so the close does not fail in the edge case of coinciding with a shard recovery that hasn't fully synced yet
1292+ ensureGreen ();
1293+ logger .debug ("--> close index {}" , indexName );
1294+ assertAcked (client ().admin ().indices ().prepareClose (indexName ));
1295+
1296+ logger .debug ("--> restore index {} from snapshot" , indexName );
1297+ RestoreSnapshotResponse restoreResponse = client ().admin ().cluster ().prepareRestoreSnapshot (repoName , snapshotName )
1298+ .setWaitForCompletion (true ).get ();
1299+ assertThat (restoreResponse .getRestoreInfo ().successfulShards (), equalTo (shardCount ));
1300+ assertThat (restoreResponse .getRestoreInfo ().failedShards (), equalTo (0 ));
1301+
1302+ ensureGreen ();
1303+ assertHitCount (client ().prepareSearch (indexName ).setSize (0 ).get (), snapshotDocCount );
1304+
1305+ final RetentionLeases restoredRetentionLeases = Arrays .stream (client ().admin ().indices ().prepareStats (indexName ).get ()
1306+ .getShards ()).filter (s -> s .getShardRouting ().shardId ().equals (shardId )).findFirst ().get ()
1307+ .getRetentionLeaseStats ().retentionLeases ();
1308+ assertFalse (restoredRetentionLeases .toString () + " has no " + leaseId , restoredRetentionLeases .contains (leaseId ));
1309+ }
1310+
12301311 private long calculateTotalFilesSize (List <Path > files ) {
12311312 return files .stream ().mapToLong (f -> {
12321313 try {
@@ -1237,7 +1318,6 @@ private long calculateTotalFilesSize(List<Path> files) {
12371318 }).sum ();
12381319 }
12391320
1240-
12411321 private List <Path > scanSnapshotFolder (Path repoPath ) throws IOException {
12421322 List <Path > files = new ArrayList <>();
12431323 Files .walkFileTree (repoPath , new SimpleFileVisitor <Path >(){
0 commit comments