Skip to content

Commit 57bc645

Browse files
committed
toBytesArray
Signed-off-by: Karen X <karenxyr@gmail.com>
1 parent 3b80f63 commit 57bc645

File tree

2 files changed

+282
-33
lines changed

2 files changed

+282
-33
lines changed

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,10 @@
3434
import org.opensearch.protobufs.UpdateOperation;
3535
import org.opensearch.protobufs.WriteOperation;
3636
import org.opensearch.script.Script;
37-
import org.opensearch.search.SearchHit;
3837
import org.opensearch.search.fetch.subphase.FetchSourceContext;
3938
import org.opensearch.transport.grpc.proto.request.common.FetchSourceContextProtoUtils;
4039
import org.opensearch.transport.grpc.proto.request.common.ScriptProtoUtils;
4140
import org.opensearch.transport.grpc.proto.response.document.common.VersionTypeProtoUtils;
42-
import org.opensearch.transport.grpc.proto.response.search.SearchHitProtoUtils;
4341

4442
import java.util.List;
4543
import java.util.Objects;
@@ -88,45 +86,23 @@ private static Boolean valueOrDefault(Boolean value, Boolean globalDefault) {
8886
}
8987

9088
/**
91-
* Converts a protobuf ByteString to OpenSearch BytesReference with zero-copy optimization.
89+
* Converts a protobuf ByteString to OpenSearch BytesReference.
9290
*
93-
* This method uses asReadOnlyByteBuffer() to get a view of the ByteString's internal data
94-
* without copying. The ByteBuffer is then converted to a BytesRef which wraps the underlying
95-
* byte array. This approach matches the pattern used in
96-
* {@link SearchHitProtoUtils#processSource(SearchHit, org.opensearch.protobufs.HitsMetadataHitsInner.Builder)}.
91+
* This method uses ByteString.toByteArray(), which always copies the bytes into a newly
92+
* allocated array, regardless of how the ByteString was created:
93+
* - ByteStrings created with UnsafeByteOperations.unsafeWrap() are copied as well; the wrapped array is never returned
94+
* - The resulting BytesArray therefore owns its backing array and shares no memory with the ByteString
9795
*
9896
* @param byteString The protobuf ByteString to convert
99-
* @return A BytesReference wrapping the ByteString data without copying
97+
* @return A BytesReference backed by a copy of the ByteString data, or BytesArray.EMPTY if the input is null or empty
10098
*/
10199
private static BytesReference byteStringToBytesReference(ByteString byteString) {
102100
if (byteString == null || byteString.isEmpty()) {
103101
return BytesArray.EMPTY;
104102
}
105-
106-
// Use asReadOnlyByteBuffer() to get a zero-copy view of the ByteString's internal data
107-
// Then extract the backing array and wrap it in BytesArray
108-
java.nio.ByteBuffer buffer = byteString.asReadOnlyByteBuffer();
109-
110-
if (buffer.hasArray()) {
111-
// Fast path: ByteBuffer is backed by an array
112-
byte[] array = buffer.array();
113-
int offset = buffer.arrayOffset() + buffer.position();
114-
int length = buffer.remaining();
115-
116-
if (offset == 0 && length == array.length) {
117-
// No offset, can use the array directly
118-
return new BytesArray(array);
119-
} else {
120-
// Has offset or partial array
121-
return new BytesArray(array, offset, length);
122-
}
123-
} else {
124-
// Fallback: ByteBuffer is not array-backed (rare case)
125-
// Must copy in this case
126-
byte[] bytes = new byte[buffer.remaining()];
127-
buffer.get(bytes);
128-
return new BytesArray(bytes);
129-
}
103+
// toByteArray() copies the payload into a new array that the BytesArray can safely own
104+
// This also applies to ByteStrings created with UnsafeByteOperations.unsafeWrap(); the backing array is not exposed
105+
return new BytesArray(byteString.toByteArray());
130106
}
131107

