Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust the length of blob cache docs for Lucene metadata files #69431

Merged
merged 20 commits into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService;

Expand Down Expand Up @@ -116,6 +117,17 @@ protected void mountSnapshot(
String indexName,
String restoredIndexName,
Settings restoredIndexSettings
) throws Exception {
mountSnapshot(repositoryName, snapshotName, indexName, restoredIndexName, restoredIndexSettings, Storage.FULL_COPY);
}

protected void mountSnapshot(
String repositoryName,
String snapshotName,
String indexName,
String restoredIndexName,
Settings restoredIndexSettings,
final Storage storage
) throws Exception {
final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest(
restoredIndexName,
Expand All @@ -128,7 +140,7 @@ protected void mountSnapshot(
.build(),
Strings.EMPTY_ARRAY,
true,
MountSearchableSnapshotRequest.Storage.FULL_COPY
storage
);

final RestoreSnapshotResponse restoreResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexFileNames;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
Expand All @@ -23,15 +24,24 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;

import java.time.Instant;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
Expand All @@ -41,14 +51,24 @@ public class BlobStoreCacheService {

private static final Logger logger = LogManager.getLogger(BlobStoreCacheService.class);

public static final int DEFAULT_CACHED_BLOB_SIZE = ByteSizeUnit.KB.toIntBytes(4);
/**
* Before 8.0.0 blobs were cached using a 4KB or 8KB maximum length.
*/
private static final Version OLD_CACHED_BLOB_SIZE_VERSION = Version.V_8_0_0; // TODO adjust after backport

public static final int DEFAULT_CACHED_BLOB_SIZE = ByteSizeUnit.KB.toIntBytes(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will this change affect documents in the blob cache that have been created with a previous ES version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delayed response, I had to deal with other duties.

Looking at the current code in 7.x, reducing the size of cached blob might throw an exception in Cached/FrozenIndexInput at this line:

final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(toIntBytes(position), length).iterator();

in the case position is larger than one or two buffered reads.

I suggest

  • to keep the 4KB/8KB limit for non metadata files as long as there is at least a node in version < 7.13 in the cluster
  • to write some BWC test that ensure indices are correctly assigned during rolling upgrades

private static final Cache<String, String> LOG_EXCEEDING_FILES_CACHE = CacheBuilder.<String, String>builder()
.setExpireAfterAccess(TimeValue.timeValueMinutes(60L))
.build();

private final ClusterService clusterService;
private final ThreadPool threadPool;
private final Client client;
private final String index;

public BlobStoreCacheService(ThreadPool threadPool, Client client, String index) {
public BlobStoreCacheService(ClusterService clusterService, ThreadPool threadPool, Client client, String index) {
this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN);
this.clusterService = clusterService;
this.threadPool = threadPool;
this.index = index;
}
Expand Down Expand Up @@ -155,4 +175,112 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}
}

private static final Set<String> METADATA_FILES_EXTENSIONS;
private static final Set<String> OTHER_FILES_EXTENSIONS;
static {
// List of Lucene file extensions that are considered as "metadata" and should therefore be fully cached in the blob store cache.
// Those files are usually fully read by Lucene when it opens a Directory.
METADATA_FILES_EXTENSIONS = Set.of(
"cfe", // compound file's entry table
"dvm", // doc values metadata file
"fdm", // stored fields metadata file
"fnm", // field names metadata file
"kdm", // Lucene 8.6 point format metadata file
"nvm", // norms metadata file
"tmd", // Lucene 8.6 terms metadata file
"tvm", // terms vectors metadata file
"vem" // Lucene 9.0 indexed vectors metadata
);

// List of extensions for which Lucene usually only reads the first 1024 byte and checks a header checksum when opening a Directory.
OTHER_FILES_EXTENSIONS = Set.of(
"cfs",
"dii",
"dim",
"doc",
"dvd",
"fdt",
"fdx",
"kdd",
"kdi",
"liv",
"nvd",
"pay",
"pos",
"tim",
"tip",
"tvd",
"tvx",
"vec"
);
assert Sets.intersection(METADATA_FILES_EXTENSIONS, OTHER_FILES_EXTENSIONS).isEmpty();
}

/**
* Computes the {@link ByteRange} corresponding to the header of a Lucene file. This range can vary depending of the type of the file
* which is indicated by the file's extension. The returned byte range can never be larger than the file's length but it can be smaller.
*
* For files that are declared as metadata files in {@link #METADATA_FILES_EXTENSIONS}, the header can be as large as the specified
* maximum metadata length parameter {@code maxMetadataLength}. Non-metadata files have a fixed length header of maximum 1KB.
*
* @param fileName the name of the file
* @param fileLength the length of the file
* @param maxMetadataLength the maximum accepted length for metadata files
*
* @return the header {@link ByteRange}
*/
public ByteRange computeBlobCacheByteRange(String fileName, long fileLength, ByteSizeValue maxMetadataLength) {
final String fileExtension = IndexFileNames.getExtension(fileName);
assert fileExtension == null || METADATA_FILES_EXTENSIONS.contains(fileExtension) || OTHER_FILES_EXTENSIONS.contains(fileExtension)
: "unknown Lucene file extension [" + fileExtension + "] - should it be considered a metadata file?";

if (useLegacyCachedBlobSizes()) {
if (fileLength <= ByteSizeUnit.KB.toBytes(8L)) {
return ByteRange.of(0L, fileLength);
} else {
return ByteRange.of(0L, ByteSizeUnit.KB.toBytes(4L));
}
}

if (METADATA_FILES_EXTENSIONS.contains(fileExtension)) {
final long maxAllowedLengthInBytes = maxMetadataLength.getBytes();
if (fileLength > maxAllowedLengthInBytes) {
logExceedingFile(fileExtension, fileLength, maxMetadataLength);
}
return ByteRange.of(0L, Math.min(fileLength, maxAllowedLengthInBytes));
}
return ByteRange.of(0L, Math.min(fileLength, DEFAULT_CACHED_BLOB_SIZE));
}

protected boolean useLegacyCachedBlobSizes() {
final Version minNodeVersion = clusterService.state().nodes().getMinNodeVersion();
return minNodeVersion.before(OLD_CACHED_BLOB_SIZE_VERSION);
}

private static void logExceedingFile(String extension, long length, ByteSizeValue maxAllowedLength) {
if (logger.isWarnEnabled()) {
try {
// Use of a cache to prevent too many log traces per hour
LOG_EXCEEDING_FILES_CACHE.computeIfAbsent(extension, key -> {
logger.warn(
"file with extension [{}] is larger ([{}]) than the max. length allowed [{}] to cache metadata files in blob cache",
extension,
length,
maxAllowedLength
);
return key;
});
} catch (ExecutionException e) {
logger.warn(
() -> new ParameterizedMessage(
"Failed to log information about exceeding file type [{}] with length [{}]",
extension,
length
),
e
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,27 @@ public static CachedBlob fromSource(final Map<String, Object> source) {
to.longValue()
);
}

@Override
public String toString() {
return "CachedBlob ["
+ "creationTime="
+ creationTime
+ ", version="
+ version
+ ", repository='"
+ repository
+ '\''
+ ", name='"
+ name
+ '\''
+ ", path='"
+ path
+ '\''
+ ", from="
+ from
+ ", to="
+ to
+ ']';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -29,34 +30,41 @@
public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput {

protected final Logger logger;
protected final SearchableSnapshotDirectory directory;
protected final BlobContainer blobContainer;
protected final FileInfo fileInfo;
protected final IOContext context;
protected final IndexInputStats stats;
protected final long offset;
protected final long length;

/** Range of bytes that should be cached in the blob cache for the current index input **/
protected final ByteRange blobCacheByteRange;

// the following are only mutable so they can be adjusted after cloning/slicing
protected volatile boolean isClone;
private AtomicBoolean closed;

public BaseSearchableSnapshotIndexInput(
Logger logger,
String resourceDesc,
BlobContainer blobContainer,
SearchableSnapshotDirectory directory,
FileInfo fileInfo,
IOContext context,
IndexInputStats stats,
long offset,
long length
long length,
ByteRange blobCacheByteRange
) {
super(resourceDesc, context);
this.logger = Objects.requireNonNull(logger);
this.blobContainer = Objects.requireNonNull(blobContainer);
this.directory = Objects.requireNonNull(directory);
this.blobContainer = Objects.requireNonNull(directory.blobContainer());
this.fileInfo = Objects.requireNonNull(fileInfo);
this.context = Objects.requireNonNull(context);
assert fileInfo.metadata().hashEqualsContents() == false
: "this method should only be used with blobs that are NOT stored in metadata's hash field " + "(fileInfo: " + fileInfo + ')';
this.blobCacheByteRange = Objects.requireNonNull(blobCacheByteRange);
this.stats = Objects.requireNonNull(stats);
this.offset = offset;
this.length = length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.LazyInitializable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
Expand All @@ -59,6 +60,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService;
import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService.FrozenCacheFile;
Expand Down Expand Up @@ -86,6 +88,7 @@

import static org.apache.lucene.store.BufferedIndexInput.bufferSize;
import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
Expand Down Expand Up @@ -136,6 +139,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
private final AtomicBoolean closed;
private final boolean partial;
private final FrozenCacheService frozenCacheService;
private final ByteSizeValue blobStoreCacheMaxLength;

// volatile fields are updated once under `this` lock, all together, iff loaded is not true.
private volatile BlobStoreIndexShardSnapshot snapshot;
Expand Down Expand Up @@ -179,6 +183,7 @@ public SearchableSnapshotDirectory(
this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings));
this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes();
this.blobStoreCachePath = String.join("/", snapshotId.getUUID(), indexId.getId(), String.valueOf(shardId.id()));
this.blobStoreCacheMaxLength = SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH_SETTING.get(indexSettings);
this.threadPool = threadPool;
this.loaded = false;
this.frozenCacheService = frozenCacheService;
Expand Down Expand Up @@ -419,14 +424,7 @@ public IndexInput openInput(final String name, final IOContext context) throws I
);
}
} else {
return new DirectBlobContainerIndexInput(
blobContainer(),
fileInfo,
context,
inputStats,
getUncachedChunkSize(),
bufferSize(context)
);
return new DirectBlobContainerIndexInput(this, fileInfo, context, inputStats, getUncachedChunkSize(), bufferSize(context));
}
}

Expand Down Expand Up @@ -679,10 +677,19 @@ public static SearchableSnapshotDirectory unwrapDirectory(Directory dir) {
return null;
}

public CachedBlob getCachedBlob(String name, long offset, int length) {
final CachedBlob cachedBlob = blobStoreCacheService.get(repository, name, blobStoreCachePath, offset);
assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= offset;
assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || offset + length <= cachedBlob.to();
public ByteRange getBlobCacheByteRange(String fileName, long fileLength) {
return blobStoreCacheService.computeBlobCacheByteRange(fileName, fileLength, blobStoreCacheMaxLength);
}

public CachedBlob getCachedBlob(String name, ByteRange range) {
final CachedBlob cachedBlob = blobStoreCacheService.get(repository, name, blobStoreCachePath, range.start());
if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) {
return cachedBlob;
} else if (cachedBlob.from() != range.start() || cachedBlob.to() != range.end()) {
// expected range in cache might differ with the returned cached blob; this can happen if the range to put in cache is changed
// between versions or through the index setting. In this case we assume it is a cache miss to force the blob to be cached again
return CachedBlob.CACHE_MISS;
}
return cachedBlob;
}

Expand Down
Loading