diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index c42afcea254b78..39ab4c86deee3f 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -30,6 +30,7 @@ set(EXEC_FILES
     blocking_join_node.cpp
     broker_scan_node.cpp
     broker_reader.cpp
+    buffered_reader.cpp
     base_scanner.cpp
     broker_scanner.cpp
     cross_join_node.cpp
diff --git a/be/src/exec/broker_reader.cpp b/be/src/exec/broker_reader.cpp
index dac2a5b6c6d951..2125bb31cbb2a9 100644
--- a/be/src/exec/broker_reader.cpp
+++ b/be/src/exec/broker_reader.cpp
@@ -155,6 +155,8 @@ Status BrokerReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_rea
         return status;
     }
 
+    VLOG_RPC << "send pread request to broker:" << broker_addr << " position:" << position << ", read bytes length:" << nbytes;
+
     try {
         client->pread(response, request);
     } catch (apache::thrift::transport::TTransportException& e) {
@@ -253,3 +255,4 @@ void BrokerReader::close() {
 }
 
 } // namespace doris
+
diff --git a/be/src/exec/buffered_reader.cpp b/be/src/exec/buffered_reader.cpp
new file mode 100644
index 00000000000000..696067fb6db70c
--- /dev/null
+++ b/be/src/exec/buffered_reader.cpp
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <algorithm>
+#include <cstring>
+#include <utility>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// Non-owning constructor: `reader` is never deleted here, so it must outlive
+// this object.
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer(new char[buffer_size]),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {}
+
+// Owning constructor: the wrapped reader is destroyed together with this
+// object, so a freshly allocated reader can be handed over without leaking.
+BufferedReader::BufferedReader(std::unique_ptr<FileReader> reader, int64_t buffer_size)
+        : BufferedReader(reader.get(), buffer_size) {
+    _owned_reader = std::move(reader);
+}
+
+BufferedReader::~BufferedReader() {
+    close();
+}
+
+// Opens the wrapped reader and pre-loads the buffer starting at offset 0.
+Status BufferedReader::open() {
+    if (_reader == nullptr) {
+        return Status::InternalError("Open buffered reader failed, reader is null");
+    }
+    RETURN_IF_ERROR(_reader->open());
+    RETURN_IF_ERROR(_fill());
+    return Status::OK();
+}
+
+// Message-oriented reads are not supported by this reader.
+Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) {
+    return Status::NotSupported("Not support");
+}
+
+// Sequential read of up to *buf_len bytes from the current offset into `buf`.
+// *eof is set to true when no byte could be read.
+// NOTE(review): *buf_len is not updated with the number of bytes actually read;
+// the unit test reuses buf_len across calls, so this is intentionally unchanged.
+Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
+    DCHECK_NE(*buf_len, 0);
+    int64_t bytes_read = 0;
+    RETURN_IF_ERROR(readat(_cur_offset, static_cast<int64_t>(*buf_len), &bytes_read, buf));
+    *eof = (bytes_read == 0);
+    return Status::OK();
+}
+
+// Random-access read: copies up to `nbytes` bytes starting at `position` into
+// `out` and stores the count in *bytes_read (0 when nothing could be read).
+// A negative position is treated as 0. Also moves the cursor used by read().
+Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    *bytes_read = 0;
+    if (nbytes <= 0) {
+        return Status::OK();
+    }
+    if (_reader == nullptr || _buffer == nullptr) {
+        return Status::InternalError("Read buffered reader failed, reader is null or closed");
+    }
+    // Clamp instead of buffering at a negative offset: such a window cannot be
+    // loaded, and stale buffer bytes used to be returned as if they were valid.
+    const int64_t start = std::max<int64_t>(position, 0);
+    char* dst = static_cast<char*>(out);
+    while (*bytes_read < nbytes) {
+        int64_t len = 0;
+        RETURN_IF_ERROR(_read_once(start + *bytes_read, nbytes - *bytes_read, &len, dst + *bytes_read));
+        // EOF
+        if (len <= 0) {
+            break;
+        }
+        *bytes_read += len;
+    }
+    return Status::OK();
+}
+
+// Serves one chunk of a request: from the buffer when `position` lies inside
+// the buffered window, otherwise after refilling the buffer at `position`.
+// May return fewer bytes than requested; readat() loops until it is done.
+Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    // requested bytes missed the local buffer
+    if (position >= _buffer_limit || position < _buffer_offset) {
+        // if requested length is larger than the capacity of buffer, do not
+        // need to copy the character into local buffer.
+        if (nbytes > _buffer_size) {
+            RETURN_IF_ERROR(_reader->readat(position, nbytes, bytes_read, out));
+            // Keep the sequential cursor in sync on this path too; otherwise
+            // read() with a large buffer would return the same bytes forever.
+            if (*bytes_read > 0) {
+                _cur_offset = position + *bytes_read;
+            }
+            return Status::OK();
+        }
+        _buffer_offset = position;
+        RETURN_IF_ERROR(_fill());
+        if (position >= _buffer_limit) {
+            *bytes_read = 0;
+            return Status::OK();
+        }
+    }
+    const int64_t len = std::min(_buffer_limit - position, nbytes);
+    const int64_t off = position - _buffer_offset;
+    memcpy(out, _buffer.get() + off, len);
+    *bytes_read = len;
+    _cur_offset = position + len;
+    return Status::OK();
+}
+
+// Reloads the buffer with up to _buffer_size bytes starting at _buffer_offset.
+Status BufferedReader::_fill() {
+    DCHECK_GE(_buffer_offset, 0);
+    // Mark the window empty first so that a failed read cannot leave the
+    // previous contents looking valid for the new offset.
+    _buffer_limit = _buffer_offset;
+    int64_t bytes_read = 0;
+    // retry once for new content when the first read returns nothing
+    for (int attempt = 0; attempt < 2 && bytes_read == 0; ++attempt) {
+        RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer.get()));
+    }
+    _buffer_limit = _buffer_offset + bytes_read;
+    return Status::OK();
+}
+
+int64_t BufferedReader::size() {
+    return _reader->size();
+}
+
+// Only moves the cursor used by read() and tell(); no I/O is performed and
+// the position is not validated here.
+Status BufferedReader::seek(int64_t position) {
+    _cur_offset = position;
+    return Status::OK();
+}
+
+Status BufferedReader::tell(int64_t* position) {
+    *position = _cur_offset;
+    return Status::OK();
+}
+
+// Closes the wrapped reader and releases the buffer. Tolerates a null reader,
+// which the destructor relies on.
+void BufferedReader::close() {
+    if (_reader != nullptr) {
+        _reader->close();
+    }
+    _buffer.reset();
+}
+
+bool BufferedReader::closed() {
+    return _reader == nullptr || _reader->closed();
+}
+
+} // namespace doris
diff --git a/be/src/exec/buffered_reader.h b/be/src/exec/buffered_reader.h
new file mode 100644
index 00000000000000..d7f2fbd7e675cb
--- /dev/null
+++ b/be/src/exec/buffered_reader.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <memory>
+
+#include "common/status.h"
+#include "olap/olap_define.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+// Buffered Reader
+// Adds a cache layer between the caller and the file reader to reduce the
+// number of calls to the underlying read function.
+class BufferedReader : public FileReader {
+public:
+    // If the reader needs the file size, set it when constructing the wrapped
+    // FileReader. There is no other way to set the file size.
+    //
+    // This overload does NOT take ownership: `reader` must outlive this object.
+    BufferedReader(FileReader* reader, int64_t buffer_size = 1024 * 1024);
+    // This overload takes ownership of `reader` and destroys it with this object.
+    explicit BufferedReader(std::unique_ptr<FileReader> reader, int64_t buffer_size = 1024 * 1024);
+    virtual ~BufferedReader();
+
+    // The internal buffer is uniquely owned, so copying is disabled.
+    BufferedReader(const BufferedReader&) = delete;
+    BufferedReader& operator=(const BufferedReader&) = delete;
+
+    virtual Status open() override;
+
+    // Read
+    virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
+    virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override;
+    virtual Status read_one_message(uint8_t** buf, size_t* length) override;
+    virtual int64_t size() override;
+    virtual Status seek(int64_t position) override;
+    virtual Status tell(int64_t* position) override;
+    virtual void close() override;
+    virtual bool closed() override;
+
+private:
+    Status _fill();
+    Status _read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out);
+
+private:
+    FileReader* _reader;                        // reader all I/O is forwarded to
+    std::unique_ptr<FileReader> _owned_reader;  // set only by the owning constructor
+    std::unique_ptr<char[]> _buffer;            // released by close()
+    int64_t _buffer_size;
+    int64_t _buffer_offset;                     // file offset of _buffer[0]
+    int64_t _buffer_limit;                      // file offset one past the last buffered byte
+    int64_t _cur_offset;                        // cursor used by read() and tell()
+};
+
+} // namespace doris
diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp
index 34af303a003bb0..b3e37d5c974015 100644
--- a/be/src/exec/parquet_reader.cpp
+++ b/be/src/exec/parquet_reader.cpp
@@ -158,6 +158,9 @@ inline Status ParquetReaderWrap::set_field_null(Tuple* tuple, const SlotDescript
 
 Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof) {
     if (_current_line_of_group >= _rows_of_group) {// read next row group
+        VLOG(7) << "read_record_batch, current group id:" << _current_group << " current line of group:"
+                << _current_line_of_group << " is larger than rows group size:"
+                << _rows_of_group << ". start to read next row group";
         _current_group++;
         if (_current_group >= _total_groups) {// read completed.
             _parquet_column_ids.clear();
@@ -177,6 +180,9 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
         }
         _current_line_of_batch = 0;
     } else if (_current_line_of_batch >= _batch->num_rows()) {
+        VLOG(7) << "read_record_batch, current group id:" << _current_group << " current line of batch:"
+                << _current_line_of_batch << " is larger than batch size:"
+                << _batch->num_rows() << ". start to read next batch";
         arrow::Status status = _rb_batch->ReadNext(&_batch);
         if (!status.ok()) {
             return Status::InternalError("Read Batch Error With Libarrow.");
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 375d90f9766de6..cb2268760e6728 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -29,6 +29,7 @@
 #include "exec/text_converter.hpp"
 #include "exec/local_file_reader.h"
 #include "exec/broker_reader.h"
+#include "exec/buffered_reader.h"
 #include "exec/decompressor.h"
 #include "exec/parquet_reader.h"
 
@@ -119,8 +120,9 @@ Status ParquetScanner::open_next_reader() {
         int64_t file_size = 0;
         // for compatibility
         if (range.__isset.file_size) { file_size = range.file_size; }
-        file_reader.reset(new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
-            range.path, range.start_offset, file_size));
+        file_reader.reset(new BufferedReader(std::unique_ptr<FileReader>(
+                new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
+                                 range.path, range.start_offset, file_size))));
         break;
     }
 #if 0
diff --git a/be/test/exec/CMakeLists.txt b/be/test/exec/CMakeLists.txt
index 2021480eb3eb1f..9c496671ebce51 100644
--- a/be/test/exec/CMakeLists.txt
+++ b/be/test/exec/CMakeLists.txt
@@ -51,6 +51,7 @@ ADD_BE_TEST(broker_scanner_test)
 ADD_BE_TEST(broker_scan_node_test)
 ADD_BE_TEST(tablet_info_test)
 ADD_BE_TEST(tablet_sink_test)
+ADD_BE_TEST(buffered_reader_test)
 # ADD_BE_TEST(es_scan_node_test)
 ADD_BE_TEST(es_http_scan_node_test)
 ADD_BE_TEST(es_predicate_test)
diff --git a/be/test/exec/buffered_reader_test.cpp b/be/test/exec/buffered_reader_test.cpp
new file mode 100644
index 00000000000000..3c0bbeb03dcbe7
--- /dev/null
+++ b/be/test/exec/buffered_reader_test.cpp
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "exec/local_file_reader.h"
+#include "exec/buffered_reader.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+class BufferedReaderTest : public testing::Test {
+public:
+    BufferedReaderTest() {}
+
+protected:
+    virtual void SetUp() {
+    }
+    virtual void TearDown() {
+    }
+};
+
+TEST_F(BufferedReaderTest, normal_use) {
+    // buffered_reader_test_file is 950 bytes, so a 1024-byte request is cut short by EOF
+    LocalFileReader file_reader(
+            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file", 0);
+    BufferedReader reader(&file_reader, 1024);
+    auto st = reader.open();
+    ASSERT_TRUE(st.ok());
+    uint8_t buf[1024];
+    MonotonicStopWatch watch;
+    watch.start();
+    int64_t read_length = 0;
+    st = reader.readat(0, 1024, &read_length, buf);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(950, read_length);
+    LOG(INFO) << "read bytes " << read_length << " using time " << watch.elapsed_time();
+}
+
+TEST_F(BufferedReaderTest, test_validity) {
+    // buffered_reader_test_file.txt is 45 bytes: four full 10-byte reads, one short read, then EOF
+    LocalFileReader file_reader(
+            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
+    BufferedReader reader(&file_reader, 64);
+    auto st = reader.open();
+    ASSERT_TRUE(st.ok());
+    uint8_t buf[10];
+    bool eof = false;
+    size_t buf_len = 10;
+
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str());
+    ASSERT_FALSE(eof);
+
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_STREQ("vxzAbCdEfG", std::string((char*)buf, buf_len).c_str());
+    ASSERT_FALSE(eof);
+
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_STREQ("hIj\n\nMnOpQ", std::string((char*)buf, buf_len).c_str());
+    ASSERT_FALSE(eof);
+
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_STREQ("rStUvWxYz\n", std::string((char*)buf, buf_len).c_str());
+    ASSERT_FALSE(eof);
+
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_STREQ("IjKl", std::string((char*)buf, 4).c_str());
+    ASSERT_FALSE(eof);
+
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_TRUE(eof);
+}
+
+TEST_F(BufferedReaderTest, test_seek) {
+    // buffered_reader_test_file.txt is 45 bytes
+    LocalFileReader file_reader(
+            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
+    BufferedReader reader(&file_reader, 64);
+    auto st = reader.open();
+    ASSERT_TRUE(st.ok());
+    uint8_t buf[10];
+    bool eof = false;
+    size_t buf_len = 10;
+
+    // Seek to the end of the file: the next read reports EOF
+    st = reader.seek(45);
+    ASSERT_TRUE(st.ok());
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_TRUE(eof);
+
+    // Seek back to the beginning of the file
+    st = reader.seek(0);
+    ASSERT_TRUE(st.ok());
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str());
+    ASSERT_FALSE(eof);
+
+    // Seek just before the start of the file: the read still returns the first bytes
+    st = reader.seek(-1);
+    ASSERT_TRUE(st.ok());
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str());
+    ASSERT_FALSE(eof);
+
+    // Seek far before the start of the file: same result as above
+    st = reader.seek(-1000);
+    ASSERT_TRUE(st.ok());
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str());
+    ASSERT_FALSE(eof);
+
+    // Seek past the end of the file: the read reports EOF
+    st = reader.seek(1000);
+    ASSERT_TRUE(st.ok());
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_TRUE(eof);
+}
+
+TEST_F(BufferedReaderTest, test_miss) {
+    // buffered_reader_test_file.txt is 45 bytes; the length is asserted before it is used
+    LocalFileReader file_reader(
+            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
+    BufferedReader reader(&file_reader, 64);
+    auto st = reader.open();
+    ASSERT_TRUE(st.ok());
+    uint8_t buf[128];
+    int64_t bytes_read;
+
+    st = reader.readat(20, 10, &bytes_read, buf);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(10, bytes_read);
+    ASSERT_STREQ("hIj\n\nMnOpQ", std::string((char*)buf, (size_t)bytes_read).c_str());
+
+    st = reader.readat(0, 5, &bytes_read, buf);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(5, bytes_read);
+    ASSERT_STREQ("bdfhj", std::string((char*)buf, (size_t)bytes_read).c_str());
+
+    st = reader.readat(5, 10, &bytes_read, buf);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(10, bytes_read);
+    ASSERT_STREQ("lnprtvxzAb", std::string((char*)buf, (size_t)bytes_read).c_str());
+
+    // A request larger than the buffer capacity (128 > 64) must still return
+    // the whole file content.
+    st = reader.readat(0, 128, &bytes_read, buf);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(45, bytes_read);
+    ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, 10).c_str());
+}
+
+} // end namespace doris
+
+int main(int argc, char **argv) {
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
diff --git a/be/test/exec/test_data/buffered_reader/buffered_reader_test_file b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file
new file mode 100644
index 00000000000000..88f4883ae807a5
Binary files /dev/null and b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file differ
diff --git a/be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt
new file mode 100644
index 00000000000000..e0e5fb6404add1
--- /dev/null
+++ b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt
@@ -0,0 +1,4 @@
+bdfhjlnprtvxzAbCdEfGhIj
+
+MnOpQrStUvWxYz
+IjKl
diff --git a/run-ut.sh b/run-ut.sh
index 64e6324e057db2..32e910481bfa69 100755
--- a/run-ut.sh
+++ b/run-ut.sh
@@ -212,6 +212,7 @@ ${DORIS_TEST_BINARY_DIR}/exec/es_scan_reader_test
 ${DORIS_TEST_BINARY_DIR}/exec/es_query_builder_test
 ${DORIS_TEST_BINARY_DIR}/exec/tablet_info_test
 ${DORIS_TEST_BINARY_DIR}/exec/tablet_sink_test
+${DORIS_TEST_BINARY_DIR}/exec/buffered_reader_test
 
 # Running runtime Unittest
 ${DORIS_TEST_BINARY_DIR}/runtime/external_scan_context_mgr_test