Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions be/src/exec/es/es_scan_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ const std::string REQUEST_SEPARATOR = "/";

const std::string REQUEST_SEARCH_FILTER_PATH = "filter_path=hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields";

ESScanReader::ESScanReader(const std::string& target, const std::map<std::string, std::string>& props) : _scroll_keep_alive(config::es_scroll_keepalive), _http_timeout_ms(config::es_http_timeout_ms) {
ESScanReader::ESScanReader(const std::string& target, const std::map<std::string, std::string>& props, bool doc_value_mode) :
_scroll_keep_alive(config::es_scroll_keepalive),
_http_timeout_ms(config::es_http_timeout_ms),
_doc_value_mode(doc_value_mode) {
_target = target;
_index = props.at(KEY_INDEX);
_type = props.at(KEY_TYPE);
Expand Down Expand Up @@ -142,7 +145,7 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
}
}

scroll_parser.reset(new ScrollParser());
scroll_parser.reset(new ScrollParser(_doc_value_mode));
VLOG(1) << "get_next request ES, returned response: " << response;
Status status = scroll_parser->parse(response, _exactly_once);
if (!status.ok()){
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/es/es_scan_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ESScanReader {
static constexpr const char* KEY_QUERY = "query";
static constexpr const char* KEY_BATCH_SIZE = "batch_size";
static constexpr const char* KEY_TERMINATE_AFTER = "limit";
ESScanReader(const std::string& target, const std::map<std::string, std::string>& props);
ESScanReader(const std::string& target, const std::map<std::string, std::string>& props, bool doc_value_mode);
~ESScanReader();

// launch the first scroll request; this method caches the first scroll response and returns the cached response when get_next is invoked
Expand Down Expand Up @@ -94,6 +94,8 @@ class ESScanReader {
int _http_timeout_ms;

bool _exactly_once;

bool _doc_value_mode;
};
}

40 changes: 38 additions & 2 deletions be/src/exec/es/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,12 @@ static Status get_float_value(const rapidjson::Value &col, PrimitiveType type, v
return Status::OK();
}

ScrollParser::ScrollParser() :
ScrollParser::ScrollParser(bool doc_value_mode) :
_scroll_id(""),
_total(0),
_size(0),
_line_index(0) {
_line_index(0),
_doc_value_mode(doc_value_mode) {
}

ScrollParser::~ScrollParser() {
Expand Down Expand Up @@ -247,6 +248,16 @@ Status ScrollParser::parse(const std::string& scroll_result, bool exactly_once)
}

VLOG(1) << "es_scan_reader parse scroll result: " << scroll_result;
if (!outer_hits_node.HasMember(FIELD_INNER_HITS)) {
// this is caused by querying some columns which do not exist in the documents, e.g.
// an index has fields: k1, k2, k3, and we put some rows into this index (some fields do NOT contain any data)
// e.g.
// put index/_doc/1 {"k2":"123"}
// put index/_doc/2 {"k3":"123"}
// then we use the SQL `select k1 from table`
// what ES returns is like this: {hits: {total:2}}
return Status::OK();
}
const rapidjson::Value &inner_hits_node = outer_hits_node[FIELD_INNER_HITS];
if (!inner_hits_node.IsArray()) {
LOG(WARNING) << "exception maybe happend on es cluster, reponse:" << scroll_result;
Expand Down Expand Up @@ -275,7 +286,32 @@ int ScrollParser::get_total() {
Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
Tuple* tuple, MemPool* tuple_pool, bool* line_eof, const std::map<std::string, std::string>& docvalue_context) {
*line_eof = true;

if (_size <= 0 || _line_index >= _size) {
// _source is fetched from ES
if (!_doc_value_mode) {
return Status::OK();
}

// _fields(doc_value) is fetched from ES
if (_total <= 0 || _line_index >= _total) {
return Status::OK();
}


// here is operations for `enable_doc_value_scan`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// here is operations for `enable_doc_value_scan`.
// here is operations for `use_doc_value`.

// This indicates that the fields do not exist (e.g. values were never assigned to these fields), but other fields have values.
// So the number of rows is >= 0, and we need to fill `NULL` into the fields that do not exist.
_line_index++;
tuple->init(tuple_desc->byte_size());
for (int i = 0; i < tuple_desc->slots().size(); ++i) {
const SlotDescriptor* slot_desc = tuple_desc->slots()[i];
if (slot_desc->is_materialized()) {
tuple->set_null(slot_desc->null_indicator_offset());
}
}

*line_eof = false;
return Status::OK();
}

Expand Down
8 changes: 7 additions & 1 deletion be/src/exec/es/es_scroll_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Status;
class ScrollParser {

public:
ScrollParser();
ScrollParser(bool doc_value_mode);
~ScrollParser();

Status parse(const std::string& scroll_result, bool exactly_once = false);
Expand All @@ -50,5 +50,11 @@ class ScrollParser {

rapidjson::Document _document_node;
rapidjson::Value _inner_hits_node;

// todo(milimin): ScrollParser should be divided into two classes: SourceParser and DocValueParser,
// including removing some variables in the current implementation, e.g. pure_doc_value.
// All above will be done in the DOE refactoring projects.
// Current bug fixes minimize the scope of changes to avoid introducing other new bugs.
bool _doc_value_mode;
};
}
6 changes: 5 additions & 1 deletion be/src/exec/es/es_scroll_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ std::string ESScrollQueryBuilder::build_clear_scroll_body(const std::string& scr

std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>& properties,
const std::vector<std::string>& fields,
std::vector<EsPredicate*>& predicates, const std::map<std::string, std::string>& docvalue_context) {
std::vector<EsPredicate*>& predicates, const std::map<std::string, std::string>& docvalue_context,
bool* doc_value_mode) {
rapidjson::Document es_query_dsl;
rapidjson::Document::AllocatorType &allocator = es_query_dsl.GetAllocator();
es_query_dsl.SetObject();
Expand All @@ -86,6 +87,9 @@ std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>
}
}
}

*doc_value_mode = pure_docvalue;

rapidjson::Value source_node(rapidjson::kArrayType);
if (pure_docvalue) {
for (auto& select_field : fields) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/es/es_scroll_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class ESScrollQueryBuilder {
// @note: predicates should be processed before being passed to this method,
// tie breaker for whether a predicate can be pushed down to ES can reference the push-down filters
static std::string build(const std::map<std::string, std::string>& properties,
const std::vector<std::string>& fields, std::vector<EsPredicate*>& predicates, const std::map<std::string, std::string>& docvalue_context);
const std::vector<std::string>& fields, std::vector<EsPredicate*>& predicates, const std::map<std::string, std::string>& docvalue_context,
bool* doc_value_mode);
};
}
6 changes: 4 additions & 2 deletions be/src/exec/es_http_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,14 +443,16 @@ void EsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Stat
if (limit() != -1 && limit() <= _runtime_state->batch_size()) {
properties[ESScanReader::KEY_TERMINATE_AFTER] = std::to_string(limit());
}

bool doc_value_mode = false;
properties[ESScanReader::KEY_QUERY]
= ESScrollQueryBuilder::build(properties, _column_names, _predicates, _docvalue_context);
= ESScrollQueryBuilder::build(properties, _column_names, _predicates, _docvalue_context, &doc_value_mode);


// start scanner to scan
std::unique_ptr<EsHttpScanner> scanner(new EsHttpScanner(
_runtime_state, runtime_profile(), _tuple_id,
properties, scanner_expr_ctxs, &counter));
properties, scanner_expr_ctxs, &counter, doc_value_mode));
status = scanner_scan(std::move(scanner), scanner_expr_ctxs, &counter);
if (!status.ok()) {
LOG(WARNING) << "Scanner[" << start_idx << "] process failed. status="
Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/es_http_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ EsHttpScanner::EsHttpScanner(
TupleId tuple_id,
const std::map<std::string, std::string>& properties,
const std::vector<ExprContext*>& conjunct_ctxs,
EsScanCounter* counter) :
EsScanCounter* counter,
bool doc_value_mode) :
_state(state),
_profile(profile),
_tuple_id(tuple_id),
Expand All @@ -56,6 +57,7 @@ EsHttpScanner::EsHttpScanner(
_counter(counter),
_es_reader(nullptr),
_es_scroll_parser(nullptr),
_doc_value_mode(doc_value_mode),
_rows_read_counter(nullptr),
_read_timer(nullptr),
_materialize_timer(nullptr) {
Expand All @@ -74,7 +76,7 @@ Status EsHttpScanner::open() {
}

const std::string& host = _properties.at(ESScanReader::KEY_HOST_PORT);
_es_reader.reset(new ESScanReader(host, _properties));
_es_reader.reset(new ESScanReader(host, _properties, _doc_value_mode));
if (_es_reader == nullptr) {
return Status::InternalError("Es reader construct failed.");
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/exec/es_http_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class EsHttpScanner {
TupleId tuple_id,
const std::map<std::string, std::string>& properties,
const std::vector<ExprContext*>& conjunct_ctxs,
EsScanCounter* counter);
EsScanCounter* counter,
bool doc_value_mode);
~EsHttpScanner();

Status open();
Expand Down Expand Up @@ -94,6 +95,8 @@ class EsHttpScanner {
std::unique_ptr<ESScanReader> _es_reader;
std::unique_ptr<ScrollParser> _es_scroll_parser;

bool _doc_value_mode;

// Profile
RuntimeProfile::Counter* _rows_read_counter;
RuntimeProfile::Counter* _read_timer;
Expand Down
5 changes: 3 additions & 2 deletions be/test/exec/es_scan_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,9 @@ TEST_F(MockESServerTest, workflow) {
props[ESScanReader::KEY_BATCH_SIZE] = "1";
std::vector<EsPredicate*> predicates;
std::map<std::string, std::string> docvalue_context;
props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(props, fields, predicates, docvalue_context);
ESScanReader reader(target, props);
bool doc_value_mode = false;
props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(props, fields, predicates, docvalue_context, &doc_value_mode);
ESScanReader reader(target, props, doc_value_mode);
auto st = reader.open();
ASSERT_TRUE(st.ok());
bool eos = false;
Expand Down