Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 48 additions & 37 deletions be/src/exec/es/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,49 +470,32 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,

case TYPE_DATE:
case TYPE_DATETIME: {
// this happens only when `enable_docvalue_scan = false` and the field holds a timestamp-format date read from _source
if (col.IsNumber()) {
if (!reinterpret_cast<DateTimeValue*>(slot)->from_unixtime(col.GetInt64(), "+08:00")) {
RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
// ES uses a millisecond timestamp for date/datetime fields, both in the index and in docvalues.
// When a number is encountered while processing a date-type field, Doris On ES always treats it as milliseconds.
// Doris On ES needs to be consistent with ES, so the value is divided by 1000 because the unit for from_unixtime is seconds
RETURN_IF_ERROR(fill_date_slot_with_timestamp(slot, col, type));
} else if (col.IsArray() && pure_doc_value) {
// this happens only when `enable_docvalue_scan = true`
// Since ES 6.4, ES adds a default format to every field: if no format is provided for a `date` field, ES imposes
// a standard date format on it, such as `2020-06-16T00:00:00.000Z`
// At present, we only process this string-format date. After some PRs are merged into Doris, we will impose `epoch_millis` for
// the date field's docvalue
if (col[0].IsString()) {
RETURN_IF_ERROR(fill_date_slot_with_strval(slot, col[0], type));
break;
}

if (type == TYPE_DATE) {
reinterpret_cast<DateTimeValue*>(slot)->cast_to_date();
} else {
reinterpret_cast<DateTimeValue*>(slot)->set_type(TIME_DATETIME);
}
break;
}
if (pure_doc_value && col.IsArray()) {
if (!reinterpret_cast<DateTimeValue*>(slot)->from_unixtime(col[0].GetInt64(), "+08:00")) {
RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
}

if (type == TYPE_DATE) {
reinterpret_cast<DateTimeValue*>(slot)->cast_to_date();
} else {
reinterpret_cast<DateTimeValue*>(slot)->set_type(TIME_DATETIME);
}
break;
}

RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);

DateTimeValue* ts_slot = reinterpret_cast<DateTimeValue*>(slot);
const std::string& val = col.GetString();
size_t val_size = col.GetStringLength();
if (!ts_slot->from_date_str(val.c_str(), val_size)) {
RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
}

if (type == TYPE_DATE) {
ts_slot->cast_to_date();
// ES would return millisecond timestamp for date field, divided by 1000 because the unit for from_unixtime is seconds
RETURN_IF_ERROR(fill_date_slot_with_timestamp(slot, col[0], type));
} else {
ts_slot->to_datetime();
// this happens only when `enable_docvalue_scan = false` and the field holds a string-format date read from _source
RETURN_ERROR_IF_COL_IS_ARRAY(col, type);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
RETURN_IF_ERROR(fill_date_slot_with_strval(slot, col, type));
}
break;
}

default: {
DCHECK(false);
break;
Expand All @@ -523,4 +506,32 @@ Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
*line_eof = false;
return Status::OK();
}

// Parses a string-formatted date held in `col` into the DateTimeValue stored at `slot`.
//
// slot: destination memory, reinterpreted as a DateTimeValue.
// col:  JSON value to read; the visible call sites verify it is a string before calling.
// type: TYPE_DATE casts the parsed value to a date; any other type converts it to a datetime.
//
// Returns Status::OK() on success. When from_date_str() rejects the text, the failure is
// reported through RETURN_ERROR_IF_CAST_FORMAT_ERROR.
Status ScrollParser::fill_date_slot_with_strval(void* slot, const rapidjson::Value& col, PrimitiveType type) {
    DateTimeValue* ts_slot = reinterpret_cast<DateTimeValue*>(slot);
    // Pass rapidjson's own buffer together with the length rapidjson recorded for it.
    // Copying into a std::string first costs an allocation per row, and the copy stops at the
    // first NUL byte, so it could be shorter than the length handed to the parser.
    const char* val = col.GetString();
    const size_t val_size = col.GetStringLength();
    if (!ts_slot->from_date_str(val, val_size)) {
        RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
    }
    if (type == TYPE_DATE) {
        ts_slot->cast_to_date();
    } else {
        ts_slot->to_datetime();
    }
    return Status::OK();
}

// Fills the DateTimeValue stored at `slot` from the integer timestamp held in `col`.
//
// The integer is divided by 1000 before being handed to from_unixtime(); the call sites'
// comments describe the input as milliseconds and from_unixtime() as taking seconds.
// The conversion uses the fixed "+08:00" offset.
//
// type: TYPE_DATE casts the result to a date; any other type marks it as TIME_DATETIME.
//
// Returns Status::OK() on success. When from_unixtime() fails, the failure is reported
// through RETURN_ERROR_IF_CAST_FORMAT_ERROR.
Status ScrollParser::fill_date_slot_with_timestamp(void* slot, const rapidjson::Value& col, PrimitiveType type) {
    DateTimeValue* const date_value = reinterpret_cast<DateTimeValue*>(slot);
    const auto epoch_seconds = col.GetInt64() / 1000;
    if (!date_value->from_unixtime(epoch_seconds, "+08:00")) {
        RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
    }
    if (type != TYPE_DATE) {
        date_value->set_type(TIME_DATETIME);
        return Status::OK();
    }
    date_value->cast_to_date();
    return Status::OK();
}

}
8 changes: 8 additions & 0 deletions be/src/exec/es/es_scroll_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ class ScrollParser {
int get_total();
int get_size();

private:
// Helper methods for filling a date/datetime slot from a rapidjson::Value.
// `type` distinguishes TYPE_DATE from datetime when the slot is finalized.
// Fills the slot by parsing `col` as a string-formatted date.
Status fill_date_slot_with_strval(void* slot, const rapidjson::Value& col, PrimitiveType type);
// Fills the slot from the integer timestamp in `col`; the implementation divides the value by 1000 before converting it.
Status fill_date_slot_with_timestamp(void* slot, const rapidjson::Value& col, PrimitiveType type);

private:

std::string _scroll_id;
Expand Down