Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[load-refactor](tvf pipe) step-1 Support stream tvf #20032

Closed
wants to merge 11 commits
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/http/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ add_library(Webserver STATIC
action/metrics_action.cpp
action/stream_load.cpp
action/stream_load_2pc.cpp
action/stream_tvf_action.cpp
action/meta_action.cpp
action/compaction_action.cpp
action/config_action.cpp
Expand Down
34 changes: 34 additions & 0 deletions be/src/http/action/stream_tvf_action.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "stream_tvf_action.h"

namespace doris {

// HTTP action backing the stream TVF endpoint
// (PUT /api/{db}/{table}/_stream_tvf). The body is read progressively
// (see request_will_be_read_progressively() in the header) so large
// payloads are not buffered in full before processing.
StreamTvfAction::StreamTvfAction(ExecEnv* exec_env) : _exec_env(exec_env) {}

// Called once the whole request has been received.
// TODO(review): stub — no response is written yet, so clients of this
// endpoint will see an empty reply until the pipe wiring is implemented.
void StreamTvfAction::handle(HttpRequest* req) {}

// Invoked after the HTTP headers are parsed; currently defers to the
// default HttpHandler behavior.
int StreamTvfAction::on_header(HttpRequest* req) {
    return HttpHandler::on_header(req);
}

// Invoked for each chunk of the request body; currently defers to the
// default HttpHandler behavior.
void StreamTvfAction::on_chunk_data(HttpRequest* req) {
    HttpHandler::on_chunk_data(req);
}

} // namespace doris

42 changes: 42 additions & 0 deletions be/src/http/action/stream_tvf_action.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "http/http_handler.h"

namespace doris {

// Forward declaration moved inside namespace doris: ExecEnv is declared in
// the doris namespace elsewhere in the project, so a global-scope
// `class ExecEnv;` would introduce a distinct ::ExecEnv type and make the
// constructor incompatible with the doris::ExecEnv* that callers pass.
class ExecEnv;

// HTTP handler for the stream table valued function endpoint
// (PUT /api/{db}/{table}/_stream_tvf). The request body is consumed chunk
// by chunk rather than buffered whole.
class StreamTvfAction : public HttpHandler {
public:
    StreamTvfAction(ExecEnv* exec_env);
    ~StreamTvfAction() override = default;

    // Finalize the request once the body has been fully received.
    void handle(HttpRequest* req) override;

    // Ask the server to deliver the body progressively via on_chunk_data().
    bool request_will_be_read_progressively() override { return true; }

    // Process request headers before any body data arrives.
    int on_header(HttpRequest* req) override;

    // Consume one chunk of the request body.
    void on_chunk_data(HttpRequest* req) override;

private:
    ExecEnv* _exec_env; // not owned
};

} // namespace doris


6 changes: 6 additions & 0 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
return Status::OK();
}

// Create a StreamLoadPipe-backed reader for the stream table valued function.
// Unlike create_pipe_reader() above, the pipe is not registered under a load
// id — the caller is responsible for connecting the producer side (the HTTP
// handler receiving the streamed data) to this pipe.
Status FileFactory::create_stream_tvf_pipe_reader(io::FileReaderSPtr* file_reader) {
    *file_reader = std::make_shared<io::StreamLoadPipe>(
            io::kMaxPipeBufferedBytes /* max_buffered_bytes */,
            64 * 1024 /* min_chunk_size */,
            // NOTE(review): 0 presumably means "total length unknown up
            // front" — confirm against StreamLoadPipe's constructor contract.
            0 /* total_length */);
    return Status::OK();
}

Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
std::shared_ptr<io::FileSystem>* hdfs_file_system,
io::FileReaderSPtr* reader,
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/file_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class FileFactory {
// Create FileReader for stream load pipe
static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader);

static Status create_stream_tvf_pipe_reader(io::FileReaderSPtr *file_reader);

static Status create_hdfs_reader(const THdfsParams& hdfs_params, const std::string& path,
std::shared_ptr<io::FileSystem>* hdfs_file_system,
io::FileReaderSPtr* reader,
Expand Down Expand Up @@ -111,6 +113,7 @@ class FileFactory {
}
__builtin_unreachable();
}

};

} // namespace doris
6 changes: 6 additions & 0 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "http/action/snapshot_action.h"
#include "http/action/stream_load.h"
#include "http/action/stream_load_2pc.h"
#include "http/action/stream_tvf_action.h"
#include "http/action/tablet_migration_action.h"
#include "http/action/tablets_distribution_action.h"
#include "http/action/tablets_info_action.h"
Expand Down Expand Up @@ -78,6 +79,11 @@ Status HttpService::start() {
_ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load_2pc",
streamload_2pc_action);

