Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.action.admin.indices.mapping.get;

import org.opensearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.transport.client.Client;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 1, scope = OpenSearchIntegTestCase.Scope.TEST)
public class GetMappingsBreakerConcurrencyIT extends OpenSearchIntegTestCase {

// For this suite: allow a single request, but keep the parent breaker tight so many concurrent in-flight requests trip it.
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
// Keep request breaker above a single mapping response size
.put("indices.breaker.request.limit", "512kb")
// Tight parent breaker so concurrent requests can exceed it
.put("indices.breaker.total.limit", "768kb")
.build();
}

public void testSingleGetMappingsDoesNotTrip() throws Exception {
final String index = "big-mappings-ok";

// ~256KB of low-compressibility pad under _meta to inflate the response materially,
// but still below the 512kb request breaker.
final String pad = randomBase64(256 * 1024);

final XContentBuilder mapping = XContentFactory.jsonBuilder()
.startObject()
.startObject("_meta")
.field("pad", pad)
.endObject()
.startObject("properties")
.startObject("title")
.field("type", "text")
.endObject()
.startObject("year")
.field("type", "integer")
.endObject()
.endObject()
.endObject();

final Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();

final Client client = client();

CreateIndexRequestBuilder create = client.admin().indices().prepareCreate(index).setSettings(indexSettings).setMapping(mapping);
assertAcked(create.get());
ensureGreen(index);

// Should not trip.
client.admin().indices().getMappings(new GetMappingsRequest().indices(index)).actionGet();
}

