Adjust the length of blob cache docs for Lucene metadata files #69431

Merged: 20 commits (Mar 1, 2021)
Changes from 1 commit
@@ -26,6 +26,7 @@
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService;

@@ -68,7 +69,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
builder.put(
CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getKey(),
rarely()
- ? new ByteSizeValue(randomIntBetween(4, 1024), ByteSizeUnit.KB)
+ ? new ByteSizeValue(randomIntBetween(1, 1024), ByteSizeUnit.KB)
: new ByteSizeValue(randomIntBetween(1, 10), ByteSizeUnit.MB)
);
}
@@ -116,6 +117,17 @@ protected void mountSnapshot(
String indexName,
String restoredIndexName,
Settings restoredIndexSettings
) throws Exception {
mountSnapshot(repositoryName, snapshotName, indexName, restoredIndexName, restoredIndexSettings, Storage.FULL_COPY);
}

protected void mountSnapshot(
String repositoryName,
String snapshotName,
String indexName,
String restoredIndexName,
Settings restoredIndexSettings,
final Storage storage
) throws Exception {
final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest(
restoredIndexName,
@@ -128,7 +140,7 @@ protected void mountSnapshot(
.build(),
Strings.EMPTY_ARRAY,
true,
- MountSearchableSnapshotRequest.Storage.FULL_COPY
+ storage
);

final RestoreSnapshotResponse restoreResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).get();
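
For context, tests can now choose the storage type when mounting; a minimal sketch of a call to the new overload (SHARED_CACHE is assumed here as the other Storage value, alongside the FULL_COPY shown above):

// Hypothetical call site in a test: mount the snapshot as a shared-cache index
// instead of the FULL_COPY default used by the shorter overload.
mountSnapshot(
    "my-repository",
    "my-snapshot",
    "my-index",
    "my-restored-index",
    Settings.EMPTY,
    Storage.SHARED_CACHE
);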
----------------------------------------------------------------------
@@ -10,6 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexFileNames;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
@@ -25,13 +26,16 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;

import java.time.Instant;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -41,7 +45,7 @@ public class BlobStoreCacheService {

private static final Logger logger = LogManager.getLogger(BlobStoreCacheService.class);

- public static final int DEFAULT_CACHED_BLOB_SIZE = ByteSizeUnit.KB.toIntBytes(4);
+ public static final int DEFAULT_CACHED_BLOB_SIZE = ByteSizeUnit.KB.toIntBytes(1);

Contributor: How will this change affect documents in the blob cache that have been created with a previous ES version?

Member Author: Sorry for the delayed response, I had to deal with other duties.

Looking at the current code in 7.x, reducing the size of cached blobs might throw an exception in Cached/FrozenIndexInput at this line:

final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(toIntBytes(position), length).iterator();

in case the position is larger than one or two buffered reads.

I suggest:

  • keeping the 4KB/8KB limit for non-metadata files as long as there is at least one node on a version < 7.13 in the cluster (rough sketch below)
  • writing some BWC tests that ensure indices are correctly assigned during rolling upgrades
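
A rough sketch of the first suggestion, assuming the gate would live next to DEFAULT_CACHED_BLOB_SIZE in BlobStoreCacheService; the helper name and the 7.13 version constant are illustrative, not something this PR adds:

// Hypothetical BWC gate: keep writing the old 4KB cached-blob length while any node
// in the cluster is older than the first version that understands the shorter blobs.
static int effectiveCachedBlobSize(DiscoveryNodes nodes) {
    final Version minNodeVersion = nodes.getMinNodeVersion();
    if (minNodeVersion.before(Version.V_7_13_0)) { // assumed cut-over version
        return ByteSizeUnit.KB.toIntBytes(4);      // legacy 4KB length
    }
    return DEFAULT_CACHED_BLOB_SIZE;               // new 1KB length
}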


private final ThreadPool threadPool;
private final Client client;
@@ -155,4 +159,46 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}
}

private static final Set<String> METADATA_FILES_EXTENSIONS = Set.of(
"cfe", // compound file's entry table
"dvm", // doc values metadata file
"fdm", // stored fields metadata file
"fnm", // field names metadata file
"kdm", // Lucene 8.6 point format metadata file
"nvm", // norms metadata file
"tmd", // Lucene 8.6 terms metadata file
"tvm", // terms vectors metadata file
"vem" // Lucene 9.0 indexed vectors metadata
);

/**
* Computes the {@link ByteRange} corresponding to the header of a Lucene file. This range can vary depending on the type of the file
* which is indicated by the file's extension. The returned byte range can never be larger than the file's length but it can be smaller.
*
* For files that are declared as metadata files in {@link #METADATA_FILES_EXTENSIONS}, the header can be as large as the specified
* maximum metadata length parameter {@code maxMetadataLength}. Non-metadata files have a fixed-length header of at most 1KB.
*
* @param fileName the name of the file
* @param fileLength the length of the file
* @param maxMetadataLength the maximum accepted length for metadata files
*
* @return the header {@link ByteRange}
*/
public static ByteRange computeBlobCacheByteRange(String fileName, long fileLength, ByteSizeValue maxMetadataLength) {
final String fileExtension = IndexFileNames.getExtension(fileName);
if (METADATA_FILES_EXTENSIONS.contains(fileExtension)) {
final long maxAllowedLengthInBytes = maxMetadataLength.getBytes();
if (fileLength > maxAllowedLengthInBytes) {
logger.warn(
"file of type [{}] and length [{}] is larger than the max. length allowed [{}] to cache metadata files in blob cache",
fileExtension,
fileLength,
maxMetadataLength
);
}
return ByteRange.of(0L, Math.min(fileLength, maxAllowedLengthInBytes));
}
return ByteRange.of(0L, Math.min(fileLength, DEFAULT_CACHED_BLOB_SIZE));
}
}

Member Author (on the logger.warn call above): Maybe too verbose?

Contributor: Let's use a Cache to log this once per file type (and expire entries after an hour).
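
A rough sketch of the once-per-file-type logging the reviewer suggests, assuming the org.elasticsearch.common.cache.CacheBuilder utilities; the field and method names are illustrative:

// Hypothetical: remember which extensions were already logged and let entries expire,
// so the warning is emitted at most once per file type per hour.
private static final Cache<String, Boolean> WARNED_EXTENSIONS = CacheBuilder.<String, Boolean>builder()
    .setExpireAfterWrite(TimeValue.timeValueHours(1))
    .build();

private static void warnOnce(String fileExtension, long fileLength, ByteSizeValue maxMetadataLength) {
    if (WARNED_EXTENSIONS.get(fileExtension) == null) {
        WARNED_EXTENSIONS.put(fileExtension, Boolean.TRUE);
        logger.warn(
            "file of type [{}] and length [{}] is larger than the max. length allowed [{}] to cache metadata files in blob cache",
            fileExtension,
            fileLength,
            maxMetadataLength
        );
    }
}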
----------------------------------------------------------------------
@@ -187,4 +187,27 @@ public static CachedBlob fromSource(final Map<String, Object> source) {
to.longValue()
);
}

@Override
public String toString() {
return "CachedBlob ["
+ "creationTime="
+ creationTime
+ ", version="
+ version
+ ", repository='"
+ repository
+ '\''
+ ", name='"
+ name
+ '\''
+ ", path='"
+ path
+ '\''
+ ", from="
+ from
+ ", to="
+ to
+ ']';
}
}
----------------------------------------------------------------------
@@ -15,6 +15,7 @@
import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;

import java.io.IOException;
import java.io.InputStream;
@@ -28,34 +29,41 @@
public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput {

protected final Logger logger;
protected final SearchableSnapshotDirectory directory;
protected final BlobContainer blobContainer;
protected final FileInfo fileInfo;
protected final IOContext context;
protected final IndexInputStats stats;
protected final long offset;
protected final long length;

/** Range of bytes that should be cached in the blob cache for the current index input **/
protected final ByteRange blobCacheByteRange;

// the following are only mutable so they can be adjusted after cloning/slicing
protected volatile boolean isClone;
private AtomicBoolean closed;

public BaseSearchableSnapshotIndexInput(
Logger logger,
String resourceDesc,
- BlobContainer blobContainer,
+ SearchableSnapshotDirectory directory,
FileInfo fileInfo,
IOContext context,
IndexInputStats stats,
long offset,
- long length
+ long length,
+ ByteRange blobCacheByteRange
) {
super(resourceDesc, context);
this.logger = Objects.requireNonNull(logger);
- this.blobContainer = Objects.requireNonNull(blobContainer);
+ this.directory = Objects.requireNonNull(directory);
+ this.blobContainer = Objects.requireNonNull(directory.blobContainer());
this.fileInfo = Objects.requireNonNull(fileInfo);
this.context = Objects.requireNonNull(context);
assert fileInfo.metadata().hashEqualsContents() == false
: "this method should only be used with blobs that are NOT stored in metadata's hash field " + "(fileInfo: " + fileInfo + ')';
this.blobCacheByteRange = Objects.requireNonNull(blobCacheByteRange);
this.stats = Objects.requireNonNull(stats);
this.offset = offset;
this.length = length;
----------------------------------------------------------------------
@@ -34,6 +34,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.LazyInitializable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -86,6 +87,7 @@

import static org.apache.lucene.store.BufferedIndexInput.bufferSize;
import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
@@ -136,6 +138,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
private final AtomicBoolean closed;
private final boolean partial;
private final FrozenCacheService frozenCacheService;
private final ByteSizeValue blobStoreCacheMaxLength;

// volatile fields are updated once under `this` lock, all together, iff loaded is not true.
private volatile BlobStoreIndexShardSnapshot snapshot;
@@ -179,6 +182,7 @@ public SearchableSnapshotDirectory(
this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings));
this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes();
this.blobStoreCachePath = String.join("/", snapshotId.getUUID(), indexId.getId(), String.valueOf(shardId.id()));
this.blobStoreCacheMaxLength = SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH_SETTING.get(indexSettings);
this.threadPool = threadPool;
this.loaded = false;
this.frozenCacheService = frozenCacheService;
@@ -419,14 +423,7 @@ public IndexInput openInput(final String name, final IOContext context) throws IOException {
);
}
} else {
- return new DirectBlobContainerIndexInput(
- blobContainer(),
- fileInfo,
- context,
- inputStats,
- getUncachedChunkSize(),
- bufferSize(context)
- );
+ return new DirectBlobContainerIndexInput(this, fileInfo, context, inputStats, getUncachedChunkSize(), bufferSize(context));
}
}

@@ -455,6 +452,10 @@ public boolean isRecoveryFinalized() {
return stage == RecoveryState.Stage.DONE || stage == RecoveryState.Stage.FINALIZE;
}

public ByteSizeValue getBlobStoreCacheMaxLength() {
return blobStoreCacheMaxLength;
}

@Override
public String toString() {
return this.getClass().getSimpleName() + "(snapshotId=" + snapshotId + ", indexId=" + indexId + " shardId=" + shardId + ')';
----------------------------------------------------------------------
@@ -42,6 +42,7 @@
import java.util.function.Consumer;
import java.util.stream.IntStream;

import static org.elasticsearch.blobstore.cache.BlobStoreCacheService.computeBlobCacheByteRange;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes;

public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {
@@ -56,7 +57,6 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {
private static final Logger logger = LogManager.getLogger(CachedBlobContainerIndexInput.class);
private static final int COPY_BUFFER_SIZE = ByteSizeUnit.KB.toIntBytes(8);

- private final SearchableSnapshotDirectory directory;
private final CacheFileReference cacheFileReference;
private final int defaultRangeSize;
private final int recoveryRangeSize;
@@ -84,7 +84,8 @@ public CachedBlobContainerIndexInput(
fileInfo.length(),
new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()),
rangeSize,
- recoveryRangeSize
+ recoveryRangeSize,
+ computeBlobCacheByteRange(fileInfo.physicalName(), fileInfo.length(), directory.getBlobStoreCacheMaxLength())
);
assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth
stats.incrementOpenCount();
@@ -100,10 +101,10 @@ private CachedBlobContainerIndexInput(
long length,
CacheFileReference cacheFileReference,
int rangeSize,
- int recoveryRangeSize
+ int recoveryRangeSize,
+ ByteRange blobCacheByteRange
) {
- super(logger, resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length);
- this.directory = directory;
+ super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, blobCacheByteRange);
this.cacheFileReference = cacheFileReference;
this.lastReadPosition = this.offset;
this.lastSeekPosition = this.offset;
@@ -162,25 +163,18 @@ protected void doReadInternal(ByteBuffer b) throws IOException {

// We try to use the cache index if:
// - the file is small enough to be fully cached
- final boolean canBeFullyCached = fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2;
- // - we're reading the first N bytes of the file
- final boolean isStartOfFile = (position + length <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE);

- if (canBeFullyCached || isStartOfFile) {
- final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), 0L, length);
+ if (blobCacheByteRange.contains(position, position + length)) {
+ final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), blobCacheByteRange.start(), length);

- if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) {
+ if (cachedBlob == CachedBlob.CACHE_MISS
+ || cachedBlob == CachedBlob.CACHE_NOT_READY
+ || cachedBlob.from() != blobCacheByteRange.start()
+ || cachedBlob.to() != blobCacheByteRange.end()) {

Contributor (on the from/to checks above): Why do we leave it to the caller to check this and not just declare this a cache_miss?

// We would have liked to find a cached entry but we did not find anything: the cache on the disk will be requested
// so we compute the region of the file we would like to have the next time. The region is expressed as a tuple of
// {start, end} where positions are relative to the whole file.

- if (canBeFullyCached) {
- // if the index input is smaller than twice the size of the blob cache, it will be fully indexed
- indexCacheMiss = ByteRange.of(0L, fileInfo.length());
- } else {
- // the index input is too large to fully cache, so just cache the initial range
- indexCacheMiss = ByteRange.of(0L, BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE);
- }
+ indexCacheMiss = blobCacheByteRange;

// We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put.
// TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case.
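
For illustration, the blobCacheByteRange consulted above is the value computed by BlobStoreCacheService.computeBlobCacheByteRange; a quick sketch of the ranges it produces (the 64MB limit is an arbitrary value for the example, not the setting's default):

// Metadata file below the limit: the whole file is eligible for the blob cache index.
ByteRange r1 = BlobStoreCacheService.computeBlobCacheByteRange("_0.fnm", 1024L, new ByteSizeValue(64, ByteSizeUnit.MB));
assert r1.start() == 0L && r1.end() == 1024L;

// Metadata file above the limit: the range is capped at maxMetadataLength (and a warning is logged).
ByteRange r2 = BlobStoreCacheService.computeBlobCacheByteRange("_0.tmd", ByteSizeUnit.MB.toBytes(100), new ByteSizeValue(64, ByteSizeUnit.MB));
assert r2.start() == 0L && r2.end() == ByteSizeUnit.MB.toBytes(64);

// Non-metadata file: only the first DEFAULT_CACHED_BLOB_SIZE (1KB) bytes are eligible.
ByteRange r3 = BlobStoreCacheService.computeBlobCacheByteRange("_0.cfs", ByteSizeUnit.MB.toBytes(1), new ByteSizeValue(64, ByteSizeUnit.MB));
assert r3.start() == 0L && r3.end() == 1024L;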
@@ -274,11 +268,12 @@ protected void doReadInternal(ByteBuffer b) throws IOException {
final Future<Integer> readFuture = cacheFile.readIfAvailableOrPending(indexCacheMiss, channel -> {
final int indexCacheMissLength = toIntBytes(indexCacheMiss.length());

- // TODO: should we use BigArrays?
+ // We assume that we only cache small portions of blobs so that we do not need to:
+ // - use a BigArrays for allocation
+ // - use an intermediate copy buffer to read the file in sensibly-sized chunks
+ // - release the buffer once the indexing operation is complete
- assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss;
+ // assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss;

final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength);
Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.start(), byteBuffer);
@@ -600,7 +595,8 @@ public IndexInput slice(String sliceDescription, long offset, long length) {
length,
cacheFileReference,
defaultRangeSize,
- recoveryRangeSize
+ recoveryRangeSize,
+ ByteRange.EMPTY // TODO implement blob cache for CFS
);
slice.isClone = true;
return slice;