Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/olap/page_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ class StoragePageCache {
return _get_page_cache(page_type)->mem_tracker();
}

// Erase the page with key from this cache.
void erase(const CacheKey& key, segment_v2::PageTypePB page_type) {
auto* cache = _get_page_cache(page_type);
cache->erase(key.encode());
}

private:
StoragePageCache();

Expand Down
31 changes: 31 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end)
"failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret,
errno);
}
RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path));

// rename inverted index files
RETURN_IF_ERROR(_rename_compacted_indices(begin, end, 0));
Expand Down Expand Up @@ -579,13 +580,43 @@ Status BetaRowsetWriter::_rename_compacted_segment_plain(uint32_t seg_id) {
"failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret,
errno);
}

RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path));
// rename remaining inverted index files
RETURN_IF_ERROR(_rename_compacted_indices(-1, -1, seg_id));

++_num_segcompacted;
return Status::OK();
}

Status BetaRowsetWriter::_remove_segment_footer_cache(const uint32_t seg_id,
const std::string& segment_path) {
auto* footer_page_cache = ExecEnv::GetInstance()->get_storage_page_cache();
if (!footer_page_cache) {
return Status::OK();
}

auto fs = _rowset_meta->fs();
bool exists = false;
RETURN_IF_ERROR(fs->exists(segment_path, &exists));
if (exists) {
io::FileReaderSPtr file_reader;
io::FileReaderOptions reader_options {
.cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
: io::FileCachePolicy::NO_CACHE,
.is_doris_table = true,
.cache_base_path = "",
.file_size = _rowset_meta->segment_file_size(static_cast<int>(seg_id)),
.tablet_id = _rowset_meta->tablet_id(),
};
RETURN_IF_ERROR(fs->open_file(segment_path, &file_reader, &reader_options));
DCHECK(file_reader != nullptr);
auto cache_key = segment_v2::Segment::get_segment_footer_cache_key(file_reader);
footer_page_cache->erase(cache_key, segment_v2::PageTypePB::INDEX_PAGE);
}
return Status::OK();
}

Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id) {
int ret;

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {
Status _rename_compacted_segments(int64_t begin, int64_t end);
Status _rename_compacted_segment_plain(uint32_t seg_id);
Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id);
Status _remove_segment_footer_cache(const uint32_t seg_id, const std::string& segment_path);
void _clear_statistics_for_deleting_segments_unsafe(uint32_t begin, uint32_t end);

StorageEngine& _engine;
Expand Down
60 changes: 60 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
#include "vec/core/types.h"
#include "vec/data_types/data_type_agg_state.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_nullable.h"
#include "vec/runtime/vdatetime_value.h" //for VecDateTime

namespace doris::segment_v2 {
Expand Down Expand Up @@ -296,6 +297,61 @@ int64_t ColumnReader::get_metadata_size() const {
return sizeof(ColumnReader) + (_segment_zone_map ? _segment_zone_map->ByteSizeLong() : 0);
}

#ifdef BE_TEST
/// This function is only used in UT to verify the correctness of data read from zone map
/// See UT case 'SegCompactionMoWTest.SegCompactionInterleaveWithBig_ooooOOoOooooooooO'
/// be/test/olap/segcompaction_mow_test.cpp
void ColumnReader::check_data_by_zone_map_for_test(const vectorized::MutableColumnPtr& dst) const {
if (!_segment_zone_map) {
return;
}

const auto rows = dst->size();
if (rows == 0) {
return;
}

FieldType type = _type_info->type();

if (type != FieldType::OLAP_FIELD_TYPE_INT) {
return;
}

auto* non_nullable_column = dst->is_nullable()
? assert_cast<vectorized::ColumnNullable*>(dst.get())
->get_nested_column_ptr()
.get()
: dst.get();

/// `PredicateColumnType<TYPE_INT>` does not support `void get(size_t n, Field& res)`,
/// So here only check `CoumnVector<TYPE_INT>`
if (vectorized::check_and_get_column<vectorized::ColumnVector<TYPE_INT>>(non_nullable_column) ==
nullptr) {
return;
}

std::unique_ptr<WrapperField> min_value(WrapperField::create_by_type(type, _meta_length));
std::unique_ptr<WrapperField> max_value(WrapperField::create_by_type(type, _meta_length));
THROW_IF_ERROR(_parse_zone_map(*_segment_zone_map, min_value.get(), max_value.get()));

if (min_value->is_null() || max_value->is_null()) {
return;
}

int32_t min_v = *reinterpret_cast<int32_t*>(min_value->cell_ptr());
int32_t max_v = *reinterpret_cast<int32_t*>(max_value->cell_ptr());

for (size_t i = 0; i != rows; ++i) {
vectorized::Field field;
dst->get(i, field);
DCHECK(!field.is_null());
const auto v = field.get<int32_t>();
DCHECK_GE(v, min_v);
DCHECK_LE(v, max_v);
}
}
#endif

Status ColumnReader::init(const ColumnMetaPB* meta) {
_type_info = get_type_info(meta);

Expand Down Expand Up @@ -1814,6 +1870,10 @@ Status FileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& d
}
*n -= remaining;
_opts.stats->bytes_read += (dst->byte_size() - curr_size) + BitmapSize(*n);

#ifdef BE_TEST
_reader->check_data_by_zone_map_for_test(dst);
#endif
return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ class ColumnReader : public MetadataAdder<ColumnReader>,

int64_t get_metadata_size() const override;

#ifdef BE_TEST
void check_data_by_zone_map_for_test(const vectorized::MutableColumnPtr& dst) const;
#endif

private:
friend class VariantColumnReader;

Expand Down
9 changes: 7 additions & 2 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1019,8 +1019,13 @@ StoragePageCache::CacheKey Segment::get_segment_footer_cache_key() const {
// The footer is always at the end of the segment file.
// The size of footer is 12.
// So we use the size of file minus 12 as the cache key, which is unique for each segment file.
return StoragePageCache::CacheKey(_file_reader->path().native(), _file_reader->size(),
_file_reader->size() - 12);
return get_segment_footer_cache_key(_file_reader);
}

StoragePageCache::CacheKey Segment::get_segment_footer_cache_key(
const io::FileReaderSPtr& file_reader) {
return {file_reader->path().native(), file_reader->size(),
static_cast<int64_t>(file_reader->size() - 12)};
}

#include "common/compile_check_end.h"
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#include "agent/be_exec_version_manager.h"
#include "common/status.h" // Status
#include "io/fs/file_reader.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/file_system.h"
#include "olap/field.h"
Expand Down Expand Up @@ -207,6 +208,9 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd

Status traverse_column_meta_pbs(const std::function<void(const ColumnMetaPB&)>& visitor);

static StoragePageCache::CacheKey get_segment_footer_cache_key(
const io::FileReaderSPtr& file_reader);

private:
DISALLOW_COPY_AND_ASSIGN(Segment);
Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
Expand Down
Loading