[Concurrent Segment Search] Add support for aggregation profiler with concurrent aggregation #8331

Closed
ticheng-aws opened this issue Jun 29, 2023 · 5 comments · Fixed by #8801 or #9017
Labels: distributed framework, enhancement

Comments

@ticheng-aws
Contributor

Placeholder issue to add support for the aggregation profiler with the concurrent aggregation flow (both global and non-global aggregations). This is a subtask of #7354.

ticheng-aws added the enhancement and untriaged labels on Jun 29, 2023
anasalkouz moved this from Todo to In Progress in Concurrent Search on Jun 29, 2023
@ticheng-aws
Contributor Author

ticheng-aws commented Jun 29, 2023

Background

The aggregations section of the search profile presents comprehensive timing information for the execution of the aggregation tree by a specific shard. The breakdown component provides detailed statistics about the low-level execution. The existing breakdown properties for aggregation timing are listed below (a minimal sketch of the underlying timer pattern follows the list):

  1. initialize: This property measures the elapsed time, in nanoseconds, spent executing the preCollection() callback method during AggregationCollectorManager creation.
  2. build_leaf_collector: This property measures the time, in nanoseconds, spent running the aggregation’s getLeafCollector() method, which creates a new collector to collect the given context.
  3. collect: This property measures the time, in nanoseconds, spent collecting the documents into buckets.
  4. post_collection: This property measures the time, in nanoseconds, spent running the aggregation’s postCollection() callback method.
  5. build_aggregation: This property measures the elapsed time, in nanoseconds, spent running the aggregation’s buildAggregations() method, which builds the results of this aggregation.
  6. reduce: The elapsed time spent in the reduce phase.
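
For reference, below is a minimal, self-contained sketch of the start()/stop() timing pattern these breakdown properties are built on. The class name PhaseTimer is made up for illustration; it is not the actual org.opensearch.search.profile.Timer implementation, though it mirrors the same matched start()/stop() contract.

// Simplified sketch of a breakdown timer: each phase is measured by a matching
// start()/stop() pair, and the accumulated time feeds one breakdown property.
final class PhaseTimer {
    private long start;      // 0 means "not currently running"
    private long totalNanos; // accumulated elapsed time for this phase
    private long count;      // number of completed start()/stop() pairs

    void start() {
        // Mirrors the assertion seen in the profiler: a second start() without
        // an intervening stop() indicates unbalanced timer usage.
        assert start == 0 : "#start call misses a matching #stop call";
        start = System.nanoTime();
    }

    void stop() {
        totalNanos += System.nanoTime() - start;
        start = 0;
        count++;
    }

    long getTotalNanos() { return totalNanos; }
    long getCount() { return count; }
}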

Taking build_leaf_collector as an example: in the non-concurrent search case, the build_leaf_collector timer is called sequentially, with a timer start() and stop() for each leaf before moving on to the next leaf. The actual workflow is shown below:

Non-concurrent search with 3 leaf collectors

=== INITIALIZE timer.start() ===
=== INITIALIZE timer.stop() ===
ContextIndexSearcher search()
=== BUILD_LEAF_COLLECTOR timer.start() ===
=== BUILD_LEAF_COLLECTOR timer.stop() ===
=== COLLECT timer.start() ===
...
=== COLLECT timer.start() ===
...
=== BUILD_LEAF_COLLECTOR timer.start() ===
=== BUILD_LEAF_COLLECTOR timer.stop() ===
=== COLLECT timer.start() ===
...
=== COLLECT timer.start() ===
...
=== COLLECT timer.start() ===
...
=== BUILD_LEAF_COLLECTOR timer.start() ===
=== BUILD_LEAF_COLLECTOR timer.stop() ===
=== COLLECT timer.start() ===
...
=== COLLECT timer.start() ===
...
=== COLLECT timer.start() ===
...
=== POST_COLLECTION timer.start() ===
...
=== BUILD_AGGREGATION timer.start() ===
...

However, in the concurrent search case, we encountered an AssertionError in the profile timer, as shown below.

Caused by: java.lang.AssertionError: #start call misses a matching #stop call
    at org.opensearch.search.profile.Timer.start(Timer.java:63)
    at org.opensearch.search.profile.aggregation.ProfilingLeafBucketCollector.collect(ProfilingLeafBucketCollector.java:58)
    at org.opensearch.search.aggregations.LeafBucketCollector.collect(LeafBucketCollector.java:123)
    at org.apache.lucene.search.FilterLeafCollector.collect(FilterLeafCollector.java:42)
    at org.opensearch.search.profile.query.ProfileCollector$1.collect(ProfileCollector.java:99)
    at org.apache.lucene.search.MultiCollector$MultiLeafCollector.collect(MultiCollector.java:222)
    at org.apache.lucene.search.FilterLeafCollector.collect(FilterLeafCollector.java:42)
    at org.opensearch.search.profile.query.ProfileCollector$1.collect(ProfileCollector.java:99)
    at org.apache.lucene.search.Weight$DefaultBulkScorer.scoreAll(Weight.java:305)
    at org.apache.lucene.search.Weight$DefaultBulkScorer.score(Weight.java:247)
    at org.apache.lucene.search.BulkScorer.score(BulkScorer.java:38)
    at org.opensearch.search.internal.ContextIndexSearcher.searchLeaf(ContextIndexSearcher.java:321)
    at org.opensearch.search.internal.ContextIndexSearcher.search(ContextIndexSearcher.java:286)
    at org.apache.lucene.search.IndexSearcher.lambda$search$1(IndexSearcher.java:715)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    ... 8 more

The issue arises because, in the concurrent search case, we make asynchronous calls to build the leaf collectors across slices while the profile timers are shared, following a workflow similar to the example below (a hypothetical reproduction sketch follows the trace).

Concurrent search with a slice count of 3 and 1 leaf collector per slice

=== segment_slice_1 INITIALIZE timer.start() ===
=== segment_slice_1 INITIALIZE timer.stop() ===
=== segment_slice_2 INITIALIZE timer.start() ===
=== segment_slice_2 INITIALIZE timer.stop() ===
=== segment_slice_3 INITIALIZE timer.start() ===
=== segment_slice_3 INITIALIZE timer.stop() ===
segment_slice_1 ContextIndexSearcher search()
segment_slice_2 ContextIndexSearcher search()
segment_slice_3 ContextIndexSearcher search()
=== segment_slice_1 BUILD_LEAF_COLLECTOR timer.start() ===
=== segment_slice_2 BUILD_LEAF_COLLECTOR timer.start() ===
=== segment_slice_3 BUILD_LEAF_COLLECTOR timer.start() ===
=== segment_slice_2 AssertionError on BUILD_LEAF_COLLECTOR timer start is not 0 ===
=== segment_slice_3 AssertionError on BUILD_LEAF_COLLECTOR timer start is not 0 ===
=== segment_slice_1 BUILD_LEAF_COLLECTOR timer.stop() ===
=== segment_slice_2 BUILD_LEAF_COLLECTOR timer.stop() ===
=== segment_slice_3 BUILD_LEAF_COLLECTOR timer.stop() ===
=== COLLECT timer.start() ===
...
=== COLLECT timer.start() ===
...
=== COLLECT timer.start() ===
...
=== POST_COLLECTION timer.start() ===
=== POST_COLLECTION timer.stop() ===
=== BUILD_AGGREGATION timer.start() ===
=== BUILD_AGGREGATION timer.stop() ===
=== POST_COLLECTION timer.start() ===
=== POST_COLLECTION timer.stop() ===
=== BUILD_AGGREGATION timer.start() ===
=== BUILD_AGGREGATION timer.stop() ===
=== POST_COLLECTION timer.start() ===
=== POST_COLLECTION timer.stop() ===
=== BUILD_AGGREGATION timer.start() ===
=== BUILD_AGGREGATION timer.stop() ===
...
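
The failure mode can be reproduced with the PhaseTimer sketch above: when several slices share a single timer instance, concurrent start() calls can observe a non-zero start value and trip the assertion. This is a hypothetical reproduction under that assumption, not the actual OpenSearch code path; run it with -ea to enable assertions.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical reproduction: three slices share one PhaseTimer, so concurrent
// start() calls may see start != 0 and fail the assertion, matching the
// BUILD_LEAF_COLLECTOR failure shown in the trace above.
public class SharedTimerRepro {
    public static void main(String[] args) throws Exception {
        PhaseTimer sharedBuildLeafCollectorTimer = new PhaseTimer();
        ExecutorService slices = Executors.newFixedThreadPool(3);
        for (int slice = 0; slice < 3; slice++) {
            slices.submit(() -> {
                sharedBuildLeafCollectorTimer.start(); // may assert if another slice has started but not stopped
                // ... build the leaf collector for this slice ...
                sharedBuildLeafCollectorTimer.stop();
            });
        }
        slices.shutdown();
    }
}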

Proposed Solution

In the concurrent search case:

  1. Add the concept of a segment slice collector ID to the profileBreakdownLookup map in AggregationProfiler.java, ensuring that each segment slice collector has its own AggregationProfileBreakdown object instead of sharing a single AggregationProfileBreakdown object across all segment slice collectors.
  2. Introduce profiled time stats (i.e. min/max/avg) for all existing breakdown properties and for the time_in_nanos field.
  3. Introduce profiled count stats for the existing build_leaf_collector_count and collect_count properties.
  4. The existing profiled time properties will reflect the total elapsed time across all slices (i.e. max(slice_end_time) - min(slice_start_time)). A minimal sketch of this bookkeeping follows below.

In the non-concurrent search case, the profile output will remain the same as today.
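
The following is a minimal sketch of the proposed bookkeeping, under assumed, simplified names (SliceBreakdown and ConcurrentAggregationProfile are hypothetical; the actual implementation lives in AggregationProfiler and AggregationProfileBreakdown). It keeps one breakdown per slice collector instance and derives the min/max/avg slice stats plus the overall elapsed time from them.

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// One breakdown object per slice; only the collect phase is shown for brevity.
final class SliceBreakdown {
    long sliceStartNanos; // earliest timer start observed in this slice
    long sliceEndNanos;   // latest timer stop observed in this slice
    long collectNanos;    // time spent in collect for this slice
    long collectCount;    // number of collect calls made by this slice
}

final class ConcurrentAggregationProfile {
    // Keyed by the slice collector instance itself: each slice gets its own
    // collector object, so no breakdown state is shared across slices.
    private final Map<Object, SliceBreakdown> breakdownPerSlice = new ConcurrentHashMap<>();

    SliceBreakdown breakdownFor(Object sliceCollector) {
        return breakdownPerSlice.computeIfAbsent(sliceCollector, c -> new SliceBreakdown());
    }

    Map<String, Long> reduce() {
        Collection<SliceBreakdown> slices = breakdownPerSlice.values();
        long max = slices.stream().mapToLong(s -> s.collectNanos).max().orElse(0L);
        long min = slices.stream().mapToLong(s -> s.collectNanos).min().orElse(0L);
        long avg = slices.isEmpty() ? 0L : slices.stream().mapToLong(s -> s.collectNanos).sum() / slices.size();
        // Total elapsed time spans all slices: max(slice_end_time) - min(slice_start_time).
        long totalElapsed = slices.stream().mapToLong(s -> s.sliceEndNanos).max().orElse(0L)
            - slices.stream().mapToLong(s -> s.sliceStartNanos).min().orElse(0L);
        return Map.of("max_collect", max, "min_collect", min, "avg_collect", avg, "time_in_nanos", totalElapsed);
    }
}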

Sample query response

Non-concurrent search

{
   ...
    "profile":
    {
        "shards":
        [
            {
                "id": "[b7eRKgowR_2F59UyPr0EJw][index][0]",
                "inbound_network_time_in_millis": 0,
                "outbound_network_time_in_millis": 0,
                "searches": [...],
                "aggregations":
                [
                    {
                        "type": "GlobalOrdinalsStringTermsAggregator.LowCardinality",
                        "description": "f",
                        "time_in_nanos": 2072751,
                        "breakdown":
                        {
                            "reduce": 0,
                            "post_collection_count": 1,
                            "build_leaf_collector": 432042,
                            "build_aggregation": 1550125,
                            "build_aggregation_count": 1,
                            "build_leaf_collector_count": 1,
                            "post_collection": 36167,
                            "initialize": 12250,
                            "initialize_count": 1,
                            "reduce_count": 0,
                            "collect": 42167,
                            "collect_count": 20
                        },
                        "debug": {...}
                    }
                ]
            }
        ]
    }
}

Concurrent search with 3 segment slices

{
   ...
    "profile":
    {
        "shards":
        [
            {
                "id": "[Nbk7cN5jRuekkKVtns9SKw][index][0]",
                "inbound_network_time_in_millis": 0,
                "outbound_network_time_in_millis": 0,
                "searches": [...],
                "aggregations":
                [
                    {
                        "type": "GlobalOrdinalsStringTermsAggregator.LowCardinality",
                        "description": "f",
                        "time_in_nanos": 3235999,
                        "max_slice_time_in_nanos": 1702416,
                        "min_slice_time_in_nanos": 663126,
                        "avg_slice_time_in_nanos": 1030583,
                        "breakdown":
                        {
                            "initialize": 29439,
                            "max_initialize": 17667,
                            "min_initialize": 1959,
                            "avg_initialize": 9813,
                            "initialize_count": 3,
                            "build_leaf_collector": 1602612,
                            "max_build_leaf_collector": 632367,
                            "min_build_leaf_collector": 436042,
                            "avg_build_leaf_collector": 534204,
                            "build_leaf_collector_count": 3,
                            "max_build_leaf_collector_count": 1,
                            "min_build_leaf_collector_count": 1,
                            "avg_build_leaf_collector_count": 1,
                            "collect": 103785,
                            "max_collect": 45747,
                            "min_collect": 23443,
                            "avg_collect": 34595,
                            "collect_count": 21,
                            "max_collect_count": 9,
                            "min_collect_count": 5,
                            "avg_collect_count": 7,
                            "post_collection": 67248,
                            "max_post_collection": 22417,
                            "min_post_collection": 22415,
                            "avg_post_collection": 22416,
                            "post_collection_count": 3,
                            "build_aggregation": 1548187,
                            "max_build_aggregation": 1008750,
                            "min_build_aggregation": 23375,
                            "avg_build_aggregation": 516062,
                            "build_aggregation_count": 3,
                            "reduce": 0,
                            "reduce_count": 0
                        },
                        "debug": {...}
                    }
                ]
            }
        ]
    }
}

Alternatives

Expand aggregation stats across all slices with concurrent execution

Instead of exposing min/max/avg stats across the segment slice executions of a collector manager, we list out the response from every collector created by the collector manager.

Sample query response

Concurrent search with 3 segment slices

...
    "aggregations":
    [
        {
            "type": "GlobalOrdinalsStringTermsAggregator.LowCardinality",
            "description": "f",
            "time_in_nanos": 1702416,
            "breakdown":
            {
                "reduce": 0,
                "post_collection_count": 1,
                "build_leaf_collector": 631083,
                "build_aggregation": 1008750,
                "build_aggregation_count": 1,
                "build_leaf_collector_count": 1,
                "post_collection": 26458,
                "initialize": 17667,
                "initialize_count": 1,
                "reduce_count": 0,
                "collect": 18458,
                "collect_count": 4
            },
            "debug": {...}
        },
        {
            "type": "GlobalOrdinalsStringTermsAggregator.LowCardinality",
            "description": "f",
            "time_in_nanos": 663126,
            "breakdown":
            {
                "reduce": 0,
                "post_collection_count": 1,
                "build_leaf_collector": 621375,
                "build_aggregation": 23375,
                "build_aggregation_count": 1,
                "build_leaf_collector_count": 1,
                "post_collection": 8375,
                "initialize": 1959,
                "initialize_count": 1,
                "reduce_count": 0,
                "collect": 8042,
                "collect_count": 2
            },
            "debug": {...}
        },
        {
            "type": "GlobalOrdinalsStringTermsAggregator.LowCardinality",
            "description": "f",
            "time_in_nanos": 726208,
            "breakdown":
            {
                "reduce": 0,
                "post_collection_count": 1,
                "build_leaf_collector": 681291,
                "build_aggregation": 14458,
                "build_aggregation_count": 1,
                "build_leaf_collector_count": 1,
                "post_collection": 13042,
                "initialize": 1292,
                "initialize_count": 1,
                "reduce_count": 0,
                "collect": 16125,
                "collect_count": 1
            },
            "debug": {...}
        }
    ]
...

@ticheng-aws
Contributor Author

Tagging @sohami @yigithub @jed326 @andrross @reta for feedback. Thank you.

@reta
Collaborator

reta commented Jun 30, 2023

@ticheng-aws thank you, makes perfect sense

Add the concept of segment slice collector ID to the profileBreakdownLookup

I definitely see the need to distinguish instances of the same aggregators (since they will have the same path, I assume), but how is this slice collector ID going to be generated? Maybe we could have an aggregator ID, or even not use the path but map by instance?

Expand aggregation stats across all slices with concurrent execution

This is a really difficult call to make; I suspect that for large indices the response could be huge. From a pragmatic perspective, I think min/max/avg could be a good start.

@sohami
Collaborator

sohami commented Jul 7, 2023

@ticheng-aws thank you, makes perfect sense

Add the concept of segment slice collector ID to the profileBreakdownLookup

I definitely see the need to distinguish instances of the same aggregators (since they will have the same path, I assume), but how is this slice collector ID going to be generated? Maybe we could have an aggregator ID, or even not use the path but map by instance?

+1 on using the instance in the map itself. This should be fine since a different instance object will be created for each slice.

Expand aggregation stats across all slices with concurrent execution

This is a really difficult call to make; I suspect that for large indices the response could be huge. From a pragmatic perspective, I think min/max/avg could be a good start.

I also think having a summary with min/max/avg should be a good starting point. Having the profile per slice can be very verbose depending on the slice count.

I would also recommend calling out the count stats for which min/max/avg will be computed vs. the ones for which only the total will be computed. The ones with min/max/avg are those where a per-slice operation is performed and the value can differ between slices. For example, the number of documents collected can differ per slice, so min/max/avg will be added for collect_count, whereas initialize_count will be 1 per slice, so only the total will be computed. This will be useful for updating the documentation later.

@ticheng-aws
Contributor Author

@ticheng-aws thank you, makes perfect sense

Add the concept of segment slice collector ID to the profileBreakdownLookup

I definitely see the need to distinguish instances of the same aggregators (since they will have the same path, I assume), but how is this slice collector ID going to be generated? Maybe we could have an aggregator ID, or even not use the path but map by instance?

+1 on using the instance in the map itself. This should be fine since a different instance object will be created for each slice.

Thank you @reta and @sohami for the review. Utilizing instances in the map itself sounds like a good approach. Please find the implementation in PR #8801.

Expand aggregation stats across all slices with concurrent execution

This is a really difficult call to make; I suspect that for large indices the response could be huge. From a pragmatic perspective, I think min/max/avg could be a good start.

I also think having a summary with min/max/avg should be a good starting point. Having the profile per slice can be very verbose depending on the slice count.

Agreed, we can definitely do that.
