Skip to content

Commit

Permalink
Keep track of data recovered from snapshots in RecoveryState (elastic…
Browse files Browse the repository at this point in the history
…#76499)

Adds new field to recovery API to keep track of amount of data
recovered from snapshots.

The normal recovered_bytes field remains and is also increased for
recovery from snapshot but can go backwards in the unlikely case
that recovery from snapshot fails to download a file.

Relates elastic#73496
  • Loading branch information
fcofdez authored and henningandersen committed Aug 16, 2021
1 parent 79b786c commit 105a699
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 45 deletions.
10 changes: 8 additions & 2 deletions docs/reference/indices/recovery.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ The API returns the following response:
"reused_in_bytes" : 0,
"recovered" : "65.7mb",
"recovered_in_bytes" : 68891939,
"recovered_from_snapshot" : "0b",
"recovered_from_snapshot_in_bytes" : 0,
"percent" : "87.1%"
},
"files" : {
Expand Down Expand Up @@ -380,6 +382,8 @@ The API returns the following response:
"reused_in_bytes" : 26001617,
"recovered" : "0b",
"recovered_in_bytes" : 0,
"recovered_from_snapshot" : "0b",
"recovered_from_snapshot_in_bytes" : 0,
"percent" : "100.0%"
},
"files" : {
Expand All @@ -394,11 +398,13 @@ The API returns the following response:
}, {
"name" : "_0.cfs",
"length" : 135306,
"recovered" : 135306
"recovered" : 135306,
"recovered_from_snapshot": 0
}, {
"name" : "segments_2",
"length" : 251,
"recovered" : 251
"recovered" : 251,
"recovered_from_snapshot": 0
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.recovery;

import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand All @@ -24,10 +25,13 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

public abstract class AbstractSnapshotBasedRecoveryRestTestCase extends ESRestTestCase {
Expand Down Expand Up @@ -57,9 +61,9 @@ public void testRecoveryUsingSnapshots() throws Exception {
);
ensureGreen(indexName);

final int numDocs = randomIntBetween(1, 500);
final int numDocs = randomIntBetween(500, 1000);
indexDocs(indexName, numDocs);

waitUntilGlobalCheckpointIsStable(indexName);
forceMerge(indexName, randomBoolean(), randomBoolean());

deleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, true);
Expand All @@ -68,28 +72,88 @@ public void testRecoveryUsingSnapshots() throws Exception {
// Add a new replica
updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1));
ensureGreen(indexName);
assertSnapshotIsUsed(indexName);

assertMatchAllReturnsAllDocuments(indexName, numDocs);
assertMatchQueryReturnsAllDocuments(indexName, numDocs);

for (int i = 0; i < 4; i++) {
assertSearchResultsAreCorrect(indexName, numDocs);
}
deleteSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, false);
}

private void assertSearchResultsAreCorrect(String indexName, int numDocs) throws IOException {
if (randomBoolean()) {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchAllQuery());
assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
List<Map<String, Object>> hits = extractValue(searchResults, "hits.hits");
for (Map<String, Object> hit : hits) {
String docId = extractValue(hit, "_id");
assertThat(Integer.parseInt(docId), allOf(greaterThanOrEqualTo(0), lessThan(numDocs)));
assertThat(extractValue(hit, "_source.field"), equalTo(Integer.parseInt(docId)));
assertThat(extractValue(hit, "_source.text"), equalTo("Some text " + docId));
private void waitUntilGlobalCheckpointIsStable(String index) throws Exception {
assertBusy(() -> {
Request request = new Request(HttpGet.METHOD_NAME, '/' + index + "/_stats?level=shards");
Response response = client().performRequest(request);
assertOK(response);
Map<String, Object> responseAsMap = responseAsMap(response);
Map<String, Object> indices = extractValue(responseAsMap, "indices");
Map<String, Object> indexShardsStats = extractValue(extractValue(indices, index), "shards");
List<Map<String, Object>> shardStats = extractValue(indexShardsStats, "0");
for (Map<String, Object> shardStat : shardStats) {
final boolean isPrimary = extractValue(shardStat, "routing.primary");
if (isPrimary == false) {
continue;
}
Map<Object, Integer> seqNos = extractValue(shardStat, "seq_no");
assertThat(seqNos.toString(), seqNos.get("max_seq_no"), is(equalTo(seqNos.get("global_checkpoint"))));
}
}, 60, TimeUnit.SECONDS);
}

private void assertMatchAllReturnsAllDocuments(String indexName, int numDocs) throws IOException {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchAllQuery());
assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
List<Map<String, Object>> hits = extractValue(searchResults, "hits.hits");
for (Map<String, Object> hit : hits) {
String docId = extractValue(hit, "_id");
assertThat(Integer.parseInt(docId), allOf(greaterThanOrEqualTo(0), lessThan(numDocs)));
assertThat(extractValue(hit, "_source.field"), equalTo(Integer.parseInt(docId)));
assertThat(extractValue(hit, "_source.text"), equalTo("Some text " + docId));
}
}

private void assertSnapshotIsUsed(String index) throws Exception {
Request request = new Request(HttpGet.METHOD_NAME, '/' + index + "/_recovery?detailed=true");
Response response = client().performRequest(request);
assertOK(response);
Map<String, Object> responseAsMap = responseAsMap(response);
List<Map<String, Object>> shardRecoveries = extractValue(responseAsMap, index + ".shards");
long totalRecoveredFromSnapshot = 0;
for (Map<String, Object> shardRecoveryState : shardRecoveries) {
String recoveryType = extractValue(shardRecoveryState, "type");
if (recoveryType.equals("PEER") == false) {
continue;
}
String stage = extractValue(shardRecoveryState, "stage");
assertThat(stage, is(equalTo("DONE")));

List<Map<String, Object>> fileDetails = extractValue(shardRecoveryState, "index.files.details");
for (Map<String, Object> fileDetail : fileDetails) {
int recoveredFromSnapshot = extractValue(fileDetail, "recovered_from_snapshot_in_bytes");
assertThat(recoveredFromSnapshot, is(greaterThan(0)));
totalRecoveredFromSnapshot += recoveredFromSnapshot;
}
} else {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchQuery("text", "some"));
assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
}
long snapshotSize = getSnapshotSizeForIndex(index);
assertThat(totalRecoveredFromSnapshot, is(greaterThan(0L)));
assertThat(totalRecoveredFromSnapshot, is(equalTo(snapshotSize)));
}

