From 18390f9e314f79aa53a251db788217c01732cf1c Mon Sep 17 00:00:00 2001 From: Ashish Date: Tue, 2 May 2023 17:06:37 +0530 Subject: [PATCH] Add remote refresh segment pressure service, settings and tracker (#7227) Signed-off-by: Ashish Singh --- .../common/settings/ClusterSettings.java | 13 +- .../opensearch/common/util/MovingAverage.java | 42 +- .../RemoteRefreshSegmentPressureService.java | 330 ++++++++++++++ .../RemoteRefreshSegmentPressureSettings.java | 226 ++++++++++ .../remote/RemoteRefreshSegmentTracker.java | 425 ++++++++++++++++++ .../opensearch/index/remote/package-info.java | 10 + .../common/util/MovingAverageTests.java | 40 ++ ...oteRefreshSegmentPressureServiceTests.java | 165 +++++++ ...teRefreshSegmentPressureSettingsTests.java | 267 +++++++++++ .../RemoteRefreshSegmentTrackerTests.java | 404 +++++++++++++++++ 10 files changed, 1914 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java create mode 100644 server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java create mode 100644 server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java create mode 100644 server/src/main/java/org/opensearch/index/remote/package-info.java create mode 100644 server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java create mode 100644 server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java create mode 100644 server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 208a358d38395..0f50f8ae5fef2 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -39,6 +39,7 @@ import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; import org.opensearch.index.IndexingPressure; +import org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings; import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.ShardIndexingPressureMemoryManager; import org.opensearch.index.ShardIndexingPressureSettings; @@ -638,7 +639,17 @@ public void apply(Settings value, Settings current, Settings previous) { SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS, // Settings related to Searchable Snapshots - Node.NODE_SEARCH_CACHE_SIZE_SETTING + Node.NODE_SEARCH_CACHE_SIZE_SETTING, + + // Settings related to Remote Refresh Segment Pressure + RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED, + RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT, + RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR, + RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR, + RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT, + RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE, + RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE, + RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE ) ) ); diff --git a/server/src/main/java/org/opensearch/common/util/MovingAverage.java b/server/src/main/java/org/opensearch/common/util/MovingAverage.java index 650ba62ecd8c8..50d863709d489 100644 --- a/server/src/main/java/org/opensearch/common/util/MovingAverage.java +++ b/server/src/main/java/org/opensearch/common/util/MovingAverage.java @@ -17,19 +17,47 @@ public class MovingAverage { private final int windowSize; private final long[] observations; - private long count = 0; - private long sum = 0; - private double average = 0; + private volatile long count = 0; + private volatile long sum = 0; + private volatile double average = 0; public MovingAverage(int windowSize) { - if (windowSize <= 0) { - throw new IllegalArgumentException("window size must be greater than zero"); - } - + checkWindowSize(windowSize); this.windowSize = windowSize; this.observations = new long[windowSize]; } + /** + * Used for changing the window size of {@code MovingAverage}. + * + * @param newWindowSize new window size. + * @return copy of original object with updated size. + */ + public MovingAverage copyWithSize(int newWindowSize) { + MovingAverage copy = new MovingAverage(newWindowSize); + // Start is inclusive, but end is exclusive + long start, end = count; + if (isReady() == false) { + start = 0; + } else { + start = end - windowSize; + } + // If the newWindow Size is smaller than the elements eligible to be copied over, then we adjust the start value + if (end - start > newWindowSize) { + start = end - newWindowSize; + } + for (int i = (int) start; i < end; i++) { + copy.record(observations[i % observations.length]); + } + return copy; + } + + private void checkWindowSize(int size) { + if (size <= 0) { + throw new IllegalArgumentException("window size must be greater than zero"); + } + } + /** * Records a new observation and evicts the n-th last observation. */ diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java new file mode 100644 index 0000000000000..37935cc0eb29d --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java @@ -0,0 +1,330 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.index.shard.IndexEventListener; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.function.BiConsumer; + +/** + * Service used to validate if the incoming indexing request should be rejected based on the {@link RemoteRefreshSegmentTracker}. + * + * @opensearch.internal + */ +public class RemoteRefreshSegmentPressureService implements IndexEventListener { + + private static final Logger logger = LogManager.getLogger(RemoteRefreshSegmentPressureService.class); + + /** + * Keeps map of remote-backed index shards and their corresponding backpressure tracker. + */ + private final Map trackerMap = ConcurrentCollections.newConcurrentMap(); + + /** + * Remote refresh segment pressure settings which is used for creation of the backpressure tracker and as well as rejection. + */ + private final RemoteRefreshSegmentPressureSettings pressureSettings; + + private final List lagValidators; + + @Inject + public RemoteRefreshSegmentPressureService(ClusterService clusterService, Settings settings) { + pressureSettings = new RemoteRefreshSegmentPressureSettings(clusterService, settings, this); + lagValidators = Arrays.asList( + new RefreshSeqNoLagValidator(pressureSettings), + new BytesLagValidator(pressureSettings), + new TimeLagValidator(pressureSettings), + new ConsecutiveFailureValidator(pressureSettings) + ); + } + + /** + * Get {@code RemoteRefreshSegmentTracker} only if the underlying Index has remote segments integration enabled. + * + * @param shardId shard id + * @return the tracker if index is remote-backed, else null. + */ + public RemoteRefreshSegmentTracker getRemoteRefreshSegmentTracker(ShardId shardId) { + return trackerMap.get(shardId); + } + + @Override + public void afterIndexShardCreated(IndexShard indexShard) { + if (indexShard.indexSettings().isRemoteStoreEnabled() == false) { + return; + } + ShardId shardId = indexShard.shardId(); + trackerMap.put( + shardId, + new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ) + ); + logger.trace("Created tracker for shardId={}", shardId); + } + + @Override + public void afterIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) { + if (indexShard.indexSettings().isRemoteStoreEnabled() == false) { + return; + } + trackerMap.remove(shardId); + logger.trace("Deleted tracker for shardId={}", shardId); + } + + /** + * Check if remote refresh segments backpressure is enabled. This is backed by a cluster level setting. + * + * @return true if enabled, else false. + */ + public boolean isSegmentsUploadBackpressureEnabled() { + return pressureSettings.isRemoteRefreshSegmentPressureEnabled(); + } + + public void validateSegmentsUploadLag(ShardId shardId) { + RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = getRemoteRefreshSegmentTracker(shardId); + // Check if refresh checkpoint (a.k.a. seq number) lag is 2 or below - this is to handle segment merges that can + // increase the bytes to upload almost suddenly. + if (remoteRefreshSegmentTracker.getRefreshSeqNoLag() <= 1) { + return; + } + + for (LagValidator lagValidator : lagValidators) { + if (lagValidator.validate(remoteRefreshSegmentTracker, shardId) == false) { + remoteRefreshSegmentTracker.incrementRejectionCount(lagValidator.name()); + throw new OpenSearchRejectedExecutionException(lagValidator.rejectionMessage(remoteRefreshSegmentTracker, shardId)); + } + } + } + + void updateUploadBytesMovingAverageWindowSize(int updatedSize) { + updateMovingAverageWindowSize(RemoteRefreshSegmentTracker::updateUploadBytesMovingAverageWindowSize, updatedSize); + } + + void updateUploadBytesPerSecMovingAverageWindowSize(int updatedSize) { + updateMovingAverageWindowSize(RemoteRefreshSegmentTracker::updateUploadBytesPerSecMovingAverageWindowSize, updatedSize); + } + + void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) { + updateMovingAverageWindowSize(RemoteRefreshSegmentTracker::updateUploadTimeMsMovingAverageWindowSize, updatedSize); + } + + void updateMovingAverageWindowSize(BiConsumer biConsumer, int updatedSize) { + trackerMap.values().forEach(tracker -> biConsumer.accept(tracker, updatedSize)); + } + + /** + * Abstract class for validating if lag is acceptable or not. + * + * @opensearch.internal + */ + private static abstract class LagValidator { + + final RemoteRefreshSegmentPressureSettings pressureSettings; + + private LagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { + this.pressureSettings = pressureSettings; + } + + /** + * Validates the lag and returns value accordingly. + * + * @param pressureTracker tracker which holds information about the shard. + * @param shardId shard id of the {@code IndexShard} currently being validated. + * @return true if successfully validated that lag is acceptable. + */ + abstract boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId); + + /** + * Returns the name of the lag validator. + * + * @return the name using class name. + */ + abstract String name(); + + abstract String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId); + } + + /** + * Check if the remote store seq no lag is above the min seq no lag limit + * + * @opensearch.internal + */ + private static class RefreshSeqNoLagValidator extends LagValidator { + + private static final String NAME = "refresh_seq_no_lag"; + + private RefreshSeqNoLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { + super(pressureSettings); + } + + @Override + public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + // Check if the remote store seq no lag is above the min seq no lag limit + return pressureTracker.getRefreshSeqNoLag() <= pressureSettings.getMinRefreshSeqNoLagLimit(); + } + + @Override + String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + return String.format( + Locale.ROOT, + "rejected execution on primary shard:%s due to remote segments lagging behind local segments." + + "remote_refresh_seq_no:%s local_refresh_seq_no:%s", + shardId, + pressureTracker.getRemoteRefreshSeqNo(), + pressureTracker.getLocalRefreshSeqNo() + ); + } + + @Override + String name() { + return NAME; + } + } + + /** + * Check if the remote store is lagging more than the upload bytes average multiplied by a variance factor + * + * @opensearch.internal + */ + private static class BytesLagValidator extends LagValidator { + + private static final String NAME = "bytes_lag"; + + private BytesLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { + super(pressureSettings); + } + + @Override + public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + if (pressureTracker.isUploadBytesAverageReady() == false) { + logger.trace("upload bytes moving average is not ready"); + return true; + } + double dynamicBytesLagThreshold = pressureTracker.getUploadBytesAverage() * pressureSettings.getBytesLagVarianceFactor(); + long bytesLag = pressureTracker.getBytesLag(); + return bytesLag <= dynamicBytesLagThreshold; + } + + @Override + public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + double dynamicBytesLagThreshold = pressureTracker.getUploadBytesAverage() * pressureSettings.getBytesLagVarianceFactor(); + return String.format( + Locale.ROOT, + "rejected execution on primary shard:%s due to remote segments lagging behind local segments." + + "bytes_lag:%s dynamic_bytes_lag_threshold:%s", + shardId, + pressureTracker.getBytesLag(), + dynamicBytesLagThreshold + ); + } + + @Override + String name() { + return NAME; + } + } + + /** + * Check if the remote store is lagging more than the upload time average multiplied by a variance factor + * + * @opensearch.internal + */ + private static class TimeLagValidator extends LagValidator { + + private static final String NAME = "time_lag"; + + private TimeLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { + super(pressureSettings); + } + + @Override + public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + if (pressureTracker.isUploadTimeMsAverageReady() == false) { + logger.trace("upload time moving average is not ready"); + return true; + } + long timeLag = pressureTracker.getTimeMsLag(); + double dynamicTimeLagThreshold = pressureTracker.getUploadTimeMsAverage() * pressureSettings.getUploadTimeLagVarianceFactor(); + return timeLag <= dynamicTimeLagThreshold; + } + + @Override + public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + double dynamicTimeLagThreshold = pressureTracker.getUploadTimeMsAverage() * pressureSettings.getUploadTimeLagVarianceFactor(); + return String.format( + Locale.ROOT, + "rejected execution on primary shard:%s due to remote segments lagging behind local segments." + + "time_lag:%s ms dynamic_time_lag_threshold:%s ms", + shardId, + pressureTracker.getTimeMsLag(), + dynamicTimeLagThreshold + ); + } + + @Override + String name() { + return NAME; + } + } + + /** + * Check if consecutive failure limit has been breached + * + * @opensearch.internal + */ + private static class ConsecutiveFailureValidator extends LagValidator { + + private static final String NAME = "consecutive_failures_lag"; + + private ConsecutiveFailureValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { + super(pressureSettings); + } + + @Override + public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + int failureStreakCount = pressureTracker.getConsecutiveFailureCount(); + int minConsecutiveFailureThreshold = pressureSettings.getMinConsecutiveFailuresLimit(); + return failureStreakCount <= minConsecutiveFailureThreshold; + } + + @Override + public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + return String.format( + Locale.ROOT, + "rejected execution on primary shard:%s due to remote segments lagging behind local segments." + + "failure_streak_count:%s min_consecutive_failure_threshold:%s", + shardId, + pressureTracker.getConsecutiveFailureCount(), + pressureSettings.getMinConsecutiveFailuresLimit() + ); + } + + @Override + String name() { + return NAME; + } + } +} diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java new file mode 100644 index 0000000000000..6cb0d1d07e78b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettings.java @@ -0,0 +1,226 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; + +/** + * Settings related to back pressure on account of segments upload failures / lags. + * + * @opensearch.internal + */ +public class RemoteRefreshSegmentPressureSettings { + + private static class Defaults { + private static final long MIN_SEQ_NO_LAG_LIMIT = 5; + private static final long MIN_SEQ_NO_LAG_LIMIT_MIN_VALUE = 2; + private static final double BYTES_LAG_VARIANCE_FACTOR = 2.0; + private static final double UPLOAD_TIME_LAG_VARIANCE_FACTOR = 2.0; + private static final double VARIANCE_FACTOR_MIN_VALUE = 1.0; + private static final int MIN_CONSECUTIVE_FAILURES_LIMIT = 10; + private static final int MIN_CONSECUTIVE_FAILURES_LIMIT_MIN_VALUE = 1; + private static final int UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE = 20; + private static final int UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE = 20; + private static final int UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE = 20; + private static final int MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE = 5; + } + + public static final Setting REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED = Setting.boolSetting( + "remote_store.segment.pressure.enabled", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting MIN_SEQ_NO_LAG_LIMIT = Setting.longSetting( + "remote_store.segment.pressure.seq_no_lag.limit", + Defaults.MIN_SEQ_NO_LAG_LIMIT, + Defaults.MIN_SEQ_NO_LAG_LIMIT_MIN_VALUE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting BYTES_LAG_VARIANCE_FACTOR = Setting.doubleSetting( + "remote_store.segment.pressure.bytes_lag.variance_factor", + Defaults.BYTES_LAG_VARIANCE_FACTOR, + Defaults.VARIANCE_FACTOR_MIN_VALUE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting UPLOAD_TIME_LAG_VARIANCE_FACTOR = Setting.doubleSetting( + "remote_store.segment.pressure.time_lag.variance_factor", + Defaults.UPLOAD_TIME_LAG_VARIANCE_FACTOR, + Defaults.VARIANCE_FACTOR_MIN_VALUE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting MIN_CONSECUTIVE_FAILURES_LIMIT = Setting.intSetting( + "remote_store.segment.pressure.consecutive_failures.limit", + Defaults.MIN_CONSECUTIVE_FAILURES_LIMIT, + Defaults.MIN_CONSECUTIVE_FAILURES_LIMIT_MIN_VALUE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( + "remote_store.segment.pressure.upload_bytes_moving_average_window_size", + Defaults.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE, + Defaults.MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( + "remote_store.segment.pressure.upload_bytes_per_sec_moving_average_window_size", + Defaults.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE, + Defaults.MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( + "remote_store.segment.pressure.upload_time_moving_average_window_size", + Defaults.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE, + Defaults.MOVING_AVERAGE_WINDOW_SIZE_MIN_VALUE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private volatile boolean remoteRefreshSegmentPressureEnabled; + + private volatile long minRefreshSeqNoLagLimit; + + private volatile double bytesLagVarianceFactor; + + private volatile double uploadTimeLagVarianceFactor; + + private volatile int minConsecutiveFailuresLimit; + + private volatile int uploadBytesMovingAverageWindowSize; + + private volatile int uploadBytesPerSecMovingAverageWindowSize; + + private volatile int uploadTimeMovingAverageWindowSize; + + public RemoteRefreshSegmentPressureSettings( + ClusterService clusterService, + Settings settings, + RemoteRefreshSegmentPressureService remoteUploadPressureService + ) { + ClusterSettings clusterSettings = clusterService.getClusterSettings(); + + this.remoteRefreshSegmentPressureEnabled = REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.get(settings); + clusterSettings.addSettingsUpdateConsumer(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED, this::setRemoteRefreshSegmentPressureEnabled); + + this.minRefreshSeqNoLagLimit = MIN_SEQ_NO_LAG_LIMIT.get(settings); + clusterSettings.addSettingsUpdateConsumer(MIN_SEQ_NO_LAG_LIMIT, this::setMinRefreshSeqNoLagLimit); + + this.bytesLagVarianceFactor = BYTES_LAG_VARIANCE_FACTOR.get(settings); + clusterSettings.addSettingsUpdateConsumer(BYTES_LAG_VARIANCE_FACTOR, this::setBytesLagVarianceFactor); + + this.uploadTimeLagVarianceFactor = UPLOAD_TIME_LAG_VARIANCE_FACTOR.get(settings); + clusterSettings.addSettingsUpdateConsumer(UPLOAD_TIME_LAG_VARIANCE_FACTOR, this::setUploadTimeLagVarianceFactor); + + this.minConsecutiveFailuresLimit = MIN_CONSECUTIVE_FAILURES_LIMIT.get(settings); + clusterSettings.addSettingsUpdateConsumer(MIN_CONSECUTIVE_FAILURES_LIMIT, this::setMinConsecutiveFailuresLimit); + + this.uploadBytesMovingAverageWindowSize = UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.get(settings); + clusterSettings.addSettingsUpdateConsumer( + UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE, + remoteUploadPressureService::updateUploadBytesMovingAverageWindowSize + ); + clusterSettings.addSettingsUpdateConsumer(UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE, this::setUploadBytesMovingAverageWindowSize); + + this.uploadBytesPerSecMovingAverageWindowSize = UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.get(settings); + clusterSettings.addSettingsUpdateConsumer( + UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE, + remoteUploadPressureService::updateUploadBytesPerSecMovingAverageWindowSize + ); + clusterSettings.addSettingsUpdateConsumer( + UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE, + this::setUploadBytesPerSecMovingAverageWindowSize + ); + + this.uploadTimeMovingAverageWindowSize = UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.get(settings); + clusterSettings.addSettingsUpdateConsumer( + UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE, + remoteUploadPressureService::updateUploadTimeMsMovingAverageWindowSize + ); + clusterSettings.addSettingsUpdateConsumer(UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE, this::setUploadTimeMovingAverageWindowSize); + } + + public boolean isRemoteRefreshSegmentPressureEnabled() { + return remoteRefreshSegmentPressureEnabled; + } + + public void setRemoteRefreshSegmentPressureEnabled(boolean remoteRefreshSegmentPressureEnabled) { + this.remoteRefreshSegmentPressureEnabled = remoteRefreshSegmentPressureEnabled; + } + + public long getMinRefreshSeqNoLagLimit() { + return minRefreshSeqNoLagLimit; + } + + public void setMinRefreshSeqNoLagLimit(long minRefreshSeqNoLagLimit) { + this.minRefreshSeqNoLagLimit = minRefreshSeqNoLagLimit; + } + + public double getBytesLagVarianceFactor() { + return bytesLagVarianceFactor; + } + + public void setBytesLagVarianceFactor(double bytesLagVarianceFactor) { + this.bytesLagVarianceFactor = bytesLagVarianceFactor; + } + + public double getUploadTimeLagVarianceFactor() { + return uploadTimeLagVarianceFactor; + } + + public void setUploadTimeLagVarianceFactor(double uploadTimeLagVarianceFactor) { + this.uploadTimeLagVarianceFactor = uploadTimeLagVarianceFactor; + } + + public int getMinConsecutiveFailuresLimit() { + return minConsecutiveFailuresLimit; + } + + public void setMinConsecutiveFailuresLimit(int minConsecutiveFailuresLimit) { + this.minConsecutiveFailuresLimit = minConsecutiveFailuresLimit; + } + + public int getUploadBytesMovingAverageWindowSize() { + return uploadBytesMovingAverageWindowSize; + } + + public void setUploadBytesMovingAverageWindowSize(int uploadBytesMovingAverageWindowSize) { + this.uploadBytesMovingAverageWindowSize = uploadBytesMovingAverageWindowSize; + } + + public int getUploadBytesPerSecMovingAverageWindowSize() { + return uploadBytesPerSecMovingAverageWindowSize; + } + + public void setUploadBytesPerSecMovingAverageWindowSize(int uploadBytesPerSecMovingAverageWindowSize) { + this.uploadBytesPerSecMovingAverageWindowSize = uploadBytesPerSecMovingAverageWindowSize; + } + + public int getUploadTimeMovingAverageWindowSize() { + return uploadTimeMovingAverageWindowSize; + } + + public void setUploadTimeMovingAverageWindowSize(int uploadTimeMovingAverageWindowSize) { + this.uploadTimeMovingAverageWindowSize = uploadTimeMovingAverageWindowSize; + } +} diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java new file mode 100644 index 0000000000000..109eadf34509b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java @@ -0,0 +1,425 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.common.util.MovingAverage; +import org.opensearch.common.util.Streak; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.index.shard.ShardId; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * Keeps track of remote refresh which happens in {@link org.opensearch.index.shard.RemoteStoreRefreshListener}. This consist of multiple critical metrics. + * + * @opensearch.internal + */ +public class RemoteRefreshSegmentTracker { + + /** + * ShardId for which this instance tracks the remote segment upload metadata. + */ + private final ShardId shardId; + + /** + * Every refresh is assigned a sequence number. This is the sequence number of the most recent refresh. + */ + private volatile long localRefreshSeqNo; + + /** + * The refresh time of the most recent refresh. + */ + private volatile long localRefreshTimeMs; + + /** + * Sequence number of the most recent remote refresh. + */ + private volatile long remoteRefreshSeqNo; + + /** + * The refresh time of most recent remote refresh. + */ + private volatile long remoteRefreshTimeMs; + + /** + * Keeps the seq no lag computed so that we do not compute it for every request. + */ + private volatile long refreshSeqNoLag; + + /** + * Keeps the time (ms) lag computed so that we do not compute it for every request. + */ + private volatile long timeMsLag; + + /** + * Cumulative sum of size in bytes of segment files for which upload has started during remote refresh. + */ + private volatile long uploadBytesStarted; + + /** + * Cumulative sum of size in bytes of segment files for which upload has failed during remote refresh. + */ + private volatile long uploadBytesFailed; + + /** + * Cumulative sum of size in bytes of segment files for which upload has succeeded during remote refresh. + */ + private volatile long uploadBytesSucceeded; + + /** + * Cumulative sum of count of remote refreshes that have started. + */ + private volatile long totalUploadsStarted; + + /** + * Cumulative sum of count of remote refreshes that have failed. + */ + private volatile long totalUploadsFailed; + + /** + * Cumulative sum of count of remote refreshes that have succeeded. + */ + private volatile long totalUploadsSucceeded; + + /** + * Cumulative sum of rejection counts for this shard. + */ + private final AtomicLong rejectionCount = new AtomicLong(); + + /** + * Keeps track of rejection count with each rejection reason. + */ + private final Map rejectionCountMap = ConcurrentCollections.newConcurrentMap(); + + /** + * Map of name to size of the segment files created as part of the most recent refresh. + */ + private volatile Map latestLocalFileNameLengthMap; + + /** + * Set of names of segment files that were uploaded as part of the most recent remote refresh. + */ + private final Set latestUploadFiles = new HashSet<>(); + + /** + * Keeps the bytes lag computed so that we do not compute it for every request. + */ + private volatile long bytesLag; + + /** + * Holds count of consecutive failures until last success. Gets reset to zero if there is a success. + */ + private final Streak failures = new Streak(); + + /** + * Provides moving average over the last N total size in bytes of segment files uploaded as part of remote refresh. + * N is window size. Wrapped with {@code AtomicReference} for dynamic changes in window size. + */ + private final AtomicReference uploadBytesMovingAverageReference; + + /** + * This lock object is used for making sure we do not miss any data + */ + private final Object uploadBytesMutex = new Object(); + + /** + * Provides moving average over the last N upload speed (in bytes/s) of segment files uploaded as part of remote refresh. + * N is window size. Wrapped with {@code AtomicReference} for dynamic changes in window size. + */ + private final AtomicReference uploadBytesPerSecMovingAverageReference; + + private final Object uploadBytesPerSecMutex = new Object(); + + /** + * Provides moving average over the last N overall upload time (in nanos) as part of remote refresh.N is window size. + * Wrapped with {@code AtomicReference} for dynamic changes in window size. + */ + private final AtomicReference uploadTimeMsMovingAverageReference; + + private final Object uploadTimeMsMutex = new Object(); + + public RemoteRefreshSegmentTracker( + ShardId shardId, + int uploadBytesMovingAverageWindowSize, + int uploadBytesPerSecMovingAverageWindowSize, + int uploadTimeMsMovingAverageWindowSize + ) { + this.shardId = shardId; + // Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises. + long currentTimeMs = System.nanoTime() / 1_000_000L; + localRefreshTimeMs = currentTimeMs; + remoteRefreshTimeMs = currentTimeMs; + uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize)); + uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize)); + uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize)); + } + + ShardId getShardId() { + return shardId; + } + + long getLocalRefreshSeqNo() { + return localRefreshSeqNo; + } + + void updateLocalRefreshSeqNo(long localRefreshSeqNo) { + assert localRefreshSeqNo > this.localRefreshSeqNo : "newLocalRefreshSeqNo=" + + localRefreshSeqNo + + ">=" + + "currentLocalRefreshSeqNo=" + + this.localRefreshSeqNo; + this.localRefreshSeqNo = localRefreshSeqNo; + computeRefreshSeqNoLag(); + } + + long getLocalRefreshTimeMs() { + return localRefreshTimeMs; + } + + void updateLocalRefreshTimeMs(long localRefreshTimeMs) { + assert localRefreshTimeMs > this.localRefreshTimeMs : "newLocalRefreshTimeMs=" + + localRefreshTimeMs + + ">=" + + "currentLocalRefreshTimeMs=" + + this.localRefreshTimeMs; + this.localRefreshTimeMs = localRefreshTimeMs; + computeTimeMsLag(); + } + + long getRemoteRefreshSeqNo() { + return remoteRefreshSeqNo; + } + + void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) { + assert remoteRefreshSeqNo > this.remoteRefreshSeqNo : "newRemoteRefreshSeqNo=" + + remoteRefreshSeqNo + + ">=" + + "currentRemoteRefreshSeqNo=" + + this.remoteRefreshSeqNo; + this.remoteRefreshSeqNo = remoteRefreshSeqNo; + computeRefreshSeqNoLag(); + } + + long getRemoteRefreshTimeMs() { + return remoteRefreshTimeMs; + } + + void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) { + assert remoteRefreshTimeMs > this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs=" + + remoteRefreshTimeMs + + ">=" + + "currentRemoteRefreshTimeMs=" + + this.remoteRefreshTimeMs; + this.remoteRefreshTimeMs = remoteRefreshTimeMs; + computeTimeMsLag(); + } + + private void computeRefreshSeqNoLag() { + refreshSeqNoLag = localRefreshSeqNo - remoteRefreshSeqNo; + } + + long getRefreshSeqNoLag() { + return refreshSeqNoLag; + } + + private void computeTimeMsLag() { + timeMsLag = localRefreshTimeMs - remoteRefreshTimeMs; + } + + long getTimeMsLag() { + return timeMsLag; + } + + long getBytesLag() { + return bytesLag; + } + + long getUploadBytesStarted() { + return uploadBytesStarted; + } + + void addUploadBytesStarted(long size) { + uploadBytesStarted += size; + } + + long getUploadBytesFailed() { + return uploadBytesFailed; + } + + void addUploadBytesFailed(long size) { + uploadBytesFailed += size; + } + + long getUploadBytesSucceeded() { + return uploadBytesSucceeded; + } + + void addUploadBytesSucceeded(long size) { + uploadBytesSucceeded += size; + } + + long getInflightUploadBytes() { + return uploadBytesStarted - uploadBytesFailed - uploadBytesSucceeded; + } + + long getTotalUploadsStarted() { + return totalUploadsStarted; + } + + void incrementTotalUploadsStarted() { + totalUploadsStarted += 1; + } + + long getTotalUploadsFailed() { + return totalUploadsFailed; + } + + void incrementTotalUploadsFailed() { + totalUploadsFailed += 1; + failures.record(true); + } + + long getTotalUploadsSucceeded() { + return totalUploadsSucceeded; + } + + void incrementTotalUploadSucceeded() { + totalUploadsSucceeded += 1; + failures.record(false); + } + + long getInflightUploads() { + return totalUploadsStarted - totalUploadsFailed - totalUploadsSucceeded; + } + + long getRejectionCount() { + return rejectionCount.get(); + } + + void incrementRejectionCount() { + rejectionCount.incrementAndGet(); + } + + void incrementRejectionCount(String rejectionReason) { + rejectionCountMap.computeIfAbsent(rejectionReason, k -> new AtomicLong()).incrementAndGet(); + } + + long getRejectionCount(String rejectionReason) { + return rejectionCountMap.get(rejectionReason).get(); + } + + Map getLatestLocalFileNameLengthMap() { + return latestLocalFileNameLengthMap; + } + + void setLatestLocalFileNameLengthMap(Map latestLocalFileNameLengthMap) { + this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap; + computeBytesLag(); + } + + void addToLatestUploadFiles(String file) { + this.latestUploadFiles.add(file); + computeBytesLag(); + } + + private void computeBytesLag() { + if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) { + return; + } + Set filesNotYetUploaded = latestLocalFileNameLengthMap.keySet() + .stream() + .filter(f -> !latestUploadFiles.contains(f)) + .collect(Collectors.toSet()); + this.bytesLag = filesNotYetUploaded.stream().map(latestLocalFileNameLengthMap::get).mapToLong(Long::longValue).sum(); + } + + int getConsecutiveFailureCount() { + return failures.length(); + } + + boolean isUploadBytesAverageReady() { + return uploadBytesMovingAverageReference.get().isReady(); + } + + double getUploadBytesAverage() { + return uploadBytesMovingAverageReference.get().getAverage(); + } + + void addUploadBytes(long size) { + synchronized (uploadBytesMutex) { + this.uploadBytesMovingAverageReference.get().record(size); + } + } + + /** + * Updates the window size for data collection of upload bytes. This also resets any data collected so far. + * + * @param updatedSize the updated size + */ + void updateUploadBytesMovingAverageWindowSize(int updatedSize) { + synchronized (uploadBytesMutex) { + this.uploadBytesMovingAverageReference.set(this.uploadBytesMovingAverageReference.get().copyWithSize(updatedSize)); + } + } + + boolean isUploadBytesPerSecAverageReady() { + return uploadBytesPerSecMovingAverageReference.get().isReady(); + } + + double getUploadBytesPerSecAverage() { + return uploadBytesPerSecMovingAverageReference.get().getAverage(); + } + + void addUploadBytesPerSec(long bytesPerSec) { + synchronized (uploadBytesPerSecMutex) { + this.uploadBytesPerSecMovingAverageReference.get().record(bytesPerSec); + } + } + + /** + * Updates the window size for data collection of upload bytes per second. This also resets any data collected so far. + * + * @param updatedSize the updated size + */ + void updateUploadBytesPerSecMovingAverageWindowSize(int updatedSize) { + synchronized (uploadBytesPerSecMutex) { + this.uploadBytesPerSecMovingAverageReference.set(this.uploadBytesPerSecMovingAverageReference.get().copyWithSize(updatedSize)); + } + } + + boolean isUploadTimeMsAverageReady() { + return uploadTimeMsMovingAverageReference.get().isReady(); + } + + double getUploadTimeMsAverage() { + return uploadTimeMsMovingAverageReference.get().getAverage(); + } + + void addUploadTimeMs(long timeMs) { + synchronized (uploadTimeMsMutex) { + this.uploadTimeMsMovingAverageReference.get().record(timeMs); + } + } + + /** + * Updates the window size for data collection of upload time (ms). This also resets any data collected so far. + * + * @param updatedSize the updated size + */ + void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) { + synchronized (uploadTimeMsMutex) { + this.uploadTimeMsMovingAverageReference.set(this.uploadTimeMsMovingAverageReference.get().copyWithSize(updatedSize)); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/remote/package-info.java b/server/src/main/java/org/opensearch/index/remote/package-info.java new file mode 100644 index 0000000000000..a3aa969316372 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Core classes related to remote segments and translogs */ +package org.opensearch.index.remote; diff --git a/server/src/test/java/org/opensearch/common/util/MovingAverageTests.java b/server/src/test/java/org/opensearch/common/util/MovingAverageTests.java index 415058992e081..4f6b0ac4f7c88 100644 --- a/server/src/test/java/org/opensearch/common/util/MovingAverageTests.java +++ b/server/src/test/java/org/opensearch/common/util/MovingAverageTests.java @@ -46,4 +46,44 @@ public void testMovingAverageWithZeroSize() { fail("exception should have been thrown"); } + + public void testUpdateMovingAverageWindowSize() { + MovingAverage ma = new MovingAverage(5); + ma.record(1); + ma.record(2); + ma.record(3); + double avg = ma.getAverage(); + + // Test case 1 - Not ready and increasing size + MovingAverage newMa = ma.copyWithSize(10); + assertEquals(avg, newMa.getAverage(), 0.0d); + + // Test case 2 - Not ready and decreasing size + newMa = ma.copyWithSize(2); + assertEquals(2.5, newMa.getAverage(), 0.0d); + + // Test case 3 - Ready and increasing size + ma.record(4); + ma.record(5); + ma.record(6); + assertEquals(4.0, ma.getAverage(), 0.0d); + + newMa = ma.copyWithSize(10); + assertEquals(ma.getAverage(), newMa.getAverage(), 0.0d); + + // Test case 4 - Ready and decreasing size + newMa = ma.copyWithSize(3); + assertEquals(5.0, newMa.getAverage(), 0.0d); + + // Test case 5 - Ready, array overwritten and increasing size + for (int i = 7; i < 20; i++) { + ma.record(i); + } + newMa = ma.copyWithSize(6); + assertEquals(ma.getAverage(), newMa.getAverage(), 0.0d); + + // Test case 6 - Ready, array overwritten and decreasing size + newMa = ma.copyWithSize(3); + assertEquals(18.0, newMa.getAverage(), 0.0d); + } } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java new file mode 100644 index 0000000000000..c5a6c0323a6f9 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java @@ -0,0 +1,165 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteRefreshSegmentPressureServiceTests extends OpenSearchTestCase { + + private ClusterService clusterService; + + private ThreadPool threadPool; + + private ShardId shardId; + + private RemoteRefreshSegmentPressureService pressureService; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("remote_refresh_segment_pressure_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + shardId = new ShardId("index", "uuid", 0); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testIsSegmentsUploadBackpressureEnabled() { + pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + assertFalse(pressureService.isSegmentsUploadBackpressureEnabled()); + + Settings newSettings = Settings.builder() + .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), "true") + .build(); + clusterService.getClusterSettings().applySettings(newSettings); + + assertTrue(pressureService.isSegmentsUploadBackpressureEnabled()); + } + + public void testAfterIndexShardCreatedForRemoteBackedIndex() { + IndexShard indexShard = createIndexShard(shardId, true); + pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService.afterIndexShardCreated(indexShard); + assertNotNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())); + } + + public void testAfterIndexShardCreatedForNonRemoteBackedIndex() { + IndexShard indexShard = createIndexShard(shardId, false); + pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService.afterIndexShardCreated(indexShard); + assertNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())); + } + + public void testAfterIndexShardClosed() { + IndexShard indexShard = createIndexShard(shardId, true); + pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService.afterIndexShardCreated(indexShard); + assertNotNull(pressureService.getRemoteRefreshSegmentTracker(shardId)); + + pressureService.afterIndexShardClosed(shardId, indexShard, indexShard.indexSettings().getSettings()); + assertNull(pressureService.getRemoteRefreshSegmentTracker(shardId)); + } + + public void testValidateSegmentUploadLag() { + // Create the pressure tracker + IndexShard indexShard = createIndexShard(shardId, true); + pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService.afterIndexShardCreated(indexShard); + + // 1. Seq no - add data points to the pressure tracker + RemoteRefreshSegmentTracker pressureTracker = pressureService.getRemoteRefreshSegmentTracker(shardId); + pressureTracker.updateLocalRefreshSeqNo(6); + Exception e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); + assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); + assertTrue(e.getMessage().contains("remote_refresh_seq_no:0 local_refresh_seq_no:6")); + + // 2. time lag more than dynamic threshold + pressureTracker.updateRemoteRefreshSeqNo(3); + AtomicLong sum = new AtomicLong(); + IntStream.range(0, 20).forEach(i -> { + pressureTracker.addUploadTimeMs(i); + sum.addAndGet(i); + }); + double avg = (double) sum.get() / 20; + long currentMs = System.nanoTime() / 1_000_000; + pressureTracker.updateLocalRefreshTimeMs((long) (currentMs + 4 * avg)); + pressureTracker.updateRemoteRefreshTimeMs(currentMs); + e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); + assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); + assertTrue(e.getMessage().contains("time_lag:38 ms dynamic_time_lag_threshold:19.0 ms")); + + pressureTracker.updateRemoteRefreshTimeMs((long) (currentMs + 2 * avg)); + pressureService.validateSegmentsUploadLag(shardId); + + // 3. bytes lag more than dynamic threshold + sum.set(0); + IntStream.range(0, 20).forEach(i -> { + pressureTracker.addUploadBytes(i); + sum.addAndGet(i); + }); + avg = (double) sum.get() / 20; + Map nameSizeMap = new HashMap<>(); + nameSizeMap.put("a", (long) (4 * avg)); + pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap); + e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); + assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); + assertTrue(e.getMessage().contains("bytes_lag:38 dynamic_bytes_lag_threshold:19.0")); + + nameSizeMap.put("a", (long) (2 * avg)); + pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap); + pressureService.validateSegmentsUploadLag(shardId); + + // 4. Consecutive failures more than the limit + IntStream.range(0, 10).forEach(ignore -> pressureTracker.incrementTotalUploadsFailed()); + pressureService.validateSegmentsUploadLag(shardId); + pressureTracker.incrementTotalUploadsFailed(); + e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); + assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); + assertTrue(e.getMessage().contains("failure_streak_count:11 min_consecutive_failure_threshold:10")); + pressureTracker.incrementTotalUploadSucceeded(); + pressureService.validateSegmentsUploadLag(shardId); + } + + private static IndexShard createIndexShard(ShardId shardId, boolean remoteStoreEnabled) { + Settings settings = Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, String.valueOf(remoteStoreEnabled)).build(); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings); + IndexShard indexShard = mock(IndexShard.class); + when(indexShard.indexSettings()).thenReturn(indexSettings); + when(indexShard.shardId()).thenReturn(shardId); + return indexShard; + } + +} diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java new file mode 100644 index 0000000000000..66b5d6c4c19d8 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java @@ -0,0 +1,267 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +public class RemoteRefreshSegmentPressureSettingsTests extends OpenSearchTestCase { + + private ClusterService clusterService; + + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("remote_refresh_segment_pressure_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testGetDefaultSettings() { + RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + clusterService, + Settings.EMPTY, + mock(RemoteRefreshSegmentPressureService.class) + ); + + // Check remote refresh segment pressure enabled is false + assertFalse(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); + + // Check min sequence number lag limit default value + assertEquals(5L, pressureSettings.getMinRefreshSeqNoLagLimit()); + + // Check bytes lag variance threshold default value + assertEquals(2.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); + + // Check time lag variance threshold default value + assertEquals(2.0, pressureSettings.getUploadTimeLagVarianceFactor(), 0.0d); + + // Check minimum consecutive failures limit default value + assertEquals(10, pressureSettings.getMinConsecutiveFailuresLimit()); + + // Check upload bytes moving average window size default value + assertEquals(20, pressureSettings.getUploadBytesMovingAverageWindowSize()); + + // Check upload bytes per sec moving average window size default value + assertEquals(20, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); + + // Check upload time moving average window size default value + assertEquals(20, pressureSettings.getUploadTimeMovingAverageWindowSize()); + } + + public void testGetConfiguredSettings() { + Settings settings = Settings.builder() + .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) + .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100) + .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) + .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .build(); + RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + clusterService, + settings, + mock(RemoteRefreshSegmentPressureService.class) + ); + + // Check remote refresh segment pressure enabled is true + assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); + + // Check min sequence number lag limit configured value + assertEquals(100L, pressureSettings.getMinRefreshSeqNoLagLimit()); + + // Check bytes lag variance threshold configured value + assertEquals(50.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); + + // Check time lag variance threshold configured value + assertEquals(60.0, pressureSettings.getUploadTimeLagVarianceFactor(), 0.0d); + + // Check minimum consecutive failures limit configured value + assertEquals(121, pressureSettings.getMinConsecutiveFailuresLimit()); + + // Check upload bytes moving average window size configured value + assertEquals(102, pressureSettings.getUploadBytesMovingAverageWindowSize()); + + // Check upload bytes per sec moving average window size configured value + assertEquals(103, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); + + // Check upload time moving average window size configured value + assertEquals(104, pressureSettings.getUploadTimeMovingAverageWindowSize()); + } + + public void testUpdateAfterGetDefaultSettings() { + RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + clusterService, + Settings.EMPTY, + mock(RemoteRefreshSegmentPressureService.class) + ); + + Settings newSettings = Settings.builder() + .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) + .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100) + .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) + .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .build(); + clusterService.getClusterSettings().applySettings(newSettings); + + // Check updated remote refresh segment pressure enabled is false + assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); + + // Check min sequence number lag limit + assertEquals(100L, pressureSettings.getMinRefreshSeqNoLagLimit()); + + // Check bytes lag variance threshold updated + assertEquals(50.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); + + // Check time lag variance threshold updated + assertEquals(60.0, pressureSettings.getUploadTimeLagVarianceFactor(), 0.0d); + + // Check minimum consecutive failures limit updated + assertEquals(121, pressureSettings.getMinConsecutiveFailuresLimit()); + + // Check upload bytes moving average window size updated + assertEquals(102, pressureSettings.getUploadBytesMovingAverageWindowSize()); + + // Check upload bytes per sec moving average window size updated + assertEquals(103, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); + + // Check upload time moving average window size updated + assertEquals(104, pressureSettings.getUploadTimeMovingAverageWindowSize()); + } + + public void testUpdateAfterGetConfiguredSettings() { + Settings settings = Settings.builder() + .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) + .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100) + .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) + .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .build(); + RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + clusterService, + settings, + mock(RemoteRefreshSegmentPressureService.class) + ); + + Settings newSettings = Settings.builder() + .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 80) + .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 40.0) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 50.0) + .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 111) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 112) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 113) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 114) + .build(); + + clusterService.getClusterSettings().applySettings(newSettings); + + // Check updated remote refresh segment pressure enabled is true + assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); + + // Check min sequence number lag limit + assertEquals(80L, pressureSettings.getMinRefreshSeqNoLagLimit()); + + // Check bytes lag variance threshold updated + assertEquals(40.0, pressureSettings.getBytesLagVarianceFactor(), 0.0d); + + // Check time lag variance threshold updated + assertEquals(50.0, pressureSettings.getUploadTimeLagVarianceFactor(), 0.0d); + + // Check minimum consecutive failures limit updated + assertEquals(111, pressureSettings.getMinConsecutiveFailuresLimit()); + + // Check upload bytes moving average window size updated + assertEquals(112, pressureSettings.getUploadBytesMovingAverageWindowSize()); + + // Check upload bytes per sec moving average window size updated + assertEquals(113, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); + + // Check upload time moving average window size updated + assertEquals(114, pressureSettings.getUploadTimeMovingAverageWindowSize()); + } + + public void testUpdateTriggeredInRemotePressureServiceOnUpdateSettings() { + + int toUpdateVal1 = 1121, toUpdateVal2 = 1123, toUpdateVal3 = 1125; + + AtomicInteger updatedUploadBytesWindowSize = new AtomicInteger(); + AtomicInteger updatedUploadBytesPerSecWindowSize = new AtomicInteger(); + AtomicInteger updatedUploadTimeWindowSize = new AtomicInteger(); + + RemoteRefreshSegmentPressureService pressureService = mock(RemoteRefreshSegmentPressureService.class); + + // Upload bytes + doAnswer(invocation -> { + updatedUploadBytesWindowSize.set(invocation.getArgument(0)); + return null; + }).when(pressureService).updateUploadBytesMovingAverageWindowSize(anyInt()); + + // Upload bytes per sec + doAnswer(invocation -> { + updatedUploadBytesPerSecWindowSize.set(invocation.getArgument(0)); + return null; + }).when(pressureService).updateUploadBytesPerSecMovingAverageWindowSize(anyInt()); + + // Upload time + doAnswer(invocation -> { + updatedUploadTimeWindowSize.set(invocation.getArgument(0)); + return null; + }).when(pressureService).updateUploadTimeMsMovingAverageWindowSize(anyInt()); + + RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + clusterService, + Settings.EMPTY, + pressureService + ); + Settings newSettings = Settings.builder() + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal1) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal2) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal3) + .build(); + clusterService.getClusterSettings().applySettings(newSettings); + + // Assertions + assertEquals(toUpdateVal1, pressureSettings.getUploadBytesMovingAverageWindowSize()); + assertEquals(toUpdateVal1, updatedUploadBytesWindowSize.get()); + assertEquals(toUpdateVal2, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); + assertEquals(toUpdateVal2, updatedUploadBytesPerSecWindowSize.get()); + assertEquals(toUpdateVal3, pressureSettings.getUploadTimeMovingAverageWindowSize()); + assertEquals(toUpdateVal3, updatedUploadTimeWindowSize.get()); + } +} diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java new file mode 100644 index 0000000000000..48bc28e3a497d --- /dev/null +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java @@ -0,0 +1,404 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.ShardId; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.mock; + +public class RemoteRefreshSegmentTrackerTests extends OpenSearchTestCase { + + private RemoteRefreshSegmentPressureSettings pressureSettings; + + private ClusterService clusterService; + + private ThreadPool threadPool; + + private ShardId shardId; + + private RemoteRefreshSegmentTracker pressureTracker; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("remote_refresh_segment_pressure_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + pressureSettings = new RemoteRefreshSegmentPressureSettings( + clusterService, + Settings.EMPTY, + mock(RemoteRefreshSegmentPressureService.class) + ); + shardId = new ShardId("index", "uuid", 0); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testGetShardId() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + assertEquals(shardId, pressureTracker.getShardId()); + } + + public void testUpdateLocalRefreshSeqNo() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long refreshSeqNo = 2; + pressureTracker.updateLocalRefreshSeqNo(refreshSeqNo); + assertEquals(refreshSeqNo, pressureTracker.getLocalRefreshSeqNo()); + } + + public void testUpdateRemoteRefreshSeqNo() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long refreshSeqNo = 4; + pressureTracker.updateRemoteRefreshSeqNo(refreshSeqNo); + assertEquals(refreshSeqNo, pressureTracker.getRemoteRefreshSeqNo()); + } + + public void testUpdateLocalRefreshTimeMs() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long refreshTimeMs = System.nanoTime() / 1_000_000L + randomIntBetween(10, 100); + pressureTracker.updateLocalRefreshTimeMs(refreshTimeMs); + assertEquals(refreshTimeMs, pressureTracker.getLocalRefreshTimeMs()); + } + + public void testUpdateRemoteRefreshTimeMs() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long refreshTimeMs = System.nanoTime() / 1_000_000 + randomIntBetween(10, 100); + pressureTracker.updateRemoteRefreshTimeMs(refreshTimeMs); + assertEquals(refreshTimeMs, pressureTracker.getRemoteRefreshTimeMs()); + } + + public void testComputeSeqNoLagOnUpdate() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + int localRefreshSeqNo = randomIntBetween(50, 100); + int remoteRefreshSeqNo = randomIntBetween(20, 50); + pressureTracker.updateLocalRefreshSeqNo(localRefreshSeqNo); + assertEquals(localRefreshSeqNo, pressureTracker.getRefreshSeqNoLag()); + pressureTracker.updateRemoteRefreshSeqNo(remoteRefreshSeqNo); + assertEquals(localRefreshSeqNo - remoteRefreshSeqNo, pressureTracker.getRefreshSeqNoLag()); + } + + public void testComputeTimeLagOnUpdate() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long currentLocalRefreshTimeMs = pressureTracker.getLocalRefreshTimeMs(); + long currentTimeMs = System.nanoTime() / 1_000_000L; + long localRefreshTimeMs = currentTimeMs + randomIntBetween(100, 500); + long remoteRefreshTimeMs = currentTimeMs + randomIntBetween(50, 99); + pressureTracker.updateLocalRefreshTimeMs(localRefreshTimeMs); + assertEquals(localRefreshTimeMs - currentLocalRefreshTimeMs, pressureTracker.getTimeMsLag()); + pressureTracker.updateRemoteRefreshTimeMs(remoteRefreshTimeMs); + assertEquals(localRefreshTimeMs - remoteRefreshTimeMs, pressureTracker.getTimeMsLag()); + } + + public void testAddUploadBytesStarted() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long bytesToAdd = randomLongBetween(1000, 1000000); + pressureTracker.addUploadBytesStarted(bytesToAdd); + assertEquals(bytesToAdd, pressureTracker.getUploadBytesStarted()); + long moreBytesToAdd = randomLongBetween(1000, 10000); + pressureTracker.addUploadBytesStarted(moreBytesToAdd); + assertEquals(bytesToAdd + moreBytesToAdd, pressureTracker.getUploadBytesStarted()); + } + + public void testAddUploadBytesFailed() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long bytesToAdd = randomLongBetween(1000, 1000000); + pressureTracker.addUploadBytesFailed(bytesToAdd); + assertEquals(bytesToAdd, pressureTracker.getUploadBytesFailed()); + long moreBytesToAdd = randomLongBetween(1000, 10000); + pressureTracker.addUploadBytesFailed(moreBytesToAdd); + assertEquals(bytesToAdd + moreBytesToAdd, pressureTracker.getUploadBytesFailed()); + } + + public void testAddUploadBytesSucceeded() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long bytesToAdd = randomLongBetween(1000, 1000000); + pressureTracker.addUploadBytesSucceeded(bytesToAdd); + assertEquals(bytesToAdd, pressureTracker.getUploadBytesSucceeded()); + long moreBytesToAdd = randomLongBetween(1000, 10000); + pressureTracker.addUploadBytesSucceeded(moreBytesToAdd); + assertEquals(bytesToAdd + moreBytesToAdd, pressureTracker.getUploadBytesSucceeded()); + } + + public void testGetInflightUploadBytes() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long bytesStarted = randomLongBetween(10000, 100000); + long bytesSucceeded = randomLongBetween(1000, 10000); + long bytesFailed = randomLongBetween(100, 1000); + pressureTracker.addUploadBytesStarted(bytesStarted); + pressureTracker.addUploadBytesSucceeded(bytesSucceeded); + pressureTracker.addUploadBytesFailed(bytesFailed); + assertEquals(bytesStarted - bytesSucceeded - bytesFailed, pressureTracker.getInflightUploadBytes()); + } + + public void testIncrementTotalUploadsStarted() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + pressureTracker.incrementTotalUploadsStarted(); + assertEquals(1, pressureTracker.getTotalUploadsStarted()); + pressureTracker.incrementTotalUploadsStarted(); + assertEquals(2, pressureTracker.getTotalUploadsStarted()); + } + + public void testIncrementTotalUploadsFailed() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + pressureTracker.incrementTotalUploadsFailed(); + assertEquals(1, pressureTracker.getTotalUploadsFailed()); + pressureTracker.incrementTotalUploadsFailed(); + assertEquals(2, pressureTracker.getTotalUploadsFailed()); + } + + public void testIncrementTotalUploadSucceeded() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + pressureTracker.incrementTotalUploadSucceeded(); + assertEquals(1, pressureTracker.getTotalUploadsSucceeded()); + pressureTracker.incrementTotalUploadSucceeded(); + assertEquals(2, pressureTracker.getTotalUploadsSucceeded()); + } + + public void testGetInflightUploads() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + pressureTracker.incrementTotalUploadsStarted(); + assertEquals(1, pressureTracker.getInflightUploads()); + pressureTracker.incrementTotalUploadsStarted(); + assertEquals(2, pressureTracker.getInflightUploads()); + pressureTracker.incrementTotalUploadSucceeded(); + assertEquals(1, pressureTracker.getInflightUploads()); + pressureTracker.incrementTotalUploadsFailed(); + assertEquals(0, pressureTracker.getInflightUploads()); + } + + public void testIncrementRejectionCount() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + pressureTracker.incrementRejectionCount(); + assertEquals(1, pressureTracker.getRejectionCount()); + pressureTracker.incrementRejectionCount(); + assertEquals(2, pressureTracker.getRejectionCount()); + } + + public void testGetConsecutiveFailureCount() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + pressureTracker.incrementTotalUploadsFailed(); + assertEquals(1, pressureTracker.getConsecutiveFailureCount()); + pressureTracker.incrementTotalUploadsFailed(); + assertEquals(2, pressureTracker.getConsecutiveFailureCount()); + pressureTracker.incrementTotalUploadSucceeded(); + assertEquals(0, pressureTracker.getConsecutiveFailureCount()); + } + + public void testComputeBytesLag() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + + // Create local file size map + Map fileSizeMap = new HashMap<>(); + fileSizeMap.put("a", 100L); + fileSizeMap.put("b", 105L); + pressureTracker.setLatestLocalFileNameLengthMap(fileSizeMap); + assertEquals(205L, pressureTracker.getBytesLag()); + + pressureTracker.addToLatestUploadFiles("a"); + assertEquals(105L, pressureTracker.getBytesLag()); + + fileSizeMap.put("c", 115L); + pressureTracker.setLatestLocalFileNameLengthMap(fileSizeMap); + assertEquals(220L, pressureTracker.getBytesLag()); + + pressureTracker.addToLatestUploadFiles("b"); + assertEquals(115L, pressureTracker.getBytesLag()); + + pressureTracker.addToLatestUploadFiles("c"); + assertEquals(0L, pressureTracker.getBytesLag()); + } + + public void testIsUploadBytesAverageReady() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + assertFalse(pressureTracker.isUploadBytesAverageReady()); + + long sum = 0; + for (int i = 1; i < 20; i++) { + pressureTracker.addUploadBytes(i); + sum += i; + assertFalse(pressureTracker.isUploadBytesAverageReady()); + assertEquals((double) sum / i, pressureTracker.getUploadBytesAverage(), 0.0d); + } + + pressureTracker.addUploadBytes(20); + sum += 20; + assertTrue(pressureTracker.isUploadBytesAverageReady()); + assertEquals((double) sum / 20, pressureTracker.getUploadBytesAverage(), 0.0d); + + pressureTracker.addUploadBytes(100); + sum = sum + 100 - 1; + assertEquals((double) sum / 20, pressureTracker.getUploadBytesAverage(), 0.0d); + } + + public void testIsUploadBytesPerSecAverageReady() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + assertFalse(pressureTracker.isUploadBytesPerSecAverageReady()); + + long sum = 0; + for (int i = 1; i < 20; i++) { + pressureTracker.addUploadBytesPerSec(i); + sum += i; + assertFalse(pressureTracker.isUploadBytesPerSecAverageReady()); + assertEquals((double) sum / i, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); + } + + pressureTracker.addUploadBytesPerSec(20); + sum += 20; + assertTrue(pressureTracker.isUploadBytesPerSecAverageReady()); + assertEquals((double) sum / 20, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); + + pressureTracker.addUploadBytesPerSec(100); + sum = sum + 100 - 1; + assertEquals((double) sum / 20, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); + } + + public void testIsUploadTimeMsAverageReady() { + pressureTracker = new RemoteRefreshSegmentTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + assertFalse(pressureTracker.isUploadTimeMsAverageReady()); + + long sum = 0; + for (int i = 1; i < 20; i++) { + pressureTracker.addUploadTimeMs(i); + sum += i; + assertFalse(pressureTracker.isUploadTimeMsAverageReady()); + assertEquals((double) sum / i, pressureTracker.getUploadTimeMsAverage(), 0.0d); + } + + pressureTracker.addUploadTimeMs(20); + sum += 20; + assertTrue(pressureTracker.isUploadTimeMsAverageReady()); + assertEquals((double) sum / 20, pressureTracker.getUploadTimeMsAverage(), 0.0d); + + pressureTracker.addUploadTimeMs(100); + sum = sum + 100 - 1; + assertEquals((double) sum / 20, pressureTracker.getUploadTimeMsAverage(), 0.0d); + } + +}