Skip to content

Commit f7c7292

Browse files
committed
Fix unnecessary refreshes during update retry conflicts
When update operations with retry_on_conflict encounter version conflicts, each retry attempt was triggering the original refresh policy (IMMEDIATE, WAIT_UNTIL), causing unnecessary refresh operations that degrade performance. This change suppresses the refresh policy to NONE for retry attempts while preserving the original policy for the initial attempt. The refresh will still happen once when the update eventually succeeds or fails permanently. Fixes #15261 Signed-off-by: Atri Sharma <atri.jiit@gmail.com>
1 parent cff74ff commit f7c7292

File tree

7 files changed

+179
-4
lines changed

7 files changed

+179
-4
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4646
- Change default value of remote_data_ratio, which is used in Searchable Snapshots and Writeable Warm from 0 to 5 and min allowed value to 1 ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
4747
- Making multi rate limiters in repository dynamic [#18069](https://github.com/opensearch-project/OpenSearch/pull/18069)
4848

49+
### Fixed
50+
- Fix unnecessary refreshes during update retry conflicts ([#15261](https://github.com/opensearch-project/OpenSearch/issues/15261))
51+
4952
### Dependencies
5053
- Bump `stefanzweifel/git-auto-commit-action` from 5 to 6 ([#18524](https://github.com/opensearch-project/OpenSearch/pull/18524))
5154
- Bump Apache Lucene to 10.2.2 ([#18573](https://github.com/opensearch-project/OpenSearch/pull/18573))

server/src/internalClusterTest/java/org/opensearch/update/UpdateIT.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.opensearch.action.delete.DeleteResponse;
4242
import org.opensearch.action.get.GetResponse;
4343
import org.opensearch.action.index.IndexResponse;
44+
import org.opensearch.action.support.WriteRequest;
4445
import org.opensearch.action.update.UpdateRequest;
4546
import org.opensearch.action.update.UpdateRequestBuilder;
4647
import org.opensearch.action.update.UpdateResponse;
@@ -898,6 +899,30 @@ private void waitForOutstandingRequests(TimeValue timeOut, Semaphore requestsOut
898899
}
899900
}
900901

902+
public void testUpdateRetryOnConflictDoesNotRefreshOnFailedAttempts() throws Exception {
903+
assertAcked(prepareCreate("test"));
904+
905+
client().prepareIndex("test")
906+
.setId("1")
907+
.setSource(XContentFactory.jsonBuilder().startObject().field("field", "value1").endObject())
908+
.get();
909+
910+
client().prepareUpdate("test", "1")
911+
.setDoc(XContentFactory.jsonBuilder().startObject().field("field", "value2").endObject())
912+
.setRetryOnConflict(3)
913+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
914+
.get();
915+
916+
GetResponse getResponse = client().prepareGet("test", "1").get();
917+
assertTrue(getResponse.isExists());
918+
assertEquals("value2", getResponse.getSourceAsMap().get("field"));
919+
920+
UpdateRequest updateRequest = new UpdateRequest("test", "1");
921+
assertFalse(updateRequest.isRetryAttempt());
922+
updateRequest.markAsRetryAttempt();
923+
assertTrue(updateRequest.isRetryAttempt());
924+
}
925+
901926
private static String indexOrAlias() {
902927
return randomBoolean() ? "test" : "alias";
903928
}

server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,8 @@ private void handleUpdateFailureWithRetry(
381381
request.getShardId(),
382382
request.id()
383383
);
384+
// Mark this as a retry attempt to suppress refresh on failure
385+
request.markAsRetryAttempt();
384386
threadPool.executor(executor(request.getShardId()))
385387
.execute(ActionRunnable.wrap(listener, l -> shardOperation(request, l, retryCount + 1)));
386388
return;

server/src/main/java/org/opensearch/action/update/UpdateHelper.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.opensearch.action.DocWriteResponse;
3939
import org.opensearch.action.delete.DeleteRequest;
4040
import org.opensearch.action.index.IndexRequest;
41+
import org.opensearch.action.support.WriteRequest;
4142
import org.opensearch.common.Nullable;
4243
import org.opensearch.common.collect.Tuple;
4344
import org.opensearch.common.io.stream.BytesStreamOutput;
@@ -180,7 +181,7 @@ Result prepareUpsert(ShardId shardId, UpdateRequest request, final GetResult get
180181

181182
indexRequest.index(request.index())
182183
.id(request.id())
183-
.setRefreshPolicy(request.getRefreshPolicy())
184+
.setRefreshPolicy(request.isRetryAttempt() ? WriteRequest.RefreshPolicy.NONE : request.getRefreshPolicy())
184185
.routing(request.routing())
185186
.timeout(request.timeout())
186187
.waitForActiveShards(request.waitForActiveShards())
@@ -255,7 +256,7 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu
255256
.setIfPrimaryTerm(getResult.getPrimaryTerm())
256257
.waitForActiveShards(request.waitForActiveShards())
257258
.timeout(request.timeout())
258-
.setRefreshPolicy(request.getRefreshPolicy());
259+
.setRefreshPolicy(request.isRetryAttempt() ? WriteRequest.RefreshPolicy.NONE : request.getRefreshPolicy());
259260
return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
260261
}
261262
}
@@ -298,7 +299,7 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes
298299
.setIfPrimaryTerm(getResult.getPrimaryTerm())
299300
.waitForActiveShards(request.waitForActiveShards())
300301
.timeout(request.timeout())
301-
.setRefreshPolicy(request.getRefreshPolicy());
302+
.setRefreshPolicy(request.isRetryAttempt() ? WriteRequest.RefreshPolicy.NONE : request.getRefreshPolicy());
302303
return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
303304
case DELETE:
304305
DeleteRequest deleteRequest = Requests.deleteRequest(request.index())
@@ -308,7 +309,7 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes
308309
.setIfPrimaryTerm(getResult.getPrimaryTerm())
309310
.waitForActiveShards(request.waitForActiveShards())
310311
.timeout(request.timeout())
311-
.setRefreshPolicy(request.getRefreshPolicy());
312+
.setRefreshPolicy(request.isRetryAttempt() ? WriteRequest.RefreshPolicy.NONE : request.getRefreshPolicy());
312313
return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType);
313314
default:
314315
// If it was neither an INDEX or DELETE operation, treat it as a noop

server/src/main/java/org/opensearch/action/update/UpdateRequest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
145145
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
146146

147147
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
148+
private transient boolean isRetryAttempt = false;
148149

149150
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
150151

@@ -617,6 +618,15 @@ public RefreshPolicy getRefreshPolicy() {
617618
return refreshPolicy;
618619
}
619620

621+
public UpdateRequest markAsRetryAttempt() {
622+
this.isRetryAttempt = true;
623+
return this;
624+
}
625+
626+
public boolean isRetryAttempt() {
627+
return this.isRetryAttempt;
628+
}
629+
620630
public ActiveShardCount waitForActiveShards() {
621631
return this.waitForActiveShards;
622632
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.update;
10+
11+
import org.opensearch.action.delete.DeleteRequest;
12+
import org.opensearch.action.index.IndexRequest;
13+
import org.opensearch.action.support.WriteRequest;
14+
import org.opensearch.test.OpenSearchTestCase;
15+
16+
/**
17+
* Tests for refresh policy behavior during update retries (issue #15261).
18+
*/
19+
public class UpdateRefreshPolicyTests extends OpenSearchTestCase {
20+
21+
public void testRefreshPolicySuppression() {
22+
// Normal request should preserve refresh policy
23+
UpdateRequest normalRequest = new UpdateRequest("test", "1");
24+
normalRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
25+
26+
IndexRequest normalIndexRequest = new IndexRequest("test");
27+
normalIndexRequest.setRefreshPolicy(
28+
normalRequest.isRetryAttempt() ? WriteRequest.RefreshPolicy.NONE : normalRequest.getRefreshPolicy()
29+
);
30+
31+
assertEquals(
32+
"Normal request should preserve IMMEDIATE refresh policy",
33+
WriteRequest.RefreshPolicy.IMMEDIATE,
34+
normalIndexRequest.getRefreshPolicy()
35+
);
36+
37+
// Retry request should suppress refresh policy
38+
UpdateRequest retryRequest = new UpdateRequest("test", "1");
39+
retryRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
40+
retryRequest.markAsRetryAttempt();
41+
42+
IndexRequest retryIndexRequest = new IndexRequest("test");
43+
retryIndexRequest.setRefreshPolicy(
44+
retryRequest.isRetryAttempt() ? WriteRequest.RefreshPolicy.NONE : retryRequest.getRefreshPolicy()
45+
);
46+
47+
assertEquals(
48+
"Retry request should suppress refresh policy to NONE",
49+
WriteRequest.RefreshPolicy.NONE,
50+
retryIndexRequest.getRefreshPolicy()
51+
);
52+
}
53+
54+
public void testRefreshPolicySuppressionForDeleteRequest() {
55+
// Normal delete should preserve refresh policy
56+
UpdateRequest normalRequest = new UpdateRequest("test", "1");
57+
normalRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
58+
59+
DeleteRequest normalDeleteRequest = new DeleteRequest("test", "1");
60+
normalDeleteRequest.setRefreshPolicy(
61+
normalRequest.isRetryAttempt() ? WriteRequest.RefreshPolicy.NONE : normalRequest.getRefreshPolicy()
62+
);
63+
64+
assertEquals(
65+
"Normal delete request should preserve WAIT_UNTIL refresh policy",
66+
WriteRequest.RefreshPolicy.WAIT_UNTIL,
67+
normalDeleteRequest.getRefreshPolicy()
68+
);
69+
70+
// Retry delete should suppress refresh policy
71+
UpdateRequest retryRequest = new UpdateRequest("test", "1");
72+
retryRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
73+
retryRequest.markAsRetryAttempt();
74+
75+
DeleteRequest retryDeleteRequest = new DeleteRequest("test", "1");
76+
retryDeleteRequest.setRefreshPolicy(
77+
retryRequest.isRetryAttempt() ? WriteRequest.RefreshPolicy.NONE : retryRequest.getRefreshPolicy()
78+
);
79+
80+
assertEquals(
81+
"Retry delete request should suppress refresh policy to NONE",
82+
WriteRequest.RefreshPolicy.NONE,
83+
retryDeleteRequest.getRefreshPolicy()
84+
);
85+
}
86+
87+
public void testAllRefreshPolicyValues() {
88+
WriteRequest.RefreshPolicy[] allPolicies = {
89+
WriteRequest.RefreshPolicy.IMMEDIATE,
90+
WriteRequest.RefreshPolicy.WAIT_UNTIL,
91+
WriteRequest.RefreshPolicy.NONE };
92+
93+
for (WriteRequest.RefreshPolicy originalPolicy : allPolicies) {
94+
// Normal request - should preserve original policy
95+
UpdateRequest normalRequest = new UpdateRequest("test", "1");
96+
normalRequest.setRefreshPolicy(originalPolicy);
97+
98+
WriteRequest.RefreshPolicy normalResultPolicy = normalRequest.isRetryAttempt()
99+
? WriteRequest.RefreshPolicy.NONE
100+
: normalRequest.getRefreshPolicy();
101+
102+
assertEquals("Normal request should preserve original policy: " + originalPolicy, originalPolicy, normalResultPolicy);
103+
104+
// Retry request - should always be NONE regardless of original policy
105+
UpdateRequest retryRequest = new UpdateRequest("test", "1");
106+
retryRequest.setRefreshPolicy(originalPolicy);
107+
retryRequest.markAsRetryAttempt();
108+
109+
WriteRequest.RefreshPolicy retryResultPolicy = retryRequest.isRetryAttempt()
110+
? WriteRequest.RefreshPolicy.NONE
111+
: retryRequest.getRefreshPolicy();
112+
113+
assertEquals(
114+
"Retry request should always suppress to NONE, original was: " + originalPolicy,
115+
WriteRequest.RefreshPolicy.NONE,
116+
retryResultPolicy
117+
);
118+
}
119+
}
120+
121+
}

server/src/test/java/org/opensearch/action/update/UpdateRequestTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -709,4 +709,17 @@ public void testGetChildIndexRequests() {
709709
assertEquals(childRequests.size(), 1);
710710
assertEquals(childRequests.get(0), docRequest);
711711
}
712+
713+
public void testRetryAttemptMarking() {
714+
UpdateRequest request = new UpdateRequest("test", "1");
715+
assertFalse(request.isRetryAttempt());
716+
717+
UpdateRequest returnedRequest = request.markAsRetryAttempt();
718+
assertSame(request, returnedRequest);
719+
assertTrue(request.isRetryAttempt());
720+
721+
UpdateRequest newRequest = new UpdateRequest("test", "1");
722+
newRequest.markAsRetryAttempt();
723+
assertTrue(newRequest.isRetryAttempt());
724+
}
712725
}

0 commit comments

Comments
 (0)