diff --git a/.github/workflows/version.yml b/.github/workflows/version.yml index eaf4d085c6946..fdf42a9a2731e 100644 --- a/.github/workflows/version.yml +++ b/.github/workflows/version.yml @@ -30,7 +30,11 @@ jobs: CURRENT_VERSION_ARRAY[2]=$((CURRENT_VERSION_ARRAY[2]+1)) NEXT_VERSION=$(IFS=. ; echo "${CURRENT_VERSION_ARRAY[*]:0:3}") NEXT_VERSION_UNDERSCORE=$(IFS=_ ; echo "V_${CURRENT_VERSION_ARRAY[*]:0:3}") - NEXT_VERSION_ID=$(IFS=0 ; echo "${CURRENT_VERSION_ARRAY[*]:0:3}99") + if [[ ${#CURRENT_VERSION_ARRAY[2]} -gt 1 ]]; then + NEXT_VERSION_ID="${CURRENT_VERSION_ARRAY[0]:0:3}0${CURRENT_VERSION_ARRAY[1]:0:3}${CURRENT_VERSION_ARRAY[2]:0:3}99" + else + NEXT_VERSION_ID=$(IFS=0 ; echo "${CURRENT_VERSION_ARRAY[*]:0:3}99") + fi echo "TAG=$TAG" >> $GITHUB_ENV echo "BASE=$BASE" >> $GITHUB_ENV echo "BASE_X=$BASE_X" >> $GITHUB_ENV
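For context on the version.yml change above: OpenSearch encodes a version a.b.c as the numeric ID a*10^6 + b*10^4 + c*10^2 + 99, the trailing 99 marking an unreleased build. Joining the version array with IFS=0 only produces that shape while every component is a single digit, hence the new branch for two-digit patch versions. A minimal Java sketch of the intended arithmetic (illustrative only; the class and method are not part of the workflow):

```java
// Illustrative sketch of the version-ID scheme the workflow computes via string manipulation.
// Assumes the OpenSearch convention: major * 10^6 + minor * 10^4 + patch * 10^2 + 99.
public final class VersionIdSketch {
    static long versionId(int major, int minor, int patch) {
        return major * 1_000_000L + minor * 10_000L + patch * 100L + 99;
    }

    public static void main(String[] args) {
        System.out.println(versionId(2, 9, 1));  // 2090199 -- matches the IFS=0 join of (2 9 1) plus "99"
        System.out.println(versionId(2, 9, 10)); // 2091099 -- the IFS=0 join would wrongly yield "20901099"
    }
}
```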
diff --git a/CHANGELOG.md b/CHANGELOG.md index 5375dfbecea65..24c86bf6ff714 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283)) - Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792)) - Pass localNode info to all plugins on node start ([#7919](https://github.com/opensearch-project/OpenSearch/pull/7919)) +- Improved performance of parsing floating point numbers ([#7909](https://github.com/opensearch-project/OpenSearch/pull/7909)) ### Deprecated @@ -75,6 +76,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460)) - Replaces ZipInputStream with ZipFile to fix Zip Slip vulnerability ([#7230](https://github.com/opensearch-project/OpenSearch/pull/7230)) - Add missing validation/parsing of SearchBackpressureMode of SearchBackpressureSettings ([#7541](https://github.com/opensearch-project/OpenSearch/pull/7541)) +- Adds log4j configuration for telemetry LogSpanExporter ([#8393](https://github.com/opensearch-project/OpenSearch/pull/8393)) ### Security @@ -92,6 +94,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038)) - Add API to initialize extensions ([#8029](https://github.com/opensearch-project/OpenSearch/pull/8029)) - Add distributed tracing framework ([#7543](https://github.com/opensearch-project/OpenSearch/issues/7543)) +- Enable Point based optimization for custom comparators ([#8168](https://github.com/opensearch-project/OpenSearch/pull/8168)) +- [Extensions] Support extension additional settings with extension REST initialization ([#8414](https://github.com/opensearch-project/OpenSearch/pull/8414)) - Adds mock implementation for TelemetryPlugin ([#7545](https://github.com/opensearch-project/OpenSearch/issues/7545)) ### Dependencies @@ -125,6 +129,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Upgrade] Lucene 9.7.0 release (#8272) - Bump `org.jboss.resteasy:resteasy-jackson2-provider` from 3.0.26.Final to 6.2.4.Final in /qa/wildfly ([#8209](https://github.com/opensearch-project/OpenSearch/pull/8209)) - Bump `com.google.api-client:google-api-client` from 1.34.0 to 2.2.0 ([#8276](https://github.com/opensearch-project/OpenSearch/pull/8276)) +- Update Apache HttpCore/HttpClient and Apache HttpCore5/HttpClient5 dependencies ([#8434](https://github.com/opensearch-project/OpenSearch/pull/8434)) ### Changed - Replace jboss-annotations-api_1.2_spec with jakarta.annotation-api ([#7836](https://github.com/opensearch-project/OpenSearch/pull/7836)) @@ -156,6 +161,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - With only GlobalAggregation in request causes unnecessary wrapping with MultiCollector ([#8125](https://github.com/opensearch-project/OpenSearch/pull/8125)) - Fix mapping char_filter when mapping a hashtag ([#7591](https://github.com/opensearch-project/OpenSearch/pull/7591)) - Fix NPE in multiterms aggregations involving empty buckets ([#7318](https://github.com/opensearch-project/OpenSearch/pull/7318)) +- Precise system clock time in MasterService debug logs ([#7902](https://github.com/opensearch-project/OpenSearch/pull/7902)) ### Security diff --git a/buildSrc/version.properties b/buildSrc/version.properties index f9eac9516cb18..408b03e60cc5d 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -33,10 +33,10 @@ netty = 4.1.94.Final joda = 2.12.2 # client dependencies -httpclient5 = 5.1.4 -httpcore5 = 5.1.5 -httpclient = 4.5.13 -httpcore = 4.4.15 +httpclient5 = 5.2.1 +httpcore5 = 5.2.2 +httpclient = 4.5.14 +httpcore = 4.4.16 httpasyncclient = 4.1.5 commonslogging = 1.2 commonscodec = 1.15 diff --git a/client/rest/licenses/httpclient5-5.1.4.jar.sha1 b/client/rest/licenses/httpclient5-5.1.4.jar.sha1 deleted file mode 100644 index 3c0cb1335fb88..0000000000000 --- a/client/rest/licenses/httpclient5-5.1.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -208f9eed6d6ab709e2ae7a75b457ef60c0baefa5 \ No newline at end of file diff --git a/client/rest/licenses/httpclient5-5.2.1.jar.sha1 b/client/rest/licenses/httpclient5-5.2.1.jar.sha1 new file mode 100644 index 0000000000000..3555fe22f8e12 --- /dev/null +++ b/client/rest/licenses/httpclient5-5.2.1.jar.sha1 @@ -0,0 +1 @@ +0c900514d3446d9ce5d9dbd90c21192048125440 \ No newline at end of file diff --git a/client/rest/licenses/httpcore5-5.1.5.jar.sha1 b/client/rest/licenses/httpcore5-5.1.5.jar.sha1 deleted file mode 100644 index 8da253152e970..0000000000000 --- a/client/rest/licenses/httpcore5-5.1.5.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -df9da3a1fa2351c4790245400ed28d78a8ddd3fc \ No newline at end of file diff --git a/client/rest/licenses/httpcore5-5.2.2.jar.sha1 b/client/rest/licenses/httpcore5-5.2.2.jar.sha1 new file mode 100644 index 0000000000000..b641256c7d4a4 --- /dev/null +++ b/client/rest/licenses/httpcore5-5.2.2.jar.sha1 @@ -0,0 +1 @@ +6da28f5aa6c2b129ef49632e041a5203ce7507b2 \ No newline at end of file diff --git a/client/rest/licenses/httpcore5-h2-5.1.5.jar.sha1 b/client/rest/licenses/httpcore5-h2-5.1.5.jar.sha1 deleted file mode 100644 index 097e6cc2a3be8..0000000000000 --- a/client/rest/licenses/httpcore5-h2-5.1.5.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -624660339afd5006d427457e6b10b10b32fd86f1 \ No newline at end of file diff --git a/client/rest/licenses/httpcore5-h2-5.2.2.jar.sha1 b/client/rest/licenses/httpcore5-h2-5.2.2.jar.sha1 new file mode 100644 index 0000000000000..94bc0fa49bdb0 --- /dev/null +++ b/client/rest/licenses/httpcore5-h2-5.2.2.jar.sha1 @@ -0,0 +1 @@ +54ee1ed58fe8ac40be1083ea9873a6c734939ab9 \ No newline at end of file diff --git a/client/sniffer/licenses/httpclient5-5.1.4.jar.sha1 b/client/sniffer/licenses/httpclient5-5.1.4.jar.sha1 deleted 
file mode 100644 index 3c0cb1335fb88..0000000000000 --- a/client/sniffer/licenses/httpclient5-5.1.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -208f9eed6d6ab709e2ae7a75b457ef60c0baefa5 \ No newline at end of file diff --git a/client/sniffer/licenses/httpclient5-5.2.1.jar.sha1 b/client/sniffer/licenses/httpclient5-5.2.1.jar.sha1 new file mode 100644 index 0000000000000..3555fe22f8e12 --- /dev/null +++ b/client/sniffer/licenses/httpclient5-5.2.1.jar.sha1 @@ -0,0 +1 @@ +0c900514d3446d9ce5d9dbd90c21192048125440 \ No newline at end of file diff --git a/client/sniffer/licenses/httpcore5-5.1.5.jar.sha1 b/client/sniffer/licenses/httpcore5-5.1.5.jar.sha1 deleted file mode 100644 index 8da253152e970..0000000000000 --- a/client/sniffer/licenses/httpcore5-5.1.5.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -df9da3a1fa2351c4790245400ed28d78a8ddd3fc \ No newline at end of file diff --git a/client/sniffer/licenses/httpcore5-5.2.2.jar.sha1 b/client/sniffer/licenses/httpcore5-5.2.2.jar.sha1 new file mode 100644 index 0000000000000..b641256c7d4a4 --- /dev/null +++ b/client/sniffer/licenses/httpcore5-5.2.2.jar.sha1 @@ -0,0 +1 @@ +6da28f5aa6c2b129ef49632e041a5203ce7507b2 \ No newline at end of file diff --git a/libs/x-content/src/main/java/org/opensearch/common/xcontent/cbor/CborXContent.java b/libs/x-content/src/main/java/org/opensearch/common/xcontent/cbor/CborXContent.java index d7df53e7a0cf5..46891b279ba43 100644 --- a/libs/x-content/src/main/java/org/opensearch/common/xcontent/cbor/CborXContent.java +++ b/libs/x-content/src/main/java/org/opensearch/common/xcontent/cbor/CborXContent.java @@ -36,6 +36,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.StreamReadConstraints; +import com.fasterxml.jackson.core.StreamReadFeature; import com.fasterxml.jackson.dataformat.cbor.CBORFactory; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.MediaType; @@ -75,6 +76,7 @@ public static XContentBuilder contentBuilder() throws IOException { cborFactory.configure(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT, false); cborFactory.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, true); cborFactory.setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(DEFAULT_MAX_STRING_LEN).build()); + cborFactory.configure(StreamReadFeature.USE_FAST_DOUBLE_PARSER.mappedFeature(), true); cborXContent = new CborXContent(); } diff --git a/libs/x-content/src/main/java/org/opensearch/common/xcontent/json/JsonXContent.java b/libs/x-content/src/main/java/org/opensearch/common/xcontent/json/JsonXContent.java index 8ff8e7730b189..e6c27e4cf3eef 100644 --- a/libs/x-content/src/main/java/org/opensearch/common/xcontent/json/JsonXContent.java +++ b/libs/x-content/src/main/java/org/opensearch/common/xcontent/json/JsonXContent.java @@ -37,7 +37,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.StreamReadConstraints; - +import com.fasterxml.jackson.core.StreamReadFeature; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -78,6 +78,7 @@ public static XContentBuilder contentBuilder() throws IOException { jsonFactory.configure(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT, false); jsonFactory.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, true); 
jsonFactory.setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(DEFAULT_MAX_STRING_LEN).build()); + jsonFactory.configure(StreamReadFeature.USE_FAST_DOUBLE_PARSER.mappedFeature(), true); jsonXContent = new JsonXContent(); } diff --git a/libs/x-content/src/main/java/org/opensearch/common/xcontent/smile/SmileXContent.java b/libs/x-content/src/main/java/org/opensearch/common/xcontent/smile/SmileXContent.java index e0a39df1589a2..eb968556de8c9 100644 --- a/libs/x-content/src/main/java/org/opensearch/common/xcontent/smile/SmileXContent.java +++ b/libs/x-content/src/main/java/org/opensearch/common/xcontent/smile/SmileXContent.java @@ -36,6 +36,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.StreamReadConstraints; +import com.fasterxml.jackson.core.StreamReadFeature; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.dataformat.smile.SmileGenerator; import org.opensearch.core.xcontent.DeprecationHandler; @@ -77,6 +78,7 @@ public static XContentBuilder contentBuilder() throws IOException { smileFactory.configure(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT, false); smileFactory.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, true); smileFactory.setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(DEFAULT_MAX_STRING_LEN).build()); + smileFactory.configure(StreamReadFeature.USE_FAST_DOUBLE_PARSER.mappedFeature(), true); smileXContent = new SmileXContent(); } diff --git a/libs/x-content/src/main/java/org/opensearch/common/xcontent/yaml/YamlXContent.java b/libs/x-content/src/main/java/org/opensearch/common/xcontent/yaml/YamlXContent.java index 1e73cb2bd9c5e..bb4fa9a09d448 100644 --- a/libs/x-content/src/main/java/org/opensearch/common/xcontent/yaml/YamlXContent.java +++ b/libs/x-content/src/main/java/org/opensearch/common/xcontent/yaml/YamlXContent.java @@ -35,6 +35,7 @@ import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.StreamReadConstraints; +import com.fasterxml.jackson.core.StreamReadFeature; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -70,6 +71,7 @@ public static XContentBuilder contentBuilder() throws IOException { yamlFactory = new YAMLFactory(); yamlFactory.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, true); yamlFactory.setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(DEFAULT_MAX_STRING_LEN).build()); + yamlFactory.configure(StreamReadFeature.USE_FAST_DOUBLE_PARSER.mappedFeature(), true); yamlXContent = new YamlXContent(); } diff --git a/plugins/discovery-azure-classic/licenses/httpclient-4.5.13.jar.sha1 b/plugins/discovery-azure-classic/licenses/httpclient-4.5.13.jar.sha1 deleted file mode 100644 index 3281e21595b39..0000000000000 --- a/plugins/discovery-azure-classic/licenses/httpclient-4.5.13.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e5f6cae5ca7ecaac1ec2827a9e2d65ae2869cada \ No newline at end of file diff --git a/plugins/discovery-azure-classic/licenses/httpclient-4.5.14.jar.sha1 b/plugins/discovery-azure-classic/licenses/httpclient-4.5.14.jar.sha1 new file mode 100644 index 0000000000000..66e05851c2e3c --- /dev/null +++ b/plugins/discovery-azure-classic/licenses/httpclient-4.5.14.jar.sha1 @@ -0,0 +1 @@ +1194890e6f56ec29177673f2f12d0b8e627dec98 \ No newline at end of file diff 
--git a/plugins/discovery-azure-classic/licenses/httpcore-4.4.15.jar.sha1 b/plugins/discovery-azure-classic/licenses/httpcore-4.4.15.jar.sha1 deleted file mode 100644 index 42a03b5d7a376..0000000000000 --- a/plugins/discovery-azure-classic/licenses/httpcore-4.4.15.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -7f2e0c573eaa7a74bac2e89b359e1f73d92a0a1d \ No newline at end of file diff --git a/plugins/discovery-azure-classic/licenses/httpcore-4.4.16.jar.sha1 b/plugins/discovery-azure-classic/licenses/httpcore-4.4.16.jar.sha1 new file mode 100644 index 0000000000000..172110694b5bd --- /dev/null +++ b/plugins/discovery-azure-classic/licenses/httpcore-4.4.16.jar.sha1 @@ -0,0 +1 @@ +51cf043c87253c9f58b539c9f7e44c8894223850 \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/httpclient-4.5.13.jar.sha1 b/plugins/discovery-ec2/licenses/httpclient-4.5.13.jar.sha1 deleted file mode 100644 index 3281e21595b39..0000000000000 --- a/plugins/discovery-ec2/licenses/httpclient-4.5.13.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e5f6cae5ca7ecaac1ec2827a9e2d65ae2869cada \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/httpclient-4.5.14.jar.sha1 b/plugins/discovery-ec2/licenses/httpclient-4.5.14.jar.sha1 new file mode 100644 index 0000000000000..66e05851c2e3c --- /dev/null +++ b/plugins/discovery-ec2/licenses/httpclient-4.5.14.jar.sha1 @@ -0,0 +1 @@ +1194890e6f56ec29177673f2f12d0b8e627dec98 \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/httpcore-4.4.15.jar.sha1 b/plugins/discovery-ec2/licenses/httpcore-4.4.15.jar.sha1 deleted file mode 100644 index 42a03b5d7a376..0000000000000 --- a/plugins/discovery-ec2/licenses/httpcore-4.4.15.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -7f2e0c573eaa7a74bac2e89b359e1f73d92a0a1d \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/httpcore-4.4.16.jar.sha1 b/plugins/discovery-ec2/licenses/httpcore-4.4.16.jar.sha1 new file mode 100644 index 0000000000000..172110694b5bd --- /dev/null +++ b/plugins/discovery-ec2/licenses/httpcore-4.4.16.jar.sha1 @@ -0,0 +1 @@ +51cf043c87253c9f58b539c9f7e44c8894223850 \ No newline at end of file diff --git a/plugins/discovery-gce/licenses/httpclient-4.5.13.jar.sha1 b/plugins/discovery-gce/licenses/httpclient-4.5.13.jar.sha1 deleted file mode 100644 index 3281e21595b39..0000000000000 --- a/plugins/discovery-gce/licenses/httpclient-4.5.13.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e5f6cae5ca7ecaac1ec2827a9e2d65ae2869cada \ No newline at end of file diff --git a/plugins/discovery-gce/licenses/httpclient-4.5.14.jar.sha1 b/plugins/discovery-gce/licenses/httpclient-4.5.14.jar.sha1 new file mode 100644 index 0000000000000..66e05851c2e3c --- /dev/null +++ b/plugins/discovery-gce/licenses/httpclient-4.5.14.jar.sha1 @@ -0,0 +1 @@ +1194890e6f56ec29177673f2f12d0b8e627dec98 \ No newline at end of file diff --git a/plugins/discovery-gce/licenses/httpcore-4.4.15.jar.sha1 b/plugins/discovery-gce/licenses/httpcore-4.4.15.jar.sha1 deleted file mode 100644 index 42a03b5d7a376..0000000000000 --- a/plugins/discovery-gce/licenses/httpcore-4.4.15.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -7f2e0c573eaa7a74bac2e89b359e1f73d92a0a1d \ No newline at end of file diff --git a/plugins/discovery-gce/licenses/httpcore-4.4.16.jar.sha1 b/plugins/discovery-gce/licenses/httpcore-4.4.16.jar.sha1 new file mode 100644 index 0000000000000..172110694b5bd --- /dev/null +++ b/plugins/discovery-gce/licenses/httpcore-4.4.16.jar.sha1 @@ -0,0 +1 @@ +51cf043c87253c9f58b539c9f7e44c8894223850 \ No newline at end of file diff --git 
a/plugins/repository-s3/licenses/httpclient-4.5.13.jar.sha1 b/plugins/repository-s3/licenses/httpclient-4.5.13.jar.sha1 deleted file mode 100644 index 3281e21595b39..0000000000000 --- a/plugins/repository-s3/licenses/httpclient-4.5.13.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e5f6cae5ca7ecaac1ec2827a9e2d65ae2869cada \ No newline at end of file diff --git a/plugins/repository-s3/licenses/httpclient-4.5.14.jar.sha1 b/plugins/repository-s3/licenses/httpclient-4.5.14.jar.sha1 new file mode 100644 index 0000000000000..66e05851c2e3c --- /dev/null +++ b/plugins/repository-s3/licenses/httpclient-4.5.14.jar.sha1 @@ -0,0 +1 @@ +1194890e6f56ec29177673f2f12d0b8e627dec98 \ No newline at end of file diff --git a/plugins/repository-s3/licenses/httpcore-4.4.15.jar.sha1 b/plugins/repository-s3/licenses/httpcore-4.4.15.jar.sha1 deleted file mode 100644 index 42a03b5d7a376..0000000000000 --- a/plugins/repository-s3/licenses/httpcore-4.4.15.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -7f2e0c573eaa7a74bac2e89b359e1f73d92a0a1d \ No newline at end of file diff --git a/plugins/repository-s3/licenses/httpcore-4.4.16.jar.sha1 b/plugins/repository-s3/licenses/httpcore-4.4.16.jar.sha1 new file mode 100644 index 0000000000000..172110694b5bd --- /dev/null +++ b/plugins/repository-s3/licenses/httpcore-4.4.16.jar.sha1 @@ -0,0 +1 @@ +51cf043c87253c9f58b539c9f7e44c8894223850 \ No newline at end of file diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index cf749eeffd903..49ebce77a59ad 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ActionListener; import org.opensearch.common.Nullable; import org.opensearch.common.SetOnce; import org.opensearch.common.blobstore.BlobContainer; @@ -296,6 +297,35 @@ private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs .build(); } + @Override + public void listBlobsByPrefixInSortedOrder( + String blobNamePrefix, + int limit, + BlobNameSortOrder blobNameSortOrder, + ActionListener<List<BlobMetadata>> listener + ) { + // As AWS S3 returns the list of keys in lexicographic order, we don't have to fetch all the keys in order to sort them. + // We fetch only as many keys as the given limit to optimize the fetch. If the provided sort order is not lexicographic, + // we fall back to the default implementation of fetching all the keys and sorting them. + if (blobNameSortOrder != BlobNameSortOrder.LEXICOGRAPHIC) { + super.listBlobsByPrefixInSortedOrder(blobNamePrefix, limit, blobNameSortOrder, listener); + } else { + if (limit < 0) { + throw new IllegalArgumentException("limit should not be a negative value"); + } + String prefix = blobNamePrefix == null ? keyPath : buildKey(blobNamePrefix); + try (AmazonS3Reference clientReference = blobStore.clientReference()) { + List<BlobMetadata> blobs = executeListing(clientReference, listObjectsRequest(prefix, limit), limit).stream() + .flatMap(listing -> listing.contents().stream()) + .map(s3Object -> new PlainBlobMetadata(s3Object.key().substring(keyPath.length()), s3Object.size())) + .collect(Collectors.toList()); + listener.onResponse(blobs.subList(0, Math.min(limit, blobs.size()))); + } catch (final Exception e) { + listener.onFailure(new IOException("Exception when listing blobs by prefix [" + prefix + "]", e)); + } + } + } + @Override public Map<String, BlobMetadata> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException { String prefix = blobNamePrefix == null ? keyPath : buildKey(blobNamePrefix); @@ -339,10 +369,25 @@ public Map<String, BlobContainer> children() throws IOException { } private static List<ListObjectsV2Response> executeListing(AmazonS3Reference clientReference, ListObjectsV2Request listObjectsRequest) { + return executeListing(clientReference, listObjectsRequest, -1); + } + + private static List<ListObjectsV2Response> executeListing( + AmazonS3Reference clientReference, + ListObjectsV2Request listObjectsRequest, + int limit + ) { return SocketAccess.doPrivileged(() -> { final List<ListObjectsV2Response> results = new ArrayList<>(); + int totalObjects = 0; ListObjectsV2Iterable listObjectsIterable = clientReference.get().listObjectsV2Paginator(listObjectsRequest); - listObjectsIterable.forEach(results::add); + for (ListObjectsV2Response listObjectsV2Response : listObjectsIterable) { + results.add(listObjectsV2Response); + totalObjects += listObjectsV2Response.contents().size(); + if (limit != -1 && totalObjects > limit) { + break; + } + } return results; }); } @@ -356,6 +401,10 @@ private ListObjectsV2Request listObjectsRequest(String keyPath) { .build(); } + private ListObjectsV2Request listObjectsRequest(String keyPath, int limit) { + return listObjectsRequest(keyPath).toBuilder().maxKeys(Math.min(limit, 1000)).build(); + } + private String buildKey(String blobName) { return keyPath + blobName; }
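A sketch of how a caller might consume the sorted-listing API implemented above; the wrapper class and prefix are hypothetical, while BlobContainer, BlobNameSortOrder, and ActionListener.wrap are the types used in the patch:

```java
import org.opensearch.action.ActionListener;
import org.opensearch.common.blobstore.BlobContainer;

// Hypothetical caller of the new sorted listing API. For LEXICOGRAPHIC order the S3
// implementation above stops paginating once `limit` keys have been fetched, since S3
// already returns keys sorted; any other order falls back to the default
// list-everything-then-sort implementation in BlobContainer.
class SortedListingExample {
    void printFirstBlobs(BlobContainer container) {
        container.listBlobsByPrefixInSortedOrder(
            "metadata_",                                   // hypothetical prefix
            10,                                            // fetch at most 10 entries
            BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC,
            ActionListener.wrap(
                blobs -> blobs.forEach(b -> System.out.println(b.name() + " -> " + b.length() + " bytes")),
                e -> System.err.println("listing failed: " + e)
            )
        );
    }
}
```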
diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index ec16f216f1777..a2a7ca8d8bdd5 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -33,6 +33,9 @@ package org.opensearch.repositories.s3; import org.mockito.ArgumentCaptor; +import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStoreException; import org.opensearch.common.blobstore.DeleteResult; @@ -74,9 +77,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -84,12 +91,12 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; public class S3BlobStoreContainerTests extends OpenSearchTestCase { @@ -187,26 +194,34 @@ private static class MockListObjectsV2ResponseIterator implements Iterator<ListObjectsV2Response> { - private final List<String> keysListed = new ArrayList<>(); + private final List<String> keysListed; private final boolean throwExceptionOnNextInvocation; public MockListObjectsV2ResponseIterator(int totalPageCount, int s3ObjectsPerPage, long s3ObjectSize) { - this.totalPageCount = totalPageCount; - this.s3ObjectsPerPage = s3ObjectsPerPage; - this.s3ObjectSize = s3ObjectSize; - this.throwExceptionOnNextInvocation = false; + this(totalPageCount, s3ObjectsPerPage, s3ObjectSize, ""); + } + + public MockListObjectsV2ResponseIterator(int totalPageCount, int s3ObjectsPerPage, long s3ObjectSize, String blobPath) { + this(totalPageCount, s3ObjectsPerPage, s3ObjectSize, blobPath, false); } public MockListObjectsV2ResponseIterator( int totalPageCount, int s3ObjectsPerPage, long s3ObjectSize, + String blobPath, boolean throwExceptionOnNextInvocation ) { this.totalPageCount = totalPageCount; this.s3ObjectsPerPage = s3ObjectsPerPage; this.s3ObjectSize = s3ObjectSize; this.throwExceptionOnNextInvocation = throwExceptionOnNextInvocation; + keysListed = new ArrayList<>(); + for (int i = 0; i < totalPageCount * s3ObjectsPerPage; i++) { + keysListed.add(blobPath + UUID.randomUUID().toString()); + } + // S3 lists keys in lexicographic order + keysListed.sort(String::compareTo); } @Override @@ -220,11 +235,12 @@ public ListObjectsV2Response next() { throw SdkException.builder().build(); } if (currInvocationCount.getAndIncrement() < totalPageCount) { - String s3ObjectKey = UUID.randomUUID().toString(); - keysListed.add(s3ObjectKey); - return ListObjectsV2Response.builder() - .contents(Collections.nCopies(s3ObjectsPerPage, S3Object.builder().key(s3ObjectKey).size(s3ObjectSize).build())) - .build(); + List<S3Object> s3Objects = new ArrayList<>(); + for (int i = 0; i < s3ObjectsPerPage; i++) { + String s3ObjectKey = keysListed.get((currInvocationCount.get() - 1) * s3ObjectsPerPage + i); + s3Objects.add(S3Object.builder().key(s3ObjectKey).size(s3ObjectSize).build()); + } + return ListObjectsV2Response.builder().contents(s3Objects).build(); } throw new NoSuchElementException(); } @@ -232,6 +248,10 @@ public ListObjectsV2Response next() { public List<String> getKeysListed() { return keysListed; } + + public int numberOfPagesFetched() { + return currInvocationCount.get(); + } } public void testDelete() throws IOException { @@ -273,10 +293,8 @@ public void testDelete() throws IOException { // keysDeleted will have blobPath also assertEquals(listObjectsV2ResponseIterator.getKeysListed().size(), keysDeleted.size() - 1); assertTrue(keysDeleted.contains(blobPath.buildAsString())); - assertArrayEquals( - listObjectsV2ResponseIterator.getKeysListed().toArray(String[]::new), - keysDeleted.stream().filter(key -> !blobPath.buildAsString().equals(key)).toArray(String[]::new) - ); + keysDeleted.remove(blobPath.buildAsString()); + assertEquals(new HashSet<>(listObjectsV2ResponseIterator.getKeysListed()), new HashSet<>(keysDeleted)); } public void testDeleteItemLevelErrorsDuringDelete() { @@ -772,4 +790,112 @@ private static void assertNumberOfMultiparts(final int expectedParts, final long assertEquals("Expected number of parts [" + expectedParts + "] but got [" + result.v1() + "]", expectedParts, (long) result.v1()); assertEquals("Expected remaining [" + expectedRemaining + "] but got [" + result.v2() + "]", expectedRemaining, (long) result.v2()); } + + public void testListBlobsByPrefix() throws IOException { + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + + final S3Client client = mock(S3Client.class); + final AmazonS3Reference clientReference = new AmazonS3Reference(client); + when(blobStore.clientReference()).thenReturn(clientReference); + + BlobPath blobPath = mock(BlobPath.class); + when(blobPath.buildAsString()).thenReturn("/dummy/path"); + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + final ListObjectsV2Iterable listObjectsV2Iterable = mock(ListObjectsV2Iterable.class); + when(client.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Iterable); + + MockListObjectsV2ResponseIterator iterator = new MockListObjectsV2ResponseIterator(2, 5, 100); + when(listObjectsV2Iterable.iterator()).thenReturn(iterator); + + Map<String, BlobMetadata> listOfBlobs = blobContainer.listBlobsByPrefix(null); + assertEquals(10, listOfBlobs.size()); + + Set<String> keys = iterator.keysListed.stream() + .map(s -> s.substring(blobPath.buildAsString().length())) + .collect(Collectors.toSet()); + assertEquals(keys, listOfBlobs.keySet()); + } + + private void testListBlobsByPrefixInLexicographicOrder( + int limit, + int expectedNumberofPagesFetched, + BlobContainer.BlobNameSortOrder blobNameSortOrder + ) throws IOException { + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + + final S3Client client = mock(S3Client.class); + final AmazonS3Reference clientReference = new AmazonS3Reference(client); + when(blobStore.clientReference()).thenReturn(clientReference); + + BlobPath blobPath = mock(BlobPath.class); + when(blobPath.buildAsString()).thenReturn("/dummy/path"); + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + final ListObjectsV2Iterable listObjectsV2Iterable = mock(ListObjectsV2Iterable.class); + when(client.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Iterable); + + final MockListObjectsV2ResponseIterator iterator = new MockListObjectsV2ResponseIterator(2, 5, 100, blobPath.buildAsString()); + when(listObjectsV2Iterable.iterator()).thenReturn(iterator); + + if (limit >= 0) { + blobContainer.listBlobsByPrefixInSortedOrder(null, limit, blobNameSortOrder, new ActionListener<>() { + @Override + public void onResponse(List<BlobMetadata> blobMetadata) { + int actualLimit = Math.max(0, Math.min(limit, 10)); + assertEquals(actualLimit, blobMetadata.size()); + + List<String> keys = iterator.keysListed.stream() + .map(s -> s.substring(blobPath.buildAsString().length())) + .collect(Collectors.toList()); + Comparator<String> keysComparator = String::compareTo; + if (blobNameSortOrder != BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC) { + keysComparator = Collections.reverseOrder(String::compareTo); + } + keys.sort(keysComparator); + List<String> sortedKeys = keys.subList(0, actualLimit); + assertEquals(sortedKeys, blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList())); + assertEquals(expectedNumberofPagesFetched, iterator.numberOfPagesFetched()); + } + + @Override + public void onFailure(Exception e) { + fail("blobContainer.listBlobsByPrefixInLexicographicOrder failed with exception: " + e.getMessage()); + } + }); + } else { + assertThrows( + IllegalArgumentException.class, + () -> blobContainer.listBlobsByPrefixInSortedOrder(null, limit, blobNameSortOrder, new ActionListener<>() { + @Override + public void onResponse(List<BlobMetadata> blobMetadata) {} + + @Override + public void onFailure(Exception e) {} + }) + ); + } + } + + public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException { + testListBlobsByPrefixInLexicographicOrder(-5, 0, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithZeroLimit() throws IOException { + testListBlobsByPrefixInLexicographicOrder(0, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithLimitLessThanPageSize() throws IOException { + testListBlobsByPrefixInLexicographicOrder(2, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanPageSize() throws IOException { + testListBlobsByPrefixInLexicographicOrder(8, 2, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanNumberOfRecords() throws IOException { + testListBlobsByPrefixInLexicographicOrder(12, 2, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } } diff --git a/plugins/telemetry-otel/build.gradle b/plugins/telemetry-otel/build.gradle index 7a56621be5f1e..2c275388cce38 100644 --- a/plugins/telemetry-otel/build.gradle +++ b/plugins/telemetry-otel/build.gradle @@ -54,3 +54,25 @@ thirdPartyAudit { 'io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider' ) } + +tasks.named("bundlePlugin").configure { + from('config/telemetry-otel') { + into 'config' + } +} + +tasks.register("writeTestJavaPolicy") { + doLast { + final File tmp = file("${buildDir}/tmp") + if (tmp.exists() == false && tmp.mkdirs() == false) { + throw new GradleException("failed to create temporary directory [${tmp}]") + } + final File javaPolicy = file("${tmp}/java.policy") + javaPolicy.write( + [ + "grant {", + " permission java.io.FilePermission \"config\", \"read\";", + "};" + ].join("\n")) + } +} diff --git a/plugins/telemetry-otel/config/telemetry-otel/log4j2.properties b/plugins/telemetry-otel/config/telemetry-otel/log4j2.properties new file mode 100644 index 0000000000000..544f42bd5513b --- /dev/null +++ b/plugins/telemetry-otel/config/telemetry-otel/log4j2.properties @@ -0,0 +1,27 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
+# + + +appender.tracing.type = RollingFile +appender.tracing.name = tracing +appender.tracing.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_otel_traces.log +appender.tracing.filePermissions = rw-r----- +appender.tracing.layout.type = PatternLayout +appender.tracing.layout.pattern = %m%n +appender.tracing.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_otel_traces-%i.log.gz +appender.tracing.policies.type = Policies +appender.tracing.policies.size.type = SizeBasedTriggeringPolicy +appender.tracing.policies.size.size = 1GB +appender.tracing.strategy.type = DefaultRolloverStrategy +appender.tracing.strategy.max = 4 + + +logger.exporter.name = io.opentelemetry.exporter.logging.LoggingSpanExporter +logger.exporter.level = INFO +logger.exporter.appenderRef.tracing.ref = tracing +logger.exporter.additivity = false diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java index 04bade9ec942a..292165979c2f2 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java @@ -42,7 +42,7 @@ private OTelResourceProvider() {} public static OpenTelemetry get(Settings settings) { return get( settings, - new LoggingSpanExporter(), + LoggingSpanExporter.create(), ContextPropagators.create(W3CTraceContextPropagator.getInstance()), Sampler.alwaysOn() ); diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java index b867b90af333c..f1f469544f634 100644 --- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java @@ -113,8 +113,6 @@ private void printClusterRouting() throws IOException, ParseException { /** * This test verifies that segment replication does not break when primary shards are on lower OS version. It does this * by verifying replica shards contains same number of documents as primary's. - * - * @throws Exception */ public void testIndexingWithPrimaryOnBwcNodes() throws Exception { if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) { @@ -164,8 +162,6 @@ public void testIndexingWithPrimaryOnBwcNodes() throws Exception { * This test creates a cluster with primary on higher version but due to {@link org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider}; * replica shard allocation on lower OpenSearch version is prevented. Thus, this test though cover the use case where * primary shard containing nodes are running on higher OS version while replicas are unassigned. 
- * - * @throws Exception */ public void testIndexingWithReplicaOnBwcNodes() throws Exception { if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/370_multi_terms.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/370_multi_terms.yml index eeab8e78bf830..7db5f31d8e761 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/370_multi_terms.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/370_multi_terms.yml @@ -716,7 +716,7 @@ setup: --- "aggregate over multi-terms test": - skip: - version: "- 2.9.99" + version: "- 2.8.99" reason: "multi_terms aggregation was introduced in 2.1.0, NPE bug checked by this test case will manifest in any version < 3.0" - do: diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/60_empty.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/60_empty.yml index 7b374e3f6a409..54f61d46b6f6c 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/60_empty.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/60_empty.yml @@ -1,5 +1,8 @@ --- "Empty aggs Body": + - skip: + version: "- 2.8.99" + reason: "the fix was introduced in 2.9.0" - do: index: index: test diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/90_search_after.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/90_search_after.yml index 65c1527a68b96..55e1566656faf 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search/90_search_after.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/90_search_after.yml @@ -160,6 +160,28 @@ - match: {hits.hits.0._source.timestamp: "2019-10-21 00:30:04.828" } - match: {hits.hits.0.sort: [1571617804828] } + # search_after with the sort with missing + - do: + bulk: + refresh: true + index: test + body: | + {"index":{}} + {"timestamp": null} + - do: + search: + index: test + rest_total_hits_as_int: true + body: + "size": 5 + "sort": [ { "timestamp": { "order": "asc", "missing": "_last" } } ] + search_after: [ "2021-10-21 08:30:04.828" ] # making it out of min/max so only missing value hit is qualified + + - match: { hits.total: 3 } + - length: { hits.hits: 1 } + - match: { hits.hits.0._index: test } + - match: { hits.hits.0._source.timestamp: null } + --- "date_nanos": - skip: @@ -276,3 +298,25 @@ - match: {hits.hits.0._index: test } - match: {hits.hits.0._source.population: 15223372036854775800 } - match: {hits.hits.0.sort: [15223372036854775800] } + + # search_after with the sort with missing + - do: + bulk: + refresh: true + index: test + body: | + {"index":{}} + {"population": null} + - do: + search: + index: test + rest_total_hits_as_int: true + body: + "size": 5 + "sort": [ { "population": { "order": "asc", "missing": "_last" } } ] + search_after: [15223372036854775801] # making it out of min/max so only missing value hit is qualified + + - match: { hits.total: 3 } + - length: { hits.hits: 1 } + - match: { hits.hits.0._index: test } + - match: { hits.hits.0._source.population: null } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java index 9f382d6a2e9ef..b6ea3a094f496 100644 --- 
a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java @@ -96,7 +96,6 @@ public void testGlobalPrimaryAllocation() throws Exception { * This test in general passes without primary shard balance as well due to nature of allocation algorithm which * assigns all primary shards first followed by replica copies. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/7751") public void testPerIndexPrimaryAllocation() throws Exception { internalCluster().startClusterManagerOnlyNode(); final int maxReplicaCount = 2; @@ -234,23 +233,30 @@ private void verifyPerIndexPrimaryBalance() throws Exception { RoutingNodes nodes = currentState.getRoutingNodes(); for (final Map.Entry<String, IndexRoutingTable> index : currentState.getRoutingTable().indicesRouting().entrySet()) { final int totalPrimaryShards = index.getValue().primaryShardsActive(); - final int avgPrimaryShardsPerNode = (int) Math.ceil(totalPrimaryShards * 1f / currentState.getRoutingNodes().size()); + final int lowerBoundPrimaryShardsPerNode = (int) Math.floor(totalPrimaryShards * 1f / currentState.getRoutingNodes().size()) + - 1; + final int upperBoundPrimaryShardsPerNode = (int) Math.ceil(totalPrimaryShards * 1f / currentState.getRoutingNodes().size()) + + 1; for (RoutingNode node : nodes) { final int primaryCount = node.shardsWithState(index.getKey(), STARTED) .stream() .filter(ShardRouting::primary) .collect(Collectors.toList()) .size(); - if (primaryCount > avgPrimaryShardsPerNode) { - logger.info( - "--> Primary shard balance assertion failure for index {} on node {} {} <= {}", - index.getKey(), - node.node().getName(), - primaryCount, - avgPrimaryShardsPerNode - ); - } - assertTrue(primaryCount <= avgPrimaryShardsPerNode); + // Asserts value is within the variance threshold (-1/+1 of the average value). + assertTrue( + "--> Primary balance assertion failure for index " + + index + + " on node " + + node.node().getName() + + " " + + lowerBoundPrimaryShardsPerNode + + " <= " + + primaryCount + + " (assigned) <= " + + upperBoundPrimaryShardsPerNode, + lowerBoundPrimaryShardsPerNode <= primaryCount && primaryCount <= upperBoundPrimaryShardsPerNode + ); } } }, 60, TimeUnit.SECONDS);
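The assertion above replaces a hard cap at the per-node average with a ±1 band around it. A worked example of the bounds it computes (standalone arithmetic, not code from the test):

```java
// Worked example of the bounds used in verifyPerIndexPrimaryBalance above:
// 5 active primaries of one index spread across 3 nodes -> average 5/3 ~= 1.67.
int totalPrimaryShards = 5;
int nodeCount = 3;
int lowerBound = (int) Math.floor(totalPrimaryShards * 1f / nodeCount) - 1; // floor(1.67) - 1 = 0
int upperBound = (int) Math.ceil(totalPrimaryShards * 1f / nodeCount) + 1;  // ceil(1.67) + 1 = 3
// A node holding 0..3 primaries now passes; the removed check required primaryCount <= ceil(avg) = 2.
```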
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index d226d0d757638..336646b35b5a6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -15,6 +15,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; +import org.opensearch.index.mapper.MapperService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; @@ -74,6 +75,13 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas) { return remoteStoreIndexSettings(numberOfReplicas, 1); } + protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit) { + return Settings.builder() + .put(remoteStoreIndexSettings(numberOfReplicas)) + .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), totalFieldLimit) + .build(); + } + protected Settings remoteTranslogIndexSettings(int numberOfReplicas, int numberOfShards) { return Settings.builder() .put(remoteStoreIndexSettings(numberOfReplicas, numberOfShards)) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 70a41d74a57c5..f6ba8cfed00d0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -17,6 +17,7 @@ import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.RemoteStoreRefreshListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalTestCluster; @@ -277,4 +278,42 @@ public void testRemoteSegmentCleanup() throws Exception { public void testRemoteTranslogCleanup() throws Exception { verifyRemoteStoreCleanup(true); } + + public void testStaleCommitDeletionWithInvokeFlush() throws Exception { + internalCluster().startDataOnlyNodes(3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000L)); + int numberOfIterations = randomIntBetween(5, 15); + indexData(numberOfIterations, true); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); + // Delete is async. + assertBusy(() -> { + int actualFileCount = getFileCount(indexPath); + if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) { + assertEquals(numberOfIterations, actualFileCount); + } else { + // As delete is async, it's possible that the file gets created before the deletion or after + // deletion. + assertTrue(actualFileCount >= 10 && actualFileCount <= 11); + } + }, 30, TimeUnit.SECONDS); + } + + public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { + internalCluster().startDataOnlyNodes(3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000L)); + int numberOfIterations = randomIntBetween(5, 15); + indexData(numberOfIterations, false); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); + assertEquals(1, getFileCount(indexPath)); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java index 9f492bbaee01a..e362b7f61e8e6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java @@ -183,8 +183,8 @@ public void testRestoreRemoteStoreIndicesWithoutRemoteTranslog() throws IOExcept public void testRestoreOperationsShallowCopyEnabled(boolean remoteTranslogEnabled) throws IOException, ExecutionException, InterruptedException { - internalCluster().startClusterManagerOnlyNode(); - final String primaryNode = internalCluster().startNode(); + String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); + String primary = internalCluster().startDataOnlyNode(); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -216,7 +216,7 @@ public void testRestoreOperationsShallowCopyEnabled(boolean remoteTranslogEnable indexDocuments(client, indexName2, numDocsInIndex2); ensureGreen(indexName1, indexName2); - final String secondNode = internalCluster().startNode(); + internalCluster().startDataOnlyNode(); logger.info("--> snapshot"); CreateSnapshotResponse createSnapshotResponse = client.admin() .cluster() @@ -273,10 +273,12 @@ public void testRestoreOperationsShallowCopyEnabled(boolean remoteTranslogEnable assertDocsPresentInIndex(client, restoredIndexName2, numDocsInIndex2); // deleting data for restoredIndexName1 and restoring from remote store. - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(restoredIndexName1))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); ensureRed(restoredIndexName1); - assertAcked(client().admin().indices().prepareClose(restoredIndexName1)); - client().admin() + // Re-initialize client to make sure we are not using client from stopped node. 
+ client = client(clusterManagerNode); + assertAcked(client.admin().indices().prepareClose(restoredIndexName1)); + client.admin() .cluster() .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(restoredIndexName1), PlainActionFuture.newFuture()); ensureYellowAndNoInitializingShards(restoredIndexName1); @@ -300,7 +302,7 @@ public void testRestoreOperationsShallowCopyEnabled(boolean remoteTranslogEnable assertEquals(restoreSnapshotResponse3.status(), RestStatus.ACCEPTED); ensureGreen(restoredIndexName1Seg); - GetIndexResponse getIndexResponse = client().admin() + GetIndexResponse getIndexResponse = client.admin() .indices() .getIndex(new GetIndexRequest().indices(restoredIndexName1Seg).includeDefaults(true)) .get(); @@ -331,7 +333,7 @@ public void testRestoreOperationsShallowCopyEnabled(boolean remoteTranslogEnable assertEquals(restoreSnapshotResponse4.status(), RestStatus.ACCEPTED); ensureGreen(restoredIndexName1Doc); - getIndexResponse = client().admin() + getIndexResponse = client.admin() .indices() .getIndex(new GetIndexRequest().indices(restoredIndexName1Doc).includeDefaults(true)) .get(); @@ -347,8 +349,8 @@ public void testRestoreOperationsShallowCopyEnabled(boolean remoteTranslogEnable } public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { - internalCluster().startClusterManagerOnlyNode(); - internalCluster().startNode(); + String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); + String primary = internalCluster().startDataOnlyNode(); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -378,7 +380,7 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { indexDocuments(client, indexName2, numDocsInIndex2); ensureGreen(indexName1, indexName2); - final String secondNode = internalCluster().startNode(); + internalCluster().startDataOnlyNode(); logger.info("--> snapshot"); CreateSnapshotResponse createSnapshotResponse = client.admin() .cluster() @@ -435,10 +437,12 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { assertDocsPresentInIndex(client, restoredIndexName2, numDocsInIndex2); // deleting data for restoredIndexName1 and restoring from remote store. - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(indexName1))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); ensureRed(indexName1); - assertAcked(client().admin().indices().prepareClose(indexName1)); - client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indexName1), PlainActionFuture.newFuture()); + // Re-initialize client to make sure we are not using client from stopped node. 
+ client = client(clusterManagerNode); + assertAcked(client.admin().indices().prepareClose(indexName1)); + client.admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indexName1), PlainActionFuture.newFuture()); ensureYellowAndNoInitializingShards(indexName1); ensureGreen(indexName1); assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1); @@ -449,8 +453,8 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { } public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException { - internalCluster().startClusterManagerOnlyNode(); - final String primaryNode = internalCluster().startNode(); + String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); + String primary = internalCluster().startDataOnlyNode(); String indexName1 = "testindex1"; String indexName2 = "testindex2"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -479,7 +483,7 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException indexDocuments(client, indexName2, numDocsInIndex2); ensureGreen(indexName1, indexName2); - final String secondNode = internalCluster().startNode(); + internalCluster().startDataOnlyNode(); logger.info("--> snapshot"); CreateSnapshotResponse createSnapshotResponse = client.admin() @@ -513,9 +517,11 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException assertDocsPresentInIndex(client(), restoredIndexName1, numDocsInIndex1); // deleting data for restoredIndexName1 and restoring from remote store. - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(restoredIndexName1))); - assertAcked(client().admin().indices().prepareClose(restoredIndexName1)); - client().admin() + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); + // Re-initialize client to make sure we are not using client from stopped node. 
+ client = client(clusterManagerNode); + assertAcked(client.admin().indices().prepareClose(restoredIndexName1)); + client.admin() .cluster() .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(restoredIndexName1), PlainActionFuture.newFuture()); ensureYellowAndNoInitializingShards(restoredIndexName1); diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index 9712fdbfbe8ec..ffc6f81490c27 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -291,14 +291,14 @@ private void runTasks(TaskInputs taskInputs) { return; } - final long computationStartTime = threadPool.relativeTimeInMillis(); + final long computationStartTime = threadPool.preciseRelativeTimeInNanos(); final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState); taskOutputs.notifyFailedTasks(); final TimeValue computationTime = getTimeSince(computationStartTime); logExecutionTime(computationTime, "compute cluster state update", summary); if (taskOutputs.clusterStateUnchanged()) { - final long notificationStartTime = threadPool.relativeTimeInMillis(); + final long notificationStartTime = threadPool.preciseRelativeTimeInNanos(); taskOutputs.notifySuccessfulTasksOnUnchangedClusterState(); final TimeValue executionTime = getTimeSince(notificationStartTime); logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary); @@ -309,7 +309,7 @@ private void runTasks(TaskInputs taskInputs) { } else { logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary); } - final long publicationStartTime = threadPool.relativeTimeInMillis(); + final long publicationStartTime = threadPool.preciseRelativeTimeInNanos(); try { ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState); // new cluster state, notify all listeners @@ -335,8 +335,8 @@ private void runTasks(TaskInputs taskInputs) { } } - private TimeValue getTimeSince(long startTimeMillis) { - return TimeValue.timeValueMillis(Math.max(0, threadPool.relativeTimeInMillis() - startTimeMillis)); + private TimeValue getTimeSince(long startTimeNanos) { + return TimeValue.timeValueMillis(TimeValue.nsecToMSec(threadPool.preciseRelativeTimeInNanos() - startTimeNanos)); } protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) { @@ -358,7 +358,7 @@ protected boolean blockingAllowed() { } void onPublicationSuccess(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs) { - final long notificationStartTime = threadPool.relativeTimeInMillis(); + final long notificationStartTime = threadPool.preciseRelativeTimeInNanos(); taskOutputs.processedDifferentClusterState(clusterChangedEvent.previousState(), clusterChangedEvent.state()); try { diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index ac38768c9f3d3..e626824e7e271 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -32,10 +32,14 @@ package org.opensearch.common.blobstore; +import org.opensearch.action.ActionListener; + import java.io.IOException; import java.io.InputStream; import 
java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Map; @@ -191,4 +195,47 @@ default long readBlobPreferredLength() { * @throws IOException if there were any failures in reading from the blob container. */ Map<String, BlobMetadata> listBlobsByPrefix(String blobNamePrefix) throws IOException; + + /** + * The type representing sort order of blob names + */ + enum BlobNameSortOrder { + + LEXICOGRAPHIC(Comparator.comparing(BlobMetadata::name)); + + final Comparator<BlobMetadata> comparator; + + public Comparator<BlobMetadata> comparator() { + return comparator; + } + + BlobNameSortOrder(final Comparator<BlobMetadata> comparator) { + this.comparator = comparator; + } + } + + /** + * Lists all blobs in the container that match the specified prefix, in the requested sort order + * @param blobNamePrefix The prefix to match against blob names in the container. + * @param limit Limits the result size to min(limit, number of keys) + * @param blobNameSortOrder The order in which to sort the matched blob names + * @param listener the listener to be notified upon request completion + */ + default void listBlobsByPrefixInSortedOrder( + String blobNamePrefix, + int limit, + BlobNameSortOrder blobNameSortOrder, + ActionListener<List<BlobMetadata>> listener + ) { + if (limit < 0) { + throw new IllegalArgumentException("limit should not be a negative value"); + } + try { + List<BlobMetadata> blobNames = new ArrayList<>(listBlobsByPrefix(blobNamePrefix).values()); + blobNames.sort(blobNameSortOrder.comparator()); + listener.onResponse(blobNames.subList(0, Math.min(blobNames.size(), limit))); + } catch (Exception e) { + listener.onFailure(e); + } + } }
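The default method added above fetches every matching blob, sorts with the order's comparator, and truncates to the first `limit` entries. A self-contained sketch of that sort-and-truncate pattern (hypothetical data, not the BlobContainer API itself):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of the default listBlobsByPrefixInSortedOrder behavior: sort everything, keep the first `limit`.
public final class SortTruncateSketch {
    public static void main(String[] args) {
        List<String> names = new ArrayList<>(List.of("seg_b", "seg_a", "seg_c"));
        int limit = 2;
        names.sort(Comparator.naturalOrder()); // lexicographic, like BlobNameSortOrder.LEXICOGRAPHIC
        System.out.println(names.subList(0, Math.min(names.size(), limit))); // prints [seg_a, seg_b]
    }
}
```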
diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java b/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java index 1423a30bbe307..56c9f0387b13e 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionDependency.java @@ -16,10 +16,6 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; -import org.opensearch.core.common.Strings; -import org.opensearch.core.xcontent.XContentParser; - -import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * This class handles the dependent extensions information @@ -60,39 +56,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVersion(version); } - public static ExtensionDependency parse(XContentParser parser) throws IOException { - String uniqueId = null; - Version version = null; - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - String fieldName = parser.currentName(); - parser.nextToken(); - - switch (fieldName) { - case UNIQUE_ID: - uniqueId = parser.text(); - break; - case VERSION: - try { - version = Version.fromString(parser.text()); - } catch (IllegalArgumentException e) { - throw e; - } - break; - default: - parser.skipChildren(); - break; - } - } - if (Strings.isNullOrEmpty(uniqueId)) { - throw new IOException("Required field [uniqueId] is missing in the request for the dependent extension"); - } else if (version == null) { - throw new IOException("Required field [version] is missing in the request for the dependent extension"); - } - return new ExtensionDependency(uniqueId, version); - - } - /** * The uniqueId of the dependency extension * diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index 9987497b5fac0..cb22c8d864b1b 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -105,7 +105,7 @@ public static enum OpenSearchRequestType { /** * Instantiate a new ExtensionsManager object to handle requests and responses from extensions. This is called during Node bootstrap. * - * @param additionalSettings Additional settings to read in from extensions.yml + * @param additionalSettings Additional settings to read in from the extension initialization request * @throws IOException If the extensions discovery file is not properly retrieved. */ public ExtensionsManager(Set<Setting<?>> additionalSettings) throws IOException { @@ -504,4 +504,8 @@ void setAddSettingsUpdateConsumerRequestHandler(AddSettingsUpdateConsumerRequest Settings getEnvironmentSettings() { return environmentSettings; } + + public Set<Setting<?>> getAdditionalSettings() { + return this.additionalSettings; + } }
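The new getAdditionalSettings() accessor is what lets the REST initialization path in the next diff validate incoming extension settings against the set registered at node bootstrap. A small hedged sketch, reusing only types visible in this PR (the featureFlag key is hypothetical):

    // Register one extension-scoped setting, then read it back through the manager.
    Setting<Boolean> featureFlag = Setting.boolSetting("featureFlag", false, Setting.Property.ExtensionScope);
    ExtensionsManager manager = new ExtensionsManager(Set.of(featureFlag));
    assert manager.getAdditionalSettings().contains(featureFlag); // now visible to REST handlers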
diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java b/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java index e0806f8172278..f47f342617732 100644 --- a/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java +++ b/server/src/main/java/org/opensearch/extensions/rest/RestInitializeExtensionAction.java @@ -8,9 +8,14 @@ package org.opensearch.extensions.rest; +import org.opensearch.Version; import org.opensearch.client.node.NodeClient; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.core.common.Strings; +import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; import org.opensearch.extensions.ExtensionDependency; import org.opensearch.extensions.ExtensionScopedSettings; import org.opensearch.extensions.ExtensionsManager; @@ -23,12 +28,16 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; -import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.rest.RestRequest.Method.POST; /** @@ -62,36 +71,79 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client String openSearchVersion = null; String minimumCompatibleVersion = null; List<ExtensionDependency> dependencies = new ArrayList<>(); + Set<String> additionalSettingsKeys = extensionsManager.getAdditionalSettings() + .stream() + .map(s -> s.getKey()) + .collect(Collectors.toSet()); - try (XContentParser parser = request.contentParser()) { - parser.nextToken(); - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - String currentFieldName = parser.currentName(); - parser.nextToken(); - if ("name".equals(currentFieldName)) { - name = parser.text(); - } else if ("uniqueId".equals(currentFieldName)) { - uniqueId = parser.text(); - } else if ("hostAddress".equals(currentFieldName)) { - hostAddress = parser.text(); - } else if ("port".equals(currentFieldName)) { - port = parser.text(); - } else if ("version".equals(currentFieldName)) { - version = parser.text(); - } else if ("opensearchVersion".equals(currentFieldName)) { - openSearchVersion = parser.text(); - } else if ("minimumCompatibleVersion".equals(currentFieldName)) { - minimumCompatibleVersion = parser.text(); - } else if ("dependencies".equals(currentFieldName)) { - ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - dependencies.add(ExtensionDependency.parse(parser)); + Tuple<? extends MediaType, Map<String, Object>> unreadExtensionTuple = XContentHelper.convertToMap( + request.content(), + false, + request.getXContentType().xContent().mediaType() + ); + Map<String, Object> extensionMap = unreadExtensionTuple.v2(); + + ExtensionScopedSettings extAdditionalSettings = new ExtensionScopedSettings(extensionsManager.getAdditionalSettings()); + + try { + // check whether any required fields are missing from the extension initialization request + String[] requiredFields = { + "name", + "uniqueId", + "hostAddress", + "port", + "version", + "opensearchVersion", + "minimumCompatibleVersion" }; + List<String> missingFields = Arrays.stream(requiredFields) + .filter(field -> !extensionMap.containsKey(field)) + .collect(Collectors.toList()); + if (!missingFields.isEmpty()) { + throw new IOException("Extension is missing these required fields: " + missingFields); + } + + // Parse extension dependencies + List<ExtensionDependency> extensionDependencyList = new ArrayList<ExtensionDependency>(); + if (extensionMap.get("dependencies") != null) { + List<HashMap<String, ?>> extensionDependencies = new ArrayList<>( + (Collection<HashMap<String, ?>>) extensionMap.get("dependencies") + ); + for (HashMap<String, ?> dependency : extensionDependencies) { + if (Strings.isNullOrEmpty((String) dependency.get("uniqueId"))) { + throw new IOException("Required field [uniqueId] is missing in the request for the dependent extension"); + } else if (dependency.get("version") == null) { + throw new IOException("Required field [version] is missing in the request for the dependent extension"); } + extensionDependencyList.add( + new ExtensionDependency( + dependency.get("uniqueId").toString(), + Version.fromString(dependency.get("version").toString()) + ) + ); } } + + Map<String, Object> additionalSettingsMap = extensionMap.entrySet() + .stream() + .filter(kv -> additionalSettingsKeys.contains(kv.getKey())) + .collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue())); + + Settings.Builder output = Settings.builder(); + output.loadFromMap(additionalSettingsMap); + extAdditionalSettings.applySettings(output.build()); + + // Create extension read from initialization request + name = extensionMap.get("name").toString(); + uniqueId = extensionMap.get("uniqueId").toString(); + hostAddress = extensionMap.get("hostAddress").toString(); + port = extensionMap.get("port").toString(); + version = extensionMap.get("version").toString(); + openSearchVersion = extensionMap.get("opensearchVersion").toString(); + minimumCompatibleVersion = extensionMap.get("minimumCompatibleVersion").toString(); + dependencies = extensionDependencyList; } catch (IOException e) { - throw new IOException("Missing attribute", e); + logger.warn("loading extension failed because of exception: " + e.getMessage()); + return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); }
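// With convertToMap the request body is materialized once: required fields are validated
// up front, dependency entries are checked individually, and unknown keys fall through
// unless they match one of the registered additional-settings keys collected above.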
Extension extension = new Extension( @@ -103,8 +155,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client openSearchVersion, minimumCompatibleVersion, dependencies, - // TODO add this to the API (https://github.com/opensearch-project/OpenSearch/issues/8032) - new ExtensionScopedSettings(Collections.emptySet()) + extAdditionalSettings ); try { extensionsManager.loadExtension(extension); diff --git a/server/src/main/java/org/opensearch/index/fielddata/IndexFieldData.java b/server/src/main/java/org/opensearch/index/fielddata/IndexFieldData.java index 529405c90c9a4..36e6a242ecdec 100644 --- a/server/src/main/java/org/opensearch/index/fielddata/IndexFieldData.java +++ b/server/src/main/java/org/opensearch/index/fielddata/IndexFieldData.java @@ -120,11 +120,13 @@ abstract class XFieldComparatorSource extends FieldComparatorSource { protected final MultiValueMode sortMode; protected final Object missingValue; protected final Nested nested; + protected boolean enableSkipping; public XFieldComparatorSource(Object missingValue, MultiValueMode sortMode, Nested nested) { this.sortMode = sortMode; this.missingValue = missingValue; this.nested = nested; + this.enableSkipping = true; // true by default } public MultiValueMode sortMode() { @@ -135,6 +137,10 @@ public Nested nested() { return this.nested; } + public void disableSkipping() { + this.enableSkipping = false; + } + /** * Simple wrapper class around a filter that matches parent documents * and a filter that matches child documents. For every root document R, diff --git a/server/src/main/java/org/opensearch/index/fielddata/IndexNumericFieldData.java b/server/src/main/java/org/opensearch/index/fielddata/IndexNumericFieldData.java index 052a679de9765..ae8ffd8fe6b97 100644 --- a/server/src/main/java/org/opensearch/index/fielddata/IndexNumericFieldData.java +++ b/server/src/main/java/org/opensearch/index/fielddata/IndexNumericFieldData.java @@ -198,24 +198,35 @@ private XFieldComparatorSource comparatorSource( MultiValueMode sortMode, Nested nested ) { + final XFieldComparatorSource source; switch (targetNumericType) { case HALF_FLOAT: case FLOAT: - return new FloatValuesComparatorSource(this, missingValue, sortMode, nested); + source = new FloatValuesComparatorSource(this, missingValue, sortMode, nested); + break; case DOUBLE: - return new DoubleValuesComparatorSource(this, missingValue, sortMode, nested); + source = new DoubleValuesComparatorSource(this, missingValue, sortMode, nested); + break; case UNSIGNED_LONG: - return new UnsignedLongValuesComparatorSource(this, missingValue, sortMode, nested); + source = new UnsignedLongValuesComparatorSource(this, missingValue, sortMode, nested); + break; case DATE: - return dateComparatorSource(missingValue, sortMode, nested); + source = dateComparatorSource(missingValue, sortMode, nested); + break; case DATE_NANOSECONDS: - return dateNanosComparatorSource(missingValue, sortMode, nested); + source = dateNanosComparatorSource(missingValue, sortMode, nested); + break; case LONG: - return new LongValuesComparatorSource(this, missingValue, sortMode, nested); + source = new LongValuesComparatorSource(this, missingValue, sortMode, nested); + break; default: assert !targetNumericType.isFloatingPoint(); - return new IntValuesComparatorSource(this, missingValue, sortMode, nested); + source = new IntValuesComparatorSource(this, missingValue, sortMode, nested); } + if (targetNumericType != getNumericType()) { + source.disableSkipping(); // disable skipping logic for cast of sort field + } + 
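+ // Note: the indexed points are encoded for the field's own numeric type, so point-based
+ // competitive iteration over a different target type could skip matching documents;
+ // that is why skipping stays disabled for cast sorts.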
return source; } protected XFieldComparatorSource dateComparatorSource(@Nullable Object missingValue, MultiValueMode sortMode, Nested nested) { diff --git a/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/DoubleValuesComparatorSource.java b/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/DoubleValuesComparatorSource.java index f8af86c904f2c..34e86070054c9 100644 --- a/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/DoubleValuesComparatorSource.java +++ b/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/DoubleValuesComparatorSource.java @@ -104,7 +104,7 @@ public FieldComparator newComparator(String fieldname, int numHits, boolean e final double dMissingValue = (Double) missingObject(missingValue, reversed); // NOTE: it's important to pass null as a missing value in the constructor so that // the comparator doesn't check docsWithField since we replace missing values in select() - return new DoubleComparator(numHits, null, null, reversed, false) { + return new DoubleComparator(numHits, fieldname, null, reversed, enableSkipping && this.enableSkipping) { @Override public LeafFieldComparator getLeafComparator(LeafReaderContext context) throws IOException { return new DoubleLeafComparator(context) { diff --git a/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/FloatValuesComparatorSource.java b/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/FloatValuesComparatorSource.java index 686bef479c179..04a34cd418520 100644 --- a/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/FloatValuesComparatorSource.java +++ b/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/FloatValuesComparatorSource.java @@ -97,7 +97,7 @@ public FieldComparator newComparator(String fieldname, int numHits, boolean e final float fMissingValue = (Float) missingObject(missingValue, reversed); // NOTE: it's important to pass null as a missing value in the constructor so that // the comparator doesn't check docsWithField since we replace missing values in select() - return new FloatComparator(numHits, null, null, reversed, false) { + return new FloatComparator(numHits, fieldname, null, reversed, enableSkipping && this.enableSkipping) { @Override public LeafFieldComparator getLeafComparator(LeafReaderContext context) throws IOException { return new FloatLeafComparator(context) { diff --git a/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/IntValuesComparatorSource.java b/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/IntValuesComparatorSource.java index 90afa5fc64c29..d5ea1eaf7263d 100644 --- a/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/IntValuesComparatorSource.java +++ b/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/IntValuesComparatorSource.java @@ -76,7 +76,7 @@ public FieldComparator newComparator(String fieldname, int numHits, boolean e final int iMissingValue = (Integer) missingObject(missingValue, reversed); // NOTE: it's important to pass null as a missing value in the constructor so that // the comparator doesn't check docsWithField since we replace missing values in select() - return new IntComparator(numHits, null, null, reversed, false) { + return new IntComparator(numHits, fieldname, null, reversed, enableSkipping && this.enableSkipping) { @Override public LeafFieldComparator getLeafComparator(LeafReaderContext context) throws IOException { return new IntLeafComparator(context) { 
diff --git a/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/LongValuesComparatorSource.java b/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/LongValuesComparatorSource.java index 462092dca5110..43e033dd59716 100644 --- a/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/LongValuesComparatorSource.java +++ b/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/LongValuesComparatorSource.java @@ -120,7 +120,7 @@ public FieldComparator newComparator(String fieldname, int numHits, boolean e final long lMissingValue = (Long) missingObject(missingValue, reversed); // NOTE: it's important to pass null as a missing value in the constructor so that // the comparator doesn't check docsWithField since we replace missing values in select() - return new LongComparator(numHits, null, null, reversed, false) { + return new LongComparator(numHits, fieldname, null, reversed, enableSkipping && this.enableSkipping) { @Override public LeafFieldComparator getLeafComparator(LeafReaderContext context) throws IOException { return new LongLeafComparator(context) { diff --git a/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/UnsignedLongValuesComparatorSource.java b/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/UnsignedLongValuesComparatorSource.java index d8b2e9528d2cf..be56b50179114 100644 --- a/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/UnsignedLongValuesComparatorSource.java +++ b/server/src/main/java/org/opensearch/index/fielddata/fieldcomparator/UnsignedLongValuesComparatorSource.java @@ -92,7 +92,7 @@ public FieldComparator newComparator(String fieldname, int numHits, boolean e assert indexFieldData == null || fieldname.equals(indexFieldData.getFieldName()); final BigInteger ulMissingValue = (BigInteger) missingObject(missingValue, reversed); - return new UnsignedLongComparator(numHits, null, null, reversed, false) { + return new UnsignedLongComparator(numHits, fieldname, null, reversed, enableSkipping && this.enableSkipping) { @Override public LeafFieldComparator getLeafComparator(LeafReaderContext context) throws IOException { return new UnsignedLongLeafComparator(context) {
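Taken together, the comparator hunks above pass the real field name and an enableSkipping flag into Lucene's numeric comparators, which is what allows Lucene 9.x to skip non-competitive documents using indexed points. A hedged sketch of the effect; the field name and sizes are illustrative:

    // import org.apache.lucene.search.comparators.LongComparator;
    // With a non-null field and enableSkipping=true, the comparator may consult the
    // field's point index to skip whole blocks of non-competitive documents.
    LongComparator cmp = new LongComparator(
        10,           // size of the top-N being tracked
        "timestamp",  // hypothetical indexed numeric field
        null,         // missing value is handled by the wrapping comparator source
        false,        // reversed
        true          // enableSkipping: only safe when index and sort types match
    );

The follow-up gate in IndexNumericFieldData turns this off whenever the sort casts to a different numeric type, since the indexed points would no longer line up with the comparator's value space.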
diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java new file mode 100644 index 0000000000000..1eeadfe228a45 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import java.util.Arrays; + +/** + * Utils for remote store + * + * @opensearch.internal + */ +public class RemoteStoreUtils { + public static final int LONG_MAX_LENGTH = String.valueOf(Long.MAX_VALUE).length(); + + /** + * This method subtracts the given number from Long.MAX_VALUE and returns a string representation of the result. + * The resultant string is guaranteed to be of the same length as that of Long.MAX_VALUE; if shorter, the result + * is left-padded with 0s. + * @param num number to get the inverted long string for + * @return String value of Long.MAX_VALUE - num + */ + public static String invertLong(long num) { + if (num < 0) { + throw new IllegalArgumentException("Negative long values are not allowed"); + } + String invertedLong = String.valueOf(Long.MAX_VALUE - num); + char[] characterArray = new char[LONG_MAX_LENGTH - invertedLong.length()]; + Arrays.fill(characterArray, '0'); + + return new String(characterArray) + invertedLong; + } + + /** + * This method converts the given string into a long and subtracts it from Long.MAX_VALUE + * @param str long in string format to be inverted + * @return the inverted long value + */ + public static long invertLong(String str) { + long num = Long.parseLong(str); + if (num < 0) { + throw new IllegalArgumentException("Strings representing negative long values are not allowed"); + } + return Long.MAX_VALUE - num; + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d89d51c713d70..01c0a12d463ea 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2632,7 +2632,7 @@ public void restoreFromSnapshotAndRemoteStore( assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " + recoveryState.getRecoverySource(); StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener); + storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 7cfaaafcadd39..ddca12d9283f3 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -85,7 +85,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres // Visible for testing static final Set<String> EXCLUDE_FILES = Set.of("write.lock"); // Visible for testing - static final int LAST_N_METADATA_FILES_TO_KEEP = 10; + public static final int LAST_N_METADATA_FILES_TO_KEEP = 10; private final IndexShard indexShard; private final Directory storeDirectory; @@ -200,9 +200,8 @@ private synchronized void syncSegments(boolean isRetry) { // if a new segments_N file is present in local that is not uploaded to remote store yet, it // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. // This is done to avoid delete post each refresh. - // Ideally, we want this to be done in async flow. 
(GitHub issue #4315) if (isRefreshAfterCommit()) { - deleteStaleCommits(); + remoteDirectory.deleteStaleSegmentsAsync(LAST_N_METADATA_FILES_TO_KEEP); } try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { @@ -381,14 +380,6 @@ private String getChecksumOfLocalFile(String file) throws IOException { return localSegmentChecksumMap.get(file); } - private void deleteStaleCommits() { - try { - remoteDirectory.deleteStaleSegments(LAST_N_METADATA_FILES_TO_KEEP); - } catch (IOException e) { - logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); - } - } - /** * Updates the last refresh time and refresh seq no which is seen by local store. */ diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 119524e8caf8a..da4e9113143af 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -69,6 +69,7 @@ import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.channels.FileChannel; @@ -356,7 +357,8 @@ void recoverFromSnapshotAndRemoteStore( final IndexShard indexShard, Repository repository, RepositoriesService repositoriesService, - ActionListener listener + ActionListener listener, + ThreadPool threadPool ) { try { if (canRecover(indexShard)) { @@ -384,7 +386,10 @@ void recoverFromSnapshotAndRemoteStore( remoteStoreRepository = shallowCopyShardMetadata.getRemoteStoreRepository(); } - RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService); + RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory( + () -> repositoriesService, + threadPool + ); RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory( remoteStoreRepository, indexUUID, diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index addd8a24af9c5..ac129aca8baf7 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -27,6 +27,7 @@ import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.threadpool.ThreadPool; import java.io.FileNotFoundException; import java.io.IOException; @@ -75,6 +76,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private final RemoteStoreLockManager mdLockManager; + private final ThreadPool threadPool; + /** * To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation * This is achieved by uploading refresh metadata file with the same UUID suffix. 
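The hunk below adds a single-flight guard around the async cleanup. A hedged sketch of the pattern with hypothetical names (canRun, cleanup); java.util.concurrent.atomic.AtomicBoolean and an ExecutorService are assumed:

    // compareAndSet admits one scheduling attempt at a time; restoring the flag in
    // finally guarantees a failed or completed run never blocks the next one.
    private final AtomicBoolean canRun = new AtomicBoolean(true);

    void maybeCleanupAsync(ExecutorService executor) {
        if (canRun.compareAndSet(true, false)) {
            executor.execute(() -> {
                try {
                    cleanup(); // hypothetical stale-file deletion
                } finally {
                    canRun.set(true);
                }
            });
        }
    }

Note that the production version also resets the flag when scheduling itself throws, a path the tests at the end of this diff exercise.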
@@ -96,15 +99,23 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectory.class); + /** + * AtomicBoolean that ensures only one staleCommitDeletion activity is scheduled at a time. + * Visible for testing + */ + protected final AtomicBoolean canDeleteStaleCommits = new AtomicBoolean(true); + public RemoteSegmentStoreDirectory( RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory, - RemoteStoreLockManager mdLockManager + RemoteStoreLockManager mdLockManager, + ThreadPool threadPool ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; this.remoteMetadataDirectory = remoteMetadataDirectory; this.mdLockManager = mdLockManager; + this.threadPool = threadPool; init(); } @@ -574,7 +585,7 @@ public Map getSegmentsUploadedToRemoteStore(lon * @param lastNMetadataFilesToKeep number of metadata files to keep * @throws IOException in case of I/O error while reading from / writing to remote segment store */ - public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { + private void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX); List sortedMetadataFileList = metadataFiles.stream().sorted(METADATA_FILENAME_COMPARATOR).collect(Collectors.toList()); if (sortedMetadataFileList.size() <= lastNMetadataFilesToKeep) { @@ -656,6 +667,33 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException } } + /** + * Delete stale segment and metadata files asynchronously. + * This method calls {@link RemoteSegmentStoreDirectory#deleteStaleSegments(int)} in an async manner. 
+ * @param lastNMetadataFilesToKeep number of metadata files to keep + */ + public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) { + if (canDeleteStaleCommits.compareAndSet(true, false)) { + try { + threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { + try { + deleteStaleSegments(lastNMetadataFilesToKeep); + } catch (Exception e) { + logger.info( + "Exception while deleting stale commits from remote segment store, will retry delete post next commit", + e + ); + } finally { + canDeleteStaleCommits.set(true); + } + }); + } catch (Exception e) { + logger.info("Exception occurred while scheduling deleteStaleCommits", e); + canDeleteStaleCommits.set(true); + } + } + } + /* Tries to delete shard level directory if it is empty Return true if it deleted it successfully @@ -680,7 +718,7 @@ private boolean deleteIfEmpty() throws IOException { } public void close() throws IOException { - deleteStaleSegments(0); + deleteStaleSegmentsAsync(0); deleteIfEmpty(); } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 03995d5913fb3..3bec84f287ce4 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -20,6 +20,7 @@ import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.function.Supplier; @@ -34,8 +35,11 @@ public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.Dire private final Supplier repositoriesService; - public RemoteSegmentStoreDirectoryFactory(Supplier repositoriesService) { + private final ThreadPool threadPool; + + public RemoteSegmentStoreDirectoryFactory(Supplier repositoriesService, ThreadPool threadPool) { this.repositoriesService = repositoriesService; + this.threadPool = threadPool; } @Override @@ -62,7 +66,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh shardId ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index f0a15f1dc9f26..aecf5659de7fe 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -717,7 +717,8 @@ protected Node( clusterService.setRerouteService(rerouteService); final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory( - repositoriesServiceReference::get + repositoriesServiceReference::get, + threadPool ); final IndicesService indicesService = new IndicesService( diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 7d67c6c3b45f4..9daad9112e473 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ 
b/server/src/main/java/org/opensearch/search/SearchService.java @@ -1550,7 +1550,9 @@ private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefre } public static boolean canMatchSearchAfter(FieldDoc searchAfter, MinAndMax<?> minMax, FieldSortBuilder primarySortField) { - if (searchAfter != null && minMax != null && primarySortField != null) { + // Check that sort.missing is null: for sort queries with missing values, a segment/shard whose min/max + // falls outside the search_after range may still contain matching documents and hence must not be skipped. + if (searchAfter != null && minMax != null && primarySortField != null && primarySortField.missing() == null) { final Object searchAfterPrimary = searchAfter.fields[0]; if (primarySortField.order() == SortOrder.DESC) { if (minMax.compareMin(searchAfterPrimary) > 0) { diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 2c91d5aa33090..ebc68c288e25a 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -330,6 +330,19 @@ public long relativeTimeInNanos() { return cachedTimeThread.relativeTimeInNanos(); } + /** + * Returns a value of nanoseconds that may be used for relative time calculations + * that require the highest precision possible. Performance-critical code must use + * either {@link #relativeTimeInNanos()} or {@link #relativeTimeInMillis()} which + * give better performance at the cost of lower precision. + * + * This method should only be used for calculating time deltas. For an epoch-based + * timestamp, see {@link #absoluteTimeInMillis()}. + */ + public long preciseRelativeTimeInNanos() { + return System.nanoTime(); + } + /** * Returns the value of milliseconds since UNIX epoch. 
* diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java index fb47cb8e2d65a..3c27748daa87d 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -97,14 +97,14 @@ public class MasterServiceTests extends OpenSearchTestCase { private static ThreadPool threadPool; - private static long relativeTimeInMillis; + private static long timeDiffInMillis; @BeforeClass public static void createThreadPool() { threadPool = new TestThreadPool(MasterServiceTests.class.getName()) { @Override - public long relativeTimeInMillis() { - return relativeTimeInMillis; + public long preciseRelativeTimeInNanos() { + return timeDiffInMillis * TimeValue.NSEC_PER_MSEC; } }; } @@ -119,7 +119,7 @@ public static void stopThreadPool() { @Before public void randomizeCurrentTime() { - relativeTimeInMillis = randomLongBetween(0L, 1L << 62); + timeDiffInMillis = randomLongBetween(0L, 1L << 50); } private ClusterManagerService createClusterManagerService(boolean makeClusterManager) { @@ -426,7 +426,7 @@ public void testClusterStateUpdateLogging() throws Exception { clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - relativeTimeInMillis += TimeValue.timeValueSeconds(1).millis(); + timeDiffInMillis += TimeValue.timeValueSeconds(1).millis(); return currentState; } @@ -441,7 +441,7 @@ public void onFailure(String source, Exception e) { clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - relativeTimeInMillis += TimeValue.timeValueSeconds(2).millis(); + timeDiffInMillis += TimeValue.timeValueSeconds(2).millis(); throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task"); } @@ -456,13 +456,13 @@ public void onFailure(String source, Exception e) {} clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - relativeTimeInMillis += TimeValue.timeValueSeconds(3).millis(); + timeDiffInMillis += TimeValue.timeValueSeconds(3).millis(); return ClusterState.builder(currentState).incrementVersion().build(); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - relativeTimeInMillis += TimeValue.timeValueSeconds(4).millis(); + timeDiffInMillis += TimeValue.timeValueSeconds(4).millis(); } @Override @@ -1080,12 +1080,12 @@ public void testLongClusterStateUpdateLogging() throws Exception { final AtomicReference clusterStateRef = new AtomicReference<>(initialClusterState); clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { if (event.source().contains("test5")) { - relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( + timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( Settings.EMPTY ).millis() + randomLongBetween(1, 1000000); } if (event.source().contains("test6")) { - relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( + timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( 
Settings.EMPTY ).millis() + randomLongBetween(1, 1000000); throw new OpenSearchException("simulated error during slow publication which should trigger logging"); @@ -1101,7 +1101,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - relativeTimeInMillis += randomLongBetween( + timeDiffInMillis += randomLongBetween( 0L, ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis() ); @@ -1124,7 +1124,7 @@ public void onFailure(String source, Exception e) { clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( + timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( Settings.EMPTY ).millis() + randomLongBetween(1, 1000000); throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task"); @@ -1143,7 +1143,7 @@ public void onFailure(String source, Exception e) { clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( + timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( Settings.EMPTY ).millis() + randomLongBetween(1, 1000000); return ClusterState.builder(currentState).incrementVersion().build(); @@ -1162,7 +1162,7 @@ public void onFailure(String source, Exception e) { clusterManagerService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( + timeDiffInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( Settings.EMPTY ).millis() + randomLongBetween(1, 1000000); return currentState; diff --git a/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java index 6c36368bfe446..f139a5d4e3bb1 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/fs/FsBlobContainerTests.java @@ -34,6 +34,9 @@ import org.apache.lucene.tests.mockfile.FilterFileSystemProvider; import org.apache.lucene.tests.mockfile.FilterSeekableByteChannel; import org.apache.lucene.tests.util.LuceneTestCase; +import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.io.PathUtils; import org.opensearch.common.io.PathUtilsForTesting; @@ -54,10 +57,14 @@ import java.nio.file.Path; import java.nio.file.attribute.FileAttribute; import java.nio.file.spi.FileSystemProvider; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Locale; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import 
java.util.function.Consumer; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -118,6 +125,79 @@ public void testIsTempBlobName() { assertThat(FsBlobContainer.isTempBlobName(tempBlobName), is(true)); } + private void testListBlobsByPrefixInSortedOrder(int limit, BlobContainer.BlobNameSortOrder blobNameSortOrder) throws IOException { + + final Path path = PathUtils.get(createTempDir().toString()); + + List<String> blobsInFileSystem = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final String blobName = randomAlphaOfLengthBetween(1, 20).toLowerCase(Locale.ROOT); + final byte[] blobData = randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb + Files.write(path.resolve(blobName), blobData); + blobsInFileSystem.add(blobName); + } + + final FsBlobContainer container = new FsBlobContainer( + new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false), + BlobPath.cleanPath(), + path + ); + + if (limit >= 0) { + container.listBlobsByPrefixInSortedOrder(null, limit, blobNameSortOrder, new ActionListener<>() { + @Override + public void onResponse(List<BlobMetadata> blobMetadata) { + int actualLimit = Math.min(limit, 10); + assertEquals(actualLimit, blobMetadata.size()); + + if (blobNameSortOrder == BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC) { + blobsInFileSystem.sort(String::compareTo); + } else { + blobsInFileSystem.sort(Collections.reverseOrder(String::compareTo)); + } + List<String> keys = blobsInFileSystem.subList(0, actualLimit); + assertEquals(keys, blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList())); + } + + @Override + public void onFailure(Exception e) { + fail("blobContainer.listBlobsByPrefixInSortedOrder failed with exception: " + e.getMessage()); + } + }); + } else { + assertThrows( + IllegalArgumentException.class, + () -> container.listBlobsByPrefixInSortedOrder(null, limit, blobNameSortOrder, new ActionListener<>() { + @Override + public void onResponse(List<BlobMetadata> blobMetadata) {} + + @Override + public void onFailure(Exception e) {} + }) + ); + } + } + + public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException { + testListBlobsByPrefixInSortedOrder(-5, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithZeroLimit() throws IOException { + testListBlobsByPrefixInSortedOrder(0, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithLimitLessThanNumberOfRecords() throws IOException { + testListBlobsByPrefixInSortedOrder(8, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithLimitNumberOfRecords() throws IOException { + testListBlobsByPrefixInSortedOrder(10, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + + public void testListBlobsByPrefixInLexicographicOrderWithLimitGreaterThanNumberOfRecords() throws IOException { + testListBlobsByPrefixInSortedOrder(12, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); + } + static class MockFileSystemProvider extends FilterFileSystemProvider { final Consumer<Path> onRead; diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index f8ec138d8eff2..713a70c6a7d3e 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ 
b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -45,8 +45,6 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterSettingsResponse; import org.opensearch.common.util.FeatureFlags; -import org.opensearch.common.xcontent.json.JsonXContent; -import org.opensearch.core.xcontent.XContentParser; import org.opensearch.env.EnvironmentSettingsResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -398,16 +396,6 @@ public void testExtensionDependency() throws Exception { } } - public void testParseExtensionDependency() throws Exception { - XContentParser parser = createParser(JsonXContent.jsonXContent, "{\"uniqueId\": \"test1\", \"version\": \"2.0.0\"}"); - - assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken()); - ExtensionDependency dependency = ExtensionDependency.parse(parser); - - assertEquals("test1", dependency.getUniqueId()); - assertEquals(Version.fromString("2.0.0"), dependency.getVersion()); - } - public void testInitialize() throws Exception { ExtensionsManager extensionsManager = new ExtensionsManager(Set.of()); diff --git a/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java b/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java index 8d027b7fca9c2..7dd616c678e74 100644 --- a/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java +++ b/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java @@ -9,24 +9,33 @@ package org.opensearch.extensions.rest; import java.util.Collections; +import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import org.junit.After; import org.junit.Before; +import org.mockito.Mockito; import org.opensearch.Version; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.common.xcontent.XContentType; import org.opensearch.extensions.ExtensionsManager; +import org.opensearch.extensions.ExtensionsSettings; import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestStatus; @@ -88,8 +97,12 @@ public void testRestInitializeExtensionActionResponse() throws Exception { ExtensionsManager extensionsManager = mock(ExtensionsManager.class); RestInitializeExtensionAction restInitializeExtensionAction = new RestInitializeExtensionAction(extensionsManager); final String content = "{\"name\":\"ad-extension\",\"uniqueId\":\"ad-extension\",\"hostAddress\":\"127.0.0.1\"," - + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"3.0.0\"," - + "\"minimumCompatibleVersion\":\"3.0.0\"}"; + + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"" + + Version.CURRENT.toString() + + "\"," + + "\"minimumCompatibleVersion\":\"" + + 
Version.CURRENT.minimumCompatibilityVersion().toString() + + "\"}"; RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent(new BytesArray(content), XContentType.JSON) .withMethod(RestRequest.Method.POST) .build(); @@ -106,8 +119,12 @@ public void testRestInitializeExtensionActionFailure() throws Exception { RestInitializeExtensionAction restInitializeExtensionAction = new RestInitializeExtensionAction(extensionsManager); final String content = "{\"name\":\"ad-extension\",\"uniqueId\":\"\",\"hostAddress\":\"127.0.0.1\"," - + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"3.0.0\"," - + "\"minimumCompatibleVersion\":\"3.0.0\"}"; + + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"" + + Version.CURRENT.toString() + + "\"," + + "\"minimumCompatibleVersion\":\"" + + Version.CURRENT.minimumCompatibilityVersion().toString() + + "\"}"; RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent(new BytesArray(content), XContentType.JSON) .withMethod(RestRequest.Method.POST) .build(); @@ -121,4 +138,98 @@ public void testRestInitializeExtensionActionFailure() throws Exception { ); } + public void testRestInitializeExtensionActionResponseWithAdditionalSettings() throws Exception { + Setting boolSetting = Setting.boolSetting("boolSetting", false, Setting.Property.ExtensionScope); + Setting stringSetting = Setting.simpleString("stringSetting", "default", Setting.Property.ExtensionScope); + Setting intSetting = Setting.intSetting("intSetting", 0, Setting.Property.ExtensionScope); + Setting listSetting = Setting.listSetting( + "listSetting", + List.of("first", "second", "third"), + Function.identity(), + Setting.Property.ExtensionScope + ); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of(boolSetting, stringSetting, intSetting, listSetting)); + ExtensionsManager spy = spy(extensionsManager); + + // optionally, you can stub out some methods: + when(spy.getAdditionalSettings()).thenCallRealMethod(); + Mockito.doCallRealMethod().when(spy).loadExtension(any(ExtensionsSettings.Extension.class)); + Mockito.doNothing().when(spy).initialize(); + RestInitializeExtensionAction restInitializeExtensionAction = new RestInitializeExtensionAction(spy); + final String content = "{\"name\":\"ad-extension\",\"uniqueId\":\"ad-extension\",\"hostAddress\":\"127.0.0.1\"," + + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"" + + Version.CURRENT.toString() + + "\"," + + "\"minimumCompatibleVersion\":\"" + + Version.CURRENT.minimumCompatibilityVersion().toString() + + "\",\"boolSetting\":true,\"stringSetting\":\"customSetting\",\"intSetting\":5,\"listSetting\":[\"one\",\"two\",\"three\"]}"; + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent(new BytesArray(content), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(); + + FakeRestChannel channel = new FakeRestChannel(request, false, 0); + restInitializeExtensionAction.handleRequest(request, channel, null); + + assertEquals(channel.capturedResponse().status(), RestStatus.ACCEPTED); + assertTrue(channel.capturedResponse().content().utf8ToString().contains("A request to initialize an extension has been sent.")); + + Optional extension = spy.lookupExtensionSettingsById("ad-extension"); + assertTrue(extension.isPresent()); + assertEquals(true, extension.get().getAdditionalSettings().get(boolSetting)); + assertEquals("customSetting", extension.get().getAdditionalSettings().get(stringSetting)); + assertEquals(5, 
extension.get().getAdditionalSettings().get(intSetting)); + + List listSettingValue = (List) extension.get().getAdditionalSettings().get(listSetting); + assertTrue(listSettingValue.contains("one")); + assertTrue(listSettingValue.contains("two")); + assertTrue(listSettingValue.contains("three")); + } + + public void testRestInitializeExtensionActionResponseWithAdditionalSettingsUsingDefault() throws Exception { + Setting boolSetting = Setting.boolSetting("boolSetting", false, Setting.Property.ExtensionScope); + Setting stringSetting = Setting.simpleString("stringSetting", "default", Setting.Property.ExtensionScope); + Setting intSetting = Setting.intSetting("intSetting", 0, Setting.Property.ExtensionScope); + Setting listSetting = Setting.listSetting( + "listSetting", + List.of("first", "second", "third"), + Function.identity(), + Setting.Property.ExtensionScope + ); + ExtensionsManager extensionsManager = new ExtensionsManager(Set.of(boolSetting, stringSetting, intSetting, listSetting)); + ExtensionsManager spy = spy(extensionsManager); + + // optionally, you can stub out some methods: + when(spy.getAdditionalSettings()).thenCallRealMethod(); + Mockito.doCallRealMethod().when(spy).loadExtension(any(ExtensionsSettings.Extension.class)); + Mockito.doNothing().when(spy).initialize(); + RestInitializeExtensionAction restInitializeExtensionAction = new RestInitializeExtensionAction(spy); + final String content = "{\"name\":\"ad-extension\",\"uniqueId\":\"ad-extension\",\"hostAddress\":\"127.0.0.1\"," + + "\"port\":\"4532\",\"version\":\"1.0\",\"opensearchVersion\":\"" + + Version.CURRENT.toString() + + "\"," + + "\"minimumCompatibleVersion\":\"" + + Version.CURRENT.minimumCompatibilityVersion().toString() + + "\"}"; + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent(new BytesArray(content), XContentType.JSON) + .withMethod(RestRequest.Method.POST) + .build(); + + FakeRestChannel channel = new FakeRestChannel(request, false, 0); + restInitializeExtensionAction.handleRequest(request, channel, null); + + assertEquals(channel.capturedResponse().status(), RestStatus.ACCEPTED); + assertTrue(channel.capturedResponse().content().utf8ToString().contains("A request to initialize an extension has been sent.")); + + Optional extension = spy.lookupExtensionSettingsById("ad-extension"); + assertTrue(extension.isPresent()); + assertEquals(false, extension.get().getAdditionalSettings().get(boolSetting)); + assertEquals("default", extension.get().getAdditionalSettings().get(stringSetting)); + assertEquals(0, extension.get().getAdditionalSettings().get(intSetting)); + + List listSettingValue = (List) extension.get().getAdditionalSettings().get(listSetting); + assertTrue(listSettingValue.contains("first")); + assertTrue(listSettingValue.contains("second")); + assertTrue(listSettingValue.contains("third")); + } + } diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index d9d87196ca289..32b8fb5a4dc62 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -252,7 +252,7 @@ private IndexService newIndexService(IndexModule module) throws IOException { writableRegistry(), () -> false, null, - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), translogFactorySupplier ); } diff --git 
a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java new file mode 100644 index 0000000000000..5b9135afb66f3 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.test.OpenSearchTestCase; + +public class RemoteStoreUtilsTests extends OpenSearchTestCase { + + public void testInvertToStrInvalid() { + assertThrows(IllegalArgumentException.class, () -> RemoteStoreUtils.invertLong(-1)); + } + + public void testInvertToStrValid() { + assertEquals("9223372036854774573", RemoteStoreUtils.invertLong(1234)); + assertEquals("0000000000000001234", RemoteStoreUtils.invertLong(9223372036854774573L)); + } + + public void testInvertToLongInvalid() { + assertThrows(IllegalArgumentException.class, () -> RemoteStoreUtils.invertLong("-5")); + } + + public void testInvertToLongValid() { + assertEquals(1234, RemoteStoreUtils.invertLong("9223372036854774573")); + assertEquals(9223372036854774573L, RemoteStoreUtils.invertLong("0000000000000001234")); + } + + public void testinvert() { + assertEquals(0, RemoteStoreUtils.invertLong(RemoteStoreUtils.invertLong(0))); + assertEquals(Long.MAX_VALUE, RemoteStoreUtils.invertLong(RemoteStoreUtils.invertLong(Long.MAX_VALUE))); + for (int i = 0; i < 10; i++) { + long num = randomLongBetween(1, Long.MAX_VALUE); + assertEquals(num, RemoteStoreUtils.invertLong(RemoteStoreUtils.invertLong(num))); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java index 7a9cbc12d823b..324315505987b 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java @@ -24,6 +24,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Path; @@ -41,14 +42,16 @@ public class RemoteSegmentStoreDirectoryFactoryTests extends OpenSearchTestCase private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; + private ThreadPool threadPool; private RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory; @Before public void setup() { repositoriesServiceSupplier = mock(Supplier.class); repositoriesService = mock(RepositoriesService.class); + threadPool = mock(ThreadPool.class); when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); - remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(repositoriesServiceSupplier); + remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(repositoriesServiceSupplier, threadPool); } public void testNewDirectory() throws IOException { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 
3417e7b0aee04..66e4b9a357b85 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -30,12 +30,14 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.NoSuchFileException; @@ -45,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -56,6 +59,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.doReturn; +import static org.hamcrest.CoreMatchers.is; public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { private RemoteDirectory remoteDataDirectory; @@ -65,21 +69,31 @@ public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { private RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; private IndexShard indexShard; private SegmentInfos segmentInfos; + private ThreadPool threadPool; @Before public void setup() throws IOException { remoteDataDirectory = mock(RemoteDirectory.class); remoteMetadataDirectory = mock(RemoteDirectory.class); mdLockManager = mock(RemoteStoreMetadataLockManager.class); + threadPool = mock(ThreadPool.class); - remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(remoteDataDirectory, remoteMetadataDirectory, mdLockManager); + remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + remoteDataDirectory, + remoteMetadataDirectory, + mdLockManager, + threadPool + ); Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build(); + ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); try (Store store = indexShard.store()) { segmentInfos = store.readLastCommittedSegmentsInfo(); } + + when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(executorService); } @After @@ -766,41 +780,76 @@ public void testIncorrectChecksumCorruptIndexException() throws IOException { assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init()); } - public void testDeleteStaleCommitsException() throws IOException { + public void testDeleteStaleCommitsException() throws Exception { + populateMetadata(); when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenThrow( new IOException("Error reading") ); - assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.deleteStaleSegments(5)); + // populateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile 
is not + // invoked + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); + verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); + } + + public void testDeleteStaleCommitsExceptionWhileScheduling() throws Exception { + populateMetadata(); + doThrow(new IllegalArgumentException()).when(threadPool).executor(any(String.class)); + + // popluateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not + // invoked + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); + verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); + } + + public void testDeleteStaleCommitsWithDeletionAlreadyInProgress() throws Exception { + populateMetadata(); + remoteSegmentStoreDirectory.canDeleteStaleCommits.set(false); + + // popluateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not + // invoked + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(false))); + verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); } - public void testDeleteStaleCommitsWithinThreshold() throws IOException { + public void testDeleteStaleCommitsWithinThreshold() throws Exception { populateMetadata(); // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=5 here so that none of the metadata files will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(5); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(5); + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory, times(0)).openInput(any(String.class), eq(IOContext.DEFAULT)); } - public void testDeleteStaleCommitsActualDelete() throws IOException { + public void testDeleteStaleCommitsActualDelete() throws Exception { Map> metadataFilenameContentMapping = populateMetadata(); remoteSegmentStoreDirectory.init(); // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; verify(remoteDataDirectory).deleteFile(uploadedFilename); } ; + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); } - public void testDeleteStaleCommitsActualDeleteIOException() throws IOException { + public void testDeleteStaleCommitsActualDeleteIOException() throws Exception { Map> metadataFilenameContentMapping = populateMetadata(); remoteSegmentStoreDirectory.init(); @@ -813,17 +862,18 @@ public void testDeleteStaleCommitsActualDeleteIOException() throws IOException { doThrow(new IOException("Error")).when(remoteDataDirectory).deleteFile(segmentFileWithException); // popluateMetadata() adds stub to return 3 
@@ -813,17 +862,18 @@ public void testDeleteStaleCommitsActualDeleteIOException() throws IOException { doThrow(new IOException("Error")).when(remoteDataDirectory).deleteFile(segmentFileWithException); // populateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that the oldest metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; verify(remoteDataDirectory).deleteFile(uploadedFilename); } ; + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory, times(0)).deleteFile("metadata__1__5__abc"); } - public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws IOException { + public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws Exception { Map<String, Map<String, String>> metadataFilenameContentMapping = populateMetadata(); remoteSegmentStoreDirectory.init(); @@ -836,13 +886,14 @@ public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws IOExc doThrow(new NoSuchFileException(segmentFileWithException)).when(remoteDataDirectory).deleteFile(segmentFileWithException); // populateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that the oldest metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; verify(remoteDataDirectory).deleteFile(uploadedFilename); } ; + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); } diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index 8f8789a3a0323..74ef289c4b75f 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -1748,4 +1748,21 @@ public void testCanMatchSearchAfterDescEqualMin() throws IOException { primarySort.order(SortOrder.DESC); assertEquals(SearchService.canMatchSearchAfter(searchAfter, minMax, primarySort), true); } + + /** + * Test canMatchSearchAfter with a missing value when search_after falls outside the [min, max] range + * Min = 0L, Max = 9L, search_after = -1L + * Expected result is canMatch = true + */ + public void testCanMatchSearchAfterWithMissing() throws IOException { + FieldDoc searchAfter = new FieldDoc(0, 0, new Long[] { -1L }); + MinAndMax<Long> minMax = new MinAndMax<>(0L, 9L); + FieldSortBuilder primarySort = new FieldSortBuilder("test"); + primarySort.order(SortOrder.DESC); + // Should be false without a missing value + assertEquals(SearchService.canMatchSearchAfter(searchAfter, minMax, primarySort), false); + primarySort.missing("_last"); + // Should be true with a missing value + assertEquals(SearchService.canMatchSearchAfter(searchAfter, minMax, primarySort), true); + } }
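The new SearchServiceTests case documents the pruning rule it exercises: for a DESC primary sort, a shard whose [min, max] lies entirely at or above the search_after cursor can normally be skipped, but a missing value on the sort disables the skip, because docs without the field are not covered by min/max. A simplified sketch of that decision for a single long sort field; the real SearchService.canMatchSearchAfter handles more sort types and orders:

```java
// Simplified illustration of the pruning decision exercised by the test above.
public final class CanMatchSketch {
    static boolean canMatchSearchAfterDesc(long searchAfter, long min, long max, Object missing) {
        if (missing != null) {
            return true; // docs missing the field may still sort after the cursor
        }
        // DESC: remaining hits need values at or below the cursor, so the shard
        // matches only if its smallest value does not exceed search_after.
        return min <= searchAfter;
    }

    public static void main(String[] args) {
        System.out.println(canMatchSearchAfterDesc(-1L, 0L, 9L, null));    // false: every value exceeds -1
        System.out.println(canMatchSearchAfterDesc(-1L, 0L, 9L, "_last")); // true: missing docs may remain
    }
}
```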
diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 0bb2b604e8f1a..88899a1b282af 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1835,7 +1835,7 @@ public void onFailure(final Exception e) { emptyMap(), null, emptyMap(), - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), repositoriesServiceReference::get, fileCacheCleaner ); diff --git a/test/fixtures/hdfs-fixture/build.gradle b/test/fixtures/hdfs-fixture/build.gradle index 310b088674d5c..b1a87fe6c3112 100644 --- a/test/fixtures/hdfs-fixture/build.gradle +++ b/test/fixtures/hdfs-fixture/build.gradle @@ -44,6 +44,7 @@ dependencies { exclude module: 'guava' exclude module: 'protobuf-java' exclude group: 'org.codehaus.jackson' + exclude group: "org.bouncycastle" } api "org.codehaus.jettison:jettison:${versions.jettison}" api "org.apache.commons:commons-compress:1.23.0" @@ -51,7 +52,6 @@ dependencies { api "org.apache.logging.log4j:log4j-core:${versions.log4j}" api "io.netty:netty-all:${versions.netty}" api 'com.google.code.gson:gson:2.10.1' - api "org.bouncycastle:bcpkix-jdk15to18:${versions.bouncycastle}" api "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:${versions.jackson}" api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" api "com.fasterxml.woodstox:woodstox-core:${versions.woodstox}" @@ -65,4 +65,5 @@ dependencies { api "org.apache.commons:commons-text:1.10.0" api "commons-net:commons-net:3.9.0" runtimeOnly "com.google.guava:guava:${versions.guava}" + } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index ea9e9342673db..7f3819563dcbd 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -666,7 +666,7 @@ protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager( new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex())) ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool); } private RemoteDirectory newRemoteDirectory(Path f) throws IOException {
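A closing note on the test strategy these changes share: stubbing the REMOTE_PURGE executor with OpenSearchExecutors.newDirectExecutorService() makes the "async" purge run inline on the calling thread, so the tests are deterministic and assertBusy converges immediately. The idea in plain Java, with no OpenSearch or Mockito dependencies:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// A direct executor runs the task before execute() returns, which turns
// an asynchronous code path into a synchronous one inside a test.
public final class DirectExecutorDemo {
    static final Executor DIRECT = Runnable::run; // same-thread "executor"

    public static void main(String[] args) {
        AtomicBoolean purged = new AtomicBoolean(false);
        DIRECT.execute(() -> purged.set(true)); // completes inline
        System.out.println(purged.get()); // true, with no waiting or polling
    }
}
```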