Skip to content

Commit 6b861b2

Browse files
committed
Add test with concurrent requests
Signed-off-by: Craig Perkins <cwperx@amazon.com>
1 parent c7443c6 commit 6b861b2

File tree

1 file changed

+160
-0
lines changed

1 file changed

+160
-0
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
package org.opensearch.action.admin.indices.mapping.get;
9+
10+
import org.opensearch.action.admin.indices.create.CreateIndexRequestBuilder;
11+
import org.opensearch.cluster.metadata.IndexMetadata;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.common.xcontent.XContentFactory;
14+
import org.opensearch.core.common.breaker.CircuitBreakingException;
15+
import org.opensearch.core.xcontent.XContentBuilder;
16+
import org.opensearch.test.OpenSearchIntegTestCase;
17+
import org.opensearch.transport.client.Client;
18+
19+
import java.security.SecureRandom;
20+
import java.util.ArrayList;
21+
import java.util.Base64;
22+
import java.util.List;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.atomic.AtomicReference;
25+
26+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
27+
import static org.hamcrest.Matchers.containsString;
28+
29+
@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 1, scope = OpenSearchIntegTestCase.Scope.TEST)
30+
public class GetMappingsBreakerConcurrencyIT extends OpenSearchIntegTestCase {
31+
32+
// For this suite: allow a single request, but keep the parent breaker tight so many concurrent in-flight requests trip it.
33+
@Override
34+
protected Settings nodeSettings(int nodeOrdinal) {
35+
return Settings.builder()
36+
.put(super.nodeSettings(nodeOrdinal))
37+
// Keep request breaker above a single mapping response size
38+
.put("indices.breaker.request.limit", "512kb")
39+
// Tight parent breaker so concurrent requests can exceed it
40+
.put("indices.breaker.total.limit", "768kb")
41+
.build();
42+
}
43+
44+
public void testSingleGetMappingsDoesNotTrip() throws Exception {
45+
final String index = "big-mappings-ok";
46+
47+
// ~256KB of low-compressibility pad under _meta to inflate the response materially,
48+
// but still below the 512kb request breaker.
49+
final String pad = randomBase64(256 * 1024);
50+
51+
final XContentBuilder mapping = XContentFactory.jsonBuilder()
52+
.startObject()
53+
.startObject("_meta")
54+
.field("pad", pad)
55+
.endObject()
56+
.startObject("properties")
57+
.startObject("title")
58+
.field("type", "text")
59+
.endObject()
60+
.startObject("year")
61+
.field("type", "integer")
62+
.endObject()
63+
.endObject()
64+
.endObject();
65+
66+
final Settings indexSettings = Settings.builder()
67+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
68+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
69+
.build();
70+
71+
final Client client = client();
72+
73+
CreateIndexRequestBuilder create = client.admin().indices().prepareCreate(index).setSettings(indexSettings).setMapping(mapping);
74+
assertAcked(create.get());
75+
ensureGreen(index);
76+
77+
// Should not trip.
78+
client.admin().indices().getMappings(new GetMappingsRequest().indices(index)).actionGet();
79+
}
80+
81+
public void testManyConcurrentGetMappingsTripParentBreaker() throws Exception {
82+
final String index = "big-mappings-concurrent";
83+
84+
// Each request fits under request breaker, but a burst of them should exceed the 768kb parent limit.
85+
final String pad = randomBase64(256 * 1024);
86+
87+
final XContentBuilder mapping = XContentFactory.jsonBuilder()
88+
.startObject()
89+
.startObject("_meta")
90+
.field("pad", pad)
91+
.endObject()
92+
.startObject("properties")
93+
.startObject("title")
94+
.field("type", "text")
95+
.endObject()
96+
.startObject("year")
97+
.field("type", "integer")
98+
.endObject()
99+
.endObject()
100+
.endObject();
101+
102+
final Settings indexSettings = Settings.builder()
103+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
104+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
105+
.build();
106+
107+
final Client client = client();
108+
109+
CreateIndexRequestBuilder create = client.admin().indices().prepareCreate(index).setSettings(indexSettings).setMapping(mapping);
110+
assertAcked(create.get());
111+
ensureGreen(index);
112+
113+
final int threads = 8; // small node, but enough to blow past ~768kb with in-flight results
114+
final CountDownLatch ready = new CountDownLatch(threads);
115+
final CountDownLatch startGun = new CountDownLatch(1);
116+
final List<Thread> workers = new ArrayList<>(threads);
117+
final AtomicReference<CircuitBreakingException> anyCBE = new AtomicReference<>();
118+
119+
for (int i = 0; i < threads; i++) {
120+
final Thread t = new Thread(() -> {
121+
try {
122+
ready.countDown();
123+
startGun.await();
124+
// Fire several requests per thread in quick succession to increase overlap
125+
for (int j = 0; j < 3; j++) {
126+
try {
127+
client.admin().indices().getMappings(new GetMappingsRequest().indices(index)).actionGet();
128+
} catch (CircuitBreakingException cbe) {
129+
anyCBE.compareAndSet(null, cbe);
130+
// Stop this worker early after a breaker trip to reduce noise
131+
break;
132+
}
133+
}
134+
} catch (InterruptedException ignored) {
135+
Thread.currentThread().interrupt();
136+
}
137+
});
138+
workers.add(t);
139+
t.start();
140+
}
141+
142+
// synchronize the start so requests overlap
143+
ready.await();
144+
startGun.countDown();
145+
146+
for (Thread t : workers) {
147+
t.join();
148+
}
149+
150+
final CircuitBreakingException cbe = anyCBE.get();
151+
assertNotNull("Expected at least one GET _mappings to trip the parent breaker under concurrent load", cbe);
152+
assertThat(cbe.getMessage(), containsString("Data too large"));
153+
}
154+
155+
private static String randomBase64(int approxBytes) {
156+
byte[] buf = new byte[approxBytes];
157+
new SecureRandom().nextBytes(buf);
158+
return Base64.getEncoder().encodeToString(buf);
159+
}
160+
}

0 commit comments

Comments
 (0)