diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java
index 72cc0f5ee21d2..dadf98fbc570e 100644
--- a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java
+++ b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java
@@ -53,10 +53,10 @@
  * @opensearch.internal
  */
 public abstract class AsyncIOProcessor<Item> {
-    private final Logger logger;
-    private final ArrayBlockingQueue<Tuple<Item, Consumer<Exception>>> queue;
+    final Logger logger;
+    final ArrayBlockingQueue<Tuple<Item, Consumer<Exception>>> queue;
     private final ThreadContext threadContext;
-    private final Semaphore promiseSemaphore = new Semaphore(1);
+    final Semaphore promiseSemaphore = new Semaphore(1);
 
     protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadContext) {
         this.logger = logger;
@@ -67,7 +67,7 @@ protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadCon
     /**
      * Adds the given item to the queue. The listener is notified once the item is processed
      */
-    public final void put(Item item, Consumer<Exception> listener) {
+    public void put(Item item, Consumer<Exception> listener) {
         Objects.requireNonNull(item, "item must not be null");
         Objects.requireNonNull(listener, "listener must not be null");
         // the algorithm here tires to reduce the load on each individual caller.
@@ -104,7 +104,7 @@ public final void put(Item item, Consumer<Exception> listener) {
         }
     }
 
-    private void drainAndProcessAndRelease(List<Tuple<Item, Consumer<Exception>>> candidates) {
+    void drainAndProcessAndRelease(List<Tuple<Item, Consumer<Exception>>> candidates) {
         Exception exception;
         try {
             queue.drainTo(candidates);
@@ -141,7 +141,7 @@ private void notifyList(List<Tuple<Item, Consumer<Exception>>> candidates, Excep
         }
     }
 
-    private Consumer<Exception> preserveContext(Consumer<Exception> consumer) {
+    Consumer<Exception> preserveContext(Consumer<Exception> consumer) {
         Supplier<ThreadContext.StoredContext> restorableContext = threadContext.newRestorableContext(false);
         return e -> {
             try (ThreadContext.StoredContext ignore = restorableContext.get()) {
diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java
new file mode 100644
index 0000000000000..76b37e86dda27
--- /dev/null
+++ b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java
@@ -0,0 +1,148 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.common.util.concurrent;
+
+import org.apache.logging.log4j.Logger;
+import org.opensearch.common.collect.Tuple;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * An {@link AsyncIOProcessor} variant whose {@link #put(Object, Consumer)} does not process items itself.
+ * Each call only enqueues the item; draining the queue and notifying the listeners is deferred to a task
+ * that is scheduled on a thread pool after a fixed buffer interval.
+ *
+ * @opensearch.internal
+ */
+public abstract class BufferedAsyncIOProcessor<Item> extends AsyncIOProcessor<Item> {
+
+    private final ThreadPool threadpool;
+    private final TimeValue bufferInterval;
+
+    /**
+     * @param logger         logger handed to the parent processor and used to report scheduling failures
+     * @param queueSize      queue size handed to the parent processor
+     * @param threadContext  thread context handed to the parent processor
+     * @param threadpool     thread pool on which the deferred drain task is scheduled
+     * @param bufferInterval delay between scheduling the drain task and its execution
+     */
+    protected BufferedAsyncIOProcessor(
+        Logger logger,
+        int queueSize,
+        ThreadContext threadContext,
+        ThreadPool threadpool,
+        TimeValue bufferInterval
+    ) {
+        super(logger, queueSize, threadContext);
+        this.threadpool = threadpool;
+        this.bufferInterval = bufferInterval;
+    }
+
+    /**
+     * Enqueues the given item and then tries to schedule a drain task. If {@link #getPutBlockingTimeoutMillis()}
+     * is positive the caller waits at most that long for queue space and the listener receives an
+     * {@link OpenSearchRejectedExecutionException} on timeout; otherwise the caller blocks until space is available.
+     * If the calling thread is interrupted while waiting, the interrupt flag is restored and the listener receives
+     * the {@link InterruptedException}.
+     */
+    @Override
+    public void put(Item item, Consumer<Exception> listener) {
+        Objects.requireNonNull(item, "item must not be null");
+        Objects.requireNonNull(listener, "listener must not be null");
+
+        try {
+            long timeout = getPutBlockingTimeoutMillis();
+            if (timeout > 0) {
+                if (!queue.offer(new Tuple<>(item, preserveContext(listener)), timeout, TimeUnit.MILLISECONDS)) {
+                    listener.accept(new OpenSearchRejectedExecutionException("failed to queue request, queue full"));
+                }
+            } else {
+                queue.put(new Tuple<>(item, preserveContext(listener)));
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            listener.accept(e);
+        }
+
+        scheduleRefresh();
+    }
+
+    /**
+     * Returns how long {@link #put(Object, Consumer)} may wait for queue space, in milliseconds. Any value that is
+     * not positive (the default is {@code -1}) makes {@code put} block until space becomes available.
+     */
+    protected long getPutBlockingTimeoutMillis() {
+        return -1L;
+    }
+
+    /**
+     * Schedules a single drain task after the buffer interval, unless the promise to process the queue
+     * is already held elsewhere.
+     */
+    private void scheduleRefresh() {
+        Runnable processor = () -> {
+            final List<Tuple<Item, Consumer<Exception>>> candidates = new ArrayList<>();
+            // since we made the promise to process we gotta do it here at least once
+            drainAndProcessAndRelease(candidates);
+            while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) {
+                // yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing
+                drainAndProcessAndRelease(candidates);
+            }
+        };
+
+        if (promiseSemaphore.tryAcquire()) {
+            try {
+                threadpool.schedule(processor, this.bufferInterval, getBufferRefreshThreadPoolName());
+            } catch (Exception e) {
+                // log the cause as well, and give the promise back so a later put() can try to schedule again
+                logger.error("failed to schedule refresh", e);
+                promiseSemaphore.release();
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Returns the name of the thread pool on which the drain task runs; defaults to
+     * {@link ThreadPool.Names#TRANSLOG_SYNC}.
+     */
+    protected String getBufferRefreshThreadPoolName() {
+        return ThreadPool.Names.TRANSLOG_SYNC;
+    }
+
+}
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 8b4f6ce5166eb..c8a39e997461e 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -95,6 +95,7 @@
 import org.opensearch.common.util.BigArrays;
 import org.opensearch.common.util.concurrent.AbstractRunnable;
 import org.opensearch.common.util.concurrent.AsyncIOProcessor;
+import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor;
 import org.opensearch.common.util.concurrent.RunOnce;
 import org.opensearch.common.util.concurrent.ThreadContext;
 import org.opensearch.common.util.set.Sets;
@@ -363,7 +364,12 @@ public IndexShard(
         this.indexSortSupplier = indexSortSupplier;
         this.indexEventListener = indexEventListener;
         this.threadPool = threadPool;
-        this.translogSyncProcessor = createTranslogSyncProcessor(logger, threadPool.getThreadContext(), this::getEngine);
+        this.translogSyncProcessor = createTranslogSyncProcessor(
+            logger,
+            threadPool,
+            this::getEngine,
+            indexSettings.isRemoteTranslogStoreEnabled()
+        );
         this.mapperService = mapperService;
         this.indexCache = indexCache;
         this.internalIndexingStats = new InternalIndexingStats();
@@ -3797,9 +3803,28 @@ public List<String> getActiveOperations() {
 
     private static AsyncIOProcessor<Translog.Location> createTranslogSyncProcessor(
         Logger logger,
-        ThreadContext threadContext,
-        Supplier<Engine> engineSupplier
+        ThreadPool threadPool,
+        Supplier<Engine> engineSupplier,
+        boolean bufferAsyncIoProcessor
     ) {
+        ThreadContext threadContext = threadPool.getThreadContext();
+        if (bufferAsyncIoProcessor) {
+            return new BufferedAsyncIOProcessor<Translog.Location>(logger, 1024, threadContext, threadPool, TimeValue.timeValueMillis(500)) {
+                @Override
+                protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
+                    try {
+                        engineSupplier.get().translogManager().ensureTranslogSynced(candidates.stream().map(Tuple::v1));
+                    } catch (AlreadyClosedException ex) {
+                        // that's fine since we already synced everything on engine close - this also is conform with the methods
+                        // documentation
+                    } catch (IOException ex) { // if this fails we are in deep shit - fail the request
+                        logger.debug("failed to sync translog", ex);
+                        throw ex;
+                    }
+                }
+            };
+        }
+
         return new AsyncIOProcessor<Translog.Location>(logger, 1024, threadContext) {
             @Override
             protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
index 6f886de9ee88f..c731529b36145 100644
--- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
+++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
@@ -109,6 +109,7 @@ public static class Names {
         public static final String SYSTEM_READ = "system_read";
         public static final String SYSTEM_WRITE = "system_write";
         public static final String TRANSLOG_TRANSFER = "translog_transfer";
+        public static final String TRANSLOG_SYNC = "translog_sync";
     }
 
     /**
@@ -174,6 +175,7 @@ public static ThreadPoolType fromType(String type) {
         map.put(Names.SYSTEM_READ, ThreadPoolType.FIXED);
         map.put(Names.SYSTEM_WRITE, ThreadPoolType.FIXED);
         map.put(Names.TRANSLOG_TRANSFER, ThreadPoolType.SCALING);
+        map.put(Names.TRANSLOG_SYNC, ThreadPoolType.SCALING);
         THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
     }
 
@@ -250,6 +252,10 @@ public ThreadPool(
             Names.TRANSLOG_TRANSFER,
             new ScalingExecutorBuilder(Names.TRANSLOG_TRANSFER, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))
         );
+        builders.put(
+            Names.TRANSLOG_SYNC,
+            new ScalingExecutorBuilder(Names.TRANSLOG_SYNC, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))
+        );
 
         for (final ExecutorBuilder<?> builder : customBuilders) {
             if (builders.containsKey(builder.name())) {
diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java
index cd941feb37002..d68794c5bbec2 100644
--- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java
+++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java
@@ -133,6 +133,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso
         sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceAllocatedProcessors);
         sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors);
         sizes.put(ThreadPool.Names.TRANSLOG_TRANSFER, ThreadPool::halfAllocatedProcessorsMaxTen);
+        sizes.put(ThreadPool.Names.TRANSLOG_SYNC, ThreadPool::halfAllocatedProcessorsMaxTen);
         return sizes.get(threadPoolName).apply(numberOfProcessors);
     }
 