
[Concurrent Segment Search] support forceTermination for terminate_after parameter #8371

Open
jed326 opened this issue Jun 29, 2023 · 34 comments · Fixed by #10200
Labels
backlog distributed framework enhancement Enhancement or improvement to existing feature or request

Comments

@jed326
Collaborator

jed326 commented Jun 29, 2023

As a follow-up to #8306, we need to add support for forceTermination in the concurrent search case. Creating a new issue to track this since we will need to track the number of docs collected across threads and the implementation will require some discussion.

Describe the solution you'd like
forceTermination should be properly supported in concurrent segment search.

@jed326 jed326 added enhancement Enhancement or improvement to existing feature or request untriaged labels Jun 29, 2023
@jed326 jed326 changed the title Concurrent Segment Search should support forceTermination [Concurrent Segment Search] should support terminate_after forceTermination Jul 5, 2023
@jed326
Collaborator Author

jed326 commented Jul 5, 2023

Some more background:

With #8306 we will properly support the timeout and cancel_after_time_interval search parameters. The goal of this issue is to properly support the terminate_after workflow.

In the non-concurrent case today, if the terminate_after parameter is set to a non-default value:

if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
    // add terminate_after before the filter collectors
    // it will only be applied on documents accepted by these filter collectors
    collectors.add(createEarlyTerminationCollectorContext(searchContext.terminateAfter()));
    // this collector can filter documents during the collection
    hasFilterCollector = true;
}

then an EarlyTerminatingCollector is used, which will throw an EarlyTerminationException once the specified number of docs has been collected.

if (numCollected >= maxCountHits) {
    earlyTerminated = true;
    if (forceTermination) {
        throw new EarlyTerminationException("early termination [CountBased]");
    } else {
        throw new CollectionTerminatedException();
    }
}

In the concurrent search case, we do not throw the EarlyTerminationException and instead throw CollectionTerminatedException:

@Override
public EarlyTerminatingCollector newCollector() throws IOException {
    return new EarlyTerminatingCollector(manager.newCollector(), maxCountHits, false /* forced termination is not supported */);
}

We can think of this as a "soft" termination: collect will still be called for all the docs and the search request will not immediately exit due to an EarlyTerminationException. In order to keep this behavior the same between the concurrent and non-concurrent cases, we will need a way to track the number of collected documents across all the threads and subsequently throw the EarlyTerminationException once the sum of all docs collected reaches the specified doc count.

@yigithub yigithub assigned yigithub and jed326 and unassigned yigithub Jul 5, 2023
@jed326
Collaborator Author

jed326 commented Jul 5, 2023

Some potential solutions

1. Synchronize count across threads

We can use an AtomicInteger to synchronize the collected document count across all of the threads. However, numCollected is updated on each LeafCollector::collect call, so every single concurrent segment search thread will need to both read and update the document count at the same time.

@Override
public void collect(int doc) throws IOException {
    if (++numCollected > maxCountHits) {
        earlyTerminated = true;
        if (forceTermination) {
            throw new EarlyTerminationException("early termination [CountBased]");
        } else {
            throw new CollectionTerminatedException();
        }
    }
    super.collect(doc);
}

We would need to do some benchmarking to see how this impacts performance, but intuitively this seems like non-trivial overhead to introduce.

2. Keep "soft" termination behavior that exists today

Continue to throw CollectionTerminatedException, which will stop collecting docs after maxCountHits is reached for a given collector. All collectors will collect docs until maxCountHits in this scenario. This solution is what would happen if we do not make any changes.

3. Change (2) to do a "hard" termination.

Once one collector hits maxCountHits, stop all other collectors even if they have not yet hit maxCountHits. This solution gets us some force termination behavior, however I don't think the behavior is very intuitive for the user.

4. Disallow terminate_after option when concurrent segment search is being used.

We could make concurrent segment search concretely not support forceTermination, at least for now, and revisit this at a later point as needed.


In terms of functionality I think solution 1 is best as it keeps the behavior the same between the concurrent and non-concurrent search use cases. However, I am concerned about the potential performance hit from such synchronization.
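
To make solution 1 concrete, here is a rough sketch of what the shared counter could look like (illustrative only; the globalNumCollected field is hypothetical and not the actual OpenSearch implementation). It simply replaces the per-collector numCollected with an AtomicInteger shared by all slice collectors:

// Illustrative sketch only: a shared counter replacing the per-collector numCollected,
// so that terminate_after is enforced across all concurrent slices.
private final AtomicInteger globalNumCollected; // one instance shared by all slice collectors

@Override
public void collect(int doc) throws IOException {
    if (globalNumCollected.incrementAndGet() > maxCountHits) {
        earlyTerminated = true;
        if (forceTermination) {
            throw new EarlyTerminationException("early termination [CountBased]");
        } else {
            throw new CollectionTerminatedException();
        }
    }
    super.collect(doc);
}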

@jed326
Collaborator Author

jed326 commented Jul 5, 2023

Tagging @reta @sohami @andrross for some feedback/thoughts. Thanks!

@andrross
Member

andrross commented Jul 7, 2023

In terms of functionality I think solution 1 is best as it keeps the behavior the same between concurrent and non-concurrent search use cases. However, I am concerned about the potential performance hit doing such synchronization.

I agree with your analysis here. Any idea what the synchronization overhead might be?

I think option 4 is okay because it can be temporary and terminate_after can be added later if users need it. I'm also not totally opposed to option 2 where we essentially add the caveat that in the concurrent case terminate_after is per concurrent slice, not total count (I'm less sure this is a good idea). I think option 3 is my least preferred for the reason you cited.

There is potentially an option 1a where we don't keep an exact count (perhaps by updating the synchronized count every 10 or 100 or 1000 documents or whatever) in order to reduce the synchronization burden.
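
For illustration, a minimal sketch of what option 1a could look like, assuming a hypothetical per-slice local counter that is only flushed to the shared counter every STRIDE documents (the names and the stride value are made up):

// Illustrative sketch of option 1a (hypothetical names): count locally per slice and
// only publish to the shared counter every STRIDE docs, trading some over-collection
// (up to STRIDE extra docs per slice) for less contention on the atomic.
private static final int STRIDE = 1000;
private final AtomicInteger globalNumCollected; // shared across slices
private int localNumCollected;                  // per-slice, no synchronization

@Override
public void collect(int doc) throws IOException {
    if (++localNumCollected % STRIDE == 0
        && globalNumCollected.addAndGet(STRIDE) >= maxCountHits) {
        throw new EarlyTerminationException("early termination [CountBased]");
    }
    super.collect(doc);
}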

@reta
Collaborator

reta commented Jul 7, 2023

The additional synchronization has to be avoided at all costs - it kills the whole concurrent part of the search. However, Apache Lucene does something similar using e.g. TopScoreDocCollector::createSharedManager / hitsThresholdChecker; maybe we could leverage that.

@jed326
Collaborator Author

jed326 commented Jul 7, 2023

Apache Lucene does something similar using e.g. TopScoreDocCollector::createSharedManager / hitsThresholdChecker; maybe we could leverage that.

It looks like hitsThresholdChecker is implemented with an AtomicLong, which is basically the same as what I'm describing in solution 1: https://github.com/apache/lucene/blob/main/lucene/core/src/java/org/apache/lucene/search/HitsThresholdChecker.java#L26

I agree that synchronizing the doc count for each doc does kind of negate the concurrency part, but that would only be for the case where the terminate_after parameter is used. If terminate_after is not included in the request then we can skip the synchronization entirely. I suppose I could do some benchmarking to get a sense of the overhead, but intuitively incrementing a global doc counter on each doc collected means we aren't collecting docs concurrently anymore, so I don't think benchmarking results are really valuable at the moment.

That being said, I'm definitely inclined towards solution 2 right now and we can pick up this issue at a later date if necessary.

@reta
Collaborator

reta commented Jul 7, 2023

That being said, I'm definitely inclined towards solution 2 right now and we can pick up this issue at a later date if necessary.

I think it makes sense; once we see concurrent search being used widely (hopefully), we can get back to this issue.

@andrross
Member

andrross commented Jul 7, 2023

That being said, I'm definitely inclined towards solution 2 right now and we can pick up this issue at a later date if necessary.

I think it makes sense; once we see concurrent search being used widely (hopefully), we can get back to this issue.

The potential problem with solution 2 is that we would be codifying a behavior for terminate_after and changing that behavior to solution 1 at a later point would potentially be a breaking change. Solution 4 (fail with an "unsupported" error for terminate_after) is probably the better way to defer this work.

@reta
Collaborator

reta commented Jul 7, 2023

The potential problem with solution 2 is that we would be codifying a behavior for terminate_after and changing that behavior to solution 1 at a later point would potentially be a breaking change.

To be fair, I don't think there are any risks here; right now we just do more work BUT we soft-limit the results. The outcome of the "hard" (to be implemented in the future) and "soft" flows should be exactly the same from the user-facing perspective.

@sohami
Collaborator

sohami commented Jul 7, 2023

@reta @jed326 There may be some change from the user perspective. For example, if terminate_after is set to 100, each slice may collect < 100 docs while the sum of documents collected across slices is > 100. In that case we will still say that the request was not early terminated (as the early-termination flag on the query result will not be set), whereas with the sequential flow the flag would be set in the response. I guess @andrross is also pointing to the same behavior as a breaking change? I think we will need to handle this in the concurrent model, probably in the reduce phase?

The other case, where a single collector hits the threshold, is probably where the user-facing behavior will not change, but in the backend more work will be done and the query may take longer to complete compared to the sequential path. We can improve that later using one of the options above, but we will probably need to at least document that with the concurrent path terminate_after is applied per slice instead of at the shard level.
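
For illustration, a rough sketch of the kind of reduce-phase check this would need (SliceResult and the accessor names are hypothetical, not actual OpenSearch code):

// Hypothetical reduce-phase check, for illustration only: sum the per-slice collected
// counts and flag the shard result if the shard as a whole crossed terminate_after.
int totalCollected = 0;
boolean anySliceTerminated = false;
for (SliceResult slice : sliceResults) {          // SliceResult is a made-up type
    totalCollected += slice.collectedDocCount();
    anySliceTerminated |= slice.terminatedEarly();
}
queryResult.terminatedEarly(anySliceTerminated || totalCollected >= terminateAfter);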

@jed326
Collaborator Author

jed326 commented Jul 7, 2023

@sohami Good callout! I do think that if we are documenting that terminate_after is going to be for each slice instead of each shard in the concurrent case, then it's more logically consistent for the first scenario you described to not return as early termination.

In my opinion, if we say that terminate_after is at the slice level, then the expectation is set that it doesn't work the same as in the non-concurrent case, and it's actually more confusing for the user, or at least a new user (as opposed to one replacing a non-concurrent search workflow with concurrent search), if the first scenario you described returns as terminated early.

On the other hand, just looking at the code alone, the collectors aren't terminating early in that case, so it's also confusing from the developer perspective to add a check in the reduce phase and set the terminatedEarly flag to true there.

In short, I think we would be adding some unintuitive and confusing behaviors here in order to make it seem like the same query is terminating early in the concurrent vs non-concurrent search cases, and in that case I would prefer to go with solution 4 and return some sort of validation error if terminate_after is used with concurrent segment search enabled.

@sohami
Collaborator

sohami commented Jul 8, 2023

I do think that if we are documenting that terminate_after is going to be for each slice instead of each shard in the concurrent case, then it's more logically consistent for the first scenario you described to not return as early termination.

I realized the same thing just after commenting

Taking a step back, I think terminate_after is an intent from the user to return results fast for big indices where there may be a lot of matches and it takes time to find them all because of the sequential nature of the search. With concurrent search, given that the work is happening in parallel, treating it per slice and terminating the individual slices when the limit is hit still preserves that user intent. If all the slices complete executing before hitting the limit, then presumably the search completed faster even though the total documents collected across slices (not returned in the response) are still more than the limit. Whereas if the index is big enough that each slice has to do enough work to potentially go beyond the limit, then putting this limit at the slice level will help return the results faster.
One can view the current non-concurrent model as a special case of slice = 1, and treating this parameter at the slice level gives the same behavior in the concurrent and current non-concurrent models for that case. I don't think we need to preserve this exact behavior for the slice > 1 case as well (as that is new behavior). So I am leaning back to Option 2 here, with a documentation change noting that the parameter applies at the slice level and that termination is a soft limit.

@reta
Collaborator

reta commented Jul 9, 2023

Thanks @sohami and @jed326, I agree with the conclusion. Also, we should probably keep in mind that concurrent search would never produce the same end result sets (in general, though they could look consistent sometimes) even for the same query when terminate_after is set: the concurrent part is obviously one reason, but the shared searcher pool is another, since each query competes for execution (even sequential search has some flaws [1]).

Taking a step back, I think terminate_after is an intent from the user to return results fast for big indices where there may be a lot of matches and it takes time to find them all because of the sequential nature of the search. With concurrent search, given that the work is happening in parallel, treating it per slice and terminating the individual slices when the limit is hit still preserves that user intent.

Exactly, this is why I see no risks here (if implementation changes in future) - we will preserve the intent no matter what.

[1] elastic/elasticsearch#82995

@jed326
Collaborator Author

jed326 commented Jul 10, 2023

@reta @sohami @andrross thanks for the discussion on this! It looks like our consensus has landed on solution 2, which won't require any changes right now. To recap solution 2 -- in each leaf slice, once maxCountHits documents have been collected we short-circuit collection for that slice by throwing CollectionTerminatedException (so, for example, with terminate_after=100 and 4 slices, up to roughly 400 documents could still be collected on the shard).

@Override
public void collect(int doc) throws IOException {
    if (++numCollected > maxCountHits) {
        earlyTerminated = true;
        if (forceTermination) {
            throw new EarlyTerminationException("early termination [CountBased]");
        } else {
            throw new CollectionTerminatedException();
        }
    }
    super.collect(doc);
}

This exception is swallowed in ContextIndexSearcher causing us to continue to the next leaf.

if (bulkScorer != null) {
    try {
        bulkScorer.score(leafCollector, liveDocs);
    } catch (CollectionTerminatedException e) {
        // collection was terminated prematurely
        // continue with the following leaf
    } catch (QueryPhase.TimeExceededException e) {
        searchContext.setSearchTimedOut(true);
        return;
    }
}

I will take a note on opensearch-project/documentation-website#2662 for the behavior update to treat terminate_after as a per-leaf-slice count. As for this issue, I think we can leave it open as a backlog issue to be picked up at a later date.

@andrross
Member

Exactly, this is why I see no risks here (if implementation changes in future) - we will preserve the intent no matter what.

@reta Thanks, I missed the nuance that the results will be more or less the same, just that the system may do more work than is optimal. I agree that this can change in the future to be more efficient and is a backward compatible change as long as we preserve the intent. I'm on board with solution 2 as @jed326 has described.

@jed326 jed326 changed the title [Concurrent Segment Search] should support terminate_after forceTermination [Concurrent Segment Search] support forceTermination for terminate_after parameter Jul 14, 2023
@sohami
Collaborator

sohami commented Sep 18, 2023

Do you think the different hits count meaningfully changes the user experience here if the user intent of terminate_after is to return a subset of results quickly and if the precise hits count can still be limited by the size parameter?

I think we could document that as a limitation that when using concurrent search, the over-fetch could happen and the total may not be accurate.

@reta it is not only about over-fetching and doing more work in the backend but also about returning more hits to the user. In cases like aggs the collected count will be different. Given that the results will vary between the concurrent and non-concurrent paths and terminate_after is a hint not to include docs after a particular hit count, I think it will be confusing to the user. So if, by default, we keep the user experience the same by enforcing force termination and explicitly let the user choose the soft termination behavior (as needed, with the behavior change documented), that will be a much better experience and create less confusion.

We will also get the results from the benchmark that @jed326 is planning to run to see how much overhead there is. If the performance is too poor compared to the non-concurrent flow, then we can decide to disable the concurrent flow for terminate_after cases.

@jed326
Collaborator Author

jed326 commented Sep 19, 2023

@reta @sohami thanks for the discussion! Going to delay the benchmarking a little bit while I look into the issues described in #10054 to make sure the benchmarking results I come back with are accurate.

@jed326
Collaborator Author

jed326 commented Sep 20, 2023

Hey @reta @sohami wanted to give an update on my investigation into terminate_after behavior. Basically I found 4 problems that need to be addressed:

1. NPE when size=0, track_total_hits=true, and terminate_after are used together. Details

2. track_total_hits=true does not correctly track the total hits if terminate_after is used. Details

curl -X GET "---/_search?size=0&track_total_hits=true&pretty"  
{  
  "took" : 1,  
  "timed_out" : false,  
  "terminated_early" : true,  
  "_shards" : {  
    "total" : 1,  
    "successful" : 1,  
    "skipped" : 0,  
    "failed" : 0  
  },  
  "hits" : {  
    "total" : {  
      "value" : 165346692,  
      "relation" : "eq"  
    },  
    "max_score" : null,  
    "hits" : [ ]  
  }  
}  
curl -X GET "---/_search?size=1&track_total_hits=true&terminate_after=1&pretty"  
{  
  "took" : 1,  
  "timed_out" : false,  
  "terminated_early" : true,  
  "_shards" : {  
    "total" : 1,  
    "successful" : 1,  
    "skipped" : 0,  
    "failed" : 0  
  },  
  "hits" : {  
    "total" : {  
      "value" : 1,  
      "relation" : "gte"  
    },  
    "max_score" : 1.0,  
    "hits" : [  
      {  
        ...  
      }  
    ]  
  }  
}

This is because of the change here to simulate soft termination in the results: 

// Since we cannot support early forced termination, we have to simulate it by
// artificially reducing the number of total hits and doc scores.
ScoreDoc[] scoreDocs = topDocs.scoreDocs;
if (terminatedAfter != null) {
    if (totalHits.value > terminatedAfter) {
        totalHits = new TotalHits(terminatedAfter, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
    }
    if (scoreDocs != null && scoreDocs.length > terminatedAfter) {
        scoreDocs = Arrays.copyOf(scoreDocs, terminatedAfter);
    }
}

There is an edge case this doesn’t cover though — if each slice has < terminate_after documents then we won’t go into this code block during reduce. See example below:

curl -X GET "localhost:9200/my-index-000001/_search?pretty&terminate_after=1"                                                                         
{  
  "took" : 3,  
  "timed_out" : false,  
  "terminated_early" : false,  
  "_shards" : {  
    "total" : 1,  
    "successful" : 1,  
    "skipped" : 0,  
    "failed" : 0  
  },  
  "hits" : {  
    "total" : {  
      "value" : 2,  
      "relation" : "eq"  
    },  
    "max_score" : 1.0,  
    "hits" : [  
      {  
        "_index" : "my-index-000001",  
        "_id" : "4yhLr4oBXBfDW6cmIi_l",  
        "_score" : 1.0,  
        "_source" : {  
          "@timestamp" : "2099-11-15T13:12:00",  
          "message" : "GET /search HTTP/1.1 200 1070000",  
          "user" : {  
            "id" : "kimchy"  
          }  
        }  
      },  
      {  
        "_index" : "my-index-000001",  
        "_id" : "5ChLr4oBXBfDW6cmSy8F",  
        "_score" : 1.0,  
        "_source" : {  
          "@timestamp" : "2099-11-15T13:12:00",  
          "message" : "GET /search HTTP/1.1 200 1070000",  
          "user" : {  
            "id" : "dengjay"  
          }  
        }  
      }  
    ]  
  }  
}

There is actually an assertion in SearchPhaseController::getTotalHits that is supposed to cover this scenario but given that we haven't seen this pop up with concurrent search test parameterization it means we're most likely missing coverage here.

} else if (trackTotalHitsUpTo == SearchContext.TRACK_TOTAL_HITS_ACCURATE) {
    assert totalHitsRelation == Relation.EQUAL_TO;
    return new TotalHits(totalHits, totalHitsRelation);
} else {

3. illegal_argument_exception when size=0 and track_total_hits=true are used for concurrent aggs. Details

curl -X GET "opens-clust-1gj8zaf4fng1b-d6fa7bd00441ed0d.elb.us-east-1.amazonaws.com/_search?track_total_hits=true&pretty" -H 'Content-Type: application/json' -d'  
{  
            "size": 0,  
        "query": {  
          "bool": {  
            "filter": {  
              "range": {  
                "trip_distance": {  
                  "lt": 50,  
                  "gte": 0  
                }  
              }  
            }  
          }  
        },  
        "aggs": {  
          "distance_histo": {  
            "histogram": {  
              "field": "trip_distance",  
              "interval": 1  
            },  
            "aggs": {  
              "total_amount_stats": {  
                "stats": {  
                  "field": "total_amount"  
                }  
              }  
            }  
          }  
        }  
}'  
{  
  "error" : {  
    "root_cause" : [  
      {  
        "type" : "illegal_argument_exception",  
        "reason" : "Collector managers should all be non-null"  
      }  
    ],  
    "type" : "search_phase_execution_exception",  
    "reason" : "all shards failed",  
    "phase" : "query",  
    "grouped" : true,  
    "failed_shards" : [  
      {  
        "shard" : 0,  
        "index" : "nyc_taxis",  
        "node" : "K7DzKxU4Tyin-01SQBj-7A",  
        "reason" : {  
          "type" : "illegal_argument_exception",  
          "reason" : "Collector managers should all be non-null"  
        }  
      }  
    ],  
    "caused_by" : {  
      "type" : "illegal_argument_exception",  
      "reason" : "Collector managers should all be non-null",  
      "caused_by" : {  
        "type" : "illegal_argument_exception",  
        "reason" : "Collector managers should all be non-null"  
      }  
    }  
  },  
  "status" : 400  
}

This is the same issue as [1] above; the exception is being handled by MultiCollectorManager in Lucene here instead of failing as an NPE.

4. terminate_after has no effect for concurrent aggs

Concurrent Search Enabled:

curl -X GET "---/_search?track_total_hits=true&terminate_after=1&pretty" -H 'Content-Type: application/json' -d'  
{  
            "size": 1,  
        "query": {  
          "bool": {  
            "filter": {  
              "range": {  
                "trip_distance": {  
                  "lt": 50,  
                  "gte": 0  
                }  
              }  
            }  
          }  
        },  
        "aggs": {  
          "distance_histo": {  
            "histogram": {  
              "field": "trip_distance",  
              "interval": 1  
            },  
            "aggs": {  
              "total_amount_stats": {  
                "stats": {  
                  "field": "total_amount"  
                }  
              }  
            }  
          }  
        }  
}'  
{  
  "took" : 7345,  
  "timed_out" : false,  
  "terminated_early" : true,  
  "_shards" : {  
    "total" : 1,  
    "successful" : 1,  
    "skipped" : 0,  
    "failed" : 0  
  },  
  "hits" : {  
    "total" : {  
      "value" : 1,  
      "relation" : "gte"  
    },  
    "max_score" : 0.0,  
    "hits" : [  
      {  
        "_index" : "nyc_taxis",  
        "_id" : "47sxkYoBYW-wfCAOkuIQ",  
        "_score" : 0.0,  
        "_source" : {  
          "payment_type" : "1",  
          "rate_code_id" : "1",  
          "tip_amount" : 1.5,  
          "tolls_amount" : 0.0,  
          "extra" : 0.5,  
          "passenger_count" : 1,  
          "pickup_location" : [  
            -74.00631713867188,  
            40.733638763427734  
          ],  
          "dropoff_datetime" : "2015-01-07 21:15:13",  
          "trip_distance" : 0.43,  
          "store_and_fwd_flag" : "N",  
          "total_amount" : 8.3,  
          "fare_amount" : 5.5,  
          "pickup_datetime" : "2015-01-07 21:08:53",  
          "dropoff_location" : [  
            -74.00151062011719,  
            40.73076248168945  
          ],  
          "mta_tax" : 0.5,  
          "vendor_id" : "2",  
          "improvement_surcharge" : 0.3  
        }  
      }  
    ]  
  },  
  "aggregations" : {  
    "distance_histo" : {  
      "buckets" : [  
        {  
          "key" : 0.0,  
          "doc_count" : 37826898,  
          "total_amount_stats" : {  
            "count" : 37826898,  
            "min" : -499.0,  
            "max" : 989970.39,  
            "avg" : 7.954326743102223,  
            "sum" : 3.0088750637E8  
          }  
        },  
        {  
          "key" : 1.0,  
          "doc_count" : 54261042,  
          "total_amount_stats" : {  
            "count" : 54261042,  
            "min" : -69.7,  
            "max" : 650262.85,  
            "avg" : 10.610401890365468,  
            "sum" : 5.7573146261E8  
          }  
        },

Concurrent Search Disabled

curl -X GET "---/_search?track_total_hits=true&terminate_after=1&pretty" -H 'Content-Type: application/json' -d'  
{  
            "size": 1,  
        "query": {  
          "bool": {  
            "filter": {  
              "range": {  
                "trip_distance": {  
                  "lt": 50,  
                  "gte": 0  
                }  
              }  
            }  
          }  
        },  
        "aggs": {  
          "distance_histo": {  
            "histogram": {  
              "field": "trip_distance",  
              "interval": 1  
            },  
            "aggs": {  
              "total_amount_stats": {  
                "stats": {  
                  "field": "total_amount"  
                }  
              }  
            }  
          }  
        }  
}'  
{  
  "took" : 2,  
  "timed_out" : false,  
  "terminated_early" : true,  
  "_shards" : {  
    "total" : 1,  
    "successful" : 1,  
    "skipped" : 0,  
    "failed" : 0  
  },  
  "hits" : {  
    "total" : {  
      "value" : 1,  
      "relation" : "eq"  
    },  
    "max_score" : 0.0,  
    "hits" : [  
      {  
        "_index" : "nyc_taxis",  
        "_id" : "47sxkYoBYW-wfCAOkuIQ",  
        "_score" : 0.0,  
        "_source" : {  
          "payment_type" : "1",  
          "rate_code_id" : "1",  
          "tip_amount" : 1.5,  
          "tolls_amount" : 0.0,  
          "extra" : 0.5,  
          "passenger_count" : 1,  
          "pickup_location" : [  
            -74.00631713867188,  
            40.733638763427734  
          ],  
          "dropoff_datetime" : "2015-01-07 21:15:13",  
          "trip_distance" : 0.43,  
          "store_and_fwd_flag" : "N",  
          "total_amount" : 8.3,  
          "fare_amount" : 5.5,  
          "pickup_datetime" : "2015-01-07 21:08:53",  
          "dropoff_location" : [  
            -74.00151062011719,  
            40.73076248168945  
          ],  
          "mta_tax" : 0.5,  
          "vendor_id" : "2",  
          "improvement_surcharge" : 0.3  
        }  
      }  
    ]  
  },  
  "aggregations" : {  
    "distance_histo" : {  
      "buckets" : [  
        {  
          "key" : 0.0,  
          "doc_count" : 1,  
          "total_amount_stats" : {  
            "count" : 1,  
            "min" : 8.3,  
            "max" : 8.3,  
            "avg" : 8.3,  
            "sum" : 8.3  
          }  
        }  
      ]  
    }  
  }  
}

terminate_after depends on forceTermination being true to terminate Aggregators. This is because the MultiCollector swallows the soft-termination CollectionTerminatedException and only rethrows it once all collectors have terminated, so during soft termination, once the EarlyTerminatingCollector terminates, the Aggregator will keep on collecting. See https://github.com/apache/lucene/blob/main/lucene/core/src/java/org/apache/lucene/search/MultiCollector.java#L214-L233.
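
For reference, a simplified paraphrase of the linked MultiCollector behavior (not the actual Lucene source): an exception from one sub-collector only disables that collector, and the leaf stops collecting only once every sub-collector has terminated.

// Simplified paraphrase of MultiCollector's leaf collection, for illustration only.
public void collect(int doc) throws IOException {
    int alive = 0;
    for (int i = 0; i < leafCollectors.length; i++) {
        if (leafCollectors[i] == null) {
            continue; // this sub-collector already terminated
        }
        try {
            leafCollectors[i].collect(doc);
            alive++;
        } catch (CollectionTerminatedException e) {
            leafCollectors[i] = null; // drop only the terminated sub-collector
        }
    }
    if (alive == 0) {
        // only when every sub-collector has terminated does the whole leaf stop
        throw new CollectionTerminatedException();
    }
}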


Solutions:

[1],[3] are tracked in #10054
[2],[4] are both consequences of how terminate_after's soft termination is implemented today and would be fixed by switching over to force termination.

So for the following reasons I think the best solution is to force terminate concurrent search by default and, if necessary, provide an option for the user to soft terminate:

  1. The behavior from problem [2] as-is means track_total_hits=true does not always give an eq relation, which is a behavior change. We certainly could keep the total hits accurate in the soft termination case, but in general I don't like the idea of fudging the hit count to make it consistent. Moreover, this fix only applies to the TopDocs collector, and the doc counts for all other collectors (i.e. Aggregators) would still be wrong.
  2. As seen in problem [4], the way soft termination exists today does not support early terminating aggs (regardless of whether concurrent search is used or not).
  3. The GlobalHitsThresholdChecker used by TopScoreDocCollector already synchronizes the doc count across threads. Moreover, any contention will only happen when matching docs are found at the same time, and even then the only part that is synchronized is the doc count check; the rest of the doc collection process can still be parallelized.
  4. It keeps our goal of maintaining the exact same search results for both the concurrent and non-concurrent search cases.

That being said, I will still follow up with the benchmarking results since I don't think any of the issues identified here will have an impact on result accuracy. From those results we can determine if we should disable concurrent search for terminate_after workflow and/or if a soft termination option is necessary.

@jed326
Collaborator Author

jed326 commented Sep 26, 2023

I have some benchmark numbers to discuss now, but before jumping into that I want to clarify some terminology we're using so that we're all on the same page. There are 2 things we need to look at here, which I will call force termination and soft/hard termination.

  • force termination refers to the exception that is thrown by EarlyTerminatingCollector and how that exception is subsequently handled:

    if (forceTermination) {
        throw new EarlyTerminationException("early termination [CountBased]");
    } else {
        throw new CollectionTerminatedException();
    }

  • soft/hard termination refers to whether or not we synchronize the doc count across threads: hard termination means terminate_after collects the exact count of documents, while soft termination allows the over-fetching that can occur today when we do not synchronize the doc count.

That being said, we can call the current behavior of terminate_after that exists on main today "No Force Soft Termination". As explained in problem 4 in #8371 (comment) above, the soft termination part of this behavior is incorrect and will not early terminate any of the other operators in the collector tree, including (but not limited to) any aggregators. Therefore I want to drive this discussion towards whether we should adopt Force Hard Termination or Force Soft Termination.


All benchmarking was performed on the nyc_taxis data set using OSB:

health status index     uuid                   pri rep docs.count
green  open   nyc_taxis Q93xA6nqSXCBcQmRtA5biA   1   0  165346692

Force Hard Termination changes can be found here: https://github.com/jed326/OpenSearch/tree/force-terminate_after
Force Soft Termination changes can be found here: https://github.com/jed326/OpenSearch/tree/force-terminate-unsync

First, let's look at a common use case where there is filtering in our query and we use terminate_after with an agg:

With size=1

50th Percentile Service Times in ms

| | date_histogram_agg-10000 | date_histogram_agg-100000 | date_histogram_agg-1000000 |
| --- | --- | --- | --- |
| Concurrent Search Disabled | 16.3908 | 24.3959 | 106.87 |
| Force Hard Termination / 0 slice | 15.1163 | 17.1754 | 59.0757 |
| Force Hard Termination / 4 slice | 18.3088 | 20.3513 | 57.8736 |
| Force Soft Termination / 0 slice | 19.5888 | 32.0781 | 127.135 |
| Force Soft Termination / 4 slice | 20.5357 | 20.5357 | 112.918 |

In this case our index has around 160m docs and the range query filters it down to around 10m, which we then perform the aggregation on.

With size=0

50th Percentile Service Times in ms

| | date_histogram_agg-10000 | date_histogram_agg-100000 | date_histogram_agg-1000000 |
| --- | --- | --- | --- |
| Concurrent Search Disabled | 4.69771 | 12.6467 | 95.0182 |
| Force Hard Termination / 0 slice | 13.1265 | 12.5323 | 45.153 |
| Force Hard Termination / 4 slice | 15.86 | 17.2851 | 47.4627 |
| Force Soft Termination / 0 slice | 20.7045 | 30.8375 | 152.661 |
| Force Soft Termination / 4 slice | 21.359 | 27.841 | 143.685 |

On the other hand, we can also look at the worst case scenario - a query / agg that matches all the docs in the index

50th Percentile Service Times in ms:

| | default-terminate-after-10000000 | default-terminate-after-100000000 | default-terminate-after-1000000000 | distance_amount_agg-terminate-after-100000000 | distance_amount_agg-terminate-after-1000000000 |
| --- | --- | --- | --- | --- | --- |
| Concurrent Search Disabled | 302.061 | 2931.64 | 4832.69 | 14085.7 | 23283.7 |
| No Force Soft Termination / 0 slice | 351.496 | 966.018 | 971.802 | 9043.77 | 9049.07 |
| Force Hard Termination / 0 slice | 201.223 | 2077.96 | 3255.5 | 5248.2 | 8634.14 |
| Force Hard Termination / 4 slice | 204.913 | 2108.71 | 3192.53 | 5284.95 | 8985.04 |
| Force Soft Termination / 0 slice | 99.7383 | 662.852 | 596.704 | 6578.87 | 6580.42 |

From this we can see that as terminate_after approaches larger values we begin to see a bigger performance hit. Since the index has ~160m docs, the 1B terminate_after case is the absolute worst-case scenario, where a synchronized doc count is maintained across the entire index.

Full results:
workloads.txt
terminate_after_benchmark.txt


Main takeaways:

  • For very small terminate_after values non-concurrent search may outperform concurrent search, however this is already known for small data sets due to the added overhead for concurrent search.
  • Above a certain threshold, Force Hard Termination becomes more performant and users will begin to realize the benefits of using concurrent search with terminate_after.
  • In this mid-range Force Hard Termination outperforms Force Soft Termination due to the latter overfetching documents.
  • As terminate_after gets even larger, eventually Force Soft Termination becomes more performant due to synchronization costs.

With the benchmarking data in mind, I see a few possible solutions:

  1. Default to use Force Hard Termination behavior and provide the user a mechanism to use Force Soft Termination
  2. Default to non-concurrent search if terminate_after is used. I think this is a reasonable solution by itself since we typically expect smaller numbers to be used in terminate_after cases.
  3. Default to non-concurrent search if terminate_after is used and provide a configurable threshold above which Force Hard Termination behavior is used.
  4. Default to Force Hard Termination and provide a request-level mechanism for the user to determine whether or not to use concurrent search.
  5. A combination of 1 & 3 - default to Force Hard Termination and as a follow-up provide a mechanism for both Force Soft Termination and terminate_after threshold.

I do not think we should default to Force Soft Termination because the behavior could then be dramatically different between the concurrent and non-concurrent cases. For example, we would collect up to slice_count * terminate_after docs, and many of the cases that are terminated_early: true would become false when moving from non-concurrent search to concurrent search. Moreover, there's also problem 2 in the previous comment about how track_total_hits=true expects an eq relation every time.

My preference is solution 5 -- most immediately I think we can support Force Hard Termination as the default, and as follow-ups we can introduce a terminate_after threshold below which we revert to non-concurrent search and also provide a Force Soft Termination mechanism for users who are looking to further speed up large terminate_after queries.
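
For illustration, a minimal sketch of what such a threshold setting could look like (the setting key, default, and bounds are placeholders, not a committed API):

// Hypothetical setting sketch, for illustration only. Requests whose terminate_after is
// above this threshold would take the concurrent path; everything below it would stay
// on the non-concurrent path. A very high default keeps the current behavior.
public static final Setting<Long> CONCURRENT_SEGMENT_SEARCH_TERMINATE_AFTER_THRESHOLD = Setting.longSetting(
    "search.concurrent_segment_search.terminate_after_threshold", // placeholder key
    Long.MAX_VALUE, // default: effectively always non-concurrent for terminate_after
    1,              // minimum value
    Setting.Property.Dynamic,
    Setting.Property.NodeScope
);

Being dynamic, a setting like this could be tuned through the cluster settings API without a restart.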

Please let me know what you think @sohami @reta!

@sohami
Collaborator

sohami commented Sep 26, 2023

@jed326 Thanks for the detailed writeup and sharing the perf run numbers.

For very small terminate_after values non-concurrent search may outperform concurrent search, however this is already known for small data sets due to the added overhead for concurrent search.

terminate_after essentially limits the matching docs that are processed as part of the request. I would expect the value of this parameter to be small in most cases (probably < 100k, based on my intuition), since it may be used to sample the document corpus. Also, the size=0 use case is common for aggs. Considering that, I think in most common cases we will see concurrent search with force termination perform relatively worse than non-concurrent search. For use cases where the terminate_after value is very large, I would assume those are as good as running the query without the terminate_after clause, and concurrent search can be used in those cases to get the latency benefit.

I do not think we should default to Force Soft Termination because the behavior could then be dramatically different between the concurrent and non-concurrent cases. For example, we would collect up to slice_count * terminate_after docs, and many of the cases that are terminated_early: true would become false when moving from non-concurrent search to concurrent search. Moreover, there's also problem 2 in the previous comment about how track_total_hits=true expects an eq relation every time.

I agree Force Soft Termination seems to cause a lot more issues, as the behavior with track_total_hits=true and the results for other operations in the collector tree (like aggs), or even the hits count, vary w.r.t. non-concurrent search. The complexity of understanding these changes may not be worth the latency improvement here, and it can be taken as a follow-up based on feedback on usage patterns from different users. Also, given that in some cases Force Hard Termination is better than the non-concurrent case, we may not need this improvement.

My preference is solution 5 -- most immediately I think we can support Force Hard Termination as the default, and as follow-ups we can introduce a terminate_after threshold below which we revert to non-concurrent search and also provide a Force Soft Termination mechanism for users who are looking to further speed up large terminate_after queries.

I agree with Option 5, with the exception that the default should be non-concurrent search. Since we don't have any clear winner for terminate_after, defaulting to the non-concurrent path at least doesn't introduce any regression. I was thinking we could provide a cluster-setting-based threshold mechanism (with the default value being very high, to force the non-concurrent path); if it is set to a value less than the terminate_after value in the search request, the request can follow the concurrent path. Ideally, the best would be to have some smartness on the server side to decide when to take the concurrent vs non-concurrent path based on heuristics like the terminate_after value vs the total doc count, but that will require us to look into different types of workloads to come up with a good default. The setting mechanism provides flexibility in the absence of such a default, which can vary for different workloads. I am fine with having this as a follow-up since it is a perf improvement over the current behavior and doesn't create any regression for now. But given you already have an implementation and it's just about introducing a new setting, we could provide this option now. Would like to hear thoughts from @reta as well on this.

@reta
Collaborator

reta commented Sep 26, 2023

Thanks @jed326 and @sohami, it is hard to disagree with the conclusions (based on the results). +1 to @sohami that the default path should be non-concurrent search (for the terminate_after use case); that would minimize the risk of possible regressions (we have done a pretty limited set of benchmarks).

But given you already have an implementation and it's just about introducing a new setting, we could provide this option now.

I have probably referred to this a few times already, but I believe we could make the choice driven by the search request, for example something like this:

{
    "terminate_after": ...,
    "terminate_after_threshold": "exact | hard | soft"
}

Settings-based thresholds (in my opinion) are very difficult to set right, even on a per-index basis: they either apply to all queries or none of them, so basically we ask the user to either make a guess or to profile their queries extensively. With per-search-request tuning the user could implement the heuristic themselves by running the same query in concurrent / non-concurrent fashion and returning the first results (ideally, that is what we could possibly do ourselves, but doing more work in the background could impact cluster operations).

@jed326
Collaborator Author

jed326 commented Sep 26, 2023

@reta @sohami thanks! Defaulting to non-concurrent search case sounds good to me.

For the settings part, today we have cluster settings, index settings, and request parameters, each one taking higher priority and overriding the previous. Just like how we have provided both cluster and index settings for enabling/disabling concurrent search, I think it would make sense to introduce a cluster- or index-level setting like @sohami is suggesting, and as a follow-up provide request-level options to override it. This would be similar to how the request cache index setting is overridden by the request cache request parameter. How does this sound to you @reta?

@sohami
Collaborator

sohami commented Sep 26, 2023

Settings-based thresholds (in my opinion) are very difficult to set right, even on a per-index basis: they either apply to all queries or none of them, so basically we ask the user to either make a guess or to profile their queries extensively.

@reta Picking the right value for the threshold will definitely require some profiling of the workload to come up with the right defaults, but in the homogeneous workload case that should be doable. The intention of a cluster setting (I don't think an index setting will make sense here) was to cover such homogeneous cases where concurrent search could be used even with the terminate_after parameter. This will at least help us learn from user workloads where it is enabled and works well with concurrent search. Using this data, a better mechanism can probably be added in the backend to dynamically choose the concurrent vs non-concurrent path, or even to come up with defaults for this setting that work best for most cases.

With per-search-request tuning the user could implement the heuristic themselves by running the same query in concurrent / non-concurrent fashion and returning the first results (ideally, that is what we could possibly do ourselves, but doing more work in the background could impact cluster operations).

A request-level parameter will help in mixed workloads where coming up with a default is not easy. But that will be the case for all search request types (not only terminate_after). In such cases, in the absence of a request-level option, the user can choose to enable/disable concurrent search at the cluster level. I am treating this as more of an expert-level control. As we learn more about such use cases we can follow up with adding this support, and either clients or the server can implement a request-hedging mechanism (as you are suggesting) at the expense of increased work. This will require the cancellation mechanism to be improved to ensure the extra work can reliably be cancelled as required.

@reta
Collaborator

reta commented Sep 26, 2023

Thanks @jed326

This would be similar to how the request cache index setting is overridden by the request cache request parameter. How does this sound to you @reta?

I am not against settings, but I would like to understand what guidance should be provided to users for configuring them. Do you have a clear set of steps or procedure (we will have to provide the documentation anyway) on how non-expert users should pick the value for each of them? And since we are dealing with changing indices, how can users be sure that the settings they picked yesterday are still relevant today? (figuratively speaking)

@jed326
Collaborator Author

jed326 commented Sep 26, 2023

Do you have a clear set of steps or procedure (we will have to provide the documentation anyway) on how non-expert users should pick the value for each of them?

We will definitely provide documentation, but as @sohami mentioned above, the main intention of a setting is to cover a broad spectrum of cases and give the user an additional control to revert back to the non-concurrent search path if they see performance regressions in their workload.

Broadly speaking, if the user is using terminate_after > the threshold and is seeing performance regressions, they can increase the threshold, while if the user is using terminate_after < the threshold and would like to see if they can improve their search request time, they can try lowering the threshold.

@reta
Collaborator

reta commented Sep 26, 2023

@sohami @jed326 sure, as I mentioned, I am not against that if we are able to guide the user in configuring those; a request-level setting significantly reduces the usage gap and the onboarding process for the user.

@github-project-automation github-project-automation bot moved this from In Progress to Done in Concurrent Search Oct 3, 2023
@jed326 jed326 reopened this Oct 3, 2023
@github-project-automation github-project-automation bot moved this from Done to In Progress in Concurrent Search Oct 3, 2023
@jed326 jed326 removed the untriaged label Oct 3, 2023
@jed326
Collaborator Author

jed326 commented Oct 3, 2023

Keeping this issue open for now since we still want to follow up on supporting concurrent search for the terminate_after workflow; as part of #10200 we disabled concurrent segment search for the terminate_after path.

@jed326
Collaborator Author

jed326 commented Oct 3, 2023

Summarizing the remaining issues below:

Based on the discussion above in #8371 (comment), we want to go forward with implementing a threshold setting above which we can use concurrent segment search for the terminate_after workflow. A pre-requisite for this is to support forceTermination so that the EarlyTerminatingCollector is able to properly terminate the collector tree.

Summary of the issues:

  1. The TotalHitCountCollectorManager used when size=0 with terminate_after doesn't always compute the correct TotalHits.relation in the reduce phase. This requires a deep dive to figure out what is needed to support this edge case.
  2. TopDocsCollectorContext::newTopDocs also needs to compute the correct relation. Whenever track_total_hits=true the relation needs to always be eq.
  3. CollectionTerminatedException thrown in the soft termination path is swallowed in ContextIndexSearcher and will cause other operators in the collector tree (e.g. Aggregators) to not terminate collection. This is why forceTermination is needed (see the sketch below).

A POC that addresses many of these can be found here: 3bd8fe1
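
As a rough illustration of why item 3 needs forceTermination (a sketch only, not the POC or the #10200 change): an EarlyTerminationException, unlike CollectionTerminatedException, is not swallowed per leaf, so it can unwind the whole collector tree and let the query phase flag the result.

// Sketch only: force termination surfaces as an exception that escapes the per-leaf
// handling, so the entire collector tree (including aggregators) stops collecting.
try {
    searcher.search(query, collector);
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
    // terminate_after limit hit: keep whatever was collected and mark the result
    queryResult.terminatedEarly(true);
}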

@austintlee
Contributor

The TotalHitCountCollectorManager used when size=0 with terminate_after doesn't always compute the correct TotalHits.relation in the reduce phase. This requires a deep dive to figure out what is needed to support this edge case.

With the use of Weight#count in Collectors, you can no longer guarantee EQ relation when terminate_after is set. I think we should just document this behavior. Or make the use of Weight#count optional (which might be a breaking change). This is orthogonal to concurrent search.
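
For context, a simplified paraphrase of how a total-hit-count collector can use Weight#count (not the actual Lucene source): when a segment's count is known cheaply, that segment's docs are never collected at all, which is part of why an exact eq relation is hard to guarantee once terminate_after limits collection.

// Simplified paraphrase, for illustration only.
@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
    int leafCount = weight.count(context); // -1 means "unknown, must collect to count"
    if (leafCount != -1) {
        totalHits += leafCount;
        throw new CollectionTerminatedException(); // skip collecting this segment entirely
    }
    return new LeafCollector() {
        @Override
        public void setScorer(Scorable scorer) {}

        @Override
        public void collect(int doc) {
            totalHits++;
        }
    };
}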

Are 2 & 3 caused by concurrent search? I would focus on behavior changes caused by concurrent search.

@austintlee
Contributor

Late to the party, but just wanted to quickly chime in.

Based on the discussion above in #8371 (comment), we want to go forward with implementing a threshold setting above which we can use concurrent segment search for the terminate_after workflow.

So, this threshold is actually some kind of ratio, right? As the search space (the size of the indexed data) grows, this threshold can grow too? In general, if there are at least three segments to process, I think concurrent search will do better than non-concurrent. Maybe even two. If terminate_after > numDocs of the first segment, it may be safe to go with concurrent. Do you sort the segments in any way before starting searchLeaf? If you sort from smallest to largest, you might increase the benefits of concurrent search.

@reta
Collaborator

reta commented Oct 26, 2023

So, this threshold is actually some kind of ratio, right? As the search space (the size of indexed data) grows, this threshold can grow, too?

I think we don't know yet for sure: there are many variables (number of segments, slices, number of documents, thread pool saturation, etc.); we probably need many more experiments to get insights here.

Maybe even two. If terminate_after > numDocs of the first segment, it may be safe to go with concurrent.

I think the decision of which path to take is made way earlier, and it also depends on sorting and the index type (data streams), but again, more experiments would help.

Do you sort the segments in any way before starting searchLeaf? If you sort from smallest to largest, you might increase the benefits of concurrent search.

We do, I believe, but not by size; specifically for data streams, depending on the sort criteria, we could reverse the traversal.

@jed326 jed326 moved this from Todo to Done in Concurrent Search Nov 7, 2023
@jed326 jed326 removed the status in Concurrent Search Nov 7, 2023
@jed326 jed326 removed their assignment Nov 20, 2023