diff --git a/src/ingest.rs b/src/ingest.rs index 3257a6fd..cc6dd1ab 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -800,14 +800,12 @@ async fn handle_data( ) .await?; } - if store.flush().is_ok() { - ack_cnt_rotation.fetch_add(1, Ordering::SeqCst); - ack_time_rotation.store(timestamp, Ordering::SeqCst); - if ACK_ROTATION_CNT <= ack_cnt_rotation.load(Ordering::SeqCst) { - send_ack_timestamp(&mut (*sender_rotation.lock().await), timestamp).await?; - ack_cnt_rotation.store(0, Ordering::SeqCst); - ack_time_notify.notify_one(); - } + ack_cnt_rotation.fetch_add(1, Ordering::SeqCst); + ack_time_rotation.store(timestamp, Ordering::SeqCst); + if ACK_ROTATION_CNT <= ack_cnt_rotation.load(Ordering::SeqCst) { + send_ack_timestamp(&mut (*sender_rotation.lock().await), timestamp).await?; + ack_cnt_rotation.store(0, Ordering::SeqCst); + ack_time_notify.notify_one(); } #[cfg(feature = "benchmark")] { @@ -830,6 +828,7 @@ async fn handle_data( } if shutdown_signal.load(Ordering::SeqCst) { + store.flush()?; handler.abort(); break; }