Skip to content

Commit

Permalink
Add perms for remote snapshot cache eviction on scripted query (opens…
Browse files Browse the repository at this point in the history
…earch-project#14411)

Signed-off-by: Finn Carroll <carrofin@amazon.com>
  • Loading branch information
finnegancarroll authored and wangdongyu.danny committed Aug 22, 2024
1 parent 4e15890 commit 0b811bc
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix NPE in ReplicaShardAllocator ([#14385](https://github.com/opensearch-project/OpenSearch/pull/14385))
- Fix constant_keyword field type used when creating index ([#14807](https://github.com/opensearch-project/OpenSearch/pull/14807))
- Use circuit breaker in InternalHistogram when adding empty buckets ([#14754](https://github.com/opensearch-project/OpenSearch/pull/14754))
- Fix searchable snapshot failure with scripted fields ([#14411](https://github.com/opensearch-project/OpenSearch/pull/14411))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,22 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio
final Path key = blobFetchRequest.getFilePath();
logger.trace("fetchBlob called for {}", key.toString());

final CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
logger.trace("Transfer Manager - IndexInput closed or not in cache");
// Doesn't exist or is closed, either way create a new one
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
} else {
logger.trace("Transfer Manager - Already in cache");
// already in the cache and ready to be used (open)
return cachedIndexInput;
}
// We need to do a privileged action here in order to fetch from remote
// and write/evict from local file cache in case this is invoked as a side
// effect of a plugin (such as a scripted search) that doesn't have the
// necessary permissions.
final CachedIndexInput cacheEntry = AccessController.doPrivileged((PrivilegedAction<CachedIndexInput>) () -> {
return fileCache.compute(key, (path, cachedIndexInput) -> {
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
logger.trace("Transfer Manager - IndexInput closed or not in cache");
// Doesn't exist or is closed, either way create a new one
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
} else {
logger.trace("Transfer Manager - Already in cache");
// already in the cache and ready to be used (open)
return cachedIndexInput;
}
});
});

// Cache entry was either retrieved from the cache or newly added, either
Expand All @@ -88,37 +94,31 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio

@SuppressWarnings("removal")
private static FileCachedIndexInput createIndexInput(FileCache fileCache, StreamReader streamReader, BlobFetchRequest request) {
// We need to do a privileged action here in order to fetch from remote
// and write to the local file cache in case this is invoked as a side
// effect of a plugin (such as a scripted search) that doesn't have the
// necessary permissions.
return AccessController.doPrivileged((PrivilegedAction<FileCachedIndexInput>) () -> {
try {
if (Files.exists(request.getFilePath()) == false) {
logger.trace("Fetching from Remote in createIndexInput of Transfer Manager");
try (
OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath());
OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream)
) {
for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) {
try (
InputStream snapshotFileInputStream = streamReader.read(
blobPart.getBlobName(),
blobPart.getPosition(),
blobPart.getLength()
);
) {
snapshotFileInputStream.transferTo(localFileOutputStream);
}
try {
if (Files.exists(request.getFilePath()) == false) {
logger.trace("Fetching from Remote in createIndexInput of Transfer Manager");
try (
OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath());
OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream)
) {
for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) {
try (
InputStream snapshotFileInputStream = streamReader.read(
blobPart.getBlobName(),
blobPart.getPosition(),
blobPart.getLength()
);
) {
snapshotFileInputStream.transferTo(localFileOutputStream);
}
}
}
final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ);
return new FileCachedIndexInput(fileCache, request.getFilePath(), luceneIndexInput);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ);
return new FileCachedIndexInput(fileCache, request.getFilePath(), luceneIndexInput);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

/**
Expand Down

0 comments on commit 0b811bc

Please sign in to comment.