Skip to content

Commit ed151d8

Browse files
authored
Migrate Search requests to use Writeable reading strategies (#26428)
Migrates many SearchRequest objects to use Writeable conventions and rejects usage of `readFrom` in these new classes.
1 parent ea3fa76 commit ed151d8

File tree

18 files changed

+224
-156
lines changed

18 files changed

+224
-156
lines changed

client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public class TransportNoopSearchAction extends HandledTransportAction<SearchRequ
4242
@Inject
4343
public TransportNoopSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters
4444
actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
45-
super(settings, NoopSearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
46-
SearchRequest::new);
45+
super(settings, NoopSearchAction.NAME, threadPool, transportService, actionFilters, SearchRequest::new,
46+
indexNameExpressionResolver);
4747
}
4848

4949
@Override

core/src/main/java/org/elasticsearch/action/search/MultiSearchRequest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,7 @@ public void readFrom(StreamInput in) throws IOException {
117117
maxConcurrentSearchRequests = in.readVInt();
118118
int size = in.readVInt();
119119
for (int i = 0; i < size; i++) {
120-
SearchRequest request = new SearchRequest();
121-
request.readFrom(in);
120+
SearchRequest request = new SearchRequest(in);
122121
requests.add(request);
123122
}
124123
}

core/src/main/java/org/elasticsearch/action/search/SearchRequest.java

Lines changed: 51 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,55 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
109109
this.source = source;
110110
}
111111

