diff --git a/be/src/http/CMakeLists.txt b/be/src/http/CMakeLists.txt index 0ae56e86e9e3fb..da148ed0920890 100644 --- a/be/src/http/CMakeLists.txt +++ b/be/src/http/CMakeLists.txt @@ -48,6 +48,7 @@ add_library(Webserver STATIC action/metrics_action.cpp action/stream_load.cpp action/stream_load_2pc.cpp + action/stream_tvf_action.cpp action/meta_action.cpp action/compaction_action.cpp action/config_action.cpp diff --git a/be/src/http/action/stream_tvf_action.cpp b/be/src/http/action/stream_tvf_action.cpp new file mode 100644 index 00000000000000..7a65cd47e548c0 --- /dev/null +++ b/be/src/http/action/stream_tvf_action.cpp @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "stream_tvf_action.h" +namespace doris{ + StreamTvfAction::StreamTvfAction(ExecEnv *exec_env) :_exec_env(exec_env){} + + void StreamTvfAction::handle(HttpRequest *req) { + + } + + int StreamTvfAction::on_header(HttpRequest *req) { + return HttpHandler::on_header(req); + } + + void StreamTvfAction::on_chunk_data(HttpRequest *req) { + HttpHandler::on_chunk_data(req); + } +} // namespace doris + diff --git a/be/src/http/action/stream_tvf_action.h b/be/src/http/action/stream_tvf_action.h new file mode 100644 index 00000000000000..2c13b9d094a846 --- /dev/null +++ b/be/src/http/action/stream_tvf_action.h @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "http/http_handler.h" + +class ExecEnv; + +namespace doris { + class StreamTvfAction : public HttpHandler { + public: + StreamTvfAction(ExecEnv *exec_env); + ~StreamTvfAction() override = default; + + void handle(HttpRequest* req) override; + + bool request_will_be_read_progressively() override { return true; } + + int on_header(HttpRequest* req) override; + + void on_chunk_data(HttpRequest* req) override; + private: + ExecEnv *_exec_env; + }; +} // namespace doris + + diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 65bc429d1209ae..d3cdb5d0140668 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -153,6 +153,12 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS return Status::OK(); } +Status FileFactory::create_stream_tvf_pipe_reader(io::FileReaderSPtr * file_reader){ + *file_reader = std::make_shared(io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, + 0/* total_length */); + return Status::OK(); +} + Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, std::shared_ptr* hdfs_file_system, io::FileReaderSPtr* reader, diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index 2e034b1ab8ae92..536734e846b8ee 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -78,6 +78,8 @@ class FileFactory { // Create FileReader for stream load pipe static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader); + static Status create_stream_tvf_pipe_reader(io::FileReaderSPtr *file_reader); + static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path, std::shared_ptr* hdfs_file_system, io::FileReaderSPtr* reader, @@ -111,6 +113,7 @@ class FileFactory { } __builtin_unreachable(); } + }; } // namespace doris diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 65904c83be9488..ac0b9e6e53e7e0 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -41,6 +41,7 @@ #include "http/action/snapshot_action.h" #include "http/action/stream_load.h" #include "http/action/stream_load_2pc.h" +#include "http/action/stream_tvf_action.h" #include "http/action/tablet_migration_action.h" #include "http/action/tablets_distribution_action.h" #include "http/action/tablets_info_action.h" @@ -78,6 +79,11 @@ Status HttpService::start() { _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load_2pc", streamload_2pc_action); + // register stream tvf + StreamTvfAction* stream_tvf_action = _pool.add(new StreamTvfAction(_env)); + _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_tvf", + stream_tvf_action); + // register download action std::vector allow_paths; for (auto& path : _env->store_paths()) { diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 7cb36e94c72ca2..078b5e0700e6c9 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -177,8 +177,8 @@ Status CsvReader::init_reader(bool is_load) { io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, _io_ctx, io::PrefetchRange(_range.start_offset, _range.size))); } - if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && - _params.file_type != TFileType::FILE_BROKER) { + if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM + &&_params.file_type != TFileType::FILE_BROKER) { return Status::EndOfFile("init reader failed, empty csv file: " + _range.path); } @@ -635,11 +635,9 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { return Status::InvalidArgument( "start offset of TFileRangeDesc must be zero in get parsered schema"); } - if (_params.file_type == TFileType::FILE_STREAM || - _params.file_type == TFileType::FILE_BROKER) { + if (_params.file_type == TFileType::FILE_BROKER) { return Status::InternalError( - "Getting parsered schema from csv file do not support stream load and broker " - "load."); + "Getting parsered schema from csv file do not support broker load"); } // csv file without names line and types line. @@ -657,14 +655,17 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { } } - _file_description.start_offset = start_offset; - io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); - reader_options.modification_time = - _range.__isset.modification_time ? _range.modification_time : 0; - RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, _file_description, - &_file_system, &_file_reader, reader_options)); - if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && - _params.file_type != TFileType::FILE_BROKER) { + if (_system_properties.system_type== TFileType::FILE_STREAM) { + RETURN_IF_ERROR(FileFactory::create_stream_tvf_pipe_reader(&_file_reader)); + } else { + _file_description.start_offset = start_offset; + io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); + reader_options.modification_time = + _range.__isset.modification_time ? _range.modification_time : 0; + RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, _file_description, + &_file_system, &_file_reader, reader_options)); + } + if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&_params.file_type != TFileType::FILE_BROKER) { return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/StreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/StreamTableValuedFunction.java new file mode 100644 index 00000000000000..1c7cd44857d109 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/StreamTableValuedFunction.java @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; + +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.InputStreamEntity; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; + +/** + * The Implement of table valued function + * stream(xxx). + */ +public class StreamTableValuedFunction extends ExternalFileTableValuedFunction { + public static final String NAME = "stream"; + + private final HttpPut requset; + + public StreamTableValuedFunction(Map params) { + requset = getHttpRequest(params); + fileStatuses.add(new TBrokerFileStatus("", false, 1000, false)); + } + + private HttpPut getHttpRequest(Map params) { + // for test: + HttpPut httpPut = new HttpPut(); + httpPut.addHeader("Host", "127.0.0.1:8040"); + httpPut.addHeader("column_separator", ","); + httpPut.addHeader("Expect", "100-continue"); + + String file = "/Users/lian/Work/doris/test.csv"; + InputStreamEntity entity = null; + try { + entity = new InputStreamEntity(Files.newInputStream(Paths.get(file))); + } catch (IOException e) { + throw new RuntimeException(e); + } + + httpPut.setEntity(entity); + return httpPut; + } + + @Override + public TFileType getTFileType() { + return TFileType.FILE_STREAM; + } + + @Override + public String getFilePath() { + return requset.toString(); + } + + @Override + public BrokerDesc getBrokerDesc() { + return new BrokerDesc("StreamTvfBroker", StorageType.STREAM, locationProperties); + } + + @Override + public String getTableName() { + return "StreamTableValuedFunction"; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index d6ade9b5e98711..aba87e9ccf7b06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -55,6 +55,8 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map