Skip to content

Commit

Permalink
Add repository metadata integrity check API
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Feb 13, 2023
1 parent 58a6e03 commit 2937ca8
Show file tree
Hide file tree
Showing 13 changed files with 2,594 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Strings;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;

import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

public class ThrottledIterator<T> implements Releasable {

private static final Logger logger = LogManager.getLogger(ThrottledIterator.class);

/**
* Iterate through the given collection, performing an operation on each item which may fork background tasks, but with a limit on the
* number of such background tasks running concurrently to avoid overwhelming the rest of the system (e.g. starving other work of access
* to an executor).
*
* @param iterator The items to iterate. May be accessed by multiple threads, but accesses are all protected by synchronizing on itself.
* @param itemConsumer The operation to perform on each item. Each operation receives a {@link RefCounted} which can be used to track
* the execution of any background tasks spawned for this item. This operation may run on the thread which
* originally called {@link #run}, if this method has not yet returned. Otherwise it will run on a thread on which a
* background task previously called {@link RefCounted#decRef()} on its ref count. This operation should not throw
* any exceptions.
* @param maxConcurrency The maximum number of ongoing operations at any time.
* @param onItemCompletion Executed when each item is completed, which can be used for instance to report on progress. Must not throw
* exceptions.
* @param onCompletion Executed when all items are completed.
*/
public static <T> void run(
Iterator<T> iterator,
BiConsumer<Releasable, T> itemConsumer,
int maxConcurrency,
Runnable onItemCompletion,
Runnable onCompletion
) {
try (var throttledIterator = new ThrottledIterator<>(iterator, itemConsumer, maxConcurrency, onItemCompletion, onCompletion)) {
throttledIterator.run();
}
}

private final RefCounted throttleRefs;
private final Iterator<T> iterator;
private final BiConsumer<Releasable, T> itemConsumer;
private final Semaphore permits;
private final Runnable onItemCompletion;

private ThrottledIterator(
Iterator<T> iterator,
BiConsumer<Releasable, T> itemConsumer,
int maxConcurrency,
Runnable onItemCompletion,
Runnable onCompletion
) {
this.iterator = Objects.requireNonNull(iterator);
this.itemConsumer = Objects.requireNonNull(itemConsumer);
if (maxConcurrency <= 0) {
throw new IllegalArgumentException("maxConcurrency must be positive");
}
this.permits = new Semaphore(maxConcurrency);
this.onItemCompletion = Objects.requireNonNull(onItemCompletion);
this.throttleRefs = AbstractRefCounted.of(onCompletion);
}

private void run() {
while (permits.tryAcquire()) {
final T item;
synchronized (iterator) {
if (iterator.hasNext()) {
item = iterator.next();
} else {
permits.release();
return;
}
}
try (var itemRefs = new ItemRefCounted()) {
// TODO simplify, there's always exactly two refs?
itemRefs.incRef();
itemConsumer.accept(Releasables.releaseOnce(itemRefs::decRef), item);
} catch (Exception e) {
logger.error(Strings.format("exception when processing [%s] with [%s]", item, itemConsumer), e);
assert false : e;
}
}
}

@Override
public void close() {
throttleRefs.decRef();
}

// A RefCounted for a single item, including protection against calling back into run() if it's created and closed within a single
// invocation of run().
private class ItemRefCounted extends AbstractRefCounted implements Releasable {
private boolean isRecursive = true;

ItemRefCounted() {
throttleRefs.incRef();
}

@Override
protected void closeInternal() {
try {
onItemCompletion.run();
} catch (Exception e) {
logger.error("exception in onItemCompletion", e);
assert false : e;
} finally {
permits.release();
try {
// Someone must now pick up the next item. Here we might be called from the run() invocation which started processing
// the just-completed item (via close() -> decRef()) if that item's processing didn't fork or all its forked tasks
// finished first. If so, there's no need to call run() here, we can just return and the next iteration of the run()
// loop will continue the processing; moreover calling run() in this situation could lead to a stack overflow. However
// if we're not within that run() invocation then ...
if (isRecursive() == false) {
// ... we're not within any other run() invocation either, so it's safe (and necessary) to call run() here.
run();
}
} finally {
throttleRefs.decRef();
}
}
}

// Note on blocking: we call both of these synchronized methods exactly once (and must enter close() before calling isRecursive()).
// If close() releases the last ref and calls closeInternal(), and hence isRecursive(), then there's no other threads involved and
// hence no blocking. In contrast if close() doesn't release the last ref then it exits immediately, so the call to isRecursive()
// will proceed without delay in this case too.

private synchronized boolean isRecursive() {
return isRecursive;
}

@Override
public synchronized void close() {
decRef();
isRecursive = false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.gateway.CorruptStateException;
import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentFragment;
Expand Down Expand Up @@ -318,7 +319,10 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
}
case WRITER_UUID -> {
writerUuid = new BytesRef(parser.binaryValue());
assert writerUuid.length > 0;
assert BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED == false || writerUuid.length > 0;
if (writerUuid.length == 0) {
throw new ElasticsearchParseException("invalid (empty) writer uuid");
}
}
default -> XContentParserUtils.throwUnknownField(currentFieldName, parser);
}
Expand All @@ -336,6 +340,11 @@ public static FileInfo fromXContent(XContentParser parser) throws IOException {
} else if (checksum == null) {
throw new ElasticsearchParseException("missing checksum for name [" + name + "]");
}
try {
org.apache.lucene.util.Version.parse(writtenBy);
} catch (Exception e) {
throw new ElasticsearchParseException("invalid written_by [" + writtenBy + "]");
}
return new FileInfo(name, new StoreFileMetadata(physicalName, length, checksum, writtenBy, metaHash, writerUuid), partSize);
}

