From bec31a76f4ca8ab337415633e4925aa671569261 Mon Sep 17 00:00:00 2001 From: xy720 Date: Tue, 16 Jun 2020 10:14:27 +0800 Subject: [PATCH 1/8] add cache buffer reader --- be/src/exec/CMakeLists.txt | 1 + be/src/exec/broker_reader.cpp | 3 + be/src/exec/buffered_reader.cpp | 162 ++++++++++++++++++++++++++++++++ be/src/exec/buffered_reader.h | 61 ++++++++++++ be/src/exec/parquet_reader.cpp | 8 ++ be/src/exec/parquet_scanner.cpp | 5 +- 6 files changed, 238 insertions(+), 2 deletions(-) create mode 100644 be/src/exec/buffered_reader.cpp create mode 100644 be/src/exec/buffered_reader.h diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index c42afcea254b78..39ab4c86deee3f 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -30,6 +30,7 @@ set(EXEC_FILES blocking_join_node.cpp broker_scan_node.cpp broker_reader.cpp + buffered_reader.cpp base_scanner.cpp broker_scanner.cpp cross_join_node.cpp diff --git a/be/src/exec/broker_reader.cpp b/be/src/exec/broker_reader.cpp index dac2a5b6c6d951..b30f484f514977 100644 --- a/be/src/exec/broker_reader.cpp +++ b/be/src/exec/broker_reader.cpp @@ -155,6 +155,8 @@ Status BrokerReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_rea return status; } + LOG(DEBUG) << "send readat request to broker:" << broker_addr << " position:" << position << ", read bytes length:" << nbytes; + try { client->pread(response, request); } catch (apache::thrift::transport::TTransportException& e) { @@ -253,3 +255,4 @@ void BrokerReader::close() { } } // namespace doris + diff --git a/be/src/exec/buffered_reader.cpp b/be/src/exec/buffered_reader.cpp new file mode 100644 index 00000000000000..c2bf04846297e0 --- /dev/null +++ b/be/src/exec/buffered_reader.cpp @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/buffered_reader.h" + +#include +#include + +#include "common/logging.h" + +namespace doris { + +// buffered reader + +BufferedReader::BufferedReader(FileReader* reader) + : _reader(reader), + _buffer_size(1024 * 1024), + _buffer_offset(0), + _buffer_limit(0), + _cur_offset(0) { + _buffer = new char[_buffer_size]; +} + +BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size) + : _reader(reader), + _buffer_size(buffer_size), + _buffer_offset(0), + _buffer_limit(0), + _cur_offset(0) { + _buffer = new char[_buffer_size]; +} + +BufferedReader::~BufferedReader() { + close(); +} + +Status BufferedReader::open() { + if (!_reader) { + std::stringstream ss; + ss << "Open buffered reader failed, reader is null"; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + RETURN_IF_ERROR(_reader->open()); + RETURN_IF_ERROR(fill()); + return Status::OK(); +} + +//not support +Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) { + return Status::NotSupported("Not support"); +} + +Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) { + DCHECK_NE(*buf_len, 0); + int64_t bytes_read; + RETURN_IF_ERROR(readat(_cur_offset, (int64_t)*buf_len, &bytes_read, buf)); + if (bytes_read == 0) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) { + if (nbytes <= 0) { + *bytes_read = 0; + return Status::OK(); + } + RETURN_IF_ERROR(read_once(position, nbytes, bytes_read, out)); + //EOF + if (*bytes_read <= 0) { + return Status::OK(); + } + while (*bytes_read < nbytes) { + int64_t len; + RETURN_IF_ERROR(read_once(position + *bytes_read, nbytes - *bytes_read, &len, reinterpret_cast(out) + *bytes_read)); + // EOF + if (len <= 0) { + break; + } + *bytes_read += len; + } + return Status::OK(); +} + +Status BufferedReader::read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) { + // requested bytes missed the local buffer + if (position >= _buffer_limit || position < _buffer_offset) { + // if requested length is larger than the capacity of buffer, do not + // need to copy the character into local buffer. + if (nbytes > _buffer_limit - _buffer_offset) { + return _reader->readat(position, nbytes, bytes_read, out); + } + _buffer_offset = position; + RETURN_IF_ERROR(fill()); + if (position >= _buffer_limit) { + *bytes_read = 0; + return Status::OK(); + } + } + int64_t len = std::min(_buffer_limit - position, nbytes); + int64_t off = position - _buffer_offset; + memcpy(out, _buffer + off, len); + *bytes_read = len; + _cur_offset = position + *bytes_read; + return Status::OK(); +} + +Status BufferedReader::fill() { + if (_buffer_offset >= 0) { + int64_t bytes_read; + int retry_times = 1; + do { + // fill the buffer + RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer)); + } while (bytes_read == 0 && retry_times++ < 2); + _buffer_limit = _buffer_offset + bytes_read; + } + return Status::OK(); +} + +int64_t BufferedReader::size() { + return _reader->size(); +} + +Status BufferedReader::seek(int64_t position) { + _cur_offset = position; + return Status::OK(); +} + +Status BufferedReader::tell(int64_t* position) { + *position = _cur_offset; + return Status::OK(); +} + +void BufferedReader::close() { + _reader->close(); + SAFE_DELETE_ARRAY(_buffer); +} + +bool BufferedReader::closed() { + return _reader->closed(); +} + +} // namespace doris + diff --git a/be/src/exec/buffered_reader.h b/be/src/exec/buffered_reader.h new file mode 100644 index 00000000000000..111e75c015f4a3 --- /dev/null +++ b/be/src/exec/buffered_reader.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" +#include "olap/olap_define.h" +#include "exec/file_reader.h" + +namespace doris { + +// Buffered Reader of broker +class BufferedReader : public FileReader { +public: + // If the reader need the file size, set it when construct BrokerReader. + // There is no other way to set the file size. + BufferedReader(FileReader* reader); + BufferedReader(FileReader* reader, int64_t buffer_size); + virtual ~BufferedReader(); + + virtual Status open() override; + + // Read + virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override; + virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; + virtual Status read_one_message(uint8_t** buf, size_t* length) override; + virtual int64_t size() override; + virtual Status seek(int64_t position) override; + virtual Status tell(int64_t* position) override; + virtual void close() override; + virtual bool closed() override; + +private: + Status fill(); + Status read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out); +private: + FileReader* _reader; + char* _buffer; + int64_t _buffer_size; + int64_t _buffer_offset; + int64_t _buffer_limit; + int64_t _cur_offset; +}; + +} diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index 34af303a003bb0..42245557d66ad2 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -158,6 +158,10 @@ inline Status ParquetReaderWrap::set_field_null(Tuple* tuple, const SlotDescript Status ParquetReaderWrap::read_record_batch(const std::vector& tuple_slot_descs, bool* eof) { if (_current_line_of_group >= _rows_of_group) {// read next row group + LOG(DEBUG) << "read_record_batch, current group id:" << _current_group << " current line of group:" + << _current_line_of_group << " is larger than rows group size:" + << _rows_of_group << ". start to read next row group"; + _current_group++; if (_current_group >= _total_groups) {// read completed. _parquet_column_ids.clear(); @@ -177,6 +181,10 @@ Status ParquetReaderWrap::read_record_batch(const std::vector& } _current_line_of_batch = 0; } else if (_current_line_of_batch >= _batch->num_rows()) { + LOG(DEBUG) << "read_record_batch, current group id:" << _current_group << " current line of batch:" + << _current_line_of_batch << " is larger than batch size:" + << _batch->num_rows() << ". start to read next batch"; + arrow::Status status = _rb_batch->ReadNext(&_batch); if (!status.ok()) { return Status::InternalError("Read Batch Error With Libarrow."); diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 375d90f9766de6..cb2268760e6728 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -29,6 +29,7 @@ #include "exec/text_converter.hpp" #include "exec/local_file_reader.h" #include "exec/broker_reader.h" +#include "exec/buffered_reader.h" #include "exec/decompressor.h" #include "exec/parquet_reader.h" @@ -119,8 +120,8 @@ Status ParquetScanner::open_next_reader() { int64_t file_size = 0; // for compatibility if (range.__isset.file_size) { file_size = range.file_size; } - file_reader.reset(new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties, - range.path, range.start_offset, file_size)); + file_reader.reset(new BufferedReader(new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties, + range.path, range.start_offset, file_size))); break; } #if 0 From dfc7453b6fc663f5ef2a780aa171851c11db9a85 Mon Sep 17 00:00:00 2001 From: xy720 Date: Thu, 18 Jun 2020 23:51:39 +0800 Subject: [PATCH 2/8] add ut --- be/src/exec/buffered_reader.cpp | 14 +-- be/src/exec/buffered_reader.h | 8 +- be/test/exec/CMakeLists.txt | 1 + be/test/exec/buffered_reader_test.cpp | 102 ++++++++++++++++++ .../buffered_reader/buffered_reader_test_file | Bin 0 -> 950 bytes .../buffered_reader_test_file.txt | 4 + run-ut.sh | 8 +- 7 files changed, 123 insertions(+), 14 deletions(-) create mode 100644 be/test/exec/buffered_reader_test.cpp create mode 100644 be/test/exec/test_data/buffered_reader/buffered_reader_test_file create mode 100644 be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt diff --git a/be/src/exec/buffered_reader.cpp b/be/src/exec/buffered_reader.cpp index c2bf04846297e0..119a2e78df1a86 100644 --- a/be/src/exec/buffered_reader.cpp +++ b/be/src/exec/buffered_reader.cpp @@ -52,11 +52,10 @@ Status BufferedReader::open() { if (!_reader) { std::stringstream ss; ss << "Open buffered reader failed, reader is null"; - LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } RETURN_IF_ERROR(_reader->open()); - RETURN_IF_ERROR(fill()); + RETURN_IF_ERROR(_fill()); return Status::OK(); } @@ -82,14 +81,14 @@ Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r *bytes_read = 0; return Status::OK(); } - RETURN_IF_ERROR(read_once(position, nbytes, bytes_read, out)); + RETURN_IF_ERROR(_read_once(position, nbytes, bytes_read, out)); //EOF if (*bytes_read <= 0) { return Status::OK(); } while (*bytes_read < nbytes) { int64_t len; - RETURN_IF_ERROR(read_once(position + *bytes_read, nbytes - *bytes_read, &len, reinterpret_cast(out) + *bytes_read)); + RETURN_IF_ERROR(_read_once(position + *bytes_read, nbytes - *bytes_read, &len, reinterpret_cast(out) + *bytes_read)); // EOF if (len <= 0) { break; @@ -99,7 +98,7 @@ Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r return Status::OK(); } -Status BufferedReader::read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) { +Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) { // requested bytes missed the local buffer if (position >= _buffer_limit || position < _buffer_offset) { // if requested length is larger than the capacity of buffer, do not @@ -108,7 +107,7 @@ Status BufferedReader::read_once(int64_t position, int64_t nbytes, int64_t* byte return _reader->readat(position, nbytes, bytes_read, out); } _buffer_offset = position; - RETURN_IF_ERROR(fill()); + RETURN_IF_ERROR(_fill()); if (position >= _buffer_limit) { *bytes_read = 0; return Status::OK(); @@ -122,9 +121,10 @@ Status BufferedReader::read_once(int64_t position, int64_t nbytes, int64_t* byte return Status::OK(); } -Status BufferedReader::fill() { +Status BufferedReader::_fill() { if (_buffer_offset >= 0) { int64_t bytes_read; + // retry for new content int retry_times = 1; do { // fill the buffer diff --git a/be/src/exec/buffered_reader.h b/be/src/exec/buffered_reader.h index 111e75c015f4a3..0c3ae5786d3837 100644 --- a/be/src/exec/buffered_reader.h +++ b/be/src/exec/buffered_reader.h @@ -25,10 +25,10 @@ namespace doris { -// Buffered Reader of broker +// Buffered Reader class BufferedReader : public FileReader { public: - // If the reader need the file size, set it when construct BrokerReader. + // If the reader need the file size, set it when construct FileReader. // There is no other way to set the file size. BufferedReader(FileReader* reader); BufferedReader(FileReader* reader, int64_t buffer_size); @@ -47,8 +47,8 @@ class BufferedReader : public FileReader { virtual bool closed() override; private: - Status fill(); - Status read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out); + Status _fill(); + Status _read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out); private: FileReader* _reader; char* _buffer; diff --git a/be/test/exec/CMakeLists.txt b/be/test/exec/CMakeLists.txt index 2021480eb3eb1f..9c496671ebce51 100644 --- a/be/test/exec/CMakeLists.txt +++ b/be/test/exec/CMakeLists.txt @@ -51,6 +51,7 @@ ADD_BE_TEST(broker_scanner_test) ADD_BE_TEST(broker_scan_node_test) ADD_BE_TEST(tablet_info_test) ADD_BE_TEST(tablet_sink_test) +ADD_BE_TEST(buffered_reader_test) # ADD_BE_TEST(es_scan_node_test) ADD_BE_TEST(es_http_scan_node_test) ADD_BE_TEST(es_predicate_test) diff --git a/be/test/exec/buffered_reader_test.cpp b/be/test/exec/buffered_reader_test.cpp new file mode 100644 index 00000000000000..fe46bc3e7b9850 --- /dev/null +++ b/be/test/exec/buffered_reader_test.cpp @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "exec/local_file_reader.h" +#include "exec/buffered_reader.h" +#include "util/stopwatch.hpp" + +namespace doris { +class BufferedReaderTest : public testing::Test { +public: + BufferedReaderTest() {} + +protected: + virtual void SetUp() { + } + virtual void TearDown() { + } +}; + +TEST_F(BufferedReaderTest, normal_use) { + LocalFileReader file_reader( + "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file", 0); + BufferedReader reader(&file_reader); + auto st = reader.open(); + ASSERT_TRUE(st.ok()); + uint8_t buf[32 * 1024]; + MonotonicStopWatch watch; + watch.start(); + bool eof = false; + size_t read_length = 0; + while (!eof) { + size_t buf_len = 32 * 1024; + st = reader.read(buf, &buf_len, &eof); + ASSERT_TRUE(st.ok()); + read_length += buf_len; + } + + LOG(INFO) << "read bytes " << read_length << " using time " << watch.elapsed_time(); +} + +TEST_F(BufferedReaderTest, test_validity) { + LocalFileReader file_reader( + "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0); + BufferedReader reader(&file_reader, 128 * 1024); + auto st = reader.open(); + ASSERT_TRUE(st.ok()); + uint8_t buf[10]; + bool eof = false; + size_t buf_len = 10; + + st = reader.read(buf, &buf_len, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str()); + ASSERT_FALSE(eof); + + st = reader.read(buf, &buf_len, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_STREQ("vxzAbCdEfG", std::string((char*)buf, buf_len).c_str()); + ASSERT_FALSE(eof); + + st = reader.read(buf, &buf_len, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_STREQ("hIj\n\nMnOpQ", std::string((char*)buf, buf_len).c_str()); + ASSERT_FALSE(eof); + + st = reader.read(buf, &buf_len, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_STREQ("rStUvWxYz\n", std::string((char*)buf, buf_len).c_str()); + ASSERT_FALSE(eof); + + st = reader.read(buf, &buf_len, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_STREQ("IjKl", std::string((char*)buf, 4).c_str()); + ASSERT_FALSE(eof); + + st = reader.read(buf, &buf_len, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_TRUE(eof); +} + +} // end namespace doris + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/exec/test_data/buffered_reader/buffered_reader_test_file b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file new file mode 100644 index 0000000000000000000000000000000000000000..88f4883ae807a58feeb02ce838edbc48b251dd6b GIT binary patch literal 950 zcmZvbO>5LZ7{@2c>^7DlDn3IZIpory&@P*_$!1}pD~Kp~DB@Y!&CX(IlCA01_E1VK zJ+0t3@Fu87{Sf^sUZj2i59>2|sr#10B=eu=_q@!X>^*+z00kAa7a#*AglZqA(Q{LRz~nqO&M23A<6=+pu9{ARi|QIrtbkOYu~bJ81a_JJY7mr%VoWI zEhZsgf+e~oWw+n1esp!Z>wVhILvQaC-3mb*VcaG(X8`rd$N1OZ?+?1Vy7lw(pRZR^ zHwW3P0<6R;3==F=9L`TzRTLwOBan^LXIhjF_{kf`@tc7I8j2gTF7G?lhf6?1g{HlP zZy=-MrXHo?DB%RwakExzFiw#5Fpk3gL&B(G8nLaKSj1Cl-4HoJW|WQDsjYSv%rgpG zWnmKqF`s=Y&9d$ddnqA2=iLhUTsA=~3+Jup_=UtKl6@|BO3VH`w<5(vF+s%?1bZ(B z&Xc@meS4x{4|9GRCU%ZjAUgkz_WoRj=tL3*&Nrjm_Dyw-04mAYl_w9 zyqzK0ml~tQF!j5~Sv2NZBN+{N+z|HZYy6~_9K@XH&kuUpyCXi{N;2y`&kmzeYB`Od z;aU&-M^QYmLf>=URuK4Zn|tkM&)eqxRy%0z^xNLh54VRy#~C_)_=r5knfLII{ReV7 Bss;c6 literal 0 HcmV?d00001 diff --git a/be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt new file mode 100644 index 00000000000000..e0e5fb6404add1 --- /dev/null +++ b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt @@ -0,0 +1,4 @@ +bdfhjlnprtvxzAbCdEfGhIj + +MnOpQrStUvWxYz +IjKl diff --git a/run-ut.sh b/run-ut.sh index 64e6324e057db2..aafaada8eb15e5 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -89,7 +89,7 @@ fi cd ${DORIS_HOME}/be/ut_build/ -${CMAKE_CMD} ../ -DWITH_MYSQL=OFF -DMAKE_TEST=ON -DCMAKE_BUILD_TYPE=DEBUG +${CMAKE_CMD} ../ -DWITH_MYSQL=OFF -DMAKE_TEST=ON make -j${PARALLEL} if [ ${RUN} -ne 1 ]; then @@ -212,6 +212,7 @@ ${DORIS_TEST_BINARY_DIR}/exec/es_scan_reader_test ${DORIS_TEST_BINARY_DIR}/exec/es_query_builder_test ${DORIS_TEST_BINARY_DIR}/exec/tablet_info_test ${DORIS_TEST_BINARY_DIR}/exec/tablet_sink_test +${DORIS_TEST_BINARY_DIR}/exec/buffered_reader_test # Running runtime Unittest ${DORIS_TEST_BINARY_DIR}/runtime/external_scan_context_mgr_test @@ -267,7 +268,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/serialize_test ${DORIS_TEST_BINARY_DIR}/olap/options_test # Running memory engine Unittest -${DORIS_TEST_BINARY_DIR}/olap/memory/hash_index_test +# ${DORIS_TEST_BINARY_DIR}/olap/memory/hash_index_test # Running segment v2 test ${DORIS_TEST_BINARY_DIR}/olap/tablet_meta_manager_test @@ -306,7 +307,8 @@ ${DORIS_TEST_BINARY_DIR}/olap/short_key_index_test ${DORIS_TEST_BINARY_DIR}/olap/key_coder_test ${DORIS_TEST_BINARY_DIR}/olap/page_cache_test ${DORIS_TEST_BINARY_DIR}/olap/hll_test -${DORIS_TEST_BINARY_DIR}/olap/selection_vector_test +# ${DORIS_TEST_BINARY_DIR}/olap/selection_vector_tes +${DORIS_TEST_BINARY_DIR}/olap/push_handler_test # Running routine load test ${DORIS_TEST_BINARY_DIR}/runtime/kafka_consumer_pipe_test From 3dcd6a8bd4bcff3b3c47a0a27e00d77130dc3396 Mon Sep 17 00:00:00 2001 From: xy720 Date: Fri, 19 Jun 2020 10:31:19 +0800 Subject: [PATCH 3/8] be/src/exec/buffered_reader.h --- be/src/exec/buffered_reader.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/be/src/exec/buffered_reader.h b/be/src/exec/buffered_reader.h index 0c3ae5786d3837..295cb440d22470 100644 --- a/be/src/exec/buffered_reader.h +++ b/be/src/exec/buffered_reader.h @@ -26,6 +26,8 @@ namespace doris { // Buffered Reader +// Add a cache layer between the caller and the file reader to reduce the +// times of calls to the read function to speed up. class BufferedReader : public FileReader { public: // If the reader need the file size, set it when construct FileReader. From 91651659e0a6c250c586abeb170e5cfbaad7bf5b Mon Sep 17 00:00:00 2001 From: xy720 Date: Fri, 19 Jun 2020 10:42:18 +0800 Subject: [PATCH 4/8] fix run-ut.sh --- run-ut.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/run-ut.sh b/run-ut.sh index aafaada8eb15e5..93f645b67791b6 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -89,7 +89,7 @@ fi cd ${DORIS_HOME}/be/ut_build/ -${CMAKE_CMD} ../ -DWITH_MYSQL=OFF -DMAKE_TEST=ON +${CMAKE_CMD} ../ -DWITH_MYSQL=OFF -DMAKE_TEST=ON -DCMAKE_BUILD_TYPE=DEBUG make -j${PARALLEL} if [ ${RUN} -ne 1 ]; then @@ -268,7 +268,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/serialize_test ${DORIS_TEST_BINARY_DIR}/olap/options_test # Running memory engine Unittest -# ${DORIS_TEST_BINARY_DIR}/olap/memory/hash_index_test +${DORIS_TEST_BINARY_DIR}/olap/memory/hash_index_test # Running segment v2 test ${DORIS_TEST_BINARY_DIR}/olap/tablet_meta_manager_test @@ -307,7 +307,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/short_key_index_test ${DORIS_TEST_BINARY_DIR}/olap/key_coder_test ${DORIS_TEST_BINARY_DIR}/olap/page_cache_test ${DORIS_TEST_BINARY_DIR}/olap/hll_test -# ${DORIS_TEST_BINARY_DIR}/olap/selection_vector_tes +${DORIS_TEST_BINARY_DIR}/olap/selection_vector_tes ${DORIS_TEST_BINARY_DIR}/olap/push_handler_test # Running routine load test From 6b62621b4beb539eafefdd2bcc6a91596b9bfd1a Mon Sep 17 00:00:00 2001 From: xy720 Date: Fri, 19 Jun 2020 10:44:33 +0800 Subject: [PATCH 5/8] fix run-ut.sh --- run-ut.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/run-ut.sh b/run-ut.sh index 93f645b67791b6..e163b5c9277197 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -308,7 +308,6 @@ ${DORIS_TEST_BINARY_DIR}/olap/key_coder_test ${DORIS_TEST_BINARY_DIR}/olap/page_cache_test ${DORIS_TEST_BINARY_DIR}/olap/hll_test ${DORIS_TEST_BINARY_DIR}/olap/selection_vector_tes -${DORIS_TEST_BINARY_DIR}/olap/push_handler_test # Running routine load test ${DORIS_TEST_BINARY_DIR}/runtime/kafka_consumer_pipe_test From 50f6ea119373612919a02b8cb5df5aa53e5f8569 Mon Sep 17 00:00:00 2001 From: xy720 Date: Fri, 19 Jun 2020 10:45:24 +0800 Subject: [PATCH 6/8] fix run-ut.sh --- run-ut.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run-ut.sh b/run-ut.sh index e163b5c9277197..32e910481bfa69 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -307,7 +307,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/short_key_index_test ${DORIS_TEST_BINARY_DIR}/olap/key_coder_test ${DORIS_TEST_BINARY_DIR}/olap/page_cache_test ${DORIS_TEST_BINARY_DIR}/olap/hll_test -${DORIS_TEST_BINARY_DIR}/olap/selection_vector_tes +${DORIS_TEST_BINARY_DIR}/olap/selection_vector_test # Running routine load test ${DORIS_TEST_BINARY_DIR}/runtime/kafka_consumer_pipe_test From dc80a3713258feb7c25336add4280f56efbccba0 Mon Sep 17 00:00:00 2001 From: xy720 Date: Mon, 22 Jun 2020 12:25:15 +0800 Subject: [PATCH 7/8] add some ut --- be/src/exec/buffered_reader.cpp | 2 +- be/test/exec/buffered_reader_test.cpp | 104 +++++++++++++++++++++++--- 2 files changed, 93 insertions(+), 13 deletions(-) diff --git a/be/src/exec/buffered_reader.cpp b/be/src/exec/buffered_reader.cpp index 119a2e78df1a86..2e6985319e2673 100644 --- a/be/src/exec/buffered_reader.cpp +++ b/be/src/exec/buffered_reader.cpp @@ -103,7 +103,7 @@ Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* byt if (position >= _buffer_limit || position < _buffer_offset) { // if requested length is larger than the capacity of buffer, do not // need to copy the character into local buffer. - if (nbytes > _buffer_limit - _buffer_offset) { + if (nbytes > _buffer_size) { return _reader->readat(position, nbytes, bytes_read, out); } _buffer_offset = position; diff --git a/be/test/exec/buffered_reader_test.cpp b/be/test/exec/buffered_reader_test.cpp index fe46bc3e7b9850..3c0bbeb03dcbe7 100644 --- a/be/test/exec/buffered_reader_test.cpp +++ b/be/test/exec/buffered_reader_test.cpp @@ -34,30 +34,27 @@ class BufferedReaderTest : public testing::Test { }; TEST_F(BufferedReaderTest, normal_use) { + // buffered_reader_test_file 950 bytes LocalFileReader file_reader( "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file", 0); - BufferedReader reader(&file_reader); + BufferedReader reader(&file_reader, 1024); auto st = reader.open(); ASSERT_TRUE(st.ok()); - uint8_t buf[32 * 1024]; + uint8_t buf[1024]; MonotonicStopWatch watch; watch.start(); - bool eof = false; - size_t read_length = 0; - while (!eof) { - size_t buf_len = 32 * 1024; - st = reader.read(buf, &buf_len, &eof); - ASSERT_TRUE(st.ok()); - read_length += buf_len; - } - + int64_t read_length = 0; + st = reader.readat(0, 1024, &read_length, buf); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(950, read_length); LOG(INFO) << "read bytes " << read_length << " using time " << watch.elapsed_time(); } TEST_F(BufferedReaderTest, test_validity) { + // buffered_reader_test_file.txt 45 bytes LocalFileReader file_reader( "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0); - BufferedReader reader(&file_reader, 128 * 1024); + BufferedReader reader(&file_reader, 64); auto st = reader.open(); ASSERT_TRUE(st.ok()); uint8_t buf[10]; @@ -94,6 +91,89 @@ TEST_F(BufferedReaderTest, test_validity) { ASSERT_TRUE(eof); } +TEST_F(BufferedReaderTest, test_seek) { + // buffered_reader_test_file.txt 45 bytes + LocalFileReader file_reader( + "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0); + BufferedReader reader(&file_reader, 64); + auto st = reader.open(); + ASSERT_TRUE(st.ok()); + uint8_t buf[10]; + bool eof = false; + size_t buf_len = 10; + + // Seek to the end of the file + st = reader.seek(45); + ASSERT_TRUE(st.ok()); + st = reader.read(buf, &buf_len, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_TRUE(eof); + + // Seek to the beginning of the file + st = reader.seek(0); + ASSERT_TRUE(st.ok()); + st = reader.read(buf, &buf_len, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str()); + ASSERT_FALSE(eof); + + // Seek to a wrong position + st = reader.seek(-1); + ASSERT_TRUE(st.ok()); + st = reader.read(buf, &buf_len, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str()); + ASSERT_FALSE(eof); + + // Seek to a wrong position + st = reader.seek(-1000); + ASSERT_TRUE(st.ok()); + st = reader.read(buf, &buf_len, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str()); + ASSERT_FALSE(eof); + + // Seek to a wrong position + st = reader.seek(1000); + ASSERT_TRUE(st.ok()); + st = reader.read(buf, &buf_len, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_TRUE(eof); +} + +TEST_F(BufferedReaderTest, test_miss) { + // buffered_reader_test_file.txt 45 bytes + LocalFileReader file_reader( + "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0); + BufferedReader reader(&file_reader, 64); + auto st = reader.open(); + ASSERT_TRUE(st.ok()); + uint8_t buf[128]; + int64_t bytes_read; + + st = reader.readat(20, 10, &bytes_read, buf); + ASSERT_TRUE(st.ok()); + ASSERT_STREQ("hIj\n\nMnOpQ", std::string((char*)buf, (size_t)bytes_read).c_str()); + ASSERT_EQ(10, bytes_read); + + st = reader.readat(0, 5, &bytes_read, buf); + ASSERT_TRUE(st.ok()); + ASSERT_STREQ("bdfhj", std::string((char*)buf, (size_t)bytes_read).c_str()); + ASSERT_EQ(5, bytes_read); + + st = reader.readat(5, 10, &bytes_read, buf); + ASSERT_TRUE(st.ok()); + ASSERT_STREQ("lnprtvxzAb", std::string((char*)buf, (size_t)bytes_read).c_str()); + ASSERT_EQ(10, bytes_read); + + // if requested length is larger than the capacity of buffer, do not + // need to copy the character into local buffer. + st = reader.readat(0, 128, &bytes_read, buf); + ASSERT_TRUE(st.ok()); + ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, 10).c_str()); + ASSERT_EQ(45, bytes_read); +} + } // end namespace doris int main(int argc, char **argv) { From adecd22e15c9949cab191c693db74541fdd58624 Mon Sep 17 00:00:00 2001 From: xy720 Date: Mon, 22 Jun 2020 16:20:18 +0800 Subject: [PATCH 8/8] fix vlog --- be/src/exec/broker_reader.cpp | 2 +- be/src/exec/buffered_reader.cpp | 10 ---------- be/src/exec/buffered_reader.h | 3 +-- be/src/exec/parquet_reader.cpp | 6 ++---- 4 files changed, 4 insertions(+), 17 deletions(-) diff --git a/be/src/exec/broker_reader.cpp b/be/src/exec/broker_reader.cpp index b30f484f514977..2125bb31cbb2a9 100644 --- a/be/src/exec/broker_reader.cpp +++ b/be/src/exec/broker_reader.cpp @@ -155,7 +155,7 @@ Status BrokerReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_rea return status; } - LOG(DEBUG) << "send readat request to broker:" << broker_addr << " position:" << position << ", read bytes length:" << nbytes; + VLOG_RPC << "send pread request to broker:" << broker_addr << " position:" << position << ", read bytes length:" << nbytes; try { client->pread(response, request); diff --git a/be/src/exec/buffered_reader.cpp b/be/src/exec/buffered_reader.cpp index 2e6985319e2673..696067fb6db70c 100644 --- a/be/src/exec/buffered_reader.cpp +++ b/be/src/exec/buffered_reader.cpp @@ -25,16 +25,6 @@ namespace doris { // buffered reader - -BufferedReader::BufferedReader(FileReader* reader) - : _reader(reader), - _buffer_size(1024 * 1024), - _buffer_offset(0), - _buffer_limit(0), - _cur_offset(0) { - _buffer = new char[_buffer_size]; -} - BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size) : _reader(reader), _buffer_size(buffer_size), diff --git a/be/src/exec/buffered_reader.h b/be/src/exec/buffered_reader.h index 295cb440d22470..d7f2fbd7e675cb 100644 --- a/be/src/exec/buffered_reader.h +++ b/be/src/exec/buffered_reader.h @@ -32,8 +32,7 @@ class BufferedReader : public FileReader { public: // If the reader need the file size, set it when construct FileReader. // There is no other way to set the file size. - BufferedReader(FileReader* reader); - BufferedReader(FileReader* reader, int64_t buffer_size); + BufferedReader(FileReader* reader, int64_t = 1024 * 1024); virtual ~BufferedReader(); virtual Status open() override; diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index 42245557d66ad2..b3e37d5c974015 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -158,10 +158,9 @@ inline Status ParquetReaderWrap::set_field_null(Tuple* tuple, const SlotDescript Status ParquetReaderWrap::read_record_batch(const std::vector& tuple_slot_descs, bool* eof) { if (_current_line_of_group >= _rows_of_group) {// read next row group - LOG(DEBUG) << "read_record_batch, current group id:" << _current_group << " current line of group:" + VLOG(7) << "read_record_batch, current group id:" << _current_group << " current line of group:" << _current_line_of_group << " is larger than rows group size:" << _rows_of_group << ". start to read next row group"; - _current_group++; if (_current_group >= _total_groups) {// read completed. _parquet_column_ids.clear(); @@ -181,10 +180,9 @@ Status ParquetReaderWrap::read_record_batch(const std::vector& } _current_line_of_batch = 0; } else if (_current_line_of_batch >= _batch->num_rows()) { - LOG(DEBUG) << "read_record_batch, current group id:" << _current_group << " current line of batch:" + VLOG(7) << "read_record_batch, current group id:" << _current_group << " current line of batch:" << _current_line_of_batch << " is larger than batch size:" << _batch->num_rows() << ". start to read next batch"; - arrow::Status status = _rb_batch->ReadNext(&_batch); if (!status.ok()) { return Status::InternalError("Read Batch Error With Libarrow.");