Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@ DEFINE_Int32(index_page_cache_percentage, "10");
DEFINE_mBool(disable_storage_page_cache, "false");
// whether to disable row cache feature in storage
DEFINE_mBool(disable_storage_row_cache, "true");
// Parquet page cache: threshold ratio for caching decompressed vs compressed pages
// If uncompressed_size / compressed_size <= threshold, cache decompressed; otherwise cache compressed
DEFINE_Double(parquet_page_cache_decompress_threshold, "1.5");
// Parquet page cache: whether to enable caching compressed pages (when ratio exceeds threshold)
DEFINE_Bool(enable_parquet_cache_compressed_pages, "false");
// whether to disable pk page cache feature in storage
DEFINE_Bool(disable_pk_storage_page_cache, "false");

Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,10 @@ DECLARE_Int32(index_page_cache_percentage);
// Declared with the same mBool flavor as its DEFINE_mBool definition in config.cpp.
DECLARE_mBool(disable_storage_page_cache);
// whether to disable row cache feature in storage
DECLARE_mBool(disable_storage_row_cache);
// Parquet page cache: threshold ratio for caching decompressed vs compressed pages
DECLARE_Double(parquet_page_cache_decompress_threshold);
// Parquet page cache: whether to enable caching compressed pages
DECLARE_Bool(enable_parquet_cache_compressed_pages);
// whether to disable pk page cache feature in storage
DECLARE_Bool(disable_pk_storage_page_cache);

Expand Down
2 changes: 2 additions & 0 deletions be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class CachedRemoteFileReader final : public FileReader {

static std::pair<size_t, size_t> s_align_size(size_t offset, size_t size, size_t length);

// Modification time is not tracked by this reader; it is forwarded from the wrapped remote reader.
int64_t mtime() const override { return _remote_file_reader->mtime(); }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down
15 changes: 15 additions & 0 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,21 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
const io::FileSystemProperties& system_properties,
const io::FileDescription& file_description, const io::FileReaderOptions& reader_options,
RuntimeProfile* profile) {
// Thin wrapper: the reader is built by _create_file_reader_internal; on success the
// file description's path, size and mtime are logged before the reader is returned.
auto reader_res = _create_file_reader_internal(system_properties, file_description,
reader_options, profile);
if (!reader_res.has_value()) {
// Propagate the construction error unchanged; nothing is logged on failure.
return unexpected(std::move(reader_res).error());
}
auto file_reader = std::move(reader_res).value();
// NOTE(review): this logs at INFO for every reader that is created. Confirm this is not
// too noisy for scan-heavy workloads; a verbose/debug level may be more appropriate.
// NOTE(review): the logged size/mtime come from file_description, not from the reader
// that was just created (file_reader->mtime() may differ, e.g. readers that return 0).
LOG_INFO("create file reader for path={}, size={}, mtime={}", file_description.path,
file_description.file_size, file_description.mtime);
return file_reader;
}

