Skip to content

Commit

Permalink
Merge branch 'main' into segrep-backpressure-stats
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com>
  • Loading branch information
Rishikesh1159 authored Oct 20, 2023
2 parents 5765b25 + 781968b commit 8567968
Show file tree
Hide file tree
Showing 23 changed files with 810 additions and 77 deletions.
1 change: 1 addition & 0 deletions .ci/bwcVersions
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ BWC_VERSION:
- "2.10.0"
- "2.10.1"
- "2.11.0"
- "2.11.1"
- "2.12.0"
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* @reta @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @gbbafna @setiah @kartg @kotwanikunal @mch2 @nknize @owaiskazi19 @peternied @Rishikesh1159 @ryanbogan @saratvemulapalli @shwetathareja @dreamer-89 @tlfeng @VachaShah @dbwiddis @sachinpkale @sohami @msfroh
* @abbashus @adnapibar @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @dbwiddis @dreamer-89 @gbbafna @kartg @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @peternied @reta @Rishikesh1159 @ryanbogan @sachinpkale @saratvemulapalli @setiah @shwetathareja @sohami @tlfeng @VachaShah
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
Expand Down
1 change: 1 addition & 0 deletions libs/core/src/main/java/org/opensearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class Version implements Comparable<Version>, ToXContentFragment {
public static final Version V_2_10_0 = new Version(2100099, org.apache.lucene.util.Version.LUCENE_9_7_0);
public static final Version V_2_10_1 = new Version(2100199, org.apache.lucene.util.Version.LUCENE_9_7_0);
public static final Version V_2_11_0 = new Version(2110099, org.apache.lucene.util.Version.LUCENE_9_7_0);
public static final Version V_2_11_1 = new Version(2110199, org.apache.lucene.util.Version.LUCENE_9_7_0);
public static final Version V_2_12_0 = new Version(2120099, org.apache.lucene.util.Version.LUCENE_9_8_0);
public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_8_0);
public static final Version CURRENT = V_3_0_0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return builder.build();
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/10735")
@Override
public void testRequestStats() throws Exception {
final String repository = createRepository(randomName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -140,28 +141,39 @@ private static void uploadPart(
ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH
? priorityExecutorService
: executorService;
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
CompletableFuture<UploadPartResponse> uploadPartResponseFuture = SocketAccess.doPrivileged(
() -> s3AsyncClient.uploadPart(
uploadPartRequest,
AsyncRequestBody.fromInputStream(
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)),
inputStreamContainer.getContentLength(),
streamReadExecutor
)
AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor)
)
);

CompletableFuture<CompletedPart> convertFuture = uploadPartResponseFuture.thenApply(
uploadPartResponse -> convertUploadPartResponse(
completedParts,
inputStreamContainers,
uploadPartResponse,
partNumber,
uploadRequest.doRemoteDataIntegrityCheck()
)
);
CompletableFuture<CompletedPart> convertFuture = uploadPartResponseFuture.whenComplete((resp, throwable) -> {
try {
inputStream.close();
} catch (IOException ex) {
log.error(
() -> new ParameterizedMessage(
"Failed to close stream while uploading a part of idx {} and file {}.",
uploadPartRequest.partNumber(),
uploadPartRequest.key()
),
ex
);
}
})
.thenApply(
uploadPartResponse -> convertUploadPartResponse(
completedParts,
inputStreamContainers,
uploadPartResponse,
partNumber,
uploadRequest.doRemoteDataIntegrityCheck()
)
);
futures.add(convertFuture);

CompletableFutureUtils.forwardExceptionTo(convertFuture, uploadPartResponseFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
Expand Down Expand Up @@ -310,17 +311,22 @@ private void uploadInOneChunk(
ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH
? priorityExecutorService
: executorService;
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
CompletableFuture<Void> putObjectFuture = SocketAccess.doPrivileged(
() -> s3AsyncClient.putObject(
putObjectRequestBuilder.build(),
AsyncRequestBody.fromInputStream(
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)),
inputStreamContainer.getContentLength(),
streamReadExecutor
)
AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor)
).handle((resp, throwable) -> {
try {
inputStream.close();
} catch (IOException e) {
log.error(
() -> new ParameterizedMessage("Failed to close stream while uploading single file {}.", uploadRequest.getKey()),
e
);
}
if (throwable != null) {
Throwable unwrappedThrowable = ExceptionsHelper.unwrap(throwable, S3Exception.class);
if (unwrappedThrowable != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,14 @@
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -71,17 +76,16 @@ public void testOneChunkUpload() {
putObjectResponseCompletableFuture
);

AtomicReference<InputStream> streamRef = new AtomicReference<>();
CompletableFuture<Void> resultFuture = asyncTransferManager.uploadObject(
s3AsyncClient,
new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> {
// do nothing
}, false, null),
new StreamContext(
(partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position),
ByteSizeUnit.MB.toBytes(1),
ByteSizeUnit.MB.toBytes(1),
1
),
new StreamContext((partIdx, partSize, position) -> {
streamRef.set(new ZeroInputStream(partSize));
return new InputStreamContainer(streamRef.get(), partSize, position);
}, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 1),
new StatsMetricPublisher()
);

Expand All @@ -92,6 +96,14 @@ public void testOneChunkUpload() {
}

verify(s3AsyncClient, times(1)).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class));

boolean closeError = false;
try {
streamRef.get().available();
} catch (IOException e) {
closeError = e.getMessage().equals("Stream closed");
}
assertTrue("InputStream was still open after upload", closeError);
}

