Skip to content

Commit e8d8ea3

Browse files
chore: add logging to semphore leaser (#17825)
* chore(query): add statement_queue_ttl_in_seconds setting for queries queue * chore: add logging to semphore leaser * Update acquirer.rs * Update settings_default.rs --------- Co-authored-by: zhang2014 <coswde@gmail.com>
1 parent bfffde7 commit e8d8ea3

File tree

5 files changed

+33
-1
lines changed

5 files changed

+33
-1
lines changed

src/meta/semaphore/src/acquirer/acquirer.rs

+9
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ impl Acquirer {
227227
val_bytes,
228228
self.lease,
229229
cancel_rx.map(|_| ()),
230+
self.ctx.clone(),
230231
);
231232

232233
let task_name = format!("{}/(seq={})", self.ctx, sem_key.seq);
@@ -256,7 +257,10 @@ impl Acquirer {
256257
val_bytes: Vec<u8>,
257258
ttl: Duration,
258259
cancel: impl Future<Output = ()> + Send + 'static,
260+
ctx: impl ToString,
259261
) -> Result<(), ConnectionClosed> {
262+
let ctx = ctx.to_string();
263+
260264
let sleep_time = ttl / 3;
261265
let sleep_time = std::cmp::min(sleep_time, Duration::from_millis(2_000));
262266

@@ -287,6 +291,11 @@ impl Acquirer {
287291
// it means the semaphore has been released. Re-inserting it would cause confusion
288292
// in the semaphore state and potentially lead to inconsistent behavior.
289293

294+
info!(
295+
"{}: About to extend semaphore permit lease: {} ttl: {:?}",
296+
ctx, key_str, ttl
297+
);
298+
290299
let upsert = UpsertKV::update(&key_str, &val_bytes)
291300
.with(MatchSeq::GE(1))
292301
.with_ttl(ttl);

src/query/service/src/sessions/queue_mgr.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ pub trait QueueData: Send + Sync + 'static {
6666

6767
fn remove_error_message(key: Option<Self::Key>) -> ErrorCode;
6868

69+
fn lock_ttl(&self) -> Duration;
70+
6971
fn timeout(&self) -> Duration;
7072

7173
fn need_acquire_to_queue(&self) -> bool;
@@ -158,7 +160,7 @@ impl<Data: QueueData> QueueManager<Data> {
158160
data.get_lock_key(),
159161
self.permits as u64,
160162
data.get_key(), // ID of this acquirer
161-
Duration::from_secs(3),
163+
data.lock_ttl(),
162164
);
163165

164166
let future = AcquireQueueFuture::create(
@@ -333,6 +335,7 @@ pub struct QueryEntry {
333335
pub sql: String,
334336
pub user_info: UserInfo,
335337
pub timeout: Duration,
338+
pub lock_ttl: Duration,
336339
pub need_acquire_to_queue: bool,
337340
}
338341

@@ -354,6 +357,7 @@ impl QueryEntry {
354357
0 => Duration::from_secs(60 * 60 * 24 * 365 * 35),
355358
timeout => Duration::from_secs(timeout),
356359
},
360+
lock_ttl: Duration::from_secs(settings.get_statement_queue_ttl_in_seconds()?),
357361
})
358362
}
359363

@@ -481,6 +485,10 @@ impl QueueData for QueryEntry {
481485
}
482486
}
483487

488+
fn lock_ttl(&self) -> Duration {
489+
self.lock_ttl
490+
}
491+
484492
fn timeout(&self) -> Duration {
485493
self.timeout
486494
}

src/query/service/tests/it/sessions/queue_mgr.rs

+4
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ impl<const PASSED: bool> QueueData for TestData<PASSED> {
5252
ErrorCode::Internal(format!("{:?}", key))
5353
}
5454

55+
fn lock_ttl(&self) -> Duration {
56+
Duration::from_secs(3)
57+
}
58+
5559
fn timeout(&self) -> Duration {
5660
Duration::from_secs(1000)
5761
}

src/query/settings/src/settings_default.rs

+7
Original file line numberDiff line numberDiff line change
@@ -1018,6 +1018,13 @@ impl DefaultSettings {
10181018
scope: SettingScope::Both,
10191019
range: Some(SettingRange::Numeric(0..=u64::MAX)),
10201020
}),
1021+
("statement_queue_ttl_in_seconds", DefaultSettingValue {
1022+
value: UserSettingValue::UInt64(15),
1023+
desc: "This parameter specifies the interval, in seconds, between lease renewal operations with the meta service to maintain active communication.",
1024+
mode: SettingMode::Both,
1025+
scope: SettingScope::Both,
1026+
range: Some(SettingRange::Numeric(1..=3600)),
1027+
}),
10211028
("geometry_output_format", DefaultSettingValue {
10221029
value: UserSettingValue::String("GeoJSON".to_owned()),
10231030
desc: "Display format for GEOMETRY values.",

src/query/settings/src/settings_getter_setter.rs

+4
Original file line numberDiff line numberDiff line change
@@ -932,4 +932,8 @@ impl Settings {
932932
pub fn get_enable_block_stream_write(&self) -> Result<bool> {
933933
Ok(self.try_get_u64("enable_block_stream_write")? == 1)
934934
}
935+
936+
pub fn get_statement_queue_ttl_in_seconds(&self) -> Result<u64> {
937+
self.try_get_u64("statement_queue_ttl_in_seconds")
938+
}
935939
}

0 commit comments

Comments
 (0)