Skip to content

Commit e85d9d1

Browse files
authored
EQL: Add option for returning results from the tail of the stream (elastic#64869)
Introduce option for specifying whether the results are returned from the tail (end) of the stream or the head (beginning). Improve sequencing algorithm by significantly eliminating the number of in-flight sequences for spare datasets. Refactor the sequence class by eliminating some of the redundant code. Change matching behavior for tail sequences. Return results based on their first entry ordinal instead of insertion order (which was ordered on the last match ordinal). Randomize results position inside test suite. Close elastic#58646
1 parent 6e9490a commit e85d9d1

File tree

20 files changed

+529
-290
lines changed

20 files changed

+529
-290
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/eql/EqlSearchRequest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
3939
private QueryBuilder filter = null;
4040
private String timestampField = "@timestamp";
4141
private String eventCategoryField = "event.category";
42+
private String resultPosition = "head";
4243

4344
private int size = 10;
4445
private int fetchSize = 1000;
@@ -57,6 +58,7 @@ public class EqlSearchRequest implements Validatable, ToXContentObject {
5758
static final String KEY_SIZE = "size";
5859
static final String KEY_FETCH_SIZE = "fetch_size";
5960
static final String KEY_QUERY = "query";
61+
static final String KEY_RESULT_POSITION = "result_position";
6062
static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
6163
static final String KEY_KEEP_ALIVE = "keep_alive";
6264
static final String KEY_KEEP_ON_COMPLETION = "keep_on_completion";
@@ -79,6 +81,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
7981
builder.field(KEY_EVENT_CATEGORY_FIELD, eventCategoryField());
8082
builder.field(KEY_SIZE, size());
8183
builder.field(KEY_FETCH_SIZE, fetchSize());
84+
builder.field(KEY_RESULT_POSITION, resultPosition());
8285

8386
builder.field(KEY_QUERY, query);
8487
if (waitForCompletionTimeout != null) {
@@ -140,6 +143,19 @@ public EqlSearchRequest eventCategoryField(String eventCategoryField) {
140143
return this;
141144
}
142145

146+
public String resultPosition() {
147+
return resultPosition;
148+
}
149+
150+
public EqlSearchRequest resultPosition(String position) {
151+
if ("head".equals(position) || "tail".equals(position)) {
152+
resultPosition = position;
153+
} else {
154+
throw new IllegalArgumentException("result position needs to be 'head' or 'tail', received '" + position + "'");
155+
}
156+
return this;
157+
}
158+
143159
public int size() {
144160
return this.size;
145161
}
@@ -211,6 +227,7 @@ public boolean equals(Object o) {
211227
EqlSearchRequest that = (EqlSearchRequest) o;
212228
return size == that.size &&
213229
fetchSize == that.fetchSize &&
230+
resultPosition == that.resultPosition &&
214231
Arrays.equals(indices, that.indices) &&
215232
Objects.equals(indicesOptions, that.indicesOptions) &&
216233
Objects.equals(filter, that.filter) &&
@@ -237,6 +254,7 @@ public int hashCode() {
237254
tiebreakerField,
238255
eventCategoryField,
239256
query,
257+
resultPosition,
240258
waitForCompletionTimeout,
241259
keepAlive,
242260
keepOnCompletion);

x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/BaseEqlSpecTestCase.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package org.elasticsearch.test.eql;
88

99
import org.apache.http.HttpHost;
10+
import org.apache.http.client.config.RequestConfig;
1011
import org.elasticsearch.client.EqlClient;
1112
import org.elasticsearch.client.Request;
1213
import org.elasticsearch.client.RequestOptions;
@@ -118,11 +119,19 @@ protected EqlSearchResponse runQuery(String index, String query) throws Exceptio
118119
// some queries return more than 10 results
119120
request.size(50);
120121
request.fetchSize(randomIntBetween(2, 50));
122+
request.resultPosition(randomBoolean() ? "head" : "tail");
121123
return runRequest(eqlClient(), request);
122124
}
123125

124126
protected EqlSearchResponse runRequest(EqlClient eqlClient, EqlSearchRequest request) throws IOException {
125-
return eqlClient.search(request, RequestOptions.DEFAULT);
127+
int timeout = Math.toIntExact(timeout().millis());
128+
129+
RequestConfig config = RequestConfig.copy(RequestConfig.DEFAULT)
130+
.setConnectionRequestTimeout(timeout)
131+
.setConnectTimeout(timeout)
132+
.setSocketTimeout(timeout)
133+
.build();
134+
return eqlClient.search(request, RequestOptions.DEFAULT.toBuilder().setRequestConfig(config).build());
126135
}
127136

128137
protected EqlClient eqlClient() {

x-pack/plugin/eql/qa/common/src/main/resources/additional_test_queries.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ sequence by unique_pid
268268
[any where true]
269269
[any where serial_event_id < 72]
270270
'''
271-
expected_event_ids = [54, 55, 59,
271+
expected_event_ids = [
272+
54, 55, 59,
272273
55, 59, 61,
273274
59, 61, 65,
274275
16, 60, 66,

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/action/EqlSearchRequest.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
4949
private int size = RequestDefaults.SIZE;
5050
private int fetchSize = RequestDefaults.FETCH_SIZE;
5151
private String query;
52+
private String resultPosition = "head";
5253

5354
// Async settings
5455
private TimeValue waitForCompletionTimeout = null;
@@ -65,6 +66,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
6566
static final String KEY_WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
6667
static final String KEY_KEEP_ALIVE = "keep_alive";
6768
static final String KEY_KEEP_ON_COMPLETION = "keep_on_completion";
69+
static final String KEY_RESULT_POSITION = "result_position";
6870

6971
static final ParseField FILTER = new ParseField(KEY_FILTER);
7072
static final ParseField TIMESTAMP_FIELD = new ParseField(KEY_TIMESTAMP_FIELD);
@@ -76,6 +78,7 @@ public class EqlSearchRequest extends ActionRequest implements IndicesRequest.Re
7678
static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField(KEY_WAIT_FOR_COMPLETION_TIMEOUT);
7779
static final ParseField KEEP_ALIVE = new ParseField(KEY_KEEP_ALIVE);
7880
static final ParseField KEEP_ON_COMPLETION = new ParseField(KEY_KEEP_ON_COMPLETION);
81+
static final ParseField RESULT_POSITION = new ParseField(KEY_RESULT_POSITION);
7982

8083
private static final ObjectParser<EqlSearchRequest, Void> PARSER = objectParser(EqlSearchRequest::new);
8184

@@ -99,6 +102,9 @@ public EqlSearchRequest(StreamInput in) throws IOException {
99102
this.keepAlive = in.readOptionalTimeValue();
100103
this.keepOnCompletion = in.readBoolean();
101104
}
105+
if (in.getVersion().onOrAfter(Version.V_7_10_0)) {
106+
resultPosition = in.readString();
107+
}
102108
}
103109

104110
@Override
@@ -168,6 +174,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
168174
builder.field(KEY_KEEP_ALIVE, keepAlive);
169175
}
170176
builder.field(KEY_KEEP_ON_COMPLETION, keepOnCompletion);
177+
builder.field(KEY_RESULT_POSITION, resultPosition);
171178

172179
return builder;
173180
}
@@ -192,6 +199,7 @@ protected static <R extends EqlSearchRequest> ObjectParser<R, Void> objectParser
192199
parser.declareField(EqlSearchRequest::keepAlive,
193200
(p, c) -> TimeValue.parseTimeValue(p.text(), KEY_KEEP_ALIVE), KEEP_ALIVE, ObjectParser.ValueType.VALUE);
194201
parser.declareBoolean(EqlSearchRequest::keepOnCompletion, KEEP_ON_COMPLETION);
202+
parser.declareString(EqlSearchRequest::resultPosition, RESULT_POSITION);
195203
return parser;
196204
}
197205

@@ -281,6 +289,19 @@ public EqlSearchRequest keepOnCompletion(boolean keepOnCompletion) {
281289
return this;
282290
}
283291

292+
public String resultPosition() {
293+
return resultPosition;
294+
}
295+
296+
public EqlSearchRequest resultPosition(String position) {
297+
if ("head".equals(position) || "tail".equals(position)) {
298+
resultPosition = position;
299+
} else {
300+
throw new IllegalArgumentException("result position needs to be 'head' or 'tail', received '" + position + "'");
301+
}
302+
return this;
303+
}
304+
284305
@Override
285306
public void writeTo(StreamOutput out) throws IOException {
286307
super.writeTo(out);
@@ -298,6 +319,10 @@ public void writeTo(StreamOutput out) throws IOException {
298319
out.writeOptionalTimeValue(keepAlive);
299320
out.writeBoolean(keepOnCompletion);
300321
}
322+
323+
if (out.getVersion().onOrAfter(Version.V_7_10_0)) { // TODO: Remove after backport
324+
out.writeString(resultPosition);
325+
}
301326
}
302327

303328
@Override
@@ -321,7 +346,8 @@ public boolean equals(Object o) {
321346
Objects.equals(eventCategoryField, that.eventCategoryField) &&
322347
Objects.equals(query, that.query) &&
323348
Objects.equals(waitForCompletionTimeout, that.waitForCompletionTimeout) &&
324-
Objects.equals(keepAlive, that.keepAlive);
349+
Objects.equals(keepAlive, that.keepAlive) &&
350+
Objects.equals(resultPosition, that.resultPosition);
325351
}
326352

327353

@@ -338,7 +364,8 @@ public int hashCode() {
338364
eventCategoryField,
339365
query,
340366
waitForCompletionTimeout,
341-
keepAlive);
367+
keepAlive,
368+
resultPosition);
342369
}
343370

344371
@Override

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/BoxedQueryRequest.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
public class BoxedQueryRequest implements QueryRequest {
2727

2828
private final RangeQueryBuilder timestampRange;
29-
3029
private final SearchSourceBuilder searchSource;
3130

3231
private Ordinal from, to;
@@ -61,6 +60,16 @@ public BoxedQueryRequest from(Ordinal begin) {
6160
return this;
6261
}
6362

63+
/**
64+
* Sets the upper boundary for the query (inclusive).
65+
* Can be removed through null.
66+
*/
67+
public BoxedQueryRequest to(Ordinal end) {
68+
to = end;
69+
timestampRange.lte(end != null ? end.timestamp() : null);
70+
return this;
71+
}
72+
6473
public Ordinal after() {
6574
return after;
6675
}
@@ -69,13 +78,8 @@ public Ordinal from() {
6978
return from;
7079
}
7180

72-
/**
73-
* Sets the upper boundary for the query (inclusive).
74-
*/
75-
public BoxedQueryRequest to(Ordinal end) {
76-
to = end;
77-
timestampRange.lte(end != null ? end.timestamp() : null);
78-
return this;
81+
public Ordinal to() {
82+
return to;
7983
}
8084

8185
@Override

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/Criterion.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,35 +23,39 @@ public class Criterion<Q extends QueryRequest> {
2323
private final HitExtractor timestamp;
2424
private final HitExtractor tiebreaker;
2525

26-
private final boolean reverse;
26+
private final boolean descending;
2727

2828
Criterion(int stage,
2929
Q queryRequest,
3030
List<HitExtractor> keys,
3131
HitExtractor timestamp,
3232
HitExtractor tiebreaker,
33-
boolean reverse) {
33+
boolean descending) {
3434
this.stage = stage;
3535
this.queryRequest = queryRequest;
3636
this.keys = keys;
3737
this.timestamp = timestamp;
3838
this.tiebreaker = tiebreaker;
3939

40-
this.reverse = reverse;
40+
this.descending = descending;
4141
}
4242

4343
public int stage() {
4444
return stage;
4545
}
4646

47-
public boolean reverse() {
48-
return reverse;
47+
public boolean descending() {
48+
return descending;
4949
}
5050

5151
public Q queryRequest() {
5252
return queryRequest;
5353
}
5454

55+
public int keySize() {
56+
return keys.size();
57+
}
58+
5559
public SequenceKey key(SearchHit hit) {
5660
SequenceKey key;
5761
if (keys.isEmpty()) {
@@ -89,6 +93,6 @@ public Ordinal ordinal(SearchHit hit) {
8993

9094
@Override
9195
public String toString() {
92-
return "[" + stage + "][" + reverse + "]";
96+
return "[" + stage + "][" + descending + "]";
9397
}
94-
}
98+
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/assembler/ExecutionManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ public Executable assemble(List<List<Attribute>> listOfKeys,
5757
HitExtractor tbExtractor = Expressions.isPresent(tiebreaker) ? hitExtractor(tiebreaker, extractorRegistry) : null;
5858
// NB: since there's no aliasing inside EQL, the attribute name is the same as the underlying field name
5959
String timestampName = Expressions.name(timestamp);
60-
String tiebreakerName = Expressions.isPresent(tiebreaker) ? Expressions.name(tiebreaker) : null;
6160

6261
// secondary criteria
6362
List<Criterion<BoxedQueryRequest>> criteria = new ArrayList<>(plans.size() - 1);

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/KeyToSequences.java

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@
1313
import java.util.Map;
1414
import java.util.Map.Entry;
1515

16-
/** Dedicated collection for mapping a key to a list of sequences */
17-
/** The list represents the sequence for each stage (based on its index) and is fixed in size */
18-
16+
/**
17+
* Dedicated collection for mapping a key to a list of sequences
18+
* The list represents the sequence for each stage (based on its index) and is fixed in size
19+
*/
1920
class KeyToSequences {
2021

2122
private final int listSize;
@@ -52,24 +53,6 @@ void add(int stage, Sequence sequence) {
5253
groups[stage].add(sequence);
5354
}
5455

55-
void resetGroupInsertPosition() {
56-
for (SequenceGroup[] groups : keyToSequences.values()) {
57-
for (SequenceGroup group : groups) {
58-
if (group != null) {
59-
group.resetInsertPosition();
60-
}
61-
}
62-
}
63-
}
64-
65-
void resetUntilInsertPosition() {
66-
for (UntilGroup until : keyToUntil.values()) {
67-
if (until != null) {
68-
until.resetInsertPosition();
69-
}
70-
}
71-
}
72-
7356
void until(Iterable<KeyAndOrdinal> until) {
7457
for (KeyAndOrdinal keyAndOrdinal : until) {
7558
// ignore unknown keys
@@ -116,17 +99,26 @@ void dropUntil() {
11699
keyToUntil.clear();
117100
}
118101

102+
/**
103+
* Remove all matches expect the latest.
104+
*/
105+
void trimToTail() {
106+
for (SequenceGroup[] groups : keyToSequences.values()) {
107+
for (SequenceGroup group : groups) {
108+
if (group != null) {
109+
group.trimToLast();
110+
}
111+
}
112+
}
113+
}
114+
119115
public void clear() {
120116
keyToSequences.clear();
121117
keyToUntil.clear();
122118
}
123119

124-
int numberOfKeys() {
125-
return keyToSequences.size();
126-
}
127-
128120
@Override
129121
public String toString() {
130122
return LoggerMessageFormat.format(null, "Keys=[{}], Until=[{}]", keyToSequences.size(), keyToUntil.size());
131123
}
132-
}
124+
}

x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/Match.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import java.util.Objects;
1313

1414
/**
15-
* A match within a sequence, holding the result and occurrance time.
15+
* A match within a sequence, holding the result and occurrence time.
1616
*/
1717
class Match {
1818

0 commit comments

Comments
 (0)