diff --git a/be/src/exec/broker_reader.cpp b/be/src/exec/broker_reader.cpp index e5f67b618c427a..64e587e65e0f84 100644 --- a/be/src/exec/broker_reader.cpp +++ b/be/src/exec/broker_reader.cpp @@ -122,7 +122,7 @@ Status BrokerReader::open() { } //not support -Status BrokerReader::read_one_message(uint8_t** buf, size_t* length) { +Status BrokerReader::read_one_message(std::unique_ptr* buf, size_t* length) { return Status::NotSupported("Not support"); } diff --git a/be/src/exec/broker_reader.h b/be/src/exec/broker_reader.h index 4730260588c4f3..d6efc8e9843569 100644 --- a/be/src/exec/broker_reader.h +++ b/be/src/exec/broker_reader.h @@ -50,7 +50,7 @@ class BrokerReader : public FileReader { virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; - virtual Status read_one_message(uint8_t** buf, size_t* length) override; + virtual Status read_one_message(std::unique_ptr* buf, size_t* length) override; virtual int64_t size() override; virtual Status seek(int64_t position) override; virtual Status tell(int64_t* position) override; diff --git a/be/src/exec/buffered_reader.cpp b/be/src/exec/buffered_reader.cpp index 96640b8a0703f3..e9eb0e96472ff0 100644 --- a/be/src/exec/buffered_reader.cpp +++ b/be/src/exec/buffered_reader.cpp @@ -50,8 +50,9 @@ Status BufferedReader::open() { } //not support -Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) { +Status BufferedReader::read_one_message(std::unique_ptr* buf, size_t* length) { return Status::NotSupported("Not support"); + } Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) { diff --git a/be/src/exec/buffered_reader.h b/be/src/exec/buffered_reader.h index c347ba4ce5a837..1eb185c13cc97e 100644 --- a/be/src/exec/buffered_reader.h +++ b/be/src/exec/buffered_reader.h @@ -41,7 +41,7 @@ class BufferedReader : public FileReader { virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; - virtual Status read_one_message(uint8_t** buf, size_t* length) override; + virtual Status read_one_message(std::unique_ptr* buf, size_t* length) override; virtual int64_t size() override; virtual Status seek(int64_t position) override; virtual Status tell(int64_t* position) override; diff --git a/be/src/exec/file_reader.h b/be/src/exec/file_reader.h index 19b4660a58b815..6c7cb32394f9fa 100644 --- a/be/src/exec/file_reader.h +++ b/be/src/exec/file_reader.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include "common/status.h" @@ -39,12 +40,8 @@ class FileReader { * * if read eof then return Status::OK and length is set 0 and buf is set NULL, * other return readed bytes. - * - * !! Important !! - * the buf must be deleted by user, otherwise leak memory - * !! Important !! */ - virtual Status read_one_message(uint8_t** buf, size_t* length) = 0; + virtual Status read_one_message(std::unique_ptr* buf, size_t* length) = 0; virtual int64_t size() = 0; virtual Status seek(int64_t position) = 0; virtual Status tell(int64_t* position) = 0; diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index bf09b09dde2038..a07bf8ac99032b 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -272,7 +272,7 @@ void JsonReader::_close() { Status JsonReader::_parse_json_doc(bool* eof) { // read a whole message, must be delete json_str by `delete[]` SCOPED_TIMER(_file_read_timer); - uint8_t* json_str = nullptr; + std::unique_ptr json_str; size_t length = 0; RETURN_IF_ERROR(_file_reader->read_one_message(&json_str, &length)); _bytes_read_counter += length; @@ -283,15 +283,16 @@ Status JsonReader::_parse_json_doc(bool* eof) { bool has_parse_error = false; // parse jsondata to JsonDoc + // As the issue: https://github.com/Tencent/rapidjson/issues/1458 // Now, rapidjson only support uint64_t, So lagreint load cause bug. We use kParseNumbersAsStringsFlag. if (_num_as_string) { has_parse_error = _origin_json_doc - .Parse((char*)json_str, length) + .Parse((char*)json_str.get(), length) .HasParseError(); } else { - has_parse_error = _origin_json_doc.Parse((char*)json_str, length).HasParseError(); + has_parse_error = _origin_json_doc.Parse((char*)json_str.get(), length).HasParseError(); } if (has_parse_error) { @@ -299,12 +300,10 @@ Status JsonReader::_parse_json_doc(bool* eof) { str_error << "Parse json data for JsonDoc failed. code = " << _origin_json_doc.GetParseError() << ", error-info:" << rapidjson::GetParseError_En(_origin_json_doc.GetParseError()); - _state->append_error_msg_to_file(std::string((char*)json_str, length), str_error.str()); + _state->append_error_msg_to_file(std::string((char*)json_str.get(), length), str_error.str()); _counter->num_rows_filtered++; - delete[] json_str; return Status::DataQualityError(str_error.str()); } - delete[] json_str; // set json root if (_parsed_json_root.size() != 0) { diff --git a/be/src/exec/local_file_reader.cpp b/be/src/exec/local_file_reader.cpp index 70245573777bf2..04a795ac5cf0cf 100644 --- a/be/src/exec/local_file_reader.cpp +++ b/be/src/exec/local_file_reader.cpp @@ -53,21 +53,17 @@ bool LocalFileReader::closed() { } // Read all bytes -Status LocalFileReader::read_one_message(uint8_t** buf, size_t* length) { +Status LocalFileReader::read_one_message(std::unique_ptr* buf, size_t* length) { bool eof; int64_t file_size = size() - _current_offset; if (file_size <= 0) { - *buf = nullptr; + buf->reset(); *length = 0; return Status::OK(); } *length = file_size; - *buf = new uint8_t[file_size]; - read(*buf, length, &eof); - if (*length == 0) { - delete *buf; - *buf = nullptr; - } + buf->reset(new uint8_t[file_size]); + read(buf->get(), length, &eof); return Status::OK(); } diff --git a/be/src/exec/local_file_reader.h b/be/src/exec/local_file_reader.h index 07c1d191d20bbb..4a302cca2d1bc8 100644 --- a/be/src/exec/local_file_reader.h +++ b/be/src/exec/local_file_reader.h @@ -38,7 +38,7 @@ class LocalFileReader : public FileReader { virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override; virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override; - virtual Status read_one_message(uint8_t** buf, size_t* length) override; + virtual Status read_one_message(std::unique_ptr* buf, size_t* length) override; virtual int64_t size() override; virtual Status seek(int64_t position) override; virtual Status tell(int64_t* position) override; diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h index 6b36847d65ffdb..0861da5181023c 100644 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -86,7 +86,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { // If _total_length == -1, this should be a Kafka routine load task, // just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data. // Otherwise, this should be a stream load task that needs to read the specified amount of data. - Status read_one_message(uint8_t** data, size_t* length) override { + Status read_one_message(std::unique_ptr* data, size_t* length) override { if (_total_length < -1) { std::stringstream ss; ss << "invalid, _total_length is: " << _total_length; @@ -102,10 +102,10 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { } // _total_length > 0, read the entire data - *data = new uint8_t[_total_length]; + data->reset(new uint8_t[_total_length]); *length = _total_length; bool eof = false; - Status st = read(*data, length, &eof); + Status st = read(data->get(), length, &eof); if (eof) { *length = 0; } @@ -188,7 +188,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { private: // read the next buffer from _buf_queue - Status _read_next_buffer(uint8_t** data, size_t* length) { + Status _read_next_buffer(std::unique_ptr* data, size_t* length) { std::unique_lock l(_lock); while (!_cancelled && !_finished && _buf_queue.empty()) { _get_cond.wait(l); @@ -200,14 +200,14 @@ class StreamLoadPipe : public MessageBodySink, public FileReader { // finished if (_buf_queue.empty()) { DCHECK(_finished); - *data = nullptr; + data->reset(); *length = 0; return Status::OK(); } auto buf = _buf_queue.front(); *length = buf->remaining(); - *data = new uint8_t[*length]; - buf->get_bytes((char*)(*data), *length); + data->reset(new uint8_t[*length]); + buf->get_bytes((char*)(data->get()), *length); _buf_queue.pop_front(); _buffered_bytes -= buf->limit;