1 change: 1 addition & 0 deletions be/src/exec/es/es_scan_reader.h
@@ -40,6 +40,7 @@ class ESScanReader {
static constexpr const char* KEY_QUERY = "query";
static constexpr const char* KEY_BATCH_SIZE = "batch_size";
static constexpr const char* KEY_TERMINATE_AFTER = "limit";
static constexpr const char* KEY_DOC_VALUES_MODE = "doc_values_mode";
ESScanReader(const std::string& target, const std::map<std::string, std::string>& props, bool doc_value_mode);
~ESScanReader();

20 changes: 13 additions & 7 deletions be/src/exec/es/es_scroll_query.cpp
@@ -76,14 +76,20 @@ std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>
// note: add `query` for this value....
es_query_dsl.AddMember("query", query_node, allocator);
bool pure_docvalue = true;
// check docvalue scan optimization
if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) {
pure_docvalue = false;

// Doris FE has already checked the docvalue-scan optimization
if (properties.find(ESScanReader::KEY_DOC_VALUES_MODE) != properties.end()) {
pure_docvalue = atoi(properties.at(ESScanReader::KEY_DOC_VALUES_MODE).c_str());
} else {
for (auto& select_field : fields) {
if (docvalue_context.find(select_field) == docvalue_context.end()) {
pure_docvalue = false;
break;
// check docvalue scan optimization, used for compatibility
if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) {
pure_docvalue = false;
} else {
for (auto& select_field : fields) {
if (docvalue_context.find(select_field) == docvalue_context.end()) {
pure_docvalue = false;
break;
}
}
}
}
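In short, the BE now trusts the FE's precomputed decision when the doc_values_mode property is present ("1" enables the pure doc_values scroll, "0" disables it), and only falls back to its own field-by-field check when talking to an older FE that does not send the property.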
1 change: 1 addition & 0 deletions fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -4154,6 +4154,7 @@ public static void getDdlStmt(Table table, List<String> createTableStmt, List<St
sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n");
sb.append("\"transport\" = \"").append(esTable.getTransport()).append("\",\n");
sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n");
sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n");
sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n");
sb.append(")");
} else if (table.getType() == TableType.HIVE) {
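For reference, with the added line, the ES branch of SHOW CREATE TABLE emits a properties block along these lines (values are illustrative):

    "type" = "_doc",
    "transport" = "http",
    "enable_docvalue_scan" = "true",
    "max_docvalue_fields" = "20",
    "enable_keyword_sniff" = "true"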
52 changes: 48 additions & 4 deletions fe/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -57,28 +57,51 @@ public class EsTable extends Table {
public static final String TYPE = "type";
public static final String TRANSPORT = "transport";
public static final String VERSION = "version";
public static final String DOC_VALUES_MODE = "doc_values_mode";

public static final String TRANSPORT_HTTP = "http";
public static final String TRANSPORT_THRIFT = "thrift";
public static final String DOC_VALUE_SCAN = "enable_docvalue_scan";
public static final String KEYWORD_SNIFF = "enable_keyword_sniff";
public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields";

private String hosts;
private String[] seeds;
private String userName = "";
private String passwd = "";
// index name can be a specific index, a wildcard pattern, or an alias.
private String indexName;

// the mapping type used for `indexName`; defaults to `_doc`
private String mappingType = "_doc";
private String transport = "http";
// only saves the partition definition and the partition key;
// the partition list is fetched dynamically from the ES cluster and saved in esTableState
private PartitionInfo partitionInfo;
private EsTablePartitions esTablePartitions;
private boolean enableDocValueScan = false;
private boolean enableKeywordSniff = true;

// Whether to enable the doc_values scan optimization for fetching fields faster; defaults to true
private boolean enableDocValueScan = true;
// Whether to enable keyword sniffing for more reasonable filtering; defaults to true
private boolean enableKeywordSniff = true;
// If the number of fields whose values are extracted from `doc_values` exceeds this limit,
// the scan downgrades to extracting values from `stored_fields`
private int maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;

// The Solr "doc_values vs. stored_fields" performance smackdown indicates:
// retrieving a high number of fields leads to a noticeable worsening of performance
// when DocValues are used.
// Instead, the (almost) surprising thing is that, when returning fewer than 20 fields,
// DocValues performs better than stored fields, and the difference shrinks as the number of returned fields increases.
// Asking for 9 DocValues fields and 1 stored field takes an average query time of 6.86 (more than returning 10 stored fields).
// Here we use a slightly conservative default of 20, while also providing a configurable parameter for expert use.
// @see `MAX_DOCVALUE_FIELDS`
private static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20;

// version is used to stay compatible with different ES cluster versions
public EsMajorVersion majorVersion = null;

// tableContext is a convenient way to persist configuration parameters uniformly
private Map<String, String> tableContext = new HashMap<>();

// records the most recent exception seen while syncing ES table metadata (mapping, shard locations)
@@ -104,6 +127,10 @@ public Map<String, String> docValueContext() {
return esMetaStateTracker.searchContext().docValueFieldsContext();
}

public int maxDocValueFields() {
return maxDocValueFields;
}

public boolean isDocValueScanEnable() {
return enableDocValueScan;
}
@@ -166,8 +193,6 @@ private void validate(Map<String, String> properties) throws DdlException {
+ properties.get(VERSION).trim() + " ,`enable_docvalue_scan`"
+ " shoud be like 'true' or 'false', value should be double quotation marks");
}
} else {
enableDocValueScan = false;
}

if (properties.containsKey(KEYWORD_SNIFF)) {
@@ -194,6 +219,17 @@ private void validate(Map<String, String> properties) throws DdlException {
+ " but value is " + transport);
}
}

if (properties.containsKey(MAX_DOCVALUE_FIELDS)) {
try {
maxDocValueFields = Integer.parseInt(properties.get(MAX_DOCVALUE_FIELDS).trim());
if (maxDocValueFields < 0) {
maxDocValueFields = 0;
}
} catch (Exception e) {
maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
}
}
tableContext.put("hosts", hosts);
tableContext.put("userName", userName);
tableContext.put("passwd", passwd);
@@ -205,6 +241,7 @@ private void validate(Map<String, String> properties) throws DdlException {
}
tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan));
tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff));
tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields));
}

public TTableDescriptor toThrift() {
@@ -294,6 +331,13 @@ public void readFields(DataInput in) throws IOException {
} else {
enableKeywordSniff = true;
}
if (tableContext.containsKey("maxDocValueFields")) {
try {
maxDocValueFields = Integer.parseInt(tableContext.get("maxDocValueFields"));
} catch (Exception e) {
maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
}
}

PartitionType partType = PartitionType.valueOf(Text.readString(in));
if (partType == PartitionType.UNPARTITIONED) {
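Taken together, the validate() and readFields() changes apply the same parsing rule for max_docvalue_fields. A minimal, self-contained sketch of that rule (the class and helper name are invented for illustration): negative values are clamped to 0, and a missing or unparsable value falls back to the default of 20.

    // Sketch only: mirrors the MAX_DOCVALUE_FIELDS parsing in validate() above.
    public class MaxDocValueFieldsParser {
        static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20;

        static int parse(String raw) {
            try {
                // negative values are clamped to 0, which effectively disables the doc_values scan
                return Math.max(Integer.parseInt(raw.trim()), 0);
            } catch (Exception e) {
                // a missing or non-numeric value falls back to the conservative default
                return DEFAULT_MAX_DOCVALUE_FIELDS;
            }
        }
    }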
54 changes: 42 additions & 12 deletions fe/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -18,6 +18,7 @@
package org.apache.doris.planner;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.EsTable;
@@ -40,16 +41,16 @@
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -59,7 +60,7 @@
import java.util.Set;

public class EsScanNode extends ScanNode {

private static final Logger LOG = LogManager.getLogger(EsScanNode.class);

private final Random random = new Random(System.currentTimeMillis());
@@ -80,10 +81,10 @@ public EsScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
@Override
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);

assignBackends();
}

@Override
public int getNumInstances() {
return shardScanRanges.size();
@@ -93,7 +94,7 @@ public int getNumInstances() {
public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
return shardScanRanges;
}

@Override
public void finalize(Analyzer analyzer) throws UserException {
if (isFinalized) {
@@ -109,6 +110,34 @@ public void finalize(Analyzer analyzer) throws UserException {
isFinalized = true;
}

/**
 * Returns whether the doc_values scan can be used.
 * 0 and 1 are returned (instead of a boolean) to facilitate Doris BE processing.
 *
 * @param desc the tuple descriptor for the fields to read from ES
 * @param docValueContext the mapping from original field names to doc_value field names
 * @return 1 if the doc_values scan can be used, 0 otherwise
 */
private int useDocValueScan(TupleDescriptor desc, Map<String, String> docValueContext) {
ArrayList<SlotDescriptor> slotDescriptors = desc.getSlots();
List<String> selectedFields = new ArrayList<>(slotDescriptors.size());
for (SlotDescriptor slotDescriptor : slotDescriptors) {
selectedFields.add(slotDescriptor.getColumn().getName());
}
if (selectedFields.size() > table.maxDocValueFields()) {
return 0;
}
Set<String> docValueFields = docValueContext.keySet();
boolean useDocValue = true;
for (String selectedField : selectedFields) {
if (!docValueFields.contains(selectedField)) {
useDocValue = false;
break;
}
}
return useDocValue ? 1 : 0;
}
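To make the decision rule concrete, here is a minimal, self-contained walk-through (class, method, and field names are invented for illustration; the real method reads the selected fields from the TupleDescriptor and the limit from the table's max_docvalue_fields):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DocValueScanDemo {
        // Same rule as useDocValueScan above: doc_values are used only when every
        // selected field has a doc_values mapping and the field count stays within the limit.
        static int decide(List<String> selected, Map<String, String> docValueContext, int maxDocValueFields) {
            if (selected.size() > maxDocValueFields) {
                return 0;
            }
            for (String field : selected) {
                if (!docValueContext.containsKey(field)) {
                    return 0;
                }
            }
            return 1;
        }

        public static void main(String[] args) {
            Map<String, String> ctx = new HashMap<>();
            ctx.put("k1", "k1");
            ctx.put("city", "city.keyword");
            System.out.println(decide(Arrays.asList("k1", "city"), ctx, 20));    // 1: all fields covered
            System.out.println(decide(Arrays.asList("k1", "content"), ctx, 20)); // 0: "content" lacks doc_values
            System.out.println(decide(Arrays.asList("k1", "city"), ctx, 1));     // 0: exceeds max_docvalue_fields
        }
    }

The 0/1 result is then shipped to the BE as the doc_values_mode property (see toThrift below), where es_scroll_query.cpp parses it and skips its own compatibility check.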

@Override
protected void toThrift(TPlanNode msg) {
if (EsTable.TRANSPORT_HTTP.equals(table.getTransport())) {
Expand All @@ -123,6 +152,7 @@ protected void toThrift(TPlanNode msg) {
esScanNode.setProperties(properties);
if (table.isDocValueScanEnable()) {
esScanNode.setDocvalue_context(table.docValueContext());
properties.put(EsTable.DOC_VALUES_MODE, String.valueOf(useDocValueScan(desc, table.docValueContext())));
}
if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) {
esScanNode.setFields_context(table.fieldsContext());
@@ -169,9 +199,9 @@ private List<TScanRangeLocations> getShardLocations() throws UserException {
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("partition prune finished, unpartitioned index [{}], "
+ "partitioned index [{}]",
String.join(",", unPartitionedIndices),
LOG.debug("partition prune finished, unpartitioned index [{}], "
+ "partitioned index [{}]",
String.join(",", unPartitionedIndices),
String.join(",", partitionedIndices));
}
int beIndex = random.nextInt(backendList.size());
@@ -241,7 +271,7 @@ private List<TScanRangeLocations> getShardLocations() throws UserException {
* if the index name is an alias or index pattern, then the es table is related
* with one or more indices some indices could be pruned by using partition info
* in index settings currently only support range partition setting
*
* @param partitionInfo
* @return
* @throws AnalysisException
@@ -254,7 +284,7 @@ private Collection<Long> partitionPrune(PartitionInfo partitionInfo) throws Anal
switch (partitionInfo.getType()) {
case RANGE: {
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo;
Map<Long, Range<PartitionKey>> keyRangeById = rangePartitionInfo.getIdToRange(false);
Map<Long, Range<PartitionKey>> keyRangeById = rangePartitionInfo.getIdToRange(false);
partitionPruner = new RangePartitionPruner(keyRangeById, rangePartitionInfo.getPartitionColumns(),
columnFilters);
return partitionPruner.prune();