Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix flaky tests in CloseIndexIT by addressing cluster state synchronization issues ([#18878](https://github.com/opensearch-project/OpenSearch/issues/18878))
- [Tiered Caching] Handle query execution exception ([#19000](https://github.com/opensearch-project/OpenSearch/issues/19000))
- Grant access to testclusters dir for tests ([#19085](https://github.com/opensearch-project/OpenSearch/issues/19085))
- Fix assertion error when collapsing search results with concurrent segment search enabled ([#19053](https://github.com/opensearch-project/OpenSearch/pull/19053))
- Fix skip_unavailable setting changing to default during node drop issue ([#18766](https://github.com/opensearch-project/OpenSearch/pull/18766))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,3 +509,104 @@ setup:
- match: { hits.hits.2.inner_hits.sub_hits.hits.hits.1._id: "4" }
- gte: { hits.hits.2.inner_hits.sub_hits.hits.hits.1._seq_no: 0 }
- gte: { hits.hits.2.inner_hits.sub_hits.hits.hits.1._primary_term: 1 }

---
"Test field collapsing with sort":
- skip:
version: " - 3.2.99"
reason: Fixed in 3.3.0
- do:
indices.create:
index: test_1
body:
mappings:
properties:
sort_field: { type: integer }
collapse_field: { type: integer }
marker: {type: keyword}

- do:
index:
index: test_1
refresh: true
id: 1
body: { sort_field: 1, collapse_field: 1, marker: "doc1" }
- do:
index:
index: test_1
refresh: true
id: 2
body: { sort_field: 1, collapse_field: 2, marker: "doc2" }
- do:
index:
index: test_1
refresh: true
id: 3
body: { sort_field: 1, collapse_field: 2, marker: "doc3" }

- do:
search:
index: test_1
size: 2
body:
collapse: { field: collapse_field }
sort: [{ sort_field: desc }]
- match: { hits.total.value: 3 }
- length: { hits.hits: 2 }
- match: { hits.hits.0._id: '1' }
- match: { hits.hits.0._source.marker: 'doc1' }
- match: { hits.hits.1._id: '2' }
- match: { hits.hits.1._source.marker: 'doc2' }

---
"Test field collapsing with sort when concurrent segment search enabled":
- skip:
version: " - 3.2.99"
reason: Fixed in 3.3.0
- do:
indices.create:
index: test_1
body:
mappings:
properties:
sort_field: { type: integer }
collapse_field: { type: integer }
marker: {type: keyword}

- do:
index:
index: test_1
refresh: true
id: 1
body: { sort_field: 1, collapse_field: 1, marker: "doc1" }
- do:
index:
index: test_1
refresh: true
id: 2
body: { sort_field: 1, collapse_field: 2, marker: "doc2" }
- do:
index:
index: test_1
refresh: true
id: 3
body: { sort_field: 1, collapse_field: 2, marker: "doc3" }
- do:
indices.put_settings:
index: test_1
body:
index.search.concurrent_segment_search.mode: 'all'

- do:
search:
index: test_1
size: 2
body:
collapse: { field: collapse_field }
sort: [{ sort_field: desc }]
- match: { hits.total.value: 3 }
- length: { hits.hits: 2 }
- match: { hits.hits.0._id: '1' }
- match: { hits.hits.0._source.marker: 'doc1' }
- match: { hits.hits.1._id: '2' }
- match: { hits.hits.1._source.marker: 'doc2' }
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.core.common.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand All @@ -55,6 +56,14 @@ public final class CollapseTopFieldDocs extends TopFieldDocs {
public final String field;
/** The collapse value for each top doc */
public final Object[] collapseValues;
/** Orders hits by ascending {@code shardIndex}. */
private static final Comparator<ScoreDoc> SHARD_INDEX_TIE_BREAKER = (left, right) -> Integer.compare(left.shardIndex, right.shardIndex);

/** Orders hits by ascending {@code doc} id. */
private static final Comparator<ScoreDoc> DOC_ID_TIE_BREAKER = (left, right) -> Integer.compare(left.doc, right.doc);

/** Default tie breaker: compares {@code shardIndex} first and falls back to {@code doc} when the shard indices are equal. */
private static final Comparator<ScoreDoc> DEFAULT_TIE_BREAKER = SHARD_INDEX_TIE_BREAKER.thenComparing(DOC_ID_TIE_BREAKER);

public CollapseTopFieldDocs(String field, TotalHits totalHits, ScoreDoc[] scoreDocs, SortField[] sortFields, Object[] values) {
super(totalHits, scoreDocs, sortFields);
Expand All @@ -67,55 +76,35 @@ private static final class ShardRef {
// Which shard (index into shardHits[]):
final int shardIndex;

// True if we should use the incoming ScoreDoc.shardIndex for sort order
final boolean useScoreDocIndex;

// Which hit within the shard:
int hitIndex;

ShardRef(int shardIndex, boolean useScoreDocIndex) {
ShardRef(int shardIndex) {
this.shardIndex = shardIndex;
this.useScoreDocIndex = useScoreDocIndex;
}

@Override
public String toString() {
return "ShardRef(shardIndex=" + shardIndex + " hitIndex=" + hitIndex + ")";
}

int getShardIndex(ScoreDoc scoreDoc) {
if (useScoreDocIndex) {
if (scoreDoc.shardIndex == -1) {
throw new IllegalArgumentException(
"setShardIndex is false but TopDocs[" + shardIndex + "].scoreDocs[" + hitIndex + "] is not set"
);
}
return scoreDoc.shardIndex;
} else {
// NOTE: we don't assert that shardIndex is -1 here, because caller could in fact have set it but asked us to ignore it now
return shardIndex;
}
}
}

/**
* if we need to tie-break since score / sort value are the same we first compare shard index (lower shard wins)
* and then iff shard index is the same we use the hit index.
* Use the default tie breaker. If tie breaker returns 0 signifying equal values, we use hit
* indices to tie break intra shard ties
*/
static boolean tieBreakLessThan(ShardRef first, ScoreDoc firstDoc, ShardRef second, ScoreDoc secondDoc) {
final int firstShardIndex = first.getShardIndex(firstDoc);
final int secondShardIndex = second.getShardIndex(secondDoc);
// Tie break: earlier shard wins
if (firstShardIndex < secondShardIndex) {
return true;
} else if (firstShardIndex > secondShardIndex) {
return false;
} else {
int value = DEFAULT_TIE_BREAKER.compare(firstDoc, secondDoc);

if (value == 0) {
// Equal Values
// Tie break in same shard: resolve however the
// shard had resolved it:
assert first.hitIndex != second.hitIndex;
return first.hitIndex < second.hitIndex;
}

return value < 0;
}

private static class MergeSortQueue extends PriorityQueue<ShardRef> {
Expand Down Expand Up @@ -173,8 +162,10 @@ public boolean lessThan(ShardRef first, ShardRef second) {
/**
* Returns a new CollapseTopDocs, containing topN collapsed results across
* the provided CollapseTopDocs, sorting by score. Each {@link CollapseTopFieldDocs} instance must be sorted.
* Shard indices are expected to be consistent across all hits, i.e. either every ScoreDoc has its shardIndex set,
* or every ScoreDoc has it as -1 (signifying that all hits belong to the same shard); a mix of the two is
* rejected with an IllegalArgumentException.
**/
public static CollapseTopFieldDocs merge(Sort sort, int start, int size, CollapseTopFieldDocs[] shardHits, boolean setShardIndex) {
public static CollapseTopFieldDocs merge(Sort sort, int start, int size, CollapseTopFieldDocs[] shardHits) {
String collapseField = shardHits[0].field;
for (int i = 1; i < shardHits.length; i++) {
if (collapseField.equals(shardHits[i].field) == false) {
Expand All @@ -200,12 +191,13 @@ public static CollapseTopFieldDocs merge(Sort sort, int start, int size, Collaps
}
if (CollectionUtils.isEmpty(shard.scoreDocs) == false) {
availHitCount += shard.scoreDocs.length;
queue.add(new ShardRef(shardIDX, setShardIndex == false));
queue.add(new ShardRef(shardIDX));
}
}

final ScoreDoc[] hits;
final Object[] values;
boolean unsetShardIndex = false;
if (availHitCount <= start) {
hits = new ScoreDoc[0];
values = new Object[0];
Expand All @@ -223,6 +215,15 @@ public static CollapseTopFieldDocs merge(Sort sort, int start, int size, Collaps
ShardRef ref = queue.top();
final ScoreDoc hit = shardHits[ref.shardIndex].scoreDocs[ref.hitIndex];
final Object collapseValue = shardHits[ref.shardIndex].collapseValues[ref.hitIndex++];
// Irrespective of whether we use shard indices for tie breaking or not, we check for
// consistent order in shard indices to defend against potential bugs
if (hitUpto > 0) {
if (unsetShardIndex != (hit.shardIndex == -1)) {
throw new IllegalArgumentException("Inconsistent order of shard indices");
}
}
unsetShardIndex |= hit.shardIndex == -1;

if (seen.contains(collapseValue)) {
if (ref.hitIndex < shardHits[ref.shardIndex].scoreDocs.length) {
queue.updateTop();
Expand All @@ -232,9 +233,6 @@ public static CollapseTopFieldDocs merge(Sort sort, int start, int size, Collaps
continue;
}
seen.add(collapseValue);
if (setShardIndex) {
hit.shardIndex = ref.shardIndex;
}
if (hitUpto >= start) {
hitList.add(hit);
collapseList.add(collapseValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ static TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
} else if (topDocs instanceof CollapseTopFieldDocs) {
final CollapseTopFieldDocs[] shardTopDocs = results.toArray(new CollapseTopFieldDocs[numShards]);
final Sort sort = createSort(shardTopDocs);
mergedTopDocs = CollapseTopFieldDocs.merge(sort, from, topN, shardTopDocs, false);
mergedTopDocs = CollapseTopFieldDocs.merge(sort, from, topN, shardTopDocs);
} else if (topDocs instanceof TopFieldDocs) {
final TopFieldDocs[] shardTopDocs = results.toArray(new TopFieldDocs[numShards]);
final Sort sort = createSort(shardTopDocs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,7 @@ protected ReduceableSearchResult reduceWith(final Collection<CollapseTopFieldDoc
sort,
0,
numHits,
topFieldDocs.toArray(new CollapseTopFieldDocs[0]),
true
topFieldDocs.toArray(new CollapseTopFieldDocs[0])
);
result.topDocs(new TopDocsAndMaxScore(topDocs, maxScore), sortFmt);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
Expand Down Expand Up @@ -224,7 +225,7 @@ private <T extends Comparable<T>> void assertSearchCollapse(
subSearcher.search(weight, c);
shardHits[shardIDX] = c.getTopDocs();
}
CollapseTopFieldDocs mergedFieldDocs = CollapseTopFieldDocs.merge(sort, 0, expectedNumGroups, shardHits, true);
CollapseTopFieldDocs mergedFieldDocs = CollapseTopFieldDocs.merge(sort, 0, expectedNumGroups, shardHits);
assertTopDocsEquals(query, mergedFieldDocs, collapseTopFieldDocs);
w.close();
reader.close();
Expand Down Expand Up @@ -455,4 +456,42 @@ public void testEmptySortedSegment() throws Exception {
reader.close();
dir.close();
}

/**
 * Checks that {@code CollapseTopFieldDocs.merge} rejects input in which one shard's hits carry an
 * explicit shardIndex while another shard's hits leave it at -1.
 */
public void testInconsistentShardIndicesException() {
    final Sort relevance = Sort.RELEVANCE;

    // First shard: every hit has its shardIndex explicitly set to 0.
    final ScoreDoc[] indexedHits = new ScoreDoc[] {
        new FieldDoc(1, 9.0f, new Object[] { 9.0f }, 0),
        new FieldDoc(2, 8.0f, new Object[] { 8.0f }, 0) };

    // Second shard: shardIndex is left at -1, which clashes with the first shard.
    final ScoreDoc[] unindexedHits = new ScoreDoc[] {
        new FieldDoc(3, 7.0f, new Object[] { 7.0f }, -1),
        new FieldDoc(4, 6.0f, new Object[] { 6.0f }, -1) };

    final CollapseTopFieldDocs firstShard = new CollapseTopFieldDocs(
        "field",
        new TotalHits(2, TotalHits.Relation.EQUAL_TO),
        indexedHits,
        relevance.getSort(),
        new Object[] { "val1", "val2" }
    );
    final CollapseTopFieldDocs secondShard = new CollapseTopFieldDocs(
        "field",
        new TotalHits(2, TotalHits.Relation.EQUAL_TO),
        unindexedHits,
        relevance.getSort(),
        new Object[] { "val3", "val4" }
    );
    final CollapseTopFieldDocs[] perShard = { firstShard, secondShard };

    // Merging the mixed input must fail with the dedicated message.
    final IllegalArgumentException thrown = assertThrows(
        IllegalArgumentException.class,
        () -> CollapseTopFieldDocs.merge(relevance, 0, 10, perShard)
    );

    assertEquals("Inconsistent order of shard indices", thrown.getMessage());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,9 @@ public void testCollapseQuerySearchResults() throws Exception {
assertEquals(2, context.queryResult().topDocs().topDocs.scoreDocs.length);
assertThat(context.queryResult().topDocs().topDocs.totalHits.value(), equalTo((long) numDocs));
assertThat(context.queryResult().topDocs().topDocs, instanceOf(CollapseTopFieldDocs.class));
for (ScoreDoc scoreDoc : context.queryResult().topDocs().topDocs.scoreDocs) {
assertEquals(-1, scoreDoc.shardIndex);
}

CollapseTopFieldDocs topDocs = (CollapseTopFieldDocs) context.queryResult().topDocs().topDocs;
assertThat(topDocs.collapseValues.length, equalTo(2));
Expand All @@ -1135,6 +1138,9 @@ public void testCollapseQuerySearchResults() throws Exception {
assertEquals(2, context.queryResult().topDocs().topDocs.scoreDocs.length);
assertThat(context.queryResult().topDocs().topDocs.totalHits.value(), equalTo((long) numDocs));
assertThat(context.queryResult().topDocs().topDocs, instanceOf(CollapseTopFieldDocs.class));
for (ScoreDoc scoreDoc : context.queryResult().topDocs().topDocs.scoreDocs) {
assertEquals(-1, scoreDoc.shardIndex);
}

topDocs = (CollapseTopFieldDocs) context.queryResult().topDocs().topDocs;
assertThat(topDocs.collapseValues.length, equalTo(2));
Expand All @@ -1147,6 +1153,9 @@ public void testCollapseQuerySearchResults() throws Exception {
assertEquals(2, context.queryResult().topDocs().topDocs.scoreDocs.length);
assertThat(context.queryResult().topDocs().topDocs.totalHits.value(), equalTo((long) numDocs));
assertThat(context.queryResult().topDocs().topDocs, instanceOf(CollapseTopFieldDocs.class));
for (ScoreDoc scoreDoc : context.queryResult().topDocs().topDocs.scoreDocs) {
assertEquals(-1, scoreDoc.shardIndex);
}

topDocs = (CollapseTopFieldDocs) context.queryResult().topDocs().topDocs;
assertThat(topDocs.collapseValues.length, equalTo(2));
Expand Down
Loading