public void testManyConcurrentGetMappingsTripParentBreaker() throws Exception {
final String index = "big-mappings-concurrent";

// Each request fits under request breaker, but a burst of them should exceed the 768kb parent limit.
final String pad = randomBase64(256 * 1024);

final XContentBuilder mapping = XContentFactory.jsonBuilder()
.startObject()
.startObject("_meta")
.field("pad", pad)
.endObject()
.startObject("properties")
.startObject("title")
.field("type", "text")
.endObject()
.startObject("year")
.field("type", "integer")
.endObject()
.endObject()
.endObject();

final Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();

final Client client = client();

CreateIndexRequestBuilder create = client.admin().indices().prepareCreate(index).setSettings(indexSettings).setMapping(mapping);
assertAcked(create.get());
ensureGreen(index);

final int threads = 8; // small node, but enough to blow past ~768kb with in-flight results
final CountDownLatch ready = new CountDownLatch(threads);
final CountDownLatch startGun = new CountDownLatch(1);
final List<Thread> workers = new ArrayList<>(threads);
final AtomicReference<CircuitBreakingException> anyCBE = new AtomicReference<>();

for (int i = 0; i < threads; i++) {
final Thread t = new Thread(() -> {
try {
ready.countDown();
startGun.await();
// Fire several requests per thread in quick succession to increase overlap
for (int j = 0; j < 3; j++) {
try {
client.admin().indices().getMappings(new GetMappingsRequest().indices(index)).actionGet();
} catch (CircuitBreakingException cbe) {
anyCBE.compareAndSet(null, cbe);
// Stop this worker early after a breaker trip to reduce noise
break;
}
}
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
});
workers.add(t);
t.start();
}

// synchronize the start so requests overlap
ready.await();
startGun.countDown();

for (Thread t : workers) {
t.join();
}

final CircuitBreakingException cbe = anyCBE.get();
assertNotNull("Expected at least one GET _mappings to trip the parent breaker under concurrent load", cbe);
assertThat(cbe.getMessage(), containsString("Data too large"));
}

private static String randomBase64(int approxBytes) {
byte[] buf = new byte[approxBytes];
new SecureRandom().nextBytes(buf);
return Base64.getEncoder().encodeToString(buf);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.action.admin.indices.mapping.get;

import org.opensearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.transport.client.Client;

import java.security.SecureRandom;
import java.util.Base64;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 1, scope = OpenSearchIntegTestCase.Scope.TEST)
public class GetMappingsBreakerIT extends OpenSearchIntegTestCase {

// Keep the limit very small so the mapping easily exceeds it.
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
// Request breaker (used by TransportGetMappingsAction)
.put("indices.breaker.request.limit", "128kb")
// keep parent breaker high so it doesn't interfere
.put("indices.breaker.total.limit", "2gb")
.build();
}

public void testGetMappingsTripsRequestBreaker() throws Exception {
final String index = "big-mappings";

// Build a mapping with low-compressibility padding under _meta.
// Using ~256KB of random base64.
final String pad = randomBase64(256 * 1024);

final XContentBuilder mapping = XContentFactory.jsonBuilder()
.startObject()
.startObject("_meta")
.field("pad", pad)
.endObject()
.startObject("properties")
.startObject("title")
.field("type", "text")
.endObject()
.startObject("year")
.field("type", "integer")
.endObject()
.endObject()
.endObject();

final Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();

final Client client = client();

// Create index with the mapping containing the big _meta pad
CreateIndexRequestBuilder create = client.admin().indices().prepareCreate(index).setSettings(indexSettings).setMapping(mapping);
assertAcked(create.get());

ensureGreen(index);

// Now call GET _mappings and expect the REQUEST breaker to trip.
final GetMappingsRequest req = new GetMappingsRequest().indices(index);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we write a test firing multiple GET mappings call to ensure that operation execution is parallel and triggering the CB

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a new IT suite that runs requests concurrently. Each individual request is below the CB limit, but in aggregate it shows that it trips the parent CB.


CircuitBreakingException cbe = expectThrows(
CircuitBreakingException.class,
() -> client.admin().indices().getMappings(req).actionGet()
);

assertThat(cbe.getMessage(), containsString("Data too large"));
}

private static String randomBase64(int approxBytes) {
byte[] buf = new byte[approxBytes];
new SecureRandom().nextBytes(buf);
return Base64.getEncoder().encodeToString(buf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.info.TransportClusterInfoAction;
import org.opensearch.cluster.ClusterState;
Expand All @@ -42,7 +43,9 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.IndicesService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand All @@ -61,14 +64,17 @@ public class TransportGetMappingsAction extends TransportClusterInfoAction<GetMa

private final IndicesService indicesService;

private final CircuitBreaker circuitBreaker;

@Inject
public TransportGetMappingsAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService
IndicesService indicesService,
CircuitBreakerService circuitBreakerService
) {
super(
GetMappingsAction.NAME,
Expand All @@ -80,6 +86,7 @@ public TransportGetMappingsAction(
indexNameExpressionResolver
);
this.indicesService = indicesService;
this.circuitBreaker = circuitBreakerService.getBreaker(CircuitBreaker.REQUEST);
}

@Override
Expand All @@ -95,11 +102,38 @@ protected void doClusterManagerOperation(
final ActionListener<GetMappingsResponse> listener
) {
logger.trace("serving getMapping request based on version {}", state.version());

// Estimate memory usage for the mappings response
long estimatedBytes = estimateMappingsMemoryUsage(state, concreteIndices);

boolean charged = false;
try {
circuitBreaker.addEstimateBytesAndMaybeBreak(estimatedBytes, "get_mappings");
charged = true;

final Map<String, MappingMetadata> result = state.metadata().findMappings(concreteIndices, indicesService.getFieldFilter());

listener.onResponse(new GetMappingsResponse(result));
} catch (IOException e) {
} catch (Exception e) {
listener.onFailure(e);
} finally {
if (charged && estimatedBytes != 0L) {
circuitBreaker.addWithoutBreaking(-estimatedBytes);
}
}
}

private long estimateMappingsMemoryUsage(ClusterState state, String[] concreteIndices) {
long totalSize = 0;
for (String index : concreteIndices) {
MappingMetadata mm = state.metadata().index(index).mapping();
if (mm != null) {
final byte[] compressed = mm.source().compressed();
if (compressed != null) {
totalSize += RamUsageEstimator.sizeOf(compressed);
}
}
}
return totalSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@
import org.opensearch.common.settings.SettingsModule;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.indices.IndicesService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
Expand All @@ -73,6 +76,7 @@
import static org.opensearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class GetMappingsActionTests extends OpenSearchTestCase {
private TransportService transportService;
Expand Down Expand Up @@ -120,6 +124,10 @@ public void setUp() throws Exception {
Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE),
Version.CURRENT
);
final CircuitBreakerService breakerService = mock(CircuitBreakerService.class);
final CircuitBreaker noopBreaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST);
when(breakerService.getBreaker(CircuitBreaker.REQUEST)).thenReturn(noopBreaker);

allNodes = new DiscoveryNode[] { localNode, remoteNode };
setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));
transportAction = new TransportGetMappingsAction(
Expand All @@ -128,7 +136,8 @@ public void setUp() throws Exception {
GetMappingsActionTests.this.threadPool,
new ActionFilters(emptySet()),
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
mock(IndicesService.class)
mock(IndicesService.class),
breakerService
);

}
Expand Down
Loading
Loading