Skip to content

Commit

Permalink
feature: add execution timeout for remote UDFs
Browse files Browse the repository at this point in the history
  • Loading branch information
amamiya-len committed Aug 23, 2024
1 parent 75a150a commit 4ffb74a
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 10 deletions.
18 changes: 15 additions & 3 deletions src/Common/sendRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ std::pair<String, Int32> sendRequest(
const String & password,
const String & payload,
const std::vector<std::pair<String, String>> & headers,
Poco::Logger * log)
{
Poco::Logger * log,
/// One second for connect/send/receive
ConnectionTimeouts timeouts({2, 0}, {5, 0}, {10, 0});
ConnectionTimeouts timeouts)
{

PooledHTTPSessionPtr session;
try
Expand Down Expand Up @@ -109,6 +109,18 @@ std::pair<String, Int32> sendRequest(
{
session->attachSessionData(e.message());
}
if (e.code() == 1000){
LOG_ERROR(
log,
"Execution Timeout from uri={} method={} payload={} query_id={} error={} exception={}",
uri.toString(),
method,
payload,
query_id,
e.message(),
getCurrentExceptionMessage(true, true));
return {"Execution Timeout", toHTTPCode(e)};
}

LOG_ERROR(
log,
Expand Down
5 changes: 4 additions & 1 deletion src/Common/sendRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <base/types.h>

#include <IO/ConnectionTimeouts.h>

#include <Poco/URI.h>

#include <utility>
Expand All @@ -23,5 +25,6 @@ std::pair<String, Int32> sendRequest(
const String & password,
const String & payload,
const std::vector<std::pair<String, String>> & headers,
Poco::Logger * log);
Poco::Logger * log,
ConnectionTimeouts timeouts = ConnectionTimeouts({2, 0}, {5, 0}, {10, 0}));
}
4 changes: 3 additions & 1 deletion src/Functions/UserDefined/RemoteUserDefinedFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <Functions/UserDefined/UserDefinedFunctionBase.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ConnectionTimeouts.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Common/sendRequest.h>
Expand Down Expand Up @@ -53,7 +54,8 @@ class RemoteUserDefinedFunction final : public UserDefinedFunctionBase
"",
out,
{{config.auth_context.key_name, config.auth_context.key_value}, {"", context->getCurrentQueryId()}},
&Poco::Logger::get("UserDefinedFunction"));
&Poco::Logger::get("UserDefinedFunction"),
ConnectionTimeouts({2, 0}, {5, 0}, {static_cast<long>(config.command_execution_timeout_milliseconds / 1000), static_cast<long>((config.command_execution_timeout_milliseconds % 1000u) * 1000u)}));

if (http_status != Poco::Net::HTTPResponse::HTTP_OK)
throw Exception(
Expand Down
1 change: 1 addition & 0 deletions src/Functions/UserDefined/UDFHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ createUserDefinedExecutableFunction(ContextPtr context, const std::string & name
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid url for remote UDF, msg: {}", e.message());
}
cfg->command_read_timeout_milliseconds = config.getUInt64(key_in_config + ".command_read_timeout", 10000);
cfg->command_execution_timeout_milliseconds = config.getUInt64(key_in_config + ".command_execution_timeout", 60000);
cfg->auth_method = std::move(auth_method);
cfg->auth_context = std::move(auth_ctx);
};
Expand Down
2 changes: 2 additions & 0 deletions src/Functions/UserDefined/UserDefinedFunctionConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ struct RemoteUserDefinedFunctionConfiguration : public UserDefinedFunctionConfig
/// Timeout for reading data from input format
size_t command_read_timeout_milliseconds = 10000;

size_t command_execution_timeout_milliseconds = 60000;

