Skip to content

Commit

Permalink
Add Shard Indexing Pressure Store (#478) (#838)
Browse files Browse the repository at this point in the history
* Add Shard Indexing Pressure Store (#478)

Signed-off-by: Saurabh Singh <sisurab@amazon.com>

* Added comments and shard allocation based on compute in hot store.

Signed-off-by: Saurabh Singh <sisurab@amazon.com>

Co-authored-by: Saurabh Singh <sisurab@amazon.com>
  • Loading branch information
2 people authored and adnapibar committed Sep 15, 2021
1 parent 7f37c55 commit 19cc0eb
Show file tree
Hide file tree
Showing 3 changed files with 323 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
Expand Down Expand Up @@ -580,7 +582,12 @@ public void apply(Settings value, Settings current, Settings previous) {
FsHealthService.REFRESH_INTERVAL_SETTING,
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
TransportMainAction.OVERRIDE_MAIN_RESPONSE_VERSION,
IndexingPressure.MAX_INDEXING_BYTES)));
IndexingPressure.MAX_INDEXING_BYTES,
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED,
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED,
ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW,
ShardIndexingPressureSettings.SHARD_MIN_LIMIT,
ShardIndexingPressureStore.MAX_COLD_STORE_SIZE)));

public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright OpenSearch Contributors.
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.index;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.shard.ShardId;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;

import static java.util.Objects.isNull;

