Skip to content

Commit 77696aa

Browse files
committed
avoid remounting 3rd time
1 parent c56e4e0 commit 77696aa

File tree

2 files changed

+90
-131
lines changed

2 files changed

+90
-131
lines changed

x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java

Lines changed: 22 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -214,62 +214,6 @@ public void testBlobStoreCache() throws Exception {
214214
);
215215
}
216216

217-
logger.info("--> verifying number of documents in index [{}]", restoredIndex);
218-
assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
219-
assertAcked(client().admin().indices().prepareDelete(restoredIndex));
220-
221-
assertBusy(() -> {
222-
refreshSystemIndex();
223-
assertThat(
224-
systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get().getHits().getTotalHits().value,
225-
greaterThan(0L)
226-
);
227-
});
228-
229-
logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage);
230-
final String restoredIndexSecondTime = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
231-
mountSnapshot(
232-
repositoryName,
233-
snapshot.getName(),
234-
indexName,
235-
restoredIndexSecondTime,
236-
Settings.builder()
237-
.put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true)
238-
.put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false)
239-
.put(SearchableSnapshots.SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH, blobCacheMaxLength)
240-
.build(),
241-
storage
242-
);
243-
ensureGreen(restoredIndexSecondTime);
244-
245-
// wait for all async cache fills to complete
246-
assertBusy(() -> {
247-
for (final SearchableSnapshotShardStats shardStats : client().execute(
248-
SearchableSnapshotsStatsAction.INSTANCE,
249-
new SearchableSnapshotsStatsRequest()
250-
).actionGet().getStats()) {
251-
for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) {
252-
assertThat(Strings.toString(indexInputStats), indexInputStats.getCurrentIndexCacheFills(), equalTo(0L));
253-
}
254-
}
255-
});
256-
257-
logger.info("--> verifying cached documents in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX);
258-
if (numberOfDocs > 0) {
259-
ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX);
260-
refreshSystemIndex();
261-
262-
logger.info("--> verifying system index [{}] data tiers preference", SNAPSHOT_BLOB_CACHE_INDEX);
263-
assertThat(
264-
systemClient().admin()
265-
.indices()
266-
.prepareGetSettings(SNAPSHOT_BLOB_CACHE_INDEX)
267-
.get()
268-
.getSetting(SNAPSHOT_BLOB_CACHE_INDEX, DataTierAllocationDecider.INDEX_ROUTING_PREFER),
269-
equalTo(DATA_TIERS_CACHE_INDEX_PREFERENCE)
270-
);
271-
}
272-
273217
final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX)
274218
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
275219
.get()
@@ -286,25 +230,35 @@ public void testBlobStoreCache() throws Exception {
286230
.getIndexing();
287231
final long numberOfCacheWrites = indexingStats != null ? indexingStats.getTotal().getIndexCount() : 0L;
288232

289-
assertAcked(client().admin().indices().prepareDelete(restoredIndexSecondTime));
233+
logger.info("--> verifying number of documents in index [{}]", restoredIndex);
234+
assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
235+
assertAcked(client().admin().indices().prepareDelete(restoredIndex));
236+
237+
assertBusy(() -> {
238+
refreshSystemIndex();
239+
assertThat(
240+
systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get().getHits().getTotalHits().value,
241+
greaterThan(0L)
242+
);
243+
});
290244

291-
logger.info("--> mount snapshot [{}] as an index for the third time [storage={}]", snapshot, storage);
292-
final String restoredIndexThirdTime = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
245+
logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage);
246+
final String restoredAgainIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
293247
mountSnapshot(
294248
repositoryName,
295249
snapshot.getName(),
296250
indexName,
297-
restoredIndexThirdTime,
251+
restoredAgainIndex,
298252
Settings.builder()
299253
.put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true)
300254
.put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false)
301255
.put(SearchableSnapshots.SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH, blobCacheMaxLength)
302256
.build(),
303257
storage
304258
);
305-
ensureGreen(restoredIndexThirdTime);
259+
ensureGreen(restoredAgainIndex);
306260

