Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

feat: terminate stream if client is dropping the connection #463

Merged
merged 10 commits into from
Mar 5, 2024
29 changes: 26 additions & 3 deletions crates/topos-tce-api/src/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -150,14 +150,37 @@ impl Stream {
}
}

Some(_stream_packet) = self.inbound_stream.next() => {

// We currently open the stream, but no other message from the client is getting processed.
// We are using this open connection to communicate `delivered_certificates` to the client.
Some(stream_packet) = self.inbound_stream.next() => {
match stream_packet {
Ok((_request_id, _message)) => {
trace!("Received message from stream_id: {:?}", self.stream_id);
}
Err(error) => {
match error.kind {
StreamErrorKind::StreamClosed => {
warn!("Stream {} closed", self.stream_id);
return Err(StreamError::new(self.stream_id, StreamErrorKind::StreamClosed));
}
_ => {
// We are not handling specific errors for now.
// If the sequencer is closing the connection, we are receiving a
// StreamErrorKind::TransportError.
error!( "Stream error: {:?}", error);
return Err(StreamError::new(self.stream_id, error.kind));

}

}
}
}
}

// For graceful shutdown in case streams are closed
else => break,
}
}

Ok(self.stream_id)
}
}
49 changes: 47 additions & 2 deletions crates/topos-tce-api/src/stream/tests.rs
Original file line number Diff line number Diff line change
@@ -200,9 +200,54 @@ async fn resuming_one_subscription() {}
#[ignore = "not yet implemented"]
async fn resuming_all_subscription() {}

#[rstest]
#[test(tokio::test)]
#[ignore = "not yet implemented"]
async fn closing_client_stream() {}
async fn closing_client_stream() -> Result<(), Box<dyn std::error::Error>> {
let (mut tx, stream, mut context) = StreamBuilder::default().build();

let join = spawn(stream.run());

let msg: WatchCertificatesRequest = GrpcOpenStream {
target_checkpoint: Some(TargetCheckpoint {
target_subnet_ids: vec![TARGET_SUBNET_ID_1.into()],
positions: vec![],
}),
source_checkpoint: None,
}
.into();

_ = tx.send_data(encode(&msg)?).await;

let expected_stream_id = context.stream_id;

wait_for_command!(
context.runtime_receiver,
matches: InternalRuntimeCommand::Register { stream_id, sender, .. } if stream_id == expected_stream_id => {
sender.send(Ok(()))
}
);

let msg = context.stream_receiver.recv().await;

assert!(
matches!(
msg,
Some(Ok((_, OutboundMessage::StreamOpened(StreamOpened { ref subnet_ids })))) if subnet_ids == &[TARGET_SUBNET_ID_1],
),
"Expected StreamOpened, received: {msg:?}"
);

tx.abort();

let result = join.await?;

assert!(
matches!(result, Err(StreamError { stream_id, kind: StreamErrorKind::Transport(_)}) if stream_id == context.stream_id),
"Doesn't match {result:?}",
);

Ok(())
}

#[test(tokio::test)]
#[ignore = "not yet implemented"]