Skip to content

Commit

Permalink
Modify to execute flush when giganto down (#542)
Browse files Browse the repository at this point in the history
- the WAL buffer is flushed when it is full or giganto down.

Close: #541
  • Loading branch information
BLYKIM authored Sep 18, 2023
1 parent 4ba15a5 commit 79546ac
Showing 1 changed file with 7 additions and 8 deletions.
15 changes: 7 additions & 8 deletions src/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,14 +800,12 @@ async fn handle_data<T>(
)
.await?;
}
if store.flush().is_ok() {
ack_cnt_rotation.fetch_add(1, Ordering::SeqCst);
ack_time_rotation.store(timestamp, Ordering::SeqCst);
if ACK_ROTATION_CNT <= ack_cnt_rotation.load(Ordering::SeqCst) {
send_ack_timestamp(&mut (*sender_rotation.lock().await), timestamp).await?;
ack_cnt_rotation.store(0, Ordering::SeqCst);
ack_time_notify.notify_one();
}
ack_cnt_rotation.fetch_add(1, Ordering::SeqCst);
ack_time_rotation.store(timestamp, Ordering::SeqCst);
if ACK_ROTATION_CNT <= ack_cnt_rotation.load(Ordering::SeqCst) {
send_ack_timestamp(&mut (*sender_rotation.lock().await), timestamp).await?;
ack_cnt_rotation.store(0, Ordering::SeqCst);
ack_time_notify.notify_one();
}
#[cfg(feature = "benchmark")]
{
Expand All @@ -830,6 +828,7 @@ async fn handle_data<T>(
}

if shutdown_signal.load(Ordering::SeqCst) {
store.flush()?;
handler.abort();
break;
}
Expand Down

0 comments on commit 79546ac

Please sign in to comment.