Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/exec/broker_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ Status BrokerReader::open() {
}

//not support
Status BrokerReader::read_one_message(uint8_t** buf, size_t* length) {
Status BrokerReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
return Status::NotSupported("Not support");
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/broker_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class BrokerReader : public FileReader {
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
virtual Status read_one_message(uint8_t** buf, size_t* length) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
virtual int64_t size() override;
virtual Status seek(int64_t position) override;
virtual Status tell(int64_t* position) override;
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ Status BufferedReader::open() {
}

//not support
Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) {
Status BufferedReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
return Status::NotSupported("Not support");

}

Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class BufferedReader : public FileReader {
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
virtual Status read_one_message(uint8_t** buf, size_t* length) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
virtual int64_t size() override;
virtual Status seek(int64_t position) override;
virtual Status tell(int64_t* position) override;
Expand Down
7 changes: 2 additions & 5 deletions be/src/exec/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <stdint.h>
#include <memory>

#include "common/status.h"

Expand All @@ -39,12 +40,8 @@ class FileReader {
*
* if read eof then return Status::OK and length is set 0 and buf is set NULL,
* other return readed bytes.
*
* !! Important !!
* the buf must be deleted by user, otherwise leak memory
* !! Important !!
*/
virtual Status read_one_message(uint8_t** buf, size_t* length) = 0;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) = 0;
virtual int64_t size() = 0;
virtual Status seek(int64_t position) = 0;
virtual Status tell(int64_t* position) = 0;
Expand Down
11 changes: 5 additions & 6 deletions be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ void JsonReader::_close() {
Status JsonReader::_parse_json_doc(bool* eof) {
// read a whole message, must be delete json_str by `delete[]`
SCOPED_TIMER(_file_read_timer);
uint8_t* json_str = nullptr;
std::unique_ptr<uint8_t[]> json_str;
size_t length = 0;
RETURN_IF_ERROR(_file_reader->read_one_message(&json_str, &length));
_bytes_read_counter += length;
Expand All @@ -283,28 +283,27 @@ Status JsonReader::_parse_json_doc(bool* eof) {

bool has_parse_error = false;
// parse jsondata to JsonDoc

// As the issue: https://github.com/Tencent/rapidjson/issues/1458
// Now, rapidjson only support uint64_t, So lagreint load cause bug. We use kParseNumbersAsStringsFlag.
if (_num_as_string) {
has_parse_error =
_origin_json_doc
.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, length)
.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str.get(), length)
.HasParseError();
} else {
has_parse_error = _origin_json_doc.Parse((char*)json_str, length).HasParseError();
has_parse_error = _origin_json_doc.Parse((char*)json_str.get(), length).HasParseError();
}

if (has_parse_error) {
std::stringstream str_error;
str_error << "Parse json data for JsonDoc failed. code = "
<< _origin_json_doc.GetParseError() << ", error-info:"
<< rapidjson::GetParseError_En(_origin_json_doc.GetParseError());
_state->append_error_msg_to_file(std::string((char*)json_str, length), str_error.str());
_state->append_error_msg_to_file(std::string((char*)json_str.get(), length), str_error.str());
_counter->num_rows_filtered++;
delete[] json_str;
return Status::DataQualityError(str_error.str());
}
delete[] json_str;

// set json root
if (_parsed_json_root.size() != 0) {
Expand Down
12 changes: 4 additions & 8 deletions be/src/exec/local_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,17 @@ bool LocalFileReader::closed() {
}

// Read all bytes
Status LocalFileReader::read_one_message(uint8_t** buf, size_t* length) {
Status LocalFileReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) {
bool eof;
int64_t file_size = size() - _current_offset;
if (file_size <= 0) {
*buf = nullptr;
buf->reset();
*length = 0;
return Status::OK();
}
*length = file_size;
*buf = new uint8_t[file_size];
read(*buf, length, &eof);
if (*length == 0) {
delete *buf;
*buf = nullptr;
}
buf->reset(new uint8_t[file_size]);
read(buf->get(), length, &eof);
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/local_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class LocalFileReader : public FileReader {
virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) override;
virtual Status read_one_message(uint8_t** buf, size_t* length) override;
virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, size_t* length) override;
virtual int64_t size() override;
virtual Status seek(int64_t position) override;
virtual Status tell(int64_t* position) override;
Expand Down
14 changes: 7 additions & 7 deletions be/src/runtime/stream_load/stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
// If _total_length == -1, this should be a Kafka routine load task,
// just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data.
// Otherwise, this should be a stream load task that needs to read the specified amount of data.
Status read_one_message(uint8_t** data, size_t* length) override {
Status read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length) override {
if (_total_length < -1) {
std::stringstream ss;
ss << "invalid, _total_length is: " << _total_length;
Expand All @@ -102,10 +102,10 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
}

// _total_length > 0, read the entire data
*data = new uint8_t[_total_length];
data->reset(new uint8_t[_total_length]);
*length = _total_length;
bool eof = false;
Status st = read(*data, length, &eof);
Status st = read(data->get(), length, &eof);
if (eof) {
*length = 0;
}
Expand Down Expand Up @@ -188,7 +188,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {

private:
// read the next buffer from _buf_queue
Status _read_next_buffer(uint8_t** data, size_t* length) {
Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length) {
std::unique_lock<std::mutex> l(_lock);
while (!_cancelled && !_finished && _buf_queue.empty()) {
_get_cond.wait(l);
Expand All @@ -200,14 +200,14 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
// finished
if (_buf_queue.empty()) {
DCHECK(_finished);
*data = nullptr;
data->reset();
*length = 0;
return Status::OK();
}
auto buf = _buf_queue.front();
*length = buf->remaining();
*data = new uint8_t[*length];
buf->get_bytes((char*)(*data), *length);
data->reset(new uint8_t[*length]);
buf->get_bytes((char*)(data->get()), *length);

_buf_queue.pop_front();
_buffered_bytes -= buf->limit;
Expand Down