Commit 17c6b7a

Merge branch 'feature/3.x-lucene' into agg-perf-lucene
2 parents e90eaca + 9f7d8ce commit 17c6b7a

51 files changed: +1326 -171 lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929))
 - Handle deleted documents for filter rewrite sub-aggregation optimization ([#19643](https://github.com/opensearch-project/OpenSearch/pull/19643))
 - Add bulk collect API for filter rewrite sub-aggregation optimization ([#19933](https://github.com/opensearch-project/OpenSearch/pull/19933))
+- Allow collectors take advantage of preaggregated data using collectRange API ([#20009](https://github.com/opensearch-project/OpenSearch/pull/20009))
 - Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))
 - Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523))
 - Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629))
@@ -27,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add async periodic flush task support for pull-based ingestion ([#19878](https://github.com/opensearch-project/OpenSearch/pull/19878))
 - Add support for context aware segments ([#19098](https://github.com/opensearch-project/OpenSearch/pull/19098))
 - Implement GRPC FunctionScoreQuery ([#19888](https://github.com/opensearch-project/OpenSearch/pull/19888))
+- Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765)]
 
 ### Changed
 - Combining filter rewrite and skip list to optimize sub aggregation([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573))
@@ -55,6 +57,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Thread Context Preservation by gRPC Interceptor ([#19776](https://github.com/opensearch-project/OpenSearch/pull/19776))
 - Update NoOpResult constructors in the Engine to be public ([#19950](https://github.com/opensearch-project/OpenSearch/pull/19950))
 - Refactor the TranslogStats and RequestCacheStats class to use the Builder pattern instead of constructors ([#19961](https://github.com/opensearch-project/OpenSearch/pull/19961))
+- Refactor the IndexPressutreStats, DeviceStats and TransportStats class to use the Builder pattern instead of constructors ([#19991](https://github.com/opensearch-project/OpenSearch/pull/19991))
 
 ### Fixed
 - Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))
@@ -72,6 +75,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add S3Repository.LEGACY_MD5_CHECKSUM_CALCULATION to list of repository-s3 settings ([#19788](https://github.com/opensearch-project/OpenSearch/pull/19788))
 - Fix NullPointerException when restoring remote snapshot with missing shard size information ([#19684](https://github.com/opensearch-project/OpenSearch/pull/19684))
 - Fix NPE of ScriptScoreQuery ([#19650](https://github.com/opensearch-project/OpenSearch/pull/19650))
+- Fix ClassCastException in FlightClientChannel for requests larger than 16KB ([#20010](https://github.com/opensearch-project/OpenSearch/pull/20010))
 
 ### Dependencies
 - Update to Gradle 9.2 ([#19575](https://github.com/opensearch-project/OpenSearch/pull/19575)) ([#19856](https://github.com/opensearch-project/OpenSearch/pull/19856))
@@ -112,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Deprecated existing constructors in GetStats, FlushStats and QueryCacheStats in favor of the new Builder ([#19935](https://github.com/opensearch-project/OpenSearch/pull/19935))
 - Deprecated existing constructors in FieldDataStats and CompletionStats in favor of the new Builder ([#19936](https://github.com/opensearch-project/OpenSearch/pull/19936))
 - Deprecated existing constructors in TranslogStats and RequestCacheStats in favor of the new Builder ([#19961](https://github.com/opensearch-project/OpenSearch/pull/19961))
+- Deprecated existing constructors in IndexPressutreStats, DeviceStats and TransportStats in favor of the new Builder ([#19991](https://github.com/opensearch-project/OpenSearch/pull/19991))
 
 ### Removed

README.md

Lines changed: 2 additions & 2 deletions
@@ -3,8 +3,8 @@
 </a>
 
 [![License](https://img.shields.io/badge/license-Apache%20v2-blue.svg)](https://github.com/opensearch-project/OpenSearch/blob/main/LICENSE.txt)
-[![LFX Health Score](https://insights.production.lfx.dev/api/badge/health-score?project=opensearch-foundation)](https://insights.linuxfoundation.org/project/opensearch-foundation)
-[![LFX Active Contributors](https://insights.production.lfx.dev/api/badge/active-contributors?project=opensearch-foundation&repos=https://github.com/opensearch-project/OpenSearch)](https://insights.linuxfoundation.org/project/opensearch-foundation/repository/opensearch-project-opensearch)
+[![LFX Health Score](https://insights.linuxfoundation.org/api/badge/health-score?project=opensearch-foundation)](https://insights.linuxfoundation.org/project/opensearch-foundation)
+[![LFX Active Contributors](https://insights.linuxfoundation.org/api/badge/active-contributors?project=opensearch-foundation)](https://insights.linuxfoundation.org/project/opensearch-foundation)
 [![Code Coverage](https://codecov.io/gh/opensearch-project/OpenSearch/branch/main/graph/badge.svg)](https://codecov.io/gh/opensearch-project/OpenSearch)
 ![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/opensearch-project/OpenSearch?sort=semver)
 [![Linkedin](https://img.shields.io/badge/Follow-Linkedin-blue)](https://www.linkedin.com/company/opensearch-project)

buildSrc/src/main/resources/forbidden/opensearch-test-signatures.txt

Lines changed: 5 additions & 0 deletions
@@ -26,3 +26,8 @@ com.carrotsearch.randomizedtesting.annotations.Nightly @ We don't run nightly te
 org.junit.Test @defaultMessage Just name your test method testFooBar
 
 java.lang.Math#random() @ Use one of the various randomization methods from LuceneTestCase or OpenSearchTestCase for reproducibility
+
+@defaultMessage Disabling assertions is a JVM-wide setting and will break other tests
+java.lang.ClassLoader#setDefaultAssertionStatus(boolean)
+java.lang.ClassLoader#setPackageAssertionStatus(java.lang.String, boolean)
+java.lang.ClassLoader#setClassAssertionStatus(java.lang.String, boolean)
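
The new forbidden signatures carry the message that disabling assertions "will break other tests". A minimal sketch of why, using only the JDK: changing the default assertion status on a class loader changes the answer for every class that loader serves, not just the caller. The class names `AssertionStatusDemo` and `Probe` are made up for illustration and are not part of the commit.

```java
public class AssertionStatusDemo {
    // Hypothetical unrelated class served by the same class loader.
    static class Probe {}

    /** Sets the loader-wide default, then reports the status Probe would be given. */
    static boolean statusAfterSetting(boolean enabled) {
        ClassLoader loader = AssertionStatusDemo.class.getClassLoader();
        loader.setDefaultAssertionStatus(enabled);
        // desiredAssertionStatus() asks the loader what it would assign right now.
        return Probe.class.desiredAssertionStatus();
    }

    public static void main(String[] args) {
        boolean original = Probe.class.desiredAssertionStatus();
        System.out.println("after disabling: " + statusAfterSetting(false));
        System.out.println("after enabling:  " + statusAfterSetting(true));
        statusAfterSetting(original); // put the loader back the way we found it
    }
}
```

`Probe` never touches the setting itself, yet its reported status follows whatever the last caller chose, which is the cross-test leakage the forbidden-API rule guards against.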

modules/aggs-matrix-stats/src/main/java/org/opensearch/search/aggregations/matrix/stats/MatrixStatsAggregator.java

Lines changed: 11 additions & 0 deletions
@@ -32,6 +32,7 @@
 package org.opensearch.search.aggregations.matrix.stats;
 
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.DocIdStream;
 import org.apache.lucene.search.ScoreMode;
 import org.opensearch.common.lease.Releasables;
 import org.opensearch.common.util.BigArrays;
@@ -113,6 +114,16 @@ public void collect(int doc, long bucket) throws IOException {
                 }
             }
 
+            @Override
+            public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
+                super.collect(stream, owningBucketOrd);
+            }
+
+            @Override
+            public void collectRange(int min, int max) throws IOException {
+                super.collectRange(min, max);
+            }
+
             /**
              * return a map of field names and data
              */

plugins/arrow-flight-rpc/src/main/java/org/opensearch/arrow/flight/transport/FlightClientChannel.java

Lines changed: 1 addition & 4 deletions
@@ -18,7 +18,6 @@
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.common.util.concurrent.ThreadContext;
 import org.opensearch.core.action.ActionListener;
-import org.opensearch.core.common.bytes.BytesArray;
 import org.opensearch.core.common.bytes.BytesReference;
 import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
 import org.opensearch.core.common.transport.BoundTransportAddress;
@@ -35,7 +34,6 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -351,8 +349,7 @@ private void notifyListener(ActionListener<Void> listener, CompletableFuture<Voi
     }
 
     private Ticket serializeToTicket(BytesReference reference) {
-        byte[] data = Arrays.copyOfRange(((BytesArray) reference).array(), 0, reference.length());
-        return new Ticket(data);
+        return new Ticket(BytesReference.toBytes(reference));
     }
 
     @Override
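
The hunk above replaces a downcast to a concrete array-backed type with a copy through the interface. A self-contained sketch of that failure mode follows. It assumes, as an inference from the changelog entry rather than something the diff states, that larger payloads arrive as a reference not backed by a single array. `Bytes`, `ArrayBytes` and `CompositeBytes` are hypothetical stand-ins, not the OpenSearch `BytesReference` API.

```java
import java.util.Arrays;

public class TicketBytesDemo {
    /** Hypothetical byte-sequence abstraction (stand-in only). */
    interface Bytes {
        int length();
        byte get(int index);
    }

    /** Backed by exactly one array. */
    static final class ArrayBytes implements Bytes {
        private final byte[] array;
        ArrayBytes(byte[] array) { this.array = array; }
        byte[] array() { return array; }
        public int length() { return array.length; }
        public byte get(int index) { return array[index]; }
    }

    /** Backed by several chunks, as a paged buffer might be. */
    static final class CompositeBytes implements Bytes {
        private final Bytes[] parts;
        CompositeBytes(Bytes... parts) { this.parts = parts; }
        public int length() {
            int total = 0;
            for (Bytes part : parts) total += part.length();
            return total;
        }
        public byte get(int index) {
            for (Bytes part : parts) {
                if (index < part.length()) return part.get(index);
                index -= part.length();
            }
            throw new IndexOutOfBoundsException();
        }
    }

    /** Old shape: only works when the reference happens to be array-backed. */
    static byte[] copyByCast(Bytes ref) {
        return Arrays.copyOfRange(((ArrayBytes) ref).array(), 0, ref.length());
    }

    /** New shape: relies only on the interface, so any implementation works. */
    static byte[] copyByInterface(Bytes ref) {
        byte[] out = new byte[ref.length()];
        for (int i = 0; i < out.length; i++) out[i] = ref.get(i);
        return out;
    }

    public static void main(String[] args) {
        Bytes composite = new CompositeBytes(new ArrayBytes(new byte[16 * 1024]), new ArrayBytes(new byte[4 * 1024]));
        System.out.println(copyByInterface(composite).length);
        try {
            copyByCast(composite);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getClass().getSimpleName());
        }
    }
}
```

Copying through the interface costs one pass over the data but removes the dependency on which implementation the caller was handed.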

plugins/arrow-flight-rpc/src/test/java/org/opensearch/arrow/flight/transport/FlightClientChannelTests.java

Lines changed: 79 additions & 0 deletions
@@ -13,6 +13,7 @@
 import org.opensearch.core.common.bytes.BytesArray;
 import org.opensearch.core.common.bytes.BytesReference;
 import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
 import org.opensearch.core.transport.TransportResponse;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.StreamTransportResponseHandler;
@@ -588,4 +589,82 @@ public void testSetMessageListenerTwice() {
         IllegalStateException exception = assertThrows(IllegalStateException.class, () -> flightTransport.setMessageListener(listener2));
         assertEquals("Cannot set message listener twice", exception.getMessage());
     }
+
+    static class LargeTestRequest extends TestRequest {
+        private final String largeData;
+
+        LargeTestRequest(String data) {
+            this.largeData = data;
+        }
+
+        LargeTestRequest(StreamInput in) throws IOException {
+            super(in);
+            this.largeData = in.readString();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeString(largeData);
+        }
+    }
+
+    public void testLargeRequest() throws Exception {
+        String action = "internal:test/large";
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<Exception> error = new AtomicReference<>();
+
+        streamTransportService.registerRequestHandler(action, ThreadPool.Names.SAME, LargeTestRequest::new, (request, channel, task) -> {
+            try {
+                channel.sendResponseBatch(new TestResponse("OK"));
+                channel.completeStream();
+            } catch (Exception e) {
+                try {
+                    channel.sendResponse(e);
+                } catch (IOException ex) {}
+            }
+        });
+
+        LargeTestRequest testRequest = new LargeTestRequest("X".repeat(20 * 1024));
+
+        streamTransportService.sendRequest(
+            remoteNode,
+            action,
+            testRequest,
+            TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STREAM).build(),
+            new StreamTransportResponseHandler<TestResponse>() {
+                @Override
+                public void handleStreamResponse(StreamTransportResponse<TestResponse> streamResponse) {
+                    try {
+                        while (streamResponse.nextResponse() != null) {
+                        }
+                        streamResponse.close();
+                    } catch (Exception e) {
+                        error.set(e);
+                    } finally {
+                        latch.countDown();
+                    }
+                }
+
+                @Override
+                public void handleException(TransportException exp) {
+                    error.set(exp);
+                    latch.countDown();
+                }
+
+                @Override
+                public String executor() {
+                    return ThreadPool.Names.SAME;
+                }
+
+                @Override
+                public TestResponse read(StreamInput in) throws IOException {
+                    return new TestResponse(in);
+                }
+            }
+        );
+
+        assertTrue(latch.await(TIMEOUT_SEC, TimeUnit.SECONDS));
+        assertNull(error.get());
+    }
 }
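
`testLargeRequest` waits on a `CountDownLatch` and inspects an `AtomicReference<Exception>` after the asynchronous handler finishes. The same pattern in isolation, using only `java.util.concurrent`, might look like the sketch below. `AsyncResultDemo` and `runAndCapture` are illustrative names, and a plain thread stands in for the transport callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncResultDemo {
    /**
     * Runs the work on another thread, waits for it to finish, and returns
     * the exception it threw, or null if it completed normally.
     */
    static Exception runAndCapture(Runnable work, long timeoutSec) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Exception> error = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            try {
                work.run();
            } catch (Exception e) {
                error.set(e);      // record the failure for the waiting thread
            } finally {
                latch.countDown(); // always release the waiter, success or failure
            }
        });
        worker.start();

        if (!latch.await(timeoutSec, TimeUnit.SECONDS)) {
            throw new IllegalStateException("worker did not finish in time");
        }
        return error.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndCapture(() -> {}, 5));
        System.out.println(runAndCapture(() -> { throw new IllegalStateException("boom"); }, 5));
    }
}
```

Counting down in `finally` is what keeps a failing callback from turning into a timeout: the waiter is released either way and the stored exception says which path was taken.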

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 117 additions & 0 deletions
@@ -778,6 +778,123 @@ public void testAllActiveIngestionPeriodicFlush() throws Exception {
 
         waitForSearchableDocs(10, Arrays.asList(nodeA));
         waitForState(() -> getPeriodicFlushCount(nodeA, indexName) >= 1);
+    }
+
+    public void testRawPayloadMapperIngestion() throws Exception {
+        // Start cluster
+        internalCluster().startClusterManagerOnlyNode();
+        final String nodeA = internalCluster().startDataOnlyNode();
+
+        // Publish 2 valid messages
+        String validMessage1 = "{\"name\":\"alice\",\"age\":30}";
+        String validMessage2 = "{\"name\":\"bob\",\"age\":25}";
+        produceData(validMessage1);
+        produceData(validMessage2);
+
+        // Create index with raw_payload mapper
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put("ingestion_source.type", "kafka")
+                .put("ingestion_source.param.topic", topicName)
+                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("ingestion_source.pointer.init.reset", "earliest")
+                .put("ingestion_source.mapper_type", "raw_payload")
+                .put("ingestion_source.error_strategy", "drop")
+                .put("ingestion_source.all_active", true)
+                .build(),
+            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
+        );
+
+        ensureGreen(indexName);
+
+        // Wait for both messages to be indexed
+        waitForSearchableDocs(2, List.of(nodeA));
+
+        // Verify stats show 2 processed messages
+        waitForState(() -> {
+            PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
+                .getPollingIngestStats();
+            return stats != null
+                && stats.getMessageProcessorStats().totalProcessedCount() == 2L
+                && stats.getConsumerStats().totalPolledCount() == 2L
+                && stats.getConsumerStats().totalPollerMessageFailureCount() == 0L
+                && stats.getConsumerStats().totalPollerMessageDroppedCount() == 0L
+                && stats.getMessageProcessorStats().totalInvalidMessageCount() == 0L;
+        });
+
+        // Validate document content
+        SearchResponse searchResponse = client().prepareSearch(indexName).get();
+        assertEquals(2, searchResponse.getHits().getHits().length);
+        for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
+            Map<String, Object> source = searchResponse.getHits().getHits()[i].getSourceAsMap();
+            assertTrue(source.containsKey("name"));
+            assertTrue(source.containsKey("age"));
+        }
+
+        // Publish invalid JSON message
+        String invalidJsonMessage = "{ invalid json";
+        produceData(invalidJsonMessage);
+
+        // Wait for consumer to encounter the error and drop it
+        waitForState(() -> {
+            PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
+                .getPollingIngestStats();
+            return stats != null
+                && stats.getConsumerStats().totalPolledCount() == 3L
+                && stats.getConsumerStats().totalPollerMessageFailureCount() == 1L
+                && stats.getConsumerStats().totalPollerMessageDroppedCount() == 1L
+                && stats.getMessageProcessorStats().totalProcessedCount() == 2L;
+        });
+
+        // Publish message with invalid content that will fail at processor level
+        String invalidFieldTypeMessage = "{\"name\":123,\"age\":\"not a number\"}";
+        produceData(invalidFieldTypeMessage);
+
+        // Wait for processor to encounter the error
+        waitForState(() -> {
+            PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
+                .getPollingIngestStats();
+            return stats != null
+                && stats.getConsumerStats().totalPolledCount() == 4L
+                && stats.getConsumerStats().totalPollerMessageFailureCount() == 1L
+                && stats.getMessageProcessorStats().totalProcessedCount() == 3L
+                && stats.getMessageProcessorStats().totalFailedCount() == 1L
+                && stats.getMessageProcessorStats().totalFailuresDroppedCount() == 1L;
+        });
+
+        // Pause ingestion, reset to offset 0, and resume
+        pauseIngestion(indexName);
+        waitForState(() -> {
+            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
+            return ingestionState.getShardStates().length == 1
+                && ingestionState.getFailedShards() == 0
+                && ingestionState.getShardStates()[0].isPollerPaused()
+                && ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("paused");
+        });
+
+        // Resume with reset to offset 0 (will re-process the 2 valid messages)
+        resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "0");
+        waitForState(() -> {
+            GetIngestionStateResponse ingestionState = getIngestionState(indexName);
+            return ingestionState.getShardStates().length == 1
+                && ingestionState.getShardStates()[0].isPollerPaused() == false
+                && (ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("polling")
+                    || ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("processing"));
+        });
 
+        // Wait for the 3 messages to be processed by the processor after reset (1 will be dropped by the poller)
+        waitForState(() -> {
+            PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
+                .getPollingIngestStats();
+            return stats != null && stats.getMessageProcessorStats().totalProcessedCount() == 3L;
+        });
+
+        // Verify still only 2 documents (no duplicates must be indexed)
+        RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
+        SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
+        assertThat(response.getHits().getTotalHits().value(), is(2L));
     }
 }
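
The counter values asserted in `testRawPayloadMapperIngestion` are easier to follow with the two stages written out. The toy model below reproduces only the arithmetic of the first pass over the four messages under the `drop` error strategy. It is not the OpenSearch ingestion code: `DropStrategyCounters`, `Message` and the field names are hypothetical, and whether a message fails at the poller or at the processor is supplied as a flag rather than detected.

```java
import java.util.List;

public class DropStrategyCounters {
    /** Hypothetical message; the flags say at which stage, if any, it fails. */
    static final class Message {
        final boolean parses;   // false: fails while the poller maps the raw payload
        final boolean indexes;  // false: fails when the processor tries to index it
        Message(boolean parses, boolean indexes) {
            this.parses = parses;
            this.indexes = indexes;
        }
    }

    long polled, pollerFailures, pollerDropped;
    long processed, processorFailures, processorDropped;
    long indexed;

    void ingest(List<Message> messages) {
        for (Message m : messages) {
            polled++;
            if (!m.parses) {          // dropped before reaching the processor
                pollerFailures++;
                pollerDropped++;
                continue;
            }
            processed++;              // counts every message the processor attempts
            if (!m.indexes) {         // attempted, failed, then dropped
                processorFailures++;
                processorDropped++;
                continue;
            }
            indexed++;
        }
    }

    public static void main(String[] args) {
        DropStrategyCounters c = new DropStrategyCounters();
        c.ingest(List.of(
            new Message(true, true),    // alice
            new Message(true, true),    // bob
            new Message(false, false),  // "{ invalid json"
            new Message(true, false)    // wrong field types
        ));
        System.out.println(c.polled + " polled, " + c.processed + " processed, " + c.indexed + " indexed");
    }
}
```

Under this model the malformed JSON never increments the processed count, while the type-mismatch message does, which matches the test expecting a processed count of 3 alongside only 2 searchable documents.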