132108
/**

modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java

Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,4 +1259,277 @@ public void testValueOrDefaultBooleanWithNullValue() {
12591259
IndexRequest indexRequest = (IndexRequest) requests[0];
12601260
assertTrue("RequireAlias should use global value", indexRequest.isRequireAlias());
12611261
}
1262+
1263+
/**
1264+
* Test ByteString to BytesReference conversion with UnsafeByteOperations.unsafeWrap()
1265+
* The ByteString wraps the caller's array without copying; the conversion itself still copies via toByteArray()
1266+
*/
1267+
public void testByteStringToBytesReferenceZeroCopy() {
1268+
byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8);
1269+
ByteString byteString = UnsafeByteOperations.unsafeWrap(document);
1270+
1271+
WriteOperation createOp = WriteOperation.newBuilder().setXIndex("test-index").setXId("test-id").build();
1272+
1273+
IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest(
1274+
createOp,
1275+
byteString,
1276+
"default-index",
1277+
"default-id",
1278+
"default-routing",
1279+
Versions.MATCH_ANY,
1280+
VersionType.INTERNAL,
1281+
"default-pipeline",
1282+
SequenceNumbers.UNASSIGNED_SEQ_NO,
1283+
UNASSIGNED_PRIMARY_TERM,
1284+
false
1285+
);
1286+
1287+
assertNotNull("IndexRequest should not be null", indexRequest);
1288+
assertNotNull("Source should not be null", indexRequest.source());
1289+
// Verify the content is correct
1290+
assertEquals("Source content should match", new String(document, StandardCharsets.UTF_8), indexRequest.source().utf8ToString());
1291+
}
1292+
1293+
/**
1294+
* Test ByteString to BytesReference conversion with ByteString.copyFrom()
1295+
* This tests the copy path where ByteString creates an internal copy
1296+
*/
1297+
public void testByteStringToBytesReferenceCopy() {
1298+
byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8);
1299+
ByteString byteString = ByteString.copyFrom(document);
1300+
1301+
WriteOperation createOp = WriteOperation.newBuilder().setXIndex("test-index").setXId("test-id").build();
1302+
1303+
IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest(
1304+
createOp,
1305+
byteString,
1306+
"default-index",
1307+
"default-id",
1308+
"default-routing",
1309+
Versions.MATCH_ANY,
1310+
VersionType.INTERNAL,
1311+
"default-pipeline",
1312+
SequenceNumbers.UNASSIGNED_SEQ_NO,
1313+
UNASSIGNED_PRIMARY_TERM,
1314+
false
1315+
);
1316+
1317+
assertNotNull("IndexRequest should not be null", indexRequest);
1318+
assertNotNull("Source should not be null", indexRequest.source());
1319+
assertEquals("Source content should match", new String(document, StandardCharsets.UTF_8), indexRequest.source().utf8ToString());
1320+
}
1321+
1322+
/**
1323+
* Test ByteString to BytesReference conversion with empty ByteString
1324+
*/
1325+
public void testByteStringToBytesReferenceEmpty() {
1326+
ByteString byteString = ByteString.EMPTY;
1327+
1328+
WriteOperation createOp = WriteOperation.newBuilder().setXIndex("test-index").setXId("test-id").build();
1329+
1330+
IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest(
1331+
createOp,
1332+
byteString,
1333+
"default-index",
1334+
"default-id",
1335+
"default-routing",
1336+
Versions.MATCH_ANY,
1337+
VersionType.INTERNAL,
1338+
"default-pipeline",
1339+
SequenceNumbers.UNASSIGNED_SEQ_NO,
1340+
UNASSIGNED_PRIMARY_TERM,
1341+
false
1342+
);
1343+
1344+
assertNotNull("IndexRequest should not be null", indexRequest);
1345+
assertNotNull("Source should not be null", indexRequest.source());
1346+
assertEquals("Source should be empty", 0, indexRequest.source().length());
1347+
}
1348+
1349+
/**
1350+
* Test update request with doc field using UnsafeByteOperations.unsafeWrap()
1351+
*/
1352+
public void testUpdateRequestDocFieldZeroCopy() {
1353+
byte[] document = "{\"field\":\"updated_value\"}".getBytes(StandardCharsets.UTF_8);
1354+
ByteString docBytes = UnsafeByteOperations.unsafeWrap(document);
1355+
1356+
UpdateOperation updateOp = UpdateOperation.newBuilder().setXIndex("test-index").setXId("test-id").build();
1357+
1358+
org.opensearch.protobufs.UpdateAction updateAction = org.opensearch.protobufs.UpdateAction.newBuilder().setDoc(docBytes).build();
1359+
1360+
BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder()
1361+
.setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOp).build())
1362+
.setUpdateAction(updateAction)
1363+
.build();
1364+
1365+
UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest(
1366+
updateOp,
1367+
docBytes,
1368+
bulkRequestBody,
1369+
"default-index",
1370+
"default-id",
1371+
"default-routing",
1372+
null,
1373+
0,
1374+
"default-pipeline",
1375+
SequenceNumbers.UNASSIGNED_SEQ_NO,
1376+
UNASSIGNED_PRIMARY_TERM,
1377+
false
1378+
);
1379+
1380+
assertNotNull("UpdateRequest should not be null", updateRequest);
1381+
assertNotNull("Doc should not be null", updateRequest.doc());
1382+
assertEquals("Doc content should match", new String(document, StandardCharsets.UTF_8), updateRequest.doc().source().utf8ToString());
1383+
}
1384+
1385+
/**
1386+
* Test update request with upsert field using UnsafeByteOperations.unsafeWrap()
1387+
*/
1388+
public void testUpdateRequestUpsertFieldZeroCopy() {
1389+
byte[] docBytes = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8);
1390+
byte[] upsertBytes = "{\"field\":\"default_value\"}".getBytes(StandardCharsets.UTF_8);
1391+
1392+
UpdateOperation updateOp = UpdateOperation.newBuilder().setXIndex("test-index").setXId("test-id").build();
1393+
1394+
org.opensearch.protobufs.UpdateAction updateAction = org.opensearch.protobufs.UpdateAction.newBuilder()
1395+
.setDoc(UnsafeByteOperations.unsafeWrap(docBytes))
1396+
.setUpsert(UnsafeByteOperations.unsafeWrap(upsertBytes))
1397+
.build();
1398+
1399+
BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder()
1400+
.setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOp).build())
1401+
.setUpdateAction(updateAction)
1402+
.build();
1403+
1404+
UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest(
1405+
updateOp,
1406+
UnsafeByteOperations.unsafeWrap(docBytes),
1407+
bulkRequestBody,
1408+
"default-index",
1409+
"default-id",
1410+
"default-routing",
1411+
null,
1412+
0,
1413+
"default-pipeline",
1414+
SequenceNumbers.UNASSIGNED_SEQ_NO,
1415+
UNASSIGNED_PRIMARY_TERM,
1416+
false
1417+
);
1418+
1419+
assertNotNull("UpdateRequest should not be null", updateRequest);
1420+
assertNotNull("Upsert should not be null", updateRequest.upsertRequest());
1421+
assertEquals(
1422+
"Upsert content should match",
1423+
new String(upsertBytes, StandardCharsets.UTF_8),
1424+
updateRequest.upsertRequest().source().utf8ToString()
1425+
);
1426+
}
1427+
1428+
/**
1429+
* Test update request with both doc and upsert fields using ByteString.copyFrom()
1430+
*/
1431+
public void testUpdateRequestDocAndUpsertCopy() {
1432+
byte[] docBytes = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8);
1433+
byte[] upsertBytes = "{\"field\":\"default_value\"}".getBytes(StandardCharsets.UTF_8);
1434+
1435+
UpdateOperation updateOp = UpdateOperation.newBuilder().setXIndex("test-index").setXId("test-id").build();
1436+
1437+
org.opensearch.protobufs.UpdateAction updateAction = org.opensearch.protobufs.UpdateAction.newBuilder()
1438+
.setDoc(ByteString.copyFrom(docBytes))
1439+
.setUpsert(ByteString.copyFrom(upsertBytes))
1440+
.build();
1441+
1442+
BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder()
1443+
.setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOp).build())
1444+
.setUpdateAction(updateAction)
1445+
.build();
1446+
1447+
UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest(
1448+
updateOp,
1449+
ByteString.copyFrom(docBytes),
1450+
bulkRequestBody,
1451+
"default-index",
1452+
"default-id",
1453+
"default-routing",
1454+
null,
1455+
0,
1456+
"default-pipeline",
1457+
SequenceNumbers.UNASSIGNED_SEQ_NO,
1458+
UNASSIGNED_PRIMARY_TERM,
1459+
false
1460+
);
1461+
1462+
assertNotNull("UpdateRequest should not be null", updateRequest);
1463+
assertNotNull("Doc should not be null", updateRequest.doc());
1464+
assertNotNull("Upsert should not be null", updateRequest.upsertRequest());
1465+
assertEquals("Doc content should match", new String(docBytes, StandardCharsets.UTF_8), updateRequest.doc().source().utf8ToString());
1466+
assertEquals(
1467+
"Upsert content should match",
1468+
new String(upsertBytes, StandardCharsets.UTF_8),
1469+
updateRequest.upsertRequest().source().utf8ToString()
1470+
);
1471+
}
1472+
1473+
/**
1474+
* Test index request with large document using UnsafeByteOperations.unsafeWrap()
1475+
*/
1476+
public void testIndexRequestLargeDocumentZeroCopy() {
1477+
// Create a large document (> 1KB)
1478+
StringBuilder sb = new StringBuilder("{\"data\":\"");
1479+
for (int i = 0; i < 200; i++) {
1480+
sb.append("0123456789");
1481+
}
1482+
sb.append("\"}");
1483+
byte[] document = sb.toString().getBytes(StandardCharsets.UTF_8);
1484+
1485+
IndexOperation indexOp = IndexOperation.newBuilder().setXIndex("test-index").setXId("test-id").build();
1486+
1487+
IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest(
1488+
indexOp,
1489+
UnsafeByteOperations.unsafeWrap(document),
1490+
null,
1491+
"default-index",
1492+
"default-id",
1493+
"default-routing",
1494+
Versions.MATCH_ANY,
1495+
VersionType.INTERNAL,
1496+
"default-pipeline",
1497+
SequenceNumbers.UNASSIGNED_SEQ_NO,
1498+
UNASSIGNED_PRIMARY_TERM,
1499+
false
1500+
);
1501+
1502+
assertNotNull("IndexRequest should not be null", indexRequest);
1503+
assertNotNull("Source should not be null", indexRequest.source());
1504+
assertEquals("Source length should match", document.length, indexRequest.source().length());
1505+
assertEquals("Source content should match", new String(document, StandardCharsets.UTF_8), indexRequest.source().utf8ToString());
1506+
}
1507+
1508+
/**
1509+
* Test create request with UTF-8 encoded document using UnsafeByteOperations.unsafeWrap()
1510+
*/
1511+
public void testCreateRequestUtf8DocumentZeroCopy() {
1512+
String jsonWithUnicode = "{\"field\":\"Hello 世界 🌍\"}";
1513+
byte[] document = jsonWithUnicode.getBytes(StandardCharsets.UTF_8);
1514+
1515+
WriteOperation createOp = WriteOperation.newBuilder().setXIndex("test-index").setXId("test-id").build();
1516+
1517+
IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest(
1518+
createOp,
1519+
UnsafeByteOperations.unsafeWrap(document),
1520+
"default-index",
1521+
"default-id",
1522+
"default-routing",
1523+
Versions.MATCH_ANY,
1524+
VersionType.INTERNAL,
1525+
"default-pipeline",
1526+
SequenceNumbers.UNASSIGNED_SEQ_NO,
1527+
UNASSIGNED_PRIMARY_TERM,
1528+
false
1529+
);
1530+
1531+
assertNotNull("IndexRequest should not be null", indexRequest);
1532+
assertNotNull("Source should not be null", indexRequest.source());
1533+
assertEquals("Source content should match UTF-8", jsonWithUnicode, indexRequest.source().utf8ToString());
1534+
}
12621535
}

0 commit comments

Comments
 (0)