Skip to content

Commit f3818ca

Browse files
committed
Add pipeline, fix update to use doc instead of object field (still fallsback to object for bwc)
bytes optimize Set the default value of source to null to match REST Support allowExplicitIndex setting Signed-off-by: Karen Xu <karenxyr@gmail.com>
1 parent 022d594 commit f3818ca

File tree

7 files changed

+158
-58
lines changed

7 files changed

+158
-58
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6363
- Add S3Repository.LEGACY_MD5_CHECKSUM_CALCULATION to list of repository-s3 settings ([#19788](https://github.com/opensearch-project/OpenSearch/pull/19788))
6464
- Fix NullPointerException when restoring remote snapshot with missing shard size information ([#19684](https://github.com/opensearch-project/OpenSearch/pull/19684))
6565
- Fix NPE of ScriptScoreQuery ([#19650](https://github.com/opensearch-project/OpenSearch/pull/19650))
66+
- Fix GRPC Bulk ([#x](https://github.com/opensearch-project/OpenSearch/pull/x))
6667

6768
### Dependencies
6869
- Update to Gradle 9.2 ([#19575](https://github.com/opensearch-project/OpenSearch/pull/19575)) ([#19856](https://github.com/opensearch-project/OpenSearch/pull/19856))

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(
210210

211211
return Collections.singletonMap(GRPC_TRANSPORT_SETTING_KEY, () -> {
212212
List<BindableService> grpcServices = new ArrayList<>(
213-
List.of(new DocumentServiceImpl(client), new SearchServiceImpl(client, queryUtils))
213+
List.of(new DocumentServiceImpl(client, settings), new SearchServiceImpl(client, queryUtils))
214214
);
215215
for (GrpcServiceFactory serviceFac : servicesFactory) {
216216
List<BindableService> pluginServices = serviceFac.initClient(client)
@@ -259,10 +259,9 @@ public Map<String, Supplier<AuxTransport>> getSecureAuxTransports(
259259
if (client == null || queryRegistry == null) {
260260
throw new RuntimeException("createComponents must be called first to initialize server provided resources.");
261261
}
262-
263262
return Collections.singletonMap(GRPC_SECURE_TRANSPORT_SETTING_KEY, () -> {
264263
List<BindableService> grpcServices = new ArrayList<>(
265-
List.of(new DocumentServiceImpl(client), new SearchServiceImpl(client, queryUtils))
264+
List.of(new DocumentServiceImpl(client, settings), new SearchServiceImpl(client, queryUtils))
266265
);
267266
for (GrpcServiceFactory serviceFac : servicesFactory) {
268267
List<BindableService> pluginServices = serviceFac.initClient(client)

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ private FetchSourceContextProtoUtils() {
3838
* @return A FetchSourceContext object based on the request parameters, or null if no source parameters are provided
3939
*/
4040
public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.BulkRequest request) {
41-
Boolean fetchSource = true;
41+
Boolean fetchSource = null;
4242
String[] sourceExcludes = null;
4343
String[] sourceIncludes = null;
4444

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java

Lines changed: 137 additions & 44 deletions
Large diffs are not rendered by default.

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.transport.grpc.proto.request.document.bulk;
1010

1111
import org.opensearch.action.bulk.BulkShardRequest;
12+
import org.opensearch.common.settings.Settings;
1213
import org.opensearch.protobufs.BulkRequest;
1314
import org.opensearch.rest.RestRequest;
1415
import org.opensearch.rest.action.document.RestBulkAction;
@@ -37,9 +38,10 @@ private BulkRequestProtoUtils() {
3738
* Please ensure to keep both implementations consistent.
3839
*
3940
* @param request the request to execute
41+
* @param settings node settings for security and configuration
4042
* @return a future of the bulk action that was executed
4143
*/
42-
public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest request) {
44+
public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest request, Settings settings) {
4345
org.opensearch.action.bulk.BulkRequest bulkRequest = Requests.bulkRequest();
4446

4547
String defaultIndex = request.hasIndex() ? request.getIndex() : null;
@@ -60,6 +62,9 @@ public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest
6062

6163
bulkRequest.setRefreshPolicy(RefreshProtoUtils.getRefreshPolicy(request.getRefresh()));
6264

65+
// Read the allowExplicitIndex setting (matches REST BulkAction line 74)
66+
boolean allowExplicitIndex = RestBulkAction.MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
67+
6368
// Note: batch_size is deprecated in OS 3.x. Add batch_size parameter when backporting to OS 2.x
6469
/*
6570
if (request.hasBatchSize()){
@@ -75,7 +80,8 @@ public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest
7580
defaultRouting,
7681
defaultFetchSourceContext,
7782
defaultPipeline,
78-
defaultRequireAlias
83+
defaultRequireAlias,
84+
allowExplicitIndex
7985
)
8086
);
8187

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.settings.Settings;
1314
import org.opensearch.protobufs.services.DocumentServiceGrpc;
1415
import org.opensearch.transport.client.Client;
1516
import org.opensearch.transport.grpc.listeners.BulkRequestActionListener;
@@ -25,14 +26,17 @@
2526
public class DocumentServiceImpl extends DocumentServiceGrpc.DocumentServiceImplBase {
2627
private static final Logger logger = LogManager.getLogger(DocumentServiceImpl.class);
2728
private final Client client;
29+
private final Settings settings;
2830

2931
/**
3032
* Creates a new DocumentServiceImpl.
3133
*
3234
* @param client Client for executing actions on the local node
35+
* @param settings Node settings for security and configuration
3336
*/
34-
public DocumentServiceImpl(Client client) {
37+
public DocumentServiceImpl(Client client, Settings settings) {
3538
this.client = client;
39+
this.settings = settings;
3640
}
3741

3842
/**
@@ -44,7 +48,7 @@ public DocumentServiceImpl(Client client) {
4448
@Override
4549
public void bulk(org.opensearch.protobufs.BulkRequest request, StreamObserver<org.opensearch.protobufs.BulkResponse> responseObserver) {
4650
try {
47-
org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request);
51+
org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, settings);
4852
BulkRequestActionListener listener = new BulkRequestActionListener(responseObserver);
4953
client.bulk(bulkRequest, listener);
5054
} catch (RuntimeException e) {

modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtilsTests.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,9 @@ public void testParseFromProtoRequestWithNoSourceParams() {
109109
FetchSourceContext context = FetchSourceContextProtoUtils.parseFromProtoRequest(request);
110110

111111
// Verify the result
112-
// The implementation returns a default FetchSourceContext with fetchSource=true
113-
// and empty includes/excludes arrays when no source parameters are provided
114-
assertNotNull("Context should not be null", context);
115-
assertTrue("fetchSource should be true", context.fetchSource());
116-
assertArrayEquals("includes should be empty", Strings.EMPTY_ARRAY, context.includes());
117-
assertArrayEquals("excludes should be empty", Strings.EMPTY_ARRAY, context.excludes());
112+
// When no source parameters are provided, should return null to match REST API behavior
113+
// This prevents the "get" field from being returned in update/upsert responses
114+
assertNull("Context should be null when no source parameters provided", context);
118115
}
119116

120117
public void testFromProtoWithFetch() {

0 commit comments

Comments
 (0)