From d5ba44868870163118f82ba97ff973140963de92 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 13 Jan 2023 18:40:53 +0530 Subject: [PATCH 01/17] Batch translog upload per x ms to allow high index throughput Signed-off-by: Ashish Singh --- .../common/settings/IndexScopedSettings.java | 1 + .../util/concurrent/AsyncIOProcessor.java | 12 +- .../concurrent/BufferedAsyncIOProcessor.java | 123 ++++++++++++++++++ .../org/opensearch/index/IndexSettings.java | 26 ++++ .../opensearch/index/shard/IndexShard.java | 33 ++++- .../org/opensearch/threadpool/ThreadPool.java | 6 + .../threadpool/ScalingThreadPoolTests.java | 1 + 7 files changed, 193 insertions(+), 9 deletions(-) create mode 100644 server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 1efce2eba8867..5236c64798c6a 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -144,6 +144,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.MAX_ANALYZED_OFFSET_SETTING, IndexSettings.MAX_TERMS_COUNT_SETTING, IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, + IndexSettings.INDEX_REMOTE_STORE_TRANSLOG_BUFFER_INTERVAL_SETTING, IndexSettings.DEFAULT_FIELD_SETTING, IndexSettings.QUERY_STRING_LENIENT_SETTING, IndexSettings.ALLOW_UNMAPPED, diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java index 72cc0f5ee21d2..dadf98fbc570e 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java @@ -53,10 +53,10 @@ * @opensearch.internal */ public abstract class AsyncIOProcessor { - private final Logger logger; - private final ArrayBlockingQueue>> queue; + final Logger logger; + final ArrayBlockingQueue>> queue; private final ThreadContext threadContext; - private final Semaphore promiseSemaphore = new Semaphore(1); + final Semaphore promiseSemaphore = new Semaphore(1); protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadContext) { this.logger = logger; @@ -67,7 +67,7 @@ protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadCon /** * Adds the given item to the queue. The listener is notified once the item is processed */ - public final void put(Item item, Consumer listener) { + public void put(Item item, Consumer listener) { Objects.requireNonNull(item, "item must not be null"); Objects.requireNonNull(listener, "listener must not be null"); // the algorithm here tires to reduce the load on each individual caller. @@ -104,7 +104,7 @@ public final void put(Item item, Consumer listener) { } } - private void drainAndProcessAndRelease(List>> candidates) { + void drainAndProcessAndRelease(List>> candidates) { Exception exception; try { queue.drainTo(candidates); @@ -141,7 +141,7 @@ private void notifyList(List>> candidates, Excep } } - private Consumer preserveContext(Consumer consumer) { + Consumer preserveContext(Consumer consumer) { Supplier restorableContext = threadContext.newRestorableContext(false); return e -> { try (ThreadContext.StoredContext ignore = restorableContext.get()) { diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java new file mode 100644 index 0000000000000..3b8a99f7e00a2 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.common.util.concurrent; + +import org.apache.logging.log4j.Logger; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * A variant of AsyncIOProcessor which will wait for interval before processing items + * processing happens in another thread from same threadpool after {@link #bufferInterval} + * + * Requests are buffered till processor thread calls @{link drainAndProcessAndRelease} after bufferInterval. + * If more requests are enequed between invocations of drainAndProcessAndRelease, another processor thread + * gets scheduled. Subsequent requests will get buffered till drainAndProcessAndRelease gets called in this new + * processor thread. + */ +public abstract class BufferedAsyncIOProcessor extends AsyncIOProcessor { + + private final ThreadPool threadpool; + private final TimeValue bufferInterval; + + protected BufferedAsyncIOProcessor( + Logger logger, + int queueSize, + ThreadContext threadContext, + ThreadPool threadpool, + TimeValue bufferInterval + ) { + super(logger, queueSize, threadContext); + this.threadpool = threadpool; + this.bufferInterval = bufferInterval; + } + + @Override + public void put(Item item, Consumer listener) { + Objects.requireNonNull(item, "item must not be null"); + Objects.requireNonNull(listener, "listener must not be null"); + + try { + long timeout = getPutBlockingTimeoutMillis(); + if (timeout > 0) { + if (!queue.offer(new Tuple<>(item, preserveContext(listener)), timeout, TimeUnit.MILLISECONDS)) { + listener.accept(new OpenSearchRejectedExecutionException("failed to queue request, queue full")); + } + } else { + queue.put(new Tuple<>(item, preserveContext(listener))); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + listener.accept(e); + } + + scheduleRefresh(); + } + + protected long getPutBlockingTimeoutMillis() { + return -1L; + } + + private void scheduleRefresh() { + Runnable processor = () -> { + final List>> candidates = new ArrayList<>(); + // since we made the promise to process we gotta do it here at least once + drainAndProcessAndRelease(candidates); + while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) { + // yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing + drainAndProcessAndRelease(candidates); + } + }; + + if (promiseSemaphore.tryAcquire()) { + try { + threadpool.schedule(processor, this.bufferInterval, getBufferRefreshThreadPoolName()); + } catch (Exception e) { + logger.error("failed to schedule refresh"); + promiseSemaphore.release(); + throw e; + } + } + } + + protected String getBufferRefreshThreadPoolName() { + return ThreadPool.Names.TRANSLOG_SYNC; + } + +} diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index dc54ace237070..3a6b6437be2ba 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -115,6 +115,13 @@ public final class IndexSettings { Property.Dynamic, Property.IndexScope ); + public static final Setting INDEX_REMOTE_STORE_TRANSLOG_BUFFER_INTERVAL_SETTING = Setting.timeSetting( + "index.remote_store.translog.buffer_interval", + TimeValue.timeValueMillis(100), + TimeValue.timeValueMillis(1), + Property.Dynamic, + Property.IndexScope + ); public static final Setting INDEX_SEARCH_IDLE_AFTER = Setting.timeSetting( "index.search.idle.after", TimeValue.timeValueSeconds(30), @@ -601,6 +608,7 @@ public final class IndexSettings { private final boolean defaultAllowUnmappedFields; private volatile Translog.Durability durability; private volatile TimeValue syncInterval; + private volatile TimeValue bufferInterval; private volatile TimeValue refreshInterval; private volatile ByteSizeValue flushThresholdSize; private volatile TimeValue translogRetentionAge; @@ -770,6 +778,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti this.durability = scopedSettings.get(INDEX_TRANSLOG_DURABILITY_SETTING); defaultFields = scopedSettings.get(DEFAULT_FIELD_SETTING); syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings); + bufferInterval = INDEX_REMOTE_STORE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); @@ -845,6 +854,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval); + scopedSettings.addSettingsUpdateConsumer(INDEX_REMOTE_STORE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setBufferInterval); scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow); scopedSettings.addSettingsUpdateConsumer(MAX_INNER_RESULT_WINDOW_SETTING, this::setMaxInnerResultWindow); scopedSettings.addSettingsUpdateConsumer(MAX_ADJACENCY_MATRIX_FILTERS_SETTING, this::setMaxAdjacencyMatrixFilters); @@ -1135,6 +1145,22 @@ public void setTranslogSyncInterval(TimeValue translogSyncInterval) { this.syncInterval = translogSyncInterval; } + /** + * TODO + * @return + */ + public TimeValue getBufferInterval() { + return bufferInterval; + } + + /** + * TODO + * @param bufferInterval + */ + public void setBufferInterval(TimeValue bufferInterval) { + this.bufferInterval = bufferInterval; + } + /** * Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled. */ diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 96a54ab65d268..73db50014802d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -95,6 +95,7 @@ import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.AsyncIOProcessor; +import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor; import org.opensearch.common.util.concurrent.RunOnce; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.set.Sets; @@ -364,7 +365,13 @@ public IndexShard( this.indexSortSupplier = indexSortSupplier; this.indexEventListener = indexEventListener; this.threadPool = threadPool; - this.translogSyncProcessor = createTranslogSyncProcessor(logger, threadPool.getThreadContext(), this::getEngine); + this.translogSyncProcessor = createTranslogSyncProcessor( + logger, + threadPool, + this::getEngine, + indexSettings.isRemoteTranslogStoreEnabled(), + indexSettings.getBufferInterval() + ); this.mapperService = mapperService; this.indexCache = indexCache; this.internalIndexingStats = new InternalIndexingStats(); @@ -3808,9 +3815,29 @@ public List getActiveOperations() { private static AsyncIOProcessor createTranslogSyncProcessor( Logger logger, - ThreadContext threadContext, - Supplier engineSupplier + ThreadPool threadPool, + Supplier engineSupplier, + boolean bufferAsyncIoProcessor, + TimeValue bufferInterval ) { + ThreadContext threadContext = threadPool.getThreadContext(); + if (bufferAsyncIoProcessor) { + return new BufferedAsyncIOProcessor<>(logger, 102400, threadContext, threadPool, bufferInterval) { + @Override + protected void write(List>> candidates) throws IOException { + try { + engineSupplier.get().translogManager().ensureTranslogSynced(candidates.stream().map(Tuple::v1)); + } catch (AlreadyClosedException ex) { + // that's fine since we already synced everything on engine close - this also is conform with the methods + // documentation + } catch (IOException ex) { // if this fails we are in deep shit - fail the request + logger.debug("failed to sync translog", ex); + throw ex; + } + } + }; + } + return new AsyncIOProcessor(logger, 1024, threadContext) { @Override protected void write(List>> candidates) throws IOException { diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 6f886de9ee88f..c731529b36145 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -109,6 +109,7 @@ public static class Names { public static final String SYSTEM_READ = "system_read"; public static final String SYSTEM_WRITE = "system_write"; public static final String TRANSLOG_TRANSFER = "translog_transfer"; + public static final String TRANSLOG_SYNC = "translog_sync"; } /** @@ -174,6 +175,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.SYSTEM_READ, ThreadPoolType.FIXED); map.put(Names.SYSTEM_WRITE, ThreadPoolType.FIXED); map.put(Names.TRANSLOG_TRANSFER, ThreadPoolType.SCALING); + map.put(Names.TRANSLOG_SYNC, ThreadPoolType.SCALING); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -250,6 +252,10 @@ public ThreadPool( Names.TRANSLOG_TRANSFER, new ScalingExecutorBuilder(Names.TRANSLOG_TRANSFER, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) ); + builders.put( + Names.TRANSLOG_SYNC, + new ScalingExecutorBuilder(Names.TRANSLOG_SYNC, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) + ); for (final ExecutorBuilder builder : customBuilders) { if (builders.containsKey(builder.name())) { diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index cd941feb37002..d68794c5bbec2 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -133,6 +133,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.TRANSLOG_TRANSFER, ThreadPool::halfAllocatedProcessorsMaxTen); + sizes.put(ThreadPool.Names.TRANSLOG_SYNC, ThreadPool::halfAllocatedProcessorsMaxTen); return sizes.get(threadPoolName).apply(numberOfProcessors); } From 4cfc52ee6b2226a618584590f3525b0462a182fa Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 16 Jan 2023 14:47:48 +0530 Subject: [PATCH 02/17] Add delay optimisations & log time taken for upload Signed-off-by: Ashish Singh --- .../util/concurrent/AsyncIOProcessor.java | 17 ++++++++++---- .../concurrent/BufferedAsyncIOProcessor.java | 23 +++++++++---------- .../index/translog/RemoteFsTranslog.java | 8 +++++++ 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java index dadf98fbc570e..2e4438e0909aa 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java @@ -57,6 +57,7 @@ public abstract class AsyncIOProcessor { final ArrayBlockingQueue>> queue; private final ThreadContext threadContext; final Semaphore promiseSemaphore = new Semaphore(1); + long lastRunStartTimeInMs; protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadContext) { this.logger = logger; @@ -95,22 +96,28 @@ public void put(Item item, Consumer listener) { // no need to preserve context for listener since it runs in current thread. candidates.add(new Tuple<>(item, listener)); } - // since we made the promise to process we gotta do it here at least once + process(candidates); + } + } + + protected void process(List>> candidates) { + // since we made the promise to process we gotta do it here at least once + drainAndProcessAndRelease(candidates); + while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) { + // yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing drainAndProcessAndRelease(candidates); - while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) { - // yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing - drainAndProcessAndRelease(candidates); - } } } void drainAndProcessAndRelease(List>> candidates) { + lastRunStartTimeInMs = System.currentTimeMillis(); Exception exception; try { queue.drainTo(candidates); exception = processList(candidates); } finally { promiseSemaphore.release(); + logger.info("step=drainAndProcessAndRelease timeTakenInMs={}", (System.currentTimeMillis() - lastRunStartTimeInMs)); } notifyList(candidates, exception); candidates.clear(); diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java index 3b8a99f7e00a2..b11f2a0f78843 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java @@ -37,7 +37,6 @@ import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; -import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -45,7 +44,7 @@ /** * A variant of AsyncIOProcessor which will wait for interval before processing items * processing happens in another thread from same threadpool after {@link #bufferInterval} - * + *

* Requests are buffered till processor thread calls @{link drainAndProcessAndRelease} after bufferInterval. * If more requests are enequed between invocations of drainAndProcessAndRelease, another processor thread * gets scheduled. Subsequent requests will get buffered till drainAndProcessAndRelease gets called in this new @@ -95,19 +94,11 @@ protected long getPutBlockingTimeoutMillis() { } private void scheduleRefresh() { - Runnable processor = () -> { - final List>> candidates = new ArrayList<>(); - // since we made the promise to process we gotta do it here at least once - drainAndProcessAndRelease(candidates); - while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) { - // yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing - drainAndProcessAndRelease(candidates); - } - }; + Runnable processor = () -> { process(new ArrayList<>()); }; if (promiseSemaphore.tryAcquire()) { try { - threadpool.schedule(processor, this.bufferInterval, getBufferRefreshThreadPoolName()); + threadpool.schedule(processor, getBufferInterval(), getBufferRefreshThreadPoolName()); } catch (Exception e) { logger.error("failed to schedule refresh"); promiseSemaphore.release(); @@ -116,6 +107,14 @@ private void scheduleRefresh() { } } + private TimeValue getBufferInterval() { + long timeSinceLastRunStartInMS = System.currentTimeMillis() - lastRunStartTimeInMs; + if (timeSinceLastRunStartInMS >= bufferInterval.getMillis()) { + return TimeValue.ZERO; + } + return TimeValue.timeValueMillis(bufferInterval.getMillis() - timeSinceLastRunStartInMS); + } + protected String getBufferRefreshThreadPoolName() { return ThreadPool.Names.TRANSLOG_SYNC; } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 0cfaa5234c1fe..45a6008fa9f41 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -202,6 +202,7 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc } private boolean upload(Long primaryTerm, Long generation) throws IOException { + long uploadStartTime = System.currentTimeMillis(); // During primary relocation (primary-primary peer recovery), both the old and the new primary have engine // created with the RemoteFsTranslog. Both primaries are equipped to upload the translogs. The primary mode check // below ensures that the real primary only is uploading. Before the primary mode is set as true for the new @@ -231,11 +232,13 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti closeFilesIfNoPendingRetentionLocks(); maxRemoteTranslogGenerationUploaded = generation; logger.trace("uploaded translog for {} {} ", primaryTerm, generation); + logUploadStats(uploadStartTime, true); } @Override public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException { transferReleasable.close(); + logUploadStats(uploadStartTime, false); closeFilesIfNoPendingRetentionLocks(); if (ex instanceof IOException) { throw (IOException) ex; @@ -248,6 +251,11 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro } + private void logUploadStats(long uploadStartTime, boolean uploadStatus) { + logger.info("Translog Upload status={} timeTaken={}", uploadStatus, (System.currentTimeMillis() - uploadStartTime)); + + } + // Visible for testing public Set allUploaded() { return fileTransferTracker.allUploaded(); From 6d383df77d5581f8d089d5ea9b2c69bd4b93e3f3 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 16 Jan 2023 16:03:28 +0530 Subject: [PATCH 03/17] Change log level to debug Signed-off-by: Ashish Singh --- .../org/opensearch/common/util/concurrent/AsyncIOProcessor.java | 2 +- .../java/org/opensearch/index/translog/RemoteFsTranslog.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java index 2e4438e0909aa..00373aba076b1 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java @@ -117,7 +117,7 @@ void drainAndProcessAndRelease(List>> candidates exception = processList(candidates); } finally { promiseSemaphore.release(); - logger.info("step=drainAndProcessAndRelease timeTakenInMs={}", (System.currentTimeMillis() - lastRunStartTimeInMs)); + logger.debug("step=drainAndProcessAndRelease timeTakenInMs={}", (System.currentTimeMillis() - lastRunStartTimeInMs)); } notifyList(candidates, exception); candidates.clear(); diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 45a6008fa9f41..1c3eb3284ff6d 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -252,7 +252,7 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro } private void logUploadStats(long uploadStartTime, boolean uploadStatus) { - logger.info("Translog Upload status={} timeTaken={}", uploadStatus, (System.currentTimeMillis() - uploadStartTime)); + logger.debug("Translog Upload status={} timeTaken={}", uploadStatus, (System.currentTimeMillis() - uploadStartTime)); } From 1f8736cebd94472c41a3d48d62d3458e79418a34 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 19 Jan 2023 13:47:35 +0530 Subject: [PATCH 04/17] Move setting from Built-in to feature flag Signed-off-by: Ashish Singh --- .../cluster/metadata/IndexMetadata.java | 42 +++++++++++++++++++ .../common/settings/IndexScopedSettings.java | 4 +- .../opensearch/common/settings/Setting.java | 27 ++++++++++++ .../org/opensearch/index/IndexSettings.java | 25 ++++------- .../opensearch/index/IndexSettingsTests.java | 28 +++++++++++++ ...overyWithRemoteTranslogOnPrimaryTests.java | 1 + ...teStorePeerRecoverySourceHandlerTests.java | 1 + 7 files changed, 110 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 0161f4376c168..376ecd6d6e9d2 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -59,6 +59,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.ToXContent; import org.opensearch.common.xcontent.ToXContentFragment; import org.opensearch.common.xcontent.XContentBuilder; @@ -301,6 +302,8 @@ public Iterator> settings() { public static final String SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY = "index.remote_store.translog.repository"; + public static final String SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL = "index.remote_store.translog.buffer_interval"; + /** * Used to specify if the index data should be persisted in the remote store. */ @@ -446,6 +449,45 @@ public Iterator> settings() { Property.Final ); + public static final Setting INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING = Setting.timeSetting( + SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, + TimeValue.timeValueMillis(100), + TimeValue.timeValueMillis(50), + new Setting.Validator<>() { + + @Override + public void validate(final TimeValue value) {} + + @Override + public void validate(final TimeValue value, final Map, Object> settings) { + if (value == null) { + throw new IllegalArgumentException( + "Setting " + SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL + " should be provided with a valid time value" + ); + } else { + final Boolean isRemoteTranslogStoreEnabled = (Boolean) settings.get(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING); + if (isRemoteTranslogStoreEnabled == null || isRemoteTranslogStoreEnabled == false) { + throw new IllegalArgumentException( + "Settings " + + SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL + + " can only be set when " + + SETTING_REMOTE_TRANSLOG_STORE_ENABLED + + " is set to true" + ); + } + } + } + + @Override + public Iterator> settings() { + final List> settings = Collections.singletonList(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING); + return settings.iterator(); + } + }, + Property.Dynamic, + Property.IndexScope + ); + public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas"; public static final Setting INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING; diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 5236c64798c6a..a81c330177129 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -144,7 +144,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.MAX_ANALYZED_OFFSET_SETTING, IndexSettings.MAX_TERMS_COUNT_SETTING, IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, - IndexSettings.INDEX_REMOTE_STORE_TRANSLOG_BUFFER_INTERVAL_SETTING, IndexSettings.DEFAULT_FIELD_SETTING, IndexSettings.QUERY_STRING_LENIENT_SETTING, IndexSettings.ALLOW_UNMAPPED, @@ -228,7 +227,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING, IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING, IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING, - IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING + IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING, + IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING ), FeatureFlags.SEARCHABLE_SNAPSHOT, List.of( diff --git a/server/src/main/java/org/opensearch/common/settings/Setting.java b/server/src/main/java/org/opensearch/common/settings/Setting.java index f86fe6771dfcd..2e05a29822ca6 100644 --- a/server/src/main/java/org/opensearch/common/settings/Setting.java +++ b/server/src/main/java/org/opensearch/common/settings/Setting.java @@ -2073,6 +2073,23 @@ public static Setting timeSetting( ); } + public static Setting timeSetting( + final String key, + Function defaultValue, + final TimeValue minValue, + final Validator validator, + final Property... properties + ) { + final SimpleKey simpleKey = new SimpleKey(key); + return new Setting<>( + simpleKey, + s -> defaultValue.apply(s).getStringRep(), + minTimeValueParser(key, minValue, isFiltered(properties)), + validator, + properties + ); + } + public static Setting timeSetting( final String key, Function defaultValue, @@ -2173,6 +2190,16 @@ public static Setting timeSetting(String key, Setting fall return new Setting<>(key, fallbackSetting, (s) -> TimeValue.parseTimeValue(s, key), properties); } + public static Setting timeSetting( + String key, + TimeValue defaultValue, + TimeValue minValue, + Validator validator, + Property... properties + ) { + return timeSetting(key, (s) -> defaultValue, minValue, validator, properties); + } + public static Setting timeSetting( String key, Setting fallBackSetting, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 3a6b6437be2ba..dbabc55520ced 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -115,13 +115,6 @@ public final class IndexSettings { Property.Dynamic, Property.IndexScope ); - public static final Setting INDEX_REMOTE_STORE_TRANSLOG_BUFFER_INTERVAL_SETTING = Setting.timeSetting( - "index.remote_store.translog.buffer_interval", - TimeValue.timeValueMillis(100), - TimeValue.timeValueMillis(1), - Property.Dynamic, - Property.IndexScope - ); public static final Setting INDEX_SEARCH_IDLE_AFTER = Setting.timeSetting( "index.search.idle.after", TimeValue.timeValueSeconds(30), @@ -593,6 +586,7 @@ public final class IndexSettings { private final ReplicationType replicationType; private final boolean isRemoteStoreEnabled; private final boolean isRemoteTranslogStoreEnabled; + private volatile TimeValue bufferInterval; private final String remoteStoreTranslogRepository; private final String remoteStoreRepository; private final boolean isRemoteSnapshot; @@ -608,7 +602,6 @@ public final class IndexSettings { private final boolean defaultAllowUnmappedFields; private volatile Translog.Durability durability; private volatile TimeValue syncInterval; - private volatile TimeValue bufferInterval; private volatile TimeValue refreshInterval; private volatile ByteSizeValue flushThresholdSize; private volatile TimeValue translogRetentionAge; @@ -761,6 +754,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false); isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false); remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY); + bufferInterval = settings.getAsTime(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, TimeValue.timeValueMillis(100)); remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY); isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings); @@ -778,7 +772,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti this.durability = scopedSettings.get(INDEX_TRANSLOG_DURABILITY_SETTING); defaultFields = scopedSettings.get(DEFAULT_FIELD_SETTING); syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings); - bufferInterval = INDEX_REMOTE_STORE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); @@ -854,7 +847,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval); - scopedSettings.addSettingsUpdateConsumer(INDEX_REMOTE_STORE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setBufferInterval); scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow); scopedSettings.addSettingsUpdateConsumer(MAX_INNER_RESULT_WINDOW_SETTING, this::setMaxInnerResultWindow); scopedSettings.addSettingsUpdateConsumer(MAX_ADJACENCY_MATRIX_FILTERS_SETTING, this::setMaxAdjacencyMatrixFilters); @@ -893,6 +885,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime); scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled); scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_POLICY, this::setMergeOnFlushPolicy); + + if (isRemoteTranslogStoreEnabled && FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { + scopedSettings.addSettingsUpdateConsumer(IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setBufferInterval); + } } private void setSearchIdleAfter(TimeValue searchIdleAfter) { @@ -1146,17 +1142,14 @@ public void setTranslogSyncInterval(TimeValue translogSyncInterval) { } /** - * TODO - * @return + * Returns the translog sync/upload buffer interval when remote translog store is enabled and index setting + * {@code index.translog.durability} is set as {@code request}. + * @return the buffer interval. */ public TimeValue getBufferInterval() { return bufferInterval; } - /** - * TODO - * @param bufferInterval - */ public void setBufferInterval(TimeValue bufferInterval) { this.bufferInterval = bufferInterval; } diff --git a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index 957eb337f5c85..77887432aea69 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -964,11 +964,13 @@ public void testRemoteTranslogExplicitSetting() { .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "tlog-store") + .put(IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "200ms") .build() ); IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); assertNull(settings.getRemoteStoreRepository()); assertEquals("tlog-store", settings.getRemoteStoreTranslogRepository()); + assertEquals(TimeValue.timeValueMillis(200), settings.getBufferInterval()); } public void testSetRemoteTranslogRepositoryFailsWhenRemoteTranslogIsNotEnabled() { @@ -1000,6 +1002,32 @@ public void testSetRemoteTranslogRepositoryFailsWhenEmptyString() { assertEquals("Setting index.remote_store.translog.repository should be provided with non-empty repository ID", iae.getMessage()); } + public void testSetRemoteTranslogBufferIntervalDefaultSetting() { + Version createdVersion = VersionUtils.randomVersionBetween(random(), Version.V_2_0_0, Version.CURRENT); + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), createdVersion) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .build(); + assertTrue(IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)); + assertEquals(TimeValue.timeValueMillis(100), IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings)); + } + + public void testSetRemoteTranslogBufferIntervalFailsWhenRemoteTranslogIsNotEnabled() { + Settings indexSettings = Settings.builder() + .put("index.replication.type", ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false) + .put(IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "200ms") + .build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(indexSettings) + ); + assertEquals( + "Settings index.remote_store.translog.buffer_interval can only be set when index.remote_store.translog.enabled is set to true", + iae.getMessage() + ); + } + @SuppressForbidden(reason = "sets the SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY feature flag") public void testExtendedCompatibilityVersionForRemoteSnapshot() throws Exception { try (FeatureFlagSetter f = FeatureFlagSetter.set(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) { diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index ba707cc30e6b8..d76afca51e354 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -36,6 +36,7 @@ public class ReplicaRecoveryWithRemoteTranslogOnPrimaryTests extends OpenSearchI .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true") .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "translog-repo") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, "100ms") .build(); public void testStartSequenceForReplicaRecovery() throws Exception { diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java index 465629406b54b..33c748f46bd86 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -23,6 +23,7 @@ public class RemoteStorePeerRecoverySourceHandlerTests extends OpenSearchIndexLe .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true") .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "translog-repo") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, "100ms") .build(); public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { From 08a2879c9ad48bcf115576d7a1bd5662ee183364 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 23 Jan 2023 14:26:10 +0530 Subject: [PATCH 05/17] Add UTs Signed-off-by: Ashish Singh --- .../org/opensearch/index/IndexSettings.java | 2 +- .../opensearch/index/IndexSettingsTests.java | 43 +++++++++++++++++-- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index dbabc55520ced..c40201713120a 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -886,7 +886,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled); scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_POLICY, this::setMergeOnFlushPolicy); - if (isRemoteTranslogStoreEnabled && FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { + if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE) && isRemoteTranslogStoreEnabled) { scopedSettings.addSettingsUpdateConsumer(IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setBufferInterval); } } diff --git a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index 77887432aea69..350080e2402e7 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -1008,15 +1008,14 @@ public void testSetRemoteTranslogBufferIntervalDefaultSetting() { .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), createdVersion) .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) .build(); - assertTrue(IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)); assertEquals(TimeValue.timeValueMillis(100), IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings)); } public void testSetRemoteTranslogBufferIntervalFailsWhenRemoteTranslogIsNotEnabled() { Settings indexSettings = Settings.builder() - .put("index.replication.type", ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false) - .put(IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "200ms") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, "200ms") .build(); IllegalArgumentException iae = expectThrows( IllegalArgumentException.class, @@ -1028,6 +1027,44 @@ public void testSetRemoteTranslogBufferIntervalFailsWhenRemoteTranslogIsNotEnabl ); } + public void testSetRemoteTranslogBufferIntervalFailsWhenEmpty() { + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, "") + .build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(indexSettings) + ); + assertEquals( + "failed to parse setting [index.remote_store.translog.buffer_interval] with value [] as a time value: unit is missing or unrecognized", + iae.getMessage() + ); + } + + public void testUpdateRemoteTranslogBufferInterval() throws Exception { + try (FeatureFlagSetter f = FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE)) { + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, "250ms") + .build(); + IndexMetadata metadata = newIndexMeta("index", settings); + IndexScopedSettings scopedSettings = IndexScopedSettings.DEFAULT_SCOPED_SETTINGS; + scopedSettings.registerSetting(IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING); + IndexSettings indexSettings = new IndexSettings(metadata, settings, scopedSettings); + assertEquals(TimeValue.timeValueMillis(250), indexSettings.getBufferInterval()); + + // Update settings + indexSettings.updateIndexMetadata( + newIndexMeta("index", Settings.builder().put(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, "150ms").build()) + ); + assertEquals(TimeValue.timeValueMillis(150), indexSettings.getBufferInterval()); + } + } + @SuppressForbidden(reason = "sets the SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY feature flag") public void testExtendedCompatibilityVersionForRemoteSnapshot() throws Exception { try (FeatureFlagSetter f = FeatureFlagSetter.set(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) { From 38f20168ad7b1b69abc9b6bcbc9f5bc0713836a9 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 23 Jan 2023 21:01:26 +0530 Subject: [PATCH 06/17] Refactor code & change threadpool for translog sync Signed-off-by: Ashish Singh --- .../util/concurrent/AsyncIOProcessor.java | 41 +++++++++++++------ .../concurrent/BufferedAsyncIOProcessor.java | 36 ++++------------ .../org/opensearch/threadpool/ThreadPool.java | 7 +--- .../opensearch/index/IndexSettingsTests.java | 2 +- 4 files changed, 39 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java index 00373aba076b1..9785c5d76357a 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java @@ -53,11 +53,11 @@ * @opensearch.internal */ public abstract class AsyncIOProcessor { - final Logger logger; - final ArrayBlockingQueue>> queue; + private final Logger logger; + private final ArrayBlockingQueue>> queue; private final ThreadContext threadContext; - final Semaphore promiseSemaphore = new Semaphore(1); - long lastRunStartTimeInMs; + private final Semaphore promiseSemaphore = new Semaphore(1); + private long lastRunStartTimeInMs; protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadContext) { this.logger = logger; @@ -78,13 +78,7 @@ public void put(Item item, Consumer listener) { // we first try make a promise that we are responsible for the processing final boolean promised = promiseSemaphore.tryAcquire(); if (promised == false) { - // in this case we are not responsible and can just block until there is space - try { - queue.put(new Tuple<>(item, preserveContext(listener))); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - listener.accept(e); - } + addToQueue(item, listener); } // here we have to try to make the promise again otherwise there is a race when a thread puts an entry without making the promise @@ -100,7 +94,17 @@ public void put(Item item, Consumer listener) { } } - protected void process(List>> candidates) { + void addToQueue(Item item, Consumer listener) { + // in this case we are not responsible and can just block until there is space + try { + queue.put(new Tuple<>(item, preserveContext(listener))); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + listener.accept(e); + } + } + + void process(List>> candidates) { // since we made the promise to process we gotta do it here at least once drainAndProcessAndRelease(candidates); while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) { @@ -117,7 +121,6 @@ void drainAndProcessAndRelease(List>> candidates exception = processList(candidates); } finally { promiseSemaphore.release(); - logger.debug("step=drainAndProcessAndRelease timeTakenInMs={}", (System.currentTimeMillis() - lastRunStartTimeInMs)); } notifyList(candidates, exception); candidates.clear(); @@ -161,4 +164,16 @@ Consumer preserveContext(Consumer consumer) { * Writes or processes the items out or to disk. */ protected abstract void write(List>> candidates) throws IOException; + + Logger getLogger() { + return logger; + } + + Semaphore getPromiseSemaphore() { + return promiseSemaphore; + } + + long getLastRunStartTimeInMs() { + return lastRunStartTimeInMs; + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java index b11f2a0f78843..633296c5d8c2f 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java @@ -32,13 +32,11 @@ package org.opensearch.common.util.concurrent; import org.apache.logging.log4j.Logger; -import org.opensearch.common.collect.Tuple; import org.opensearch.common.unit.TimeValue; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Objects; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** @@ -49,6 +47,8 @@ * If more requests are enequed between invocations of drainAndProcessAndRelease, another processor thread * gets scheduled. Subsequent requests will get buffered till drainAndProcessAndRelease gets called in this new * processor thread. + * + * @opensearch.internal */ public abstract class BufferedAsyncIOProcessor extends AsyncIOProcessor { @@ -71,44 +71,24 @@ protected BufferedAsyncIOProcessor( public void put(Item item, Consumer listener) { Objects.requireNonNull(item, "item must not be null"); Objects.requireNonNull(listener, "listener must not be null"); - - try { - long timeout = getPutBlockingTimeoutMillis(); - if (timeout > 0) { - if (!queue.offer(new Tuple<>(item, preserveContext(listener)), timeout, TimeUnit.MILLISECONDS)) { - listener.accept(new OpenSearchRejectedExecutionException("failed to queue request, queue full")); - } - } else { - queue.put(new Tuple<>(item, preserveContext(listener))); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - listener.accept(e); - } - + addToQueue(item, listener); scheduleRefresh(); } - protected long getPutBlockingTimeoutMillis() { - return -1L; - } - private void scheduleRefresh() { - Runnable processor = () -> { process(new ArrayList<>()); }; - - if (promiseSemaphore.tryAcquire()) { + if (getPromiseSemaphore().tryAcquire()) { try { - threadpool.schedule(processor, getBufferInterval(), getBufferRefreshThreadPoolName()); + threadpool.schedule(() -> process(new ArrayList<>()), getBufferInterval(), getBufferRefreshThreadPoolName()); } catch (Exception e) { - logger.error("failed to schedule refresh"); - promiseSemaphore.release(); + getLogger().error("failed to schedule refresh"); + getPromiseSemaphore().release(); throw e; } } } private TimeValue getBufferInterval() { - long timeSinceLastRunStartInMS = System.currentTimeMillis() - lastRunStartTimeInMs; + long timeSinceLastRunStartInMS = System.currentTimeMillis() - getLastRunStartTimeInMs(); if (timeSinceLastRunStartInMS >= bufferInterval.getMillis()) { return TimeValue.ZERO; } diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index c731529b36145..dd52e0f7ecf76 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -175,7 +175,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.SYSTEM_READ, ThreadPoolType.FIXED); map.put(Names.SYSTEM_WRITE, ThreadPoolType.FIXED); map.put(Names.TRANSLOG_TRANSFER, ThreadPoolType.SCALING); - map.put(Names.TRANSLOG_SYNC, ThreadPoolType.SCALING); + map.put(Names.TRANSLOG_SYNC, ThreadPoolType.FIXED); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -252,10 +252,7 @@ public ThreadPool( Names.TRANSLOG_TRANSFER, new ScalingExecutorBuilder(Names.TRANSLOG_TRANSFER, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) ); - builders.put( - Names.TRANSLOG_SYNC, - new ScalingExecutorBuilder(Names.TRANSLOG_SYNC, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) - ); + builders.put(Names.TRANSLOG_SYNC, new FixedExecutorBuilder(settings, Names.TRANSLOG_SYNC, allocatedProcessors * 4, 10000)); for (final ExecutorBuilder builder : customBuilders) { if (builders.containsKey(builder.name())) { diff --git a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index 350080e2402e7..9fbe61a27cb5c 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -1053,10 +1053,10 @@ public void testUpdateRemoteTranslogBufferInterval() throws Exception { .build(); IndexMetadata metadata = newIndexMeta("index", settings); IndexScopedSettings scopedSettings = IndexScopedSettings.DEFAULT_SCOPED_SETTINGS; + // Register the setting to index-scoped settings since this is under experimental feature flag scopedSettings.registerSetting(IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING); IndexSettings indexSettings = new IndexSettings(metadata, settings, scopedSettings); assertEquals(TimeValue.timeValueMillis(250), indexSettings.getBufferInterval()); - // Update settings indexSettings.updateIndexMetadata( newIndexMeta("index", Settings.builder().put(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, "150ms").build()) From ed67ae95a92729803449b3343d4369a1330e19c3 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 23 Jan 2023 23:19:13 +0530 Subject: [PATCH 07/17] Add BufferedAsyncIOProcessorTests - UTs Signed-off-by: Ashish Singh --- .../BufferedAsyncIOProcessorTests.java | 195 ++++++++++++++++++ 1 file changed, 195 insertions(+) create mode 100644 server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java new file mode 100644 index 0000000000000..d75e1a575d8ca --- /dev/null +++ b/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java @@ -0,0 +1,195 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util.concurrent; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.lang.Thread.sleep; + +public class BufferedAsyncIOProcessorTests extends OpenSearchTestCase { + + private ThreadPool threadpool; + private ThreadContext threadContext; + + @Before + public void setUp() throws Exception { + super.setUp(); + threadpool = new TestThreadPool("buffered-async-io"); + threadContext = new ThreadContext(Settings.EMPTY); + } + + @After + public void cleanup() { + terminate(threadpool); + } + + public void testConsumerCanThrowExceptions() { + AtomicInteger received = new AtomicInteger(0); + AtomicInteger notified = new AtomicInteger(0); + + AsyncIOProcessor processor = new BufferedAsyncIOProcessor<>( + logger, + scaledRandomIntBetween(1, 2024), + threadContext, + threadpool, + TimeValue.timeValueMillis(50) + ) { + @Override + protected void write(List>> candidates) throws IOException { + received.addAndGet(candidates.size()); + } + }; + processor.put(new Object(), (e) -> { + notified.incrementAndGet(); + throw new RuntimeException(); + }); + processor.put(new Object(), (e) -> { + notified.incrementAndGet(); + throw new RuntimeException(); + }); + try { + sleep(200); // Give the processor few chances to run + } catch (InterruptedException e) { + logger.error("Error while trying to sleep", e); + } + assertEquals(2, notified.get()); + assertEquals(2, received.get()); + } + + public void testPreserveThreadContext() throws InterruptedException { + final int threadCount = randomIntBetween(2, 10); + final String testHeader = "test-header"; + + AtomicInteger received = new AtomicInteger(0); + AtomicInteger notified = new AtomicInteger(0); + + CountDownLatch processed = new CountDownLatch(threadCount); + AsyncIOProcessor processor = new BufferedAsyncIOProcessor<>( + logger, + scaledRandomIntBetween(1, 2024), + threadContext, + threadpool, + TimeValue.timeValueMillis(100) + ) { + @Override + protected void write(List>> candidates) throws IOException { + received.addAndGet(candidates.size()); + } + }; + + // all threads should be non-blocking. + List threads = IntStream.range(0, threadCount).mapToObj(i -> new Thread(getTestName() + "_" + i) { + private final String response = randomAlphaOfLength(10); + + { + setDaemon(true); + } + + @Override + public void run() { + threadContext.addResponseHeader(testHeader, response); + processor.put(new Object(), (e) -> { + final Map> expected = Collections.singletonMap(testHeader, Collections.singletonList(response)); + assertEquals(expected, threadContext.getResponseHeaders()); + notified.incrementAndGet(); + processed.countDown(); + }); + } + }).collect(Collectors.toList()); + threads.forEach(Thread::start); + threads.forEach(t -> { + try { + t.join(20000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + assertTrue(processed.await(1, TimeUnit.SECONDS)); + + assertEquals(threadCount, notified.get()); + assertEquals(threadCount, received.get()); + threads.forEach(t -> assertFalse(t.isAlive())); + } + + public void testSlowConsumer() { + AtomicInteger received = new AtomicInteger(0); + AtomicInteger notified = new AtomicInteger(0); + + AsyncIOProcessor processor = new BufferedAsyncIOProcessor<>( + logger, + scaledRandomIntBetween(1, 2024), + threadContext, + threadpool, + TimeValue.timeValueMillis(100) + ) { + @Override + protected void write(List>> candidates) throws IOException { + received.addAndGet(candidates.size()); + } + }; + + int threadCount = randomIntBetween(2, 10); + Semaphore serializePutSemaphore = new Semaphore(1); + CountDownLatch allDone = new CountDownLatch(threadCount); + List threads = IntStream.range(0, threadCount).mapToObj(i -> new Thread(getTestName() + "_" + i) { + { + setDaemon(true); + } + + @Override + public void run() { + try { + assertTrue(serializePutSemaphore.tryAcquire(10, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + processor.put(new Object(), (e) -> { + serializePutSemaphore.release(); + notified.incrementAndGet(); + allDone.countDown(); + }); + } + }).collect(Collectors.toList()); + threads.forEach(Thread::start); + threads.forEach(t -> { + try { + t.join(20000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + try { + assertTrue(allDone.await(20000, TimeUnit.MILLISECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertEquals(threadCount, notified.get()); + assertEquals(threadCount, received.get()); + threads.forEach(t -> assertFalse(t.isAlive())); + } +} From d76427e57ac75d7921b1fb587514e109ee52d3d0 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 24 Jan 2023 00:53:35 +0530 Subject: [PATCH 08/17] Add UTs for BufferedAsyncIOProcessor Signed-off-by: Ashish Singh --- .../BufferedAsyncIOProcessorTests.java | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java index d75e1a575d8ca..d85a2c67739bc 100644 --- a/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java +++ b/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -192,4 +193,51 @@ public void run() { assertEquals(threadCount, received.get()); threads.forEach(t -> assertFalse(t.isAlive())); } + + public void testConsecutiveWritesAtLeastBufferIntervalAway() throws InterruptedException { + AtomicInteger received = new AtomicInteger(0); + AtomicInteger notified = new AtomicInteger(0); + long bufferIntervalMs = randomLongBetween(50, 150); + List writeInvocationTimes = new LinkedList<>(); + + AsyncIOProcessor processor = new BufferedAsyncIOProcessor<>( + logger, + scaledRandomIntBetween(1, 2024), + threadContext, + threadpool, + TimeValue.timeValueMillis(bufferIntervalMs) + ) { + @Override + protected void write(List>> candidates) throws IOException { + received.addAndGet(candidates.size()); + writeInvocationTimes.add(System.currentTimeMillis()); + } + }; + + int runCount = randomIntBetween(2, 5); + CountDownLatch processed = new CountDownLatch(runCount); + IntStream.range(0, runCount).forEach(i -> { + processor.put(new Object(), (e) -> { + notified.incrementAndGet(); + processed.countDown(); + }); + try { + long startTime = System.currentTimeMillis(); + while (received.get() != (i + 1) && (System.currentTimeMillis() - startTime) < 2 * bufferIntervalMs) { + sleep(50); + } + sleep(50); + } catch (InterruptedException ex) { + logger.error("Exception while trying to sleep", ex); + } + } + + ); + assertTrue(processed.await(bufferIntervalMs * (runCount + 1), TimeUnit.MILLISECONDS)); + assertEquals(runCount, notified.get()); + assertEquals(runCount, received.get()); + for (int i = 1; i < runCount; i++) { + assertTrue(writeInvocationTimes.get(i) - writeInvocationTimes.get(i - 1) > bufferIntervalMs); + } + } } From 8a14c89ed615f82cb80cf4c64d37758bebd06fae Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 24 Jan 2023 13:15:18 +0530 Subject: [PATCH 09/17] Make buffer_interval setting Final Signed-off-by: Ashish Singh --- .../cluster/metadata/IndexMetadata.java | 4 ++-- .../org/opensearch/index/IndexSettings.java | 4 ---- .../opensearch/index/IndexSettingsTests.java | 22 ------------------- 3 files changed, 2 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 376ecd6d6e9d2..466895c1f7729 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -484,8 +484,8 @@ public Iterator> settings() { return settings.iterator(); } }, - Property.Dynamic, - Property.IndexScope + Property.IndexScope, + Property.Final ); public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas"; diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index c40201713120a..d25bff3b2b6da 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -885,10 +885,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime); scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled); scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_POLICY, this::setMergeOnFlushPolicy); - - if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE) && isRemoteTranslogStoreEnabled) { - scopedSettings.addSettingsUpdateConsumer(IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setBufferInterval); - } } private void setSearchIdleAfter(TimeValue searchIdleAfter) { diff --git a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index 9fbe61a27cb5c..91fcb43546665 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -1043,28 +1043,6 @@ public void testSetRemoteTranslogBufferIntervalFailsWhenEmpty() { ); } - public void testUpdateRemoteTranslogBufferInterval() throws Exception { - try (FeatureFlagSetter f = FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE)) { - Settings settings = Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) - .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, "250ms") - .build(); - IndexMetadata metadata = newIndexMeta("index", settings); - IndexScopedSettings scopedSettings = IndexScopedSettings.DEFAULT_SCOPED_SETTINGS; - // Register the setting to index-scoped settings since this is under experimental feature flag - scopedSettings.registerSetting(IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING); - IndexSettings indexSettings = new IndexSettings(metadata, settings, scopedSettings); - assertEquals(TimeValue.timeValueMillis(250), indexSettings.getBufferInterval()); - // Update settings - indexSettings.updateIndexMetadata( - newIndexMeta("index", Settings.builder().put(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, "150ms").build()) - ); - assertEquals(TimeValue.timeValueMillis(150), indexSettings.getBufferInterval()); - } - } - @SuppressForbidden(reason = "sets the SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY feature flag") public void testExtendedCompatibilityVersionForRemoteSnapshot() throws Exception { try (FeatureFlagSetter f = FeatureFlagSetter.set(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) { From e3525654a44c79d5abe923175186534277e90e1b Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 24 Jan 2023 21:09:54 +0530 Subject: [PATCH 10/17] Incorporate PR review feedback Signed-off-by: Ashish Singh --- .../util/concurrent/AsyncIOProcessor.java | 6 +- .../concurrent/BufferedAsyncIOProcessor.java | 16 +++-- .../opensearch/index/shard/IndexShard.java | 5 ++ .../BufferedAsyncIOProcessorTests.java | 63 ++++++++++++++++++- 4 files changed, 83 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java index 9785c5d76357a..dce2ea6697902 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java @@ -104,7 +104,7 @@ void addToQueue(Item item, Consumer listener) { } } - void process(List>> candidates) { + private void process(List>> candidates) { // since we made the promise to process we gotta do it here at least once drainAndProcessAndRelease(candidates); while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) { @@ -176,4 +176,8 @@ Semaphore getPromiseSemaphore() { long getLastRunStartTimeInMs() { return lastRunStartTimeInMs; } + + ArrayBlockingQueue>> getQueue() { + return queue; + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java index 633296c5d8c2f..bff976b97306e 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java @@ -32,10 +32,12 @@ package org.opensearch.common.util.concurrent; import org.apache.logging.log4j.Logger; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.unit.TimeValue; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.function.Consumer; @@ -72,10 +74,10 @@ public void put(Item item, Consumer listener) { Objects.requireNonNull(item, "item must not be null"); Objects.requireNonNull(listener, "listener must not be null"); addToQueue(item, listener); - scheduleRefresh(); + scheduleProcess(); } - private void scheduleRefresh() { + private void scheduleProcess() { if (getPromiseSemaphore().tryAcquire()) { try { threadpool.schedule(() -> process(new ArrayList<>()), getBufferInterval(), getBufferRefreshThreadPoolName()); @@ -87,6 +89,12 @@ private void scheduleRefresh() { } } + private void process(List>> candidates) { + // since we made the promise to process we gotta do it here at least once + drainAndProcessAndRelease(candidates); + scheduleProcess(); + } + private TimeValue getBufferInterval() { long timeSinceLastRunStartInMS = System.currentTimeMillis() - getLastRunStartTimeInMs(); if (timeSinceLastRunStartInMS >= bufferInterval.getMillis()) { @@ -95,8 +103,6 @@ private TimeValue getBufferInterval() { return TimeValue.timeValueMillis(bufferInterval.getMillis() - timeSinceLastRunStartInMS); } - protected String getBufferRefreshThreadPoolName() { - return ThreadPool.Names.TRANSLOG_SYNC; - } + protected abstract String getBufferRefreshThreadPoolName(); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 73db50014802d..25e0e1e609552 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3835,6 +3835,11 @@ protected void write(List>> candida throw ex; } } + + @Override + protected String getBufferRefreshThreadPoolName() { + return ThreadPool.Names.TRANSLOG_SYNC; + } }; } diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java index d85a2c67739bc..cd438f17b35d5 100644 --- a/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java +++ b/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java @@ -64,6 +64,11 @@ public void testConsumerCanThrowExceptions() { protected void write(List>> candidates) throws IOException { received.addAndGet(candidates.size()); } + + @Override + protected String getBufferRefreshThreadPoolName() { + return ThreadPool.Names.TRANSLOG_SYNC; + } }; processor.put(new Object(), (e) -> { notified.incrementAndGet(); @@ -101,6 +106,11 @@ public void testPreserveThreadContext() throws InterruptedException { protected void write(List>> candidates) throws IOException { received.addAndGet(candidates.size()); } + + @Override + protected String getBufferRefreshThreadPoolName() { + return ThreadPool.Names.TRANSLOG_SYNC; + } }; // all threads should be non-blocking. @@ -152,6 +162,11 @@ public void testSlowConsumer() { protected void write(List>> candidates) throws IOException { received.addAndGet(candidates.size()); } + + @Override + protected String getBufferRefreshThreadPoolName() { + return ThreadPool.Names.TRANSLOG_SYNC; + } }; int threadCount = randomIntBetween(2, 10); @@ -194,7 +209,7 @@ public void run() { threads.forEach(t -> assertFalse(t.isAlive())); } - public void testConsecutiveWritesAtLeastBufferIntervalAway() throws InterruptedException { + public void testConsecutiveWritesAtLeastBufferIntervalAwayWithDelayInWrites() throws InterruptedException { AtomicInteger received = new AtomicInteger(0); AtomicInteger notified = new AtomicInteger(0); long bufferIntervalMs = randomLongBetween(50, 150); @@ -212,6 +227,11 @@ protected void write(List>> candidates) throws received.addAndGet(candidates.size()); writeInvocationTimes.add(System.currentTimeMillis()); } + + @Override + protected String getBufferRefreshThreadPoolName() { + return ThreadPool.Names.TRANSLOG_SYNC; + } }; int runCount = randomIntBetween(2, 5); @@ -240,4 +260,45 @@ protected void write(List>> candidates) throws assertTrue(writeInvocationTimes.get(i) - writeInvocationTimes.get(i - 1) > bufferIntervalMs); } } + + public void testConsecutiveWritesAtLeastBufferIntervalAway() throws InterruptedException { + AtomicInteger received = new AtomicInteger(0); + AtomicInteger notified = new AtomicInteger(0); + long bufferIntervalMs = randomLongBetween(50, 150); + List writeInvocationTimes = new LinkedList<>(); + + AsyncIOProcessor processor = new BufferedAsyncIOProcessor<>( + logger, + scaledRandomIntBetween(1, 2024), + threadContext, + threadpool, + TimeValue.timeValueMillis(bufferIntervalMs) + ) { + @Override + protected void write(List>> candidates) throws IOException { + received.addAndGet(candidates.size()); + writeInvocationTimes.add(System.currentTimeMillis()); + } + + @Override + protected String getBufferRefreshThreadPoolName() { + return ThreadPool.Names.TRANSLOG_SYNC; + } + }; + + int runCount = randomIntBetween(3, 10); + CountDownLatch processed = new CountDownLatch(runCount); + IntStream.range(0, runCount).forEach(i -> { + processor.put(new Object(), (e) -> { + notified.incrementAndGet(); + processed.countDown(); + }); + }); + assertTrue(processed.await(bufferIntervalMs * (runCount + 1), TimeUnit.MILLISECONDS)); + assertEquals(runCount, notified.get()); + assertEquals(runCount, received.get()); + for (int i = 1; i < writeInvocationTimes.size(); i++) { + assertTrue(writeInvocationTimes.get(i) - writeInvocationTimes.get(i - 1) > bufferIntervalMs); + } + } } From 1f1ed7da9b825b6d9d27be1e26d8ff554de19a29 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 24 Jan 2023 22:34:00 +0530 Subject: [PATCH 11/17] Incorporate PR review feedback Signed-off-by: Ashish Singh --- .../concurrent/BufferedAsyncIOProcessor.java | 31 ++--------- .../opensearch/index/shard/IndexShard.java | 33 +++++------- .../BufferedAsyncIOProcessorTests.java | 52 ------------------- .../threadpool/ScalingThreadPoolTests.java | 2 +- 4 files changed, 19 insertions(+), 99 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java index bff976b97306e..2f97b4669d5f6 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java @@ -6,29 +6,6 @@ * compatible open source license. */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - package org.opensearch.common.util.concurrent; import org.apache.logging.log4j.Logger; @@ -42,11 +19,11 @@ import java.util.function.Consumer; /** - * A variant of AsyncIOProcessor which will wait for interval before processing items - * processing happens in another thread from same threadpool after {@link #bufferInterval} + * A variant of {@link AsyncIOProcessor} that allows to batch and buffer processing items at every + * {@link BufferedAsyncIOProcessor#bufferInterval} in a separate threadpool. *

* Requests are buffered till processor thread calls @{link drainAndProcessAndRelease} after bufferInterval. - * If more requests are enequed between invocations of drainAndProcessAndRelease, another processor thread + * If more requests are enqueued between invocations of drainAndProcessAndRelease, another processor thread * gets scheduled. Subsequent requests will get buffered till drainAndProcessAndRelease gets called in this new * processor thread. * @@ -82,7 +59,7 @@ private void scheduleProcess() { try { threadpool.schedule(() -> process(new ArrayList<>()), getBufferInterval(), getBufferRefreshThreadPoolName()); } catch (Exception e) { - getLogger().error("failed to schedule refresh"); + getLogger().error("failed to schedule process"); getPromiseSemaphore().release(); throw e; } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 25e0e1e609552..fda82539f1e98 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3821,19 +3821,22 @@ private static AsyncIOProcessor createTranslogSyncProcessor( TimeValue bufferInterval ) { ThreadContext threadContext = threadPool.getThreadContext(); + CheckedConsumer>>, IOException> writeConsumer = candidates -> { + try { + engineSupplier.get().translogManager().ensureTranslogSynced(candidates.stream().map(Tuple::v1)); + } catch (AlreadyClosedException ex) { + // that's fine since we already synced everything on engine close - this also is conform with the methods + // documentation + } catch (IOException ex) { // if this fails we are in deep shit - fail the request + logger.debug("failed to sync translog", ex); + throw ex; + } + }; if (bufferAsyncIoProcessor) { return new BufferedAsyncIOProcessor<>(logger, 102400, threadContext, threadPool, bufferInterval) { @Override protected void write(List>> candidates) throws IOException { - try { - engineSupplier.get().translogManager().ensureTranslogSynced(candidates.stream().map(Tuple::v1)); - } catch (AlreadyClosedException ex) { - // that's fine since we already synced everything on engine close - this also is conform with the methods - // documentation - } catch (IOException ex) { // if this fails we are in deep shit - fail the request - logger.debug("failed to sync translog", ex); - throw ex; - } + writeConsumer.accept(candidates); } @Override @@ -3843,18 +3846,10 @@ protected String getBufferRefreshThreadPoolName() { }; } - return new AsyncIOProcessor(logger, 1024, threadContext) { + return new AsyncIOProcessor<>(logger, 1024, threadContext) { @Override protected void write(List>> candidates) throws IOException { - try { - engineSupplier.get().translogManager().ensureTranslogSynced(candidates.stream().map(Tuple::v1)); - } catch (AlreadyClosedException ex) { - // that's fine since we already synced everything on engine close - this also is conform with the methods - // documentation - } catch (IOException ex) { // if this fails we are in deep shit - fail the request - logger.debug("failed to sync translog", ex); - throw ex; - } + writeConsumer.accept(candidates); } }; } diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java index cd438f17b35d5..6fd0a33fd5a7c 100644 --- a/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java +++ b/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java @@ -209,58 +209,6 @@ public void run() { threads.forEach(t -> assertFalse(t.isAlive())); } - public void testConsecutiveWritesAtLeastBufferIntervalAwayWithDelayInWrites() throws InterruptedException { - AtomicInteger received = new AtomicInteger(0); - AtomicInteger notified = new AtomicInteger(0); - long bufferIntervalMs = randomLongBetween(50, 150); - List writeInvocationTimes = new LinkedList<>(); - - AsyncIOProcessor processor = new BufferedAsyncIOProcessor<>( - logger, - scaledRandomIntBetween(1, 2024), - threadContext, - threadpool, - TimeValue.timeValueMillis(bufferIntervalMs) - ) { - @Override - protected void write(List>> candidates) throws IOException { - received.addAndGet(candidates.size()); - writeInvocationTimes.add(System.currentTimeMillis()); - } - - @Override - protected String getBufferRefreshThreadPoolName() { - return ThreadPool.Names.TRANSLOG_SYNC; - } - }; - - int runCount = randomIntBetween(2, 5); - CountDownLatch processed = new CountDownLatch(runCount); - IntStream.range(0, runCount).forEach(i -> { - processor.put(new Object(), (e) -> { - notified.incrementAndGet(); - processed.countDown(); - }); - try { - long startTime = System.currentTimeMillis(); - while (received.get() != (i + 1) && (System.currentTimeMillis() - startTime) < 2 * bufferIntervalMs) { - sleep(50); - } - sleep(50); - } catch (InterruptedException ex) { - logger.error("Exception while trying to sleep", ex); - } - } - - ); - assertTrue(processed.await(bufferIntervalMs * (runCount + 1), TimeUnit.MILLISECONDS)); - assertEquals(runCount, notified.get()); - assertEquals(runCount, received.get()); - for (int i = 1; i < runCount; i++) { - assertTrue(writeInvocationTimes.get(i) - writeInvocationTimes.get(i - 1) > bufferIntervalMs); - } - } - public void testConsecutiveWritesAtLeastBufferIntervalAway() throws InterruptedException { AtomicInteger received = new AtomicInteger(0); AtomicInteger notified = new AtomicInteger(0); diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index d68794c5bbec2..eca5a8eb19e47 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -133,7 +133,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.TRANSLOG_TRANSFER, ThreadPool::halfAllocatedProcessorsMaxTen); - sizes.put(ThreadPool.Names.TRANSLOG_SYNC, ThreadPool::halfAllocatedProcessorsMaxTen); + sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n); return sizes.get(threadPoolName).apply(numberOfProcessors); } From 70558bdfac18fa7b97c53bb085dc48a5e91f52b2 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 24 Jan 2023 22:52:41 +0530 Subject: [PATCH 12/17] Incorporate PR review feedback Signed-off-by: Ashish Singh --- .../common/util/concurrent/AsyncIOProcessor.java | 8 ++++---- .../common/util/concurrent/BufferedAsyncIOProcessor.java | 6 +++--- .../util/concurrent/BufferedAsyncIOProcessorTests.java | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java index dce2ea6697902..ad4f128a79dfe 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java @@ -57,7 +57,7 @@ public abstract class AsyncIOProcessor { private final ArrayBlockingQueue>> queue; private final ThreadContext threadContext; private final Semaphore promiseSemaphore = new Semaphore(1); - private long lastRunStartTimeInMs; + private long lastRunStartTimeInNs; protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadContext) { this.logger = logger; @@ -114,7 +114,7 @@ private void process(List>> candidates) { } void drainAndProcessAndRelease(List>> candidates) { - lastRunStartTimeInMs = System.currentTimeMillis(); + lastRunStartTimeInNs = System.nanoTime(); Exception exception; try { queue.drainTo(candidates); @@ -173,8 +173,8 @@ Semaphore getPromiseSemaphore() { return promiseSemaphore; } - long getLastRunStartTimeInMs() { - return lastRunStartTimeInMs; + long getLastRunStartTimeInNs() { + return lastRunStartTimeInNs; } ArrayBlockingQueue>> getQueue() { diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java index 2f97b4669d5f6..e5f8bda6577c8 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java @@ -73,11 +73,11 @@ private void process(List>> candidates) { } private TimeValue getBufferInterval() { - long timeSinceLastRunStartInMS = System.currentTimeMillis() - getLastRunStartTimeInMs(); - if (timeSinceLastRunStartInMS >= bufferInterval.getMillis()) { + long timeSinceLastRunStartInNS = System.nanoTime() - getLastRunStartTimeInNs(); + if (timeSinceLastRunStartInNS >= bufferInterval.getNanos()) { return TimeValue.ZERO; } - return TimeValue.timeValueMillis(bufferInterval.getMillis() - timeSinceLastRunStartInMS); + return TimeValue.timeValueNanos(bufferInterval.getNanos() - timeSinceLastRunStartInNS); } protected abstract String getBufferRefreshThreadPoolName(); diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java index 6fd0a33fd5a7c..365b4bfd5a8a8 100644 --- a/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java +++ b/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java @@ -225,7 +225,7 @@ public void testConsecutiveWritesAtLeastBufferIntervalAway() throws InterruptedE @Override protected void write(List>> candidates) throws IOException { received.addAndGet(candidates.size()); - writeInvocationTimes.add(System.currentTimeMillis()); + writeInvocationTimes.add(System.nanoTime()); } @Override @@ -246,7 +246,7 @@ protected String getBufferRefreshThreadPoolName() { assertEquals(runCount, notified.get()); assertEquals(runCount, received.get()); for (int i = 1; i < writeInvocationTimes.size(); i++) { - assertTrue(writeInvocationTimes.get(i) - writeInvocationTimes.get(i - 1) > bufferIntervalMs); + assertTrue(writeInvocationTimes.get(i) - writeInvocationTimes.get(i - 1) > bufferIntervalMs * 1_000_000); } } } From 3a5f960ef1836661c103e40d49cb52ff709d069a Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 25 Jan 2023 00:29:36 +0530 Subject: [PATCH 13/17] Incorporate PR review feedback Co-authored-by: Ashwin Pankaj Co-authored-by: Laxman Muttineni Signed-off-by: Ashish Singh --- .../cluster/metadata/IndexMetadata.java | 2 +- .../util/concurrent/AsyncIOProcessor.java | 18 +++++++----------- .../opensearch/index/IndexSettingsTests.java | 2 +- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 466895c1f7729..9adc052de0b48 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -468,7 +468,7 @@ public void validate(final TimeValue value, final Map, Object> settin final Boolean isRemoteTranslogStoreEnabled = (Boolean) settings.get(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING); if (isRemoteTranslogStoreEnabled == null || isRemoteTranslogStoreEnabled == false) { throw new IllegalArgumentException( - "Settings " + "Setting " + SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL + " can only be set when " + SETTING_REMOTE_TRANSLOG_STORE_ENABLED diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java index ad4f128a79dfe..f01a959465024 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java @@ -78,6 +78,7 @@ public void put(Item item, Consumer listener) { // we first try make a promise that we are responsible for the processing final boolean promised = promiseSemaphore.tryAcquire(); if (promised == false) { + // in this case we are not responsible and can just block until there is space addToQueue(item, listener); } @@ -90,12 +91,16 @@ public void put(Item item, Consumer listener) { // no need to preserve context for listener since it runs in current thread. candidates.add(new Tuple<>(item, listener)); } - process(candidates); + // since we made the promise to process we gotta do it here at least once + drainAndProcessAndRelease(candidates); + while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) { + // yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing + drainAndProcessAndRelease(candidates); + } } } void addToQueue(Item item, Consumer listener) { - // in this case we are not responsible and can just block until there is space try { queue.put(new Tuple<>(item, preserveContext(listener))); } catch (InterruptedException e) { @@ -104,15 +109,6 @@ void addToQueue(Item item, Consumer listener) { } } - private void process(List>> candidates) { - // since we made the promise to process we gotta do it here at least once - drainAndProcessAndRelease(candidates); - while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) { - // yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing - drainAndProcessAndRelease(candidates); - } - } - void drainAndProcessAndRelease(List>> candidates) { lastRunStartTimeInNs = System.nanoTime(); Exception exception; diff --git a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index 91fcb43546665..5f93b50c42777 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -1022,7 +1022,7 @@ public void testSetRemoteTranslogBufferIntervalFailsWhenRemoteTranslogIsNotEnabl () -> IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(indexSettings) ); assertEquals( - "Settings index.remote_store.translog.buffer_interval can only be set when index.remote_store.translog.enabled is set to true", + "Setting index.remote_store.translog.buffer_interval can only be set when index.remote_store.translog.enabled is set to true", iae.getMessage() ); } From 9dd5d88ec342d4083db07195a8439ffc506d115e Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 25 Jan 2023 10:13:50 +0530 Subject: [PATCH 14/17] Incorporate PR review feedback Signed-off-by: Ashish Singh --- .../util/concurrent/BufferedAsyncIOProcessor.java | 11 ++++------- .../opensearch/index/translog/RemoteFsTranslog.java | 8 -------- .../concurrent/BufferedAsyncIOProcessorTests.java | 2 +- 3 files changed, 5 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java index e5f8bda6577c8..a04866e78b0c6 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java @@ -9,12 +9,10 @@ package org.opensearch.common.util.concurrent; import org.apache.logging.log4j.Logger; -import org.opensearch.common.collect.Tuple; import org.opensearch.common.unit.TimeValue; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; -import java.util.List; import java.util.Objects; import java.util.function.Consumer; @@ -55,9 +53,9 @@ public void put(Item item, Consumer listener) { } private void scheduleProcess() { - if (getPromiseSemaphore().tryAcquire()) { + if (getQueue().isEmpty() == false && getPromiseSemaphore().tryAcquire()) { try { - threadpool.schedule(() -> process(new ArrayList<>()), getBufferInterval(), getBufferRefreshThreadPoolName()); + threadpool.schedule(this::process, getBufferInterval(), getBufferRefreshThreadPoolName()); } catch (Exception e) { getLogger().error("failed to schedule process"); getPromiseSemaphore().release(); @@ -66,9 +64,8 @@ private void scheduleProcess() { } } - private void process(List>> candidates) { - // since we made the promise to process we gotta do it here at least once - drainAndProcessAndRelease(candidates); + private void process() { + drainAndProcessAndRelease(new ArrayList<>()); scheduleProcess(); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 1c3eb3284ff6d..0cfaa5234c1fe 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -202,7 +202,6 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc } private boolean upload(Long primaryTerm, Long generation) throws IOException { - long uploadStartTime = System.currentTimeMillis(); // During primary relocation (primary-primary peer recovery), both the old and the new primary have engine // created with the RemoteFsTranslog. Both primaries are equipped to upload the translogs. The primary mode check // below ensures that the real primary only is uploading. Before the primary mode is set as true for the new @@ -232,13 +231,11 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti closeFilesIfNoPendingRetentionLocks(); maxRemoteTranslogGenerationUploaded = generation; logger.trace("uploaded translog for {} {} ", primaryTerm, generation); - logUploadStats(uploadStartTime, true); } @Override public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException { transferReleasable.close(); - logUploadStats(uploadStartTime, false); closeFilesIfNoPendingRetentionLocks(); if (ex instanceof IOException) { throw (IOException) ex; @@ -251,11 +248,6 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro } - private void logUploadStats(long uploadStartTime, boolean uploadStatus) { - logger.debug("Translog Upload status={} timeTaken={}", uploadStatus, (System.currentTimeMillis() - uploadStartTime)); - - } - // Visible for testing public Set allUploaded() { return fileTransferTracker.allUploaded(); diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java index 365b4bfd5a8a8..3ec7bdc54d3cb 100644 --- a/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java +++ b/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java @@ -246,7 +246,7 @@ protected String getBufferRefreshThreadPoolName() { assertEquals(runCount, notified.get()); assertEquals(runCount, received.get()); for (int i = 1; i < writeInvocationTimes.size(); i++) { - assertTrue(writeInvocationTimes.get(i) - writeInvocationTimes.get(i - 1) > bufferIntervalMs * 1_000_000); + assertTrue(writeInvocationTimes.get(i) - writeInvocationTimes.get(i - 1) >= bufferIntervalMs * 1_000_000); } } } From 2ddda270f01c8851bf2b62243c7021ee660ddf4e Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 25 Jan 2023 11:39:20 +0530 Subject: [PATCH 15/17] Incorporate PR review feedback Signed-off-by: Ashish Singh --- .../opensearch/common/settings/Setting.java | 26 ++++++------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/Setting.java b/server/src/main/java/org/opensearch/common/settings/Setting.java index 2e05a29822ca6..176f24e3cc1ff 100644 --- a/server/src/main/java/org/opensearch/common/settings/Setting.java +++ b/server/src/main/java/org/opensearch/common/settings/Setting.java @@ -2073,23 +2073,6 @@ public static Setting timeSetting( ); } - public static Setting timeSetting( - final String key, - Function defaultValue, - final TimeValue minValue, - final Validator validator, - final Property... properties - ) { - final SimpleKey simpleKey = new SimpleKey(key); - return new Setting<>( - simpleKey, - s -> defaultValue.apply(s).getStringRep(), - minTimeValueParser(key, minValue, isFiltered(properties)), - validator, - properties - ); - } - public static Setting timeSetting( final String key, Function defaultValue, @@ -2197,7 +2180,14 @@ public static Setting timeSetting( Validator validator, Property... properties ) { - return timeSetting(key, (s) -> defaultValue, minValue, validator, properties); + final SimpleKey simpleKey = new SimpleKey(key); + return new Setting<>( + simpleKey, + s -> defaultValue.getStringRep(), + minTimeValueParser(key, minValue, isFiltered(properties)), + validator, + properties + ); } public static Setting timeSetting( From d455c21729e5078514a2f5c6ece4dd1a28319350 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 25 Jan 2023 12:16:11 +0530 Subject: [PATCH 16/17] Handle scheduling failures Signed-off-by: Ashish Singh --- .../common/util/concurrent/AsyncIOProcessor.java | 2 +- .../util/concurrent/BufferedAsyncIOProcessor.java | 14 +++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java index f01a959465024..e9b9442c555e5 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java @@ -136,7 +136,7 @@ private Exception processList(List>> candidates) return exception; } - private void notifyList(List>> candidates, Exception exception) { + void notifyList(List>> candidates, Exception exception) { for (Tuple> tuple : candidates) { Consumer consumer = tuple.v2(); try { diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java index a04866e78b0c6..0a3e839838e46 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java @@ -9,10 +9,12 @@ package org.opensearch.common.util.concurrent; import org.apache.logging.log4j.Logger; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.unit.TimeValue; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.function.Consumer; @@ -58,12 +60,22 @@ private void scheduleProcess() { threadpool.schedule(this::process, getBufferInterval(), getBufferRefreshThreadPoolName()); } catch (Exception e) { getLogger().error("failed to schedule process"); + // process scheduling failure + processSchedulingFailure(e); getPromiseSemaphore().release(); - throw e; + // This is to make sure that any new items that are added to the queue between processSchedulingFailure + // and releasing the semaphore is handled by a subsequent refresh and not starved. + scheduleProcess(); } } } + private void processSchedulingFailure(Exception e) { + List>> candidates = new ArrayList<>(); + getQueue().drainTo(candidates); + notifyList(candidates, e); + } + private void process() { drainAndProcessAndRelease(new ArrayList<>()); scheduleProcess(); From 105cbc524d39daeb42a34cc826daba876286cf91 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sat, 28 Jan 2023 23:51:16 +0530 Subject: [PATCH 17/17] Incorporate PR review feedback Signed-off-by: Ashish Singh --- .../util/concurrent/BufferedAsyncIOProcessor.java | 1 - .../java/org/opensearch/index/IndexSettings.java | 15 +++++++-------- .../org/opensearch/index/shard/IndexShard.java | 2 +- .../org/opensearch/index/IndexSettingsTests.java | 2 +- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java index 0a3e839838e46..e3157ea8dfdb4 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java @@ -60,7 +60,6 @@ private void scheduleProcess() { threadpool.schedule(this::process, getBufferInterval(), getBufferRefreshThreadPoolName()); } catch (Exception e) { getLogger().error("failed to schedule process"); - // process scheduling failure processSchedulingFailure(e); getPromiseSemaphore().release(); // This is to make sure that any new items that are added to the queue between processSchedulingFailure diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index d25bff3b2b6da..de61ad7a6cfef 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -586,7 +586,7 @@ public final class IndexSettings { private final ReplicationType replicationType; private final boolean isRemoteStoreEnabled; private final boolean isRemoteTranslogStoreEnabled; - private volatile TimeValue bufferInterval; + private final TimeValue remoteTranslogUploadBufferInterval; private final String remoteStoreTranslogRepository; private final String remoteStoreRepository; private final boolean isRemoteSnapshot; @@ -754,7 +754,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false); isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false); remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY); - bufferInterval = settings.getAsTime(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, TimeValue.timeValueMillis(100)); + remoteTranslogUploadBufferInterval = settings.getAsTime( + IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, + TimeValue.timeValueMillis(100) + ); remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY); isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings); @@ -1142,12 +1145,8 @@ public void setTranslogSyncInterval(TimeValue translogSyncInterval) { * {@code index.translog.durability} is set as {@code request}. * @return the buffer interval. */ - public TimeValue getBufferInterval() { - return bufferInterval; - } - - public void setBufferInterval(TimeValue bufferInterval) { - this.bufferInterval = bufferInterval; + public TimeValue getRemoteTranslogUploadBufferInterval() { + return remoteTranslogUploadBufferInterval; } /** diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index fda82539f1e98..a4701f0ad1f30 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -370,7 +370,7 @@ public IndexShard( threadPool, this::getEngine, indexSettings.isRemoteTranslogStoreEnabled(), - indexSettings.getBufferInterval() + indexSettings.getRemoteTranslogUploadBufferInterval() ); this.mapperService = mapperService; this.indexCache = indexCache; diff --git a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index 5f93b50c42777..28044410a21f4 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -970,7 +970,7 @@ public void testRemoteTranslogExplicitSetting() { IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); assertNull(settings.getRemoteStoreRepository()); assertEquals("tlog-store", settings.getRemoteStoreTranslogRepository()); - assertEquals(TimeValue.timeValueMillis(200), settings.getBufferInterval()); + assertEquals(TimeValue.timeValueMillis(200), settings.getRemoteTranslogUploadBufferInterval()); } public void testSetRemoteTranslogRepositoryFailsWhenRemoteTranslogIsNotEnabled() {