Result<io::FileReaderSPtr> FileFactory::_create_file_reader_internal(
const io::FileSystemProperties& system_properties,
const io::FileDescription& file_description, const io::FileReaderOptions& reader_options,
RuntimeProfile* profile) {
TFileType::type type = system_properties.system_type;
switch (type) {
case TFileType::FILE_LOCAL: {
Expand Down
6 changes: 6 additions & 0 deletions be/src/io/file_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ class FileFactory {

private:
static std::string _get_fs_name(const io::FileDescription& file_description);

/// Create FileReader without FS
static Result<io::FileReaderSPtr> _create_file_reader_internal(
const io::FileSystemProperties& system_properties,
const io::FileDescription& file_description,
const io::FileReaderOptions& reader_options, RuntimeProfile* profile = nullptr);
};

} // namespace doris
6 changes: 4 additions & 2 deletions be/src/io/fs/broker_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ struct IOContext;

// Builds a reader over an already-opened broker file descriptor.
//
// `mtime` is stored as-is and is what mtime() returns; the declaration in the header
// defaults it to 0. Construction bumps the broker "open reading" and "reader total" metrics.
BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size,
                                   TBrokerFD fd,
                                   std::shared_ptr<BrokerServiceConnection> connection,
                                   int64_t mtime)
        : _path(std::move(path)),
          _file_size(file_size),
          _broker_addr(broker_addr),
          _fd(fd),
          _connection(std::move(connection)),
          _mtime(mtime) {
    DorisMetrics::instance()->broker_file_open_reading->increment(1);
    DorisMetrics::instance()->broker_file_reader_total->increment(1);
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/io/fs/broker_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct IOContext;
class BrokerFileReader final : public FileReader {
public:
BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd,
std::shared_ptr<BrokerServiceConnection> connection);
std::shared_ptr<BrokerServiceConnection> connection, int64_t mtime = 0);

~BrokerFileReader() override;

Expand All @@ -50,6 +50,8 @@ class BrokerFileReader final : public FileReader {

bool closed() const override { return _closed.load(std::memory_order_acquire); }

// Modification time supplied at construction (the constructor parameter defaults to 0).
int64_t mtime() const override { return _mtime; }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand All @@ -62,6 +64,7 @@ class BrokerFileReader final : public FileReader {
TBrokerFD _fd;

std::shared_ptr<BrokerServiceConnection> _connection;
int64_t _mtime;
std::atomic<bool> _closed = false;
};
} // namespace doris::io
2 changes: 1 addition & 1 deletion be/src/io/fs/broker_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ Status BrokerFileSystem::open_file_internal(const Path& file, FileReaderSPtr* re
error_msg(response->opStatus.message));
}
*reader = std::make_shared<BrokerFileReader>(_broker_addr, file, fsize, response->fd,
_connection);
_connection, opts.mtime);
return Status::OK();
}

Expand Down
3 changes: 0 additions & 3 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -819,12 +819,10 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
int64_t to_read = std::min(_buf_size - buf_remaining, _file_end_offset - _buf_end_offset);
int64_t has_read = 0;
SCOPED_RAW_TIMER(&_statistics.read_time);
while (has_read < to_read) {
size_t loop_read = 0;
Slice result(_buf.get() + buf_remaining + has_read, to_read - has_read);
RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, result, &loop_read, io_ctx));
_statistics.read_calls++;
if (loop_read == 0) {
break;
}
Expand All @@ -833,7 +831,6 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
if (has_read != to_read) {
return Status::Corruption("Try to read {} bytes, but received {} bytes", to_read, has_read);
}
_statistics.read_bytes += to_read;
_buf_end_offset += to_read;
*buf = _buf.get();
return Status::OK();
Expand Down
28 changes: 11 additions & 17 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ class RangeCacheFileReader : public io::FileReader {

bool closed() const override { return _closed; }

// Forwards to the inner reader's modification time.
int64_t mtime() const override { return _inner_reader->mtime(); }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down Expand Up @@ -225,7 +227,6 @@ class MergeRangeFileReader : public io::FileReader {
int64_t merged_io = 0;
int64_t request_bytes = 0;
int64_t merged_bytes = 0;
int64_t apply_bytes = 0;
};

struct RangeCachedData {
Expand Down Expand Up @@ -299,9 +300,6 @@ class MergeRangeFileReader : public io::FileReader {
_merged_read_slice_size = READ_SLICE_SIZE;
}

for (const PrefetchRange& range : _random_access_ranges) {
_statistics.apply_bytes += range.end_offset - range.start_offset;
}
if (_profile != nullptr) {
const char* random_profile = "MergedSmallIO";
ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
Expand All @@ -315,8 +313,6 @@ class MergeRangeFileReader : public io::FileReader {
random_profile, 1);
_merged_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedBytes", TUnit::BYTES,
random_profile, 1);
_apply_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ApplyBytes", TUnit::BYTES,
random_profile, 1);
}
}

