diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index cde4876388d16d..56412863b38d14 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -215,7 +215,8 @@ int StreamLoadAction::on_header(HttpRequest* req) { } LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db - << ", tbl=" << ctx->table << ", group_commit=" << ctx->group_commit; + << ", tbl=" << ctx->table << ", group_commit=" << ctx->group_commit + << ", HTTP headers=" << req->get_all_headers(); ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos(); if (st.ok()) { diff --git a/be/src/http/action/stream_load_forward_handler.cpp b/be/src/http/action/stream_load_forward_handler.cpp new file mode 100644 index 00000000000000..3f1719312fb686 --- /dev/null +++ b/be/src/http/action/stream_load_forward_handler.cpp @@ -0,0 +1,397 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "http/action/stream_load_forward_handler.h" + +#include +#include +#include +#include + +#include "common/config.h" +#include "common/logging.h" +#include "http/http_channel.h" +#include "http/http_headers.h" +#include "util/byte_buffer.h" + +namespace doris { + +int StreamLoadForwardHandler::on_header(HttpRequest* req) { + std::ostringstream headers_info; + const auto& headers = req->headers(); + for (const auto& header : headers) { + headers_info << header.first << ":" << header.second << " "; + } + + std::ostringstream params_info; + const auto* params = req->params(); + for (const auto& param : *params) { + params_info << param.first << "=" << param.second << " "; + } + + LOG(INFO) << "StreamLoadForward request started - " + << "path: " << req->raw_path() << ", remote: " << req->remote_host() << ", headers: [" + << headers_info.str() << "]" + << ", params: [" << params_info.str() << "]"; + + std::shared_ptr ctx(new StreamLoadForwardContext()); + req->set_handler_ctx(ctx); + + auto it = params->find("forward_to"); + if (it == params->end()) { + LOG(WARNING) << "StreamLoadForward failed - missing forward_to parameter, path: " + << req->raw_path(); + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, + "Missing required parameter 'forward_to'. " + "Usage: ?forward_to=host:port"); + return HttpStatus::BAD_REQUEST; + } + + std::string target_host; + int target_port; + Status st = parse_forward_target(it->second, target_host, target_port); + if (!st.ok()) { + LOG(WARNING) << "StreamLoadForward failed - invalid forward target: " << st.to_string() + << ", path: " << req->raw_path(); + HttpChannel::send_reply( + req, HttpStatus::BAD_REQUEST, + "Invalid forward_to parameter: " + st.to_string() + ". 
Expected format: host:port"); + return HttpStatus::BAD_REQUEST; + } + + ctx->target_host = target_host; + ctx->target_port = target_port; + ctx->original_req = req; + + Status init_st = init_forward_request(req, target_host, target_port, ctx); + if (!init_st.ok()) { + LOG(WARNING) << "StreamLoadForward failed - failed to initialize forward request: " + << init_st.to_string() << ", target: " << target_host << ":" << target_port + << ", path: " << req->raw_path(); + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, + "Failed to initialize forward request: " + init_st.to_string()); + return HttpStatus::INTERNAL_SERVER_ERROR; + } + + return HttpStatus::OK; +} + +void StreamLoadForwardHandler::handle(HttpRequest* req) { + auto ctx = std::static_pointer_cast(req->handler_ctx()); + if (!ctx) { + LOG(WARNING) << "StreamLoadForward failed - context not found, path: " << req->raw_path(); + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, + "Internal error: context not found"); + return; + } + + auto* forward_req = ctx->get_forward_request(); + if (!forward_req) { + LOG(WARNING) << "Forward request not ready, path: " << req->raw_path(); + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, + "Internal error: forward request not initialized"); + return; + } + + setup_forward_headers(req, forward_req, ctx->target_host, ctx->target_port); + + if (!ctx->request_data_chunks.empty()) { + evbuffer* output = evhttp_request_get_output_buffer(forward_req); + while (!ctx->request_data_chunks.empty()) { + const auto& bb = ctx->request_data_chunks.front(); + if (evbuffer_add(output, bb->ptr, bb->limit) != 0) { + LOG(WARNING) << "Failed to add buffered data to output buffer, chunk size: " + << bb->limit << ", total size: " << ctx->total_request_size + << ", path: " << req->raw_path(); + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, + "Failed to prepare forward data"); + return; + } + ctx->request_data_chunks.pop_front(); + } + } + 
+ if (evhttp_make_request(ctx->conn, forward_req, EVHTTP_REQ_PUT, + build_forward_url(req).c_str()) != 0) { + LOG(WARNING) << "Failed to make forward request to " << ctx->target_host << ":" + << ctx->target_port << ", path: " << req->raw_path(); + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, + "Failed to forward request to target server: " + ctx->target_host + + ":" + std::to_string(ctx->target_port)); + return; + } + + LOG(INFO) << "StreamLoadForward request sent - data size: " << ctx->total_request_size + << ", target: " << ctx->target_host << ":" << ctx->target_port + << ", path: " << req->raw_path(); +} + +void StreamLoadForwardHandler::on_chunk_data(HttpRequest* req) { + auto ctx = std::static_pointer_cast(req->handler_ctx()); + if (!ctx) { + LOG(WARNING) << "No context found for chunk data"; + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, + "Internal error: context not found"); + return; + } + + evbuffer* input = evhttp_request_get_input_buffer(req->get_evhttp_request()); + while (evbuffer_get_length(input) > 0) { + size_t remaining_length = evbuffer_get_length(input); + ByteBufferPtr bb; + Status st = ByteBuffer::allocate(remaining_length, &bb); + if (!st.ok()) { + LOG(WARNING) << "Failed to allocate ByteBuffer: " << st.to_string() + << ", path: " << req->raw_path(); + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, + "Failed to allocate memory for request data"); + return; + } + + auto remove_bytes = evbuffer_remove(input, bb->ptr, bb->capacity); + bb->pos = remove_bytes; + bb->flip(); + + ctx->request_data_chunks.emplace_back(bb); + ctx->total_request_size += remove_bytes; + } +} + +Status StreamLoadForwardHandler::init_forward_request( + HttpRequest* req, const std::string& target_host, int target_port, + std::shared_ptr& ctx) { + ctx->original_req = req; + + struct event_base* ev_base = + evhttp_connection_get_base(evhttp_request_get_connection(req->get_evhttp_request())); + + struct evhttp_connection* 
conn = evhttp_connection_base_new(ev_base, + nullptr, // dns base + target_host.c_str(), target_port); + + if (!conn) { + return Status::InternalError("Failed to create connection to target server"); + } + + evhttp_connection_set_closecb(conn, forward_connection_close_cb, ctx.get()); + ctx->conn = conn; + + struct evhttp_request* forward_req = evhttp_request_new(forward_request_done, ctx.get()); + if (!forward_req) { + evhttp_connection_free(conn); + return Status::InternalError("Failed to create forward request"); + } + + evhttp_request_set_chunked_cb(forward_req, forward_request_chunked_cb); + + ctx->set_forward_request(forward_req); + return Status::OK(); +} + +void StreamLoadForwardHandler::forward_request_done(struct evhttp_request* req, void* arg) { + auto* ctx = static_cast(arg); + + if (!req) { + LOG(ERROR) << "Forward request failed - no response"; + evhttp_send_error(ctx->original_req->get_evhttp_request(), 503, + "Backend server unavailable"); + return; + } + + int response_code = evhttp_request_get_response_code(req); + const char* response_reason = evhttp_request_get_response_code_line(req); + + LOG(INFO) << "StreamLoadForward completed - " + << "status: " << response_code + << ", reason: " << (response_reason ? 
response_reason : "Unknown") + << ", response_size: " << ctx->response_data.size() << " bytes" + << ", path: " << ctx->original_req->raw_path(); + + send_complete_response(req, ctx, response_code); +} + +void StreamLoadForwardHandler::forward_request_chunked_cb(struct evhttp_request* req, void* arg) { + auto* ctx = static_cast(arg); + struct evbuffer* input_buffer = evhttp_request_get_input_buffer(req); + if (input_buffer) { + size_t data_len = evbuffer_get_length(input_buffer); + if (data_len > 0) { + // Read all available data and append to our response buffer + char* data = (char*)evbuffer_pullup(input_buffer, data_len); + if (data) { + ctx->response_data.append(data, data_len); + + // Remove the data from the buffer since we've copied it + evbuffer_drain(input_buffer, data_len); + } else { + LOG(WARNING) << "Failed to pullup " << data_len << " bytes from input buffer"; + } + } + } +} + +void StreamLoadForwardHandler::send_complete_response(struct evhttp_request* req, + StreamLoadForwardContext* ctx, + int response_code) { + struct evbuffer* response_body = evbuffer_new(); + if (!response_body) { + LOG(ERROR) << "Failed to create response buffer"; + HttpChannel::send_reply(ctx->original_req, HttpStatus::INTERNAL_SERVER_ERROR, + "Internal error: failed to create response buffer"); + return; + } + + size_t body_len = ctx->response_data.size(); + if (body_len > 0) { + evbuffer_add(response_body, ctx->response_data.c_str(), body_len); + } + + struct evkeyvalq* input_headers = evhttp_request_get_input_headers(req); + struct evkeyvalq* output_headers = + evhttp_request_get_output_headers(ctx->original_req->get_evhttp_request()); + + size_t final_body_len = evbuffer_get_length(response_body); + evhttp_add_header(output_headers, "Content-Length", std::to_string(final_body_len).c_str()); + + copy_response_headers(input_headers, output_headers); + + evhttp_send_reply(ctx->original_req->get_evhttp_request(), response_code, + evhttp_request_get_response_code_line(req), 
response_body); + + evbuffer_free(response_body); +} + +void StreamLoadForwardHandler::copy_response_headers(struct evkeyvalq* input_headers, + struct evkeyvalq* output_headers) { + // Copy headers from upstream, excluding specific ones we manage ourselves + for (struct evkeyval* header = input_headers->tqh_first; header != nullptr; + header = header->next.tqe_next) { + if (strcasecmp(header->key, "Transfer-Encoding") == 0 || + strcasecmp(header->key, "Content-Length") == 0 || + strcasecmp(header->key, "Date") == 0 || strcasecmp(header->key, "Server") == 0 || + strcasecmp(header->key, "Content-Type") == 0) { + continue; + } + const char* value = header->value ? header->value : ""; + evhttp_add_header(output_headers, header->key, value); + } +} + +void StreamLoadForwardHandler::forward_connection_close_cb(struct evhttp_connection* conn, + void* arg) { + auto* ctx = static_cast(arg); + if (!ctx) { + LOG(WARNING) << "Context is null in connection close callback"; + return; + } + + ctx->handle_connection_close(); +} + +Status StreamLoadForwardHandler::parse_forward_target(const std::string& forward_to, + std::string& host, int& port) { + size_t pos = forward_to.find(':'); + if (pos == std::string::npos) { + return Status::InvalidArgument("Invalid forward_to format, should be host:port, got: {}", + forward_to); + } + + host = forward_to.substr(0, pos); + std::string port_str = forward_to.substr(pos + 1); + + try { + port = std::stoi(port_str); + } catch (const std::exception& e) { + LOG(WARNING) << "Exception while parsing port: " << port_str << ", what(): " << e.what(); + return Status::InvalidArgument("Invalid port number in forward_to: {}, exception: {}", + port_str, e.what()); + } + + if (port <= 0 || port > 65535) { + return Status::InvalidArgument("Port number must be between 1 and 65535, got: {}", port); + } + + return Status::OK(); +} + +std::string StreamLoadForwardHandler::build_forward_url(HttpRequest* req) { + std::string url; + const std::string& raw_path = 
req->raw_path(); + + // Parse /api/{db}/{table}/ part + size_t pos = raw_path.find("/_stream_load_forward"); + if (pos != std::string::npos) { + // Keep path prefix, replace _stream_load_forward with _stream_load + url = raw_path.substr(0, pos) + "/_stream_load"; + } else { + // If not found, use original path + url = raw_path; + } + + // Remove forward_to parameter, keep other parameters + const auto& params = req->params(); + std::vector query_parts; + for (const auto& param : *params) { + if (param.first != "forward_to") { + query_parts.push_back(param.first + "=" + param.second); + } + } + + if (!query_parts.empty()) { + std::ostringstream oss; + for (size_t i = 0; i < query_parts.size(); ++i) { + if (i != 0) { + oss << "&"; + } + oss << query_parts[i]; + } + url += "?" + oss.str(); + } + + return url; +} + +void StreamLoadForwardHandler::setup_forward_headers(HttpRequest* req, + struct evhttp_request* forward_req, + const std::string& target_host, + int target_port) { + struct evkeyvalq* input_headers = evhttp_request_get_input_headers(req->get_evhttp_request()); + struct evkeyvalq* output_headers = evhttp_request_get_output_headers(forward_req); + + // Copy all headers from original request, except Host, Transfer-Encoding, and Content-Length + for (struct evkeyval* header = input_headers->tqh_first; header != nullptr; + header = header->next.tqe_next) { + // Skip headers that conflict with libevent's automatic handling + if (strcasecmp(header->key, "Host") == 0 || + strcasecmp(header->key, "Transfer-Encoding") == 0 || + strcasecmp(header->key, "Content-Length") == 0) { + continue; + } + evhttp_add_header(output_headers, header->key, header->value); + } + + // Set new Host header + evhttp_add_header(output_headers, "Host", + fmt::format("{}:{}", target_host, target_port).c_str()); + // Add forwarding related headers + evhttp_add_header(output_headers, "X-Forwarded-For", req->remote_host()); + evhttp_add_header(output_headers, "X-Forwarded-Proto", "http"); + 
evhttp_add_header(output_headers, "X-Forwarded-Host", + evhttp_request_get_host(req->get_evhttp_request())); +} + +} // namespace doris diff --git a/be/src/http/action/stream_load_forward_handler.h b/be/src/http/action/stream_load_forward_handler.h new file mode 100644 index 00000000000000..c5c7c674ddcdaa --- /dev/null +++ b/be/src/http/action/stream_load_forward_handler.h @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include + +#include +#include +#include +#include + +#include "common/status.h" +#include "http/http_handler.h" +#include "http/http_request.h" +#include "util/byte_buffer.h" + +namespace doris { + +// Context for storing stream load forward request information +class StreamLoadForwardContext { +public: + StreamLoadForwardContext() = default; + ~StreamLoadForwardContext() { + struct evhttp_connection* conn_to_free = nullptr; + { + std::lock_guard lock(_mutex); + // forward request will be released by libevent automatically + forward_req = nullptr; + if (conn && !connection_closed) { + // Only free connection if it wasn't already released by libevent + conn_to_free = conn; + conn = nullptr; + } + } + // Free connection outside of mutex to avoid deadlock with connection close callback + if (conn_to_free) { + evhttp_connection_free(conn_to_free); + } + } + + void set_forward_request(evhttp_request* req) { + std::lock_guard lock(_mutex); + forward_req = req; + } + + evhttp_request* get_forward_request() { + std::lock_guard lock(_mutex); + return forward_req; + } + + void handle_connection_close() { + std::lock_guard lock(_mutex); + connection_closed = true; + // Connection has been released by libevent automatically, set pointer to null + // to prevent double-free in destructor + conn = nullptr; + } + + bool is_connection_closed() const { + std::lock_guard lock(_mutex); + return connection_closed; + } + + struct evhttp_connection* conn {nullptr}; + // Original request reference, lifecycle managed by HTTP framework + HttpRequest* original_req {nullptr}; + + std::string target_host; + int target_port {0}; + + // Buffer for collecting response data + std::string response_data; + + std::deque request_data_chunks; + size_t total_request_size = 0; + +private: + mutable std::mutex _mutex; + struct evhttp_request* forward_req {nullptr}; + bool connection_closed {false}; +}; + +// Stream Load request forward handler +// Forwards Stream Load requests to other 
BE nodes +// Supports streaming forward, maintains original request path format: /api/{db}/{table}/_stream_load_forward +class StreamLoadForwardHandler : public HttpHandler { +public: + StreamLoadForwardHandler() = default; + ~StreamLoadForwardHandler() override = default; + + void handle(HttpRequest* req) override; + + bool request_will_be_read_progressively() override { return true; } + + int on_header(HttpRequest* req) override; + + void on_chunk_data(HttpRequest* req) override; + +private: + Status init_forward_request(HttpRequest* req, const std::string& target_host, int target_port, + std::shared_ptr& ctx); + + static void forward_request_done(struct evhttp_request* req, void* arg); + static void forward_request_chunked_cb(struct evhttp_request* req, void* arg); + static void forward_connection_close_cb(struct evhttp_connection* conn, void* arg); + + // Response helper functions + static void send_complete_response(struct evhttp_request* req, StreamLoadForwardContext* ctx, + int response_code); + static void copy_response_headers(struct evkeyvalq* input_headers, + struct evkeyvalq* output_headers); + + Status parse_forward_target(const std::string& forward_to, std::string& host, int& port); + + std::string build_forward_url(HttpRequest* req); + + void setup_forward_headers(HttpRequest* req, struct evhttp_request* forward_req, + const std::string& target_host, int target_port); +}; + +} // namespace doris diff --git a/be/src/http/http_request.cpp b/be/src/http/http_request.cpp index 14bde591b4ca1b..f743297801695b 100644 --- a/be/src/http/http_request.cpp +++ b/be/src/http/http_request.cpp @@ -110,6 +110,14 @@ const std::string& HttpRequest::param(const std::string& key) const { return iter->second; } +std::string HttpRequest::get_all_headers() const { + std::stringstream headers; + for (const auto& header : _headers) { + headers << header.first << ":" << header.second + ", "; + } + return headers.str(); +} + void HttpRequest::add_output_header(const char* key, 
const char* value) { evhttp_add_header(evhttp_request_get_output_headers(_ev_req), key, value); } diff --git a/be/src/http/http_request.h b/be/src/http/http_request.h index a9286410aff0a2..41d8cf98baaadd 100644 --- a/be/src/http/http_request.h +++ b/be/src/http/http_request.h @@ -55,6 +55,8 @@ class HttpRequest { // return params const StringCaseUnorderedMap& headers() { return _headers; } + std::string get_all_headers() const; + // return params std::map* params() { return &_params; } diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 32ea86b09b1d92..6c604dd09d006c 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -65,6 +65,7 @@ #include "http/action/snapshot_action.h" #include "http/action/stream_load.h" #include "http/action/stream_load_2pc.h" +#include "http/action/stream_load_forward_handler.h" #include "http/action/tablet_migration_action.h" #include "http/action/tablets_distribution_action.h" #include "http/action/tablets_info_action.h" @@ -133,6 +134,11 @@ Status HttpService::start() { _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load_2pc", streamload_2pc_action); + // register stream load forward handler + auto* forward_handler = _pool.add(new StreamLoadForwardHandler()); + _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load_forward", + forward_handler); + // register http_stream HttpStreamAction* http_stream_action = _pool.add(new HttpStreamAction(_env)); _ev_http_server->register_handler(HttpMethod::PUT, "/api/_http_stream", http_stream_action); diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 12038c66c70c0e..e1058532df25f4 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -3253,6 +3253,14 @@ public static int 
metaServiceRpcRetryTimes() { + "public-private/public/private/direct/random-be and empty string" }) public static String streamload_redirect_policy = ""; + @ConfField(mutable = true, description = { + "存算分离模式下是否启用group commit的streamload BE转发功能。" + + "解决LB随机转发导致group commit攒批失效的问题,通过BE二次转发确保同表请求到达同一BE节点。", + "Whether to enable group commit streamload BE forward feature in cloud mode. " + + "Solves the issue where LB random forwarding breaks group commit batching " + + "by implementing BE-level forwarding to ensure same-table requests reach the same BE node." }) + public static boolean enable_group_commit_streamload_be_forward = false; + @ConfField(description = {"存算分离模式下建表是否检查残留recycler key, 默认true", "create table in cloud mode, check recycler key remained, default true"}) public static boolean check_create_table_recycle_key_remained = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 065c93bcc76dcd..b79b67fedcd5ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.cloud.qe.ComputeGroupException; +import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -324,7 +325,8 @@ private Object executeWithoutPassword(HttpServletRequest request, tableId = ((OlapTable) olapTable.get()).getId(); } - redirectAddr = selectRedirectBackend(request, groupCommit, tableId); + // Handle stream load with potential group commit forwarding + redirectAddr = handleStreamLoadRedirect(request, groupCommit, tableId, dbName, tableName, label); } if (LOG.isDebugEnabled()) { @@ -334,6 +336,9 @@ private Object 
executeWithoutPassword(HttpServletRequest request, RedirectView redirectView = redirectTo(request, redirectAddr); return redirectView; + } catch (StreamLoadForwardException e) { + // Special handling for stream load forwarding + return e.getRedirectView(); } catch (Exception e) { LOG.warn("load failed, stream: {}, db: {}, tbl: {}, label: {}, err: {}", isStreamLoad, db, table, label, e.getMessage()); @@ -398,6 +403,11 @@ private String getCloudClusterName(HttpServletRequest request) { private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit, long tableId) throws LoadException { + return selectRedirectBackend(request, groupCommit, tableId, null); + } + + private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolean groupCommit, long tableId, + Backend preSelectedBackend) throws LoadException { long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L); if (debugBackendId != -1L) { Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId); @@ -408,17 +418,17 @@ private TNetworkAddress selectRedirectBackend(HttpServletRequest request, boolea if (Strings.isNullOrEmpty(cloudClusterName)) { throw new LoadException("No cloud cluster name selected."); } - return selectCloudRedirectBackend(cloudClusterName, request, groupCommit, tableId); + return selectCloudRedirectBackend(cloudClusterName, request, groupCommit, tableId, preSelectedBackend); } else { if (groupCommit && tableId == -1) { throw new LoadException("Group commit table id wrong."); } - return selectLocalRedirectBackend(groupCommit, request, tableId); + return selectLocalRedirectBackend(groupCommit, request, tableId, preSelectedBackend); } } - private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServletRequest request, long tableId) - throws LoadException { + private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServletRequest request, long tableId, 
+ Backend preSelectedBackend) throws LoadException { Backend backend = null; BeSelectionPolicy policy = null; String qualifiedUser = ConnectContext.get().getQualifiedUser(); @@ -438,7 +448,9 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } if (groupCommit) { - backend = selectBackendForGroupCommit("", request, tableId); + // Use pre-selected backend if provided to avoid duplicate calls + backend = preSelectedBackend != null ? preSelectedBackend + : selectBackendForGroupCommit("", request, tableId); } else { backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); } @@ -449,11 +461,12 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ } private TNetworkAddress selectCloudRedirectBackend(String clusterName, HttpServletRequest req, boolean groupCommit, - long tableId) - throws LoadException { + long tableId, Backend preSelectedBackend) throws LoadException { Backend backend = null; if (groupCommit) { - backend = selectBackendForGroupCommit(clusterName, req, tableId); + // Use pre-selected backend if provided to avoid duplicate calls + backend = preSelectedBackend != null ? preSelectedBackend + : selectBackendForGroupCommit(clusterName, req, tableId); } else { backend = StreamLoadHandler.selectBackend(clusterName); } @@ -917,4 +930,156 @@ public Object updateIngestionLoad(HttpServletRequest request, HttpServletRespons } + /* + * Create redirect URL for stream load forward mode. + * + * This method constructs the special redirect URL used in the group commit forwarding mechanism: + * + * Key modifications to the standard redirect: + * 1. Path modification: Changes "/_stream_load" to "/_stream_load_forward" + * - This tells the receiving BE that it needs to perform additional forwarding + * - The "_stream_load_forward" endpoint is specifically designed to handle forwarding logic + * + * 2. 
Forward target parameter: Adds "forward_to=host:port" to the query string + * - Specifies the actual target BE node that should process this request + * - Ensures all requests for the same table reach the same BE for optimal batching + * + * 3. Authentication preservation: Maintains user authentication in the URL if present + * - Ensures the forwarded request has proper authentication context + * + * Example transformation: + * Original: http://endpoint:port/api/db/table/_stream_load?param=value + * Forward: http://endpoint:port/api/db/table/_stream_load_forward?param=value&forward_to=target_be:port + * + * @param request the original HTTP request + * @param addr the endpoint address to redirect to (public/private endpoint) + * @param forwardTarget the target BE node in "host:port" format for final processing + * @return RedirectView configured for stream load forwarding + */ + private RedirectView redirectToStreamLoadForward(HttpServletRequest request, TNetworkAddress addr, + String forwardTarget) { + URI urlObj = null; + URI resultUriObj = null; + String urlStr = request.getRequestURI(); + String userInfo = null; + String modifiedPath = null; + + if (!Strings.isNullOrEmpty(request.getHeader("Authorization"))) { + ActionAuthorizationInfo authInfo = getAuthorizationInfo(request); + userInfo = ClusterNamespace.getNameFromFullName(authInfo.fullUserName) + + ":" + authInfo.password; + } + try { + urlObj = new URI(urlStr); + // Replace _stream_load with _stream_load_forward in the path + modifiedPath = urlObj.getPath().replace("/_stream_load", "/_stream_load_forward"); + resultUriObj = new URI("http", userInfo, addr.getHostname(), + addr.getPort(), modifiedPath, "", null); + } catch (Exception e) { + throw new RuntimeException(e); + } + String redirectUrl = resultUriObj.toASCIIString(); + + // Add forward_to parameter (note: toASCIIString() already includes '?' 
due to empty query) + String queryString = request.getQueryString(); + if (!Strings.isNullOrEmpty(queryString)) { + redirectUrl += queryString + "&forward_to=" + forwardTarget; + } else { + redirectUrl += "forward_to=" + forwardTarget; + } + + LOG.info("Redirect stream load forward url: {}, forward_to: {}", + "http://" + addr.getHostname() + ":" + addr.getPort() + modifiedPath, forwardTarget); + RedirectView redirectView = new RedirectView(redirectUrl); + redirectView.setContentType("text/html;charset=utf-8"); + redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT); + return redirectView; + } + + /** + * Handle stream load redirect with optional group commit forwarding. + * + * Group Commit Stream Load Forward Mode in Cloud Environment: + * + * Problem: + * Group commit requires that requests for the same table be sent to the same BE node + * to achieve better batching efficiency. However, in cloud mode with Load Balancer (LB), + * the LB randomly selects a BE node for forwarding, which breaks the group commit strategy + * and reduces batching effectiveness. + * + * Solution: + * Implement a two-stage forwarding mechanism: + * 1. FE redirects to public/private endpoint (LB) as usual + * 2. BE performs a second forwarding to the actual target BE node that handles the specific table + * + * This ensures that all requests for the same table ultimately reach the same BE node, + * preserving the group commit batching strategy while still utilizing the LB infrastructure. 
+ * + * @param request the HTTP request + * @param groupCommit whether group commit is enabled + * @param tableId the table ID for group commit + * @param dbName database name for logging + * @param tableName table name for logging + * @param label label for logging + * @return redirect address for normal redirect + * @throws StreamLoadForwardException if forward redirect is applied + * @throws LoadException if redirect selection fails + */ + private TNetworkAddress handleStreamLoadRedirect(HttpServletRequest request, boolean groupCommit, + long tableId, String dbName, String tableName, String label) throws LoadException { + + // Check if group commit forwarding is needed + if (!Config.isCloudMode() || !groupCommit || !Config.enable_group_commit_streamload_be_forward) { + return selectRedirectBackend(request, groupCommit, tableId); + } + + String cloudClusterName = getCloudClusterName(request); + if (Strings.isNullOrEmpty(cloudClusterName)) { + throw new LoadException("No cloud cluster name selected for group commit forwarding."); + } + + // Get target backend for group commit + Backend targetBackend = selectBackendForGroupCommit(cloudClusterName, request, tableId); + if (targetBackend == null) { + throw new LoadException("Failed to select target backend for group commit forwarding."); + } + + // Get redirect address with optimized backend selection + TNetworkAddress redirectAddr = selectCloudRedirectBackend(cloudClusterName, request, groupCommit, tableId, + targetBackend); + TNetworkAddress targetAddr = new TNetworkAddress(targetBackend.getHost(), targetBackend.getHttpPort()); + + // Apply forwarding if addresses differ (compare hostname and port directly) + if (!redirectAddr.getHostname().equals(targetAddr.getHostname()) + || redirectAddr.getPort() != targetAddr.getPort()) { + // Apply stream load forwarding by throwing StreamLoadForwardException with RedirectView + String forwardTarget = targetAddr.getHostname() + ":" + targetAddr.getPort(); + RedirectView 
forwardRedirectView = redirectToStreamLoadForward(request, redirectAddr, forwardTarget); + + LOG.info("Using stream load forward mode for cloud group commit - " + + "db: {}, tbl: {}, label: {}, endpoint: {}, forward_to: {}, reason: redirect_differs_from_target", + dbName, tableName, label, redirectAddr.toString(), forwardTarget); + + throw new StreamLoadForwardException(forwardRedirectView); + } else { + LOG.debug("Skip stream load forward - redirect address matches target backend: {}", + redirectAddr.toString()); + return redirectAddr; + } + } + + /** + * Special exception to carry RedirectView for stream load forwarding. + */ + private static class StreamLoadForwardException extends RuntimeException { + private final RedirectView redirectView; + + public StreamLoadForwardException(RedirectView redirectView) { + this.redirectView = redirectView; + } + + public RedirectView getRedirectView() { + return redirectView; + } + } } diff --git a/regression-test/data/load_p0/stream_load/test_group_commit_redirect.out b/regression-test/data/load_p0/stream_load/test_group_commit_redirect.out new file mode 100644 index 00000000000000..abb03a16633fb5 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_group_commit_redirect.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +5 e -1 +5 e -1 +5 e -1 +6 f -1 +6 f -1 +6 f -1 + diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_redirect.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_redirect.groovy new file mode 100644 index 00000000000000..c7b7b6c5a94c53 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_redirect.groovy @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// Regression test for group commit stream load forwarding in cloud mode:
+//   1. a BE accepts /_stream_load_forward?forward_to=host:port and the load succeeds;
+//   2. the FE redirect Location uses the forward API for group commit loads and the
+//      plain stream load API when group commit is off.
+suite('test_group_commit_redirect', 'docker') {
+    def databaseName = context.config.getDbNameByFile(context.file)
+    def tableName = "tbl"
+
+    // Runs a command line and returns [exitCode, stdout, stderr].
+    // stdout and stderr are drained while the process runs, so a chatty
+    // `curl -v` cannot stall on a full pipe before it exits.
+    def runCommand = { command ->
+        def stdout = new StringBuilder()
+        def stderr = new StringBuilder()
+        def process = command.execute()
+        process.waitForProcessOutput(stdout, stderr)
+        return [process.exitValue(), stdout.toString(), stderr.toString()]
+    }
+
+    // Sends a stream load to the FE without following redirects and returns the
+    // value of the Location response header ("" when none was found).
+    def getRedirectLocation = { feIp, fePort, redirectPolicy, mode ->
+        def command = """ curl -v --max-redirs 0 --location-trusted -u root:
+                -H redirect-policy:$redirectPolicy
+                -H group_commit:$mode
+                -T ${context.config.dataPath}/load_p0/stream_load/test_stream_load1.csv
+                http://${feIp}:${fePort}/api/${databaseName}/${tableName}/_stream_load """
+        log.info("redirect command: ${command}")
+
+        def location = ""
+        try {
+            // The exit code is only logged: it is not checked because the
+            // redirect is intentionally not followed (--max-redirs 0).
+            def (code, out, err) = runCommand(command)
+            // curl -v prints response headers on stderr, prefixed with "< ".
+            def locationLine = err.readLines().find { it.trim().startsWith('< Location: ') }
+            if (locationLine) {
+                location = locationLine.trim().substring('< Location: '.length())
+            }
+            log.info("curl exit code: ${code}")
+            log.info("curl output: ${out}")
+            log.info("curl error: ${err}")
+        } catch (Exception e) {
+            // Leave location empty; the caller's assertions report the failure.
+            log.info("exception: ${e}".toString())
+        }
+        return location
+    }
+
+    // Sends a group commit stream load to beIp:bePort through the forward API,
+    // asking it to relay to targetBe:targetBePort, and checks the load result.
+    def groupCommitRedirectStreamLoad = { beIp, bePort, targetBe, targetBePort ->
+        def command = """ curl -v --location-trusted -u root:
+                -H group_commit:async_mode
+                -H column_separator:,
+                -H columns:id,name
+                -T ${context.config.dataPath}/load_p0/stream_load/test_stream_load1.csv
+                http://${beIp}:${bePort}/api/${databaseName}/${tableName}/_stream_load_forward?forward_to=${targetBe}:${targetBePort} """
+        log.info("redirect command: ${command}")
+
+        def (code, out, err) = runCommand(command)
+        logger.info("code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(0, code)
+        def json = parseJson(out)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(2, json.NumberTotalRows)
+        assertEquals(2, json.NumberLoadedRows)
+        assertEquals(0, json.NumberFilteredRows)
+        assertEquals(0, json.NumberUnselectedRows)
+        assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+    }
+
+    // Polls the table until it holds at least expectedRowCount rows (group
+    // commit makes rows visible asynchronously). Fails the test if the rows do
+    // not show up within 30 attempts spaced 2 seconds apart.
+    def waitForRowCount = { expectedRowCount ->
+        def reached = false
+        for (int retry = 0; retry < 30 && !reached; retry++) {
+            sleep(2000)
+            def rowCount = sql "select count(*) from ${tableName}"
+            logger.info("rowCount: " + rowCount + ", retry: " + retry)
+            reached = rowCount[0][0] >= expectedRowCount
+        }
+        if (!reached) {
+            logger.info("timed out waiting for " + expectedRowCount + " rows in " + tableName)
+        }
+        assertTrue(reached)
+    }
+
+    // cloud mode
+    def options = new ClusterOptions()
+    options.feNum = 1
+    options.beNum = 3
+    options.cloudMode = true
+    options.beConfigs.add('enable_java_support=false')
+    options.feConfigs.add('enable_group_commit_streamload_be_forward=true')
+    docker(options) {
+        // get fe and be info
+        def feIp = cluster.getMasterFe().getHttpAddress()[0]
+        def fePort = cluster.getMasterFe().getHttpAddress()[1]
+
+        def be1Ip = cluster.getBackends().get(0).getHttpAddress()[0]
+        def be1HttpPort = cluster.getBackends().get(0).getHttpAddress()[1]
+        def be1HeartbeatPort = cluster.getBackends().get(0).getHeartbeatPort()
+
+        def be2Ip = cluster.getBackends().get(1).getHttpAddress()[0]
+        def be2HttpPort = cluster.getBackends().get(1).getHttpAddress()[1]
+        def be2HeartbeatPort = cluster.getBackends().get(1).getHeartbeatPort()
+
+        def be3Ip = cluster.getBackends().get(2).getHttpAddress()[0]
+        def be3HttpPort = cluster.getBackends().get(2).getHttpAddress()[1]
+        def be3HeartbeatPort = cluster.getBackends().get(2).getHeartbeatPort()
+
+        // Fake endpoints registered on the backends below; the FE is expected
+        // to put them in the redirect Location.
+        def publicEndpoint = "12.20.20.20:8030"
+        def privateEndpoint = "11.20.20.19:8040"
+
+        // Any of the three BEs is an acceptable forward target.
+        def forwardTargets = ["${be1Ip}:${be1HttpPort}", "${be2Ip}:${be2HttpPort}", "${be3Ip}:${be3HttpPort}"]
+
+        // Expected shape:
+        // http://endpoint:port/api/db/table/_stream_load_forward?param=value&forward_to=target_be:port
+        def assertForwardLocation = { location, endpoint ->
+            assertTrue(location.contains("${endpoint}/api/$databaseName/$tableName/_stream_load_forward?"))
+            assertTrue(forwardTargets.any { target -> location.contains("forward_to=${target}") })
+        }
+
+        // With group commit off the plain API must be used. The explicit negative
+        // check is required because "_stream_load" is a prefix of "_stream_load_forward".
+        def assertPlainLocation = { location, endpoint ->
+            assertTrue(location.contains("${endpoint}/api/$databaseName/$tableName/_stream_load"))
+            assertTrue(!location.contains("_stream_load_forward"))
+            assertTrue(!location.contains("forward_to="))
+        }
+
+        sql """ CREATE DATABASE IF NOT EXISTS ${databaseName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${databaseName}.${tableName} (
+                `id` int(11) NOT NULL,
+                `name` varchar(1100) NULL,
+                `score` int(11) NULL default "-1"
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`id`, `name`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1",
+                "group_commit_interval_ms" = "200"
+            );
+        """
+        // test be forward stream load: once to the receiving BE itself, twice via another BE
+        groupCommitRedirectStreamLoad(be1Ip, be1HttpPort, be1Ip, be1HttpPort)
+        groupCommitRedirectStreamLoad(be2Ip, be2HttpPort, be1Ip, be1HttpPort)
+        groupCommitRedirectStreamLoad(be2Ip, be2HttpPort, be1Ip, be1HttpPort)
+
+        // 3 loads x 2 rows each
+        waitForRowCount(6)
+        qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
+
+        // test fe redirect policy
+        log.info("Initial cluster setup - 3 BEs")
+        log.info("BE1: ${be1Ip}:${be1HeartbeatPort}, BE2: ${be2Ip}:${be2HeartbeatPort}, BE3: ${be3Ip}:${be3HeartbeatPort}")
+        sql """show backends"""
+
+        log.info("Dropping all BE")
+        sql """ALTER SYSTEM DROPP BACKEND '${be1Ip}:${be1HeartbeatPort}'"""
+        sql """ALTER SYSTEM DROPP BACKEND '${be2Ip}:${be2HeartbeatPort}'"""
+        sql """ALTER SYSTEM DROPP BACKEND '${be3Ip}:${be3HeartbeatPort}'"""
+        log.info("after dropping all BE: ${sql """show backends""" }")
+
+        log.info("Adding BE1 BE2 BE3 back with different custom endpoints")
+        sql """ALTER SYSTEM ADD BACKEND '${be1Ip}:${be1HeartbeatPort}','${be2Ip}:${be2HeartbeatPort}','${be3Ip}:${be3HeartbeatPort}' properties('tag.public_endpoint' = '${publicEndpoint}', 'tag.private_endpoint' = '${privateEndpoint}', "tag.compute_group_name" = "compute_cluster")"""
+
+        // NOTE(review): fixed wait, presumably so the re-added backends are
+        // reported alive before redirects are requested - confirm.
+        sleep(30000)
+
+        log.info("Final backends configuration: ${sql """show backends""" }")
+
+        // Group commit enabled: Location must use the forward API on the endpoint
+        // selected by redirect-policy (an empty policy yields the private endpoint).
+        def location = getRedirectLocation(feIp, fePort, "public", "async_mode")
+        log.info("public async_mode location: ${location}")
+        assertForwardLocation(location, publicEndpoint)
+
+        location = getRedirectLocation(feIp, fePort, "private", "sync_mode")
+        log.info("private sync_mode location: ${location}")
+        assertForwardLocation(location, privateEndpoint)
+
+        location = getRedirectLocation(feIp, fePort, "", "async_mode")
+        log.info("default async_mode location: ${location}")
+        assertForwardLocation(location, privateEndpoint)
+
+        // Group commit off: Location must use the plain stream load API.
+        location = getRedirectLocation(feIp, fePort, "public", "off_mode")
+        log.info("public off_mode location: ${location}")
+        assertPlainLocation(location, publicEndpoint)
+
+        location = getRedirectLocation(feIp, fePort, "private", "off_mode")
+        log.info("private off_mode location: ${location}")
+        assertPlainLocation(location, privateEndpoint)
+
+        location = getRedirectLocation(feIp, fePort, "", "off_mode")
+        log.info("default off_mode location: ${location}")
+        assertPlainLocation(location, privateEndpoint)
+
+    }
+
+}