/**
* Shard indexing pressure store acts as a central repository for all the shard-level tracker objects currently being
* used at the Node level, for tracking indexing pressure requests.
* Store manages the tracker lifecycle, from creation, access, until it is evicted to be collected.
*
* Trackers are maintained at two levels for access simplicity and better memory management:
*
* 1. shardIndexingPressureHotStore : As the name suggests, it is hot store for tracker objects which are currently live i.e. being used
* to track an ongoing request.
*
* 2. shardIndexingPressureColdStore : This acts as the store for all the shard tracking objects which are currently being used
* by the framework. In addition to hot trackers, the recently used trackers which are although not currently live, but again can be used
* in near future, are also part of this store. To limit any memory implications, this store has an upper limit on the maximum number of
* trackers its can hold at any given time, which is a configurable dynamic setting.
*
* Tracking objects when created are part of both the hot store as well as cold store. However, once the object
* is no more live it is removed from the hot store. Objects in the cold store are evicted once the cold store
* reaches its maximum limit. Think of it like a periodic purge when upper limit is hit.
* During get if tracking object is not present in the hot store, a lookup is made into the cache store. If found,
* object is brought into the hot store again, until it remains active. If not present in the either store, a fresh
* object is instantiated and registered in both the stores for concurrent accesses.
*
* Note: The implementation of shardIndexingPressureColdStore methods is such that get,
* update and evict operations can be abstracted out to support any other strategy such as LRU, if
* discovered a need later.
*
*/
public class ShardIndexingPressureStore {

// This represents the maximum value for the cold store size.
public static final Setting<Integer> MAX_COLD_STORE_SIZE =
Setting.intSetting("shard_indexing_pressure.cache_store.max_size", 200, 100, 1000,
Setting.Property.NodeScope, Setting.Property.Dynamic);

private final Map<ShardId, ShardIndexingPressureTracker> shardIndexingPressureHotStore =
ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
private final Map<ShardId, ShardIndexingPressureTracker> shardIndexingPressureColdStore = new HashMap<>();
private final ShardIndexingPressureSettings shardIndexingPressureSettings;

private volatile int maxColdStoreSize;

public ShardIndexingPressureStore(ShardIndexingPressureSettings shardIndexingPressureSettings,
ClusterSettings clusterSettings, Settings settings) {
this.shardIndexingPressureSettings = shardIndexingPressureSettings;
this.maxColdStoreSize = MAX_COLD_STORE_SIZE.get(settings).intValue();
clusterSettings.addSettingsUpdateConsumer(MAX_COLD_STORE_SIZE, this::setMaxColdStoreSize);
}

public ShardIndexingPressureTracker getShardIndexingPressureTracker(ShardId shardId) {
ShardIndexingPressureTracker tracker = shardIndexingPressureHotStore.get(shardId);
if (isNull(tracker)) {
// Attempt from Indexing pressure cold store
tracker = shardIndexingPressureColdStore.get(shardId);
// If not already present in cold store instantiate a new one
if (isNull(tracker)) {
tracker = shardIndexingPressureHotStore.computeIfAbsent(shardId, (k) ->
new ShardIndexingPressureTracker(shardId,
this.shardIndexingPressureSettings.getShardPrimaryAndCoordinatingBaseLimits(),
this.shardIndexingPressureSettings.getShardReplicaBaseLimits())
);
// Write through into the cold store for future reference
updateShardIndexingPressureColdStore(tracker);
} else {
// Attempt update tracker to the hot store and return the tracker which finally made to the hot store to avoid any race
ShardIndexingPressureTracker newTracker = shardIndexingPressureHotStore.putIfAbsent(shardId, tracker);
tracker = newTracker == null ? tracker : newTracker;
}
}
return tracker;
}

public Map<ShardId, ShardIndexingPressureTracker> getShardIndexingPressureHotStore() {
return Collections.unmodifiableMap(shardIndexingPressureHotStore);
}

public Map<ShardId, ShardIndexingPressureTracker> getShardIndexingPressureColdStore() {
return Collections.unmodifiableMap(shardIndexingPressureColdStore);
}

public void tryTrackerCleanupFromHotStore(ShardIndexingPressureTracker tracker, BooleanSupplier condition) {
if (condition.getAsBoolean()) {
// Try inserting into cold store again in case there was an eviction triggered
shardIndexingPressureColdStore.putIfAbsent(tracker.getShardId(), tracker);
// Remove from the hot store
shardIndexingPressureHotStore.remove(tracker.getShardId(), tracker);
}
}

/**
* This is used to update the reference of tracker in cold store, to be re-used later of tracker is removed from hot store upon request
* completion. When the cold store size reaches maximum, all the tracker objects in cold store are flushed. Flush is a less frequent
* (periodic) operation, can be sized based on workload. It is okay to not to synchronize counters being flushed, as
* objects in the cold store are only empty references, and can be re-initialized if needed.
*/
private void updateShardIndexingPressureColdStore(ShardIndexingPressureTracker tracker) {
if (shardIndexingPressureColdStore.size() > maxColdStoreSize) {
shardIndexingPressureColdStore.clear();
}
shardIndexingPressureColdStore.put(tracker.getShardId(), tracker);
}

private void setMaxColdStoreSize(int maxColdStoreSize) {
this.maxColdStoreSize = maxColdStoreSize;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index;

import org.junit.Before;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.ShardId;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Map;

public class ShardIndexingPressureStoreTests extends OpenSearchTestCase {

private final Settings settings = Settings.builder()
.put(ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.getKey(), 200)
.build();
private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
private final ShardIndexingPressureSettings shardIndexingPressureSettings =
new ShardIndexingPressureSettings(new ClusterService(settings, clusterSettings, null), settings,
IndexingPressure.MAX_INDEXING_BYTES.get(settings).getBytes());
private ShardIndexingPressureStore store;
private ShardId testShardId;

@Before
public void beforeTest() {
store = new ShardIndexingPressureStore(shardIndexingPressureSettings, clusterSettings, settings);
testShardId = new ShardId("index", "uuid", 0);
}

public void testShardIndexingPressureStoreGet() {
ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(testShardId);
ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(testShardId);
assertEquals(tracker1, tracker2);
}

public void testGetVerifyTrackerInHotStore() {
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId);

Map<ShardId, ShardIndexingPressureTracker> hostStoreTrackers = store.getShardIndexingPressureHotStore();
assertEquals(1, hostStoreTrackers.size());
ShardIndexingPressureTracker hotStoreTracker = hostStoreTrackers.get(testShardId);
assertEquals(tracker, hotStoreTracker);
}

public void testTrackerCleanupFromHotStore() {
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId);
Map<ShardId, ShardIndexingPressureTracker> hostStoreTrackers = store.getShardIndexingPressureHotStore();
assertEquals(1, hostStoreTrackers.size());

store.tryTrackerCleanupFromHotStore(tracker, () -> true);

hostStoreTrackers = store.getShardIndexingPressureHotStore();
assertEquals(0, hostStoreTrackers.size());

Map<ShardId, ShardIndexingPressureTracker> coldStoreTrackers = store.getShardIndexingPressureColdStore();
assertEquals(1, coldStoreTrackers.size());
ShardIndexingPressureTracker coldStoreTracker = coldStoreTrackers.get(testShardId);
assertEquals(tracker, coldStoreTracker);
}

public void testTrackerCleanupSkippedFromHotStore() {
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId);
Map<ShardId, ShardIndexingPressureTracker> hostStoreTrackers = store.getShardIndexingPressureHotStore();
assertEquals(1, hostStoreTrackers.size());

store.tryTrackerCleanupFromHotStore(tracker, () -> false);

hostStoreTrackers = store.getShardIndexingPressureHotStore();
assertEquals(1, hostStoreTrackers.size());
ShardIndexingPressureTracker coldStoreTracker = hostStoreTrackers.get(testShardId);
assertEquals(tracker, coldStoreTracker);
}

