Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ set(EXEC_FILES
broker_writer.cpp
parquet_scanner.cpp
parquet_reader.cpp
parquet_writer.cpp
orc_scanner.cpp
json_scanner.cpp
assert_num_rows_node.cpp
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/broker_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class BrokerWriter : public FileWriter {
BrokerWriter(ExecEnv* env,
const std::vector<TNetworkAddress>& broker_addresses,
const std::map<std::string, std::string>& properties,
const std::string& dir,
const std::string& path,
int64_t start_offset);
virtual ~BrokerWriter();

Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/local_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

namespace doris {

class RuntimeState;

class LocalFileWriter : public FileWriter {
public:
LocalFileWriter(const std::string& path, int64_t start_offset);
virtual ~LocalFileWriter();

Status open() override;
Status open() override;

virtual Status write(const uint8_t* buf, size_t buf_len, size_t* written_len) override;

Expand Down
91 changes: 91 additions & 0 deletions be/src/exec/parquet_writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/parquet_writer.h"

#include <time.h>
#include <arrow/status.h>
#include <arrow/array.h>

#include "exec/file_writer.h"
#include "common/logging.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/TPaloBrokerService.h"
#include "runtime/broker_mgr.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/tuple.h"
#include "runtime/descriptors.h"
#include "runtime/mem_pool.h"
#include "util/thrift_util.h"

namespace doris {

/// ParquetOutputStream
// Builds an output stream that forwards every write to `file_writer`.
// `file_writer` is borrowed: this stream never deletes it.
// The write position is explicitly started at 0 so that Tell() and the
// accumulation in Write() never read an indeterminate value.
ParquetOutputStream::ParquetOutputStream(FileWriter* file_writer)
        : _file_writer(file_writer), _cur_pos(0) {
    set_mode(arrow::io::FileMode::WRITE);
}

// Closes the stream on destruction unless it has already been closed.
// Skipping the call when `_is_closed` is set avoids closing the underlying
// file writer a second time after an explicit Close(). A destructor cannot
// report an error to its caller, so a failed close is logged instead of
// being silently dropped.
ParquetOutputStream::~ParquetOutputStream() {
    if (_is_closed) {
        return;
    }
    arrow::Status st = Close();
    if (!st.ok()) {
        LOG(WARNING) << "failed to close parquet output stream: " << st.ToString();
    }
}

// Writes exactly `nbytes` bytes starting at `data` to the underlying file writer.
//
// The file writer reports how many bytes it actually accepted through
// `written_len`, which may be fewer than requested. The call is therefore
// repeated on the unwritten tail until everything has been handed over, so a
// short write is never reported as success with data missing.
//
// Returns:
//   - Invalid if `nbytes` is negative,
//   - IOError if the file writer fails, or accepts zero bytes while data
//     is still pending (guards against spinning forever),
//   - OK once all bytes were written.
// `_cur_pos` is advanced by every byte the file writer accepted, including
// bytes accepted before a later failure.
arrow::Status ParquetOutputStream::Write(const void* data, int64_t nbytes) {
    if (nbytes < 0) {
        return arrow::Status::Invalid("negative length passed to ParquetOutputStream::Write");
    }

    const uint8_t* cursor = reinterpret_cast<const uint8_t*>(data);
    size_t remaining = static_cast<size_t>(nbytes);
    while (remaining > 0) {
        size_t written_len = 0;
        Status st = _file_writer->write(cursor, remaining, &written_len);
        if (!st.ok()) {
            return arrow::Status::IOError(st.get_error_msg());
        }
        if (written_len == 0) {
            return arrow::Status::IOError("file writer accepted no data while bytes are still pending");
        }
        _cur_pos += written_len;
        if (written_len >= remaining) {
            // Everything (or, defensively, more than) what was asked for is done.
            break;
        }
        cursor += written_len;
        remaining -= written_len;
    }
    return arrow::Status::OK();
}

// Stores the stream's current write position (`_cur_pos`) into `*position`.
// Always returns OK.
arrow::Status ParquetOutputStream::Tell(int64_t* position) const {
    int64_t& out = *position;
    out = _cur_pos;
    return arrow::Status::OK();
}

// Closes the underlying file writer and marks the stream as closed.
//
// The call is idempotent: once the stream has been closed successfully,
// further calls return OK without touching the file writer again. This
// matters because the destructor also calls Close().
// If the file writer fails to close, IOError carrying its message is returned
// and the stream stays marked as open.
arrow::Status ParquetOutputStream::Close() {
    if (_is_closed) {
        return arrow::Status::OK();
    }
    Status st = _file_writer->close();
    if (!st.ok()) {
        return arrow::Status::IOError(st.get_error_msg());
    }
    _is_closed = true;
    return arrow::Status::OK();
}

/// ParquetWriterWrapper
// Wraps `file_writer` in a heap-allocated ParquetOutputStream and keeps a
// reference to `output_expr_ctxs` (the vector itself is not copied).
// Members are initialized in their declaration order.
ParquetWriterWrapper::ParquetWriterWrapper(FileWriter *file_writer,
                                           const std::vector<ExprContext*>& output_expr_ctxs)
        : _outstream(new ParquetOutputStream(file_writer)),
          _output_expr_ctxs(output_expr_ctxs) {
    // TODO(cmy): implement
}

// Placeholder: the row batch is currently ignored and OK is returned
// unconditionally.
Status ParquetWriterWrapper::write(const RowBatch& row_batch) {
    // TODO(cmy): implement
    Status result = Status::OK();
    return result;
}

// Placeholder: currently a no-op. It is invoked from the destructor.
void ParquetWriterWrapper::close() {
// TODO(cmy): implement
}

// Finishes the writer and releases the output stream created in the
// constructor, which was previously leaked.
// Destroying the stream runs ParquetOutputStream's destructor, which closes
// the wrapped file writer; the file writer passed to the constructor must
// therefore still be alive when this wrapper is destroyed.
ParquetWriterWrapper::~ParquetWriterWrapper() {
    close();
    delete _outstream;
    _outstream = nullptr;
}

} // end namespace
81 changes: 81 additions & 0 deletions be/src/exec/parquet_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <stdint.h>

#include <string>
#include <map>
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/io/file.h>
#include <arrow/io/interfaces.h>
#include <arrow/buffer.h>
#include <parquet/api/reader.h>
#include <parquet/api/writer.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <parquet/exception.h>

#include "common/status.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/PlanNodes_types.h"

namespace doris {

class ExprContext;
class FileWriter;
class RowBatch;

// Adapter that exposes a doris FileWriter through the
// arrow::io::OutputStream interface.
class ParquetOutputStream : public arrow::io::OutputStream {
public:
    // `file_writer` is borrowed, not owned. `explicit` prevents a FileWriter*
    // from being silently converted into a stream.
    explicit ParquetOutputStream(FileWriter* file_writer);
    virtual ~ParquetOutputStream();

    // forward `nbytes` bytes starting at `data` to the file writer
    arrow::Status Write(const void* data, int64_t nbytes) override;
    // return the current write position of the stream
    arrow::Status Tell(int64_t* position) const override;
    // close the underlying file writer
    arrow::Status Close() override;

    // true once Close() has succeeded
    bool closed() const override {
        return _is_closed;
    }
private:
    FileWriter* _file_writer; // not owned
    // current write position; starts at 0 so Tell() never reads an
    // indeterminate value before the first write
    int64_t _cur_pos = 0;
    bool _is_closed = false;
};

// a wrapper of parquet output stream
//
// The wrapper keeps a raw pointer to its output stream and only a reference to
// the caller's expression-context vector, so:
//   - `output_expr_ctxs` must outlive the wrapper;
//   - copying is disabled, because two copies would share one stream and both
//     run close() on destruction.
class ParquetWriterWrapper {
public:
    ParquetWriterWrapper(FileWriter *file_writer, const std::vector<ExprContext*>& output_expr_ctxs);
    virtual ~ParquetWriterWrapper();

    ParquetWriterWrapper(const ParquetWriterWrapper&) = delete;
    ParquetWriterWrapper& operator=(const ParquetWriterWrapper&) = delete;

    // write one row batch to the output
    Status write(const RowBatch& row_batch);

    // finish writing; also called from the destructor
    void close();

private:
    ParquetOutputStream* _outstream;
    const std::vector<ExprContext*>& _output_expr_ctxs; // not owned, referenced only
};

}

4 changes: 3 additions & 1 deletion be/src/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ set(RUNTIME_FILES
raw_value.cpp
raw_value_ir.cpp
result_sink.cpp
result_writer.cpp
result_buffer_mgr.cpp
result_writer.cpp
row_batch.cpp
runtime_state.cpp
string_value.cpp
Expand Down Expand Up @@ -105,6 +105,8 @@ set(RUNTIME_FILES
result_queue_mgr.cpp
memory_scratch_sink.cpp
external_scan_context_mgr.cpp
file_result_writer.cpp
mysql_result_writer.cpp
memory/system_allocator.cpp
memory/chunk_allocator.cpp
)
Expand Down
9 changes: 9 additions & 0 deletions be/src/runtime/buffer_control_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ class BufferControlBlock {
void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
_query_statistics = statistics;
}

// Records `num_rows` as the returned-row count on the query statistics,
// if a statistics object has been set.
void update_num_written_rows(int64_t num_rows) {
    // _query_statistics may be null when the result sink init failed or after
    // some other failure. The written-row count only matters when everything
    // went well, so there is nothing to record in that case.
    if (!_query_statistics) {
        return;
    }
    _query_statistics->set_returned_rows(num_rows);
}
private:
typedef std::list<TFetchDataResult*> ResultQueue;

Expand Down
Loading