Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions be/src/io/fs/file_meta_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,27 @@

#include "io/fs/file_meta_cache.h"

#include "vec/exec/format/parquet/parquet_thrift_util.h"

namespace doris {

Status FileMetaCache::get_parquet_footer(io::FileReaderSPtr file_reader, io::IOContext* io_ctx,
int64_t mtime, size_t* meta_size,
ObjLRUCache::CacheHandle* handle) {
ObjLRUCache::CacheHandle cache_handle;
std::string cache_key = file_reader->path().native() + std::to_string(mtime);
auto hit_cache = _cache.lookup({cache_key}, &cache_handle);
if (hit_cache) {
*handle = std::move(cache_handle);
*meta_size = 0;
std::string FileMetaCache::get_key(const std::string file_name, int64_t modification_time,
int64_t file_size) {
std::string meta_cache_key;
meta_cache_key.resize(file_name.size() + sizeof(int64_t));

memcpy(meta_cache_key.data(), file_name.data(), file_name.size());
if (modification_time != 0) {
memcpy(meta_cache_key.data() + file_name.size(), &modification_time, sizeof(int64_t));
} else {
vectorized::FileMetaData* meta = nullptr;
RETURN_IF_ERROR(vectorized::parse_thrift_footer(file_reader, &meta, meta_size, io_ctx));
_cache.insert({cache_key}, meta, handle);
memcpy(meta_cache_key.data() + file_name.size(), &file_size, sizeof(int64_t));
}
return meta_cache_key;
}

return Status::OK();
std::string FileMetaCache::get_key(io::FileReaderSPtr file_reader,
const io::FileDescription& _file_description) {
return FileMetaCache::get_key(
file_reader->path().native(), _file_description.mtime,
_file_description.file_size == -1 ? file_reader->size() : _file_description.file_size);
}

} // namespace doris
20 changes: 15 additions & 5 deletions be/src/io/fs/file_meta_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include "io/file_factory.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "util/obj_lru_cache.h"

Expand All @@ -34,14 +35,23 @@ class FileMetaCache {

ObjLRUCache& cache() { return _cache; }

Status get_parquet_footer(io::FileReaderSPtr file_reader, io::IOContext* io_ctx, int64_t mtime,
size_t* meta_size, ObjLRUCache::CacheHandle* handle);
static std::string get_key(const std::string file_name, int64_t modification_time,
int64_t file_size);

Status get_orc_footer() {
// TODO: implement
return Status::OK();
static std::string get_key(io::FileReaderSPtr file_reader,
const io::FileDescription& _file_description);

bool lookup(const std::string& key, ObjLRUCache::CacheHandle* handle) {
return _cache.lookup({key}, handle);
}

template <typename T>
void insert(const std::string& key, T* value, ObjLRUCache::CacheHandle* handle) {
_cache.insert({key}, value, handle);
}

bool enabled() const { return _cache.enabled(); }

private:
ObjLRUCache _cache;
};
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/file_meta_cache.h"
#include "io/hdfs_builder.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/delete_handler.h"
Expand Down
5 changes: 2 additions & 3 deletions be/src/util/obj_lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ namespace doris {
ObjLRUCache::ObjLRUCache(int64_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::COMMON_OBJ_LRU_CACHE, capacity,
LRUCacheType::NUMBER, config::common_obj_lru_cache_stale_sweep_time_sec,
num_shards) {
_enabled = (capacity > 0);
}
num_shards),
_enabled(capacity > 0) {}

bool ObjLRUCache::lookup(const ObjKey& key, CacheHandle* handle) {
if (!_enabled) {
Expand Down
12 changes: 8 additions & 4 deletions be/src/util/obj_lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ class ObjLRUCache : public LRUCachePolicy {
bool valid() { return _cache != nullptr && _handle != nullptr; }

LRUCachePolicy* cache() const { return _cache; }

template <typename T>
void* data() const {
return (void*)((ObjValue<T>*)_cache->value(_handle))->value;
const T* data() const {
return ((ObjValue<T>*)_cache->value(_handle))->value;
}

private:
Expand All @@ -98,16 +99,19 @@ class ObjLRUCache : public LRUCachePolicy {
CachePriority::NORMAL);
*cache_handle = CacheHandle {this, handle};
} else {
cache_handle = nullptr;
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"ObjLRUCache disable, can not insert.");
}
}

void erase(const ObjKey& key);

bool exceed_prune_limit() override;

bool enabled() const { return _enabled; }

private:
bool _enabled;
const bool _enabled;
};

} // namespace doris
5 changes: 5 additions & 0 deletions be/src/vec/exec/format/generic_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gen_cpp/PlanNodes_types.h>

#include "common/status.h"
#include "io/fs/file_meta_cache.h"
#include "runtime/descriptors.h"
#include "runtime/types.h"
#include "util/profile_collector.h"
Expand Down Expand Up @@ -85,6 +86,10 @@ class GenericReader : public ProfileCollector {
/// Whether the underlying FileReader has filled the partition&missing columns
bool _fill_all_columns = false;
TPushAggOp::type _push_down_agg_type {};

// Cache to save some common part such as file footer.
// Maybe null if not used
FileMetaCache* _meta_cache = nullptr;
};

#include "common/compile_check_end.h"
Expand Down
89 changes: 61 additions & 28 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ void StripeStreamInputStream::read(void* buf, uint64_t length, uint64_t offset)
OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
size_t batch_size, const std::string& ctz, io::IOContext* io_ctx,
bool enable_lazy_mat)
FileMetaCache* meta_cache, bool enable_lazy_mat)
: _profile(profile),
_state(state),
_scan_params(params),
Expand All @@ -183,13 +183,15 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
VecDateTimeValue t;
t.from_unixtime(0, ctz);
_offset_days = t.day() == 31 ? -1 : 0; // If 1969-12-31, then returns -1.
_meta_cache = meta_cache;
_init_profile();
_init_system_properties();
_init_file_description();
}

OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::string& ctz, io::IOContext* io_ctx, bool enable_lazy_mat)
const std::string& ctz, io::IOContext* io_ctx, FileMetaCache* meta_cache,
bool enable_lazy_mat)
: _profile(nullptr),
_scan_params(params),
_scan_range(range),
Expand All @@ -199,6 +201,7 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r
_enable_lazy_mat(enable_lazy_mat),
_enable_filter_by_min_max(true),
_dict_cols_has_converted(false) {
_meta_cache = meta_cache;
_init_system_properties();
_init_file_description();
}
Expand All @@ -221,7 +224,8 @@ void OrcReader::_collect_profile_before_close() {
COUNTER_UPDATE(_orc_profile.predicate_filter_time, _statistics.predicate_filter_time);
COUNTER_UPDATE(_orc_profile.dict_filter_rewrite_time, _statistics.dict_filter_rewrite_time);
COUNTER_UPDATE(_orc_profile.lazy_read_filtered_rows, _statistics.lazy_read_filtered_rows);

COUNTER_UPDATE(_orc_profile.file_footer_read_calls, _statistics.file_footer_read_calls);
COUNTER_UPDATE(_orc_profile.file_footer_hit_cache, _statistics.file_footer_hit_cache);
if (_file_input_stream != nullptr) {
_file_input_stream->collect_profile_before_close();
}
Expand Down Expand Up @@ -260,10 +264,15 @@ void OrcReader::_init_profile() {
ADD_COUNTER_WITH_LEVEL(_profile, "SelectedRowGroupCount", TUnit::UNIT, 1);
_orc_profile.evaluated_row_group_count =
ADD_COUNTER_WITH_LEVEL(_profile, "EvaluatedRowGroupCount", TUnit::UNIT, 1);
_orc_profile.file_footer_read_calls =
ADD_COUNTER_WITH_LEVEL(_profile, "FileFooterReadCalls", TUnit::UNIT, 1);
_orc_profile.file_footer_hit_cache =
ADD_COUNTER_WITH_LEVEL(_profile, "FileFooterHitCache", TUnit::UNIT, 1);
}
}

Status OrcReader::_create_file_reader() {
SCOPED_RAW_TIMER(&_statistics.create_reader_time);
if (_reader != nullptr) {
return Status::OK();
}
Expand All @@ -283,27 +292,56 @@ Status OrcReader::_create_file_reader() {
if (_file_input_stream->getLength() == 0) {
return Status::EndOfFile("empty orc file: " + _scan_range.path);
}

// create orc reader
try {
orc::ReaderOptions options;
options.setMemoryPool(*ExecEnv::GetInstance()->orc_memory_pool());
options.setReaderMetrics(&_reader_metrics);
_reader = orc::createReader(
std::unique_ptr<ORCFileInputStream>(_file_input_stream.release()), options);
} catch (std::exception& e) {
// invoker maybe just skip Status.NotFound and continue
// so we need distinguish between it and other kinds of errors
std::string _err_msg = e.what();
if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
return Status::EndOfFile("stop");
orc::ReaderOptions options;
options.setMemoryPool(*ExecEnv::GetInstance()->orc_memory_pool());
options.setReaderMetrics(&_reader_metrics);

auto create_orc_reader = [&]() {
try {
_reader = orc::createReader(
std::unique_ptr<ORCFileInputStream>(_file_input_stream.release()), options);
} catch (std::exception& e) {
// invoker maybe just skip Status.NotFound and continue
// so we need distinguish between it and other kinds of errors
std::string _err_msg = e.what();
if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
return Status::EndOfFile("stop");
}
// one for fs, the other is for oss.
if (_err_msg.find("No such file or directory") != std::string::npos ||
_err_msg.find("NoSuchKey") != std::string::npos) {
return Status::NotFound(_err_msg);
}
return Status::InternalError("Init OrcReader failed. reason = {}", _err_msg);
}
// one for fs, the other is for oss.
if (_err_msg.find("No such file or directory") != std::string::npos ||
_err_msg.find("NoSuchKey") != std::string::npos) {
return Status::NotFound(_err_msg);
return Status::OK();
};

if (_meta_cache == nullptr) {
_statistics.file_footer_read_calls++;
RETURN_IF_ERROR(create_orc_reader());
} else {
auto inner_file_reader = _file_input_stream->get_inner_reader();
const auto& file_meta_cache_key =
FileMetaCache::get_key(inner_file_reader, _file_description);

// Local variables can be required because setSerializedFileTail is an assignment operation, not a reference.
ObjLRUCache::CacheHandle _meta_cache_handle;
if (_meta_cache->lookup(file_meta_cache_key, &_meta_cache_handle)) {
const std::string* footer_ptr = _meta_cache_handle.data<String>();
options.setSerializedFileTail(*footer_ptr);
RETURN_IF_ERROR(create_orc_reader());
_statistics.file_footer_hit_cache++;
} else {
_statistics.file_footer_read_calls++;
RETURN_IF_ERROR(create_orc_reader());
std::string* footer_ptr = new std::string {_reader->getSerializedFileTail()};
_meta_cache->insert(file_meta_cache_key, footer_ptr, &_meta_cache_handle);
}
return Status::InternalError("Init OrcReader failed. reason = {}", _err_msg);
}

return Status::OK();
}

Expand Down Expand Up @@ -337,14 +375,8 @@ Status OrcReader::init_reader(
_orc_max_merge_distance_bytes = _state->query_options().orc_max_merge_distance_bytes;
}

{
SCOPED_RAW_TIMER(&_statistics.create_reader_time);
RETURN_IF_ERROR(_create_file_reader());
}
{
SCOPED_RAW_TIMER(&_statistics.init_column_time);
RETURN_IF_ERROR(_init_read_columns());
}
RETURN_IF_ERROR(_create_file_reader());
RETURN_IF_ERROR(_init_read_columns());
return Status::OK();
}

Expand All @@ -364,6 +396,7 @@ Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
}