Expand All @@ -335,6 +331,8 @@ class MergeRangeFileReader : public io::FileReader {

bool closed() const override { return _closed; }

// Forwards to the wrapped reader's modification time.
int64_t mtime() const override { return _reader->mtime(); }

// for test only
size_t buffer_remaining() const { return _remaining; }

Expand All @@ -359,7 +357,6 @@ class MergeRangeFileReader : public io::FileReader {
COUNTER_UPDATE(_merged_io, _statistics.merged_io);
COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
if (_reader != nullptr) {
_reader->collect_profile_before_close();
}
Expand All @@ -373,7 +370,6 @@ class MergeRangeFileReader : public io::FileReader {
RuntimeProfile::Counter* _merged_io = nullptr;
RuntimeProfile::Counter* _request_bytes = nullptr;
RuntimeProfile::Counter* _merged_bytes = nullptr;
RuntimeProfile::Counter* _apply_bytes = nullptr;

int _search_read_range(size_t start_offset, size_t end_offset);
void _clean_cached_data(RangeCachedData& cached_data);
Expand Down Expand Up @@ -540,6 +536,8 @@ class PrefetchBufferedReader final : public io::FileReader {

bool closed() const override { return _closed; }

// Forwards to the wrapped reader's modification time.
int64_t mtime() const override { return _reader->mtime(); }

void set_random_access_ranges(const std::vector<PrefetchRange>* random_access_ranges) {
_random_access_ranges = random_access_ranges;
for (auto& _pre_buffer : _pre_buffers) {
Expand Down Expand Up @@ -600,6 +598,8 @@ class InMemoryFileReader final : public io::FileReader {

bool closed() const override { return _closed; }

// Forwards to the wrapped reader's modification time.
int64_t mtime() const override { return _reader->mtime(); }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand All @@ -619,12 +619,6 @@ class InMemoryFileReader final : public io::FileReader {
*/
class BufferedStreamReader {
public:
struct Statistics {
int64_t read_time = 0;
int64_t read_calls = 0;
int64_t read_bytes = 0;
};

/**
* Return the address of underlying buffer that locates the start of data between [offset, offset + bytes_to_read)
* @param buf the buffer address to save the start address of data
Expand All @@ -637,13 +631,11 @@ class BufferedStreamReader {
* Save the data address to slice.data, and the slice.size is the bytes to read.
*/
virtual Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) = 0;
Statistics& statistics() { return _statistics; }
virtual ~BufferedStreamReader() = default;
// return the file path
virtual std::string path() = 0;

protected:
Statistics _statistics;
// return the modification time of the underlying file
virtual int64_t mtime() const = 0;
};

class BufferedFileStreamReader : public BufferedStreamReader, public ProfileCollector {
Expand All @@ -657,6 +649,8 @@ class BufferedFileStreamReader : public BufferedStreamReader, public ProfileColl
Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) override;
std::string path() override { return _file->path(); }

// Forwards to the modification time of the file reader this stream reads from.
int64_t mtime() const override { return _file->mtime(); }

protected:
void _collect_profile_before_close() override {
if (_file != nullptr) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/fs/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class FileReader : public doris::ProfileCollector {

virtual const std::string& get_data_dir_path() { return VIRTUAL_REMOTE_DATA_DIR; }

// File modification time (seconds since epoch). Pure virtual: every reader must implement it
// and should return 0 when the modification time is unknown.
virtual int64_t mtime() const = 0;

protected:
virtual Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) = 0;
Expand Down
7 changes: 4 additions & 3 deletions be/src/io/fs/hdfs_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,17 @@ Result<FileReaderSPtr> HdfsFileReader::create(Path full_path, const hdfsFS& fs,
auto path = convert_path(full_path, fs_name);
return get_file(fs, path, opts.mtime, opts.file_size).transform([&](auto&& accessor) {
return std::make_shared<HdfsFileReader>(std::move(path), std::move(fs_name),
std::move(accessor), profile);
std::move(accessor), profile, opts.mtime);
});
}

HdfsFileReader::HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor,
RuntimeProfile* profile)
RuntimeProfile* profile, int64_t mtime)
: _path(std::move(path)),
_fs_name(std::move(fs_name)),
_accessor(std::move(accessor)),
_profile(profile) {
_profile(profile),
_mtime(mtime) {
_handle = _accessor.get();

DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
Expand Down
5 changes: 4 additions & 1 deletion be/src/io/fs/hdfs_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class HdfsFileReader final : public FileReader {
const FileReaderOptions& opts, RuntimeProfile* profile);

HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor,
RuntimeProfile* profile);
RuntimeProfile* profile, int64_t mtime = 0);

~HdfsFileReader() override;

Expand All @@ -57,6 +57,8 @@ class HdfsFileReader final : public FileReader {

bool closed() const override { return _closed.load(std::memory_order_acquire); }

// Modification time passed to the constructor (HdfsFileReader::create passes opts.mtime;
// the constructor parameter defaults to 0).
int64_t mtime() const override { return _mtime; }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down Expand Up @@ -86,6 +88,7 @@ class HdfsFileReader final : public FileReader {
CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle
std::atomic<bool> _closed = false;
RuntimeProfile* _profile = nullptr;
int64_t _mtime;
#ifdef USE_HADOOP_HDFS
HDFSProfile _hdfs_profile;
#endif
Expand Down
7 changes: 4 additions & 3 deletions be/src/io/fs/http_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,20 @@ Result<FileReaderSPtr> HttpFileReader::create(const std::string& url,
ofi.path = Path(url);
ofi.extend_info = props;

auto reader = std::make_shared<HttpFileReader>(ofi, url);
auto reader = std::make_shared<HttpFileReader>(ofi, url, opts.mtime);

// Open the file to detect Range support and validate configuration
RETURN_IF_ERROR_RESULT(reader->open(opts));

return reader;
}

HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url)
HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url, int64_t mtime)
: _extend_kv(fileInfo.extend_info),
_path(fileInfo.path),
_url(std::move(url)),
_client(std::make_unique<HttpClient>()) {
_client(std::make_unique<HttpClient>()),
_mtime(mtime) {
auto etag_iter = _extend_kv.find("etag");
if (etag_iter != _extend_kv.end()) {
_etag = etag_iter->second;
Expand Down
5 changes: 4 additions & 1 deletion be/src/io/fs/http_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class HttpFileReader final : public FileReader {
const std::map<std::string, std::string>& props,
const FileReaderOptions& opts, RuntimeProfile* profile);

explicit HttpFileReader(const OpenFileInfo& fileInfo, std::string url);
explicit HttpFileReader(const OpenFileInfo& fileInfo, std::string url, int64_t mtime);
~HttpFileReader() override;

Status open(const FileReaderOptions& opts);
Expand All @@ -52,6 +52,8 @@ class HttpFileReader final : public FileReader {
bool closed() const override { return _closed.load(std::memory_order_acquire); }
size_t size() const override { return _file_size; }

// Returns the mtime handed to the constructor.
// NOTE(review): this is a separate member from _last_modified; confirm the two are not
// expected to agree and that callers want the caller-supplied value here.
int64_t mtime() const override { return _mtime; }

private:
// Prepare and initialize the HTTP client for a new request
Status prepare_client(bool set_fail_on_error = true);
Expand All @@ -78,6 +80,7 @@ class HttpFileReader final : public FileReader {
int64_t _last_modified = 0;
std::atomic<bool> _closed = false;
std::unique_ptr<HttpClient> _client;
int64_t _mtime;

// Configuration for non-Range request handling
bool _enable_range_request = true; // Whether Range request is required
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/http_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Status HttpFileSystem::open_file_internal(const Path& path, FileReaderSPtr* read
// Pass properties (including HTTP headers) to the file reader
file_info.extend_info = _properties;

auto http_reader = std::make_shared<HttpFileReader>(file_info, path.native());
auto http_reader = std::make_shared<HttpFileReader>(file_info, path.native(), opts.mtime);
RETURN_IF_ERROR(http_reader->open(opts));
*reader = http_reader;
return Status::OK();
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/fs/local_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class LocalFileReader final : public FileReader {

const std::string& get_data_dir_path() override { return _data_dir_path; }

// Always 0: this reader does not report a modification time.
// NOTE(review): if callers use mtime to detect that a file changed, local files will
// never look modified — confirm this is intended.
int64_t mtime() const override { return 0; }

private:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/fs/s3_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class S3FileReader final : public FileReader {

bool closed() const override { return _closed.load(std::memory_order_acquire); }

// Always 0: this reader does not report a modification time.
// NOTE(review): the broker, HDFS and HTTP readers are given the mtime from
// FileReaderOptions; confirm S3 is deliberately left without one.
int64_t mtime() const override { return 0; }

protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/fs/stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {

size_t size() const override { return 0; }

// Always 0: the pipe reports no modification time (size() likewise returns 0).
int64_t mtime() const override { return 0; }

// called when consumer finished
Status close() override {
if (!(_finished || _cancelled)) {
Expand Down
Loading
Loading