Skip to content

Commit

Permalink
Batch translog upload per x ms to allow high index throughput
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Jan 13, 2023
1 parent ff244c0 commit 6a2e359
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@
* @opensearch.internal
*/
public abstract class AsyncIOProcessor<Item> {
private final Logger logger;
private final ArrayBlockingQueue<Tuple<Item, Consumer<Exception>>> queue;
final Logger logger;
final ArrayBlockingQueue<Tuple<Item, Consumer<Exception>>> queue;
private final ThreadContext threadContext;
private final Semaphore promiseSemaphore = new Semaphore(1);
final Semaphore promiseSemaphore = new Semaphore(1);

protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadContext) {
this.logger = logger;
Expand All @@ -67,7 +67,7 @@ protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadCon
/**
* Adds the given item to the queue. The listener is notified once the item is processed
*/
public final void put(Item item, Consumer<Exception> listener) {
public void put(Item item, Consumer<Exception> listener) {
Objects.requireNonNull(item, "item must not be null");
Objects.requireNonNull(listener, "listener must not be null");
// the algorithm here tries to reduce the load on each individual caller.
Expand Down Expand Up @@ -104,7 +104,7 @@ public final void put(Item item, Consumer<Exception> listener) {
}
}

private void drainAndProcessAndRelease(List<Tuple<Item, Consumer<Exception>>> candidates) {
void drainAndProcessAndRelease(List<Tuple<Item, Consumer<Exception>>> candidates) {
Exception exception;
try {
queue.drainTo(candidates);
Expand Down Expand Up @@ -141,7 +141,7 @@ private void notifyList(List<Tuple<Item, Consumer<Exception>>> candidates, Excep
}
}

private Consumer<Exception> preserveContext(Consumer<Exception> consumer) {
Consumer<Exception> preserveContext(Consumer<Exception> consumer) {
Supplier<ThreadContext.StoredContext> restorableContext = threadContext.newRestorableContext(false);
return e -> {
try (ThreadContext.StoredContext ignore = restorableContext.get()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.common.util.concurrent;

import org.apache.logging.log4j.Logger;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public abstract class BufferedAsyncIOProcessor<Item> extends AsyncIOProcessor<Item> {

private final ThreadPool threadpool;
private final TimeValue bufferInterval;

protected BufferedAsyncIOProcessor(
Logger logger,
int queueSize,
ThreadContext threadContext,
ThreadPool threadpool,
TimeValue bufferInterval
) {
super(logger, queueSize, threadContext);
this.threadpool = threadpool;
this.bufferInterval = bufferInterval;
}

@Override
public void put(Item item, Consumer<Exception> listener) {
Objects.requireNonNull(item, "item must not be null");
Objects.requireNonNull(listener, "listener must not be null");

try {
long timeout = getPutBlockingTimeoutMillis();
if (timeout > 0) {
if (!queue.offer(new Tuple<>(item, preserveContext(listener)), timeout, TimeUnit.MILLISECONDS)) {
listener.accept(new OpenSearchRejectedExecutionException("failed to queue request, queue full"));
}
} else {
queue.put(new Tuple<>(item, preserveContext(listener)));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
listener.accept(e);
}

scheduleRefresh();
}

protected long getPutBlockingTimeoutMillis() {
return -1L;
}

private void scheduleRefresh() {
Runnable processor = () -> {
final List<Tuple<Item, Consumer<Exception>>> candidates = new ArrayList<>();
// since we made the promise to process we gotta do it here at least once
drainAndProcessAndRelease(candidates);
while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) {
// yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing
drainAndProcessAndRelease(candidates);
}
};

if (promiseSemaphore.tryAcquire()) {
try {
threadpool.schedule(processor, this.bufferInterval, getBufferRefreshThreadPoolName());
} catch (Exception e) {
logger.error("failed to schedule refresh");
promiseSemaphore.release();
throw e;
}
}
}

protected String getBufferRefreshThreadPoolName() {
return ThreadPool.Names.TRANSLOG_SYNC;
}

}
31 changes: 28 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.AsyncIOProcessor;
import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor;
import org.opensearch.common.util.concurrent.RunOnce;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.set.Sets;
Expand Down Expand Up @@ -363,7 +364,12 @@ public IndexShard(
this.indexSortSupplier = indexSortSupplier;
this.indexEventListener = indexEventListener;
this.threadPool = threadPool;
this.translogSyncProcessor = createTranslogSyncProcessor(logger, threadPool.getThreadContext(), this::getEngine);
this.translogSyncProcessor = createTranslogSyncProcessor(
logger,
threadPool,
this::getEngine,
indexSettings.isRemoteTranslogStoreEnabled()
);
this.mapperService = mapperService;
this.indexCache = indexCache;
this.internalIndexingStats = new InternalIndexingStats();
Expand Down Expand Up @@ -3797,9 +3803,28 @@ public List<String> getActiveOperations() {

private static AsyncIOProcessor<Translog.Location> createTranslogSyncProcessor(
Logger logger,
ThreadContext threadContext,
Supplier<Engine> engineSupplier
ThreadPool threadPool,
Supplier<Engine> engineSupplier,
boolean bufferAsyncIoProcessor
) {
ThreadContext threadContext = threadPool.getThreadContext();
if (bufferAsyncIoProcessor) {
return new BufferedAsyncIOProcessor<Translog.Location>(logger, 1024, threadContext, threadPool, new TimeValue(500)) {
@Override
protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
try {
engineSupplier.get().translogManager().ensureTranslogSynced(candidates.stream().map(Tuple::v1));
} catch (AlreadyClosedException ex) {
// that's fine since we already synced everything on engine close - this also conforms to the method's
// documentation
} catch (IOException ex) { // if this fails we are in deep shit - fail the request
logger.debug("failed to sync translog", ex);
throw ex;
}
}
};
}

return new AsyncIOProcessor<Translog.Location>(logger, 1024, threadContext) {
@Override
protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public static class Names {
public static final String SYSTEM_READ = "system_read";
public static final String SYSTEM_WRITE = "system_write";
public static final String TRANSLOG_TRANSFER = "translog_transfer";
public static final String TRANSLOG_SYNC = "translog_sync";
}

/**
Expand Down Expand Up @@ -174,6 +175,7 @@ public static ThreadPoolType fromType(String type) {
map.put(Names.SYSTEM_READ, ThreadPoolType.FIXED);
map.put(Names.SYSTEM_WRITE, ThreadPoolType.FIXED);
map.put(Names.TRANSLOG_TRANSFER, ThreadPoolType.SCALING);
map.put(Names.TRANSLOG_SYNC, ThreadPoolType.SCALING);
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
}

Expand Down Expand Up @@ -250,6 +252,10 @@ public ThreadPool(
Names.TRANSLOG_TRANSFER,
new ScalingExecutorBuilder(Names.TRANSLOG_TRANSFER, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))
);
builders.put(
Names.TRANSLOG_SYNC,
new ScalingExecutorBuilder(Names.TRANSLOG_SYNC, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))
);

for (final ExecutorBuilder<?> builder : customBuilders) {
if (builders.containsKey(builder.name())) {
Expand Down

0 comments on commit 6a2e359

Please sign in to comment.