Skip to content

Commit 44fb5ac

Browse files
gaobinlongasimmahmood1
authored andcommitted
Remove the setShardIndex parameter in CollapseTopFieldDocs.merge() (opensearch-project#19053)
* Do not set shardIndex of top docs in CollapsingTopDocsCollectorContext Signed-off-by: Binlong Gao <gbinlong@amazon.com> * Modify change log Signed-off-by: Binlong Gao <gbinlong@amazon.com> * Format code Signed-off-by: Binlong Gao <gbinlong@amazon.com> * Remove setShardIndex parameter in CollapseTopFieldDocs.merge() Signed-off-by: Binlong Gao <gbinlong@amazon.com> * Modify change log Signed-off-by: Binlong Gao <gbinlong@amazon.com> * tiny change Signed-off-by: Binlong Gao <gbinlong@amazon.com> --------- Signed-off-by: Binlong Gao <gbinlong@amazon.com>
1 parent cd2dc07 commit 44fb5ac

File tree

7 files changed

+185
-38
lines changed

7 files changed

+185
-38
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3131
- Fix flaky tests in CloseIndexIT by addressing cluster state synchronization issues ([#18878](https://github.com/opensearch-project/OpenSearch/issues/18878))
3232
- [Tiered Caching] Handle query execution exception ([#19000](https://github.com/opensearch-project/OpenSearch/issues/19000))
3333
- Grant access to testclusters dir for tests ([#19085](https://github.com/opensearch-project/OpenSearch/issues/19085))
34+
- Fix assertion error when collapsing search results with concurrent segment search enabled ([#19053](https://github.com/opensearch-project/OpenSearch/pull/19053))
3435
- Fix skip_unavailable setting changing to default during node drop issue ([#18766](https://github.com/opensearch-project/OpenSearch/pull/18766))
3536

3637
### Dependencies

rest-api-spec/src/main/resources/rest-api-spec/test/search/110_field_collapsing.yml

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,3 +509,104 @@ setup:
509509
- match: { hits.hits.2.inner_hits.sub_hits.hits.hits.1._id: "4" }
510510
- gte: { hits.hits.2.inner_hits.sub_hits.hits.hits.1._seq_no: 0 }
511511
- gte: { hits.hits.2.inner_hits.sub_hits.hits.hits.1._primary_term: 1 }
512+
513+
---
514+
"Test field collapsing with sort":
515+
- skip:
516+
version: " - 3.2.99"
517+
reason: Fixed in 3.3.0
518+
- do:
519+
indices.create:
520+
index: test_1
521+
body:
522+
mappings:
523+
properties:
524+
sort_field: { type: integer }
525+
collapse_field: { type: integer }
526+
marker: {type: keyword}
527+
528+
- do:
529+
index:
530+
index: test_1
531+
refresh: true
532+
id: 1
533+
body: { sort_field: 1, collapse_field: 1, marker: "doc1" }
534+
- do:
535+
index:
536+
index: test_1
537+
refresh: true
538+
id: 2
539+
body: { sort_field: 1, collapse_field: 2, marker: "doc2" }
540+
- do:
541+
index:
542+
index: test_1
543+
refresh: true
544+
id: 3
545+
body: { sort_field: 1, collapse_field: 2, marker: "doc3" }
546+
547+
- do:
548+
search:
549+
index: test_1
550+
size: 2
551+
body:
552+
collapse: { field: collapse_field }
553+
sort: [{ sort_field: desc }]
554+
- match: { hits.total.value: 3 }
555+
- length: { hits.hits: 2 }
556+
- match: { hits.hits.0._id: '1' }
557+
- match: { hits.hits.0._source.marker: 'doc1' }
558+
- match: { hits.hits.1._id: '2' }
559+
- match: { hits.hits.1._source.marker: 'doc2' }
560+
561+
---
562+
"Test field collapsing with sort when concurrent segment search enabled":
563+
- skip:
564+
version: " - 3.2.99"
565+
reason: Fixed in 3.3.0
566+
- do:
567+
indices.create:
568+
index: test_1
569+
body:
570+
mappings:
571+
properties:
572+
sort_field: { type: integer }
573+
collapse_field: { type: integer }
574+
marker: {type: keyword}
575+
576+
- do:
577+
index:
578+
index: test_1
579+
refresh: true
580+
id: 1
581+
body: { sort_field: 1, collapse_field: 1, marker: "doc1" }
582+
- do:
583+
index:
584+
index: test_1
585+
refresh: true
586+
id: 2
587+
body: { sort_field: 1, collapse_field: 2, marker: "doc2" }
588+
- do:
589+
index:
590+
index: test_1
591+
refresh: true
592+
id: 3
593+
body: { sort_field: 1, collapse_field: 2, marker: "doc3" }
594+
- do:
595+
indices.put_settings:
596+
index: test_1
597+
body:
598+
index.search.concurrent_segment_search.mode: 'all'
599+
600+
- do:
601+
search:
602+
index: test_1
603+
size: 2
604+
body:
605+
collapse: { field: collapse_field }
606+
sort: [{ sort_field: desc }]
607+
- match: { hits.total.value: 3 }
608+
- length: { hits.hits: 2 }
609+
- match: { hits.hits.0._id: '1' }
610+
- match: { hits.hits.0._source.marker: 'doc1' }
611+
- match: { hits.hits.1._id: '2' }
612+
- match: { hits.hits.1._source.marker: 'doc2' }

server/src/main/java/org/apache/lucene/search/grouping/CollapseTopFieldDocs.java

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.opensearch.core.common.util.CollectionUtils;
4444

4545
import java.util.ArrayList;
46+
import java.util.Comparator;
4647
import java.util.HashSet;
4748
import java.util.List;
4849
import java.util.Set;
@@ -55,6 +56,14 @@ public final class CollapseTopFieldDocs extends TopFieldDocs {
5556
public final String field;
5657
/** The collapse value for each top doc */
5758
public final Object[] collapseValues;
59+
/** Internal comparator with shardIndex */
60+
private static final Comparator<ScoreDoc> SHARD_INDEX_TIE_BREAKER = Comparator.comparingInt(d -> d.shardIndex);
61+
62+
/** Internal comparator with docID */
63+
private static final Comparator<ScoreDoc> DOC_ID_TIE_BREAKER = Comparator.comparingInt(d -> d.doc);
64+
65+
/** Default comparator */
66+
private static final Comparator<ScoreDoc> DEFAULT_TIE_BREAKER = SHARD_INDEX_TIE_BREAKER.thenComparing(DOC_ID_TIE_BREAKER);
5867

5968
public CollapseTopFieldDocs(String field, TotalHits totalHits, ScoreDoc[] scoreDocs, SortField[] sortFields, Object[] values) {
6069
super(totalHits, scoreDocs, sortFields);
@@ -67,55 +76,35 @@ private static final class ShardRef {
6776
// Which shard (index into shardHits[]):
6877
final int shardIndex;
6978

70-
// True if we should use the incoming ScoreDoc.shardIndex for sort order
71-
final boolean useScoreDocIndex;
72-
7379
// Which hit within the shard:
7480
int hitIndex;
7581

76-
ShardRef(int shardIndex, boolean useScoreDocIndex) {
82+
ShardRef(int shardIndex) {
7783
this.shardIndex = shardIndex;
78-
this.useScoreDocIndex = useScoreDocIndex;
7984
}
8085

8186
@Override
8287
public String toString() {
8388
return "ShardRef(shardIndex=" + shardIndex + " hitIndex=" + hitIndex + ")";
8489
}
85-
86-
int getShardIndex(ScoreDoc scoreDoc) {
87-
if (useScoreDocIndex) {
88-
if (scoreDoc.shardIndex == -1) {
89-
throw new IllegalArgumentException(
90-
"setShardIndex is false but TopDocs[" + shardIndex + "].scoreDocs[" + hitIndex + "] is not set"
91-
);
92-
}
93-
return scoreDoc.shardIndex;
94-
} else {
95-
// NOTE: we don't assert that shardIndex is -1 here, because caller could in fact have set it but asked us to ignore it now
96-
return shardIndex;
97-
}
98-
}
9990
}
10091

10192
/**
102-
* if we need to tie-break since score / sort value are the same we first compare shard index (lower shard wins)
103-
* and then iff shard index is the same we use the hit index.
93+
* Use the default tie breaker. If tie breaker returns 0 signifying equal values, we use hit
94+
* indices to tie break intra shard ties
10495
*/
10596
static boolean tieBreakLessThan(ShardRef first, ScoreDoc firstDoc, ShardRef second, ScoreDoc secondDoc) {
106-
final int firstShardIndex = first.getShardIndex(firstDoc);
107-
final int secondShardIndex = second.getShardIndex(secondDoc);
108-
// Tie break: earlier shard wins
109-
if (firstShardIndex < secondShardIndex) {
110-
return true;
111-
} else if (firstShardIndex > secondShardIndex) {
112-
return false;
113-
} else {
97+
int value = DEFAULT_TIE_BREAKER.compare(firstDoc, secondDoc);
98+
99+
if (value == 0) {
100+
// Equal Values
114101
// Tie break in same shard: resolve however the
115102
// shard had resolved it:
116103
assert first.hitIndex != second.hitIndex;
117104
return first.hitIndex < second.hitIndex;
118105
}
106+
107+
return value < 0;
119108
}
120109

121110
private static class MergeSortQueue extends PriorityQueue<ShardRef> {
@@ -173,8 +162,10 @@ public boolean lessThan(ShardRef first, ShardRef second) {
173162
/**
174163
* Returns a new CollapseTopDocs, containing topN collapsed results across
175164
* the provided CollapseTopDocs, sorting by score. Each {@link CollapseTopFieldDocs} instance must be sorted.
165+
* docIDs are expected to be in consistent pattern i.e. either all ScoreDocs have their shardIndex set,
166+
* or all have them as -1 (signifying that all hits belong to same shard)
176167
**/
177-
public static CollapseTopFieldDocs merge(Sort sort, int start, int size, CollapseTopFieldDocs[] shardHits, boolean setShardIndex) {
168+
public static CollapseTopFieldDocs merge(Sort sort, int start, int size, CollapseTopFieldDocs[] shardHits) {
178169
String collapseField = shardHits[0].field;
179170
for (int i = 1; i < shardHits.length; i++) {
180171
if (collapseField.equals(shardHits[i].field) == false) {
@@ -200,12 +191,13 @@ public static CollapseTopFieldDocs merge(Sort sort, int start, int size, Collaps
200191
}
201192
if (CollectionUtils.isEmpty(shard.scoreDocs) == false) {
202193
availHitCount += shard.scoreDocs.length;
203-
queue.add(new ShardRef(shardIDX, setShardIndex == false));
194+
queue.add(new ShardRef(shardIDX));
204195
}
205196
}
206197

207198
final ScoreDoc[] hits;
208199
final Object[] values;
200+
boolean unsetShardIndex = false;
209201
if (availHitCount <= start) {
210202
hits = new ScoreDoc[0];
211203
values = new Object[0];
@@ -223,6 +215,15 @@ public static CollapseTopFieldDocs merge(Sort sort, int start, int size, Collaps
223215
ShardRef ref = queue.top();
224216
final ScoreDoc hit = shardHits[ref.shardIndex].scoreDocs[ref.hitIndex];
225217
final Object collapseValue = shardHits[ref.shardIndex].collapseValues[ref.hitIndex++];
218+
// Irrespective of whether we use shard indices for tie breaking or not, we check for
219+
// consistent order in shard indices to defend against potential bugs
220+
if (hitUpto > 0) {
221+
if (unsetShardIndex != (hit.shardIndex == -1)) {
222+
throw new IllegalArgumentException("Inconsistent order of shard indices");
223+
}
224+
}
225+
unsetShardIndex |= hit.shardIndex == -1;
226+
226227
if (seen.contains(collapseValue)) {
227228
if (ref.hitIndex < shardHits[ref.shardIndex].scoreDocs.length) {
228229
queue.updateTop();
@@ -232,9 +233,6 @@ public static CollapseTopFieldDocs merge(Sort sort, int start, int size, Collaps
232233
continue;
233234
}
234235
seen.add(collapseValue);
235-
if (setShardIndex) {
236-
hit.shardIndex = ref.shardIndex;
237-
}
238236
if (hitUpto >= start) {
239237
hitList.add(hit);
240238
collapseList.add(collapseValue);

server/src/main/java/org/opensearch/action/search/SearchPhaseController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ static TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
233233
} else if (topDocs instanceof CollapseTopFieldDocs) {
234234
final CollapseTopFieldDocs[] shardTopDocs = results.toArray(new CollapseTopFieldDocs[numShards]);
235235
final Sort sort = createSort(shardTopDocs);
236-
mergedTopDocs = CollapseTopFieldDocs.merge(sort, from, topN, shardTopDocs, false);
236+
mergedTopDocs = CollapseTopFieldDocs.merge(sort, from, topN, shardTopDocs);
237237
} else if (topDocs instanceof TopFieldDocs) {
238238
final TopFieldDocs[] shardTopDocs = results.toArray(new TopFieldDocs[numShards]);
239239
final Sort sort = createSort(shardTopDocs);

server/src/main/java/org/opensearch/search/query/TopDocsCollectorContext.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,7 @@ protected ReduceableSearchResult reduceWith(final Collection<CollapseTopFieldDoc
331331
sort,
332332
0,
333333
numHits,
334-
topFieldDocs.toArray(new CollapseTopFieldDocs[0]),
335-
true
334+
topFieldDocs.toArray(new CollapseTopFieldDocs[0])
336335
);
337336
result.topDocs(new TopDocsAndMaxScore(topDocs, maxScore), sortFmt);
338337
};

server/src/test/java/org/opensearch/lucene/grouping/CollapsingTopDocsCollectorTests.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.lucene.search.IndexSearcher;
4646
import org.apache.lucene.search.MatchAllDocsQuery;
4747
import org.apache.lucene.search.Query;
48+
import org.apache.lucene.search.ScoreDoc;
4849
import org.apache.lucene.search.ScoreMode;
4950
import org.apache.lucene.search.Sort;
5051
import org.apache.lucene.search.SortField;
@@ -224,7 +225,7 @@ private <T extends Comparable<T>> void assertSearchCollapse(
224225
subSearcher.search(weight, c);
225226
shardHits[shardIDX] = c.getTopDocs();
226227
}
227-
CollapseTopFieldDocs mergedFieldDocs = CollapseTopFieldDocs.merge(sort, 0, expectedNumGroups, shardHits, true);
228+
CollapseTopFieldDocs mergedFieldDocs = CollapseTopFieldDocs.merge(sort, 0, expectedNumGroups, shardHits);
228229
assertTopDocsEquals(query, mergedFieldDocs, collapseTopFieldDocs);
229230
w.close();
230231
reader.close();
@@ -455,4 +456,42 @@ public void testEmptySortedSegment() throws Exception {
455456
reader.close();
456457
dir.close();
457458
}
459+
460+
public void testInconsistentShardIndicesException() {
461+
Sort sort = Sort.RELEVANCE;
462+
463+
// Create TopDocs with mixed shardIndex values - some set, some -1
464+
ScoreDoc[] shard1Docs = {
465+
new FieldDoc(1, 9.0f, new Object[] { 9.0f }, 0), // shardIndex = 0
466+
new FieldDoc(2, 8.0f, new Object[] { 8.0f }, 0) // shardIndex = 0
467+
};
468+
469+
ScoreDoc[] shard2Docs = {
470+
new FieldDoc(3, 7.0f, new Object[] { 7.0f }, -1), // shardIndex = -1 (inconsistent!)
471+
new FieldDoc(4, 6.0f, new Object[] { 6.0f }, -1) // shardIndex = -1
472+
};
473+
474+
CollapseTopFieldDocs[] shardHits = {
475+
new CollapseTopFieldDocs(
476+
"field",
477+
new TotalHits(2, TotalHits.Relation.EQUAL_TO),
478+
shard1Docs,
479+
sort.getSort(),
480+
new Object[] { "val1", "val2" }
481+
),
482+
new CollapseTopFieldDocs(
483+
"field",
484+
new TotalHits(2, TotalHits.Relation.EQUAL_TO),
485+
shard2Docs,
486+
sort.getSort(),
487+
new Object[] { "val3", "val4" }
488+
) };
489+
490+
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> {
491+
CollapseTopFieldDocs.merge(sort, 0, 10, shardHits);
492+
});
493+
494+
assertEquals("Inconsistent order of shard indices", exception.getMessage());
495+
}
496+
458497
}

server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1123,6 +1123,9 @@ public void testCollapseQuerySearchResults() throws Exception {
11231123
assertEquals(2, context.queryResult().topDocs().topDocs.scoreDocs.length);
11241124
assertThat(context.queryResult().topDocs().topDocs.totalHits.value(), equalTo((long) numDocs));
11251125
assertThat(context.queryResult().topDocs().topDocs, instanceOf(CollapseTopFieldDocs.class));
1126+
for (ScoreDoc scoreDoc : context.queryResult().topDocs().topDocs.scoreDocs) {
1127+
assertEquals(-1, scoreDoc.shardIndex);
1128+
}
11261129

11271130
CollapseTopFieldDocs topDocs = (CollapseTopFieldDocs) context.queryResult().topDocs().topDocs;
11281131
assertThat(topDocs.collapseValues.length, equalTo(2));
@@ -1135,6 +1138,9 @@ public void testCollapseQuerySearchResults() throws Exception {
11351138
assertEquals(2, context.queryResult().topDocs().topDocs.scoreDocs.length);
11361139
assertThat(context.queryResult().topDocs().topDocs.totalHits.value(), equalTo((long) numDocs));
11371140
assertThat(context.queryResult().topDocs().topDocs, instanceOf(CollapseTopFieldDocs.class));
1141+
for (ScoreDoc scoreDoc : context.queryResult().topDocs().topDocs.scoreDocs) {
1142+
assertEquals(-1, scoreDoc.shardIndex);
1143+
}
11381144

11391145
topDocs = (CollapseTopFieldDocs) context.queryResult().topDocs().topDocs;
11401146
assertThat(topDocs.collapseValues.length, equalTo(2));
@@ -1147,6 +1153,9 @@ public void testCollapseQuerySearchResults() throws Exception {
11471153
assertEquals(2, context.queryResult().topDocs().topDocs.scoreDocs.length);
11481154
assertThat(context.queryResult().topDocs().topDocs.totalHits.value(), equalTo((long) numDocs));
11491155
assertThat(context.queryResult().topDocs().topDocs, instanceOf(CollapseTopFieldDocs.class));
1156+
for (ScoreDoc scoreDoc : context.queryResult().topDocs().topDocs.scoreDocs) {
1157+
assertEquals(-1, scoreDoc.shardIndex);
1158+
}
11501159

11511160
topDocs = (CollapseTopFieldDocs) context.queryResult().topDocs().topDocs;
11521161
assertThat(topDocs.collapseValues.length, equalTo(2));

0 commit comments

Comments
 (0)