Skip to content

Commit 47aa8ee

Browse files
laminelam (Lamine Idjeraoui)
and authored
Add support for Elastic Search _shard_doc equivalent (#18924)
* add a new sort tiebreaker based on shard id and docid - Implements ShardDocSortBuilder + comparator - TODO: Add unit + integ tests - Registers in SearchModule Signed-off-by: Lamine Idjeraoui <lidjeraoui@apple.com> * add a new sort tiebreaker based on shard id and docid Signed-off-by: Lamine Idjeraoui <lidjeraoui@apple.com> * add a test class plus some refactoring Signed-off-by: Lamine Idjeraoui <lidjeraoui@apple.com> * add microbenchmark Signed-off-by: Lamine Idjeraoui <lidjeraoui@apple.com> * spotless Signed-off-by: Lamine Idjeraoui <lidjeraoui@apple.com> * spotless Signed-off-by: Lamine Idjeraoui <lidjeraoui@apple.com> * add yaml rest test cases Signed-off-by: Lamine Idjeraoui <lidjeraoui@apple.com> * change log Signed-off-by: Lamine Idjeraoui <lidjeraoui@apple.com> * some changes to the shard_doc yaml rest test file Signed-off-by: Lamine Idjeraoui <lidjeraoui@apple.com> * Add PIT param to ShardDocFieldComparatorSourceIT's search requests Signed-off-by: Lamine Idjeraoui <lidjeraoui@apple.com> * add more logic for shard_doc parsing add more test cases Signed-off-by: Lamine Idjeraoui <lidjeraoui@apple.com> --------- Signed-off-by: Lamine Idjeraoui <lidjeraoui@apple.com> Co-authored-by: Lamine Idjeraoui <lidjeraoui@apple.com>
1 parent 3965f6f commit 47aa8ee

File tree

13 files changed

+1398
-0
lines changed

13 files changed

+1398
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2323
- Support system generated search pipeline. ([#19128](https://github.com/opensearch-project/OpenSearch/pull/19128))
2424
- Add `epoch_micros` date format ([#14669](https://github.com/opensearch-project/OpenSearch/issues/14669))
2525
- Grok processor supports capturing multiple values for same field name ([#18799](https://github.com/opensearch-project/OpenSearch/pull/18799))
26+
- Add support for search tie-breaking by _shard_doc ([#18924](https://github.com/opensearch-project/OpenSearch/pull/18924))
2627
- Upgrade opensearch-protobufs dependency to 0.13.0 and update transport-grpc module compatibility ([#19007](https://github.com/opensearch-project/OpenSearch/issues/19007))
2728
- Add new extensible method to DocRequest to specify type ([#19313](https://github.com/opensearch-project/OpenSearch/pull/19313))
2829
- [Rule based auto-tagging] Add Rule based auto-tagging IT ([#18550](https://github.com/opensearch-project/OpenSearch/pull/18550))
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.benchmark.search.sort;
10+
11+
import org.openjdk.jmh.annotations.Benchmark;
12+
import org.openjdk.jmh.annotations.BenchmarkMode;
13+
import org.openjdk.jmh.annotations.Fork;
14+
import org.openjdk.jmh.annotations.Measurement;
15+
import org.openjdk.jmh.annotations.Mode;
16+
import org.openjdk.jmh.annotations.OutputTimeUnit;
17+
import org.openjdk.jmh.annotations.Param;
18+
import org.openjdk.jmh.annotations.Scope;
19+
import org.openjdk.jmh.annotations.Setup;
20+
import org.openjdk.jmh.annotations.State;
21+
import org.openjdk.jmh.annotations.Warmup;
22+
import org.openjdk.jmh.infra.Blackhole;
23+
24+
import java.util.Random;
25+
import java.util.concurrent.TimeUnit;
26+
27+
/**
28+
* JMH microbenchmarks for the _shard_doc composite key path:
29+
* key = (shardKeyPrefix | (docBase + doc))
30+
*
31+
* Mirrors hot operations in ShardDocFieldComparatorSource without needing Lucene classes.
32+
*/
33+
@Fork(3)
34+
@Warmup(iterations = 5)
35+
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
36+
@BenchmarkMode(Mode.Throughput)
37+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
38+
@State(Scope.Benchmark)
39+
40+
public class ShardDocComparatorBenchmark {
41+
42+
@Param({ "1", "4", "16" })
43+
public int segments;
44+
45+
@Param({ "50000" })
46+
public int docsPerSegment;
47+
48+
@Param({ "7" })
49+
public int shardId;
50+
51+
private long shardKeyPrefix;
52+
private int[] docBases;
53+
private int[] docs;
54+
private long[] keys; // precomputed composite keys
55+
56+
// per-doc global doc (docBase + doc) for doc-only baseline
57+
private int[] globalDocs;
58+
59+
@Setup
60+
public void setup() {
61+
shardKeyPrefix = ((long) shardId) << 32; // Must mirror ShardDocFieldComparatorSource.shardKeyPrefix
62+
63+
docBases = new int[segments];
64+
for (int i = 1; i < segments; i++) {
65+
docBases[i] = docBases[i - 1] + docsPerSegment;
66+
}
67+
68+
int total = segments * docsPerSegment;
69+
docs = new int[total];
70+
keys = new long[total];
71+
globalDocs = new int[total];
72+
73+
Random r = new Random(42);
74+
int pos = 0;
75+
for (int s = 0; s < segments; s++) {
76+
int base = docBases[s];
77+
for (int d = 0; d < docsPerSegment; d++) {
78+
int doc = r.nextInt(docsPerSegment);
79+
docs[pos] = doc;
80+
keys[pos] = computeGlobalDocKey(base, doc);
81+
globalDocs[pos] = base + doc;
82+
pos++;
83+
}
84+
}
85+
}
86+
87+
/** Baseline: compare only globalDoc */
88+
@Benchmark
89+
public long compareDocOnlyAsc() {
90+
long acc = 0;
91+
for (int i = 1; i < globalDocs.length; i++) {
92+
acc += Integer.compare(globalDocs[i - 1], globalDocs[i]);
93+
}
94+
return acc;
95+
}
96+
97+
/** raw key packing cost */
98+
@Benchmark
99+
public void packKey(Blackhole bh) {
100+
int total = segments * docsPerSegment;
101+
int idx = 0;
102+
for (int s = 0; s < segments; s++) {
103+
int base = docBases[s];
104+
for (int d = 0; d < docsPerSegment; d++) {
105+
long k = computeGlobalDocKey(base, docs[idx++]);
106+
bh.consume(k);
107+
}
108+
}
109+
}
110+
111+
/** compare already-packed keys as ASC */
112+
@Benchmark
113+
public long compareAsc() {
114+
long acc = 0;
115+
for (int i = 1; i < keys.length; i++) {
116+
acc += Long.compare(keys[i - 1], keys[i]);
117+
}
118+
return acc;
119+
}
120+
121+
/** compare already-packed keys as DESC */
122+
@Benchmark
123+
public long compareDesc() {
124+
long acc = 0;
125+
for (int i = 1; i < keys.length; i++) {
126+
acc += Long.compare(keys[i], keys[i - 1]); // reversed
127+
}
128+
return acc;
129+
}
130+
131+
/** rough “collector loop” mix: copy + occasional compareBottom */
132+
@Benchmark
133+
public int copyAndCompareBottomAsc() {
134+
long bottom = Long.MIN_VALUE;
135+
int worse = 0;
136+
for (int i = 0; i < keys.length; i++) {
137+
long v = keys[i]; // simulate copy(slot, doc)
138+
if ((i & 31) == 0) bottom = v; // simulate setBottom every 32 items
139+
if (Long.compare(bottom, v) < 0) worse++;
140+
}
141+
return worse;
142+
}
143+
144+
// Must mirror ShardDocFieldComparatorSource.computeGlobalDocKey: (shardId << 32) | (docBase + doc)
145+
private long computeGlobalDocKey(int docBase, int doc) {
146+
return shardKeyPrefix | (docBase + doc);
147+
}
148+
}
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
---
2+
setup:
3+
- skip:
4+
version: " - 3.2.99"
5+
reason: "introduced in 3.3.0"
6+
7+
# Multi-shard index
8+
- do:
9+
indices.create:
10+
index: sharddoc_paging
11+
body:
12+
settings:
13+
number_of_shards: 4
14+
number_of_replicas: 0
15+
mappings:
16+
properties:
17+
id: { type: integer }
18+
txt: { type: keyword }
19+
- do:
20+
cluster.health:
21+
wait_for_status: green
22+
index: sharddoc_paging
23+
- do:
24+
bulk:
25+
refresh: true
26+
index: sharddoc_paging
27+
body: |
28+
{"index":{}}
29+
{"id":1,"txt":"a"}
30+
{"index":{}}
31+
{"id":2,"txt":"b"}
32+
{"index":{}}
33+
{"id":3,"txt":"c"}
34+
{"index":{}}
35+
{"id":4,"txt":"d"}
36+
{"index":{}}
37+
{"id":5,"txt":"e"}
38+
{"index":{}}
39+
{"id":6,"txt":"f"}
40+
{"index":{}}
41+
{"id":7,"txt":"g"}
42+
{"index":{}}
43+
{"id":8,"txt":"h"}
44+
{"index":{}}
45+
{"id":9,"txt":"i"}
46+
{"index":{}}
47+
{"id":10,"txt":"j"}
48+
{"index":{}}
49+
{"id":11,"txt":"k"}
50+
{"index":{}}
51+
{"id":12,"txt":"l"}
52+
{"index":{}}
53+
{"id":13,"txt":"m"}
54+
{"index":{}}
55+
{"id":14,"txt":"n"}
56+
{"index":{}}
57+
{"id":15,"txt":"o"}
58+
{"index":{}}
59+
{"id":16,"txt":"p"}
60+
{"index":{}}
61+
{"id":17,"txt":"q"}
62+
{"index":{}}
63+
{"id":18,"txt":"r"}
64+
{"index":{}}
65+
{"id":19,"txt":"s"}
66+
{"index":{}}
67+
{"id":20,"txt":"t"}
68+
{"index":{}}
69+
{"id":21,"txt":"u"}
70+
{"index":{}}
71+
{"id":22,"txt":"v"}
72+
73+
# -------------------------------------------------------------------
74+
# VALIDATION
75+
# -------------------------------------------------------------------
76+
77+
---
78+
"reject _shard_doc without PIT":
79+
- do:
80+
catch: bad_request
81+
search:
82+
index: sharddoc_paging
83+
body:
84+
sort:
85+
- _shard_doc
86+
- match: { status: 400 }
87+
- match: { error.type: action_request_validation_exception }
88+
- match: { error.reason: "/.*_shard_doc is only supported with point-in-time.*|.*PIT.*/" }
89+
90+
---
91+
"detect _shard_doc via FieldSortBuilder-style object without PIT":
92+
- do:
93+
catch: bad_request
94+
search:
95+
index: sharddoc_paging
96+
body:
97+
sort:
98+
- _shard_doc: { } # object form, still invalid without PIT
99+
- match: { status: 400 }
100+
- match: { error.type: action_request_validation_exception }
101+
- match: { error.reason: "/.*_shard_doc is only supported with point-in-time.*|.*PIT.*/" }
102+
103+
104+
# -------------------------------------------------------------------
105+
# HAPPY PATH: PAGINATION WITH PIT ON MULTI-SHARD INDEX
106+
# -------------------------------------------------------------------
107+
108+
---
109+
"accept _shard_doc with PIT + paginate with search_after (multi-shard)":
110+
- do:
111+
create_pit:
112+
index: sharddoc_paging
113+
keep_alive: 1m
114+
- set: { pit_id: pit_id }
115+
116+
# Page 1
117+
- do:
118+
search:
119+
body:
120+
size: 10
121+
pit: { id: "$pit_id", keep_alive: "1m" }
122+
sort:
123+
- _shard_doc: {}
124+
- match: { _shards.failed: 0 }
125+
- length: { hits.hits: 10 }
126+
- is_true: hits.hits.9.sort
127+
128+
- set: { hits.hits.9.sort: after1 }
129+
130+
# Page 2
131+
- do:
132+
search:
133+
body:
134+
size: 10
135+
pit: { id: "$pit_id", keep_alive: "1m" }
136+
sort:
137+
- _shard_doc: { }
138+
search_after: $after1
139+
140+
- match: { _shards.failed: 0 }
141+
- length: { hits.hits: 10 }
142+
- is_true: hits.hits.9.sort
143+
144+
- set: { hits.hits.9.sort: after2 }
145+
- set: { hits.hits.9.sort.0: last_value_page2 }
146+
147+
# Check that the sort values increase from one hit to the next without ever decreasing.
148+
- set: { hits.hits.0.sort.0: prev }
149+
- gt: { hits.hits.1.sort.0: $prev }
150+
151+
- set: { hits.hits.1.sort.0: prev }
152+
- gt: { hits.hits.2.sort.0: $prev }
153+
154+
- set: { hits.hits.2.sort.0: prev }
155+
- gt: { hits.hits.3.sort.0: $prev }
156+
157+
- set: { hits.hits.3.sort.0: prev }
158+
- gt: { hits.hits.4.sort.0: $prev }
159+
160+
- set: { hits.hits.4.sort.0: prev }
161+
- gt: { hits.hits.5.sort.0: $prev }
162+
163+
- set: { hits.hits.5.sort.0: prev }
164+
- gt: { hits.hits.6.sort.0: $prev }
165+
166+
- set: { hits.hits.6.sort.0: prev }
167+
- gt: { hits.hits.7.sort.0: $prev }
168+
169+
- set: { hits.hits.7.sort.0: prev }
170+
- gt: { hits.hits.8.sort.0: $prev }
171+
172+
- set: { hits.hits.8.sort.0: prev }
173+
- gt: { hits.hits.9.sort.0: $prev }
174+
175+
# Page 3: drain the rest (22 docs total => 10 + 10 + 2)
176+
- do:
177+
search:
178+
body:
179+
size: 10
180+
pit: { id: "$pit_id", keep_alive: "1m" }
181+
sort:
182+
- _shard_doc: {}
183+
search_after: $after2
184+
185+
- match: { _shards.failed: 0 }
186+
- length: { hits.hits: 2 }
187+
188+
- do:
189+
delete_pit:
190+
body:
191+
pit_id: [ "$pit_id" ]

0 commit comments

Comments
 (0)