Expand Down Expand Up @@ -571,6 +580,13 @@ public static BlobStoreIndexShardSnapshot fromXContent(XContentParser parser) th
}
}

if (snapshot == null) {
throw new CorruptStateException("snapshot missing");
}
if (indexVersion < 0) {
throw new CorruptStateException("index version missing or corrupt");
}

return new BlobStoreIndexShardSnapshot(
snapshot,
indexVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

static volatile boolean INTEGRITY_ASSERTIONS_ENABLED = true;

public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token == null) { // New parser
Expand Down Expand Up @@ -309,7 +311,11 @@ public static BlobStoreIndexShardSnapshots fromXContent(XContentParser parser) t
List<FileInfo> fileInfosBuilder = new ArrayList<>();
for (String file : entry.v2()) {
FileInfo fileInfo = files.get(file);
assert fileInfo != null;
if (fileInfo == null) {
final var exception = new IllegalStateException("shard index inconsistent at file [" + file + "]");
assert INTEGRITY_ASSERTIONS_ENABLED == false : exception;
throw exception;
}
fileInfosBuilder.add(fileInfo);
}
snapshots.add(new SnapshotFiles(entry.v1(), Collections.unmodifiableList(fileInfosBuilder), historyUUIDs.get(entry.v1())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ public Collection<SnapshotId> getSnapshotIds() {
return snapshotIds.values();
}

public long getIndexSnapshotCount() {
return indexSnapshots.values().stream().mapToLong(List::size).sum();
}

/**
* @return whether some of the {@link SnapshotDetails} of the given snapshot are missing, due to BwC, so that they must be loaded from
* the {@link SnapshotInfo} blob instead.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.util.concurrent;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.stream.IntStream;

public class ThrottledIteratorTests extends ESTestCase {
private static final String CONSTRAINED = "constrained";
private static final String RELAXED = "relaxed";

public void testConcurrency() throws InterruptedException {
final var maxConstrainedThreads = between(1, 3);
final var maxRelaxedThreads = between(1, 100);
final var constrainedQueue = between(3, 6);
final var threadPool = new TestThreadPool(
"test",
new FixedExecutorBuilder(Settings.EMPTY, CONSTRAINED, maxConstrainedThreads, constrainedQueue, CONSTRAINED, false),
new ScalingExecutorBuilder(RELAXED, 1, maxRelaxedThreads, TimeValue.timeValueSeconds(30), true)
);
try {
final var items = between(1, 10000); // large enough that inadvertent recursion will trigger a StackOverflowError
final var itemStartLatch = new CountDownLatch(items);
final var completedItems = new AtomicInteger();
final var maxConcurrency = between(1, (constrainedQueue + maxConstrainedThreads) * 2);
final var itemPermits = new Semaphore(maxConcurrency);
final var completionLatch = new CountDownLatch(1);
final BooleanSupplier forkSupplier = randomFrom(
() -> false,
ESTestCase::randomBoolean,
LuceneTestCase::rarely,
LuceneTestCase::usually,
() -> true
);
final var blockPermits = new Semaphore(between(0, Math.min(maxRelaxedThreads, maxConcurrency) - 1));

ThrottledIterator.run(IntStream.range(0, items).boxed().iterator(), (releasable, item) -> {
try (var refs = new RefCountingRunnable(releasable::close)) {
assertTrue(itemPermits.tryAcquire());
if (forkSupplier.getAsBoolean()) {
var ref = refs.acquire();
final var executor = randomFrom(CONSTRAINED, RELAXED);
threadPool.executor(executor).execute(new AbstractRunnable() {

@Override
public void onRejection(Exception e) {
assertEquals(CONSTRAINED, executor);
itemStartLatch.countDown();
}

@Override
protected void doRun() {
itemStartLatch.countDown();
if (RELAXED.equals(executor) && randomBoolean() && blockPermits.tryAcquire()) {
// simulate at most (maxConcurrency-1) long-running operations, to demonstrate that they don't
// hold up the processing of the other operations
try {
assertTrue(itemStartLatch.await(30, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new AssertionError("unexpected", e);
} finally {
blockPermits.release();
}
}
}

@Override
public void onAfter() {
itemPermits.release();
ref.close();
}

@Override
public void onFailure(Exception e) {
throw new AssertionError("unexpected", e);
}
});
} else {
itemStartLatch.countDown();
itemPermits.release();
}
}
}, maxConcurrency, completedItems::incrementAndGet, completionLatch::countDown);

assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
assertEquals(items, completedItems.get());
assertTrue(itemPermits.tryAcquire(maxConcurrency));
assertTrue(itemStartLatch.await(0, TimeUnit.SECONDS));
} finally {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.snapshots.blobstore;

import org.elasticsearch.core.Releasable;

public class BlobStoreIndexShardSnapshotsIntegritySuppressor implements Releasable {

public BlobStoreIndexShardSnapshotsIntegritySuppressor() {
BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED = false;
}

@Override
public void close() {
BlobStoreIndexShardSnapshots.INTEGRITY_ASSERTIONS_ENABLED = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class Constants {
"cluster:admin/repository/get",
"cluster:admin/repository/put",
"cluster:admin/repository/verify",
"cluster:admin/repository/verify_integrity",
"cluster:admin/reroute",
"cluster:admin/script/delete",
"cluster:admin/script/get",
Expand Down
Loading

0 comments on commit 2937ca8

Please sign in to comment.