Skip to content

Commit

Permalink
Merge branch 'apache:master' into refactor-memtable-flush
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Jul 6, 2023
2 parents 949f180 + 013bfc6 commit 2c32d66
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 42 deletions.
14 changes: 7 additions & 7 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ Status ColumnReader::new_bitmap_index_iterator(BitmapIndexIterator** iterator) {

Status ColumnReader::new_inverted_index_iterator(const TabletIndex* index_meta,
OlapReaderStatistics* stats,
InvertedIndexIterator** iterator) {
std::unique_ptr<InvertedIndexIterator>* iterator) {
RETURN_IF_ERROR(_ensure_inverted_index_loaded(index_meta));
if (_inverted_index) {
RETURN_IF_ERROR(_inverted_index->new_iterator(stats, iterator));
Expand Down Expand Up @@ -505,16 +505,16 @@ Status ColumnReader::_load_inverted_index_index(const TabletIndex* index_meta) {

if (is_string_type(type)) {
if (parser_type != InvertedIndexParserType::PARSER_NONE) {
_inverted_index.reset(new FullTextIndexReader(
_file_reader->fs(), _file_reader->path().native(), index_meta));
_inverted_index = FullTextIndexReader::create_shared(
_file_reader->fs(), _file_reader->path().native(), index_meta);
return Status::OK();
} else {
_inverted_index.reset(new StringTypeInvertedIndexReader(
_file_reader->fs(), _file_reader->path().native(), index_meta));
_inverted_index = StringTypeInvertedIndexReader::create_shared(
_file_reader->fs(), _file_reader->path().native(), index_meta);
}
} else if (is_numeric_type(type)) {
_inverted_index.reset(
new BkdIndexReader(_file_reader->fs(), _file_reader->path().native(), index_meta));
_inverted_index = BkdIndexReader::create_shared(_file_reader->fs(),
_file_reader->path().native(), index_meta);
} else {
_inverted_index.reset();
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class ColumnReader {
Status new_bitmap_index_iterator(BitmapIndexIterator** iterator);

Status new_inverted_index_iterator(const TabletIndex* index_meta, OlapReaderStatistics* stats,
InvertedIndexIterator** iterator);
std::unique_ptr<InvertedIndexIterator>* iterator);

// Seek to the first entry in the column.
Status seek_to_first(OrdinalPageIndexIterator* iter);
Expand Down Expand Up @@ -237,7 +237,7 @@ class ColumnReader {
std::unique_ptr<ZoneMapIndexReader> _zone_map_index;
std::unique_ptr<OrdinalIndexReader> _ordinal_index;
std::unique_ptr<BitmapIndexReader> _bitmap_index;
std::unique_ptr<InvertedIndexReader> _inverted_index;
std::shared_ptr<InvertedIndexReader> _inverted_index;
std::unique_ptr<BloomFilterIndexReader> _bloom_filter_index;
DorisCallOnce<Status> _load_zone_map_index_once;
DorisCallOnce<Status> _load_ordinal_index_once;
Expand Down
17 changes: 9 additions & 8 deletions be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ Status InvertedIndexReader::read_null_bitmap(InvertedIndexQueryCacheHandle* cach
}

Status FullTextIndexReader::new_iterator(OlapReaderStatistics* stats,
InvertedIndexIterator** iterator) {
*iterator = new InvertedIndexIterator(stats, this);
std::unique_ptr<InvertedIndexIterator>* iterator) {
*iterator = InvertedIndexIterator::create_unique(stats, shared_from_this());
return Status::OK();
}

Expand Down Expand Up @@ -404,9 +404,9 @@ InvertedIndexReaderType FullTextIndexReader::type() {
return InvertedIndexReaderType::FULLTEXT;
}

Status StringTypeInvertedIndexReader::new_iterator(OlapReaderStatistics* stats,
InvertedIndexIterator** iterator) {
*iterator = new InvertedIndexIterator(stats, this);
Status StringTypeInvertedIndexReader::new_iterator(
OlapReaderStatistics* stats, std::unique_ptr<InvertedIndexIterator>* iterator) {
*iterator = InvertedIndexIterator::create_unique(stats, shared_from_this());
return Status::OK();
}

Expand Down Expand Up @@ -543,13 +543,14 @@ BkdIndexReader::BkdIndexReader(io::FileSystemSPtr fs, const std::string& path,
LOG(WARNING) << "bkd index: " << index_file.string() << " not exist.";
return;
}
_compoundReader = new DorisCompoundReader(
_compoundReader = std::make_unique<DorisCompoundReader>(
DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()), index_file_name.c_str(),
config::inverted_index_read_buffer_size);
}

Status BkdIndexReader::new_iterator(OlapReaderStatistics* stats, InvertedIndexIterator** iterator) {
*iterator = new InvertedIndexIterator(stats, this);
Status BkdIndexReader::new_iterator(OlapReaderStatistics* stats,
std::unique_ptr<InvertedIndexIterator>* iterator) {
*iterator = InvertedIndexIterator::create_unique(stats, shared_from_this());
return Status::OK();
}

Expand Down
57 changes: 38 additions & 19 deletions be/src/olap/rowset/segment_v2/inverted_index_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,16 @@ enum class InvertedIndexReaderType {
BKD = 2,
};

class InvertedIndexReader {
class InvertedIndexReader : public std::enable_shared_from_this<InvertedIndexReader> {
public:
explicit InvertedIndexReader(io::FileSystemSPtr fs, const std::string& path,
const TabletIndex* index_meta)
: _fs(std::move(fs)), _path(path), _index_meta(*index_meta) {}
: _fs(fs), _path(path), _index_meta(*index_meta) {}
virtual ~InvertedIndexReader() = default;

// create a new column iterator. Client should delete returned iterator
virtual Status new_iterator(OlapReaderStatistics* stats, InvertedIndexIterator** iterator) = 0;
virtual Status new_iterator(OlapReaderStatistics* stats,
std::unique_ptr<InvertedIndexIterator>* iterator) = 0;
virtual Status query(OlapReaderStatistics* stats, const std::string& column_name,
const void* query_value, InvertedIndexQueryType query_type,
roaring::Roaring* bit_map) = 0;
Expand All @@ -86,9 +87,9 @@ class InvertedIndexReader {
virtual InvertedIndexReaderType type() = 0;
bool indexExists(io::Path& index_file_path);

uint32_t get_index_id() const { return _index_meta.index_id(); }
[[nodiscard]] uint32_t get_index_id() const { return _index_meta.index_id(); }

const std::map<string, string>& get_index_properties() const {
[[nodiscard]] const std::map<string, string>& get_index_properties() const {
return _index_meta.properties();
}

Expand All @@ -103,18 +104,21 @@ class InvertedIndexReader {
bool _is_match_query(InvertedIndexQueryType query_type);
friend class InvertedIndexIterator;
io::FileSystemSPtr _fs;
std::string _path;
const std::string& _path;
TabletIndex _index_meta;
};

class FullTextIndexReader : public InvertedIndexReader {
ENABLE_FACTORY_CREATOR(FullTextIndexReader);

public:
explicit FullTextIndexReader(io::FileSystemSPtr fs, const std::string& path,
const TabletIndex* index_meta)
: InvertedIndexReader(std::move(fs), path, index_meta) {}
: InvertedIndexReader(fs, path, index_meta) {}
~FullTextIndexReader() override = default;

Status new_iterator(OlapReaderStatistics* stats, InvertedIndexIterator** iterator) override;
Status new_iterator(OlapReaderStatistics* stats,
std::unique_ptr<InvertedIndexIterator>* iterator) override;
Status query(OlapReaderStatistics* stats, const std::string& column_name,
const void* query_value, InvertedIndexQueryType query_type,
roaring::Roaring* bit_map) override;
Expand All @@ -128,13 +132,16 @@ class FullTextIndexReader : public InvertedIndexReader {
};

class StringTypeInvertedIndexReader : public InvertedIndexReader {
ENABLE_FACTORY_CREATOR(StringTypeInvertedIndexReader);

public:
explicit StringTypeInvertedIndexReader(io::FileSystemSPtr fs, const std::string& path,
const TabletIndex* index_meta)
: InvertedIndexReader(std::move(fs), path, index_meta) {}
: InvertedIndexReader(fs, path, index_meta) {}
~StringTypeInvertedIndexReader() override = default;

Status new_iterator(OlapReaderStatistics* stats, InvertedIndexIterator** iterator) override;
Status new_iterator(OlapReaderStatistics* stats,
std::unique_ptr<InvertedIndexIterator>* iterator) override;
Status query(OlapReaderStatistics* stats, const std::string& column_name,
const void* query_value, InvertedIndexQueryType query_type,
roaring::Roaring* bit_map) override;
Expand Down Expand Up @@ -181,18 +188,28 @@ class InvertedIndexVisitor : public lucene::util::bkd::bkd_reader::intersect_vis
};

class BkdIndexReader : public InvertedIndexReader {
ENABLE_FACTORY_CREATOR(BkdIndexReader);

public:
explicit BkdIndexReader(io::FileSystemSPtr fs, const std::string& path,
const TabletIndex* index_meta);
~BkdIndexReader() override {
if (_compoundReader != nullptr) {
_compoundReader->close();
delete _compoundReader;
_compoundReader = nullptr;
try {
_compoundReader->close();
} catch (const CLuceneError& e) {
// Handle exception, e.g., log it, but don't rethrow.
LOG(ERROR) << "Exception caught in BkdIndexReader destructor: " << e.what()
<< std::endl;
} catch (...) {
// Handle all other exceptions, but don't rethrow.
LOG(ERROR) << "Unknown exception caught in BkdIndexReader destructor." << std::endl;
}
}
}

Status new_iterator(OlapReaderStatistics* stats, InvertedIndexIterator** iterator) override;
Status new_iterator(OlapReaderStatistics* stats,
std::unique_ptr<InvertedIndexIterator>* iterator) override;

Status query(OlapReaderStatistics* stats, const std::string& column_name,
const void* query_value, InvertedIndexQueryType query_type,
Expand All @@ -211,12 +228,14 @@ class BkdIndexReader : public InvertedIndexReader {
private:
const TypeInfo* _type_info {};
const KeyCoder* _value_key_coder {};
DorisCompoundReader* _compoundReader;
std::unique_ptr<DorisCompoundReader> _compoundReader;
};

class InvertedIndexIterator {
ENABLE_FACTORY_CREATOR(InvertedIndexIterator);

public:
InvertedIndexIterator(OlapReaderStatistics* stats, InvertedIndexReader* reader)
InvertedIndexIterator(OlapReaderStatistics* stats, std::shared_ptr<InvertedIndexReader> reader)
: _stats(stats), _reader(reader) {}

Status read_from_inverted_index(const std::string& column_name, const void* query_value,
Expand All @@ -230,12 +249,12 @@ class InvertedIndexIterator {
return _reader->read_null_bitmap(cache_handle, dir);
}

InvertedIndexReaderType get_inverted_index_reader_type() const;
const std::map<string, string>& get_index_properties() const;
[[nodiscard]] InvertedIndexReaderType get_inverted_index_reader_type() const;
[[nodiscard]] const std::map<string, string>& get_index_properties() const;

private:
OlapReaderStatistics* _stats;
InvertedIndexReader* _reader;
std::shared_ptr<InvertedIndexReader> _reader;
};

} // namespace segment_v2
Expand Down
4 changes: 1 addition & 3 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,8 @@ Status Segment::new_inverted_index_iterator(const TabletColumn& tablet_column,
std::unique_ptr<InvertedIndexIterator>* iter) {
auto col_unique_id = tablet_column.unique_id();
if (_column_readers.count(col_unique_id) > 0 && index_meta) {
InvertedIndexIterator* it;
RETURN_IF_ERROR(_column_readers.at(col_unique_id)
->new_inverted_index_iterator(index_meta, stats, &it));
iter->reset(it);
->new_inverted_index_iterator(index_meta, stats, iter));
return Status::OK();
}
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,7 @@ std::string SegmentIterator::_gen_predicate_result_sign(ColumnPredicateInfo* pre

bool SegmentIterator::_column_has_fulltext_index(int32_t unique_id) {
bool has_fulltext_index =
_inverted_index_iterators.count(unique_id) > 0 &&
_inverted_index_iterators[unique_id] != nullptr &&
_inverted_index_iterators[unique_id]->get_inverted_index_reader_type() ==
InvertedIndexReaderType::FULLTEXT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,9 @@ public static ColumnDef newSequenceColumnDef(Type type, AggregateType aggregateT
"sequence column hidden column", false);
}

public static ColumnDef newRowStoreColumnDef() {
return new ColumnDef(Column.ROW_STORE_COL, TypeDef.create(PrimitiveType.STRING), false, null, false, false,
public static ColumnDef newRowStoreColumnDef(AggregateType aggregateType) {
return new ColumnDef(Column.ROW_STORE_COL, TypeDef.create(PrimitiveType.STRING), false,
aggregateType, false, false,
new ColumnDef.DefaultValue(true, ""), "doris row store hidden column", false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,18 @@ public void analyze(Analyzer analyzer) throws UserException, AnalysisException {
}
// add a hidden column as row store
if (properties != null && PropertyAnalyzer.analyzeStoreRowColumn(new HashMap<>(properties))) {
columnDefs.add(ColumnDef.newRowStoreColumnDef());
if (keysDesc != null && keysDesc.getKeysType() == KeysType.AGG_KEYS) {
throw new AnalysisException("Aggregate table can't support row column now");
}
if (keysDesc != null && keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) {
if (enableUniqueKeyMergeOnWrite) {
columnDefs.add(ColumnDef.newRowStoreColumnDef(AggregateType.NONE));
} else {
columnDefs.add(ColumnDef.newRowStoreColumnDef(AggregateType.REPLACE));
}
} else {
columnDefs.add(ColumnDef.newRowStoreColumnDef(null));
}
}
if (Config.enable_hidden_version_column_by_default && keysDesc != null
&& keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) {
Expand Down
70 changes: 70 additions & 0 deletions regression-test/suites/row_store/load.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_row_store", "p0") {
def testTable = "tbl_unique"
sql "DROP TABLE IF EXISTS ${testTable}"
sql """
CREATE TABLE `${testTable}` (
`tag` varchar(45) NULL,
`tag_value` varchar(45) NULL,
`user_ids` decimalv3(30, 8) NULL,
`test` datetime NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=OLAP
UNIQUE KEY(`tag`, `tag_value`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`tag`) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"storage_format" = "V2",
"light_schema_change" = "true",
"store_row_column" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
);
"""
sql "insert into ${testTable} (tag,tag_value,user_ids) values ('10001','23',34.234),('10001','23',34.234);"
sql "insert into ${testTable} (tag,tag_value,user_ids) values ('10001','23',34.234);"
sql "select * from ${testTable}"

testTable = "tbl_dup"
sql "DROP TABLE IF EXISTS ${testTable}"
sql """
CREATE TABLE `${testTable}` (
`tag` varchar(45) NULL,
`tag_value` varchar(45) NULL,
`user_ids` decimalv3(30, 8) NULL,
`test` datetime NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=OLAP
UNIQUE KEY(`tag`, `tag_value`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`tag`) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"storage_format" = "V2",
"light_schema_change" = "true",
"store_row_column" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
);
"""
sql "insert into ${testTable} (tag,tag_value,user_ids) values ('10001','23',34.234),('10001','23',34.234);"
sql "insert into ${testTable} (tag,tag_value,user_ids) values ('10001','23',34.234);"
sql "select * from ${testTable}"
}

0 comments on commit 2c32d66

Please sign in to comment.