diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index f949ef3f5abeab..4844338cae760e 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -92,6 +92,7 @@ set(EXEC_FILES broker_writer.cpp parquet_scanner.cpp parquet_reader.cpp + parquet_writer.cpp orc_scanner.cpp json_scanner.cpp assert_num_rows_node.cpp diff --git a/be/src/exec/broker_writer.h b/be/src/exec/broker_writer.h index f5efe7b20ec7e1..1bb5b757fa3b43 100644 --- a/be/src/exec/broker_writer.h +++ b/be/src/exec/broker_writer.h @@ -40,7 +40,7 @@ class BrokerWriter : public FileWriter { BrokerWriter(ExecEnv* env, const std::vector& broker_addresses, const std::map& properties, - const std::string& dir, + const std::string& path, int64_t start_offset); virtual ~BrokerWriter(); diff --git a/be/src/exec/local_file_writer.h b/be/src/exec/local_file_writer.h index dc116698e0bb32..a5d2b4d82d864e 100644 --- a/be/src/exec/local_file_writer.h +++ b/be/src/exec/local_file_writer.h @@ -24,12 +24,14 @@ namespace doris { +class RuntimeState; + class LocalFileWriter : public FileWriter { public: LocalFileWriter(const std::string& path, int64_t start_offset); virtual ~LocalFileWriter(); - Status open() override; + Status open() override; virtual Status write(const uint8_t* buf, size_t buf_len, size_t* written_len) override; diff --git a/be/src/exec/parquet_writer.cpp b/be/src/exec/parquet_writer.cpp new file mode 100644 index 00000000000000..6c9f4033166316 --- /dev/null +++ b/be/src/exec/parquet_writer.cpp @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exec/parquet_writer.h" + +#include +#include +#include + +#include "exec/file_writer.h" +#include "common/logging.h" +#include "gen_cpp/PaloBrokerService_types.h" +#include "gen_cpp/TPaloBrokerService.h" +#include "runtime/broker_mgr.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/tuple.h" +#include "runtime/descriptors.h" +#include "runtime/mem_pool.h" +#include "util/thrift_util.h" + +namespace doris { + +/// ParquetOutputStream +ParquetOutputStream::ParquetOutputStream(FileWriter* file_writer): _file_writer(file_writer) { + set_mode(arrow::io::FileMode::WRITE); +} + +ParquetOutputStream::~ParquetOutputStream() { + Close(); +} + +arrow::Status ParquetOutputStream::Write(const void* data, int64_t nbytes) { + size_t written_len = 0; + Status st = _file_writer->write(reinterpret_cast(data), nbytes, &written_len); + if (!st.ok()) { + return arrow::Status::IOError(st.get_error_msg()); + } + _cur_pos += written_len; + return arrow::Status::OK(); +} + +arrow::Status ParquetOutputStream::Tell(int64_t* position) const { + *position = _cur_pos; + return arrow::Status::OK(); +} + +arrow::Status ParquetOutputStream::Close() { + Status st = _file_writer->close(); + if (!st.ok()) { + return arrow::Status::IOError(st.get_error_msg()); + } + _is_closed = true; + return arrow::Status::OK(); +} + +/// ParquetWriterWrapper +ParquetWriterWrapper::ParquetWriterWrapper(FileWriter *file_writer, const std::vector& output_expr_ctxs) : + _output_expr_ctxs(output_expr_ctxs) { + // TODO(cmy): implement + _outstream = new ParquetOutputStream(file_writer); +} + +Status ParquetWriterWrapper::write(const RowBatch& row_batch) { + // TODO(cmy): implement + return Status::OK(); +} + +void ParquetWriterWrapper::close() { + // TODO(cmy): implement +} + +ParquetWriterWrapper::~ParquetWriterWrapper() { + close(); +} + +} // end namespace diff --git a/be/src/exec/parquet_writer.h b/be/src/exec/parquet_writer.h new file mode 100644 index 00000000000000..5147d2105a8044 --- /dev/null +++ b/be/src/exec/parquet_writer.h @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "gen_cpp/Types_types.h" +#include "gen_cpp/PaloBrokerService_types.h" +#include "gen_cpp/PlanNodes_types.h" + +namespace doris { + +class ExprContext; +class FileWriter; +class RowBatch; + +class ParquetOutputStream : public arrow::io::OutputStream { +public: + ParquetOutputStream(FileWriter* file_writer); + virtual ~ParquetOutputStream(); + + arrow::Status Write(const void* data, int64_t nbytes) override; + // return the current write position of the stream + arrow::Status Tell(int64_t* position) const override; + arrow::Status Close() override; + + bool closed() const override { + return _is_closed; + } +private: + FileWriter* _file_writer; // not owned + int64_t _cur_pos; // current write position + bool _is_closed = false; +}; + +// a wrapper of parquet output stream +class ParquetWriterWrapper { +public: + ParquetWriterWrapper(FileWriter *file_writer, const std::vector& output_expr_ctxs); + virtual ~ParquetWriterWrapper(); + + Status write(const RowBatch& row_batch); + + void close(); + +private: + ParquetOutputStream* _outstream; + const std::vector& _output_expr_ctxs; +}; + +} + diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 40fa55174167cb..642668ef59e606 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -44,8 +44,8 @@ set(RUNTIME_FILES raw_value.cpp raw_value_ir.cpp result_sink.cpp - result_writer.cpp result_buffer_mgr.cpp + result_writer.cpp row_batch.cpp runtime_state.cpp string_value.cpp @@ -105,6 +105,8 @@ set(RUNTIME_FILES result_queue_mgr.cpp memory_scratch_sink.cpp external_scan_context_mgr.cpp + file_result_writer.cpp + mysql_result_writer.cpp memory/system_allocator.cpp memory/chunk_allocator.cpp ) diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 71682287e08368..cd3f80f7fb40bb 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -84,6 +84,15 @@ class BufferControlBlock { void set_query_statistics(std::shared_ptr statistics) { _query_statistics = statistics; } + + void update_num_written_rows(int64_t num_rows) { + // _query_statistics may be null when the result sink init failed + // or some other failure. + // and the number of written rows is only needed when all things go well. + if (_query_statistics.get() != nullptr) { + _query_statistics->set_returned_rows(num_rows); + } + } private: typedef std::list ResultQueue; diff --git a/be/src/runtime/file_result_writer.cpp b/be/src/runtime/file_result_writer.cpp new file mode 100644 index 00000000000000..2cc6102d816d16 --- /dev/null +++ b/be/src/runtime/file_result_writer.cpp @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/file_result_writer.h" + +#include "exec/broker_writer.h" +#include "exec/local_file_writer.h" +#include "exec/parquet_writer.h" +#include "exprs/expr.h" +#include "runtime/primitive_type.h" +#include "runtime/row_batch.h" +#include "runtime/tuple_row.h" +#include "runtime/runtime_state.h" +#include "util/types.h" +#include "util/date_func.h" +#include "util/uid_util.h" + +#include "gen_cpp/PaloInternalService_types.h" + +namespace doris { + +const size_t FileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024; + +FileResultWriter::FileResultWriter( + const ResultFileOptions* file_opts, + const std::vector& output_expr_ctxs, + RuntimeProfile* parent_profile) : + _file_opts(file_opts), + _output_expr_ctxs(output_expr_ctxs), + _parent_profile(parent_profile) { +} + +FileResultWriter::~FileResultWriter() { + _close_file_writer(true); +} + +Status FileResultWriter::init(RuntimeState* state) { + _state = state; + _init_profile(); + + RETURN_IF_ERROR(_create_file_writer()); + return Status::OK(); +} + +void FileResultWriter::_init_profile() { + RuntimeProfile* profile = _parent_profile->create_child("FileResultWriter", true, true); + _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime"); + _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime"); + _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime"); + _writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime"); + _written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", TUnit::UNIT); + _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES); +} + +Status FileResultWriter::_create_file_writer() { + std::string file_name = _get_next_file_name(); + if (_file_opts->is_local_file) { + _file_writer = new LocalFileWriter(file_name, 0 /* start offset */); + } else { + _file_writer = new BrokerWriter(_state->exec_env(), + _file_opts->broker_addresses, + _file_opts->broker_properties, + file_name, + 0 /*start offset*/); + } + RETURN_IF_ERROR(_file_writer->open()); + + switch (_file_opts->file_format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + // just use file writer is enough + break; + case TFileFormatType::FORMAT_PARQUET: + _parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs); + break; + default: + return Status::InternalError(strings::Substitute("unsupport file format: $0", _file_opts->file_format)); + } + LOG(INFO) << "create file for exporting query result. file name: " << file_name + << ". query id: " << print_id(_state->query_id()); + return Status::OK(); +} + +// file name format as: my_prefix_0.csv +std::string FileResultWriter::_get_next_file_name() { + std::stringstream ss; + ss << _file_opts->file_path << (_file_idx++) << "." 
<< _file_format_to_name(); + return ss.str(); +} + +std::string FileResultWriter::_file_format_to_name() { + switch (_file_opts->file_format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + return "csv"; + case TFileFormatType::FORMAT_PARQUET: + return "parquet"; + default: + return "unknown"; + } +} + +Status FileResultWriter::append_row_batch(const RowBatch* batch) { + if (nullptr == batch || 0 == batch->num_rows()) { + return Status::OK(); + } + + SCOPED_TIMER(_append_row_batch_timer); + if (_parquet_writer != nullptr) { + RETURN_IF_ERROR(_parquet_writer->write(*batch)); + } else { + RETURN_IF_ERROR(_write_csv_file(*batch)); + } + + _written_rows += batch->num_rows(); + return Status::OK(); +} + +Status FileResultWriter::_write_csv_file(const RowBatch& batch) { + int num_rows = batch.num_rows(); + for (int i = 0; i < num_rows; ++i) { + TupleRow* row = batch.get_row(i); + RETURN_IF_ERROR(_write_one_row_as_csv(row)); + } + _flush_plain_text_outstream(true); + return Status::OK(); +} + +// actually, this logic is same as `ExportSink::gen_row_buffer` +// TODO(cmy): find a way to unify them. +Status FileResultWriter::_write_one_row_as_csv(TupleRow* row) { + { + SCOPED_TIMER(_convert_tuple_timer); + int num_columns = _output_expr_ctxs.size(); + for (int i = 0; i < num_columns; ++i) { + void* item = _output_expr_ctxs[i]->get_value(row); + + if (item == nullptr) { + _plain_text_outstream << NULL_IN_CSV; + continue; + } + + switch (_output_expr_ctxs[i]->root()->type().type) { + case TYPE_BOOLEAN: + case TYPE_TINYINT: + _plain_text_outstream << (int)*static_cast(item); + break; + case TYPE_SMALLINT: + _plain_text_outstream << *static_cast(item); + break; + case TYPE_INT: + _plain_text_outstream << *static_cast(item); + break; + case TYPE_BIGINT: + _plain_text_outstream << *static_cast(item); + break; + case TYPE_LARGEINT: + _plain_text_outstream << reinterpret_cast(item)->value; + break; + case TYPE_FLOAT: { + char buffer[MAX_FLOAT_STR_LENGTH + 2]; + float float_value = *static_cast(item); + buffer[0] = '\0'; + int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer); + DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value; + _plain_text_outstream << buffer; + break; + } + case TYPE_DOUBLE: { + // To prevent loss of precision on float and double types, + // they are converted to strings before output. 
+ // For example: For a double value 27361919854.929001, + // the direct output of using std::stringstream is 2.73619e+10, + // and after conversion to a string, it outputs 27361919854.929001 + char buffer[MAX_DOUBLE_STR_LENGTH + 2]; + double double_value = *static_cast(item); + buffer[0] = '\0'; + int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer); + DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value; + _plain_text_outstream << buffer; + break; + } + case TYPE_DATE: + case TYPE_DATETIME: { + char buf[64]; + const DateTimeValue* time_val = (const DateTimeValue*)(item); + time_val->to_string(buf); + _plain_text_outstream << buf; + break; + } + case TYPE_VARCHAR: + case TYPE_CHAR: { + const StringValue* string_val = (const StringValue*)(item); + if (string_val->ptr == NULL) { + if (string_val->len != 0) { + _plain_text_outstream << NULL_IN_CSV; + } + } else { + _plain_text_outstream << std::string(string_val->ptr, string_val->len); + } + break; + } + case TYPE_DECIMAL: { + const DecimalValue* decimal_val = reinterpret_cast(item); + std::string decimal_str; + int output_scale = _output_expr_ctxs[i]->root()->output_scale(); + if (output_scale > 0 && output_scale <= 30) { + decimal_str = decimal_val->to_string(output_scale); + } else { + decimal_str = decimal_val->to_string(); + } + _plain_text_outstream << decimal_str; + break; + } + case TYPE_DECIMALV2: { + const DecimalV2Value decimal_val(reinterpret_cast(item)->value); + std::string decimal_str; + int output_scale = _output_expr_ctxs[i]->root()->output_scale(); + if (output_scale > 0 && output_scale <= 30) { + decimal_str = decimal_val.to_string(output_scale); + } else { + decimal_str = decimal_val.to_string(); + } + _plain_text_outstream << decimal_str; + break; + } + default: { + // not supported type, like BITMAP, HLL, just export null + _plain_text_outstream << NULL_IN_CSV; + } + } + if (i < num_columns - 1) { + _plain_text_outstream << _file_opts->column_separator; + } + } // end for columns + _plain_text_outstream << _file_opts->line_delimiter; + } + + // write one line to file + return _flush_plain_text_outstream(false); +} + +Status FileResultWriter::_flush_plain_text_outstream(bool eos) { + SCOPED_TIMER(_file_write_timer); + size_t pos = _plain_text_outstream.tellp(); + if (pos == 0 || (pos < OUTSTREAM_BUFFER_SIZE_BYTES && !eos)) { + return Status::OK(); + } + + const std::string& buf = _plain_text_outstream.str(); + size_t written_len = 0; + RETURN_IF_ERROR(_file_writer->write(reinterpret_cast(buf.c_str()), + buf.size(), &written_len)); + COUNTER_UPDATE(_written_data_bytes, written_len); + _current_written_bytes += written_len; + + // clear the stream + _plain_text_outstream.str(""); + _plain_text_outstream.clear(); + + // split file if exceed limit + RETURN_IF_ERROR(_create_new_file_if_exceed_size()); + + return Status::OK(); +} + +Status FileResultWriter::_create_new_file_if_exceed_size() { + if (_current_written_bytes < _file_opts->max_file_size_bytes) { + return Status::OK(); + } + // current file size exceed the max file size. 
close this file
+    // and create a new one
+    {
+        SCOPED_TIMER(_writer_close_timer);
+        RETURN_IF_ERROR(_close_file_writer(false));
+    }
+    _current_written_bytes = 0;
+    return Status::OK();
+}
+
+Status FileResultWriter::_close_file_writer(bool done) {
+    if (_parquet_writer != nullptr) {
+        _parquet_writer->close();
+        delete _parquet_writer;
+        _parquet_writer = nullptr;
+        if (!done) {
+            // TODO(cmy): implement parquet writer later
+        }
+    } else if (_file_writer != nullptr) {
+        _file_writer->close();
+        delete _file_writer;
+        _file_writer = nullptr;
+    }
+
+    if (!done) {
+        // not finished, create a new file writer for the next file
+        RETURN_IF_ERROR(_create_file_writer());
+    }
+    return Status::OK();
+}
+
+Status FileResultWriter::close() {
+    // The following two profile entries, "_written_rows_counter" and "_writer_close_timer",
+    // must be updated outside of `_close_file_writer()`,
+    // because `_close_file_writer()` may be called in the destructor,
+    // at which point the RuntimeState, and with it the profile, may already have been destructed.
+    COUNTER_SET(_written_rows_counter, _written_rows);
+    SCOPED_TIMER(_writer_close_timer);
+    RETURN_IF_ERROR(_close_file_writer(true));
+    return Status::OK();
+}
+
+}
+
diff --git a/be/src/runtime/file_result_writer.h b/be/src/runtime/file_result_writer.h
new file mode 100644
index 00000000000000..bc0040327efc8e
--- /dev/null
+++ b/be/src/runtime/file_result_writer.h
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/result_writer.h"
+#include "runtime/runtime_state.h"
+#include "gen_cpp/DataSinks_types.h"
+
+namespace doris {
+
+class ExprContext;
+class FileWriter;
+class ParquetWriterWrapper;
+class RowBatch;
+class RuntimeProfile;
+class TupleRow;
+
+struct ResultFileOptions {
+    bool is_local_file;
+    std::string file_path;
+    TFileFormatType::type file_format;
+    std::string column_separator;
+    std::string line_delimiter;
+    size_t max_file_size_bytes = 1 * 1024 * 1024 * 1024; // 1GB
+    std::vector broker_addresses;
+    std::map broker_properties;
+
+    ResultFileOptions(const TResultFileSinkOptions& t_opt) {
+        file_path = t_opt.file_path;
+        file_format = t_opt.file_format;
+        column_separator = t_opt.__isset.column_separator ? t_opt.column_separator : "\t";
+        line_delimiter = t_opt.__isset.line_delimiter ? t_opt.line_delimiter : "\n";
+        max_file_size_bytes = t_opt.__isset.max_file_size_bytes ?
+                t_opt.max_file_size_bytes : max_file_size_bytes;
+
+        is_local_file = true;
+        if (t_opt.__isset.broker_addresses) {
+            broker_addresses = t_opt.broker_addresses;
+            is_local_file = false;
+        }
+        if (t_opt.__isset.broker_properties) {
+            broker_properties = t_opt.broker_properties;
+        }
+    }
+};
+
+// write result to file
+class FileResultWriter final : public ResultWriter {
+public:
+    FileResultWriter(const ResultFileOptions* file_option,
+                     const std::vector& output_expr_ctxs,
+                     RuntimeProfile* parent_profile);
+    virtual ~FileResultWriter();
+
+    virtual Status init(RuntimeState* state) override;
+    virtual Status append_row_batch(const RowBatch* batch) override;
+    virtual Status close() override;
+
+private:
+    Status _write_csv_file(const RowBatch& batch);
+    Status _write_one_row_as_csv(TupleRow* row);
+
+    // If the buffer exceeds the limit, write the data buffered in _plain_text_outstream via the file writer.
+    // If eos is true, write the data even if the buffer is not full.
+    Status _flush_plain_text_outstream(bool eos);
+    void _init_profile();
+
+    Status _create_file_writer();
+    // get the next export file name
+    std::string _get_next_file_name();
+    std::string _file_format_to_name();
+    // close the file writer; if !done, create a new writer for the next file
+    Status _close_file_writer(bool done);
+    // create a new file if the current file size exceeds the limit
+    Status _create_new_file_if_exceed_size();
+
+private:
+    RuntimeState* _state; // not owned, set when init
+    const ResultFileOptions* _file_opts;
+    const std::vector& _output_expr_ctxs;
+
+    // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
+    // If the result file format is Parquet, this _file_writer is owned by _parquet_writer.
+    FileWriter* _file_writer = nullptr;
+    // parquet file writer
+    ParquetWriterWrapper* _parquet_writer = nullptr;
+    // Used to buffer the export data of plain text.
+    // TODO(cmy): I simply use a stringstream to buffer the data, to avoid calling the
+    // file writer's write() for every single row.
+    // But this cannot solve the problem of a single row of data that is too large.
+    // For example: bitmap_to_string() may return a large volume of data.
+    // And the speed is relatively low; in my test, it is about 6.5MB/s.
+ std::stringstream _plain_text_outstream; + static const size_t OUTSTREAM_BUFFER_SIZE_BYTES; + + // current written bytes, used for split data + int64_t _current_written_bytes = 0; + // the suffix idx of export file name, start at 0 + int _file_idx = 0; + + RuntimeProfile* _parent_profile; // profile from result sink, not owned + // total time cost on append batch opertion + RuntimeProfile::Counter* _append_row_batch_timer = nullptr; + // tuple convert timer, child timer of _append_row_batch_timer + RuntimeProfile::Counter* _convert_tuple_timer = nullptr; + // file write timer, child timer of _append_row_batch_timer + RuntimeProfile::Counter* _file_write_timer = nullptr; + // time of closing the file writer + RuntimeProfile::Counter* _writer_close_timer = nullptr; + // number of written rows + RuntimeProfile::Counter* _written_rows_counter = nullptr; + // bytes of written data + RuntimeProfile::Counter* _written_data_bytes = nullptr; +}; + +} // end of namespace + diff --git a/be/src/runtime/mysql_result_writer.cpp b/be/src/runtime/mysql_result_writer.cpp new file mode 100644 index 00000000000000..a0d2426478c5e5 --- /dev/null +++ b/be/src/runtime/mysql_result_writer.cpp @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/mysql_result_writer.h" + +#include "exprs/expr.h" +#include "runtime/primitive_type.h" +#include "runtime/row_batch.h" +#include "runtime/tuple_row.h" +#include "runtime/result_buffer_mgr.h" +#include "runtime/buffer_control_block.h" +#include "util/mysql_row_buffer.h" +#include "util/types.h" +#include "util/date_func.h" + +#include "gen_cpp/PaloInternalService_types.h" + +namespace doris { + +MysqlResultWriter::MysqlResultWriter( + BufferControlBlock* sinker, + const std::vector& output_expr_ctxs, + RuntimeProfile* parent_profile) : + _sinker(sinker), + _output_expr_ctxs(output_expr_ctxs), + _row_buffer(NULL), + _parent_profile(parent_profile) { +} + +MysqlResultWriter::~MysqlResultWriter() { + delete _row_buffer; +} + +Status MysqlResultWriter::init(RuntimeState* state) { + _init_profile(); + if (NULL == _sinker) { + return Status::InternalError("sinker is NULL pointer."); + } + + _row_buffer = new(std::nothrow) MysqlRowBuffer(); + + if (NULL == _row_buffer) { + return Status::InternalError("no memory to alloc."); + } + + return Status::OK(); +} + +void MysqlResultWriter::_init_profile() { + RuntimeProfile* profile = _parent_profile->create_child("MySQLResultWriter", true, true); + _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime"); + _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime"); + _result_send_timer = ADD_CHILD_TIMER(profile, "ResultRendTime", "AppendBatchTime"); + _sent_rows_counter = ADD_COUNTER(profile, "NumSentRows", TUnit::UNIT); +} + +Status MysqlResultWriter::_add_one_row(TupleRow* row) { + SCOPED_TIMER(_convert_tuple_timer); + _row_buffer->reset(); + int num_columns = _output_expr_ctxs.size(); + int buf_ret = 0; + + for (int i = 0; 0 == buf_ret && i < num_columns; ++i) { + void* item = _output_expr_ctxs[i]->get_value(row); + + if (NULL == item) { + buf_ret = _row_buffer->push_null(); + continue; + } + + switch (_output_expr_ctxs[i]->root()->type().type) { + case TYPE_BOOLEAN: + case TYPE_TINYINT: + buf_ret = _row_buffer->push_tinyint(*static_cast(item)); + break; + + case TYPE_SMALLINT: + buf_ret = _row_buffer->push_smallint(*static_cast(item)); + break; + + case TYPE_INT: + buf_ret = _row_buffer->push_int(*static_cast(item)); + break; + + case TYPE_BIGINT: + buf_ret = _row_buffer->push_bigint(*static_cast(item)); + break; + + case TYPE_LARGEINT: { + char buf[48]; + int len = 48; + char* v = LargeIntValue::to_string( + reinterpret_cast(item)->value, buf, &len); + buf_ret = _row_buffer->push_string(v, len); + break; + } + + case TYPE_FLOAT: + buf_ret = _row_buffer->push_float(*static_cast(item)); + break; + + case TYPE_DOUBLE: + buf_ret = _row_buffer->push_double(*static_cast(item)); + break; + + case TYPE_TIME: { + double time = *static_cast(item); + std::string time_str = time_str_from_double(time); + buf_ret = _row_buffer->push_string(time_str.c_str(), time_str.size()); + break; + } + + case TYPE_DATE: + case TYPE_DATETIME: { + char buf[64]; + const DateTimeValue* time_val = (const DateTimeValue*)(item); + // TODO(zhaochun), this function has core risk + char* pos = time_val->to_string(buf); + buf_ret = _row_buffer->push_string(buf, pos - buf - 1); + break; + } + + case TYPE_HLL: + case TYPE_OBJECT: { + buf_ret = _row_buffer->push_null(); + break; + } + + case TYPE_VARCHAR: + case TYPE_CHAR: { + const StringValue* string_val = (const StringValue*)(item); + + if (string_val->ptr == NULL) { + if (string_val->len == 0) { + // 0x01 is a magic num, not usefull actually, just for present "" + char* tmp_val 
= reinterpret_cast(0x01); + buf_ret = _row_buffer->push_string(tmp_val, string_val->len); + } else { + buf_ret = _row_buffer->push_null(); + } + } else { + buf_ret = _row_buffer->push_string(string_val->ptr, string_val->len); + } + + break; + } + + case TYPE_DECIMAL: { + const DecimalValue* decimal_val = reinterpret_cast(item); + std::string decimal_str; + int output_scale = _output_expr_ctxs[i]->root()->output_scale(); + + if (output_scale > 0 && output_scale <= 30) { + decimal_str = decimal_val->to_string(output_scale); + } else { + decimal_str = decimal_val->to_string(); + } + + buf_ret = _row_buffer->push_string(decimal_str.c_str(), decimal_str.length()); + break; + } + + case TYPE_DECIMALV2: { + DecimalV2Value decimal_val(reinterpret_cast(item)->value); + std::string decimal_str; + int output_scale = _output_expr_ctxs[i]->root()->output_scale(); + + if (output_scale > 0 && output_scale <= 30) { + decimal_str = decimal_val.to_string(output_scale); + } else { + decimal_str = decimal_val.to_string(); + } + + buf_ret = _row_buffer->push_string(decimal_str.c_str(), decimal_str.length()); + break; + } + + default: + LOG(WARNING) << "can't convert this type to mysql type. type = " << + _output_expr_ctxs[i]->root()->type(); + buf_ret = -1; + break; + } + } + + if (0 != buf_ret) { + return Status::InternalError("pack mysql buffer failed."); + } + + return Status::OK(); +} + +Status MysqlResultWriter::append_row_batch(const RowBatch* batch) { + SCOPED_TIMER(_append_row_batch_timer); + if (NULL == batch || 0 == batch->num_rows()) { + return Status::OK(); + } + + Status status; + // convert one batch + TFetchDataResult* result = new(std::nothrow) TFetchDataResult(); + int num_rows = batch->num_rows(); + result->result_batch.rows.resize(num_rows); + + for (int i = 0; status.ok() && i < num_rows; ++i) { + TupleRow* row = batch->get_row(i); + status = _add_one_row(row); + + if (status.ok()) { + result->result_batch.rows[i].assign(_row_buffer->buf(), _row_buffer->length()); + } else { + LOG(WARNING) << "convert row to mysql result failed."; + break; + } + } + + if (status.ok()) { + SCOPED_TIMER(_sent_rows_counter); + // push this batch to back + status = _sinker->add_batch(result); + + if (status.ok()) { + result = NULL; + _written_rows += num_rows; + } else { + LOG(WARNING) << "append result batch to sink failed."; + } + } + + delete result; + result = NULL; + + return status; +} + +Status MysqlResultWriter::close() { + COUNTER_SET(_sent_rows_counter, _written_rows); + return Status::OK(); +} + +} + diff --git a/be/src/runtime/mysql_result_writer.h b/be/src/runtime/mysql_result_writer.h new file mode 100644 index 00000000000000..57134afc895953 --- /dev/null +++ b/be/src/runtime/mysql_result_writer.h @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/result_writer.h"
+#include "runtime/runtime_state.h"
+
+namespace doris {
+
+class TupleRow;
+class RowBatch;
+class ExprContext;
+class MysqlRowBuffer;
+class BufferControlBlock;
+class RuntimeProfile;
+
+// convert row batches to MySQL protocol rows
+class MysqlResultWriter final : public ResultWriter {
+public:
+    MysqlResultWriter(BufferControlBlock* sinker,
+                      const std::vector& output_expr_ctxs,
+                      RuntimeProfile* parent_profile);
+    virtual ~MysqlResultWriter();
+
+    virtual Status init(RuntimeState* state) override;
+    // convert one row batch to a MySQL result and
+    // append this batch to the result sink
+    virtual Status append_row_batch(const RowBatch* batch) override;
+
+    virtual Status close() override;
+
+private:
+    void _init_profile();
+    // convert one tuple row
+    Status _add_one_row(TupleRow* row);
+
+private:
+    BufferControlBlock* _sinker;
+    const std::vector& _output_expr_ctxs;
+    MysqlRowBuffer* _row_buffer;
+
+    RuntimeProfile* _parent_profile; // parent profile from the result sink, not owned
+    // total time cost of the append batch operation
+    RuntimeProfile::Counter* _append_row_batch_timer = nullptr;
+    // tuple convert timer, child timer of _append_row_batch_timer
+    RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
+    // result send timer, child timer of _append_row_batch_timer
+    RuntimeProfile::Counter* _result_send_timer = nullptr;
+    // number of sent rows
+    RuntimeProfile::Counter* _sent_rows_counter = nullptr;
+};
+
+} // end of namespace
diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h
index d0a247d1a5a71b..a1106536186036 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -33,7 +33,7 @@ class QueryStatisticsRecvr;
 class QueryStatistics {
 public:
-    QueryStatistics() : scan_rows(0), scan_bytes(0) {
+    QueryStatistics() : scan_rows(0), scan_bytes(0), returned_rows(0) {
     }
 
     void merge(const QueryStatistics& other) {
@@ -49,17 +49,23 @@ class QueryStatistics {
         this->scan_bytes += scan_bytes;
     }
 
+    void set_returned_rows(int64_t num_rows) {
+        this->returned_rows = num_rows;
+    }
+
     void merge(QueryStatisticsRecvr* recvr);
 
     void clear() {
         scan_rows = 0;
         scan_bytes = 0;
+        returned_rows = 0;
     }
 
     void to_pb(PQueryStatistics* statistics) {
         DCHECK(statistics != nullptr);
         statistics->set_scan_rows(scan_rows);
         statistics->set_scan_bytes(scan_bytes);
+        statistics->set_returned_rows(returned_rows);
     }
 
     void merge_pb(const PQueryStatistics& statistics) {
@@ -71,6 +77,9 @@ class QueryStatistics {
     int64_t scan_rows;
     int64_t scan_bytes;
+    // number of rows returned by the query.
+    // only set once by the result sink when closing.
+    int64_t returned_rows;
 };
 
 // It is used for collecting sub plan query statistics in DataStreamRecvr.
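The result-sink changes below are the other half of this counter: the writer tallies rows as it appends batches, ResultSink::close() hands the total to the BufferControlBlock, and QueryStatistics carries it back in the `returned_rows` field added above. The following is a minimal, self-contained sketch of that hand-off for illustration only; it uses stand-in types, not the real Doris classes, and is not part of this patch.

```
#include <cstdint>
#include <iostream>
#include <memory>

// Stand-in types (not the Doris classes) showing how the written-row count reaches
// the client-visible statistics. The call names mirror this patch; the types are fake.
struct QueryStatisticsSketch {
    int64_t returned_rows = 0;
    void set_returned_rows(int64_t n) { returned_rows = n; }
};

struct BufferControlBlockSketch {
    std::shared_ptr<QueryStatisticsSketch> stats;
    void update_num_written_rows(int64_t n) {
        // like BufferControlBlock::update_num_written_rows(): the statistics object
        // may be missing if the sink failed to initialize, so guard the call
        if (stats != nullptr) {
            stats->set_returned_rows(n);
        }
    }
};

struct ResultWriterSketch {
    int64_t written_rows = 0;
    void append_rows(int64_t rows_in_batch) { written_rows += rows_in_batch; }
};

int main() {
    auto stats = std::make_shared<QueryStatisticsSketch>();
    BufferControlBlockSketch sender{stats};
    ResultWriterSketch writer;
    writer.append_rows(60000);
    writer.append_rows(40000);
    // what ResultSink::close() does with the real classes:
    sender.update_num_written_rows(writer.written_rows);
    std::cout << "returned_rows = " << stats->returned_rows << std::endl; // 100000
    return 0;
}
```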
diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp index 85b724d6adb7ec..be51a5b991254a 100644 --- a/be/src/runtime/result_sink.cpp +++ b/be/src/runtime/result_sink.cpp @@ -25,7 +25,8 @@ #include "runtime/exec_env.h" #include "runtime/result_buffer_mgr.h" #include "runtime/buffer_control_block.h" -#include "runtime/result_writer.h" +#include "runtime/file_result_writer.h" +#include "runtime/mysql_result_writer.h" #include "runtime/mem_tracker.h" namespace doris { @@ -35,6 +36,17 @@ ResultSink::ResultSink(const RowDescriptor& row_desc, const std::vector& : _row_desc(row_desc), _t_output_expr(t_output_expr), _buf_size(buffer_size) { + + if (!sink.__isset.type || sink.type == TResultSinkType::MYSQL_PROTOCAL) { + _sink_type = TResultSinkType::MYSQL_PROTOCAL; + } else { + _sink_type = sink.type; + } + + if (_sink_type == TResultSinkType::FILE) { + CHECK(sink.__isset.file_options); + _file_opts.reset(new ResultFileOptions(sink.file_options)); + } } ResultSink::~ResultSink() { @@ -58,13 +70,25 @@ Status ResultSink::prepare(RuntimeState* state) { _profile = state->obj_pool()->add(new RuntimeProfile(state->obj_pool(), title.str())); // prepare output_expr RETURN_IF_ERROR(prepare_exprs(state)); + // create sender RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( - state->fragment_instance_id(), _buf_size, &_sender)); - // create writer - _writer.reset(new(std::nothrow) ResultWriter(_sender.get(), _output_expr_ctxs)); - RETURN_IF_ERROR(_writer->init(state)); + state->fragment_instance_id(), _buf_size, &_sender)); + + // create writer based on sink type + switch (_sink_type) { + case TResultSinkType::MYSQL_PROTOCAL: + _writer.reset(new(std::nothrow) MysqlResultWriter(_sender.get(), _output_expr_ctxs, _profile)); + break; + case TResultSinkType::FILE: + CHECK(_file_opts.get() != nullptr); + _writer.reset(new(std::nothrow) FileResultWriter(_file_opts.get(), _output_expr_ctxs, _profile)); + break; + default: + return Status::InternalError("Unknown result sink type"); + } + RETURN_IF_ERROR(_writer->init(state)); return Status::OK(); } @@ -80,9 +104,19 @@ Status ResultSink::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); } + + Status final_status = exec_status; + // close the writer + Status st = _writer->close(); + if (!st.ok() && exec_status.ok()) { + // close file writer failed, should return this error to client + final_status = st; + } + // close sender, this is normal path end if (_sender) { - _sender->close(exec_status); + _sender->update_num_written_rows(_writer->get_written_rows()); + _sender->close(final_status); } state->exec_env()->result_mgr()->cancel_at_time(time(NULL) + config::result_buffer_cancelled_interval_time, state->fragment_instance_id()); diff --git a/be/src/runtime/result_sink.h b/be/src/runtime/result_sink.h index 53739740bd573c..bf732b1c5d4457 100644 --- a/be/src/runtime/result_sink.h +++ b/be/src/runtime/result_sink.h @@ -35,6 +35,7 @@ class BufferControlBlock; class ExprContext; class ResultWriter; class MemTracker; +class ResultFileOptions; class ResultSink : public DataSink { public: @@ -60,6 +61,9 @@ class ResultSink : public DataSink { private: Status prepare_exprs(RuntimeState* state); + TResultSinkType::type _sink_type; + // set file options when sink type is FILE + std::unique_ptr _file_opts; ObjectPool* _obj_pool; // Owned by the RuntimeState. 
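The constructor and prepare() changes above treat a missing sink type as MYSQL_PROTOCAL, so existing plans keep the old behavior, and only an explicit FILE sink (with `file_options` set) switches to the new FileResultWriter. Below is a small stand-alone sketch of that selection, again with stand-in types rather than the real classes, purely for illustration.

```
#include <iostream>
#include <memory>

// Stand-in enum and writers (not the Doris classes) illustrating the selection done in
// ResultSink::ResultSink() / ResultSink::prepare(): default to the MySQL protocol writer
// when no sink type is given, and use the file writer only for an explicit FILE sink.
enum class SinkType { MYSQL_PROTOCAL, FILE };

struct WriterSketch {
    virtual ~WriterSketch() = default;
    virtual void describe() const = 0;
};
struct MysqlWriterSketch : WriterSketch {
    void describe() const override { std::cout << "send rows back over the MySQL protocol\n"; }
};
struct FileWriterSketch : WriterSketch {
    void describe() const override { std::cout << "write rows to local/broker files\n"; }
};

std::unique_ptr<WriterSketch> make_writer(bool type_is_set, SinkType type) {
    if (!type_is_set || type == SinkType::MYSQL_PROTOCAL) {
        return std::make_unique<MysqlWriterSketch>(); // old behavior preserved
    }
    return std::make_unique<FileWriterSketch>();      // SELECT ... INTO OUTFILE
}

int main() {
    make_writer(false, SinkType::MYSQL_PROTOCAL)->describe(); // unset type -> MySQL writer
    make_writer(true, SinkType::FILE)->describe();            // FILE -> file writer
    return 0;
}
```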
@@ -73,6 +77,7 @@ class ResultSink : public DataSink { boost::shared_ptr _writer; RuntimeProfile* _profile; // Allocated from _pool int _buf_size; // Allocated from _pool + }; } diff --git a/be/src/runtime/result_writer.cpp b/be/src/runtime/result_writer.cpp index 6d16348a34746e..b5537e486cb7e0 100644 --- a/be/src/runtime/result_writer.cpp +++ b/be/src/runtime/result_writer.cpp @@ -15,222 +15,11 @@ // specific language governing permissions and limitations // under the License. -#include "result_writer.h" - -#include "exprs/expr.h" -#include "runtime/primitive_type.h" -#include "runtime/row_batch.h" -#include "runtime/tuple_row.h" -#include "runtime/result_buffer_mgr.h" -#include "runtime/buffer_control_block.h" -#include "util/mysql_row_buffer.h" -#include "util/types.h" -#include "util/date_func.h" - -#include "gen_cpp/PaloInternalService_types.h" +#include "runtime/result_writer.h" namespace doris { -ResultWriter::ResultWriter( - BufferControlBlock* sinker, - const std::vector& output_expr_ctxs) : - _sinker(sinker), - _output_expr_ctxs(output_expr_ctxs), - _row_buffer(NULL) { -} - -ResultWriter::~ResultWriter() { - delete _row_buffer; -} - -Status ResultWriter::init(RuntimeState* state) { - if (NULL == _sinker) { - return Status::InternalError("sinker is NULL pointer."); - } - - _row_buffer = new(std::nothrow) MysqlRowBuffer(); - - if (NULL == _row_buffer) { - return Status::InternalError("no memory to alloc."); - } - - return Status::OK(); -} - -Status ResultWriter::add_one_row(TupleRow* row) { - _row_buffer->reset(); - int num_columns = _output_expr_ctxs.size(); - int buf_ret = 0; - - for (int i = 0; 0 == buf_ret && i < num_columns; ++i) { - void* item = _output_expr_ctxs[i]->get_value(row); - - if (NULL == item) { - buf_ret = _row_buffer->push_null(); - continue; - } - - switch (_output_expr_ctxs[i]->root()->type().type) { - case TYPE_BOOLEAN: - case TYPE_TINYINT: - buf_ret = _row_buffer->push_tinyint(*static_cast(item)); - break; - - case TYPE_SMALLINT: - buf_ret = _row_buffer->push_smallint(*static_cast(item)); - break; - - case TYPE_INT: - buf_ret = _row_buffer->push_int(*static_cast(item)); - break; - - case TYPE_BIGINT: - buf_ret = _row_buffer->push_bigint(*static_cast(item)); - break; - - case TYPE_LARGEINT: { - char buf[48]; - int len = 48; - char* v = LargeIntValue::to_string( - reinterpret_cast(item)->value, buf, &len); - buf_ret = _row_buffer->push_string(v, len); - break; - } - - case TYPE_FLOAT: - buf_ret = _row_buffer->push_float(*static_cast(item)); - break; - - case TYPE_DOUBLE: - buf_ret = _row_buffer->push_double(*static_cast(item)); - break; - - case TYPE_TIME: { - double time = *static_cast(item); - std::string time_str = time_str_from_double(time); - buf_ret = _row_buffer->push_string(time_str.c_str(), time_str.size()); - break; - } - - case TYPE_DATE: - case TYPE_DATETIME: { - char buf[64]; - const DateTimeValue* time_val = (const DateTimeValue*)(item); - // TODO(zhaochun), this function has core risk - char* pos = time_val->to_string(buf); - buf_ret = _row_buffer->push_string(buf, pos - buf - 1); - break; - } - - case TYPE_HLL: - case TYPE_OBJECT: { - buf_ret = _row_buffer->push_null(); - break; - } - - case TYPE_VARCHAR: - case TYPE_CHAR: { - const StringValue* string_val = (const StringValue*)(item); - - if (string_val->ptr == NULL) { - if (string_val->len == 0) { - // 0x01 is a magic num, not usefull actually, just for present "" - char* tmp_val = reinterpret_cast(0x01); - buf_ret = _row_buffer->push_string(tmp_val, string_val->len); - } else { - buf_ret = 
_row_buffer->push_null(); - } - } else { - buf_ret = _row_buffer->push_string(string_val->ptr, string_val->len); - } - - break; - } - - case TYPE_DECIMAL: { - const DecimalValue* decimal_val = reinterpret_cast(item); - std::string decimal_str; - int output_scale = _output_expr_ctxs[i]->root()->output_scale(); - - if (output_scale > 0 && output_scale <= 30) { - decimal_str = decimal_val->to_string(output_scale); - } else { - decimal_str = decimal_val->to_string(); - } - - buf_ret = _row_buffer->push_string(decimal_str.c_str(), decimal_str.length()); - break; - } - - case TYPE_DECIMALV2: { - DecimalV2Value decimal_val(reinterpret_cast(item)->value); - std::string decimal_str; - int output_scale = _output_expr_ctxs[i]->root()->output_scale(); - - if (output_scale > 0 && output_scale <= 30) { - decimal_str = decimal_val.to_string(output_scale); - } else { - decimal_str = decimal_val.to_string(); - } - - buf_ret = _row_buffer->push_string(decimal_str.c_str(), decimal_str.length()); - break; - } - - default: - LOG(WARNING) << "can't convert this type to mysql type. type = " << - _output_expr_ctxs[i]->root()->type(); - buf_ret = -1; - break; - } - } - - if (0 != buf_ret) { - return Status::InternalError("pack mysql buffer failed."); - } - - return Status::OK(); -} - -Status ResultWriter::append_row_batch(RowBatch* batch) { - if (NULL == batch || 0 == batch->num_rows()) { - return Status::OK(); - } - - Status status; - // convert one batch - TFetchDataResult* result = new(std::nothrow) TFetchDataResult(); - int num_rows = batch->num_rows(); - result->result_batch.rows.resize(num_rows); - - for (int i = 0; status.ok() && i < num_rows; ++i) { - TupleRow* row = batch->get_row(i); - status = add_one_row(row); - - if (status.ok()) { - result->result_batch.rows[i].assign(_row_buffer->buf(), _row_buffer->length()); - } else { - LOG(WARNING) << "convert row to mysql result failed."; - break; - } - } - - if (status.ok()) { - // push this batch to back - status = _sinker->add_batch(result); - - if (status.ok()) { - result = NULL; - } else { - LOG(WARNING) << "append result batch to sink failed."; - } - } - - delete result; - result = NULL; - - return status; -} +const std::string ResultWriter::NULL_IN_CSV = "\\N"; } diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h index 6a70157fb15752..a6b461f588e81c 100644 --- a/be/src/runtime/result_writer.h +++ b/be/src/runtime/result_writer.h @@ -15,43 +15,36 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_RUNTIME_RESULT_WRITER_H -#define DORIS_BE_RUNTIME_RESULT_WRITER_H - -#include +#pragma once #include "common/status.h" +#include "gen_cpp/PlanNodes_types.h" namespace doris { -class TupleRow; +class Status; class RowBatch; -class ExprContext; -class MysqlRowBuffer; -class BufferControlBlock; class RuntimeState; -//convert the row batch to mysql protol row +// abstract class of the result writer class ResultWriter { public: - ResultWriter(BufferControlBlock* sinker, const std::vector& output_expr_ctxs); - ~ResultWriter(); - - Status init(RuntimeState* state); - // convert one row batch to mysql result and - // append this batch to the result sink - Status append_row_batch(RowBatch* batch); - -private: - // convert one tuple row - Status add_one_row(TupleRow* row); - - // The expressions that are run to create tuples to be written to hbase. 
- BufferControlBlock* _sinker; - const std::vector& _output_expr_ctxs; - MysqlRowBuffer* _row_buffer; + ResultWriter() {}; + ~ResultWriter() {}; + + virtual Status init(RuntimeState* state) = 0; + // convert and write one row batch + virtual Status append_row_batch(const RowBatch* batch) = 0; + + virtual Status close() = 0; + + int64_t get_written_rows() const { return _written_rows; } + + static const std::string NULL_IN_CSV; + +protected: + int64_t _written_rows = 0; // number of rows written }; } -#endif diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt index dd2471adf18368..e2574b31297d60 100644 --- a/be/test/runtime/CMakeLists.txt +++ b/be/test/runtime/CMakeLists.txt @@ -60,5 +60,6 @@ ADD_BE_TEST(heartbeat_flags_test) ADD_BE_TEST(result_queue_mgr_test) ADD_BE_TEST(memory_scratch_sink_test) ADD_BE_TEST(external_scan_context_mgr_test) + ADD_BE_TEST(memory/chunk_allocator_test) ADD_BE_TEST(memory/system_allocator_test) diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js index 5efadc566cbbcc..db21dc81a060c0 100644 --- a/docs/.vuepress/sidebar/en.js +++ b/docs/.vuepress/sidebar/en.js @@ -106,6 +106,7 @@ module.exports = [ "colocation-join", "dynamic-partition", "export_manual", + "outfile", "privilege", "small-file-mgr", "sql-mode", diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js index e2d3bd570e89ee..f6fa16d4634a0b 100644 --- a/docs/.vuepress/sidebar/zh-CN.js +++ b/docs/.vuepress/sidebar/zh-CN.js @@ -114,6 +114,7 @@ module.exports = [ "colocation-join", "dynamic-partition", "export-manual", + "outfile", "privilege", "segment-v2-usage", "small-file-mgr", diff --git a/docs/en/administrator-guide/outfile.md b/docs/en/administrator-guide/outfile.md new file mode 100644 index 00000000000000..77a535d8e76deb --- /dev/null +++ b/docs/en/administrator-guide/outfile.md @@ -0,0 +1,186 @@ +--- +{ + "title": "Export Query Result", + "language": "en" +} +--- + + + +# Export Query Result + +This document describes how to use the `SELECT INTO OUTFILE` command to export query results. + +## Syntax + +The `SELECT INTO OUTFILE` statement can export the query results to a file. Currently, it only supports exporting to remote storage such as HDFS, S3, and BOS through the Broker process. The syntax is as follows: + +``` +query_stmt +INTO OUTFILE "file_path" +[format_as] +WITH BROKER `broker_name` +[broker_properties] +[other_properties] +``` + +* `file_path` + + `file_path` specify the file path and file name prefix. Like: `hdfs://path/to/my_file_`. + + The final file name will be assembled as `my_file_`, file seq no and the format suffix. File seq no starts from 0, determined by the number of split. + + ``` + my_file_0.csv + my_file_1.csv + my_file_2.csv + ``` + +* `[format_as]` + + ``` + FORMAT AS CSV + ``` + + Specify the export format. The default is CSV. + +* `[properties]` + + Specify the relevant attributes. Currently only export through Broker process is supported. Broker related attributes need to be prefixed with `broker.`. For details, please refer to [Broker](./broker.html). + + ``` + PROPERTIES + ("broker.prop_key" = "broker.prop_val", ...) + ``` + + Other properties + + ``` + PROPERTIELS + ("key1" = "val1", "key2" = "val2", ...) + ``` + + currently supports the following properties: + + * `column_separator`: Column separator, only applicable to CSV format. The default is `\t`. + * `line_delimiter`: Line delimiter, only applicable to CSV format. The default is `\n`. + * `max_file_size`:The max size of a single file. 
Default is 1GB. Range from 5MB to 2GB. Files exceeding this size will be split.
+
+1. Example 1
+
+    Export simple query results to the file `hdfs:/path/to/result.txt`. Specify the export format as CSV. Use `my_broker` and set kerberos authentication information. Specify the column separator as `,` and the line delimiter as `\n`.
+
+    ```
+    SELECT * FROM tbl
+    INTO OUTFILE "hdfs:/path/to/result_"
+    FORMAT AS CSV
+    PROPERTIES
+    (
+        "broker.name" = "my_broker",
+        "broker.hadoop.security.authentication" = "kerberos",
+        "broker.kerberos_principal" = "doris@YOUR.COM",
+        "broker.kerberos_keytab" = "/home/doris/my.keytab",
+        "column_separator" = ",",
+        "line_delimiter" = "\n",
+        "max_file_size" = "100MB"
+    );
+    ```
+
+    If the result is less than 100MB, the file will be: `result_0.csv`.
+
+    If larger than 100MB, it may be: `result_0.csv, result_1.csv, ...`.
+
+2. Example 2
+
+    Export the query result of the CTE statement to the file `hdfs:/path/to/result.txt`. The default export format is CSV. Use `my_broker` and set hdfs high availability information. Use the default column separator and line delimiter.
+
+    ```
+    WITH
+    x1 AS
+    (SELECT k1, k2 FROM tbl1),
+    x2 AS
+    (SELECT k3 FROM tbl2)
+    SELECT k1 FROM x1 UNION SELECT k3 FROM x2
+    INTO OUTFILE "hdfs:/path/to/result_"
+    PROPERTIES
+    (
+        "broker.name" = "my_broker",
+        "broker.username"="user",
+        "broker.password"="passwd",
+        "broker.dfs.nameservices" = "my_ha",
+        "broker.dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
+        "broker.dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
+        "broker.dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
+        "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+    );
+    ```
+
+    If the result is less than 1GB, the file will be: `result_0.csv`.
+
+    If larger than 1GB, it may be: `result_0.csv, result_1.csv, ...`.
+
+3. Example 3
+
+    Export the query results of the UNION statement to the file `bos://bucket/result.parquet`. Specify the export format as PARQUET. Use `my_broker` and set hdfs high availability information. The PARQUET format does not need a column separator or line delimiter.
+
+    ```
+    SELECT k1 FROM tbl1 UNION SELECT k2 FROM tbl1
+    INTO OUTFILE "bos://bucket/result_"
+    FORMAT AS PARQUET
+    PROPERTIES
+    (
+        "broker.name" = "my_broker",
+        "broker.bos_endpoint" = "http://bj.bcebos.com",
+        "broker.bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
+        "broker.bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy"
+    );
+    ```
+
+    If the result is less than 1GB, the file will be: `result_0.parquet`.
+
+    If larger than 1GB, it may be: `result_0.parquet, result_1.parquet, ...`.
+
+## Return result
+
+The command is a synchronous command. When the command returns, the operation has finished.
+
+If the export completes and returns normally, the result is as follows:
+
+```
+mysql> SELECT * FROM tbl INTO OUTFILE ...
+Query OK, 100000 rows affected (5.86 sec)
+```
+
+`100000 rows affected` indicates the number of rows in the exported result set.
+
+If the execution fails, an error message is returned, such as:
+
+```
+mysql> SELECT * FROM tbl INTO OUTFILE ...
+ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
+```
+
+## Notice
+
+* The CSV format does not support exporting binary types, such as BITMAP and HLL types. These types will be output as `\N`, which is null.
+* The query results are exported from a single BE node and a single thread.
Therefore, the export time and the export result set size are positively correlated. +* The export command does not check whether the file and file path exist. Whether the path will be automatically created or whether the existing file will be overwritten is entirely determined by the semantics of the remote storage system. +* If an error occurs during the export process, the exported file may remain on the remote storage system. Doris will not clean these files. The user needs to manually clean up. +* The timeout of the export command is the same as the timeout of the query. It can be set by `SET query_timeout = xxx`. +* For empty result query, there will be an empty file. +* File spliting will ensure that a row of data is stored in a single file. Therefore, the size of the file is not strictly equal to `max_file_size`. diff --git a/docs/zh-CN/administrator-guide/outfile.md b/docs/zh-CN/administrator-guide/outfile.md new file mode 100644 index 00000000000000..7a11aca301bbdd --- /dev/null +++ b/docs/zh-CN/administrator-guide/outfile.md @@ -0,0 +1,182 @@ +--- +{ + "title": "导出查询结果集", + "language": "zh-CN" +} +--- + + + +# 导出查询结果集 + +本文档介绍如何使用 `SELECT INTO OUTFILE` 命令进行查询结果的导出操作。 + +## 语法 + +`SELECT INTO OUTFILE` 语句可以将查询结果导出到文件中。目前仅支持通过 Broker 进程导出到远端存储,如 HDFS,S3,BOS 上。语法如下 + +``` +query_stmt +INTO OUTFILE "file_path" +[format_as] +[properties] +``` + +* `file_path` + + `file_path` 指向文件存储的路径以及文件前缀。如 `hdfs://path/to/my_file_`。 + + 最终的文件名将由 `my_file_`,文件序号以及文件格式后缀组成。其中文件序号由0开始,数量为文件被分割的数量。如: + + ``` + my_file_0.csv + my_file_1.csv + my_file_2.csv + ``` + +* `[format_as]` + + ``` + FORMAT AS CSV + ``` + + 指定导出格式。默认为 CSV。 + + +* `[properties]` + + 指定相关属性。目前仅支持通过 Broker 进程进行导出。Broker 相关属性需加前缀 `broker.`。具体参阅[Broker 文档](./broker.html)。 + + ``` + ("broker.prop_key" = "broker.prop_val", ...) + ``` + + 其他属性: + + ``` + ("key1" = "val1", "key2" = "val2", ...) + ``` + + 目前支持以下属性: + + * `column_separator`:列分隔符,仅对 CSV 格式适用。默认为 `\t`。 + * `line_delimiter`:行分隔符,仅对 CSV 格式适用。默认为 `\n`。 + * `max_file_size`:单个文件的最大大小。默认为 1GB。取值范围在 5MB 到 2GB 之间。超过这个大小的文件将会被切分。 + +1. 示例1 + + 将简单查询结果导出到文件 `hdfs:/path/to/result.txt`。指定导出格式为 CSV。使用 `my_broker` 并设置 kerberos 认证信息。指定列分隔符为 `,`,行分隔符为 `\n`。 + + ``` + SELECT * FROM tbl + INTO OUTFILE "hdfs:/path/to/result_" + FORMAT AS CSV + PROPERTIELS + ( + "broker.name" = "my_broker", + "broker.hadoop.security.authentication" = "kerberos", + "broker.kerberos_principal" = "doris@YOUR.COM", + "broker.kerberos_keytab" = "/home/doris/my.keytab" + "column_separator" = ",", + "line_delimiter" = "\n", + "max_file_size" = "100MB" + ); + ``` + + 最终生成文件如如果不大于 100MB,则为:`result_0.csv`。 + + 如果大于 100MB,则可能为 `result_0.csv, result_1.csv, ...`。 + +2. 
示例2 + + 将 CTE 语句的查询结果导出到文件 `hdfs:/path/to/result.txt`。默认导出格式为 CSV。使用 `my_broker` 并设置 hdfs 高可用信息。使用默认的行列分隔符。 + + ``` + WITH + x1 AS + (SELECT k1, k2 FROM tbl1), + x2 AS + (SELECT k3 FROM tbl2) + SELEC k1 FROM x1 UNION SELECT k3 FROM x2 + INTO OUTFILE "hdfs:/path/to/result_" + PROPERTIELS + ( + "broker.name" = "my_broker", + "broker.username"="user", + "broker.password"="passwd", + "broker.dfs.nameservices" = "my_ha", + "broker.dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", + "broker.dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", + "broker.dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", + "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + ); + ``` + + 最终生成文件如如果不大于 1GB,则为:`result_0.csv`。 + + 如果大于 1GB,则可能为 `result_0.csv, result_1.csv, ...`。 + +3. 示例3 + + 将 UNION 语句的查询结果导出到文件 `bos://bucket/result.txt`。指定导出格式为 PARQUET。使用 `my_broker` 并设置 hdfs 高可用信息。PARQUET 格式无需指定列分割符。 + + ``` + SELECT k1 FROM tbl1 UNION SELECT k2 FROM tbl1 + INTO OUTFILE "bos://bucket/result_" + FORMAT AS PARQUET + PROPERTIELS + ( + "broker.name" = "my_broker", + "broker.bos_endpoint" = "http://bj.bcebos.com", + "broker.bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", + "broker.bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy" + ); + ``` + + 最终生成文件如如果不大于 1GB,则为:`result_0.parquet`。 + + 如果大于 1GB,则可能为 `result_0.parquet, result_1.parquet, ...`。 + +## 返回结果 + +导出命令为同步命令。命令返回,即表示操作结束。 + +如果正常导出并返回,则结果如下: + +``` +mysql> SELECT * FROM tbl INTO OUTFILE ... Query OK, 100000 row affected (5.86 sec) +``` + +其中 `100000 row affected` 表示导出的结果集行数。 + +如果执行错误,则会返回错误信息,如: + +``` +mysql> SELECT * FROM tbl INTO OUTFILE ... ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ... +``` + +## 注意事项 + +* 查询结果是由单个 BE 节点,单线程导出的。因此导出时间和导出结果集大小正相关。 +* 导出命令不会检查文件及文件路径是否存在。是否会自动创建路径、或是否会覆盖已存在文件,完全由远端存储系统的语义决定。 +* 如果在导出过程中出现错误,可能会有导出文件残留在远端存储系统上。Doris 不会清理这些文件。需要用户手动清理。 +* 导出命令的超时时间同查询的超时时间。可以通过 `SET query_timeout=xxx` 进行设置。 +* 对于结果集为空的查询,依然会产生一个大小为0的文件。 +* 文件切分会保证一行数据完整的存储在单一文件中。因此文件的大小并不严格等于 `max_file_size`。 diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup index 5c66fe3f6cab7b..17806d0312d379 100644 --- a/fe/src/main/cup/sql_parser.cup +++ b/fe/src/main/cup/sql_parser.cup @@ -246,7 +246,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A KW_LOCAL, KW_LOCATION, KW_MAX, KW_MAX_VALUE, KW_MERGE, KW_MIN, KW_MINUTE, KW_MINUS, KW_MIGRATE, KW_MIGRATIONS, KW_MODIFY, KW_MONTH, KW_NAME, KW_NAMES, KW_NEGATIVE, KW_NO, KW_NOT, KW_NULL, KW_NULLS, - KW_OBSERVER, KW_OFFSET, KW_ON, KW_ONLY, KW_OPEN, KW_OR, KW_ORDER, KW_OUTER, KW_OVER, + KW_OBSERVER, KW_OFFSET, KW_ON, KW_ONLY, KW_OPEN, KW_OR, KW_ORDER, KW_OUTER, KW_OUTFILE, KW_OVER, KW_PARTITION, KW_PARTITIONS, KW_PASSWORD, KW_PATH, KW_PAUSE, KW_PIPE, KW_PRECEDING, KW_PLUGIN, KW_PLUGINS, KW_PRIMARY, @@ -469,6 +469,8 @@ nonterminal Boolean opt_builtin; nonterminal Boolean opt_tmp; +nonterminal OutFileClause opt_outfile; + precedence left KW_FULL, KW_MERGE; precedence left DOT; precedence left SET_VAR; @@ -2465,7 +2467,7 @@ delete_stmt ::= // even if the union has order by and limit. // ORDER BY and LIMIT bind to the preceding select statement by default. 
query_stmt ::= - opt_with_clause:w set_operand_list:operands + opt_with_clause:w set_operand_list:operands opt_outfile:outfile {: QueryStmt queryStmt = null; if (operands.size() == 1) { @@ -2474,15 +2476,27 @@ query_stmt ::= queryStmt = new SetOperationStmt(operands, null, LimitElement.NO_LIMIT); } queryStmt.setWithClause(w); + queryStmt.setOutFileClause(outfile); RESULT = queryStmt; :} - | opt_with_clause:w set_operation_with_order_by_or_limit:set_operation + | opt_with_clause:w set_operation_with_order_by_or_limit:set_operation opt_outfile:outfile {: set_operation.setWithClause(w); + set_operation.setOutFileClause(outfile); RESULT = set_operation; :} ; +opt_outfile ::= + {: + RESULT = null; + :} + | KW_INTO KW_OUTFILE STRING_LITERAL:file opt_file_format:fileFormat opt_properties:properties + {: + RESULT = new OutFileClause(file, fileFormat, properties); + :} + ; + opt_with_clause ::= KW_WITH with_view_def_list:list {: RESULT = new WithClause(list); :} diff --git a/fe/src/main/java/org/apache/doris/analysis/BaseViewStmt.java b/fe/src/main/java/org/apache/doris/analysis/BaseViewStmt.java index 49049bf816fbee..d78d8373f09443 100644 --- a/fe/src/main/java/org/apache/doris/analysis/BaseViewStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/BaseViewStmt.java @@ -17,9 +17,6 @@ package org.apache.doris.analysis; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; @@ -27,6 +24,11 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -141,5 +143,9 @@ protected void createColumnAndViewDefs(Analyzer analyzer) throws AnalysisExcepti @Override public void analyze(Analyzer analyzer) throws AnalysisException, UserException { super.analyze(analyzer); + + if (viewDefStmt.hasOutFileClause()) { + throw new AnalysisException("Not support OUTFILE clause in CREATE VIEW statement"); + } } } diff --git a/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java b/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java index a098ca69a9e8ba..8b3bac1cbcbe0e 100644 --- a/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java +++ b/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java @@ -19,6 +19,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.PrintableMap; import com.google.common.collect.Maps; @@ -78,4 +79,14 @@ public static BrokerDesc read(DataInput in) throws IOException { desc.readFields(in); return desc; } + + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append(" WITH BROKER ").append(name); + if (properties != null && !properties.isEmpty()) { + PrintableMap printableMap = new PrintableMap<>(properties, " = ", true, false); + sb.append(" (").append(printableMap.toString()).append(")"); + } + return sb.toString(); + } } diff --git a/fe/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/src/main/java/org/apache/doris/analysis/OutFileClause.java new file mode 100644 index 00000000000000..1d2a9057afe53b --- /dev/null +++ 
b/fe/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.ParseUtil; +import org.apache.doris.common.util.PrintableMap; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TResultFileSinkOptions; + +import com.google.common.base.Strings; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +// For syntax: select * from tbl INTO OUTFILE xxxx +public class OutFileClause { + private static final Logger LOG = LogManager.getLogger(OutFileClause.class); + + private static final String BROKER_PROP_PREFIX = "broker."; + private static final String PROP_BROKER_NAME = "broker.name"; + private static final String PROP_COLUMN_SEPARATOR = "column_separator"; + private static final String PROP_LINE_DELIMITER = "line_delimiter"; + private static final String PROP_MAX_FILE_SIZE = "max_file_size"; + + private static final long DEFAULT_MAX_FILE_SIZE_BYTES = 1 * 1024 * 1024 * 1024; // 1GB + private static final long MIN_FILE_SIZE_BYTES = 5 * 1024 * 1024L; // 5MB + private static final long MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024 * 1024L; // 2GB + + private String filePath; + private String format; + private Map<String, String> properties; + + // set following members after analyzing + private String columnSeparator = "\t"; + private String lineDelimiter = "\n"; + private TFileFormatType fileFormatType; + private long maxFileSizeBytes = DEFAULT_MAX_FILE_SIZE_BYTES; + private BrokerDesc brokerDesc = null; + + public OutFileClause(String filePath, String format, Map<String, String> properties) { + this.filePath = filePath; + this.format = Strings.isNullOrEmpty(format) ? "csv" : format.toLowerCase(); + this.properties = properties; + } + + public OutFileClause(OutFileClause other) { + this.filePath = other.filePath; + this.format = other.format; + this.properties = other.properties == null ? null : Maps.newHashMap(other.properties); + } + + public String getColumnSeparator() { + return columnSeparator; + } + + public String getLineDelimiter() { + return lineDelimiter; + } + + public TFileFormatType getFileFormatType() { + return fileFormatType; + } + + public long getMaxFileSizeBytes() { + return maxFileSizeBytes; + } + + public BrokerDesc getBrokerDesc() { + return brokerDesc; + } + + public void analyze(Analyzer analyzer) throws AnalysisException { + if (Strings.isNullOrEmpty(filePath)) { + throw new AnalysisException("Must specify file in OUTFILE clause"); + } + + if (!format.equals("csv")) { + throw new AnalysisException("Only support CSV format"); + } + fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; + + analyzeProperties(); + + if (brokerDesc == null) { + throw new AnalysisException("Must specify BROKER properties in OUTFILE clause"); + } + } + + private void analyzeProperties() throws AnalysisException { + if (properties == null || properties.isEmpty()) { + return; + } + + Set<String> processedPropKeys = Sets.newHashSet(); + getBrokerProperties(processedPropKeys); + if (brokerDesc == null) { + return; + } + + if (properties.containsKey(PROP_COLUMN_SEPARATOR)) { + if (!isCsvFormat()) { + throw new AnalysisException(PROP_COLUMN_SEPARATOR + " is only for CSV format"); + } + columnSeparator = properties.get(PROP_COLUMN_SEPARATOR); + processedPropKeys.add(PROP_COLUMN_SEPARATOR); + } + + if (properties.containsKey(PROP_LINE_DELIMITER)) { + if (!isCsvFormat()) { + throw new AnalysisException(PROP_LINE_DELIMITER + " is only for CSV format"); + } + lineDelimiter = properties.get(PROP_LINE_DELIMITER); + processedPropKeys.add(PROP_LINE_DELIMITER); + } + + if (properties.containsKey(PROP_MAX_FILE_SIZE)) { + maxFileSizeBytes = ParseUtil.analyzeDataVolumn(properties.get(PROP_MAX_FILE_SIZE)); + if (maxFileSizeBytes > MAX_FILE_SIZE_BYTES || maxFileSizeBytes < MIN_FILE_SIZE_BYTES) { + throw new AnalysisException("max file size should be between 5MB and 2GB. Given: " + maxFileSizeBytes); + } + processedPropKeys.add(PROP_MAX_FILE_SIZE); + } + + if (processedPropKeys.size() != properties.size()) { + LOG.debug("{} vs {}", processedPropKeys, properties); + throw new AnalysisException("Unknown properties: " + properties.keySet().stream() + .filter(k -> !processedPropKeys.contains(k)).collect(Collectors.toList())); + } + } + + private void getBrokerProperties(Set<String> processedPropKeys) { + if (!properties.containsKey(PROP_BROKER_NAME)) { + return; + } + String brokerName = properties.get(PROP_BROKER_NAME); + processedPropKeys.add(PROP_BROKER_NAME); + + Map<String, String> brokerProps = Maps.newHashMap(); + Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry<String, String> entry = iter.next(); + if (entry.getKey().startsWith(BROKER_PROP_PREFIX) && !entry.getKey().equals(PROP_BROKER_NAME)) { + brokerProps.put(entry.getKey().substring(BROKER_PROP_PREFIX.length()), entry.getValue()); + processedPropKeys.add(entry.getKey()); + } + } + + brokerDesc = new BrokerDesc(brokerName, brokerProps); + } + + private boolean isCsvFormat() { + return fileFormatType == TFileFormatType.FORMAT_CSV_BZ2 + || fileFormatType == TFileFormatType.FORMAT_CSV_DEFLATE + || fileFormatType == TFileFormatType.FORMAT_CSV_GZ + || fileFormatType == TFileFormatType.FORMAT_CSV_LZ4FRAME + || fileFormatType == TFileFormatType.FORMAT_CSV_LZO + || fileFormatType == TFileFormatType.FORMAT_CSV_LZOP + || fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN; + } + + @Override + public OutFileClause clone() { + return new OutFileClause(this); + } + + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append(" INTO OUTFILE '").append(filePath).append("' FORMAT AS ").append(format); + if (properties != null && !properties.isEmpty()) { + sb.append(" PROPERTIES("); + sb.append(new PrintableMap<>(properties, " = ", true, false)); + sb.append(")"); + } + return sb.toString(); + } + + public TResultFileSinkOptions toSinkOptions() { + TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions(filePath, fileFormatType); + if (isCsvFormat()) { + sinkOptions.setColumn_separator(columnSeparator); + sinkOptions.setLine_delimiter(lineDelimiter); + } + sinkOptions.setMax_file_size_bytes(maxFileSizeBytes); + if (brokerDesc != null) { + sinkOptions.setBroker_properties(brokerDesc.getProperties()); + // broker_addresses of sinkOptions will be set in Coordinator, + // because we need to choose the broker nearest to the result sink node.
+ } + return sinkOptions; + } +} + + diff --git a/fe/src/main/java/org/apache/doris/analysis/QueryStmt.java b/fe/src/main/java/org/apache/doris/analysis/QueryStmt.java index e6f0bfb0a7fb3c..4da74e12d59160 100644 --- a/fe/src/main/java/org/apache/doris/analysis/QueryStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/QueryStmt.java @@ -23,12 +23,12 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.doris.qe.ConnectContext; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -110,6 +110,9 @@ public abstract class QueryStmt extends StatementBase { ///////////////////////////////////////// // END: Members that need to be reset() + // represent the "INTO OUTFILE" clause + protected OutFileClause outFileClause; + QueryStmt(ArrayList orderByElements, LimitElement limitElement) { this.orderByElements = orderByElements; this.limitElement = limitElement; @@ -124,6 +127,7 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { super.analyze(analyzer); analyzeLimit(analyzer); if (hasWithClause()) withClause_.analyze(analyzer); + if (hasOutFileClause()) outFileClause.analyze(analyzer); } private void analyzeLimit(Analyzer analyzer) throws AnalysisException { @@ -553,12 +557,17 @@ public WithClause cloneWithClause() { return withClause_ != null ? withClause_.clone() : null; } + public OutFileClause cloneOutfileCluse() { + return outFileClause != null ? outFileClause.clone() : null; + } + /** * C'tor for cloning. */ protected QueryStmt(QueryStmt other) { super(other); withClause_ = other.cloneWithClause(); + outFileClause = other.cloneOutfileCluse(); orderByElements = other.cloneOrderByElements(); limitElement = other.limitElement.clone(); resultExprs = Expr.cloneList(other.resultExprs); @@ -597,4 +606,16 @@ public void setFromInsert(boolean value) { public abstract void substituteSelectList(Analyzer analyzer, List newColLabels) throws AnalysisException, UserException; + + public void setOutFileClause(OutFileClause outFileClause) { + this.outFileClause = outFileClause; + } + + public OutFileClause getOutFileClause() { + return outFileClause; + } + + public boolean hasOutFileClause() { + return outFileClause != null; + } } diff --git a/fe/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/src/main/java/org/apache/doris/analysis/SelectStmt.java index 122c5e5d94e2f7..98f21029e8172c 100644 --- a/fe/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -1540,6 +1540,10 @@ public String toSql() { if (hasLimitClause()) { strBuilder.append(limitElement.toSql()); } + + if (hasOutFileClause()) { + strBuilder.append(outFileClause.toSql()); + } return strBuilder.toString(); } diff --git a/fe/src/main/java/org/apache/doris/common/util/ParseUtil.java b/fe/src/main/java/org/apache/doris/common/util/ParseUtil.java index e51aeede69dc01..ed3c4947fe6b2c 100644 --- a/fe/src/main/java/org/apache/doris/common/util/ParseUtil.java +++ b/fe/src/main/java/org/apache/doris/common/util/ParseUtil.java @@ -18,7 +18,6 @@ package org.apache.doris.common.util; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.UserException; import com.google.common.base.Strings; import 
com.google.common.collect.ImmutableMap; @@ -42,7 +41,7 @@ public class ParseUtil { private static Pattern dataVolumnPattern = Pattern.compile("(\\d+)(\\D*)"); - public static long analyzeDataVolumn(String dataVolumnStr) throws UserException { + public static long analyzeDataVolumn(String dataVolumnStr) throws AnalysisException { long dataVolumn = 0; Matcher m = dataVolumnPattern.matcher(dataVolumnStr); if (m.matches()) { diff --git a/fe/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/src/main/java/org/apache/doris/planner/PlanFragment.java index 8131b81982512a..ecff1a5401452f 100644 --- a/fe/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -17,19 +17,22 @@ package org.apache.doris.planner; -import org.apache.commons.collections.CollectionUtils; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; -import org.apache.doris.common.UserException; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.TreeNode; +import org.apache.doris.common.UserException; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPlanFragment; +import org.apache.doris.thrift.TResultSinkType; + import com.google.common.base.Preconditions; -import org.apache.logging.log4j.Logger; + +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; @@ -174,8 +177,8 @@ public void finalize(Analyzer analyzer, boolean validateFileFormats) } // add ResultSink Preconditions.checkState(sink == null); - // we're streaming to an exchange node - ResultSink bufferSink = new ResultSink(planRoot.getId()); + // we're streaming to an result sink + ResultSink bufferSink = new ResultSink(planRoot.getId(), TResultSinkType.MYSQL_PROTOCAL); sink = bufferSink; } } diff --git a/fe/src/main/java/org/apache/doris/planner/Planner.java b/fe/src/main/java/org/apache/doris/planner/Planner.java index 0e82bd4bb3e7f8..1b283ec54a418c 100644 --- a/fe/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/src/main/java/org/apache/doris/planner/Planner.java @@ -195,9 +195,10 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue for (PlanFragment fragment : fragments) { fragment.finalize(analyzer, !queryOptions.allow_unsupported_formats); } - Collections.reverse(fragments); + setOutfileSink(queryStmt); + if (queryStmt instanceof SelectStmt) { SelectStmt selectStmt = (SelectStmt) queryStmt; if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) { @@ -210,6 +211,21 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue } } + // if query stmt has OUTFILE clause, set info into ResultSink. + // this should be done after fragments are generated. + private void setOutfileSink(QueryStmt queryStmt) { + if (!queryStmt.hasOutFileClause()) { + return; + } + PlanFragment topFragment = fragments.get(0); + if (!(topFragment.getSink() instanceof ResultSink)) { + return; + } + + ResultSink resultSink = (ResultSink) topFragment.getSink(); + resultSink.setOutfileInfo(queryStmt.getOutFileClause()); + } + /** * If there are unassigned conjuncts, returns a SelectNode on top of root that evaluate those conjuncts; otherwise * returns root unchanged. 
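A side note on the `max_file_size` handling: the ParseUtil hunk above only changes the declared exception of `analyzeDataVolumn`, so its unit table is not visible in this diff. The sketch below is a minimal, non-authoritative illustration of how a value such as `"100MB"` could be turned into the byte count that `OutFileClause` range-checks, assuming the `(\d+)(\D*)` pattern shown above and binary (1024-based) units; the class and method names here are illustrative, not part of the patch.

```
import com.google.common.collect.ImmutableMap;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch only: mirrors the shape of ParseUtil.analyzeDataVolumn, not its actual code.
public class DataVolumeSketch {
    // same pattern as ParseUtil: a number followed by an optional unit suffix
    private static final Pattern DATA_VOLUME_PATTERN = Pattern.compile("(\\d+)(\\D*)");

    // assumed binary units; the real multipliers live inside ParseUtil
    private static final ImmutableMap<String, Long> UNITS = ImmutableMap.of(
            "B", 1L,
            "KB", 1024L,
            "MB", 1024L * 1024,
            "GB", 1024L * 1024 * 1024);

    public static long toBytes(String volume) {
        Matcher m = DATA_VOLUME_PATTERN.matcher(volume);
        if (!m.matches()) {
            throw new IllegalArgumentException("invalid data volume: " + volume);
        }
        long number = Long.parseLong(m.group(1));
        String unit = m.group(2).trim().toUpperCase();
        Long multiplier = unit.isEmpty() ? 1L : UNITS.get(unit);
        if (multiplier == null) {
            throw new IllegalArgumentException("unknown unit: " + unit);
        }
        return number * multiplier;
    }

    public static void main(String[] args) {
        // "100MB" from the outfile examples -> 104857600 bytes,
        // which falls inside the 5MB ~ 2GB window enforced by OutFileClause.
        System.out.println(toBytes("100MB"));
    }
}
```

Under these assumptions, the `"max_file_size" = "100MB"` value used in the documentation examples maps to 104857600 bytes and passes the `MIN_FILE_SIZE_BYTES`/`MAX_FILE_SIZE_BYTES` check in `OutFileClause`.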
diff --git a/fe/src/main/java/org/apache/doris/planner/ResultSink.java b/fe/src/main/java/org/apache/doris/planner/ResultSink.java index e71242e9b465cc..688c2cbe8b7880 100644 --- a/fe/src/main/java/org/apache/doris/planner/ResultSink.java +++ b/fe/src/main/java/org/apache/doris/planner/ResultSink.java @@ -17,19 +17,33 @@ package org.apache.doris.planner; +import org.apache.doris.analysis.OutFileClause; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TResultFileSinkOptions; import org.apache.doris.thrift.TResultSink; +import org.apache.doris.thrift.TResultSinkType; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; /** - * Data sink that forwards data to an exchange node. + * Result sink that forwards data to + * 1. the FE data receiver, which returns the final query result to the user client. Or, + * 2. files that store the result data */ public class ResultSink extends DataSink { private final PlanNodeId exchNodeId; + private TResultSinkType sinkType; + private String brokerName; + private TResultFileSinkOptions fileSinkOptions; - public ResultSink(PlanNodeId exchNodeId) { + public ResultSink(PlanNodeId exchNodeId, TResultSinkType sinkType) { this.exchNodeId = exchNodeId; + this.sinkType = sinkType; } @Override @@ -43,6 +57,10 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) { protected TDataSink toThrift() { TDataSink result = new TDataSink(TDataSinkType.RESULT_SINK); TResultSink tResultSink = new TResultSink(); + tResultSink.setType(sinkType); + if (fileSinkOptions != null) { + tResultSink.setFile_options(fileSinkOptions); + } result.setResult_sink(tResultSink); return result; } @@ -56,6 +74,27 @@ public PlanNodeId getExchNodeId() { public DataPartition getOutputPartition() { return null; } -} -/* vim: set ts=4 sw=4 sts=4 tw=100 noet: */ + public boolean isOutputFileSink() { + return sinkType == TResultSinkType.FILE; + } + + public boolean needBroker() { + return !Strings.isNullOrEmpty(brokerName); + } + + public String getBrokerName() { + return brokerName; + } + + public void setOutfileInfo(OutFileClause outFileClause) { + sinkType = TResultSinkType.FILE; + fileSinkOptions = outFileClause.toSinkOptions(); + brokerName = outFileClause.getBrokerDesc() == null ?
null : outFileClause.getBrokerDesc().getName(); + } + + public void setBrokerAddr(String ip, int port) { + Preconditions.checkNotNull(fileSinkOptions); + fileSinkOptions.setBroker_addresses(Lists.newArrayList(new TNetworkAddress(ip, port))); + } +} diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index ac123b646d5db3..36f9a0d5578bf9 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.FsBroker; import org.apache.doris.common.Config; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Pair; @@ -401,11 +402,21 @@ public void exec() throws Exception { PlanFragmentId topId = fragments.get(0).getFragmentId(); FragmentExecParams topParams = fragmentExecParamsMap.get(topId); if (topParams.fragment.getSink() instanceof ResultSink) { + TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host; receiver = new ResultReceiver( topParams.instanceExecParams.get(0).instanceId, - addressToBackendID.get(topParams.instanceExecParams.get(0).host), - toBrpcHost(topParams.instanceExecParams.get(0).host), + addressToBackendID.get(execBeAddr), + toBrpcHost(execBeAddr), queryOptions.query_timeout * 1000); + + // set the broker address for OUTFILE sink + ResultSink resultSink = (ResultSink) topParams.fragment.getSink(); + if (resultSink.isOutputFileSink() && resultSink.needBroker()) { + FsBroker broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker(resultSink.getBrokerName(), + execBeAddr.getHostname()); + resultSink.setBrokerAddr(broker.ip, broker.port); + } + } else { // This is a load process. this.queryOptions.setIs_report_success(true); diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index 9baea6eca44ba5..90133e2458e876 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -578,12 +578,23 @@ private void handleQueryStmt() throws Exception { // so We need to send fields after first batch arrived // send result + // 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx, + // We will not send real query result to client. Instead, we only send OK to client with + // number of rows selected. For example: + // mysql> select * from tbl1 into outfile xxx; + // Query OK, 10 rows affected (0.01 sec) + // + // 2. If this is a query, send the result expr fields first, and send result data back to client. 
RowBatch batch; MysqlChannel channel = context.getMysqlChannel(); - sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs()); + boolean isOutfileQuery = queryStmt.hasOutFileClause(); + if (!isOutfileQuery) { + sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs()); + } while (true) { batch = coord.getNext(); - if (batch.getBatch() != null) { + // for an outfile query, only one empty batch is sent back with the eos flag + if (batch.getBatch() != null && !isOutfileQuery) { for (ByteBuffer row : batch.getBatch().getRows()) { channel.sendOnePacket(row); } @@ -595,7 +606,11 @@ private void handleQueryStmt() throws Exception { } statisticsForAuditLog = batch.getQueryStatistics(); - context.getState().setEof(); + if (!isOutfileQuery) { + context.getState().setEof(); + } else { + context.getState().setOk(statisticsForAuditLog.returned_rows, 0, ""); + } } // Process a select statement. @@ -612,6 +627,10 @@ private void handleInsertStmt() throws Exception { insertStmt = (InsertStmt) parsedStmt; } + if (insertStmt.getQueryStmt().hasOutFileClause()) { + throw new DdlException("Not support OUTFILE clause in INSERT statement"); + } + // assign query id before explain query return UUID uuid = insertStmt.getUUID(); context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits())); @@ -913,3 +932,4 @@ public PQueryStatistics getQueryStatisticsForAuditLog() { return statisticsForAuditLog; } } + diff --git a/fe/src/main/jflex/sql_scanner.flex b/fe/src/main/jflex/sql_scanner.flex index ec53beaef0a8ff..e6d4ea0a1a482a 100644 --- a/fe/src/main/jflex/sql_scanner.flex +++ b/fe/src/main/jflex/sql_scanner.flex @@ -264,6 +264,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("or", new Integer(SqlParserSymbols.KW_OR)); keywordMap.put("order", new Integer(SqlParserSymbols.KW_ORDER)); keywordMap.put("outer", new Integer(SqlParserSymbols.KW_OUTER)); + keywordMap.put("outfile", new Integer(SqlParserSymbols.KW_OUTFILE)); keywordMap.put("over", new Integer(SqlParserSymbols.KW_OVER)); keywordMap.put("partition", new Integer(SqlParserSymbols.KW_PARTITION)); keywordMap.put("partitions", new Integer(SqlParserSymbols.KW_PARTITIONS)); diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto index ef13a405988870..09af10c3c1caf7 100644 --- a/gensrc/proto/data.proto +++ b/gensrc/proto/data.proto @@ -23,6 +23,7 @@ option java_package = "org.apache.doris.proto"; message PQueryStatistics { optional int64 scan_rows = 1; optional int64 scan_bytes = 2; + optional int64 returned_rows = 3; } message PRowBatch { diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 2792a07276517f..343cbbb7cad45b 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -22,6 +22,7 @@ include "Exprs.thrift" include "Types.thrift" include "Descriptors.thrift" include "Partitions.thrift" +include "PlanNodes.thrift" enum TDataSinkType { DATA_STREAM_SINK, @@ -33,6 +34,21 @@ enum TDataSinkType { MEMORY_SCRATCH_SINK } +enum TResultSinkType { + MYSQL_PROTOCAL, + FILE +} + +struct TResultFileSinkOptions { + 1: required string file_path + 2: required PlanNodes.TFileFormatType file_format + 3: optional string column_separator // only for csv + 4: optional string line_delimiter // only for csv + 5: optional i64 max_file_size_bytes + 6: optional list<Types.TNetworkAddress> broker_addresses; // only for remote file + 7: optional map<string, string> broker_properties // only for remote file +} + struct TMemoryScratchSink { } @@ -52,8 +68,9 @@ struct TDataStreamSink { 3: optional bool
ignore_not_found } -// Reserved for struct TResultSink { + 1: optional TResultSinkType type; + 2: optional TResultFileSinkOptions file_options; } struct TMysqlTableSink {