Skip to content

Commit

Permalink
Adaptive async insert timeout adjustments for large limits
Browse files Browse the repository at this point in the history
Customers may have a maximum asynchronous timeout
limit as large as 100 seconds.
This doesn't work well with the current algorithm,
which uses a default minimum limit of 50 milliseconds.

* Use the maximum limit as the initial value for the
adaptive algorithm;
* Introduce async_insert_busy_timeout_max_limits_ratio
to determine the maximum ratio between the minimum
and maximum limits. If the ratio between
async_insert_busy_timeout_min_ms and
async_insert_busy_timeout_max_ms exceeds
this value, increase the minimum limit.
  • Loading branch information
jkartseva committed Mar 27, 2024
1 parent d401998 commit b319ff1
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 4 deletions.
5 changes: 3 additions & 2 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -775,10 +775,11 @@ class IColumn;
M(UInt64, async_insert_max_query_number, 450, "Maximum number of insert queries before being inserted", 0) \
M(Milliseconds, async_insert_poll_timeout_ms, 10, "Timeout for polling data from asynchronous insert queue", 0) \
M(Bool, async_insert_use_adaptive_busy_timeout, true, "If it is set to true, use adaptive busy timeout for asynchronous inserts", 0) \
M(Milliseconds, async_insert_busy_timeout_min_ms, 50, "If auto-adjusting is enabled through async_insert_use_adaptive_busy_timeout, minimum time to wait before dumping collected data per query since the first data appeared. It also serves as the initial value for the adaptive algorithm", 0) \
M(Milliseconds, async_insert_busy_timeout_max_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared.", 0) ALIAS(async_insert_busy_timeout_ms) \
M(Milliseconds, async_insert_busy_timeout_min_ms, 50, "If auto-adjusting is enabled through async_insert_use_adaptive_busy_timeout, minimum time to wait before dumping collected data per query since the first data appeared.", 0) \
M(Milliseconds, async_insert_busy_timeout_max_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared. It also serves as the initial value for the adaptive algorithm.", 0) ALIAS(async_insert_busy_timeout_ms) \
M(Double, async_insert_busy_timeout_increase_rate, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout increases", 0) \
M(Double, async_insert_busy_timeout_decrease_rate, 0.2, "The exponential growth rate at which the adaptive asynchronous insert timeout decreases", 0) \
M(Double, async_insert_busy_timeout_max_limits_ratio, 20.0, "The maximum allowable ratio between the minimum and maximum limits for adaptive asynchronous insert timeout. This may increase the minimum limit set by the async_insert_busy_timeout_min_ms setting. It is expected to be greater or equal than 1.0", 0) \
\
M(UInt64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(UInt64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"azure_max_upload_part_size", 5ull*1024*1024*1024, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to Azure blob storage."},
{"azure_upload_part_size_multiply_factor", 2, 2, "Multiply azure_min_upload_part_size by this factor each time azure_multiply_parts_count_threshold parts were uploaded from a single write to Azure blob storage."},
{"azure_upload_part_size_multiply_parts_count_threshold", 500, 500, "Each time this number of parts was uploaded to Azure blob storage, azure_min_upload_part_size is multiplied by azure_upload_part_size_multiply_factor."},
{"async_insert_busy_timeout_max_limits_ratio", 20.0, 20.0, "The maximum allowable ratio between the minimum and maximum limits for adaptive asynchronous insert timeout." },
}},
{"24.2", {{"allow_suspicious_variant_types", true, false, "Don't allow creating Variant type with suspicious variants by default"},
{"validate_experimental_and_suspicious_types_inside_nested_types", false, true, "Validate usage of experimental and suspicious types inside nested types"},
Expand Down
10 changes: 8 additions & 2 deletions src/Interpreters/AsynchronousInsertQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t poo

for (size_t i = 0; i < pool_size; ++i)
queue_shards[i].busy_timeout_ms
= std::min(Milliseconds(settings.async_insert_busy_timeout_min_ms), Milliseconds(settings.async_insert_busy_timeout_max_ms));
= std::max(Milliseconds(settings.async_insert_busy_timeout_min_ms), Milliseconds(settings.async_insert_busy_timeout_max_ms));

for (size_t i = 0; i < pool_size; ++i)
dump_by_first_update_threads.emplace_back([this, i] { processBatchDeadlines(i); });
Expand Down Expand Up @@ -483,7 +483,10 @@ AsynchronousInsertQueue::Milliseconds AsynchronousInsertQueue::getBusyWaitTimeou
return settings.async_insert_busy_timeout_max_ms;

const auto max_ms = Milliseconds(settings.async_insert_busy_timeout_max_ms);
const auto min_ms = std::min(std::max(Milliseconds(settings.async_insert_busy_timeout_min_ms), Milliseconds(1)), max_ms);
const double limits_ratio = settings.async_insert_busy_timeout_max_limits_ratio;

const auto min_ms = std::clamp(
Milliseconds(settings.async_insert_busy_timeout_min_ms), std::chrono::ceil<Milliseconds>(max_ms / limits_ratio), max_ms);

auto normalize = [&min_ms, &max_ms](const auto & t_ms) { return std::min(std::max(t_ms, min_ms), max_ms); };

Expand Down Expand Up @@ -543,6 +546,9 @@ void AsynchronousInsertQueue::validateSettings(const Settings & settings, Logger

if (settings.async_insert_busy_timeout_decrease_rate <= 0)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting 'async_insert_busy_timeout_decrease_rate' must be greater than zero");

if (settings.async_insert_busy_timeout_max_limits_ratio <= 1.0)
throw Exception(ErrorCodes::INVALID_SETTING_VALUE, "Setting 'async_insert_busy_timeout_max_limits_ratio' must be greater or equal than 1.0");
}

void AsynchronousInsertQueue::flushAll()
Expand Down
16 changes: 16 additions & 0 deletions tests/queries/0_stateless/02968_adaptive_async_insert_timeout.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ INSERT INTO async_insert_mt_test
async_insert_busy_timeout_min_ms=500
VALUES (3, []), (1, [1, 3]), (2, [7, 8]), (4, [5, 9]), (5, [2, 6]);

INSERT INTO async_insert_mt_test
SETTINGS
async_insert=1,
wait_for_async_insert=1,
async_insert_busy_timeout_min_ms=10,
async_insert_busy_timeout_ms=5000,
async_insert_busy_timeout_max_limits_ratio=50
VALUES (3, []), (1, [1, 3]), (2, [7, 8]), (4, [5, 9]), (5, [2, 6]);


INSERT INTO async_insert_mt_test
SETTINGS
Expand All @@ -47,5 +56,12 @@ INSERT INTO async_insert_mt_test
async_insert_busy_timeout_decrease_rate=-1.0
VALUES (3, []), (1, [1, 3]), (2, [7, 8]), (4, [5, 9]), (5, [2, 6]); -- { serverError INVALID_SETTING_VALUE }

INSERT INTO async_insert_mt_test
SETTINGS
async_insert=1,
wait_for_async_insert=1,
async_insert_busy_timeout_max_limits_ratio=0
VALUES (3, []), (1, [1, 3]), (2, [7, 8]), (4, [5, 9]), (5, [2, 6]); -- { serverError INVALID_SETTING_VALUE }


DROP TABLE IF EXISTS async_insert_mt_test;

0 comments on commit b319ff1

Please sign in to comment.