Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Add support for table parameters #108169

Merged
merged 5 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_PAGE_MAPPING_TO_ITERATOR = def(8_645_00_0);
public static final TransportVersion BINARY_PIT_ID = def(8_646_00_0);
public static final TransportVersion SECURITY_ROLE_MAPPINGS_IN_CLUSTER_STATE = def(8_647_00_0);

public static final TransportVersion ESQL_REQUEST_TABLES = def(8_648_00_0);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Build;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
import org.elasticsearch.xpack.esql.analysis.Verifier;
Expand All @@ -30,6 +36,7 @@
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.ql.type.DataType;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.type.DateUtils;
import org.elasticsearch.xpack.ql.type.EsField;
import org.elasticsearch.xpack.ql.type.TypesTests;
Expand All @@ -40,6 +47,7 @@
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
Expand All @@ -48,6 +56,7 @@
import java.util.regex.Pattern;

import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.ListMatcher.matchesList;
import static org.elasticsearch.test.MapMatcher.assertMap;
Expand Down Expand Up @@ -109,6 +118,8 @@ public boolean isIndexed(String field) {

public static final TestSearchStats TEST_SEARCH_STATS = new TestSearchStats();

private static final Map<String, Map<String, Column>> TABLES = tables();

public static final EsqlConfiguration TEST_CFG = configuration(new QueryPragmas(Settings.EMPTY));

public static final Verifier TEST_VERIFIER = new Verifier(new Metrics());
Expand All @@ -125,7 +136,8 @@ public static EsqlConfiguration configuration(QueryPragmas pragmas, String query
EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY),
EsqlPlugin.QUERY_RESULT_TRUNCATION_DEFAULT_SIZE.getDefault(Settings.EMPTY),
query,
false
false,
TABLES
);
}

Expand Down Expand Up @@ -263,4 +275,45 @@ public static void assertWarnings(List<String> warnings, List<String> allowedWar
}
}
}

/**
 * Builds the test "tables" shared by tests: two tables mapping numbers
 * (as {@code int} and {@code long}) to their English names.
 * Blocks are built with a non-tracking {@link NoopCircuitBreaker} factory
 * because request tables do not support block accounting.
 */
static Map<String, Map<String, Column>> tables() {
    // Names for 0..9; index doubles as the numeric value appended to each block.
    String[] numberNames = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };

    BlockFactory factory = new BlockFactory(new NoopCircuitBreaker(CircuitBreaker.REQUEST), BigArrays.NON_RECYCLING_INSTANCE);
    Map<String, Map<String, Column>> tables = new HashMap<>();
    try (
        IntBlock.Builder ints = factory.newIntBlockBuilder(10);
        LongBlock.Builder longs = factory.newLongBlockBuilder(10);
        BytesRefBlock.Builder names = factory.newBytesRefBlockBuilder(10)
    ) {
        for (int i = 0; i < numberNames.length; i++) {
            ints.appendInt(i);
            longs.appendLong(i);
            names.appendBytesRef(new BytesRef(numberNames[i]));
        }

        IntBlock intsBlock = ints.build();
        LongBlock longsBlock = longs.build();
        // NOTE(review): namesBlock is shared by both tables below — closing both
        // tables would double-release it; fine here because the factory does not track.
        BytesRefBlock namesBlock = names.build();
        tables.put(
            "int_number_names",
            Map.of("int", new Column(DataTypes.INTEGER, intsBlock), "name", new Column(DataTypes.KEYWORD, namesBlock))
        );
        tables.put(
            "long_number_names",
            Map.of("long", new Column(DataTypes.LONG, longsBlock), "name", new Column(DataTypes.KEYWORD, namesBlock))
        );
    }
    return unmodifiableMap(tables);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql;

import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
import org.elasticsearch.xpack.ql.type.DataType;

import java.io.IOException;

