Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions be/src/exec/es/es_scan_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
#include "exec/es/es_scroll_query.h"

namespace doris {
const std::string REUQEST_SCROLL_FILTER_PATH = "filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields";

const std::string SOURCE_SCROLL_SEARCH_FILTER_PATH = "filter_path=_scroll_id,hits.hits._source,hits.total,_id";
const std::string DOCVALUE_SCROLL_SEARCH_FILTER_PATH = "filter_path=_scroll_id,hits.total,hits.hits._score,hits.hits.fields";

const std::string REQUEST_SCROLL_PATH = "_scroll";
const std::string REQUEST_PREFERENCE_PREFIX = "&preference=_shards:";
const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll";
const std::string REQUEST_SEPARATOR = "/";

const std::string REQUEST_SEARCH_FILTER_PATH = "filter_path=hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields";

ESScanReader::ESScanReader(const std::string& target, const std::map<std::string, std::string>& props, bool doc_value_mode) :
_scroll_keep_alive(config::es_scroll_keepalive),
_http_timeout_ms(config::es_http_timeout_ms),
Expand All @@ -57,6 +58,7 @@ ESScanReader::ESScanReader(const std::string& target, const std::map<std::string

std::string batch_size_str = props.at(KEY_BATCH_SIZE);
_batch_size = atoi(batch_size_str.c_str());
std::string filter_path = _doc_value_mode ? DOCVALUE_SCROLL_SEARCH_FILTER_PATH : SOURCE_SCROLL_SEARCH_FILTER_PATH;

if (props.find(KEY_TERMINATE_AFTER) != props.end()) {
_exactly_once = true;
Expand All @@ -65,7 +67,7 @@ ESScanReader::ESScanReader(const std::string& target, const std::map<std::string
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type << "/_search?"
<< "terminate_after=" << props.at(KEY_TERMINATE_AFTER)
<< REQUEST_PREFERENCE_PREFIX << _shards
<< "&" << REUQEST_SCROLL_FILTER_PATH;
<< "&" << filter_path;
_search_url = scratch.str();
} else {
_exactly_once = false;
Expand All @@ -75,13 +77,12 @@ ESScanReader::ESScanReader(const std::string& target, const std::map<std::string
scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type << "/_search?"
<< "scroll=" << _scroll_keep_alive
<< REQUEST_PREFERENCE_PREFIX << _shards
<< "&" << REUQEST_SCROLL_FILTER_PATH
<< "&" << filter_path
<< "&terminate_after=" << batch_size_str;
_init_scroll_url = scratch.str();
_next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH;
_next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + filter_path;
}
_eos = false;

}

ESScanReader::~ESScanReader() {
Expand Down
41 changes: 4 additions & 37 deletions be/src/exec/es/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ Status ScrollParser::parse(const std::string& scroll_result, bool exactly_once)
const rapidjson::Value &outer_hits_node = _document_node[FIELD_HITS];
const rapidjson::Value &field_total = outer_hits_node[FIELD_TOTAL];
// after es 7.x "total": { "value": 1, "relation": "eq" }
// it is not necessary to parse `total`, this logic would be removed the another pr.
if (field_total.IsObject()) {
const rapidjson::Value &field_relation_value = field_total["relation"];
std::string relation = field_relation_value.GetString();
Expand All @@ -242,26 +243,16 @@ Status ScrollParser::parse(const std::string& scroll_result, bool exactly_once)
} else {
_total = field_total.GetInt();
}

// just used for the first scroll, maybe we should remove this logic from the `get_next`
if (_total == 0) {
return Status::OK();
}

VLOG(1) << "es_scan_reader parse scroll result: " << scroll_result;
if (!outer_hits_node.HasMember(FIELD_INNER_HITS)) {
// this is caused by query some columns which are not exit, e.g.
// A Index has fields: k1,k2,k3. and we put some rows into this Index (some fields dose NOT contain any data)
// e.g.
// put index/_doc/1 {"k2":"123"}
// put index/_doc/2 {"k3":"123}
// then we use sql `select k1 from table`
// what ES return is like this: {hits: {total:2}
return Status::OK();
}
const rapidjson::Value &inner_hits_node = outer_hits_node[FIELD_INNER_HITS];
// this happened just the end of scrolling
if (!inner_hits_node.IsArray()) {
LOG(WARNING) << "exception maybe happend on es cluster, reponse:" << scroll_result;
return Status::InternalError("inner hits node is not an array");
return Status::OK();
}

rapidjson::Document::AllocatorType& a = _document_node.GetAllocator();
Expand All @@ -288,30 +279,6 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
*line_eof = true;

if (_size <= 0 || _line_index >= _size) {
// _source is fetched from ES
if (!_doc_value_mode) {
return Status::OK();
}

// _fields(doc_value) is fetched from ES
if (_total <= 0 || _line_index >= _total) {
return Status::OK();
}


// here is operations for `enable_doc_value_scan`.
// This indicates that the fields does not exist(e.g. never assign values to these fields), but other fields have values.
// so, number of rows is >= 0, we need fill `NULL` to these fields that does not exist.
_line_index++;
tuple->init(tuple_desc->byte_size());
for (int i = 0; i < tuple_desc->slots().size(); ++i) {
const SlotDescriptor* slot_desc = tuple_desc->slots()[i];
if (slot_desc->is_materialized()) {
tuple->set_null(slot_desc->null_indicator_offset());
}
}

*line_eof = false;
return Status::OK();
}

Expand Down