// register stream tvf
StreamTvfAction* stream_tvf_action = _pool.add(new StreamTvfAction(_env));
_ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_tvf",
stream_tvf_action);

// register download action
std::vector<std::string> allow_paths;
for (auto& path : _env->store_paths()) {
Expand Down
29 changes: 15 additions & 14 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ Status CsvReader::init_reader(bool is_load) {
io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, _io_ctx,
io::PrefetchRange(_range.start_offset, _range.size)));
}
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
_params.file_type != TFileType::FILE_BROKER) {
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM
&&_params.file_type != TFileType::FILE_BROKER) {
return Status::EndOfFile("init reader failed, empty csv file: " + _range.path);
}

Expand Down Expand Up @@ -635,11 +635,9 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
return Status::InvalidArgument(
"start offset of TFileRangeDesc must be zero in get parsered schema");
}
if (_params.file_type == TFileType::FILE_STREAM ||
_params.file_type == TFileType::FILE_BROKER) {
if (_params.file_type == TFileType::FILE_BROKER) {
return Status::InternalError(
"Getting parsered schema from csv file do not support stream load and broker "
"load.");
"Getting parsered schema from csv file do not support broker load");
}

// csv file without names line and types line.
Expand All @@ -657,14 +655,17 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
}
}

_file_description.start_offset = start_offset;
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
reader_options.modification_time =
_range.__isset.modification_time ? _range.modification_time : 0;
RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, _file_description,
&_file_system, &_file_reader, reader_options));
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
_params.file_type != TFileType::FILE_BROKER) {
if (_system_properties.system_type== TFileType::FILE_STREAM) {
RETURN_IF_ERROR(FileFactory::create_stream_tvf_pipe_reader(&_file_reader));
} else {
_file_description.start_offset = start_offset;
io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state);
reader_options.modification_time =
_range.__isset.modification_time ? _range.modification_time : 0;
RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, _file_description,
&_file_system, &_file_reader, reader_options));
}
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&_params.file_type != TFileType::FILE_BROKER) {
return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.tablefunction;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileType;

import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.InputStreamEntity;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;

/**
 * Implementation of the table valued function stream(xxx), which reads data
 * streamed to the backend over HTTP rather than from a storage path.
 */
public class StreamTableValuedFunction extends ExternalFileTableValuedFunction {
    public static final String NAME = "stream";

    private final HttpPut request;

    public StreamTableValuedFunction(Map<String, String> params) {
        request = getHttpRequest(params);
        // Placeholder file status: the real length of a stream is unknown
        // up front, so a dummy size of 1000 is reported.
        fileStatuses.add(new TBrokerFileStatus("", false, 1000, false));
    }

    /**
     * Build the HTTP PUT request that carries the streamed data.
     *
     * TODO(review): hard-coded test scaffolding — the backend host, column
     * separator and local file path must come from {@code params} (which is
     * currently ignored) before this can ship.
     */
    private HttpPut getHttpRequest(Map<String, String> params) {
        // for test:
        HttpPut httpPut = new HttpPut();
        httpPut.addHeader("Host", "127.0.0.1:8040");
        httpPut.addHeader("column_separator", ",");
        httpPut.addHeader("Expect", "100-continue");

        String file = "/Users/lian/Work/doris/test.csv";
        InputStreamEntity entity = null;
        try {
            entity = new InputStreamEntity(Files.newInputStream(Paths.get(file)));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        httpPut.setEntity(entity);
        return httpPut;
    }

    @Override
    public TFileType getTFileType() {
        return TFileType.FILE_STREAM;
    }

    @Override
    public String getFilePath() {
        // NOTE(review): returns the request's string form, not a real path —
        // callers treating this as a filesystem path should be audited.
        return request.toString();
    }

    @Override
    public BrokerDesc getBrokerDesc() {
        return new BrokerDesc("StreamTvfBroker", StorageType.STREAM, locationProperties);
    }

    @Override
    public String getTableName() {
        return "StreamTableValuedFunction";
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map<String
return new BackendsTableValuedFunction(params);
case ResourceGroupsTableValuedFunction.NAME:
return new ResourceGroupsTableValuedFunction(params);
case StreamTableValuedFunction.NAME:
return new StreamTableValuedFunction(params);
default:
throw new AnalysisException("Could not find table function " + funcName);
}
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ message PTabletWriteSlaveDoneResult {

// Request to fetch the parsed schema of the file(s) described by a scan range.
message PFetchTableSchemaRequest {
    optional bytes file_scan_range = 1;
    // NOTE(review): presumably identifies the stream TVF pipe on the BE whose
    // buffered data supplies the schema (set only for stream()) — confirm
    // against the caller once the FE side is wired up.
    optional int64 stream_tvf_id = 2;
};

message PFetchTableSchemaResult {
Expand Down