Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport_service: Improve logs and move code from tokio::select macro #254

Merged
merged 7 commits into from
Oct 1, 2024
5 changes: 4 additions & 1 deletion src/protocol/transport_service.rs
Original file line number Diff line number Diff line change
@@ -389,7 +389,10 @@ impl Stream for TransportService {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
while let Poll::Ready(event) = self.rx.poll_recv(cx) {
match event {
None => return Poll::Ready(None),
None => {
tracing::warn!(target: LOG_TARGET, "transport service closed");
return Poll::Ready(None);
}
Some(InnerTransportEvent::ConnectionEstablished {
peer,
endpoint,
391 changes: 232 additions & 159 deletions src/transport/tcp/connection.rs
Original file line number Diff line number Diff line change
@@ -493,6 +493,231 @@ impl TcpConnection {
})
}

/// Handles the yamux substream.
///
/// Returns `true` if the connection handler should exit.
async fn handle_yamux_substream(
&mut self,
substream: Option<Result<crate::yamux::Stream, crate::yamux::ConnectionError>>,
) -> crate::Result<bool> {
match substream {
Some(Ok(stream)) => {
let substream_id = {
let substream_id = self.next_substream_id.fetch_add(1usize, Ordering::Relaxed);
SubstreamId::from(substream_id)
};
let protocols = self.protocol_set.protocols();
let permit = self.protocol_set.try_get_permit().ok_or(Error::ConnectionClosed)?;
let open_timeout = self.substream_open_timeout;

self.pending_substreams.push(Box::pin(async move {
match tokio::time::timeout(
open_timeout,
Self::accept_substream(
stream,
permit,
substream_id,
protocols,
open_timeout,
),
)
.await
{
Ok(Ok(substream)) => Ok(substream),
Ok(Err(error)) => Err(ConnectionError::FailedToNegotiate {
protocol: None,
substream_id: None,
error: SubstreamError::NegotiationError(error),
}),
Err(_) => Err(ConnectionError::Timeout {
protocol: None,
substream_id: None,
}),
}
}));

Ok(false)
}
Some(Err(error)) => {
tracing::debug!(
target: LOG_TARGET,
peer = ?self.peer,
?error,
"connection closed with error",
);

self.protocol_set
.report_connection_closed(self.peer, self.endpoint.connection_id())
.await?;
Ok(true)
}
None => {
tracing::debug!(target: LOG_TARGET, peer = ?self.peer, "connection closed");
self.protocol_set
.report_connection_closed(self.peer, self.endpoint.connection_id())
.await?;
Ok(true)
}
}
}

/// Handles negotiated substream results.
async fn handle_negotiated_substream(
&mut self,
result: Result<NegotiatedSubstream, ConnectionError>,
) {
match result {
// TODO: return error to protocol
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
?error,
"failed to accept/open substream",
);

let (protocol, substream_id, error) = match error {
ConnectionError::Timeout {
protocol,
substream_id,
} => (
protocol,
substream_id,
SubstreamError::NegotiationError(NegotiationError::Timeout),
),
ConnectionError::FailedToNegotiate {
protocol,
substream_id,
error,
} => (protocol, substream_id, error),
};

match (protocol, substream_id) {
(Some(protocol), Some(substream_id)) => {
if let Err(error) = self
.protocol_set
.report_substream_open_failure(protocol.clone(), substream_id, error)
.await
{
tracing::error!(
target: LOG_TARGET,
?protocol,
endpoint = ?self.endpoint,
?error,
"failed to register substream open failure to protocol"
);
}
}
_ => {}
}
}
Ok(substream) => {
let protocol = substream.protocol.clone();
let direction = substream.direction;
let substream_id = substream.substream_id;
let socket = FuturesAsyncReadCompatExt::compat(substream.io);
let bandwidth_sink = self.bandwidth_sink.clone();

let substream = substream::Substream::new_tcp(
self.peer,
substream_id,
Substream::new(socket, bandwidth_sink, substream.permit),
self.protocol_set.protocol_codec(&protocol),
);

if let Err(error) = self
.protocol_set
.report_substream_open(self.peer, protocol.clone(), direction, substream)
.await
{
tracing::error!(
target: LOG_TARGET,
?protocol,
peer = ?self.peer,
endpoint = ?self.endpoint,
?error,
"failed to register opened substream to protocol",
);
}
}
}
}

/// Handles protocol command.
///
/// Returns `true` if the connection handler should exit.
async fn handle_protocol_command(
&mut self,
command: Option<ProtocolCommand>,
) -> crate::Result<bool> {
match command {
Some(ProtocolCommand::OpenSubstream {
protocol,
fallback_names,
substream_id,
permit,
}) => {
let control = self.control.clone();
let open_timeout = self.substream_open_timeout;

tracing::trace!(
target: LOG_TARGET,
?protocol,
?substream_id,
"open substream",
);

self.pending_substreams.push(Box::pin(async move {
match tokio::time::timeout(
open_timeout,
Self::open_substream(
control,
substream_id,
permit,
protocol.clone(),
fallback_names,
open_timeout,
),
)
.await
{
Ok(Ok(substream)) => Ok(substream),
Ok(Err(error)) => Err(ConnectionError::FailedToNegotiate {
protocol: Some(protocol),
substream_id: Some(substream_id),
error,
}),
Err(_) => Err(ConnectionError::Timeout {
protocol: Some(protocol),
substream_id: Some(substream_id),
}),
}
}));

Ok(false)
}
Some(ProtocolCommand::ForceClose) => {
tracing::debug!(
target: LOG_TARGET,
peer = ?self.peer,
connection_id = ?self.endpoint.connection_id(),
"force closing connection",
);

self.protocol_set
.report_connection_closed(self.peer, self.endpoint.connection_id())
.await?;
Ok(true)
}
None => {
tracing::debug!(target: LOG_TARGET, "protocols have disconnected, closing connection");
self.protocol_set
.report_connection_closed(self.peer, self.endpoint.connection_id())
.await?;
Ok(true)
}
}
}

/// Start connection event loop.
pub(crate) async fn start(mut self) -> crate::Result<()> {
self.protocol_set
@@ -501,169 +726,17 @@ impl TcpConnection {

loop {
tokio::select! {
substream = self.connection.next() => match substream {
Some(Ok(stream)) => {
let substream_id = {
let substream_id = self.next_substream_id.fetch_add(1usize, Ordering::Relaxed);
SubstreamId::from(substream_id)
};
let protocols = self.protocol_set.protocols();
let permit = self.protocol_set.try_get_permit().ok_or(Error::ConnectionClosed)?;
let open_timeout = self.substream_open_timeout;

self.pending_substreams.push(Box::pin(async move {
match tokio::time::timeout(
open_timeout,
Self::accept_substream(stream, permit, substream_id, protocols, open_timeout),
)
.await
{
Ok(Ok(substream)) => Ok(substream),
Ok(Err(error)) => Err(ConnectionError::FailedToNegotiate {
protocol: None,
substream_id: None,
error: SubstreamError::NegotiationError(error),
}),
Err(_) => Err(ConnectionError::Timeout {
protocol: None,
substream_id: None
}),
}
}));
},
Some(Err(error)) => {
tracing::debug!(
target: LOG_TARGET,
peer = ?self.peer,
?error,
"connection closed with error",
);
self.protocol_set.report_connection_closed(self.peer, self.endpoint.connection_id()).await?;

return Ok(())
}
None => {
tracing::debug!(target: LOG_TARGET, peer = ?self.peer, "connection closed");
self.protocol_set.report_connection_closed(self.peer, self.endpoint.connection_id()).await?;

return Ok(())
substream = self.connection.next() => {
if self.handle_yamux_substream(substream).await? {
return Ok(());
}
},
// TODO: move this to a function
substream = self.pending_substreams.select_next_some(), if !self.pending_substreams.is_empty() => {
match substream {
// TODO: return error to protocol
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
?error,
"failed to accept/open substream",
);

let (protocol, substream_id, error) = match error {
ConnectionError::Timeout { protocol, substream_id } => {
(protocol, substream_id, SubstreamError::NegotiationError(NegotiationError::Timeout))
}
ConnectionError::FailedToNegotiate { protocol, substream_id, error } => {
(protocol, substream_id, error)
}
};

match (protocol, substream_id) {
(Some(protocol), Some(substream_id)) => {
if let Err(error) = self.protocol_set
.report_substream_open_failure(protocol, substream_id, error)
.await
{
tracing::error!(
target: LOG_TARGET,
?error,
"failed to register opened substream to protocol"
);
}
}
_ => {}
}
}
Ok(substream) => {
let protocol = substream.protocol.clone();
let direction = substream.direction;
let substream_id = substream.substream_id;
let socket = FuturesAsyncReadCompatExt::compat(substream.io);
let bandwidth_sink = self.bandwidth_sink.clone();

let substream = substream::Substream::new_tcp(
self.peer,
substream_id,
Substream::new(socket, bandwidth_sink, substream.permit),
self.protocol_set.protocol_codec(&protocol)
);

if let Err(error) = self.protocol_set
.report_substream_open(self.peer, protocol, direction, substream)
.await
{
tracing::error!(
target: LOG_TARGET,
?error,
"failed to register opened substream to protocol",
);
}
}
}
self.handle_negotiated_substream(substream).await;
}
protocol = self.protocol_set.next() => match protocol {
Some(ProtocolCommand::OpenSubstream { protocol, fallback_names, substream_id, permit }) => {
let control = self.control.clone();
let open_timeout = self.substream_open_timeout;

tracing::trace!(
target: LOG_TARGET,
?protocol,
?substream_id,
"open substream",
);

self.pending_substreams.push(Box::pin(async move {
match tokio::time::timeout(
open_timeout,
Self::open_substream(
control,
substream_id,
permit,
protocol.clone(),
fallback_names,
open_timeout,
),
)
.await
{
Ok(Ok(substream)) => Ok(substream),
Ok(Err(error)) => Err(ConnectionError::FailedToNegotiate {
protocol: Some(protocol),
substream_id: Some(substream_id),
error,
}),
Err(_) => Err(ConnectionError::Timeout {
protocol: Some(protocol),
substream_id: Some(substream_id)
}),
}
}));
}
Some(ProtocolCommand::ForceClose) => {
tracing::debug!(
target: LOG_TARGET,
peer = ?self.peer,
connection_id = ?self.endpoint.connection_id(),
"force closing connection",
);

return self.protocol_set.report_connection_closed(self.peer, self.endpoint.connection_id()).await
}
None => {
tracing::debug!(target: LOG_TARGET, "protocols have disconnected, closing connection");
return self.protocol_set.report_connection_closed(self.peer, self.endpoint.connection_id()).await
protocol = self.protocol_set.next() => {
if self.handle_protocol_command(protocol).await? {
return Ok(())
}
}
}