public void testTrackerRestoredToHotStorePostCleanup() {
ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(testShardId);
Map<ShardId, ShardIndexingPressureTracker> hostStoreTrackers = store.getShardIndexingPressureHotStore();
assertEquals(1, hostStoreTrackers.size());

store.tryTrackerCleanupFromHotStore(tracker1, () -> true);

hostStoreTrackers = store.getShardIndexingPressureHotStore();
assertEquals(0, hostStoreTrackers.size());

ShardIndexingPressureTracker tracker2 = store.getShardIndexingPressureTracker(testShardId);
hostStoreTrackers = store.getShardIndexingPressureHotStore();
assertEquals(1, hostStoreTrackers.size());
assertEquals(tracker1, tracker2);
}

public void testTrackerEvictedFromColdStore() {
for (int i = 0; i <= ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.get(settings); i++) {
ShardId shardId = new ShardId("index", "uuid", i);
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(shardId);
store.tryTrackerCleanupFromHotStore(tracker, () -> true);
assertEquals(i + 1, store.getShardIndexingPressureColdStore().size());
}

// Verify cold store size is maximum
assertEquals(ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.get(settings) + 1,
store.getShardIndexingPressureColdStore().size());

// get and remove one more tracker object from hot store
ShardId shardId = new ShardId("index", "uuid",
ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.get(settings) + 1);
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(shardId);
store.tryTrackerCleanupFromHotStore(tracker, () -> true);

// Verify all trackers objects purged from cold store except the last
assertEquals(1, store.getShardIndexingPressureColdStore().size());
assertEquals(tracker, store.getShardIndexingPressureColdStore().get(shardId));
}

public void testShardIndexingPressureStoreConcurrentGet() throws Exception {
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId);
final int NUM_THREADS = scaledRandomIntBetween(100, 500);
final Thread[] threads = new Thread[NUM_THREADS];
final ShardIndexingPressureTracker[] trackers = new ShardIndexingPressureTracker[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
int counter = i;
threads[i] = new Thread(() -> {
trackers[counter] = store.getShardIndexingPressureTracker(testShardId);
});
threads[i].start();
}

for (Thread t : threads) {
t.join();
}

for (int i = 0; i < NUM_THREADS; i++) {
assertEquals(tracker, trackers[i]);
}
assertEquals(1, store.getShardIndexingPressureHotStore().size());
assertEquals(1, store.getShardIndexingPressureColdStore().size());
assertEquals(tracker, store.getShardIndexingPressureHotStore().get(testShardId));
assertEquals(tracker, store.getShardIndexingPressureColdStore().get(testShardId));
}

public void testShardIndexingPressureStoreConcurrentGetAndCleanup() throws Exception {
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(testShardId);
final int NUM_THREADS = scaledRandomIntBetween(100, 500);
final Thread[] threads = new Thread[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
threads[i] = new Thread(() -> {
ShardIndexingPressureTracker tracker1 = store.getShardIndexingPressureTracker(testShardId);
assertEquals(tracker, tracker1);
store.tryTrackerCleanupFromHotStore(tracker, () -> true);
});
threads[i].start();
}

for (Thread t : threads) {
t.join();
}

assertEquals(0, store.getShardIndexingPressureHotStore().size());
assertEquals(1, store.getShardIndexingPressureColdStore().size());
assertEquals(tracker, store.getShardIndexingPressureColdStore().get(testShardId));
}

public void testTrackerConcurrentEvictionFromColdStore() throws Exception {
int maxColdStoreSize = ShardIndexingPressureStore.MAX_COLD_STORE_SIZE.get(settings);
final int NUM_THREADS = scaledRandomIntBetween(maxColdStoreSize * 2, maxColdStoreSize * 8);
final Thread[] threads = new Thread[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
int counter = i;
threads[i] = new Thread(() -> {
ShardId shardId = new ShardId("index", "uuid", counter);
ShardIndexingPressureTracker tracker = store.getShardIndexingPressureTracker(shardId);
store.tryTrackerCleanupFromHotStore(tracker, () -> true);
});
threads[i].start();
}

for (Thread t : threads) {
t.join();
}

assertEquals(0, store.getShardIndexingPressureHotStore().size());
assertTrue(store.getShardIndexingPressureColdStore().size() <= maxColdStoreSize + 1);
}
}

0 comments on commit 19cc0eb

Please sign in to comment.