From 791963631ffd9f2ea3b629d18fd84d202da924c4 Mon Sep 17 00:00:00 2001 From: kolchfa-aws <105444904+kolchfa-aws@users.noreply.github.com> Date: Fri, 22 Sep 2023 17:31:04 -0400 Subject: [PATCH] Add concurrent segment search documentation (#4990) * Add concurrent segment search documentation Signed-off-by: Fanit Kolchina * Add API information Signed-off-by: Fanit Kolchina * Small rewording Signed-off-by: Fanit Kolchina * Add tech review feedback Signed-off-by: Fanit Kolchina * Fix link Signed-off-by: Fanit Kolchina * Add API section Signed-off-by: Fanit Kolchina * Small rewriting Signed-off-by: Fanit Kolchina * Rewriting Signed-off-by: Fanit Kolchina * Tech review feedback Signed-off-by: Fanit Kolchina * Apply suggestions from code review Co-authored-by: Melissa Vagi Signed-off-by: kolchfa-aws <105444904+kolchfa-aws@users.noreply.github.com> * Update profile.md Signed-off-by: kolchfa-aws <105444904+kolchfa-aws@users.noreply.github.com> * Update _api-reference/profile.md Co-authored-by: Melissa Vagi Signed-off-by: kolchfa-aws <105444904+kolchfa-aws@users.noreply.github.com> * Update concurrent-segment-search.md Signed-off-by: kolchfa-aws <105444904+kolchfa-aws@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Nathan Bower Signed-off-by: kolchfa-aws <105444904+kolchfa-aws@users.noreply.github.com> * Implemented editorial comments Signed-off-by: Fanit Kolchina --------- Signed-off-by: Fanit Kolchina Signed-off-by: kolchfa-aws <105444904+kolchfa-aws@users.noreply.github.com> Co-authored-by: Melissa Vagi Co-authored-by: Nathan Bower --- _api-reference/index-apis/stats.md | 17 +- _api-reference/nodes-apis/nodes-stats.md | 4 + _api-reference/profile.md | 219 ++++++++++++++++++- _search-plugins/concurrent-segment-search.md | 162 ++++++++++++++ 4 files changed, 396 insertions(+), 6 deletions(-) create mode 100644 _search-plugins/concurrent-segment-search.md diff --git a/_api-reference/index-apis/stats.md b/_api-reference/index-apis/stats.md index 
4bd1b2f80e..baffd2edb0 100644 --- a/_api-reference/index-apis/stats.md +++ b/_api-reference/index-apis/stats.md @@ -5,7 +5,7 @@ parent: Index APIs nav_order: 72 --- -# Index stats +# Index Stats The Index Stats API provides index statistics. For data streams, the API provides statistics for the stream's backing indexes. By default, the returned statistics are index level. To receive shard-level statistics, set the `level` parameter to `shards`. @@ -792,4 +792,17 @@ GET /testindex*/_stats?expand_wildcards=open,hidden ```json GET /testindex/_stats?level=shards ``` -{% include copy-curl.html %} \ No newline at end of file +{% include copy-curl.html %} + +## Concurrent segment search + +Starting in OpenSearch 2.10, [concurrent segment search]({{site.url}}{{site.baseurl}}/search-plugins/concurrent-segment-search/) allows each shard-level request to search segments in parallel during the query phase. If you [enable the experimental concurrent segment search feature flag]({{site.url}}{{site.baseurl}}/search-plugins/concurrent-segment-search#enabling-the-feature-flag), the Index Stats API response will contain several additional fields with statistics about slices (units of work executed by a thread). These fields will be provided whether or not the cluster and index settings for concurrent segment search are enabled. For more information about slices, see [Concurrent segment search]({{site.url}}{{site.baseurl}}/search-plugins/concurrent-segment-search#searching-segments-concurrently). + +The following table provides information about the added response fields. + +|Response field | Description | +|:--- |:--- | +|`search.concurrent_avg_slice_count` |The average slice count of all search requests. This is computed as the total slice count divided by the total number of concurrent search requests. | +|`search.concurrent_query_total` |The total number of query operations that use concurrent segment search. 
| +|`search.concurrent_query_time_in_millis` |The total amount of time taken by all query operations that use concurrent segment search, in milliseconds. | +|`search.concurrent_query_current` |The number of currently running query operations that use concurrent segment search. | diff --git a/_api-reference/nodes-apis/nodes-stats.md b/_api-reference/nodes-apis/nodes-stats.md index b17cc3ec38..8eaf965893 100644 --- a/_api-reference/nodes-apis/nodes-stats.md +++ b/_api-reference/nodes-apis/nodes-stats.md @@ -1140,6 +1140,10 @@ total_rejections_breakup_shadow_mode.throughput_degradation_limits | Integer | T enabled | Boolean | Specifies whether the shard indexing pressure feature is turned on for the node. enforced | Boolean | If true, the shard indexing pressure runs in enforced mode (there are rejections). If false, the shard indexing pressure runs in shadow mode (there are no rejections, but statistics are recorded and can be retrieved in the `total_rejections_breakup_shadow_mode` object). Only applicable if shard indexing pressure is enabled. +## Concurrent segment search + +Starting in OpenSearch 2.10, [concurrent segment search]({{site.url}}{{site.baseurl}}/search-plugins/concurrent-segment-search/) allows each shard-level request to search segments in parallel during the query phase. If you [enable the experimental concurrent segment search feature flag]({{site.url}}{{site.baseurl}}/search-plugins/concurrent-segment-search#enabling-the-feature-flag), the Nodes Stats API response will contain several additional fields with statistics about slices (units of work executed by a thread). For the descriptions of those fields, see [Index Stats API]({{site.url}}{{site.baseurl}}/api-reference/index-apis/stats#concurrent-segment-search). + ## Required permissions If you use the Security plugin, make sure you have the appropriate permissions: `cluster:monitor/nodes/stats`. 
diff --git a/_api-reference/profile.md b/_api-reference/profile.md index a09b5b8753..27ea4d9c55 100644 --- a/_api-reference/profile.md +++ b/_api-reference/profile.md @@ -221,14 +221,14 @@ Field | Data type | Description `profile.shards` | Array of objects | A search request can be executed against one or more shards in the index, and a search may involve one or more indexes. Thus, the `profile.shards` array contains profiling information for each shard that was involved in the search. `profile.shards.id` | String | The shard ID of the shard in the `[node-ID][index-name][shard-ID]` format. `profile.shards.searches` | Array of objects | A search represents a query executed against the underlying Lucene index. Most search requests execute a single search against a Lucene index, but some search requests can execute more than one search. For example, including a global aggregation results in a secondary `match_all` query for the global context. The `profile.shards` array contains profiling information about each search execution. -[`profile.shards.searches.query`](#the-query-object) | Array of objects | Profiling information about the query execution. +[`profile.shards.searches.query`](#the-query-array) | Array of objects | Profiling information about the query execution. `profile.shards.searches.rewrite_time` | Integer | All Lucene queries are rewritten. A query and its children may be rewritten more than once, until the query stops changing. The rewriting process involves performing optimizations, such as removing redundant clauses or replacing a query path with a more efficient one. After the rewriting process, the original query may change significantly. The `rewrite_time` field contains the cumulative total rewrite time for the query and all its children, in nanoseconds. [`profile.shards.searches.collector`](#the-collector-array) | Array of objects | Profiling information about the Lucene collectors that ran the search. 
[`profile.shards.aggregations`](#aggregations) | Array of objects | Profiling information about the aggregation execution. -### The `query` object +### The `query` array -The `query` object contains the following fields. +The `query` array contains objects with the following fields. Field | Data type | Description :--- | :--- | :--- @@ -753,4 +753,215 @@ Field | Description `post_collection`| Contains the time spent running the aggregation’s `postCollection()` callback method. `build_aggregation`| Contains the time spent running the aggregation’s `buildAggregations()` method, which builds the results of this aggregation. `reduce`| Contains the time spent in the `reduce` phase. -`<method>_count` | Contains the number of invocations of a `<method>`. For example, `build_leaf_collector_count` contains the number of invocations of the `build_leaf_collector` method. \ No newline at end of file +`<method>_count` | Contains the number of invocations of a `<method>`. For example, `build_leaf_collector_count` contains the number of invocations of the `build_leaf_collector` method. + +## Concurrent segment search + +Starting in OpenSearch 2.10, [concurrent segment search]({{site.url}}{{site.baseurl}}/search-plugins/concurrent-segment-search/) allows each shard-level request to search segments in parallel during the query phase. If you enable the experimental concurrent segment search feature flag, the Profile API response will contain several additional fields with statistics about _slices_. + +A slice is the unit of work that can be executed by a thread. Each query can be partitioned into multiple slices, with each slice containing one or more segments. All the slices can be executed either in parallel or in some order depending on the available threads in the pool. + +In general, the max/min/avg slice time captures statistics across all slices for a timing type. 
For example, when profiling aggregations, the `max_slice_time_in_nanos` field in the `aggregations` section shows the maximum time consumed by the aggregation operation and its children across all slices. + +#### Example response + +The following is an example response for a concurrent search with three segment slices: + +
+ + Response + + {: .text-delta} + +```json +{ + "took": 76, + "timed_out": false, + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0 + }, + "hits": { + "total": { + "value": 5, + "relation": "eq" + }, + "max_score": 1, + "hits": [ + ... + ] + }, + "aggregations": { + ... + }, + "profile": { + "shards": [ + { + "id": "[Sn2zHhcMTRetEjXvppU8bA][idx][0]", + "inbound_network_time_in_millis": 0, + "outbound_network_time_in_millis": 0, + "searches": [ + { + "query": [ + { + "type": "MatchAllDocsQuery", + "description": "*:*", + "time_in_nanos": 429246, + "breakdown": { + "set_min_competitive_score_count": 0, + "match_count": 0, + "shallow_advance_count": 0, + "set_min_competitive_score": 0, + "next_doc": 5485, + "match": 0, + "next_doc_count": 5, + "score_count": 5, + "compute_max_score_count": 0, + "compute_max_score": 0, + "advance": 3350, + "advance_count": 3, + "score": 5920, + "build_scorer_count": 6, + "create_weight": 429246, + "shallow_advance": 0, + "create_weight_count": 1, + "build_scorer": 2221054 + } + } + ], + "rewrite_time": 12442, + "collector": [ + { + "name": "QueryCollectorManager", + "reason": "search_multi", + "time_in_nanos": 6786930, + "reduce_time_in_nanos": 5892759, + "max_slice_time_in_nanos": 5951808, + "min_slice_time_in_nanos": 5798174, + "avg_slice_time_in_nanos": 5876588, + "slice_count": 3, + "children": [ + { + "name": "SimpleTopDocsCollectorManager", + "reason": "search_top_hits", + "time_in_nanos": 1340186, + "reduce_time_in_nanos": 1084060, + "max_slice_time_in_nanos": 457165, + "min_slice_time_in_nanos": 433706, + "avg_slice_time_in_nanos": 443332, + "slice_count": 3 + }, + { + "name": "NonGlobalAggCollectorManager: [histo]", + "reason": "aggregation", + "time_in_nanos": 5366791, + "reduce_time_in_nanos": 4637260, + "max_slice_time_in_nanos": 4526680, + "min_slice_time_in_nanos": 4414049, + "avg_slice_time_in_nanos": 4487122, + "slice_count": 3 + } + ] + } + ] + } + ], + "aggregations": [ + { + "type": 
"NumericHistogramAggregator", + "description": "histo", + "time_in_nanos": 16454372, + "max_slice_time_in_nanos": 7342096, + "min_slice_time_in_nanos": 4413728, + "avg_slice_time_in_nanos": 5430066, + "breakdown": { + "min_build_leaf_collector": 4320259, + "build_aggregation_count": 3, + "post_collection": 9942, + "max_collect_count": 2, + "initialize_count": 3, + "reduce_count": 0, + "avg_collect": 146319, + "max_build_aggregation": 2826399, + "avg_collect_count": 1, + "max_build_leaf_collector": 4322299, + "min_build_leaf_collector_count": 1, + "build_aggregation": 3038635, + "min_initialize": 1057, + "max_reduce": 0, + "build_leaf_collector_count": 3, + "avg_reduce": 0, + "min_collect_count": 1, + "avg_build_leaf_collector_count": 1, + "avg_build_leaf_collector": 4321197, + "max_collect": 181266, + "reduce": 0, + "avg_build_aggregation": 954896, + "min_post_collection": 1236, + "max_initialize": 11603, + "max_post_collection": 5350, + "collect_count": 5, + "avg_post_collection": 2793, + "avg_initialize": 4860, + "post_collection_count": 3, + "build_leaf_collector": 4322299, + "min_collect": 78519, + "min_build_aggregation": 8543, + "initialize": 11971068, + "max_build_leaf_collector_count": 1, + "min_reduce": 0, + "collect": 181838 + }, + "debug": { + "total_buckets": 1 + } + } + ] + } + ] + } +} +``` +
+ +### Modified or added response fields + +The following sections contain definitions of all modified or added response fields for concurrent segment search. + +#### The `query` array + +|Field |Description | +|:--- |:--- | +|`time_in_nanos` |For concurrent segment search, `time_in_nanos` is the cumulative amount of time taken to run all methods across all slices, in nanoseconds. This is not equivalent to the actual amount of time the query took to run because it does not take into account that multiple slices can run the methods in parallel. | +|`breakdown.<method>` |For concurrent segment search, this field contains the total amount of time taken by all segments to run a method. | +|`breakdown.<method>_count` |For concurrent segment search, this field contains the total number of invocations of a `<method>` obtained by adding the number of method invocations for all segments. | + +#### The `collector` array + +|Field |Description | +|:--- |:--- | +|`time_in_nanos` |The total elapsed time for this collector, in nanoseconds. For concurrent segment search, `time_in_nanos` is the total amount of time across all slices (`max(slice_end_time) - min(slice_start_time)`). | +|`max_slice_time_in_nanos` |The maximum amount of time taken by any slice, in nanoseconds. | +|`min_slice_time_in_nanos` |The minimum amount of time taken by any slice, in nanoseconds. | +|`avg_slice_time_in_nanos` |The average amount of time taken per slice, computed across all slices, in nanoseconds. | +|`slice_count` |The total slice count for this query. | +|`reduce_time_in_nanos` |The amount of time taken to reduce results for all slice collectors, in nanoseconds. | + +#### The `aggregations` array + +|Field |Description | +|:--- |:--- | +|`time_in_nanos` |The total elapsed time for this aggregation, in nanoseconds. For concurrent segment search, `time_in_nanos` is the total amount of time across all slices (`max(slice_end_time) - min(slice_start_time)`). 
| +|`max_slice_time_in_nanos` |The maximum amount of time taken by any slice to run an aggregation, in nanoseconds. | +|`min_slice_time_in_nanos` |The minimum amount of time taken by any slice to run an aggregation, in nanoseconds. | +|`avg_slice_time_in_nanos` |The average amount of time taken per slice to run an aggregation, computed across all slices, in nanoseconds. | +|`<method>` |The total elapsed time across all slices (`max(slice_end_time) - min(slice_start_time)`). For example, for the `collect` method, it is the total time spent collecting documents into buckets across all slices. | +|`max_<method>` |The maximum amount of time taken by any slice to run an aggregation method. | +|`min_<method>` |The minimum amount of time taken by any slice to run an aggregation method. | +|`avg_<method>` |The average amount of time taken per slice to run an aggregation method, computed across all slices. | +|`<method>_count` |The total method count across all slices. For example, for the `collect` method, it is the total number of invocations of this method needed to collect documents into buckets across all slices. | +|`max_<method>_count` |The maximum number of invocations of a `<method>` on any slice. | +|`min_<method>_count` |The minimum number of invocations of a `<method>` on any slice. | +|`avg_<method>_count` |The average number of invocations of a `<method>` per slice, computed across all slices. | diff --git a/_search-plugins/concurrent-segment-search.md b/_search-plugins/concurrent-segment-search.md new file mode 100644 index 0000000000..89380dde19 --- /dev/null +++ b/_search-plugins/concurrent-segment-search.md @@ -0,0 +1,162 @@ +--- +layout: default +title: Concurrent segment search +nav_order: 53 +--- + +# Concurrent segment search + +This is an experimental feature and is not recommended for use in a production environment. For updates on the progress of the feature or if you want to leave feedback, see the associated [GitHub issue](https://github.com/opensearch-project/OpenSearch/issues/2587) or the [project board](https://github.com/orgs/opensearch-project/projects/117/views/1). 
+{: .warning} + +Use concurrent segment search to search segments in parallel during the query phase. Cases in which concurrent segment search improves search latency include the following: + +- When sending long-running requests, for example, requests that contain aggregations or large ranges +- As an alternative to force-merging segments into a single segment in order to improve performance + +## Background + +In OpenSearch, each search request follows the scatter-gather protocol. The coordinating node receives a search request, evaluates which shards are needed to serve this request, and sends a shard-level search request to each of those shards. Each shard that receives the request executes the request locally using Lucene and returns the results. The coordinating node merges the responses received from all shards and sends the search response back to the client. Optionally, the coordinating node can perform a fetch phase before returning the final results to the client if any document field or the entire document is requested by the client as part of the response. + +## Searching segments concurrently + +Without concurrent segment search, Lucene executes a request sequentially across all segments on each shard during the query phase. The query phase then collects the top hits for the search request. With concurrent segment search, each shard-level request will search the segments in parallel during the query phase. For each shard, the segments are divided into multiple _slices_. Each slice is the unit of work that can be executed in parallel on a separate thread, so the slice count determines the maximum degree of parallelism for a shard-level request. Once all the slices complete their work, Lucene performs a reduce operation on the slices, merging them and creating the final result for this shard-level request. Slices are executed using a new `index_searcher` thread pool, which is different from the `search` thread pool that handles shard-level requests. 
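
The slice-then-reduce flow described above can be sketched in a few lines of Python. This is only a conceptual illustration, not OpenSearch or Lucene code: the function names `search_slice` and `search_shard`, the list-of-lists data layout, and the scoring function are all hypothetical, and a plain `ThreadPoolExecutor` stands in for the `index_searcher` thread pool.

```python
from concurrent.futures import ThreadPoolExecutor
import heapq


def search_slice(slice_segments, score_fn, k):
    """Collect the top-k (score, doc_id) hits from every segment in one slice."""
    hits = [(score_fn(doc), doc) for segment in slice_segments for doc in segment]
    return heapq.nlargest(k, hits)


def search_shard(slices, score_fn, k, max_workers=4):
    """Search each slice on a worker thread, then reduce to a shard-level top-k."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        per_slice = list(pool.map(lambda s: search_slice(s, score_fn, k), slices))
    # Reduce step: merge the per-slice results into a single shard-level result.
    return heapq.nlargest(k, (hit for hits in per_slice for hit in hits))


# Hypothetical shard: three slices, each holding one or more segments of doc IDs.
slices = [
    [[1, 2, 3], [4, 5]],  # slice 0 holds two segments
    [[6, 7]],             # slice 1 holds one segment
    [[8, 9, 10]],         # slice 2 holds one segment
]
top = search_shard(slices, score_fn=lambda doc: doc % 7, k=3)
print(top)
```

Because each slice only needs to return its own top `k` hits, the reduce step merges at most `slice_count * k` candidates, regardless of how many documents each slice scanned.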
+ +## Enabling the feature flag + +There are several methods for enabling concurrent segment search, depending on the installation type. + +### Enable in opensearch.yml + +If you are running an OpenSearch cluster and want to enable concurrent segment search in the config file, add the following line to `opensearch.yml`: + +```yaml +opensearch.experimental.feature.concurrent_segment_search.enabled: true +``` +{% include copy.html %} + +### Enable with Docker containers + +If you’re running Docker, add the following line to `docker-compose.yml` under the `opensearch-node` > `environment` section: + +```bash +OPENSEARCH_JAVA_OPTS="-Dopensearch.experimental.feature.concurrent_segment_search.enabled=true" +``` +{% include copy.html %} + +### Enable on a node using a tarball installation + +To enable concurrent segment search on a tarball installation, provide the new JVM parameter either in `config/jvm.options` or `OPENSEARCH_JAVA_OPTS`. + +#### OPTION 1: Modify jvm.options + +Add the following line to `config/jvm.options` before starting the `opensearch` process to enable the feature: + +```bash +-Dopensearch.experimental.feature.concurrent_segment_search.enabled=true +``` +{% include copy.html %} + +Then run OpenSearch: + +```bash +./bin/opensearch +``` +{% include copy.html %} + +#### OPTION 2: Enable with an environment variable + +As an alternative to directly modifying `config/jvm.options`, you can define the property by using an environment variable. This can be done using a single command when you start OpenSearch or by defining the variable with `export`. 
+ +To add this flag inline when starting OpenSearch, run the following command: + +```bash +OPENSEARCH_JAVA_OPTS="-Dopensearch.experimental.feature.concurrent_segment_search.enabled=true" ./opensearch-{{site.opensearch_version}}/bin/opensearch +``` +{% include copy.html %} + +If you want to define the environment variable separately prior to running OpenSearch, run the following commands: + +```bash +export OPENSEARCH_JAVA_OPTS="-Dopensearch.experimental.feature.concurrent_segment_search.enabled=true" +``` +{% include copy.html %} + +```bash +./bin/opensearch +``` +{% include copy.html %} + +## Disabling concurrent search at the index or cluster level + +After you enable the experimental feature flag, all search requests will use concurrent segment search during the query phase. To disable concurrent segment search for all indexes, set the following dynamic cluster setting: + +```json +PUT _cluster/settings +{ + "persistent":{ + "search.concurrent_segment_search.enabled": false + } +} +``` +{% include copy-curl.html %} + +To disable concurrent segment search for a particular index, specify the index name in the endpoint: + +```json +PUT <index-name>/_settings +{ + "index.search.concurrent_segment_search.enabled": false +} +``` +{% include copy-curl.html %} + +## Slicing mechanisms + +You can choose one of two available mechanisms for assigning segments to slices: the default [Lucene mechanism](#the-lucene-mechanism) or the [max slice count mechanism](#the-max-slice-count-mechanism). + +### The Lucene mechanism + +By default, Lucene assigns a maximum of 250K documents or 5 segments (whichever is met first) to each slice in a shard. For example, consider a shard with 11 segments. The first 5 segments have 250K documents each, and the next 6 segments have 20K documents each. The first 5 segments will be assigned to 1 slice each because they each contain the maximum number of documents allowed for a slice. 
Then the next 5 segments will all be assigned to another single slice because a slice can contain at most 5 segments. The 11th segment will be assigned to a separate slice. + +### The max slice count mechanism + +The _max slice count_ mechanism is an alternative slicing mechanism that uses a statically configured maximum number of slices and divides segments among the slices in a round-robin fashion. This is useful when there are already too many top-level shard requests and you want to limit the number of slices per request in order to reduce competition between the slices. + +### Setting the slicing mechanism + +By default, concurrent segment search uses the Lucene mechanism to calculate the number of slices for each shard-level request. To use the max slice count mechanism instead, configure the `search.concurrent.max_slice_count` static setting in the `opensearch.yml` config file: + +```yaml +search.concurrent.max_slice_count: 2 +``` +{% include copy.html %} + +The `search.concurrent.max_slice_count` setting can take the following valid values: +- `0`: Use the default Lucene mechanism. +- Positive integer: Use the max slice count mechanism. Usually, a value between 2 and 8 should be sufficient. + +## The `terminate_after` search parameter + +The [`terminate_after` search parameter]({{site.url}}{{site.baseurl}}/api-reference/search/#url-parameters) is used to terminate a search request once a specified number of documents has been collected. In the non-concurrent search workflow, this count is evaluated for each shard. However, in the concurrent search workflow, it is evaluated for each slice instead in order to avoid synchronizing document counts between threads. With concurrent search, the request can therefore perform more work than expected because each slice on the shard collects up to the specified number of documents. The intent to terminate collection after the threshold is reached is evaluated at the slice level. 
Thus, the hit count in the results can be greater than the `terminate_after` threshold but does not exceed `slice_count * terminate_after`. The actual number of returned hits will be controlled by the `size` parameter. + +## API changes + +If you enable the concurrent segment search feature flag, the following Stats API responses will contain several additional fields with statistics about slices: + +- [Index Stats]({{site.url}}{{site.baseurl}}/api-reference/index-apis/stats/) +- [Nodes Stats]({{site.url}}{{site.baseurl}}/api-reference/nodes-apis/nodes-stats/) + +For descriptions of the added fields, see [Index Stats API]({{site.url}}{{site.baseurl}}/api-reference/index-apis/stats#concurrent-segment-search). + +Additionally, some [Profile API]({{site.url}}{{site.baseurl}}/api-reference/profile/) response fields will be modified and others added. For more information, see the [concurrent segment search section of the Profile API]({{site.url}}{{site.baseurl}}/api-reference/profile#concurrent-segment-search). + +## Limitations + +Parent aggregations on [join]({{site.url}}{{site.baseurl}}/field-types/supported-field-types/join/) fields do not support the concurrent search model. Thus, if a search request contains a parent aggregation, the aggregation will be executed using the non-concurrent path even if concurrent segment search is enabled at the cluster level. + +## Developer information: AggregatorFactory changes + +Because of implementation details, not all aggregator types can support concurrent segment search. To accommodate this, we have introduced a [`supportsConcurrentSegmentSearch()`](https://github.com/opensearch-project/OpenSearch/blob/bb38ed4836496ac70258c2472668325a012ea3ed/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java#L121) method in the `AggregatorFactory` class to indicate whether a given aggregation type supports concurrent segment search. By default, this method returns `false`. 
Any aggregator that needs to support concurrent segment search must override this method in its own factory implementation. + +To ensure that a custom plugin-based `Aggregator` implementation works with the concurrent search path, plugin developers can verify their implementation with concurrent search enabled and then update the plugin to override the [`supportsConcurrentSegmentSearch()`](https://github.com/opensearch-project/OpenSearch/blob/bb38ed4836496ac70258c2472668325a012ea3ed/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java#L121) method to return `true`.
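
As a supplementary illustration of the two slicing mechanisms described in the "Slicing mechanisms" section of this page, the following Python sketch models how segments might be grouped into slices. It is a simplified model of the rules as stated on this page (at most 250K documents or 5 segments per slice, and round-robin assignment for the max slice count mechanism), not the actual Lucene or OpenSearch implementation. The function names `slice_by_limits` and `slice_round_robin` are hypothetical, and segments are represented only by their document counts.

```python
MAX_DOCS_PER_SLICE = 250_000   # document limit per slice, as stated on this page
MAX_SEGMENTS_PER_SLICE = 5     # segment limit per slice, as stated on this page


def slice_by_limits(segment_doc_counts):
    """Simplified model of the default mechanism: start a new slice whenever
    adding a segment would exceed the document limit or the segment limit."""
    slices, current, docs = [], [], 0
    for count in segment_doc_counts:
        would_overflow = docs + count > MAX_DOCS_PER_SLICE
        is_full = len(current) >= MAX_SEGMENTS_PER_SLICE
        if current and (would_overflow or is_full):
            slices.append(current)
            current, docs = [], 0
        current.append(count)
        docs += count
    if current:
        slices.append(current)
    return slices


def slice_round_robin(segment_doc_counts, max_slice_count):
    """Simplified model of the max slice count mechanism: deal segments out
    to a fixed number of slices in round-robin order."""
    slice_count = min(max_slice_count, len(segment_doc_counts))
    slices = [[] for _ in range(slice_count)]
    for i, count in enumerate(segment_doc_counts):
        slices[i % slice_count].append(count)
    return slices


# The 11-segment shard from the example: 5 segments of 250K docs, 6 of 20K docs.
segments = [250_000] * 5 + [20_000] * 6
lucene_like = slice_by_limits(segments)
round_robin = slice_round_robin(segments, max_slice_count=2)
print(len(lucene_like), [len(s) for s in round_robin])
```

Under this model, the 11-segment example yields 7 slices with the default limits, while a max slice count of 2 caps the shard-level request at 2 slices no matter how many segments the shard contains.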