Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,12 @@ DEFINE_Int32(index_page_cache_percentage, "10");
DEFINE_mBool(disable_storage_page_cache, "false");
// whether to disable row cache feature in storage
DEFINE_mBool(disable_storage_row_cache, "true");
// Parquet page cache: threshold ratio for caching decompressed vs compressed pages
// If uncompressed_size / compressed_size <= threshold, cache decompressed;
// otherwise cache compressed if enable_parquet_cache_compressed_pages = true
DEFINE_Double(parquet_page_cache_decompress_threshold, "1.5");
// Parquet page cache: whether to enable caching compressed pages (when ratio exceeds threshold)
DEFINE_Bool(enable_parquet_cache_compressed_pages, "false");
// whether to disable pk page cache feature in storage
DEFINE_Bool(disable_pk_storage_page_cache, "false");

Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,12 @@ DECLARE_Int32(index_page_cache_percentage);
DECLARE_Bool(disable_storage_page_cache);
// whether to disable row cache feature in storage
DECLARE_mBool(disable_storage_row_cache);
// Parquet page cache: threshold ratio for caching decompressed vs compressed pages
// If uncompressed_size / compressed_size <= threshold, cache decompressed;
// otherwise cache compressed if enable_parquet_cache_compressed_pages = true
DECLARE_Double(parquet_page_cache_decompress_threshold);
// Parquet page cache: whether to enable caching compressed pages (when ratio exceeds threshold)
DECLARE_Bool(enable_parquet_cache_compressed_pages);
// whether to disable pk page cache feature in storage
DECLARE_Bool(disable_pk_storage_page_cache);

Expand Down
2 changes: 2 additions & 0 deletions be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class CachedRemoteFileReader final : public FileReader {

static std::pair<size_t, size_t> s_align_size(size_t offset, size_t size, size_t length);

// Forwards to the wrapped remote reader's mtime().
int64_t mtime() const override { return _remote_file_reader->mtime(); }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down
12 changes: 12 additions & 0 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,18 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
const io::FileSystemProperties& system_properties,
const io::FileDescription& file_description, const io::FileReaderOptions& reader_options,
RuntimeProfile* profile) {
// Delegate reader construction to _create_file_reader_internal, passing all arguments through
// unchanged. On failure the error is propagated to the caller; on success the created reader
// is returned as-is.
auto reader_res = _create_file_reader_internal(system_properties, file_description,
reader_options, profile);
if (!reader_res.has_value()) {
return unexpected(std::move(reader_res).error());
}
return std::move(reader_res).value();
}

Result<io::FileReaderSPtr> FileFactory::_create_file_reader_internal(
const io::FileSystemProperties& system_properties,
const io::FileDescription& file_description, const io::FileReaderOptions& reader_options,
RuntimeProfile* profile) {
TFileType::type type = system_properties.system_type;
switch (type) {
case TFileType::FILE_LOCAL: {
Expand Down
6 changes: 6 additions & 0 deletions be/src/io/file_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ class FileFactory {

private:
static std::string _get_fs_name(const io::FileDescription& file_description);

/// Create FileReader without FS
static Result<io::FileReaderSPtr> _create_file_reader_internal(
const io::FileSystemProperties& system_properties,
const io::FileDescription& file_description,
const io::FileReaderOptions& reader_options, RuntimeProfile* profile = nullptr);
};

} // namespace doris
6 changes: 4 additions & 2 deletions be/src/io/fs/broker_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ struct IOContext;

BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size,
TBrokerFD fd,
std::shared_ptr<BrokerServiceConnection> connection)
std::shared_ptr<BrokerServiceConnection> connection,
int64_t mtime)
: _path(std::move(path)),
_file_size(file_size),
_broker_addr(broker_addr),
_fd(fd),
_connection(std::move(connection)) {
_connection(std::move(connection)),
_mtime(mtime) {
DorisMetrics::instance()->broker_file_open_reading->increment(1);
DorisMetrics::instance()->broker_file_reader_total->increment(1);
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/io/fs/broker_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct IOContext;
class BrokerFileReader final : public FileReader {
public:
BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd,
std::shared_ptr<BrokerServiceConnection> connection);
std::shared_ptr<BrokerServiceConnection> connection, int64_t mtime = 0);

~BrokerFileReader() override;

Expand All @@ -50,6 +50,8 @@ class BrokerFileReader final : public FileReader {

bool closed() const override { return _closed.load(std::memory_order_acquire); }

// Returns the mtime value supplied to the constructor (0 when the caller did not pass one).
int64_t mtime() const override { return _mtime; }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand All @@ -62,6 +64,7 @@ class BrokerFileReader final : public FileReader {
TBrokerFD _fd;

std::shared_ptr<BrokerServiceConnection> _connection;
int64_t _mtime;
std::atomic<bool> _closed = false;
};
} // namespace doris::io
2 changes: 1 addition & 1 deletion be/src/io/fs/broker_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ Status BrokerFileSystem::open_file_internal(const Path& file, FileReaderSPtr* re
error_msg(response->opStatus.message));
}
*reader = std::make_shared<BrokerFileReader>(_broker_addr, file, fsize, response->fd,
_connection);
_connection, opts.mtime);
return Status::OK();
}

