Skip to content

Commit

Permalink
Batch translog sync/upload per x ms for remote-backed indexes (#5854)
Browse files Browse the repository at this point in the history
* Batch translog upload per x ms to allow high index throughput

Signed-off-by: Ashish Singh <ssashish@amazon.com>
Co-authored-by: Ashwin Pankaj <appankaj@amazon.com>
Co-authored-by: Laxman Muttineni <muttil@amazon.com>
  • Loading branch information
3 people authored Jan 29, 2023
1 parent d5a87af commit af566e1
Show file tree
Hide file tree
Showing 13 changed files with 541 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -301,6 +302,8 @@ public Iterator<Setting<?>> settings() {

public static final String SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY = "index.remote_store.translog.repository";

public static final String SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL = "index.remote_store.translog.buffer_interval";

/**
* Used to specify if the index data should be persisted in the remote store.
*/
Expand Down Expand Up @@ -446,6 +449,45 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

public static final Setting<TimeValue> INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING = Setting.timeSetting(
    SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL,
    TimeValue.timeValueMillis(100),
    TimeValue.timeValueMillis(50),
    new Setting.Validator<>() {

        @Override
        public void validate(final TimeValue value) {}

        @Override
        public void validate(final TimeValue value, final Map<Setting<?>, Object> settings) {
            // Reject an unparseable/absent value outright.
            if (value == null) {
                throw new IllegalArgumentException(
                    "Setting " + SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL + " should be provided with a valid time value"
                );
            }
            // The buffer interval is only meaningful when the remote translog store is enabled
            // on the index; a missing or false value for that dependency is rejected.
            final Object isRemoteTranslogStoreEnabled = settings.get(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING);
            if (Boolean.TRUE.equals(isRemoteTranslogStoreEnabled) == false) {
                throw new IllegalArgumentException(
                    "Setting "
                        + SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL
                        + " can only be set when "
                        + SETTING_REMOTE_TRANSLOG_STORE_ENABLED
                        + " is set to true"
                );
            }
        }

        @Override
        public Iterator<Setting<?>> settings() {
            // Declare the dependency so the validator above receives the enabled flag.
            final List<Setting<?>> dependencies = List.of(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING);
            return dependencies.iterator();
        }
    },
    Property.IndexScope,
    Property.Final
);

public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
),
FeatureFlags.SEARCHABLE_SNAPSHOT,
List.of(
Expand Down
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/common/settings/Setting.java
Original file line number Diff line number Diff line change
Expand Up @@ -2173,6 +2173,23 @@ public static Setting<TimeValue> timeSetting(String key, Setting<TimeValue> fall
return new Setting<>(key, fallbackSetting, (s) -> TimeValue.parseTimeValue(s, key), properties);
}

/**
 * Creates a {@link TimeValue} setting with a lower bound enforced while parsing and an
 * additional caller-supplied validator applied on top of it.
 *
 * @param key          the settings key for this setting
 * @param defaultValue the value used when the key is absent
 * @param minValue     the smallest permitted value; smaller values are rejected by the parser
 * @param validator    extra validation performed in addition to the minimum-value check
 * @param properties   the scope/behavior properties of this setting
 * @return the new time setting
 */
public static Setting<TimeValue> timeSetting(
    String key,
    TimeValue defaultValue,
    TimeValue minValue,
    Validator<TimeValue> validator,
    Property... properties
) {
    return new Setting<>(
        new SimpleKey(key),
        unused -> defaultValue.getStringRep(),
        minTimeValueParser(key, minValue, isFiltered(properties)),
        validator,
        properties
    );
}

public static Setting<TimeValue> timeSetting(
String key,
Setting<TimeValue> fallBackSetting,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public abstract class AsyncIOProcessor<Item> {
private final ArrayBlockingQueue<Tuple<Item, Consumer<Exception>>> queue;
private final ThreadContext threadContext;
private final Semaphore promiseSemaphore = new Semaphore(1);
private long lastRunStartTimeInNs;

protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadContext) {
this.logger = logger;
Expand All @@ -67,7 +68,7 @@ protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadCon
/**
* Adds the given item to the queue. The listener is notified once the item is processed
*/
public final void put(Item item, Consumer<Exception> listener) {
public void put(Item item, Consumer<Exception> listener) {
Objects.requireNonNull(item, "item must not be null");
Objects.requireNonNull(listener, "listener must not be null");
// the algorithm here tries to reduce the load on each individual caller.
Expand All @@ -78,12 +79,7 @@ public final void put(Item item, Consumer<Exception> listener) {
final boolean promised = promiseSemaphore.tryAcquire();
if (promised == false) {
// in this case we are not responsible and can just block until there is space
try {
queue.put(new Tuple<>(item, preserveContext(listener)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
listener.accept(e);
}
addToQueue(item, listener);
}

// here we have to try to make the promise again otherwise there is a race when a thread puts an entry without making the promise
Expand All @@ -104,7 +100,17 @@ public final void put(Item item, Consumer<Exception> listener) {
}
}

private void drainAndProcessAndRelease(List<Tuple<Item, Consumer<Exception>>> candidates) {
// Blocks until the bounded queue has room for the item. If the calling thread is
// interrupted while waiting, the interrupt status is restored and the listener is
// failed with the InterruptedException instead of the item being enqueued.
void addToQueue(Item item, Consumer<Exception> listener) {
    final Tuple<Item, Consumer<Exception>> entry = new Tuple<>(item, preserveContext(listener));
    try {
        queue.put(entry);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        listener.accept(e);
    }
}

void drainAndProcessAndRelease(List<Tuple<Item, Consumer<Exception>>> candidates) {
lastRunStartTimeInNs = System.nanoTime();
Exception exception;
try {
queue.drainTo(candidates);
Expand All @@ -130,7 +136,7 @@ private Exception processList(List<Tuple<Item, Consumer<Exception>>> candidates)
return exception;
}

private void notifyList(List<Tuple<Item, Consumer<Exception>>> candidates, Exception exception) {
void notifyList(List<Tuple<Item, Consumer<Exception>>> candidates, Exception exception) {
for (Tuple<Item, Consumer<Exception>> tuple : candidates) {
Consumer<Exception> consumer = tuple.v2();
try {
Expand All @@ -141,7 +147,7 @@ private void notifyList(List<Tuple<Item, Consumer<Exception>>> candidates, Excep
}
}

private Consumer<Exception> preserveContext(Consumer<Exception> consumer) {
Consumer<Exception> preserveContext(Consumer<Exception> consumer) {
Supplier<ThreadContext.StoredContext> restorableContext = threadContext.newRestorableContext(false);
return e -> {
try (ThreadContext.StoredContext ignore = restorableContext.get()) {
Expand All @@ -154,4 +160,20 @@ private Consumer<Exception> preserveContext(Consumer<Exception> consumer) {
* Writes or processes the items out or to disk.
*/
protected abstract void write(List<Tuple<Item, Consumer<Exception>>> candidates) throws IOException;

// Exposes the processor's logger to subclasses (used by BufferedAsyncIOProcessor).
Logger getLogger() {
    return logger;
}

// Exposes the single-permit semaphore that elects the one thread responsible for
// draining and processing the queue.
Semaphore getPromiseSemaphore() {
    return promiseSemaphore;
}

// Start time (System.nanoTime) of the most recent drainAndProcessAndRelease run;
// lets subclasses compute the remaining buffer interval.
long getLastRunStartTimeInNs() {
    return lastRunStartTimeInNs;
}

// Exposes the backing bounded queue of pending items to subclasses.
ArrayBlockingQueue<Tuple<Item, Consumer<Exception>>> getQueue() {
    return queue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util.concurrent;

import org.apache.logging.log4j.Logger;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

/**
* A variant of {@link AsyncIOProcessor} that allows to batch and buffer processing items at every
* {@link BufferedAsyncIOProcessor#bufferInterval} in a separate threadpool.
* <p>
* Requests are buffered till processor thread calls @{link drainAndProcessAndRelease} after bufferInterval.
* If more requests are enqueued between invocations of drainAndProcessAndRelease, another processor thread
* gets scheduled. Subsequent requests will get buffered till drainAndProcessAndRelease gets called in this new
* processor thread.
*
* @opensearch.internal
*/
public abstract class BufferedAsyncIOProcessor<Item> extends AsyncIOProcessor<Item> {

private final ThreadPool threadpool;
private final TimeValue bufferInterval;

protected BufferedAsyncIOProcessor(
Logger logger,
int queueSize,
ThreadContext threadContext,
ThreadPool threadpool,
TimeValue bufferInterval
) {
super(logger, queueSize, threadContext);
this.threadpool = threadpool;
this.bufferInterval = bufferInterval;
}

@Override
public void put(Item item, Consumer<Exception> listener) {
Objects.requireNonNull(item, "item must not be null");
Objects.requireNonNull(listener, "listener must not be null");
addToQueue(item, listener);
scheduleProcess();
}

private void scheduleProcess() {
if (getQueue().isEmpty() == false && getPromiseSemaphore().tryAcquire()) {
try {
threadpool.schedule(this::process, getBufferInterval(), getBufferRefreshThreadPoolName());
} catch (Exception e) {
getLogger().error("failed to schedule process");
processSchedulingFailure(e);
getPromiseSemaphore().release();
// This is to make sure that any new items that are added to the queue between processSchedulingFailure
// and releasing the semaphore is handled by a subsequent refresh and not starved.
scheduleProcess();
}
}
}

private void processSchedulingFailure(Exception e) {
List<Tuple<Item, Consumer<Exception>>> candidates = new ArrayList<>();
getQueue().drainTo(candidates);
notifyList(candidates, e);
}

private void process() {
drainAndProcessAndRelease(new ArrayList<>());
scheduleProcess();
}

private TimeValue getBufferInterval() {
long timeSinceLastRunStartInNS = System.nanoTime() - getLastRunStartTimeInNs();
if (timeSinceLastRunStartInNS >= bufferInterval.getNanos()) {
return TimeValue.ZERO;
}
return TimeValue.timeValueNanos(bufferInterval.getNanos() - timeSinceLastRunStartInNS);
}

protected abstract String getBufferRefreshThreadPoolName();

}
14 changes: 14 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ public final class IndexSettings {
private final ReplicationType replicationType;
private final boolean isRemoteStoreEnabled;
private final boolean isRemoteTranslogStoreEnabled;
private final TimeValue remoteTranslogUploadBufferInterval;
private final String remoteStoreTranslogRepository;
private final String remoteStoreRepository;
private final boolean isRemoteSnapshot;
Expand Down Expand Up @@ -753,6 +754,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false);
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
remoteTranslogUploadBufferInterval = settings.getAsTime(
IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL,
TimeValue.timeValueMillis(100)
);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);

Expand Down Expand Up @@ -1135,6 +1140,15 @@ public void setTranslogSyncInterval(TimeValue translogSyncInterval) {
this.syncInterval = translogSyncInterval;
}

/**
 * Returns the translog sync/upload buffer interval when remote translog store is enabled and index setting
 * {@code index.translog.durability} is set as {@code request}.
 * The value is read from {@code index.remote_store.translog.buffer_interval} and defaults to 100ms.
 * @return the buffer interval.
 */
public TimeValue getRemoteTranslogUploadBufferInterval() {
    return remoteTranslogUploadBufferInterval;
}

/**
* Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled.
*/
Expand Down
53 changes: 40 additions & 13 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.AsyncIOProcessor;
import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor;
import org.opensearch.common.util.concurrent.RunOnce;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.set.Sets;
Expand Down Expand Up @@ -363,7 +364,13 @@ public IndexShard(
this.indexSortSupplier = indexSortSupplier;
this.indexEventListener = indexEventListener;
this.threadPool = threadPool;
this.translogSyncProcessor = createTranslogSyncProcessor(logger, threadPool.getThreadContext(), this::getEngine);
this.translogSyncProcessor = createTranslogSyncProcessor(
logger,
threadPool,
this::getEngine,
indexSettings.isRemoteTranslogStoreEnabled(),
indexSettings.getRemoteTranslogUploadBufferInterval()
);
this.mapperService = mapperService;
this.indexCache = indexCache;
this.internalIndexingStats = new InternalIndexingStats();
Expand Down Expand Up @@ -3813,21 +3820,41 @@ public List<String> getActiveOperations() {

/**
 * Builds the translog sync processor for this shard. When {@code bufferAsyncIoProcessor} is true
 * (remote translog store enabled), syncs are batched on the {@code TRANSLOG_SYNC} threadpool at
 * every {@code bufferInterval}; otherwise a plain {@link AsyncIOProcessor} syncs on caller threads.
 *
 * @param logger                 logger used when a sync fails
 * @param threadPool             supplies the thread context and, for the buffered variant, the scheduler
 * @param engineSupplier         supplies the current engine whose translog is synced
 * @param bufferAsyncIoProcessor whether to use the buffered (remote translog) variant
 * @param bufferInterval         interval between buffered sync runs
 */
private static AsyncIOProcessor<Translog.Location> createTranslogSyncProcessor(
    Logger logger,
    ThreadPool threadPool,
    Supplier<Engine> engineSupplier,
    boolean bufferAsyncIoProcessor,
    TimeValue bufferInterval
) {
    ThreadContext threadContext = threadPool.getThreadContext();
    // Shared sync logic for both processor variants.
    CheckedConsumer<List<Tuple<Translog.Location, Consumer<Exception>>>, IOException> writeConsumer = candidates -> {
        try {
            engineSupplier.get().translogManager().ensureTranslogSynced(candidates.stream().map(Tuple::v1));
        } catch (AlreadyClosedException ex) {
            // that's fine since we already synced everything on engine close - this also is conform with the methods
            // documentation
        } catch (IOException ex) { // if this fails we are in deep shit - fail the request
            logger.debug("failed to sync translog", ex);
            throw ex;
        }
    };
    if (bufferAsyncIoProcessor) {
        // NOTE(review): queue size is 102400 here vs 1024 below — presumably sized to absorb
        // items accumulating across the buffer interval; confirm the intended sizing.
        return new BufferedAsyncIOProcessor<>(logger, 102400, threadContext, threadPool, bufferInterval) {
            @Override
            protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
                writeConsumer.accept(candidates);
            }

            @Override
            protected String getBufferRefreshThreadPoolName() {
                return ThreadPool.Names.TRANSLOG_SYNC;
            }
        };
    }

    return new AsyncIOProcessor<>(logger, 1024, threadContext) {
        @Override
        protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
            writeConsumer.accept(candidates);
        }
    };
}
Expand Down
Loading

0 comments on commit af566e1

Please sign in to comment.