diff --git a/be/src/exec/es/es_scan_reader.h b/be/src/exec/es/es_scan_reader.h index 3bd64cf88aca71..52b936caf9e93d 100644 --- a/be/src/exec/es/es_scan_reader.h +++ b/be/src/exec/es/es_scan_reader.h @@ -40,6 +40,7 @@ class ESScanReader { static constexpr const char* KEY_QUERY = "query"; static constexpr const char* KEY_BATCH_SIZE = "batch_size"; static constexpr const char* KEY_TERMINATE_AFTER = "limit"; + static constexpr const char* KEY_DOC_VALUES_MODE = "doc_values_mode"; ESScanReader(const std::string& target, const std::map& props, bool doc_value_mode); ~ESScanReader(); diff --git a/be/src/exec/es/es_scroll_query.cpp b/be/src/exec/es/es_scroll_query.cpp index 0c4f581e513486..90d68f05f7d1be 100644 --- a/be/src/exec/es/es_scroll_query.cpp +++ b/be/src/exec/es/es_scroll_query.cpp @@ -76,14 +76,20 @@ std::string ESScrollQueryBuilder::build(const std::map // note: add `query` for this value.... es_query_dsl.AddMember("query", query_node, allocator); bool pure_docvalue = true; - // check docvalue sacan optimization - if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) { - pure_docvalue = false; + + // Doris FE already has checked docvalue-scan optimization + if (properties.find(ESScanReader::KEY_DOC_VALUES_MODE) != properties.end()) { + pure_docvalue = atoi(properties.at(ESScanReader::KEY_DOC_VALUES_MODE).c_str()); } else { - for (auto& select_field : fields) { - if (docvalue_context.find(select_field) == docvalue_context.end()) { - pure_docvalue = false; - break; + // check docvalue scan optimization, used for compatibility + if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) { + pure_docvalue = false; + } else { + for (auto& select_field : fields) { + if (docvalue_context.find(select_field) == docvalue_context.end()) { + pure_docvalue = false; + break; + } } } } diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 0c0251bcdde292..b9a06a261daed7 100755 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4154,6 +4154,7 @@ public static void getDdlStmt(Table table, List createTableStmt, List tableContext = new HashMap<>(); // record the latest and recently exception when sync ES table metadata (mapping, shard location) @@ -104,6 +127,10 @@ public Map docValueContext() { return esMetaStateTracker.searchContext().docValueFieldsContext(); } + public int maxDocValueFields() { + return maxDocValueFields; + } + public boolean isDocValueScanEnable() { return enableDocValueScan; } @@ -166,8 +193,6 @@ private void validate(Map properties) throws DdlException { + properties.get(VERSION).trim() + " ,`enable_docvalue_scan`" + " shoud be like 'true' or 'false', value should be double quotation marks"); } - } else { - enableDocValueScan = false; } if (properties.containsKey(KEYWORD_SNIFF)) { @@ -194,6 +219,17 @@ private void validate(Map properties) throws DdlException { + " but value is " + transport); } } + + if (properties.containsKey(MAX_DOCVALUE_FIELDS)) { + try { + maxDocValueFields = Integer.parseInt(properties.get(MAX_DOCVALUE_FIELDS).trim()); + if (maxDocValueFields < 0) { + maxDocValueFields = 0; + } + } catch (Exception e) { + maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS; + } + } tableContext.put("hosts", hosts); tableContext.put("userName", userName); tableContext.put("passwd", passwd); @@ -205,6 +241,7 @@ private void validate(Map properties) throws DdlException { } tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan)); tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff)); + tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields)); } public TTableDescriptor toThrift() { @@ -294,6 +331,13 @@ public void readFields(DataInput in) throws IOException { } else { enableKeywordSniff = true; } + if (tableContext.containsKey("maxDocValueFields")) { + try { + maxDocValueFields = Integer.parseInt(tableContext.get("maxDocValueFields")); + } catch (Exception e) { + maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS; + } + } PartitionType partType = PartitionType.valueOf(Text.readString(in)); if (partType == PartitionType.UNPARTITIONED) { diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java index 03cfc7076a9740..36984bffe1ad82 100644 --- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -18,6 +18,7 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.EsTable; @@ -40,9 +41,6 @@ import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -50,6 +48,9 @@ import com.google.common.collect.Range; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -59,7 +60,7 @@ import java.util.Set; public class EsScanNode extends ScanNode { - + private static final Logger LOG = LogManager.getLogger(EsScanNode.class); private final Random random = new Random(System.currentTimeMillis()); @@ -80,10 +81,10 @@ public EsScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { @Override public void init(Analyzer analyzer) throws UserException { super.init(analyzer); - + assignBackends(); } - + @Override public int getNumInstances() { return shardScanRanges.size(); @@ -93,7 +94,7 @@ public int getNumInstances() { public List getScanRangeLocations(long maxScanRangeLength) { return shardScanRanges; } - + @Override public void finalize(Analyzer analyzer) throws UserException { if (isFinalized) { @@ -109,6 +110,34 @@ public void finalize(Analyzer analyzer) throws UserException { isFinalized = true; } + /** + * return whether can use the doc_values scan + * 0 and 1 are returned to facilitate Doris BE processing + * + * @param desc the fields needs to read from ES + * @param docValueContext the mapping for docvalues fields from origin field to doc_value fields + * @return + */ + private int useDocValueScan(TupleDescriptor desc, Map docValueContext) { + ArrayList slotDescriptors = desc.getSlots(); + List selectedFields = new ArrayList<>(slotDescriptors.size()); + for (SlotDescriptor slotDescriptor : slotDescriptors) { + selectedFields.add(slotDescriptor.getColumn().getName()); + } + if (selectedFields.size() > table.maxDocValueFields()) { + return 0; + } + Set docValueFields = docValueContext.keySet(); + boolean useDocValue = true; + for (String selectedField : selectedFields) { + if (!docValueFields.contains(selectedField)) { + useDocValue = false; + break; + } + } + return useDocValue ? 1 : 0; + } + @Override protected void toThrift(TPlanNode msg) { if (EsTable.TRANSPORT_HTTP.equals(table.getTransport())) { @@ -123,6 +152,7 @@ protected void toThrift(TPlanNode msg) { esScanNode.setProperties(properties); if (table.isDocValueScanEnable()) { esScanNode.setDocvalue_context(table.docValueContext()); + properties.put(EsTable.DOC_VALUES_MODE, String.valueOf(useDocValueScan(desc, table.docValueContext()))); } if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) { esScanNode.setFields_context(table.fieldsContext()); @@ -169,9 +199,9 @@ private List getShardLocations() throws UserException { } } if (LOG.isDebugEnabled()) { - LOG.debug("partition prune finished, unpartitioned index [{}], " - + "partitioned index [{}]", - String.join(",", unPartitionedIndices), + LOG.debug("partition prune finished, unpartitioned index [{}], " + + "partitioned index [{}]", + String.join(",", unPartitionedIndices), String.join(",", partitionedIndices)); } int beIndex = random.nextInt(backendList.size()); @@ -241,7 +271,7 @@ private List getShardLocations() throws UserException { * if the index name is an alias or index pattern, then the es table is related * with one or more indices some indices could be pruned by using partition info * in index settings currently only support range partition setting - * + * * @param partitionInfo * @return * @throws AnalysisException @@ -254,7 +284,7 @@ private Collection partitionPrune(PartitionInfo partitionInfo) throws Anal switch (partitionInfo.getType()) { case RANGE: { RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo; - Map> keyRangeById = rangePartitionInfo.getIdToRange(false); + Map> keyRangeById = rangePartitionInfo.getIdToRange(false); partitionPruner = new RangePartitionPruner(keyRangeById, rangePartitionInfo.getPartitionColumns(), columnFilters); return partitionPruner.prune();