Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send AppendResponse keepalive once per second #4036

Merged
merged 1 commit into from
Apr 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions safekeeper/src/receive_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::task::spawn_blocking;
use tokio::time::Duration;
use tokio::time::Instant;
use tracing::*;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
Expand Down Expand Up @@ -206,6 +208,10 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
}
}

// Send keepalive messages to walproposer, to make sure it receives updates
// even when it writes a steady stream of messages.
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);

/// Takes messages from msg_rx, processes and pushes replies to reply_tx.
struct WalAcceptor {
tli: Arc<Timeline>,
Expand Down Expand Up @@ -253,18 +259,25 @@ impl WalAcceptor {
timeline: Arc::clone(&self.tli),
};

let mut next_msg: ProposerAcceptorMessage;
// After this timestamp we will stop processing AppendRequests and send a response
// to the walproposer. walproposer sends at least one AppendRequest per second,
// we will send keepalives by replying to these requests once per second.
let mut next_keepalive = Instant::now();

loop {
let opt_msg = self.msg_rx.recv().await;
if opt_msg.is_none() {
return Ok(()); // chan closed, streaming terminated
}
next_msg = opt_msg.unwrap();
let mut next_msg = opt_msg.unwrap();

if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
// loop through AppendRequest's while it's readily available to
// write as many WAL as possible without fsyncing
//
// Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
// Otherwise, we might end up in a situation where we read a message, but don't
// process it.
while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);

Expand All @@ -274,6 +287,11 @@ impl WalAcceptor {
}
}

// get out of this loop if keepalive time is reached
if Instant::now() >= next_keepalive {
break;
}

match self.msg_rx.try_recv() {
Ok(msg) => next_msg = msg,
Err(TryRecvError::Empty) => break,
Expand All @@ -282,18 +300,18 @@ impl WalAcceptor {
}

// flush all written WAL to the disk
if let Some(reply) = self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)? {
if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated
}
}
self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)?
} else {
// process message other than AppendRequest
if let Some(reply) = self.tli.process_msg(&next_msg)? {
if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated
}
self.tli.process_msg(&next_msg)?
};

if let Some(reply) = reply_msg {
if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated
}
// reset keepalive time
next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
}
}
}
Expand Down