public void testOneChunkUploadCorruption() {
Expand Down Expand Up @@ -162,17 +174,17 @@ public void testMultipartUpload() {
abortMultipartUploadResponseCompletableFuture
);

List<InputStream> streams = new ArrayList<>();
CompletableFuture<Void> resultFuture = asyncTransferManager.uploadObject(
s3AsyncClient,
new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> {
// do nothing
}, true, 3376132981L),
new StreamContext(
(partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position),
ByteSizeUnit.MB.toBytes(1),
ByteSizeUnit.MB.toBytes(1),
5
),
new StreamContext((partIdx, partSize, position) -> {
InputStream stream = new ZeroInputStream(partSize);
streams.add(stream);
return new InputStreamContainer(stream, partSize, position);
}, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 5),
new StatsMetricPublisher()
);

Expand All @@ -182,6 +194,16 @@ public void testMultipartUpload() {
fail("did not expect resultFuture to fail");
}

streams.forEach(stream -> {
boolean closeError = false;
try {
stream.available();
} catch (IOException e) {
closeError = e.getMessage().equals("Stream closed");
}
assertTrue("InputStream was still open after upload", closeError);
});

verify(s3AsyncClient, times(1)).createMultipartUpload(any(CreateMultipartUploadRequest.class));
verify(s3AsyncClient, times(5)).uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class));
verify(s3AsyncClient, times(1)).completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public NodeStats(StreamInput in) throws IOException {
} else {
segmentReplicationRejectionStats = null;

Check warning on line 222 in server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java#L222

Added line #L222 was not covered by tests
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
repositoriesStats = in.readOptionalWriteable(RepositoriesStats::new);
} else {
repositoriesStats = null;
Expand Down Expand Up @@ -485,7 +485,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(segmentReplicationRejectionStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalWriteable(repositoriesStats);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilderVisitor;
import org.opensearch.index.query.QueryShapeVisitor;
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortBuilder;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.util.List;
import java.util.ListIterator;

/**
* Class to categorize the search queries based on the type and increment the relevant counters.
* Class also logs the query shape.
*/
final class SearchQueryCategorizer {

private static final Logger log = LogManager.getLogger(SearchQueryCategorizer.class);

final SearchQueryCounters searchQueryCounters;

public SearchQueryCategorizer(MetricsRegistry metricsRegistry) {
searchQueryCounters = new SearchQueryCounters(metricsRegistry);
}

public void categorize(SearchSourceBuilder source) {
QueryBuilder topLevelQueryBuilder = source.query();

logQueryShape(topLevelQueryBuilder);
incrementQueryTypeCounters(topLevelQueryBuilder);
incrementQueryAggregationCounters(source.aggregations());
incrementQuerySortCounters(source.sorts());
}

private void incrementQuerySortCounters(List<SortBuilder<?>> sorts) {
if (sorts != null && sorts.size() > 0) {
for (ListIterator<SortBuilder<?>> it = sorts.listIterator(); it.hasNext();) {
SortBuilder sortBuilder = it.next();
String sortOrder = sortBuilder.order().toString();
searchQueryCounters.sortCounter.add(1, Tags.create().addTag("sort_order", sortOrder));
}
}
}

private void incrementQueryAggregationCounters(AggregatorFactories.Builder aggregations) {
if (aggregations != null) {
searchQueryCounters.aggCounter.add(1);
}
}

private void incrementQueryTypeCounters(QueryBuilder topLevelQueryBuilder) {
if (topLevelQueryBuilder == null) {
return;
}
QueryBuilderVisitor searchQueryVisitor = new SearchQueryCategorizingVisitor(searchQueryCounters);
topLevelQueryBuilder.visit(searchQueryVisitor);
}

private void logQueryShape(QueryBuilder topLevelQueryBuilder) {
if (topLevelQueryBuilder == null) {
return;
}
QueryShapeVisitor shapeVisitor = new QueryShapeVisitor();
topLevelQueryBuilder.visit(shapeVisitor);
log.debug("Query shape : {}", shapeVisitor.prettyPrintTree(" "));
}

}
Loading

0 comments on commit 8567968

Please sign in to comment.