92 changes: 60 additions & 32 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -80,8 +80,11 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {

explicit InvertedIndexColumnWriterImpl(const std::string& field_name,
InvertedIndexFileWriter* index_file_writer,
const TabletIndex* index_meta)
: _index_meta(index_meta), _index_file_writer(index_file_writer) {
const TabletIndex* index_meta,
const bool single_field = true)
: _single_field(single_field),
_index_meta(index_meta),
_index_file_writer(index_file_writer) {
_parser_type = get_inverted_index_parser_type_from_string(
get_parser_string_from_properties(_index_meta->properties()));
_value_key_coder = get_key_coder(field_type);
@@ -237,9 +240,14 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
RETURN_IF_ERROR(create_char_string_reader(_char_string_reader));
RETURN_IF_ERROR(create_analyzer(_analyzer));
RETURN_IF_ERROR(create_index_writer(_index_writer));
RETURN_IF_ERROR(create_field(&_field));
_doc = std::make_unique<lucene::document::Document>();
_doc->add(*_field);
if (_single_field) {
RETURN_IF_ERROR(create_field(&_field));
_doc->add(*_field);
} else {
// an array's inverted index does not need to create the field up front
_doc->setNeedResetFieldData(true);
}
return Status::OK();
}

@@ -282,11 +290,9 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
return Status::OK();
}

void new_fulltext_field(const char* field_value_data, size_t field_value_size) {
if (_parser_type == InvertedIndexParserType::PARSER_ENGLISH ||
_parser_type == InvertedIndexParserType::PARSER_CHINESE ||
_parser_type == InvertedIndexParserType::PARSER_UNICODE ||
_parser_type == InvertedIndexParserType::PARSER_STANDARD) {
void new_inverted_index_field(const char* field_value_data, size_t field_value_size) {
if (_parser_type != InvertedIndexParserType::PARSER_UNKNOWN &&
_parser_type != InvertedIndexParserType::PARSER_NONE) {
new_char_token_stream(field_value_data, field_value_size, _field);
} else {
new_field_char_value(field_value_data, field_value_size, _field);
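As an aside, a tiny standalone sketch (not from this PR) of the dispatch rule the renamed helper now encodes: any parser other than UNKNOWN or NONE means the value is tokenized through the analyzer, otherwise it is stored as a raw char value. The enum below is a stand-in for the real InvertedIndexParserType:

#include <cstdio>

// Stand-in enum for illustration only; the real InvertedIndexParserType lives in Doris.
enum class ParserType { UNKNOWN, NONE, ENGLISH, CHINESE, UNICODE, STANDARD };

// Mirrors the predicate in new_inverted_index_field(): tokenize unless the parser
// is UNKNOWN or NONE.
bool should_tokenize(ParserType t) {
    return t != ParserType::UNKNOWN && t != ParserType::NONE;
}

int main() {
    std::printf("NONE -> %d, ENGLISH -> %d\n", (int)should_tokenize(ParserType::NONE),
                (int)should_tokenize(ParserType::ENGLISH));
    return 0;
}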
@@ -328,7 +334,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
(_parser_type != InvertedIndexParserType::PARSER_NONE && v->empty())) {
RETURN_IF_ERROR(add_null_document());
} else {
new_fulltext_field(v->get_data(), v->get_size());
new_inverted_index_field(v->get_data(), v->get_size());
RETURN_IF_ERROR(add_document());
}
++v;
@@ -348,39 +354,58 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
}
const auto* offsets = reinterpret_cast<const uint64_t*>(offsets_ptr);
if constexpr (field_is_slice_type(field_type)) {
if (_field == nullptr || _index_writer == nullptr) {
LOG(ERROR) << "field or index writer is null in inverted index writer.";
return Status::InternalError(
"field or index writer is null in inverted index writer");
if (_index_writer == nullptr) {
LOG(ERROR) << "index writer is null in inverted index writer.";
return Status::InternalError("index writer is null in inverted index writer");
}
auto ignore_above_value =
get_parser_ignore_above_value_from_properties(_index_meta->properties());
auto ignore_above = std::stoi(ignore_above_value);
for (int i = 0; i < count; ++i) {
// offsets[i] / offsets[i+1] bound the element range of the current row
std::vector<std::string> strings;
// [0, 3, 6]
// [10,20,30] [20,30,40], [30,40,50]
auto start_off = offsets[i];
auto end_off = offsets[i + 1];
// TODO(Amory): later, use an object pool to avoid per-element field creation
lucene::document::Field* new_field = nullptr;
CL_NS(analysis)::TokenStream* ts = nullptr;
for (auto j = start_off; j < end_off; ++j) {
if (null_map[j] == 1) {
continue;
}
// for now we create a temporary field per element; switch to a pool later
if (Status st = create_field(&new_field); st != Status::OK()) {
LOG(ERROR)
<< "create field " << string(_field_name.begin(), _field_name.end())
<< " error:" << st;
return st;
}
auto* v = (Slice*)((const uint8_t*)value_ptr + j * field_size);
strings.emplace_back(v->get_data(), v->get_size());
}

auto value = join(strings, " ");
// only ignore_above UNTOKENIZED strings and empty strings not tokenized
if ((_parser_type == InvertedIndexParserType::PARSER_NONE &&
value.length() > ignore_above) ||
(_parser_type != InvertedIndexParserType::PARSER_NONE && value.empty())) {
RETURN_IF_ERROR(add_null_document());
} else {
new_fulltext_field(value.c_str(), value.length());
RETURN_IF_ERROR(add_document());
if ((_parser_type == InvertedIndexParserType::PARSER_NONE &&
v->get_size() > ignore_above) ||
(_parser_type != InvertedIndexParserType::PARSER_NONE && v->empty())) {
// should this element be treated as a null value?
// TODO: this may be a performance problem for large strings.
continue;
} else {
if (_parser_type != InvertedIndexParserType::PARSER_UNKNOWN &&
_parser_type != InvertedIndexParserType::PARSER_NONE) {
// in this case the stream needs to be deleted after add_document, because
// the stream cannot be reused across different fields
_char_string_reader->init(v->get_data(), v->get_size(), false);
ts = _analyzer->tokenStream(new_field->name(),
_char_string_reader.get());
new_field->setValue(ts);
} else {
new_field_char_value(v->get_data(), v->get_size(), new_field);
}
_doc->add(*new_field);
}
}
RETURN_IF_ERROR(add_document());
Member (review comment):
    If add_null_document has already been called, then calling add_document() will cause an unexpected problem.

Contributor Author (reply):
    So maybe we should not have this if branch?
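One possible way to address this concern, sketched purely as an illustration (not part of the diff): keep exactly one document per row so row ids stay aligned with _rid, but decide between add_document() and add_null_document() based on a hypothetical per-row fields_added counter that would be incremented next to each _doc->add(*new_field) call in the inner loop above:

// Illustration only; `fields_added` is an assumed per-row counter, not in this PR.
if (fields_added > 0) {
    RETURN_IF_ERROR(add_document());
} else {
    // every element of this row was skipped, so record a null document instead
    RETURN_IF_ERROR(add_null_document());
}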

_doc->clear();
_CLDELETE(ts);
_rid++;
}
} else if constexpr (field_is_numeric_type(field_type)) {
@@ -426,7 +451,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
item_data_ptr = (uint8_t*)item_data_ptr + field_size;
}
auto value = join(strings, " ");
new_fulltext_field(value.c_str(), value.length());
new_inverted_index_field(value.c_str(), value.length());
_rid++;
RETURN_IF_ERROR(add_document());
values++;
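For readers less familiar with how the array values arrive here, a small self-contained sketch (no Doris or CLucene types; all names and data are illustrative) of the offsets / null_map walk that add_array_values() performs, where row i owns the element range [offsets[i], offsets[i+1]) and null elements are skipped:

#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

int main() {
    // Illustrative data only: two array rows, six elements, element 2 is NULL.
    std::vector<std::string> values = {"10", "20", "30", "20", "30", "40"};
    std::vector<uint8_t> null_map = {0, 0, 1, 0, 0, 0};
    std::vector<uint64_t> offsets = {0, 3, 6}; // row 0 -> [0,3), row 1 -> [3,6)

    for (size_t i = 0; i + 1 < offsets.size(); ++i) {
        std::printf("row %zu:", i);
        for (uint64_t j = offsets[i]; j < offsets[i + 1]; ++j) {
            if (null_map[j] == 1) {
                continue; // NULL element: no field would be created for it
            }
            std::printf(" %s", values[j].c_str());
        }
        std::printf("\n");
    }
    return 0;
}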
@@ -579,6 +604,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {

std::unique_ptr<lucene::document::Document> _doc = nullptr;
lucene::document::Field* _field = nullptr;
bool _single_field = true;
std::unique_ptr<lucene::index::IndexWriter> _index_writer = nullptr;
std::unique_ptr<lucene::analysis::Analyzer> _analyzer = nullptr;
std::unique_ptr<lucene::util::Reader> _char_string_reader = nullptr;
@@ -598,22 +624,24 @@ Status InvertedIndexColumnWriter::create(const Field* field,
const auto* typeinfo = field->type_info();
FieldType type = typeinfo->type();
std::string field_name = field->name();
bool single_field = true;
if (type == FieldType::OLAP_FIELD_TYPE_ARRAY) {
const auto* array_typeinfo = dynamic_cast<const ArrayTypeInfo*>(typeinfo);
if (array_typeinfo != nullptr) {
typeinfo = array_typeinfo->item_type_info();
type = typeinfo->type();
single_field = false;
} else {
return Status::NotSupported("unsupported array type for inverted index: " +
std::to_string(int(type)));
}
}

switch (type) {
#define M(TYPE) \
case TYPE: \
*res = std::make_unique<InvertedIndexColumnWriterImpl<TYPE>>( \
field_name, index_file_writer, index_meta); \
#define M(TYPE) \
case TYPE: \
*res = std::make_unique<InvertedIndexColumnWriterImpl<TYPE>>( \
field_name, index_file_writer, index_meta, single_field); \
break;
M(FieldType::OLAP_FIELD_TYPE_TINYINT)
M(FieldType::OLAP_FIELD_TYPE_SMALLINT)
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env.h
@@ -214,6 +214,9 @@ class ExecEnv {
MemTableMemoryLimiter* memtable_memory_limiter() { return _memtable_memory_limiter.get(); }
WalManager* wal_mgr() { return _wal_manager.get(); }
#ifdef BE_TEST
void set_tmp_file_dir(std::unique_ptr<segment_v2::TmpFileDirs> tmp_file_dirs) {
this->_tmp_file_dirs = std::move(tmp_file_dirs);
}
void set_ready() { this->_s_ready = true; }
void set_not_ready() { this->_s_ready = false; }
void set_memtable_memory_limiter(MemTableMemoryLimiter* limiter) {