Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ int StreamLoadAction::on_header(HttpRequest* req) {
}

LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db
<< ", tbl=" << ctx->table << ", group_commit=" << ctx->group_commit;
<< ", tbl=" << ctx->table << ", group_commit=" << ctx->group_commit
<< ", HTTP headers=" << req->get_all_headers();
ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos();

if (st.ok()) {
Expand Down
397 changes: 397 additions & 0 deletions be/src/http/action/stream_load_forward_handler.cpp

Large diffs are not rendered by default.

136 changes: 136 additions & 0 deletions be/src/http/action/stream_load_forward_handler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <event2/http.h>

#include <deque>
#include <memory>
#include <mutex>
#include <string>

#include "common/status.h"
#include "http/http_handler.h"
#include "http/http_request.h"
#include "util/byte_buffer.h"

namespace doris {

// Per-request state for one forwarded stream load: the outgoing libevent
// connection and request, the forward target, the buffered request chunks and
// the response text collected so far.
//
// forward_req and connection_closed are private and only touched while holding
// _mutex. conn is a public field, but the destructor and
// handle_connection_close() read/clear it under the same mutex.
class StreamLoadForwardContext {
public:
    StreamLoadForwardContext() = default;

    // Releases the outgoing connection unless handle_connection_close() has
    // already recorded that libevent released it.
    ~StreamLoadForwardContext() {
        struct evhttp_connection* owned_conn = nullptr;
        {
            std::scoped_lock guard(_mutex);
            // The forward request is released by libevent automatically; only
            // the pointer is dropped here.
            forward_req = nullptr;
            const bool still_owned = (conn != nullptr) && !connection_closed;
            if (still_owned) {
                // Take the pointer out so the actual free can happen unlocked.
                owned_conn = conn;
                conn = nullptr;
            }
        }
        // The free runs without the mutex held to avoid a deadlock with the
        // connection close callback.
        if (owned_conn != nullptr) {
            evhttp_connection_free(owned_conn);
        }
    }

    // Stores the libevent request used for forwarding (not owned by this object).
    void set_forward_request(evhttp_request* req) {
        std::scoped_lock guard(_mutex);
        forward_req = req;
    }

    // Returns the stored forward request; nullptr if none is set.
    evhttp_request* get_forward_request() {
        std::scoped_lock guard(_mutex);
        return forward_req;
    }

    // Records that the connection was closed. The pointer is cleared because the
    // connection is treated as already released by libevent, which keeps the
    // destructor from freeing it a second time.
    void handle_connection_close() {
        std::scoped_lock guard(_mutex);
        conn = nullptr;
        connection_closed = true;
    }

    // True once handle_connection_close() has been called.
    bool is_connection_closed() const {
        std::scoped_lock guard(_mutex);
        return connection_closed;
    }

    // Outgoing connection; freed by the destructor unless it was reported closed.
    struct evhttp_connection* conn {nullptr};
    // Original request reference, lifecycle managed by HTTP framework
    HttpRequest* original_req {nullptr};

    // Address of the node the request is forwarded to.
    std::string target_host;
    int target_port {0};

    // Buffer for collecting response data
    std::string response_data;

    // Request body chunks held by this context, plus a size counter for them.
    std::deque<ByteBufferPtr> request_data_chunks;
    size_t total_request_size {0};

private:
    // Guards forward_req, connection_closed and the conn accesses made inside this class.
    mutable std::mutex _mutex;
    struct evhttp_request* forward_req {nullptr};
    bool connection_closed {false};
};

// Stream Load request forward handler
// Forwards Stream Load requests to other BE nodes
// Supports streaming forward, maintains original request path format: /api/{db}/{table}/_stream_load_forward
//
// NOTE(review): only the declarations are visible in this header; the per-member
// notes below are inferred from names and signatures and should be confirmed
// against stream_load_forward_handler.cpp.
class StreamLoadForwardHandler : public HttpHandler {
public:
StreamLoadForwardHandler() = default;
~StreamLoadForwardHandler() override = default;

// HttpHandler entry point for the request.
void handle(HttpRequest* req) override;

// Always true for this handler. Presumably this makes the HTTP framework deliver
// the body incrementally through on_chunk_data() - confirm in HttpHandler.
bool request_will_be_read_progressively() override { return true; }

// HttpHandler hook for the request headers; the meaning of the int result is
// defined by HttpHandler (not visible here).
int on_header(HttpRequest* req) override;

// HttpHandler hook for request body data.
void on_chunk_data(HttpRequest* req) override;

private:
// Presumably creates the outgoing connection/request towards
// target_host:target_port and records it in ctx - TODO confirm.
Status init_forward_request(HttpRequest* req, const std::string& target_host, int target_port,
std::shared_ptr<StreamLoadForwardContext>& ctx);

// Static callbacks taking an opaque `void* arg`; the signatures have the shape of
// libevent request / chunk / connection-close callbacks. What `arg` points to is
// decided at registration time in the .cpp - confirm there.
static void forward_request_done(struct evhttp_request* req, void* arg);
static void forward_request_chunked_cb(struct evhttp_request* req, void* arg);
static void forward_connection_close_cb(struct evhttp_connection* conn, void* arg);

// Response helper functions
static void send_complete_response(struct evhttp_request* req, StreamLoadForwardContext* ctx,
int response_code);
static void copy_response_headers(struct evkeyvalq* input_headers,
struct evkeyvalq* output_headers);

// Parses `forward_to` into the `host` and `port` out-parameters; the accepted
// format (presumably "host:port") is defined in the .cpp - confirm.
Status parse_forward_target(const std::string& forward_to, std::string& host, int& port);

// Builds the URL used for the forwarded request from the original request.
std::string build_forward_url(HttpRequest* req);

// Fills the headers of `forward_req` based on the original request and the target
// address; which headers are copied or rewritten is defined in the .cpp.
void setup_forward_headers(HttpRequest* req, struct evhttp_request* forward_req,
const std::string& target_host, int target_port);
};

} // namespace doris
8 changes: 8 additions & 0 deletions be/src/http/http_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ const std::string& HttpRequest::param(const std::string& key) const {
return iter->second;
}

// Renders every request header as "name:value, " and concatenates the entries into
// one string (each entry, including the last, is followed by ", ").
//
// The result is meant for diagnostics such as log lines, so the values of the
// credential-carrying headers Authorization and Proxy-Authorization are replaced
// with "***" instead of being emitted verbatim.
std::string HttpRequest::get_all_headers() const {
    // ASCII case-insensitive equality. HTTP header names are case-insensitive, so the
    // match must not depend on the casing the client used.
    const auto same_name = [](const std::string& name, const char* expected) {
        std::string::size_type i = 0;
        for (; i < name.size() && expected[i] != '\0'; ++i) {
            char lhs = name[i];
            char rhs = expected[i];
            if (lhs >= 'A' && lhs <= 'Z') {
                lhs = static_cast<char>(lhs - 'A' + 'a');
            }
            if (rhs >= 'A' && rhs <= 'Z') {
                rhs = static_cast<char>(rhs - 'A' + 'a');
            }
            if (lhs != rhs) {
                return false;
            }
        }
        return i == name.size() && expected[i] == '\0';
    };

    std::stringstream headers;
    for (const auto& header : _headers) {
        const bool carries_credentials = same_name(header.first, "authorization") ||
                                         same_name(header.first, "proxy-authorization");
        headers << header.first << ":";
        if (carries_credentials) {
            // Never write credentials into the returned (and typically logged) string.
            headers << "***";
        } else {
            headers << header.second;
        }
        // Stream the separator directly instead of building a temporary string.
        headers << ", ";
    }
    return headers.str();
}

void HttpRequest::add_output_header(const char* key, const char* value) {
evhttp_add_header(evhttp_request_get_output_headers(_ev_req), key, value);
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/http/http_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class HttpRequest {
// return request headers
const StringCaseUnorderedMap<std::string>& headers() { return _headers; }

// return all request headers joined into a single string of "name:value, " entries
// (used for log output)
std::string get_all_headers() const;

// return params
std::map<std::string, std::string>* params() { return &_params; }

Expand Down
6 changes: 6 additions & 0 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
#include "http/action/snapshot_action.h"
#include "http/action/stream_load.h"
#include "http/action/stream_load_2pc.h"
#include "http/action/stream_load_forward_handler.h"
#include "http/action/tablet_migration_action.h"
#include "http/action/tablets_distribution_action.h"
#include "http/action/tablets_info_action.h"
Expand Down Expand Up @@ -133,6 +134,11 @@ Status HttpService::start() {
_ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load_2pc",
streamload_2pc_action);

// register stream load forward handler
auto* forward_handler = _pool.add(new StreamLoadForwardHandler());
_ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load_forward",
forward_handler);

// register http_stream
HttpStreamAction* http_stream_action = _pool.add(new HttpStreamAction(_env));
_ev_http_server->register_handler(HttpMethod::PUT, "/api/_http_stream", http_stream_action);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3253,6 +3253,14 @@ public static int metaServiceRpcRetryTimes() {
+ "public-private/public/private/direct/random-be and empty string" })
public static String streamload_redirect_policy = "";

// Switch for BE-level forwarding of group commit stream load requests in cloud mode
// (details are in the description strings below). Disabled by default.
@ConfField(mutable = true, description = {
"存算分离模式下是否启用group commit的streamload BE转发功能。"
+ "解决LB随机转发导致group commit攒批失效的问题，通过BE二次转发确保同表请求到达同一BE节点。",
"Whether to enable group commit streamload BE forward feature in cloud mode. "
+ "Solves the issue where LB random forwarding breaks group commit batching "
+ "by implementing BE-level forwarding to ensure same-table requests reach the same BE node." })
public static boolean enable_group_commit_streamload_be_forward = false;

@ConfField(description = {"存算分离模式下建表是否检查残留recycler key, 默认true",
"create table in cloud mode, check recycler key remained, default true"})
public static boolean check_create_table_recycle_key_remained = true;
Expand Down
Loading
Loading