private int getSnapshotSizeForIndex(String indexName) throws Exception {
Request request = new Request(HttpGet.METHOD_NAME, "/_snapshot/" + REPOSITORY_NAME + "/" + SNAPSHOT_NAME);
request.addParameter("index_details", "true");
Response response = client().performRequest(request);
assertOK(response);
Map<String, Object> snapshotsResponse = responseAsMap(response);
List<Map<String, Object>> snapshots = extractValue(snapshotsResponse, "snapshots");
assertThat(snapshots.size(), is(equalTo(1)));
Map<String, Object> snapshot = snapshots.get(0);
return extractValue(snapshot, "index_details." + indexName + ".size_in_bytes");
}

private void assertMatchQueryReturnsAllDocuments(String indexName, int numDocs) throws IOException {
Map<String, Object> searchResults = search(indexName, QueryBuilders.matchQuery("text", "some"));
assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
}

private static void forceMerge(String index, boolean onlyExpungeDeletes, boolean flush) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,52 @@
---
"Indices recovery test":
- skip:
# todo: change after backport
version: " - 7.14.99"
reason: recovery from snapshot bytes not available until 7.15

- do:
indices.create:
index: test_1
body:
settings:
index:
number_of_replicas: 0

- do:
cluster.health:
wait_for_status: green

- do:
indices.recovery:
index: [test_1]
human: true

- match: { test_1.shards.0.type: "EMPTY_STORE" }
- match: { test_1.shards.0.stage: "DONE" }
- match: { test_1.shards.0.primary: true }
- match: { test_1.shards.0.start_time: /^2\d\d\d-.+/ }
- match: { test_1.shards.0.target.ip: /^\d+\.\d+\.\d+\.\d+$/ }
- gte: { test_1.shards.0.index.files.total: 0 }
- gte: { test_1.shards.0.index.files.reused: 0 }
- gte: { test_1.shards.0.index.files.recovered: 0 }
- match: { test_1.shards.0.index.files.percent: /^\d+\.\d\%$/ }
- gte: { test_1.shards.0.index.size.total_in_bytes: 0 }
- gte: { test_1.shards.0.index.size.reused_in_bytes: 0 }
- gte: { test_1.shards.0.index.size.recovered_in_bytes: 0 }
- gte: { test_1.shards.0.index.size.recovered_from_snapshot_in_bytes: 0 }
- match: { test_1.shards.0.index.size.percent: /^\d+\.\d\%$/ }
- gte: { test_1.shards.0.index.source_throttle_time_in_millis: 0 }
- gte: { test_1.shards.0.index.target_throttle_time_in_millis: 0 }
- gte: { test_1.shards.0.translog.recovered: 0 }
- gte: { test_1.shards.0.translog.total: -1 }
- gte: { test_1.shards.0.translog.total_on_start: 0 }
- gte: { test_1.shards.0.translog.total_time_in_millis: 0 }
- gte: { test_1.shards.0.verify_index.check_index_time_in_millis: 0 }
- gte: { test_1.shards.0.verify_index.total_time_in_millis: 0 }