/// url of remote endpoint, only available when 'type' is 'remote'
Poco::URI url;
enum AuthMethod
Expand Down
3 changes: 3 additions & 0 deletions src/Parsers/ASTCreateFunctionQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ void ASTCreateFunctionQuery::formatImpl(const IAST::FormatSettings & settings, I
settings.ostr << fmt::format("AUTH_HEADER '{}'\n", function_core->children[1]->as<ASTLiteral>()->value.safeGet<String>());
settings.ostr << fmt::format("AUTH_KEY '{}'\n", function_core->children[2]->as<ASTLiteral>()->value.safeGet<String>());
}
settings.ostr << fmt::format("EXECUTION_TIMEOUT {}", function_core->children.back()->as<ASTLiteral>()->value.safeGet<UInt64>());
return;
}
/// proton: ends
Expand Down Expand Up @@ -167,6 +168,8 @@ Poco::JSON::Object::Ptr ASTCreateFunctionQuery::toJSON() const
auth_context->set("key_value", function_core->children[2]->as<ASTLiteral>()->value.safeGet<String>());
inner_func->set("auth_context", auth_context);
}
auto execution_timeout = function_core->children.back()->as<ASTLiteral>()->value.safeGet<UInt64>();
inner_func->set("command_execution_timeout", execution_timeout);
}
func->set("function", inner_func);
/// Remote function don't have source, return early.
Expand Down
16 changes: 16 additions & 0 deletions src/Parsers/ParserCreateFunctionQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
ParserKeyword s_auth_method("AUTH_METHOD");
ParserKeyword s_auth_header("AUTH_HEADER");
ParserKeyword s_auth_key("AUTH_KEY");
ParserKeyword s_execution_timeout("EXECUTION_TIMEOUT");
ParserLiteral value;
ASTPtr url;
ASTPtr auth_method;
ASTPtr auth_header;
ASTPtr auth_key;
ASTPtr execution_timeout;
ParserArguments arguments_p;
ParserDataType return_p;
ParserStringLiteral js_src_p;
Expand Down Expand Up @@ -167,6 +169,20 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
throw Exception("AUTH_METHOD must be 'none' or 'auth_header'", ErrorCodes::UNKNOWN_FUNCTION);
}
}
else
{
url->children.push_back(std::make_shared<ASTLiteral>(Field{String("none")}));
}
if (s_execution_timeout.ignore(pos, expected))
{
if(!value.parse(pos, execution_timeout, expected))
return false;
url->children.push_back(std::move(execution_timeout));
}
else
{
url->children.push_back(std::make_shared<ASTLiteral>(Field{UInt64(60000)}));
}
function_core = std::move(url);
}
/// proton: ends
Expand Down
10 changes: 6 additions & 4 deletions src/Parsers/tests/gtest_create_remote_func_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ TEST(ParserCreateRemoteFunctionQuery, UDFNoHeaderMethod)
EXPECT_EQ(
remote_func_settings->as<ASTLiteral>()->value.safeGet<String>(),
"https://hn6wip76uexaeusz5s7bh3e4u40lrrrz.lambda-url.us-west-2.on.aws/");
EXPECT_TRUE(remote_func_settings->children.empty());
EXPECT_TRUE(remote_func_settings->children.size() == 2);
}

TEST(ParserCreateRemoteFunctionQuery, UDFHeaderMethodIsNone)
Expand All @@ -68,7 +68,7 @@ TEST(ParserCreateRemoteFunctionQuery, UDFHeaderMethodIsNone)
EXPECT_EQ(
remote_func_settings->as<ASTLiteral>()->value.safeGet<String>(),
"https://hn6wip76uexaeusz5s7bh3e4u40lrrrz.lambda-url.us-west-2.on.aws/");
EXPECT_EQ(remote_func_settings->children.size(), 1);
EXPECT_EQ(remote_func_settings->children.size(), 2);
EXPECT_EQ(remote_func_settings->children[0]->as<ASTLiteral>()->value.safeGet<String>(), "none"); // Auth method
}

Expand All @@ -78,7 +78,8 @@ TEST(ParserCreateRemoteFunctionQuery, UDFHeaderMethodIsAuthHeader)
"URL 'https://hn6wip76uexaeusz5s7bh3e4u40lrrrz.lambda-url.us-west-2.on.aws/'"
"AUTH_METHOD 'auth_header'"
"AUTH_HEADER 'auth'"
"AUTH_KEY 'proton'";
"AUTH_KEY 'proton'"
"EXECUTION_TIMEOUT 10000";
ParserCreateFunctionQuery parser;
ASTPtr ast = parseQuery(parser, input.data(), input.data() + input.size(), "", 0, 0);
ASTCreateFunctionQuery * create = ast->as<ASTCreateFunctionQuery>();
Expand All @@ -99,10 +100,11 @@ TEST(ParserCreateRemoteFunctionQuery, UDFHeaderMethodIsAuthHeader)
EXPECT_EQ(
remote_func_settings->as<ASTLiteral>()->value.safeGet<String>(),
"https://hn6wip76uexaeusz5s7bh3e4u40lrrrz.lambda-url.us-west-2.on.aws/");
EXPECT_EQ(remote_func_settings->children.size(), 3);
EXPECT_EQ(remote_func_settings->children.size(), 4);
EXPECT_EQ(remote_func_settings->children[0]->as<ASTLiteral>()->value.safeGet<String>(), "auth_header"); /// Auth method
EXPECT_EQ(remote_func_settings->children[1]->as<ASTLiteral>()->value.safeGet<String>(), "auth"); /// auth_header
EXPECT_EQ(remote_func_settings->children[2]->as<ASTLiteral>()->value.safeGet<String>(), "proton"); /// auth key
EXPECT_EQ(remote_func_settings->children[3]->as<ASTLiteral>()->value.safeGet<UInt64>(), 10000u);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ tests:
URL 'https://hn6wip76uexaeusz5s7bh3e4u40lrrrz.lambda-url.us-west-2.on.aws/'
AUTH_METHOD 'auth_header'
AUTH_HEADER 'auth'
AUTH_KEY 'proton';
AUTH_KEY 'proton'
EXECUTION_TIMEOUT 60000;
- client: python
query_id: udf-30-1
query_end_timer: 7
Expand Down

0 comments on commit 4ffb74a

Please sign in to comment.