Status OrcReader::_init_read_columns() {
SCOPED_RAW_TIMER(&_statistics.init_column_time);
const auto& root_type = _reader->getType();
if (_is_acid) {
for (uint64_t i = 0; i < root_type.getSubtypeCount(); ++i) {
Expand Down
11 changes: 9 additions & 2 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "exec/olap_common.h"
#include "io/file_factory.h"
#include "io/fs/buffered_reader.h"
#include "io/fs/file_meta_cache.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/tracing_file_reader.h"
Expand Down Expand Up @@ -132,14 +133,18 @@ class OrcReader : public GenericReader {
int64_t predicate_filter_time = 0;
int64_t dict_filter_rewrite_time = 0;
int64_t lazy_read_filtered_rows = 0;
int64_t file_footer_read_calls = 0;
int64_t file_footer_hit_cache = 0;
};

OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, size_t batch_size, const std::string& ctz,
io::IOContext* io_ctx, bool enable_lazy_mat = true);
io::IOContext* io_ctx, FileMetaCache* meta_cache = nullptr,
bool enable_lazy_mat = true);

OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::string& ctz, io::IOContext* io_ctx, bool enable_lazy_mat = true);
const std::string& ctz, io::IOContext* io_ctx, FileMetaCache* meta_cache = nullptr,
bool enable_lazy_mat = true);

~OrcReader() override;
//If you want to read the file by index instead of column name, set hive_use_column_names to false.
Expand Down Expand Up @@ -240,6 +245,8 @@ class OrcReader : public GenericReader {
RuntimeProfile::Counter* lazy_read_filtered_rows = nullptr;
RuntimeProfile::Counter* selected_row_group_count = nullptr;
RuntimeProfile::Counter* evaluated_row_group_count = nullptr;
RuntimeProfile::Counter* file_footer_read_calls = nullptr;
RuntimeProfile::Counter* file_footer_hit_cache = nullptr;
};

class ORCFilterImpl : public orc::ORCFilter {
Expand Down
7 changes: 4 additions & 3 deletions be/src/vec/exec/format/parquet/parquet_thrift_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
constexpr uint32_t PARQUET_FOOTER_SIZE = 8;
constexpr size_t INIT_META_SIZE = 48 * 1024; // 48k

static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** file_metadata,
size_t* meta_size, io::IOContext* io_ctx) {
static Status parse_thrift_footer(io::FileReaderSPtr file,
std::unique_ptr<FileMetaData>* file_metadata, size_t* meta_size,
io::IOContext* io_ctx) {
size_t file_size = file->size();
size_t bytes_read = std::min(file_size, INIT_META_SIZE);
std::vector<uint8_t> footer(bytes_read);
Expand Down Expand Up @@ -75,7 +76,7 @@ static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** file_m
tparquet::FileMetaData t_metadata;
// deserialize footer
RETURN_IF_ERROR(deserialize_thrift_msg(meta_ptr, &metadata_size, true, &t_metadata));
*file_metadata = new FileMetaData(t_metadata, metadata_size);
*file_metadata = std::make_unique<FileMetaData>(t_metadata, metadata_size);
RETURN_IF_ERROR((*file_metadata)->init_schema());
*meta_size = PARQUET_FOOTER_SIZE + metadata_size;
return Status::OK();
Expand Down
Loading
Loading