/**
* A column of data provided in the request.
*/
public record Column(DataType type, Block values) implements Releasable, Writeable {
    /**
     * Read a column from the wire: the type name as a string followed by the
     * block as a named writeable. Must mirror {@link #writeTo} exactly.
     */
    public Column(BlockStreamInput in) throws IOException {
        this(EsqlDataTypes.fromTypeName(in.readString()), in.readNamedWriteable(Block.class));
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // Order matters: type name first, then the block — the stream
        // constructor above reads them back in the same order.
        out.writeString(type.typeName());
        out.writeNamedWriteable(values);
    }

    /**
     * Release the underlying block. Closing is not expected to fail here;
     * any exception is reported rather than propagated.
     */
    @Override
    public void close() {
        Releasables.closeExpectNoException(values);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,26 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.esql.Column;
import org.elasticsearch.xpack.esql.parser.TypedParamValue;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.version.EsqlVersion;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

import static org.elasticsearch.action.ValidateActions.addValidationError;

Expand All @@ -49,6 +54,11 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
private boolean keepOnCompletion;
private boolean onSnapshotBuild = Build.current().isSnapshot();

/**
* "Tables" provided in the request for use with things like {@code LOOKUP}.
*/
private final Map<String, Map<String, Column>> tables = new TreeMap<>();

static EsqlQueryRequest syncEsqlQueryRequest() {
return new EsqlQueryRequest(false);
}
Expand Down Expand Up @@ -84,11 +94,14 @@ public ActionRequestValidationException validate() {
if (Strings.hasText(query) == false) {
validationException = addValidationError("[" + RequestXContent.QUERY_FIELD + "] is required", validationException);
}
if (onSnapshotBuild == false && pragmas.isEmpty() == false) {
validationException = addValidationError(
"[" + RequestXContent.PRAGMA_FIELD + "] only allowed in snapshot builds",
validationException
);
if (onSnapshotBuild == false) {
if (pragmas.isEmpty() == false) {
validationException = addValidationError(
"[" + RequestXContent.PRAGMA_FIELD + "] only allowed in snapshot builds",
validationException
);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should reject requests with tables in non-snapshot builds for now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OH! I totally had that in there at some point. I'll add it back.


}
return validationException;
}
Expand Down Expand Up @@ -207,6 +220,36 @@ public void keepOnCompletion(boolean keepOnCompletion) {
this.keepOnCompletion = keepOnCompletion;
}

/**
* Add a "table" to the request for use with things like {@code LOOKUP}.
*/
/**
 * Add a "table" to the request for use with things like {@code LOOKUP}.
 * All columns must share the same row count, every block must come from a
 * non-tracking factory, and a table name may only be registered once.
 */
public void addTable(String name, Map<String, Column> columns) {
    // Request tables don't participate in block accounting, so reject any
    // column whose block is backed by a real circuit breaker.
    for (Column column : columns.values()) {
        if (column.values().blockFactory().breaker() instanceof NoopCircuitBreaker == false) {
            throw new AssertionError("block tracking not supported on tables parameter");
        }
    }
    // Every column in the table must have the same number of positions.
    Iterator<Column> remaining = columns.values().iterator();
    if (remaining.hasNext()) {
        int expected = remaining.next().values().getPositionCount();
        while (remaining.hasNext()) {
            int actual = remaining.next().values().getPositionCount();
            if (actual != expected) {
                throw new IllegalArgumentException("mismatched column lengths: was [" + actual + "] but expected [" + expected + "]");
            }
        }
    }
    // Reject duplicates; release the displaced columns so their blocks aren't leaked.
    Map<String, Column> previous = tables.put(name, columns);
    if (previous != null) {
        Releasables.close(previous.values());
        throw new IllegalArgumentException("duplicate table for [" + name + "]");
    }
}

/**
 * The "tables" provided in the request for use with things like {@code LOOKUP}.
 * Returns the live internal map — callers should treat it as read-only and
 * register new tables via {@code addTable} so its validation runs.
 */
public Map<String, Map<String, Column>> tables() {
    return tables;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
// Pass the query as the description
Expand Down
Loading