Expand Down
12 changes: 12 additions & 0 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ class RangeCacheFileReader : public io::FileReader {

bool closed() const override { return _closed; }

// Forwards to the inner reader's mtime().
int64_t mtime() const override { return _inner_reader->mtime(); }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down Expand Up @@ -329,6 +331,8 @@ class MergeRangeFileReader : public io::FileReader {

bool closed() const override { return _closed; }

// Forwards to the underlying reader's mtime().
int64_t mtime() const override { return _reader->mtime(); }

// for test only
size_t buffer_remaining() const { return _remaining; }

Expand Down Expand Up @@ -532,6 +536,8 @@ class PrefetchBufferedReader final : public io::FileReader {

bool closed() const override { return _closed; }

// Forwards to the underlying reader's mtime().
int64_t mtime() const override { return _reader->mtime(); }

void set_random_access_ranges(const std::vector<PrefetchRange>* random_access_ranges) {
_random_access_ranges = random_access_ranges;
for (auto& _pre_buffer : _pre_buffers) {
Expand Down Expand Up @@ -592,6 +598,8 @@ class InMemoryFileReader final : public io::FileReader {

bool closed() const override { return _closed; }

// Forwards to the underlying reader's mtime().
int64_t mtime() const override { return _reader->mtime(); }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down Expand Up @@ -626,6 +634,8 @@ class BufferedStreamReader {
virtual ~BufferedStreamReader() = default;
// return the file path
virtual std::string path() = 0;

// return the modification time of the underlying source; every concrete stream reader
// must implement this
virtual int64_t mtime() const = 0;
};

class BufferedFileStreamReader : public BufferedStreamReader, public ProfileCollector {
Expand All @@ -639,6 +649,8 @@ class BufferedFileStreamReader : public BufferedStreamReader, public ProfileColl
Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) override;
std::string path() override { return _file->path(); }

// Forwards to the mtime() of the file reader this stream reads from.
int64_t mtime() const override { return _file->mtime(); }

protected:
void _collect_profile_before_close() override {
if (_file != nullptr) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/fs/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class FileReader : public doris::ProfileCollector {

virtual const std::string& get_data_dir_path() { return VIRTUAL_REMOTE_DATA_DIR; }

// File modification time (seconds since epoch). Pure virtual: there is no default here, so
// every reader must implement it; implementations return 0 when the time is unknown.
virtual int64_t mtime() const = 0;

protected:
virtual Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) = 0;
Expand Down
7 changes: 4 additions & 3 deletions be/src/io/fs/hdfs_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,17 @@ Result<FileReaderSPtr> HdfsFileReader::create(Path full_path, const hdfsFS& fs,
auto path = convert_path(full_path, fs_name);
return get_file(fs, path, opts.mtime, opts.file_size).transform([&](auto&& accessor) {
return std::make_shared<HdfsFileReader>(std::move(path), std::move(fs_name),
std::move(accessor), profile);
std::move(accessor), profile, opts.mtime);
});
}

HdfsFileReader::HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor,
RuntimeProfile* profile)
RuntimeProfile* profile, int64_t mtime)
: _path(std::move(path)),
_fs_name(std::move(fs_name)),
_accessor(std::move(accessor)),
_profile(profile) {
_profile(profile),
_mtime(mtime) {
_handle = _accessor.get();

DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
Expand Down
5 changes: 4 additions & 1 deletion be/src/io/fs/hdfs_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class HdfsFileReader final : public FileReader {
const FileReaderOptions& opts, RuntimeProfile* profile);

HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor,
RuntimeProfile* profile);
RuntimeProfile* profile, int64_t mtime = 0);

~HdfsFileReader() override;

Expand All @@ -57,6 +57,8 @@ class HdfsFileReader final : public FileReader {

bool closed() const override { return _closed.load(std::memory_order_acquire); }

// Returns the mtime value supplied to the constructor (0 when the caller did not pass one).
int64_t mtime() const override { return _mtime; }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down Expand Up @@ -86,6 +88,7 @@ class HdfsFileReader final : public FileReader {
CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle
std::atomic<bool> _closed = false;
RuntimeProfile* _profile = nullptr;
int64_t _mtime;
#ifdef USE_HADOOP_HDFS
HDFSProfile _hdfs_profile;
#endif
Expand Down
7 changes: 4 additions & 3 deletions be/src/io/fs/http_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,20 @@ Result<FileReaderSPtr> HttpFileReader::create(const std::string& url,
ofi.path = Path(url);
ofi.extend_info = props;

auto reader = std::make_shared<HttpFileReader>(ofi, url);
auto reader = std::make_shared<HttpFileReader>(ofi, url, opts.mtime);

// Open the file to detect Range support and validate configuration
RETURN_IF_ERROR_RESULT(reader->open(opts));

return reader;
}

HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url)
HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url, int64_t mtime)
: _extend_kv(fileInfo.extend_info),
_path(fileInfo.path),
_url(std::move(url)),
_client(std::make_unique<HttpClient>()) {
_client(std::make_unique<HttpClient>()),
_mtime(mtime) {
auto etag_iter = _extend_kv.find("etag");
if (etag_iter != _extend_kv.end()) {
_etag = etag_iter->second;
Expand Down
5 changes: 4 additions & 1 deletion be/src/io/fs/http_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class HttpFileReader final : public FileReader {
const std::map<std::string, std::string>& props,
const FileReaderOptions& opts, RuntimeProfile* profile);

explicit HttpFileReader(const OpenFileInfo& fileInfo, std::string url);
explicit HttpFileReader(const OpenFileInfo& fileInfo, std::string url, int64_t mtime);
~HttpFileReader() override;

Status open(const FileReaderOptions& opts);
Expand All @@ -52,6 +52,8 @@ class HttpFileReader final : public FileReader {
bool closed() const override { return _closed.load(std::memory_order_acquire); }
size_t size() const override { return _file_size; }

// Returns the mtime value supplied to the constructor.
// NOTE(review): this is a separate field from the _last_modified member of this class;
// confirm the two are not expected to agree.
int64_t mtime() const override { return _mtime; }

private:
// Prepare and initialize the HTTP client for a new request
Status prepare_client(bool set_fail_on_error = true);
Expand All @@ -78,6 +80,7 @@ class HttpFileReader final : public FileReader {
int64_t _last_modified = 0;
std::atomic<bool> _closed = false;
std::unique_ptr<HttpClient> _client;
int64_t _mtime;

// Configuration for non-Range request handling
bool _enable_range_request = true; // Whether Range request is required
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/http_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Status HttpFileSystem::open_file_internal(const Path& path, FileReaderSPtr* read
// Pass properties (including HTTP headers) to the file reader
file_info.extend_info = _properties;

auto http_reader = std::make_shared<HttpFileReader>(file_info, path.native());
auto http_reader = std::make_shared<HttpFileReader>(file_info, path.native(), opts.mtime);
RETURN_IF_ERROR(http_reader->open(opts));
*reader = http_reader;
return Status::OK();
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/fs/local_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class LocalFileReader final : public FileReader {

const std::string& get_data_dir_path() override { return _data_dir_path; }

// Always returns 0, which FileReader::mtime() documents as "unknown".
int64_t mtime() const override { return 0; }

private:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/fs/packed_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class PackedFileReader final : public FileReader {

bool closed() const override { return _closed; }

// Forwards to the inner reader's mtime().
int64_t mtime() const override { return _inner_reader->mtime(); }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/fs/s3_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class S3FileReader final : public FileReader {

bool closed() const override { return _closed.load(std::memory_order_acquire); }

// Always returns 0, which FileReader::mtime() documents as "unknown".
// NOTE(review): other remote readers in this change report the mtime passed in through
// FileReaderOptions; confirm that reporting 0 here is intentional.
int64_t mtime() const override { return 0; }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/fs/stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {

size_t size() const override { return 0; }

// Always returns 0, which FileReader::mtime() documents as "unknown".
int64_t mtime() const override { return 0; }

// called when consumer finished
Status close() override {
if (!(_finished || _cancelled)) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/fs/tracing_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class TracingFileReader : public FileReader {
void _collect_profile_at_runtime() override { return _inner->collect_profile_at_runtime(); }
void _collect_profile_before_close() override { return _inner->collect_profile_before_close(); }

// Forwards to the wrapped inner reader's mtime().
int64_t mtime() const override { return _inner->mtime(); }

FileReaderStats* stats() const { return _stats; }
doris::io::FileReaderSPtr inner_reader() { return _inner; }

Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/format/orc/orc_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class OrcMergeRangeFileReader : public io::FileReader {

bool closed() const override { return _closed; }

// Forwards to the inner reader's mtime().
int64_t mtime() const override { return _inner_reader->mtime(); }

// for test only
const Statistics& statistics() const { return _statistics; }

Expand Down
Loading
Loading