From 852ccdf0288e7031a77f909ce77c5c739ec9c5e8 Mon Sep 17 00:00:00 2001 From: Bly Kim Date: Mon, 18 Sep 2023 13:51:34 +0900 Subject: [PATCH] Modify to execute flush when giganto down - the WAL buffer is flushed when it is full or giganto down. Close: #541 --- src/ingest.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/ingest.rs b/src/ingest.rs index 3257a6fd..cc6dd1ab 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -800,14 +800,12 @@ async fn handle_data( ) .await?; } - if store.flush().is_ok() { - ack_cnt_rotation.fetch_add(1, Ordering::SeqCst); - ack_time_rotation.store(timestamp, Ordering::SeqCst); - if ACK_ROTATION_CNT <= ack_cnt_rotation.load(Ordering::SeqCst) { - send_ack_timestamp(&mut (*sender_rotation.lock().await), timestamp).await?; - ack_cnt_rotation.store(0, Ordering::SeqCst); - ack_time_notify.notify_one(); - } + ack_cnt_rotation.fetch_add(1, Ordering::SeqCst); + ack_time_rotation.store(timestamp, Ordering::SeqCst); + if ACK_ROTATION_CNT <= ack_cnt_rotation.load(Ordering::SeqCst) { + send_ack_timestamp(&mut (*sender_rotation.lock().await), timestamp).await?; + ack_cnt_rotation.store(0, Ordering::SeqCst); + ack_time_notify.notify_one(); } #[cfg(feature = "benchmark")] { @@ -830,6 +828,7 @@ async fn handle_data( } if shutdown_signal.load(Ordering::SeqCst) { + store.flush()?; handler.abort(); break; }