Commit
Merge branch 'main' into retrievers
jdconrad committed Feb 27, 2024
2 parents 4300470 + 73a170b commit 35bb53e
Showing 58 changed files with 3,418 additions and 228 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/105067.yaml
@@ -0,0 +1,5 @@
pr: 105067
summary: "ESQL: Use faster field caps"
area: ES|QL
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/105421.yaml
@@ -0,0 +1,5 @@
pr: 105421
summary: "ESQL: Add timers to many status results"
area: ES|QL
type: enhancement
issues: []
ExampleRescoreBuilderFieldDataTests.java
@@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.example.rescore;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FloatField;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.search.rescore.RescoreContext;
import org.elasticsearch.test.AbstractBuilderTestCase;

import java.io.IOException;

public class ExampleRescoreBuilderFieldDataTests extends AbstractBuilderTestCase {

    // To test that the rescore plugin is able to pull data from the indexed documents,
    // the following helper methods are called from the test below.
    // Some helpful related examples (as of Feb 14, 2024) are at:
    // https://github.com/apache/lucene/blob/main/lucene/core/src/test/org/apache/lucene/search/TestQueryRescorer.java

    private String fieldFactorFieldName = "literalNameOfFieldUsedAsFactor";
    private float fieldFactorValue = 2.0f;

    private IndexSearcher getSearcher(IndexReader r) {
        return newSearcher(r);
    }

    private IndexReader publishDocs(int numDocs, String fieldName, Directory dir) throws Exception {
        // Populate a collection of documents into the mock search context.
        // Note they all share the same field factor value for convenience.
        RandomIndexWriter w = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
        for (int i = 0; i < numDocs; i++) {
            Document d = new Document();
            d.add(newStringField("id", Integer.toString(i), Field.Store.YES));
            d.add(new FloatField(fieldName, fieldFactorValue, Field.Store.YES));
            w.addDocument(d);
        }
        IndexReader reader = w.getReader();
        w.close();
        return reader;
    }

    @Override
    protected void initializeAdditionalMappings(MapperService mapperService) throws IOException {
        mapperService.merge(
            "_doc",
            new CompressedXContent(Strings.toString(PutMappingRequest.simpleMapping(fieldFactorFieldName, "type=float"))),
            MapperService.MergeReason.MAPPING_UPDATE
        );
    }

    public void testRescoreUsingFieldData() throws Exception {
        // We want originalScoreOfTopDocs to be lower than the rescored values,
        // so that rescoring moves the rescored window to the top of the results.
        float originalScoreOfTopDocs = 1.0f;

        // Just like in the associated rescore builder factor tests,
        // we test a random factor against the incoming score docs.
        // The division simply leaves room for whatever values are picked.
        float factor = (float) randomDoubleBetween(1.0d, Float.MAX_VALUE / (fieldFactorValue * originalScoreOfTopDocs) - 1, false);

        // Test factorField specifically here for more example rescore debugging:
        // set up a mock search context that can locate the field index data
        // provided by the index reader that follows.

        Directory dir = newDirectory();
        // The rest of this test does not actually need more than 3 docs in the mock;
        // any number >= 3 is fine.
        int numDocs = 3;
        IndexReader reader = publishDocs(numDocs, fieldFactorFieldName, dir);
        IndexSearcher searcher = getSearcher(reader);

        ExampleRescoreBuilder builder = new ExampleRescoreBuilder(factor, fieldFactorFieldName).windowSize(2);

        RescoreContext context = builder.buildContext(createSearchExecutionContext(searcher));

        // Create and populate the TopDocs that will be provided to the rescore function.
        TopDocs docs = new TopDocs(new TotalHits(10, TotalHits.Relation.EQUAL_TO), new ScoreDoc[3]);
        docs.scoreDocs[0] = new ScoreDoc(0, originalScoreOfTopDocs);
        docs.scoreDocs[1] = new ScoreDoc(1, originalScoreOfTopDocs);
        docs.scoreDocs[2] = new ScoreDoc(2, originalScoreOfTopDocs);
        context.rescorer().rescore(docs, searcher, context);

        // We expect that windowSize docs have been rescored, with the remaining doc left in its original state.
        assertEquals(originalScoreOfTopDocs * factor * fieldFactorValue, docs.scoreDocs[0].score, 0.0f);
        assertEquals(originalScoreOfTopDocs * factor * fieldFactorValue, docs.scoreDocs[1].score, 0.0f);
        assertEquals(originalScoreOfTopDocs, docs.scoreDocs[2].score, 0.0f);

        // Clean up the mocks.
        reader.close();
        dir.close();
    }
}
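
The assertions above encode the example rescorer's scoring rule: within the rescore window (2 of the 3 docs here), the new score is the original score times the query-time factor times the per-document value of the factor field; docs outside the window keep their original score. A minimal sketch of that arithmetic, using a hypothetical helper that is not part of the plugin:

// Hypothetical sketch of the scoring rule the assertions above check.
final class RescoreMathSketch {
    // Inside the window: score' = score * factor * fieldValue; outside: unchanged.
    static float[] rescoreWindow(float[] scores, int windowSize, float factor, float fieldValue) {
        float[] rescored = scores.clone();
        for (int i = 0; i < Math.min(windowSize, scores.length); i++) {
            rescored[i] = scores[i] * factor * fieldValue;
        }
        return rescored;
    }
}

With scores {1f, 1f, 1f}, windowSize 2, and fieldFactorValue 2.0f this yields {2 * factor, 2 * factor, 1f}, matching the three assertEquals calls.
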
TransportVersions.java
@@ -134,6 +134,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INGEST_GRAPH_STRUCTURE_EXCEPTION = def(8_594_00_0);
public static final TransportVersion ML_MODEL_IN_SERVICE_SETTINGS = def(8_595_00_0);
public static final TransportVersion RANDOM_AGG_SHARD_SEED = def(8_596_00_0);
public static final TransportVersion ESQL_TIMINGS = def(8_597_00_0);

/*
* STOP! READ THIS FIRST! No, really,
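
The new ESQL_TIMINGS constant gates the wire-format change made later in this commit. A minimal sketch of the version-gating pattern it enables, assuming a single long processingNanos field (the real use is in LuceneOperator.Status below):

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

class TimedStatusSketch {
    final long processingNanos;

    TimedStatusSketch(StreamInput in) throws IOException {
        // Peers older than ESQL_TIMINGS never sent the field; default to zero.
        processingNanos = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS) ? in.readVLong() : 0;
    }

    void writeTo(StreamOutput out) throws IOException {
        // Write the new field only when the receiving node understands it.
        if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS)) {
            out.writeVLong(processingNanos);
        }
    }
}
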
FieldCapabilitiesIndexResponse.java
@@ -24,7 +24,7 @@
import java.util.Map;
import java.util.Objects;

-final class FieldCapabilitiesIndexResponse implements Writeable {
+public final class FieldCapabilitiesIndexResponse implements Writeable {
private static final TransportVersion MAPPING_HASH_VERSION = TransportVersions.V_8_2_0;

private final String indexName;
@@ -34,7 +34,7 @@ final class FieldCapabilitiesIndexResponse implements Writeable {
private final boolean canMatch;
private final transient TransportVersion originVersion;

-FieldCapabilitiesIndexResponse(
+public FieldCapabilitiesIndexResponse(
String indexName,
@Nullable String indexMappingHash,
Map<String, IndexFieldCapabilities> responseMap,
FieldCapabilitiesRequest.java
@@ -75,7 +75,7 @@ public FieldCapabilitiesRequest() {}
* <p>
* Note that when using the high-level REST client, results are always merged (this flag is always considered 'true').
*/
-boolean isMergeResults() {
+public boolean isMergeResults() {
return mergeResults;
}

@@ -85,7 +85,7 @@ boolean isMergeResults() {
* <p>
* Note that when using the high-level REST client, results are always merged (this flag is always considered 'true').
*/
-void setMergeResults(boolean mergeResults) {
+public void setMergeResults(boolean mergeResults) {
this.mergeResults = mergeResults;
}

FieldCapabilitiesResponse.java
@@ -57,7 +57,7 @@ public FieldCapabilitiesResponse(String[] indices, Map<String, Map<String, Field
this(indices, responseMap, Collections.emptyList(), Collections.emptyList());
}

-FieldCapabilitiesResponse(List<FieldCapabilitiesIndexResponse> indexResponses, List<FieldCapabilitiesFailure> failures) {
+public FieldCapabilitiesResponse(List<FieldCapabilitiesIndexResponse> indexResponses, List<FieldCapabilitiesFailure> failures) {
this(Strings.EMPTY_ARRAY, Collections.emptyMap(), indexResponses, failures);
}

@@ -117,7 +117,7 @@ public List<FieldCapabilitiesFailure> getFailures() {
/**
* Returns the actual per-index field caps responses
*/
-List<FieldCapabilitiesIndexResponse> getIndexResponses() {
+public List<FieldCapabilitiesIndexResponse> getIndexResponses() {
return indexResponses;
}

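
The field-caps changes above widen package-private members to public, letting code outside org.elasticsearch.action.fieldcaps (such as the faster ES|QL field-caps path from changelog 105067) configure requests and read per-index responses directly. A sketch of a hypothetical consumer; getIndexName() is assumed here from the indexName field shown above:

import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;

final class FieldCapsConsumerSketch {
    // Iterate the unmerged per-index responses, now reachable outside the package.
    static void listIndices(FieldCapabilitiesResponse response) {
        for (FieldCapabilitiesIndexResponse indexResponse : response.getIndexResponses()) {
            System.out.println(indexResponse.getIndexName()); // getIndexName() assumed
        }
    }
}
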
@@ -57,6 +57,11 @@ public boolean test(Set<String> nodesVersions) {
.orElseThrow(() -> new IllegalArgumentException("Checks against a version range require semantic version format (x.y.z)"));
return minimumNodeVersion.onOrAfter(lower) && minimumNodeVersion.onOrBefore(upper);
}

@Override
public String toString() {
return "MinimumContainedInVersionRange{lower=" + lower + ", upper=" + upper + '}';
}
}

static List<Predicate<Set<String>>> parseVersionRanges(String rawRanges) {
@@ -123,6 +123,7 @@ public Page getOutput() {
assert remainingDocs <= 0 : remainingDocs;
return null;
}
long start = System.nanoTime();
try {
final LuceneScorer scorer = getCurrentOrLoadNextScorer();
// no scorer means no more docs
@@ -171,6 +172,8 @@ public Page getOutput() {
return page;
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
processingNanos += System.nanoTime() - start;
}
}

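
This is the first of several operators in this commit that time getOutput() with the same pattern: take a nanoTime() stamp up front and accumulate in a finally block, so early returns and thrown exceptions are still counted. A self-contained sketch of the pattern (names hypothetical):

class TimedWorkSketch {
    long processingNanos; // running total across calls, like LuceneOperator.processingNanos

    Object getOutput() {
        long start = System.nanoTime();
        try {
            return doWork(); // early returns and exceptions still pass through finally
        } finally {
            processingNanos += System.nanoTime() - start;
        }
    }

    private Object doWork() {
        return new Object(); // stand-in for the operator's real page-building work
    }
}
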
LuceneOperator.java
@@ -16,6 +16,7 @@
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -24,6 +25,7 @@
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.xcontent.XContentBuilder;
@@ -60,6 +62,7 @@ public abstract class LuceneOperator extends SourceOperator {

private LuceneScorer currentScorer;

long processingNanos;
int pagesEmitted;
boolean doneCollecting;

@@ -198,6 +201,7 @@ public static class Status implements Operator.Status {
private final int processedSlices;
private final Set<String> processedQueries;
private final Set<String> processedShards;
private final long processingNanos;
private final int totalSlices;
private final int pagesEmitted;
private final int sliceIndex;
@@ -208,6 +212,7 @@ private Status(LuceneOperator operator) {
private Status(LuceneOperator operator) {
processedSlices = operator.processedSlices;
processedQueries = operator.processedQueries.stream().map(Query::toString).collect(Collectors.toCollection(TreeSet::new));
processingNanos = operator.processingNanos;
processedShards = new TreeSet<>(operator.processedShards);
sliceIndex = operator.sliceIndex;
totalSlices = operator.sliceQueue.totalSlices();
@@ -233,6 +238,7 @@ private Status(LuceneOperator operator) {
int processedSlices,
Set<String> processedQueries,
Set<String> processedShards,
long processingNanos,
int sliceIndex,
int totalSlices,
int pagesEmitted,
@@ -243,6 +249,7 @@ private Status(LuceneOperator operator) {
this.processedSlices = processedSlices;
this.processedQueries = processedQueries;
this.processedShards = processedShards;
this.processingNanos = processingNanos;
this.sliceIndex = sliceIndex;
this.totalSlices = totalSlices;
this.pagesEmitted = pagesEmitted;
@@ -260,6 +267,7 @@ private Status(LuceneOperator operator) {
processedQueries = Collections.emptySet();
processedShards = Collections.emptySet();
}
processingNanos = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS) ? in.readVLong() : 0;
sliceIndex = in.readVInt();
totalSlices = in.readVInt();
pagesEmitted = in.readVInt();
@@ -275,6 +283,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(processedQueries, StreamOutput::writeString);
out.writeCollection(processedShards, StreamOutput::writeString);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TIMINGS)) {
out.writeVLong(processingNanos);
}
out.writeVInt(sliceIndex);
out.writeVInt(totalSlices);
out.writeVInt(pagesEmitted);
@@ -300,6 +311,10 @@ public Set<String> processedShards() {
return processedShards;
}

public long processNanos() {
return processingNanos;
}

public int sliceIndex() {
return sliceIndex;
}
@@ -330,6 +345,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("processed_slices", processedSlices);
builder.field("processed_queries", processedQueries);
builder.field("processed_shards", processedShards);
builder.field("processing_nanos", processingNanos);
if (builder.humanReadable()) {
builder.field("processing_time", TimeValue.timeValueNanos(processingNanos));
}
builder.field("slice_index", sliceIndex);
builder.field("total_slices", totalSlices);
builder.field("pages_emitted", pagesEmitted);
@@ -347,6 +366,7 @@ public boolean equals(Object o) {
return processedSlices == status.processedSlices
&& processedQueries.equals(status.processedQueries)
&& processedShards.equals(status.processedShards)
&& processingNanos == status.processingNanos
&& sliceIndex == status.sliceIndex
&& totalSlices == status.totalSlices
&& pagesEmitted == status.pagesEmitted
@@ -364,6 +384,11 @@ public int hashCode() {
public String toString() {
return Strings.toString(this);
}

@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.V_8_11_X;
}
}

static Function<ShardContext, Weight> weightFunction(Function<ShardContext, Query> queryFunction, ScoreMode scoreMode) {
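
In the status output above, processing_nanos is always emitted, while the pre-formatted processing_time field appears only when human-readable output is requested (the ?human=true REST flag). A small sketch of the conversion toXContent uses, with an illustrative value:

import org.elasticsearch.core.TimeValue;

class ProcessingTimeDemo {
    public static void main(String[] args) {
        long processingNanos = 2_500_000L; // illustrative value
        // The same conversion backs the "processing_time" field above.
        System.out.println(TimeValue.timeValueNanos(processingNanos)); // prints 2.5ms
    }
}
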
@@ -128,6 +128,7 @@ public Page getOutput() {
assert currentPagePos == 0 : currentPagePos;
return null;
}
long start = System.nanoTime();
try {
final LuceneScorer scorer = getCurrentOrLoadNextScorer();
if (scorer == null) {
@@ -163,6 +164,8 @@ public Page getOutput() {
return page;
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
processingNanos += System.nanoTime() - start;
}
}

@@ -140,10 +140,15 @@ public Page getOutput() {
         if (isFinished()) {
             return null;
         }
-        if (isEmitting()) {
-            return emit(false);
-        } else {
-            return collect();
+        long start = System.nanoTime();
+        try {
+            if (isEmitting()) {
+                return emit(false);
+            } else {
+                return collect();
+            }
+        } finally {
+            processingNanos += System.nanoTime() - start;
         }
     }

Expand Down