---
"Indices recovery test without recovery from snapshot":

- do:
indices.create:
Expand Down Expand Up @@ -71,27 +118,27 @@
index: [test_2]
human: true

- match: { test_2.shards.0.type: "EXISTING_STORE" }
- match: { test_2.shards.0.stage: "DONE" }
- match: { test_2.shards.0.primary: true }
- match: { test_2.shards.0.start_time: /^2\d\d\d-.+/ }
- match: { test_2.shards.0.target.ip: /^\d+\.\d+\.\d+\.\d+$/ }
- gte: { test_2.shards.0.index.files.total: 0 }
- gte: { test_2.shards.0.index.files.reused: 0 }
- gte: { test_2.shards.0.index.files.recovered: 0 }
- match: { test_2.shards.0.index.files.percent: /^\d+\.\d\%$/ }
- gte: { test_2.shards.0.index.size.total_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.reused_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.recovered_in_bytes: 0 }
- match: { test_2.shards.0.index.size.percent: /^\d+\.\d\%$/ }
- gte: { test_2.shards.0.index.source_throttle_time_in_millis: 0 }
- gte: { test_2.shards.0.index.target_throttle_time_in_millis: 0 }
- gte: { test_2.shards.0.translog.recovered: 0 }
- gte: { test_2.shards.0.translog.total: 0 }
- gte: { test_2.shards.0.translog.total_on_start: 0 }
- gte: { test_2.shards.0.translog.total_time_in_millis: 0 }
- gte: { test_2.shards.0.verify_index.check_index_time_in_millis: 0 }
- gte: { test_2.shards.0.verify_index.total_time_in_millis: 0 }
- match: { test_2.shards.0.type: "EXISTING_STORE" }
- match: { test_2.shards.0.stage: "DONE" }
- match: { test_2.shards.0.primary: true }
- match: { test_2.shards.0.start_time: /^2\d\d\d-.+/ }
- match: { test_2.shards.0.target.ip: /^\d+\.\d+\.\d+\.\d+$/ }
- gte: { test_2.shards.0.index.files.total: 0 }
- gte: { test_2.shards.0.index.files.reused: 0 }
- gte: { test_2.shards.0.index.files.recovered: 0 }
- match: { test_2.shards.0.index.files.percent: /^\d+\.\d\%$/ }
- gte: { test_2.shards.0.index.size.total_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.reused_in_bytes: 0 }
- gte: { test_2.shards.0.index.size.recovered_in_bytes: 0 }
- match: { test_2.shards.0.index.size.percent: /^\d+\.\d\%$/ }
- gte: { test_2.shards.0.index.source_throttle_time_in_millis: 0 }
- gte: { test_2.shards.0.index.target_throttle_time_in_millis: 0 }
- gte: { test_2.shards.0.translog.recovered: 0 }
- gte: { test_2.shards.0.translog.total: 0 }
- gte: { test_2.shards.0.translog.total_on_start: 0 }
- gte: { test_2.shards.0.translog.total_time_in_millis: 0 }
- gte: { test_2.shards.0.verify_index.check_index_time_in_millis: 0 }
- gte: { test_2.shards.0.verify_index.total_time_in_millis: 0 }
---
"Indices recovery test index name not matching":

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ public void testPeerRecoveryUsesSnapshots() throws Exception {

// segments_N and .si files are recovered from the file metadata directly
long expectedRecoveredBytesFromRepo = 0;
long totalBytesRecoveredFromSnapshot = 0;
for (RecoveryState.FileDetail fileDetail : recoveryState.getIndex().fileDetails()) {
totalBytesRecoveredFromSnapshot += fileDetail.recoveredFromSnapshot();
if (fileDetail.name().startsWith("segments") || fileDetail.name().endsWith(".si")) {
continue;
}
Expand All @@ -264,6 +266,7 @@ public void testPeerRecoveryUsesSnapshots() throws Exception {
long snapshotSizeForIndex = getSnapshotSizeForIndex(repoName, snapshot, indexName);
assertThat(repository.totalBytesRead.get(), is(greaterThan(0L)));
assertThat(repository.totalBytesRead.get(), is(lessThanOrEqualTo(snapshotSizeForIndex)));
assertThat(totalBytesRecoveredFromSnapshot, is(equalTo(snapshotSizeForIndex)));

assertDocumentsAreEqual(indexName, numDocs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void writeFile(StoreFileMetadata fileMetadata, long readSnapshotFileBuffe
long bytesWritten = 0;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, length);
indexState.addRecoveredBytesToFile(fileName, length);
indexState.addRecoveredFromSnapshotBytesToFile(fileName, length);
bytesWritten += length;
}

Expand Down
Loading

0 comments on commit 105a699

Please sign in to comment.