diff --git a/src/Common/sendRequest.cpp b/src/Common/sendRequest.cpp index 8ae54f7b2ed..33cc74c1b26 100644 --- a/src/Common/sendRequest.cpp +++ b/src/Common/sendRequest.cpp @@ -33,10 +33,10 @@ std::pair sendRequest( const String & password, const String & payload, const std::vector> & headers, - Poco::Logger * log) -{ + Poco::Logger * log, /// One second for connect/send/receive - ConnectionTimeouts timeouts({2, 0}, {5, 0}, {10, 0}); + ConnectionTimeouts timeouts) +{ PooledHTTPSessionPtr session; try @@ -109,6 +109,18 @@ std::pair sendRequest( { session->attachSessionData(e.message()); } + if (e.code() == 1000){ + LOG_ERROR( + log, + "Execution Timeout from uri={} method={} payload={} query_id={} error={} exception={}", + uri.toString(), + method, + payload, + query_id, + e.message(), + getCurrentExceptionMessage(true, true)); + return {"Execution Timeout", toHTTPCode(e)}; + } LOG_ERROR( log, diff --git a/src/Common/sendRequest.h b/src/Common/sendRequest.h index 8ad3694ab3a..37068c81ed6 100644 --- a/src/Common/sendRequest.h +++ b/src/Common/sendRequest.h @@ -2,6 +2,8 @@ #include +#include + #include #include @@ -23,5 +25,6 @@ std::pair sendRequest( const String & password, const String & payload, const std::vector> & headers, - Poco::Logger * log); + Poco::Logger * log, + ConnectionTimeouts timeouts = ConnectionTimeouts({2, 0}, {5, 0}, {10, 0})); } diff --git a/src/Functions/UserDefined/RemoteUserDefinedFunction.h b/src/Functions/UserDefined/RemoteUserDefinedFunction.h index ff683f3e9ba..656d38f5468 100644 --- a/src/Functions/UserDefined/RemoteUserDefinedFunction.h +++ b/src/Functions/UserDefined/RemoteUserDefinedFunction.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -53,7 +54,8 @@ class RemoteUserDefinedFunction final : public UserDefinedFunctionBase "", out, {{config.auth_context.key_name, config.auth_context.key_value}, {"", context->getCurrentQueryId()}}, - &Poco::Logger::get("UserDefinedFunction")); + &Poco::Logger::get("UserDefinedFunction"), + ConnectionTimeouts({2, 0}, {5, 0}, {static_cast(config.command_execution_timeout_milliseconds / 1000), static_cast((config.command_execution_timeout_milliseconds % 1000u) * 1000u)})); if (http_status != Poco::Net::HTTPResponse::HTTP_OK) throw Exception( diff --git a/src/Functions/UserDefined/UDFHelper.cpp b/src/Functions/UserDefined/UDFHelper.cpp index fec7addfef5..8bd1957387d 100644 --- a/src/Functions/UserDefined/UDFHelper.cpp +++ b/src/Functions/UserDefined/UDFHelper.cpp @@ -255,6 +255,7 @@ createUserDefinedExecutableFunction(ContextPtr context, const std::string & name throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid url for remote UDF, msg: {}", e.message()); } cfg->command_read_timeout_milliseconds = config.getUInt64(key_in_config + ".command_read_timeout", 10000); + cfg->command_execution_timeout_milliseconds = config.getUInt64(key_in_config + ".command_execution_timeout", 60000); cfg->auth_method = std::move(auth_method); cfg->auth_context = std::move(auth_ctx); }; diff --git a/src/Functions/UserDefined/UserDefinedFunctionConfiguration.h b/src/Functions/UserDefined/UserDefinedFunctionConfiguration.h index 67bf34813f9..9f273334ac8 100644 --- a/src/Functions/UserDefined/UserDefinedFunctionConfiguration.h +++ b/src/Functions/UserDefined/UserDefinedFunctionConfiguration.h @@ -80,6 +80,8 @@ struct RemoteUserDefinedFunctionConfiguration : public UserDefinedFunctionConfig /// Timeout for reading data from input format size_t command_read_timeout_milliseconds = 10000; + size_t command_execution_timeout_milliseconds = 60000; + /// url of remote endpoint, only available when 'type' is 'remote' Poco::URI url; enum AuthMethod diff --git a/src/Parsers/ASTCreateFunctionQuery.cpp b/src/Parsers/ASTCreateFunctionQuery.cpp index b9478dedc3c..d7cf4234537 100644 --- a/src/Parsers/ASTCreateFunctionQuery.cpp +++ b/src/Parsers/ASTCreateFunctionQuery.cpp @@ -81,6 +81,7 @@ void ASTCreateFunctionQuery::formatImpl(const IAST::FormatSettings & settings, I settings.ostr << fmt::format("AUTH_HEADER '{}'\n", function_core->children[1]->as()->value.safeGet()); settings.ostr << fmt::format("AUTH_KEY '{}'\n", function_core->children[2]->as()->value.safeGet()); } + settings.ostr << fmt::format("EXECUTION_TIMEOUT {}", function_core->children.back()->as()->value.safeGet()); return; } /// proton: ends @@ -167,6 +168,8 @@ Poco::JSON::Object::Ptr ASTCreateFunctionQuery::toJSON() const auth_context->set("key_value", function_core->children[2]->as()->value.safeGet()); inner_func->set("auth_context", auth_context); } + auto execution_timeout = function_core->children.back()->as()->value.safeGet(); + inner_func->set("command_execution_timeout", execution_timeout); } func->set("function", inner_func); /// Remote function don't have source, return early. diff --git a/src/Parsers/ParserCreateFunctionQuery.cpp b/src/Parsers/ParserCreateFunctionQuery.cpp index 8be17aa39ff..5d4ec99de0d 100644 --- a/src/Parsers/ParserCreateFunctionQuery.cpp +++ b/src/Parsers/ParserCreateFunctionQuery.cpp @@ -40,11 +40,13 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp ParserKeyword s_auth_method("AUTH_METHOD"); ParserKeyword s_auth_header("AUTH_HEADER"); ParserKeyword s_auth_key("AUTH_KEY"); + ParserKeyword s_execution_timeout("EXECUTION_TIMEOUT"); ParserLiteral value; ASTPtr url; ASTPtr auth_method; ASTPtr auth_header; ASTPtr auth_key; + ASTPtr execution_timeout; ParserArguments arguments_p; ParserDataType return_p; ParserStringLiteral js_src_p; @@ -167,6 +169,20 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp throw Exception("AUTH_METHOD must be 'none' or 'auth_header'", ErrorCodes::UNKNOWN_FUNCTION); } } + else + { + url->children.push_back(std::make_shared(Field{String("none")})); + } + if (s_execution_timeout.ignore(pos, expected)) + { + if(!value.parse(pos, execution_timeout, expected)) + return false; + url->children.push_back(std::move(execution_timeout)); + } + else + { + url->children.push_back(std::make_shared(Field{UInt64(60000)})); + } function_core = std::move(url); } /// proton: ends diff --git a/src/Parsers/tests/gtest_create_remote_func_parser.cpp b/src/Parsers/tests/gtest_create_remote_func_parser.cpp index 8d9c218dde0..d61f1cc2969 100644 --- a/src/Parsers/tests/gtest_create_remote_func_parser.cpp +++ b/src/Parsers/tests/gtest_create_remote_func_parser.cpp @@ -41,7 +41,7 @@ TEST(ParserCreateRemoteFunctionQuery, UDFNoHeaderMethod) EXPECT_EQ( remote_func_settings->as()->value.safeGet(), "https://hn6wip76uexaeusz5s7bh3e4u40lrrrz.lambda-url.us-west-2.on.aws/"); - EXPECT_TRUE(remote_func_settings->children.empty()); + EXPECT_TRUE(remote_func_settings->children.size() == 2); } TEST(ParserCreateRemoteFunctionQuery, UDFHeaderMethodIsNone) @@ -68,7 +68,7 @@ TEST(ParserCreateRemoteFunctionQuery, UDFHeaderMethodIsNone) EXPECT_EQ( remote_func_settings->as()->value.safeGet(), "https://hn6wip76uexaeusz5s7bh3e4u40lrrrz.lambda-url.us-west-2.on.aws/"); - EXPECT_EQ(remote_func_settings->children.size(), 1); + EXPECT_EQ(remote_func_settings->children.size(), 2); EXPECT_EQ(remote_func_settings->children[0]->as()->value.safeGet(), "none"); // Auth method } @@ -78,7 +78,8 @@ TEST(ParserCreateRemoteFunctionQuery, UDFHeaderMethodIsAuthHeader) "URL 'https://hn6wip76uexaeusz5s7bh3e4u40lrrrz.lambda-url.us-west-2.on.aws/'" "AUTH_METHOD 'auth_header'" "AUTH_HEADER 'auth'" - "AUTH_KEY 'proton'"; + "AUTH_KEY 'proton'" + "EXECUTION_TIMEOUT 10000"; ParserCreateFunctionQuery parser; ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0, 0); ASTCreateFunctionQuery * create = ast->as(); @@ -99,10 +100,11 @@ TEST(ParserCreateRemoteFunctionQuery, UDFHeaderMethodIsAuthHeader) EXPECT_EQ( remote_func_settings->as()->value.safeGet(), "https://hn6wip76uexaeusz5s7bh3e4u40lrrrz.lambda-url.us-west-2.on.aws/"); - EXPECT_EQ(remote_func_settings->children.size(), 3); + EXPECT_EQ(remote_func_settings->children.size(), 4); EXPECT_EQ(remote_func_settings->children[0]->as()->value.safeGet(), "auth_header"); /// Auth method EXPECT_EQ(remote_func_settings->children[1]->as()->value.safeGet(), "auth"); /// auth_header EXPECT_EQ(remote_func_settings->children[2]->as()->value.safeGet(), "proton"); /// auth key + EXPECT_EQ(remote_func_settings->children[3]->as()->value.safeGet(), 10000u); } diff --git a/tests/stream/test_stream_smoke/0022_udf3_create_remote_func.yaml b/tests/stream/test_stream_smoke/0022_udf3_create_remote_func.yaml index 2f7a5c62fc5..a2ebd570f14 100644 --- a/tests/stream/test_stream_smoke/0022_udf3_create_remote_func.yaml +++ b/tests/stream/test_stream_smoke/0022_udf3_create_remote_func.yaml @@ -91,7 +91,8 @@ tests: URL 'https://hn6wip76uexaeusz5s7bh3e4u40lrrrz.lambda-url.us-west-2.on.aws/' AUTH_METHOD 'auth_header' AUTH_HEADER 'auth' - AUTH_KEY 'proton'; + AUTH_KEY 'proton' + EXECUTION_TIMEOUT 60000; - client: python query_id: udf-30-1 query_end_timer: 7