From 691f825b50b5b96c358c5157a0cdf622900e3504 Mon Sep 17 00:00:00 2001 From: meiyi Date: Thu, 13 Jun 2024 15:49:30 +0800 Subject: [PATCH] group commit timeout --- be/src/runtime/group_commit_mgr.cpp | 2 +- .../test_group_commit_timeout.groovy | 55 +++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) create mode 100644 regression-test/suites/insert_p0/test_group_commit_timeout.groovy diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index c7b2492c610d49..6a277be24b80f0 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -149,7 +149,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* << ", runtime_state=" << runtime_state; } } - _get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds)); + _get_cond.wait_for(l, std::chrono::milliseconds(std::min(left_milliseconds, 10000L))); } if (runtime_state->is_cancelled()) { auto st = runtime_state->cancel_reason(); diff --git a/regression-test/suites/insert_p0/test_group_commit_timeout.groovy b/regression-test/suites/insert_p0/test_group_commit_timeout.groovy new file mode 100644 index 00000000000000..7866a33df0ef86 --- /dev/null +++ b/regression-test/suites/insert_p0/test_group_commit_timeout.groovy @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_group_commit_timeout", "nonConcurrent") { + def tableName = "test_group_commit_timeout" + sql """ + CREATE TABLE if not exists ${tableName} ( + `id` int(11) NOT NULL, + `name` varchar(100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "group_commit_interval_ms" = "300000" + ); + """ + + def query_timeout = sql """show variables where variable_name = 'query_timeout';""" + def insert_timeout = sql """show variables where variable_name = 'insert_timeout';""" + logger.info("query_timeout: ${query_timeout}, insert_timeout: ${insert_timeout}") + + long start = System.currentTimeMillis() + try { + sql "SET global query_timeout = 5" + sql "SET global insert_timeout = 5" + + sql "set group_commit = sync_mode" + sql "insert into ${tableName} values(1, 'a', 10)" + assertTrue(false) + } catch (Exception e) { + long end = System.currentTimeMillis() + logger.info("failed " + e.getMessage()) + assertTrue(e.getMessage().contains("FragmentMgr cancel worker going to cancel timeout instance")) + assertTrue(end - start <= 60000) + } finally { + sql "SET global query_timeout = ${query_timeout[0][1]}" + sql "SET global insert_timeout = ${insert_timeout[0][1]}" + } +}