Skip to content

Commit

Permalink
test moving terminate_tasks_rx into the read task
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Jul 21, 2022
1 parent c406596 commit 9b25774
Showing 1 changed file with 50 additions and 41 deletions.
91 changes: 50 additions & 41 deletions shotover-proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,22 +378,47 @@ fn spawn_read_write_tasks<
let mut reader = FramedRead::new(rx, codec.clone());
let mut writer = FramedWrite::new(tx, codec);

// Shutdown flows
//
// main task shuts down due to transform error:
// 1. The main task shutsdown sending terminate_tasks_tx and dropping the first out_tx
// 2. The reader task detects change on terminate_tasks_rx and terminates, the last out_tx instance is dropped
// 3. The writer task detects that the last out_tx is dropped and terminates
//
// client closes connection:
// 1. The reader task detects that the client has closed the connection via reader.next() returning None, dropping in_tx and the first out_tx
// 2. The main task detects that in_tx is dropped by in_rx returning None and terminates, dropping the last out_tx
// 3. The writer task detects that the last out_tx is dropped and terminates

// reader task
tokio::spawn(
async move {
while let Some(message) = reader.next().await {
match message {
Ok(message) => {
let remaining_messages =
process_return_to_sender_messages(message, &out_tx);
if !remaining_messages.is_empty() {
if let Err(error) = in_tx.send(remaining_messages) {
warn!("failed to pass on received message: {}", error);
return;
loop {
tokio::select! {
result = reader.next() => {
if let Some(message) = result {
match message {
Ok(message) => {
let remaining_messages =
process_return_to_sender_messages(message, &out_tx);
if !remaining_messages.is_empty() {
if let Err(error) = in_tx.send(remaining_messages) {
warn!("failed to pass on received message: {}", error);
return;
}
}
}
Err(error) => {
warn!("failed to receive or decode message: {:?}", error);
return;
}
}
} else {
debug!("client has closed the connection");
return;
}
}
Err(error) => {
warn!("failed to receive or decode message: {:?}", error);
_ = terminate_tasks_rx.changed() => {
return;
}
}
Expand All @@ -402,44 +427,28 @@ fn spawn_read_write_tasks<
.in_current_span(),
);

// sender task
tokio::spawn(
async move {
loop {
tokio::select! {
result = out_rx.recv() => {
if let Some(message) = result {
if let Err(err) = writer.send(message).await {
error!("failed to send or encode message: {:?}", err);
}
} else {
debug!("main task shutdown"); // TODO: hang on... why do we need terminate_tasks_rx then
break;
}
if let Some(message) = out_rx.recv().await {
if let Err(err) = writer.send(message).await {
error!("failed to send or encode message: {:?}", err);
}
_ = terminate_tasks_rx.changed() => {
debug!("main task is signalling this task to end, first flush out any remaining messages");
while let Ok(message) = out_rx.try_recv() {
if let Err(err) = writer.send(message).await {
error!("while flushing messages: failed to send or encode message: {:?}", err);
}
} else {
// Main task has ended.
// First flush out any remaining messages.
// Then end the task thus closing the connection by dropping the write half
while let Ok(message) = out_rx.try_recv() {
if let Err(err) = writer.send(message).await {
error!(
"while flushing messages: failed to send or encode message: {err:?}",
);
}
break;
}
break;
}
}
// The cassandra protocol needs to:
// 1. receive bad version init
// 2. reply with error
// 3. receive another message
// 4. kill the connection:
// 1. codec returns Err
// 2. rx task receives Err, logging it and returning
// 3. rx task ends dropping the in_tx
// 4. main task receives None from in_rx causing it to return
// 5. main task ends resulting in drop running terminate_tasks_tx.send(())
//
// I suspect that:
// Sender is backlogged and gets killed before it can process everything so 2 never occurs
}
.in_current_span(),
);
Expand Down

0 comments on commit 9b25774

Please sign in to comment.