112+
/**
113+
* Constructs a new search request from reading the specified stream.
114+
*
115+
* @param in The stream the request is read from
116+
* @throws IOException if there is an issue reading the stream
117+
*/
118+
public SearchRequest(StreamInput in) throws IOException {
119+
super(in);
120+
searchType = SearchType.fromId(in.readByte());
121+
indices = new String[in.readVInt()];
122+
for (int i = 0; i < indices.length; i++) {
123+
indices[i] = in.readString();
124+
}
125+
routing = in.readOptionalString();
126+
preference = in.readOptionalString();
127+
scroll = in.readOptionalWriteable(Scroll::new);
128+
source = in.readOptionalWriteable(SearchSourceBuilder::new);
129+
types = in.readStringArray();
130+
indicesOptions = IndicesOptions.readIndicesOptions(in);
131+
requestCache = in.readOptionalBoolean();
132+
batchedReduceSize = in.readVInt();
133+
if (in.getVersion().onOrAfter(Version.V_5_6_0)) {
134+
maxConcurrentShardRequests = in.readVInt();
135+
preFilterShardSize = in.readVInt();
136+
}
137+
}
138+
139+
@Override
140+
public void writeTo(StreamOutput out) throws IOException {
141+
super.writeTo(out);
142+
out.writeByte(searchType.id());
143+
out.writeVInt(indices.length);
144+
for (String index : indices) {
145+
out.writeString(index);
146+
}
147+
out.writeOptionalString(routing);
148+
out.writeOptionalString(preference);
149+
out.writeOptionalWriteable(scroll);
150+
out.writeOptionalWriteable(source);
151+
out.writeStringArray(types);
152+
indicesOptions.writeIndicesOptions(out);
153+
out.writeOptionalBoolean(requestCache);
154+
out.writeVInt(batchedReduceSize);
155+
if (out.getVersion().onOrAfter(Version.V_5_6_0)) {
156+
out.writeVInt(maxConcurrentShardRequests);
157+
out.writeVInt(preFilterShardSize);
158+
}
159+
}
160+
112161
@Override
113162
public ActionRequestValidationException validate() {
114163
ActionRequestValidationException validationException = null;
@@ -385,7 +434,7 @@ public String getDescription() {
385434
sb.append("], ");
386435
sb.append("search_type[").append(searchType).append("], ");
387436
if (source != null) {
388-
437+
389438
sb.append("source[").append(source.toString(FORMAT_PARAMS)).append("]");
390439
} else {
391440
sb.append("source[]");
@@ -397,46 +446,7 @@ public String getDescription() {
397446

398447
@Override
399448
public void readFrom(StreamInput in) throws IOException {
400-
super.readFrom(in);
401-
searchType = SearchType.fromId(in.readByte());
402-
indices = new String[in.readVInt()];
403-
for (int i = 0; i < indices.length; i++) {
404-
indices[i] = in.readString();
405-
}
406-
routing = in.readOptionalString();
407-
preference = in.readOptionalString();
408-
scroll = in.readOptionalWriteable(Scroll::new);
409-
source = in.readOptionalWriteable(SearchSourceBuilder::new);
410-
types = in.readStringArray();
411-
indicesOptions = IndicesOptions.readIndicesOptions(in);
412-
requestCache = in.readOptionalBoolean();
413-
batchedReduceSize = in.readVInt();
414-
if (in.getVersion().onOrAfter(Version.V_5_6_0)) {
415-
maxConcurrentShardRequests = in.readVInt();
416-
preFilterShardSize = in.readVInt();
417-
}
418-
}
419-
420-
@Override
421-
public void writeTo(StreamOutput out) throws IOException {
422-
super.writeTo(out);
423-
out.writeByte(searchType.id());
424-
out.writeVInt(indices.length);
425-
for (String index : indices) {
426-
out.writeString(index);
427-
}
428-
out.writeOptionalString(routing);
429-
out.writeOptionalString(preference);
430-
out.writeOptionalWriteable(scroll);
431-
out.writeOptionalWriteable(source);
432-
out.writeStringArray(types);
433-
indicesOptions.writeIndicesOptions(out);
434-
out.writeOptionalBoolean(requestCache);
435-
out.writeVInt(batchedReduceSize);
436-
if (out.getVersion().onOrAfter(Version.V_5_6_0)) {
437-
out.writeVInt(maxConcurrentShardRequests);
438-
out.writeVInt(preFilterShardSize);
439-
}
449+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
440450
}
441451

442452
@Override

core/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,19 @@ public SearchScrollRequest(String scrollId) {
4848
this.scrollId = scrollId;
4949
}
5050

51+
public SearchScrollRequest(StreamInput in) throws IOException {
52+
super(in);
53+
scrollId = in.readString();
54+
scroll = in.readOptionalWriteable(Scroll::new);
55+
}
56+
57+
@Override
58+
public void writeTo(StreamOutput out) throws IOException {
59+
super.writeTo(out);
60+
out.writeString(scrollId);
61+
out.writeOptionalWriteable(scroll);
62+
}
63+
5164
@Override
5265
public ActionRequestValidationException validate() {
5366
ActionRequestValidationException validationException = null;
@@ -100,16 +113,7 @@ public SearchScrollRequest scroll(String keepAlive) {
100113

101114
@Override
102115
public void readFrom(StreamInput in) throws IOException {
103-
super.readFrom(in);
104-
scrollId = in.readString();
105-
scroll = in.readOptionalWriteable(Scroll::new);
106-
}
107-
108-
@Override
109-
public void writeTo(StreamOutput out) throws IOException {
110-
super.writeTo(out);
111-
out.writeString(scrollId);
112-
out.writeOptionalWriteable(scroll);
116+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
113117
}
114118

115119
@Override

core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -203,13 +203,8 @@ static class ScrollFreeContextRequest extends TransportRequest {
203203
this.id = id;
204204
}
205205

206-
public long id() {
207-
return this.id;
208-
}
209-
210-
@Override
211-
public void readFrom(StreamInput in) throws IOException {
212-
super.readFrom(in);
206+
ScrollFreeContextRequest(StreamInput in) throws IOException {
207+
super(in);
213208
id = in.readLong();
214209
}
215210

@@ -218,6 +213,15 @@ public void writeTo(StreamOutput out) throws IOException {
218213
super.writeTo(out);
219214
out.writeLong(id);
220215
}
216+
217+
public long id() {
218+
return this.id;
219+
}
220+
221+
@Override
222+
public void readFrom(StreamInput in) throws IOException {
223+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
224+
}
221225
}
222226

223227
static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest {
@@ -231,6 +235,17 @@ static class SearchFreeContextRequest extends ScrollFreeContextRequest implement
231235
this.originalIndices = originalIndices;
232236
}
233237

238+
SearchFreeContextRequest(StreamInput in) throws IOException {
239+
super(in);
240+
originalIndices = OriginalIndices.readOriginalIndices(in);
241+
}
242+
243+
@Override
244+
public void writeTo(StreamOutput out) throws IOException {
245+
super.writeTo(out);
246+
OriginalIndices.writeOriginalIndices(originalIndices, out);
247+
}
248+
234249
@Override
235250
public String[] indices() {
236251
if (originalIndices == null) {
@@ -249,14 +264,7 @@ public IndicesOptions indicesOptions() {
249264

250265
@Override
251266
public void readFrom(StreamInput in) throws IOException {
252-
super.readFrom(in);
253-
originalIndices = OriginalIndices.readOriginalIndices(in);
254-
}
255-
256-
@Override
257-
public void writeTo(StreamOutput out) throws IOException {
258-
super.writeTo(out);
259-
OriginalIndices.writeOriginalIndices(originalIndices, out);
267+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
260268
}
261269
}
262270

@@ -289,7 +297,7 @@ public void writeTo(StreamOutput out) throws IOException {
289297
}
290298

291299
public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
292-
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME,
300+
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ScrollFreeContextRequest::new,
293301
new TaskAwareTransportRequestHandler<ScrollFreeContextRequest>() {
294302
@Override
295303
public void messageReceived(ScrollFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
@@ -298,7 +306,7 @@ public void messageReceived(ScrollFreeContextRequest request, TransportChannel c
298306
}
299307
});
300308
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new);
301-
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME,
309+
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new,
302310
new TaskAwareTransportRequestHandler<SearchFreeContextRequest>() {
303311
@Override
304312
public void messageReceived(SearchFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
@@ -318,7 +326,7 @@ public void messageReceived(TransportRequest.Empty request, TransportChannel cha
318326
TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
319327
() -> TransportResponse.Empty.INSTANCE);
320328

321-
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
329+
transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
322330
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
323331
@Override
324332
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
@@ -346,7 +354,7 @@ public void onFailure(Exception e) {
346354
});
347355
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new);
348356

349-
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
357+
transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
350358
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
351359
@Override
352360
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
@@ -373,7 +381,7 @@ public void onFailure(Exception e) {
373381
});
374382
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new);
375383

376-
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
384+
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new,
377385
new TaskAwareTransportRequestHandler<QuerySearchRequest>() {
378386
@Override
379387
public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception {
@@ -383,7 +391,7 @@ public void messageReceived(QuerySearchRequest request, TransportChannel channel
383391
});
384392
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);
385393

386-
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
394+
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
387395
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
388396
@Override
389397
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
@@ -393,7 +401,7 @@ public void messageReceived(InternalScrollSearchRequest request, TransportChanne
393401
});
394402
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);
395403

396-
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
404+
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
397405
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
398406
@Override
399407
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
@@ -403,7 +411,7 @@ public void messageReceived(InternalScrollSearchRequest request, TransportChanne
403411
});
404412
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);
405413

406-
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH,
414+
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new,
407415
new TaskAwareTransportRequestHandler<ShardFetchRequest>() {
408416
@Override
409417
public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task) throws Exception {
@@ -413,7 +421,7 @@ public void messageReceived(ShardFetchRequest request, TransportChannel channel,
413421
});
414422
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);
415423

