Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/apache-orc
58 changes: 58 additions & 0 deletions be/src/io/fs/tracing_file_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once
#include "common/status.h"
#include "io/fs/file_reader.h"
#include "util/runtime_profile.h"

namespace doris {

namespace io {

class TracingFileReader : public FileReader {
public:
TracingFileReader(doris::io::FileReaderSPtr inner, FileReaderStats* stats)
: _inner(std::move(inner)), _stats(stats) {}

Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override {
SCOPED_RAW_TIMER(&_stats->read_time_ns);
Status st = _inner->read_at(offset, result, bytes_read, io_ctx);
_stats->read_calls++;
_stats->read_bytes += *bytes_read;
return st;
}

Status close() override { return _inner->close(); }
const doris::io::Path& path() const override { return _inner->path(); }
size_t size() const override { return _inner->size(); }
bool closed() const override { return _inner->closed(); }
const std::string& get_data_dir_path() override { return _inner->get_data_dir_path(); }

void _collect_profile_at_runtime() override { return _inner->collect_profile_at_runtime(); }
void _collect_profile_before_close() override { return _inner->collect_profile_before_close(); }

FileReaderStats* stats() const { return _stats; }

private:
doris::io::FileReaderSPtr _inner;
FileReaderStats* _stats;
};

} // namespace io
} // namespace doris
8 changes: 8 additions & 0 deletions be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ enum class ReaderType : uint8_t {

namespace io {

struct FileReaderStats {
size_t read_calls = 0;
size_t read_bytes = 0;
int64_t read_time_ns = 0;
size_t read_rows = 0;
};

struct FileCacheStatistics {
int64_t num_local_io_total = 0;
int64_t num_remote_io_total = 0;
Expand Down Expand Up @@ -73,6 +80,7 @@ struct IOContext {
int64_t expiration_time = 0;
const TUniqueId* query_id = nullptr; // Ref
FileCacheStatistics* file_cache_stats = nullptr; // Ref
FileReaderStats* file_reader_stats = nullptr; // Ref
bool is_inverted_index = false;
// if is_dryrun, read IO will download data to cache but return no data to reader
// useful to skip cache data read from local disk to accelarate warm up
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/file_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {
std::string name_suffix() const override;

private:
friend class vectorized::VFileScanner;
std::shared_ptr<vectorized::SplitSourceConnector> _split_source = nullptr;
int _max_scanners;
// A in memory cache to save some common components
Expand Down
2 changes: 2 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,8 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
io::IOContext io_ctx;
io::FileCacheStatistics file_cache_statis;
io_ctx.file_cache_stats = &file_cache_statis;
io::FileReaderStats file_reader_stats;
io_ctx.file_reader_stats = &file_reader_stats;
// file_slots is no use, but the lifetime should be longer than reader
std::vector<SlotDescriptor*> file_slots;
switch (params.format_type) {
Expand Down
15 changes: 12 additions & 3 deletions be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "arrow_pip_input_stream.h"
#include "common/logging.h"
#include "io/fs/stream_load_pipe.h"
#include "io/fs/tracing_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
Expand All @@ -43,14 +44,22 @@ ArrowStreamReader::ArrowStreamReader(RuntimeState* state, RuntimeProfile* profil
const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs,
io::IOContext* io_ctx)
: _state(state), _range(range), _file_slot_descs(file_slot_descs), _file_reader(nullptr) {
: _state(state),
_range(range),
_file_slot_descs(file_slot_descs),
_io_ctx(io_ctx),
_file_reader(nullptr) {
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz);
}

ArrowStreamReader::~ArrowStreamReader() = default;

Status ArrowStreamReader::init_reader() {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, false));
io::FileReaderSPtr file_reader;
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &file_reader, _state, false));
_file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
_io_ctx->file_reader_stats)
: file_reader;
_pip_stream = ArrowPipInputStream::create_unique(_file_reader);
return Status::OK();
}
Expand Down Expand Up @@ -121,4 +130,4 @@ Status ArrowStreamReader::get_columns(std::unordered_map<std::string, TypeDescri
}

#include "common/compile_check_end.h"
} // namespace doris::vectorized
} // namespace doris::vectorized
1 change: 1 addition & 0 deletions be/src/vec/exec/format/arrow/arrow_stream_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class ArrowStreamReader : public GenericReader {
RuntimeState* _state;
const TFileRangeDesc& _range;
const std::vector<SlotDescriptor*>& _file_slot_descs;
io::IOContext* _io_ctx;
io::FileReaderSPtr _file_reader;
std::unique_ptr<doris::vectorized::ArrowPipInputStream> _pip_stream;
cctz::time_zone _ctzz;
Expand Down
6 changes: 5 additions & 1 deletion be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "io/fs/buffered_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/s3_file_reader.h"
#include "io/fs/tracing_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "util/string_util.h"
Expand Down Expand Up @@ -541,10 +542,13 @@ Status CsvReader::_create_file_reader(bool need_schema) {
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state, _file_description);
_file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
auto file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, reader_options,
io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
_file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
_io_ctx->file_reader_stats)
: file_reader;
}
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
_params.file_type != TFileType::FILE_BROKER) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,8 @@ NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile,
_more_input_bytes(0),
_more_output_bytes(0),
_current_offset(current_offset),
_bytes_read_counter(nullptr),
_read_timer(nullptr),
_bytes_decompress_counter(nullptr),
_decompress_timer(nullptr) {
_bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
_read_timer = ADD_TIMER(_profile, "FileReadTime");
_bytes_decompress_counter = ADD_COUNTER(_profile, "BytesDecompressed", TUnit::BYTES);
_decompress_timer = ADD_TIMER(_profile, "DecompressTime");
}
Expand Down Expand Up @@ -384,16 +380,12 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool
}
}

