forked from elastic/elasticsearch
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ES|QL categorize v1 (elastic#112860)
* Prepare TokenListCategorizer for usage in ES|QL * Expose text categorization from ML module * Let esql plugin depend on ml plugin * Fix/suppress this-escape * Incomplete v1 of ES|QL Categorize * Refactor / remove CategorizeInternal
- Loading branch information
1 parent
e26fe9f
commit 139e620
Showing
18 changed files
with
423 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
13 changes: 13 additions & 0 deletions
13
x-pack/plugin/esql/qa/testFixtures/src/main/resources/categorize.csv-spec
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
categorize | ||
required_capability: categorize | ||
|
||
FROM sample_data | ||
| STATS count=COUNT(), values=VALUES(message) BY category=CATEGORIZE(message) | ||
| SORT count DESC, category ASC | ||
; | ||
|
||
count:long | values:keyword | category:integer | ||
3 | [Connected to 10.1.0.1, Connected to 10.1.0.2, Connected to 10.1.0.3] | 0 | ||
3 | [Connection error] | 1 | ||
1 | [Disconnected] | 2 | ||
; |
131 changes: 131 additions & 0 deletions
131
...erated/org/elasticsearch/xpack/esql/expression/function/grouping/CategorizeEvaluator.java
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
159 changes: 159 additions & 0 deletions
159
...l/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Categorize.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.esql.expression.function.grouping; | ||
|
||
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.core.WhitespaceTokenizer;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.compute.ann.Evaluator;
import org.elasticsearch.compute.ann.Fixed;
import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator;
import org.elasticsearch.index.analysis.CharFilterFactory;
import org.elasticsearch.index.analysis.CustomAnalyzer;
import org.elasticsearch.index.analysis.TokenFilterFactory;
import org.elasticsearch.index.analysis.TokenizerFactory;
import org.elasticsearch.xpack.esql.capabilities.Validatable;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.Param;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.ml.aggs.categorization.CategorizationBytesRefHash;
import org.elasticsearch.xpack.ml.aggs.categorization.CategorizationPartOfSpeechDictionary;
import org.elasticsearch.xpack.ml.aggs.categorization.TokenListCategorizer;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Function;

import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isString;
|
||
/**
 * Categorizes text messages.
 *
 * This implementation is incomplete and comes with the following caveats:
 * - it only works correctly on a single node;
 * - when running on multiple nodes, category IDs from different nodes are
 *   aggregated together, even though the same ID can correspond to entirely
 *   different categories on each node;
 * - the output consists of numeric category IDs, which should eventually be
 *   replaced by category regexes or keys.
 *
 * TODO(jan, nik): fix this
 */
public class Categorize extends GroupingFunction implements Validatable { | ||
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( | ||
Expression.class, | ||
"Categorize", | ||
Categorize::new | ||
); | ||
|
||
private final Expression field; | ||
|
||
@FunctionInfo(returnType = { "integer" }, description = "Categorizes text messages") | ||
public Categorize( | ||
Source source, | ||
@Param(name = "field", type = { "text", "keyword" }, description = "Expression to categorize") Expression field | ||
) { | ||
super(source, List.of(field)); | ||
this.field = field; | ||
} | ||
|
||
private Categorize(StreamInput in) throws IOException { | ||
this(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class)); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
source().writeTo(out); | ||
out.writeNamedWriteable(field); | ||
} | ||
|
||
@Override | ||
public String getWriteableName() { | ||
return ENTRY.name; | ||
} | ||
|
||
@Override | ||
public boolean foldable() { | ||
return field.foldable(); | ||
} | ||
|
||
@Evaluator | ||
static int process( | ||
BytesRef v, | ||
@Fixed(includeInToString = false, build = true) CategorizationAnalyzer analyzer, | ||
@Fixed(includeInToString = false, build = true) TokenListCategorizer.CloseableTokenListCategorizer categorizer | ||
) { | ||
String s = v.utf8ToString(); | ||
try (TokenStream ts = analyzer.tokenStream("text", s)) { | ||
return categorizer.computeCategory(ts, s.length(), 1).getId(); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
@Override | ||
public ExpressionEvaluator.Factory toEvaluator(Function<Expression, ExpressionEvaluator.Factory> toEvaluator) { | ||
return new CategorizeEvaluator.Factory( | ||
source(), | ||
toEvaluator.apply(field), | ||
context -> new CategorizationAnalyzer( | ||
// TODO(jan): get the correct analyzer in here, see CategorizationAnalyzerConfig::buildStandardCategorizationAnalyzer | ||
new CustomAnalyzer( | ||
TokenizerFactory.newFactory("whitespace", WhitespaceTokenizer::new), | ||
new CharFilterFactory[0], | ||
new TokenFilterFactory[0] | ||
), | ||
true | ||
), | ||
context -> new TokenListCategorizer.CloseableTokenListCategorizer( | ||
new CategorizationBytesRefHash(new BytesRefHash(2048, context.bigArrays())), | ||
CategorizationPartOfSpeechDictionary.getInstance(), | ||
0.70f | ||
) | ||
); | ||
} | ||
|
||
@Override | ||
protected TypeResolution resolveType() { | ||
return isString(field(), sourceText(), DEFAULT); | ||
} | ||
|
||
@Override | ||
public DataType dataType() { | ||
return DataType.INTEGER; | ||
} | ||
|
||
@Override | ||
public Expression replaceChildren(List<Expression> newChildren) { | ||
return new Categorize(source(), newChildren.get(0)); | ||
} | ||
|
||
@Override | ||
protected NodeInfo<? extends Expression> info() { | ||
return NodeInfo.create(this, Categorize::new, field); | ||
} | ||
|
||
public Expression field() { | ||
return field; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "Categorize{field=" + field + "}"; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.