3030import org .elasticsearch .action .admin .cluster .snapshots .status .SnapshotStats ;
3131import org .elasticsearch .action .admin .cluster .snapshots .status .SnapshotStatus ;
3232import org .elasticsearch .action .admin .cluster .snapshots .status .SnapshotsStatusResponse ;
33+ import org .elasticsearch .action .admin .indices .stats .ShardStats ;
3334import org .elasticsearch .action .index .IndexRequestBuilder ;
3435import org .elasticsearch .action .support .ActiveShardCount ;
3536import org .elasticsearch .action .support .master .AcknowledgedResponse ;
4142import org .elasticsearch .cluster .NamedDiff ;
4243import org .elasticsearch .cluster .SnapshotsInProgress ;
4344import org .elasticsearch .cluster .health .ClusterHealthStatus ;
45+ import org .elasticsearch .cluster .metadata .IndexMetaData ;
4446import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
4547import org .elasticsearch .cluster .metadata .MetaData ;
4648import org .elasticsearch .cluster .routing .allocation .decider .EnableAllocationDecider ;
6264import org .elasticsearch .common .xcontent .NamedXContentRegistry ;
6365import org .elasticsearch .common .xcontent .XContentParser ;
6466import org .elasticsearch .env .Environment ;
67+ import org .elasticsearch .index .seqno .RetentionLeaseActions ;
68+ import org .elasticsearch .index .seqno .RetentionLeases ;
69+ import org .elasticsearch .index .shard .ShardId ;
6570import org .elasticsearch .indices .recovery .RecoveryState ;
6671import org .elasticsearch .node .Node ;
6772import org .elasticsearch .plugins .Plugin ;
95100import java .util .Collections ;
96101import java .util .EnumSet ;
97102import java .util .List ;
103+ import java .util .Locale ;
98104import java .util .concurrent .CountDownLatch ;
99105import java .util .concurrent .TimeUnit ;
100106import java .util .concurrent .atomic .AtomicReference ;
101107import java .util .function .Consumer ;
102108
109+ import static org .elasticsearch .index .seqno .RetentionLeaseActions .RETAIN_ALL ;
103110import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
111+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertHitCount ;
104112import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertThrows ;
105113import static org .hamcrest .Matchers .allOf ;
106114import static org .hamcrest .Matchers .containsString ;
@@ -1229,6 +1237,79 @@ public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
12291237 }, 60L , TimeUnit .SECONDS );
12301238 }
12311239
1240+ public void testRetentionLeasesClearedOnRestore () throws Exception {
1241+ final String repoName = "test-repo-retention-leases" ;
1242+ assertAcked (client ().admin ().cluster ().preparePutRepository (repoName )
1243+ .setType ("fs" )
1244+ .setSettings (Settings .builder ()
1245+ .put ("location" , randomRepoPath ())
1246+ .put ("compress" , randomBoolean ())));
1247+
1248+ final String indexName = "index-retention-leases" ;
1249+ final int shardCount = randomIntBetween (1 , 5 );
1250+ assertAcked (client ().admin ().indices ().prepareCreate (indexName )
1251+ .setSettings (Settings .builder ()
1252+ .put (IndexMetaData .SETTING_NUMBER_OF_SHARDS , shardCount )
1253+ .put (IndexMetaData .SETTING_NUMBER_OF_REPLICAS , 0 ))
1254+ .get ());
1255+ final ShardId shardId = new ShardId (resolveIndex (indexName ), randomIntBetween (0 , shardCount - 1 ));
1256+
1257+ final int snapshotDocCount = iterations (10 , 1000 );
1258+ logger .debug ("--> indexing {} docs into {}" , snapshotDocCount , indexName );
1259+ IndexRequestBuilder [] indexRequestBuilders = new IndexRequestBuilder [snapshotDocCount ];
1260+ for (int i = 0 ; i < snapshotDocCount ; i ++) {
1261+ indexRequestBuilders [i ] = client ().prepareIndex (indexName , "_doc" ).setSource ("field" , "value" );
1262+ }
1263+ indexRandom (true , indexRequestBuilders );
1264+ assertHitCount (client ().prepareSearch (indexName ).setSize (0 ).get (), snapshotDocCount );
1265+
1266+ final String leaseId = randomAlphaOfLength (randomIntBetween (1 , 10 )).toLowerCase (Locale .ROOT );
1267+ logger .debug ("--> adding retention lease with id {} to {}" , leaseId , shardId );
1268+ client ().execute (RetentionLeaseActions .Add .INSTANCE , new RetentionLeaseActions .AddRequest (
1269+ shardId , leaseId , RETAIN_ALL , "test" )).actionGet ();
1270+
1271+ final ShardStats shardStats = Arrays .stream (client ().admin ().indices ().prepareStats (indexName ).get ().getShards ())
1272+ .filter (s -> s .getShardRouting ().shardId ().equals (shardId )).findFirst ().get ();
1273+ final RetentionLeases retentionLeases = shardStats .getRetentionLeaseStats ().retentionLeases ();
1274+ assertTrue (shardStats + ": " + retentionLeases , retentionLeases .contains (leaseId ));
1275+
1276+ final String snapshotName = "snapshot-retention-leases" ;
1277+ logger .debug ("--> create snapshot {}:{}" , repoName , snapshotName );
1278+ CreateSnapshotResponse createResponse = client ().admin ().cluster ().prepareCreateSnapshot (repoName , snapshotName )
1279+ .setWaitForCompletion (true ).setIndices (indexName ).get ();
1280+ assertThat (createResponse .getSnapshotInfo ().successfulShards (), equalTo (shardCount ));
1281+ assertThat (createResponse .getSnapshotInfo ().failedShards (), equalTo (0 ));
1282+
1283+ if (randomBoolean ()) {
1284+ final int extraDocCount = iterations (10 , 1000 );
1285+ logger .debug ("--> indexing {} extra docs into {}" , extraDocCount , indexName );
1286+ indexRequestBuilders = new IndexRequestBuilder [extraDocCount ];
1287+ for (int i = 0 ; i < extraDocCount ; i ++) {
1288+ indexRequestBuilders [i ] = client ().prepareIndex (indexName , "_doc" ).setSource ("field" , "value" );
1289+ }
1290+ indexRandom (true , indexRequestBuilders );
1291+ }
1292+
1293+ // Wait for green so the close does not fail in the edge case of coinciding with a shard recovery that hasn't fully synced yet
1294+ ensureGreen ();
1295+ logger .debug ("--> close index {}" , indexName );
1296+ assertAcked (client ().admin ().indices ().prepareClose (indexName ));
1297+
1298+ logger .debug ("--> restore index {} from snapshot" , indexName );
1299+ RestoreSnapshotResponse restoreResponse = client ().admin ().cluster ().prepareRestoreSnapshot (repoName , snapshotName )
1300+ .setWaitForCompletion (true ).get ();
1301+ assertThat (restoreResponse .getRestoreInfo ().successfulShards (), equalTo (shardCount ));
1302+ assertThat (restoreResponse .getRestoreInfo ().failedShards (), equalTo (0 ));
1303+
1304+ ensureGreen ();
1305+ assertHitCount (client ().prepareSearch (indexName ).setSize (0 ).get (), snapshotDocCount );
1306+
1307+ final RetentionLeases restoredRetentionLeases = Arrays .stream (client ().admin ().indices ().prepareStats (indexName ).get ()
1308+ .getShards ()).filter (s -> s .getShardRouting ().shardId ().equals (shardId )).findFirst ().get ()
1309+ .getRetentionLeaseStats ().retentionLeases ();
1310+ assertFalse (restoredRetentionLeases .toString () + " has no " + leaseId , restoredRetentionLeases .contains (leaseId ));
1311+ }
1312+
12321313 private long calculateTotalFilesSize (List <Path > files ) {
12331314 return files .stream ().mapToLong (f -> {
12341315 try {
@@ -1239,7 +1320,6 @@ private long calculateTotalFilesSize(List<Path> files) {
12391320 }).sum ();
12401321 }
12411322
1242-
12431323 private List <Path > scanSnapshotFolder (Path repoPath ) throws IOException {
12441324 List <Path > files = new ArrayList <>();
12451325 Files .walkFileTree (repoPath , new SimpleFileVisitor <Path >(){
0 commit comments