416-
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH,
424+
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchSearchRequest::new,
417425
new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() {
418426
@Override
419427
public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception {
@@ -424,7 +432,7 @@ public void messageReceived(ShardFetchSearchRequest request, TransportChannel ch
424432
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
425433

426434
// this is super cheap and should not hit thread-pool rejections
427-
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
435+
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
428436
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
429437
@Override
430438
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {

core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public TransportSearchAction(Settings settings, ThreadPool threadPool, Transport
8282
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController,
8383
ClusterService clusterService, ActionFilters actionFilters,
8484
IndexNameExpressionResolver indexNameExpressionResolver) {
85-
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new);
85+
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, SearchRequest::new, indexNameExpressionResolver);
8686
this.searchPhaseController = searchPhaseController;
8787
this.searchTransportService = searchTransportService;
8888
this.remoteClusterService = searchTransportService.getRemoteClusterService();

core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ public TransportSearchScrollAction(Settings settings, ThreadPool threadPool, Tra
4545
ClusterService clusterService, ActionFilters actionFilters,
4646
IndexNameExpressionResolver indexNameExpressionResolver,
4747
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController) {
48-
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
49-
SearchScrollRequest::new);
48+
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, SearchScrollRequest::new,
49+
indexNameExpressionResolver);
5050
this.clusterService = clusterService;
5151
this.searchTransportService = searchTransportService;
5252
this.searchPhaseController = searchPhaseController;

core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -399,8 +399,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId)
399399
@Override
400400
public void readFrom(StreamInput in) throws IOException {
401401
super.readFrom(in);
402-
searchRequest = new SearchRequest();
403-
searchRequest.readFrom(in);
402+
searchRequest = new SearchRequest(in);
404403
abortOnVersionConflict = in.readBoolean();
405404
size = in.readVInt();
406405
refresh = in.readBoolean();

0 commit comments

Comments
 (0)