307-
logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredIndexThirdTime);
261+
logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex);
308262
for (final SearchableSnapshotShardStats shardStats : client().execute(
309263
SearchableSnapshotsStatsAction.INSTANCE,
310264
new SearchableSnapshotsStatsRequest()
@@ -314,14 +268,13 @@ public void testBlobStoreCache() throws Exception {
314268
}
315269
}
316270

317-
logger.info("--> verifying number of documents in index [{}]", restoredIndexThirdTime);
318-
assertHitCount(client().prepareSearch(restoredIndexThirdTime).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
271+
logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex);
272+
assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
319273

320274
logger.info("--> verifying that no extra cached blobs were indexed [{}]", SNAPSHOT_BLOB_CACHE_INDEX);
321275
if (numberOfDocs > 0) {
322276
refreshSystemIndex();
323277
}
324-
325278
assertHitCount(
326279
systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN).setSize(0).get(),
327280
numberOfCachedBlobs
@@ -347,9 +300,9 @@ public Settings onNodeStopped(String nodeName) throws Exception {
347300
.build();
348301
}
349302
});
350-
ensureGreen(restoredIndexThirdTime);
303+
ensureGreen(restoredAgainIndex);
351304

352-
logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredIndexThirdTime);
305+
logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex);
353306
for (final SearchableSnapshotShardStats shardStats : client().execute(
354307
SearchableSnapshotsStatsAction.INSTANCE,
355308
new SearchableSnapshotsStatsRequest()
@@ -375,8 +328,8 @@ public Settings onNodeStopped(String nodeName) throws Exception {
375328
.getIndexing();
376329
assertThat(indexingStats != null ? indexingStats.getTotal().getIndexCount() : 0L, equalTo(0L));
377330

378-
logger.info("--> verifying number of documents in index [{}]", restoredIndexThirdTime);
379-
assertHitCount(client().prepareSearch(restoredIndexThirdTime).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
331+
logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex);
332+
assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
380333

381334
// TODO also test when the index is frozen
382335
// TODO also test when prewarming is enabled

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java

Lines changed: 68 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -322,68 +322,11 @@ protected void doReadInternal(ByteBuffer b) throws IOException {
322322

323323
if (indexCacheMiss != null) {
324324

325-
// TODO also index footer for compound files if rangeToWrite encapsulates footerBlobCacheByteRange,
326-
// otherwise the blob cache won't be populate for the footer and only the header.
327-
328-
final Releasable onCacheFillComplete = stats.addIndexCacheFill();
329-
final int indexCacheMissLength = toIntBytes(indexCacheMiss.length());
330-
331-
// We assume that we only cache small portions of blobs so that we do not need to:
332-
// - use a BigArrays for allocation
333-
// - use an intermediate copy buffer to read the file in sensibly-sized chunks
334-
// - release the buffer once the indexing operation is complete
335-
final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
336-
337-
final StepListener<Integer> readListener = frozenCacheFile.readIfAvailableOrPending(
338-
indexCacheMiss,
339-
(channel, channelPos, relativePos, len) -> {
340-
assert len <= indexCacheMissLength;
341-
342-
if (len == 0) {
343-
return 0;
344-
}
345-
346-
// create slice that is positioned to read the given values
347-
final ByteBuffer dup = byteBuffer.duplicate();
348-
final int newPosition = dup.position() + Math.toIntExact(relativePos);
349-
assert newPosition <= dup.limit() : "newpos " + newPosition + " limit " + dup.limit();
350-
assert newPosition + len <= byteBuffer.limit();
351-
dup.position(newPosition);
352-
dup.limit(newPosition + Math.toIntExact(len));
353-
354-
final int read = channel.read(dup, channelPos);
355-
if (read < 0) {
356-
throw new EOFException("read past EOF. pos [" + relativePos + "] length: [" + len + "]");
357-
}
358-
// NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats
359-
assert read == len;
360-
return read;
361-
}
362-
);
363-
364-
if (readListener == null) {
365-
// Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically
366-
// possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that
367-
// case, simply move on.
368-
onCacheFillComplete.close();
369-
} else {
370-
readListener.whenComplete(read -> {
371-
assert read == indexCacheMissLength;
372-
byteBuffer.position(read); // mark all bytes as accounted for
373-
byteBuffer.flip();
374-
final BytesReference content = BytesReference.fromByteBuffer(byteBuffer);
375-
directory.putCachedBlob(fileName, indexCacheMiss.start(), content, new ActionListener<>() {
376-
@Override
377-
public void onResponse(Void response) {
378-
onCacheFillComplete.close();
379-
}
380-
381-
@Override
382-
public void onFailure(Exception e1) {
383-
onCacheFillComplete.close();
384-
}
385-
});
386-
}, e -> onCacheFillComplete.close());
325+
fillIndexCache(fileName, indexCacheMiss);
326+
if (compoundFileOffset > 0
327+
&& indexCacheMiss.equals(headerBlobCacheByteRange)
328+
&& footerBlobCacheByteRange != ByteRange.EMPTY) {
329+
fillIndexCache(fileName, footerBlobCacheByteRange);
387330
}
388331
}
389332

@@ -408,6 +351,69 @@ public void onFailure(Exception e1) {
408351
readComplete(position, length);
409352
}
410353

354+
private void fillIndexCache(String fileName, ByteRange indexCacheMiss) {
355+
final Releasable onCacheFillComplete = stats.addIndexCacheFill();
356+
final int indexCacheMissLength = toIntBytes(indexCacheMiss.length());
357+
358+
// We assume that we only cache small portions of blobs so that we do not need to:
359+
// - use a BigArrays for allocation
360+
// - use an intermediate copy buffer to read the file in sensibly-sized chunks
361+
// - release the buffer once the indexing operation is complete
362+
final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
363+
364+
final StepListener<Integer> readListener = frozenCacheFile.readIfAvailableOrPending(
365+
indexCacheMiss,
366+
(channel, channelPos, relativePos, len) -> {
367+
assert len <= indexCacheMissLength;
368+
369+
if (len == 0) {
370+
return 0;
371+
}
372+
373+
// create slice that is positioned to read the given values
374+
final ByteBuffer dup = byteBuffer.duplicate();
375+
final int newPosition = dup.position() + Math.toIntExact(relativePos);
376+
assert newPosition <= dup.limit() : "newpos " + newPosition + " limit " + dup.limit();
377+
assert newPosition + len <= byteBuffer.limit();
378+
dup.position(newPosition);
379+
dup.limit(newPosition + Math.toIntExact(len));
380+
381+
final int read = channel.read(dup, channelPos);
382+
if (read < 0) {
383+
throw new EOFException("read past EOF. pos [" + relativePos + "] length: [" + len + "]");
384+
}
385+
// NB use Channels.readFromFileChannelWithEofException not readCacheFile() to avoid counting this in the stats
386+
assert read == len;
387+
return read;
388+
}
389+
);
390+
391+
if (readListener == null) {
392+
// Normally doesn't happen, we're already obtaining a range covering all cache misses above, but theoretically
393+
// possible in the case that the real populateAndRead call already failed to obtain this range of the file. In that
394+
// case, simply move on.
395+
onCacheFillComplete.close();
396+
} else {
397+
readListener.whenComplete(read -> {
398+
assert read == indexCacheMissLength;
399+
byteBuffer.position(read); // mark all bytes as accounted for
400+
byteBuffer.flip();
401+
final BytesReference content = BytesReference.fromByteBuffer(byteBuffer);
402+
directory.putCachedBlob(fileName, indexCacheMiss.start(), content, new ActionListener<>() {
403+
@Override
404+
public void onResponse(Void response) {
405+
onCacheFillComplete.close();
406+
}
407+
408+
@Override
409+
public void onFailure(Exception e1) {
410+
onCacheFillComplete.close();
411+
}
412+
});
413+
}, e -> onCacheFillComplete.close());
414+
}
415+
}
416+
411417
private void readComplete(long position, int length) {
412418
stats.incrementBytesRead(lastReadPosition, position, length);
413419
lastReadPosition = position + length;

0 commit comments

Comments
 (0)