{
SCOPED_TIMER(_read_timer);
Slice file_slice(file_buf, buffer_len);
RETURN_IF_ERROR(
_file_reader->read_at(_current_offset, file_slice, &read_len, io_ctx));
_current_offset += read_len;
if (read_len == 0) {
_file_eof = true;
}
COUNTER_UPDATE(_bytes_read_counter, read_len);
Slice file_slice(file_buf, buffer_len);
RETURN_IF_ERROR(
_file_reader->read_at(_current_offset, file_slice, &read_len, io_ctx));
_current_offset += read_len;
if (read_len == 0) {
_file_eof = true;
}
if (_file_eof || read_len == 0) {
if (!stream_end) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,6 @@ class NewPlainTextLineReader : public LineReader {
size_t _current_offset;

// Profile counters
RuntimeProfile::Counter* _bytes_read_counter = nullptr;
RuntimeProfile::Counter* _read_timer = nullptr;
RuntimeProfile::Counter* _bytes_decompress_counter = nullptr;
RuntimeProfile::Counter* _decompress_timer = nullptr;
};
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/exec/format/generic_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ class GenericReader : public ProfileCollector {

virtual Status close() { return Status::OK(); }

/// The reader is responsible for counting the number of rows read,
/// because some readers, such as parquet/orc,
/// can skip some pages/rowgroups through indexes.
virtual bool count_read_rows() { return false; }

protected:
const size_t _MIN_BATCH_SIZE = 4064; // 4094 - 32(padding)

Expand Down
15 changes: 8 additions & 7 deletions be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "io/fs/buffered_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/stream_load_pipe.h"
#include "io/fs/tracing_file_reader.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -95,9 +96,7 @@ NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, Scann
_scanner_eof(scanner_eof),
_current_offset(0),
_io_ctx(io_ctx) {
_bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
_read_timer = ADD_TIMER(_profile, "ReadTime");
_file_read_timer = ADD_TIMER(_profile, "FileReadTime");
if (_range.__isset.compress_type) {
// for compatibility
_file_compress_type = _range.compress_type;
Expand Down Expand Up @@ -438,10 +437,13 @@ Status NewJsonReader::_open_file_reader(bool need_schema) {
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state, _file_description);
_file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
auto file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, reader_options,
io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
io::PrefetchRange(_range.start_offset, _range.size)));
_file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
_io_ctx->file_reader_stats)
: file_reader;
}
return Status::OK();
}
Expand Down Expand Up @@ -662,7 +664,7 @@ Status NewJsonReader::_parse_json(bool* is_empty_row, bool* eof) {
// return Status::OK() if parse succeed or reach EOF.
Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) {
// read a whole message
SCOPED_TIMER(_file_read_timer);
SCOPED_TIMER(_read_timer);
const uint8_t* json_str = nullptr;
std::unique_ptr<uint8_t[]> json_str_ptr;
if (_line_reader != nullptr) {
Expand All @@ -675,7 +677,6 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) {
}
}

_bytes_read_counter += *size;
if (*eof) {
return Status::OK();
}
Expand Down Expand Up @@ -1931,7 +1932,7 @@ Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::st

Status NewJsonReader::_simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
simdjson::error_code* error) {
SCOPED_TIMER(_file_read_timer);
SCOPED_TIMER(_read_timer);
// step1: read buf from pipe.
if (_line_reader != nullptr) {
RETURN_IF_ERROR(_line_reader->read_line(&_json_str, size, eof, _io_ctx));
Expand Down Expand Up @@ -1992,7 +1993,7 @@ Status NewJsonReader::_judge_empty_row(size_t size, bool eof, bool* is_empty_row

Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_code* error,
bool* is_empty_row) {
SCOPED_TIMER(_file_read_timer);
SCOPED_TIMER(_read_timer);
auto return_quality_error = [&](fmt::memory_buffer& error_msg,
const std::string& doc_info) -> Status {
_counter->num_rows_filtered++;
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/exec/format/json/new_json_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,7 @@ class NewJsonReader : public GenericReader {

io::IOContext* _io_ctx = nullptr;

RuntimeProfile::Counter* _bytes_read_counter = nullptr;
RuntimeProfile::Counter* _read_timer = nullptr;
RuntimeProfile::Counter* _file_read_timer = nullptr;

// ======SIMD JSON======
// name mapping
Expand Down
Loading
Loading