Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(transport): return Poll::ready until error is consumed #536

Merged
merged 5 commits into from
Jan 15, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions tonic/src/transport/service/reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ use tracing::trace;
pub(crate) struct Reconnect<M, Target>
where
M: Service<Target>,
M::Error: Into<Error>,
{
mk_service: M,
state: State<M::Future, M::Response>,
target: Target,
error: Option<M::Error>,
error: Option<crate::Error>,
has_been_connected: bool,
is_lazy: bool,
}
Expand All @@ -32,6 +33,7 @@ enum State<F, S> {
impl<M, Target> Reconnect<M, Target>
where
M: Service<Target>,
M::Error: Into<Error>,
{
pub(crate) fn new(mk_service: M, target: Target, is_lazy: bool) -> Self {
Reconnect {
Expand All @@ -52,14 +54,19 @@ where
M::Future: Unpin,
Error: From<M::Error> + From<S::Error>,
Target: Clone,
<M as tower_service::Service<Target>>::Error: Into<crate::Error>,
{
type Response = S::Response;
type Error = Error;
type Future = ResponseFuture<S::Future, M::Error>;
type Future = ResponseFuture<S::Future>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut state;

if self.error.is_some() {
return Poll::Ready(Ok(()));
}

loop {
match self.state {
State::Idle => {
Expand Down Expand Up @@ -94,7 +101,9 @@ where
if !(self.has_been_connected || self.is_lazy) {
return Poll::Ready(Err(e.into()));
} else {
self.error = Some(e);
let error = e.into();
tracing::error!("reconnect::poll_ready: {:?}", error);
self.error = Some(error);
break;
}
}
Expand Down Expand Up @@ -130,8 +139,10 @@ where
}

fn call(&mut self, request: Request) -> Self::Future {
tracing::trace!("Reconnect::call");
if let Some(error) = self.error.take() {
return ResponseFuture::error(error);
tracing::error!("error: {:?}", error);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why log messages at error when we're returning the error? As this is a library, it seems preferable to keep these at something like debug so that the application has more control over whether these are handled/logged, etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if I understand correctly, this would log the errors twice -- once above in poll_ready and then again at call-time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, no one has complained about it yet, but I think thats not a bad idea. Let me do a pass of those in a follow up PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no one has complained about it yet

If I'm correct, this logging was introduced by this PR and not there before...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had two other places where we logged at error!, so I moved them all to debug!, thats what I meant.

return ResponseFuture::error(error.into());
}

let service = match self.state {
Expand All @@ -150,6 +161,7 @@ where
M::Future: fmt::Debug,
M::Response: fmt::Debug,
Target: fmt::Debug,
<M as tower_service::Service<Target>>::Error: Into<Error>,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Reconnect")
Expand All @@ -163,37 +175,36 @@ where
/// Future that resolves to the response or failure to connect.
#[pin_project]
#[derive(Debug)]
pub(crate) struct ResponseFuture<F, E> {
pub(crate) struct ResponseFuture<F> {
#[pin]
inner: Inner<F, E>,
inner: Inner<F>,
}

#[pin_project(project = InnerProj)]
#[derive(Debug)]
enum Inner<F, E> {
enum Inner<F> {
Future(#[pin] F),
Error(Option<E>),
Error(Option<crate::Error>),
}

impl<F, E> ResponseFuture<F, E> {
impl<F> ResponseFuture<F> {
pub(crate) fn new(inner: F) -> Self {
ResponseFuture {
inner: Inner::Future(inner),
}
}

pub(crate) fn error(error: E) -> Self {
pub(crate) fn error(error: crate::Error) -> Self {
ResponseFuture {
inner: Inner::Error(Some(error)),
}
}
}

impl<F, T, E, ME> Future for ResponseFuture<F, ME>
impl<F, T, E> Future for ResponseFuture<F>
where
F: Future<Output = Result<T, E>>,
E: Into<Error>,
ME: Into<Error>,
{
type Output = Result<T, Error>;

Expand All @@ -203,7 +214,7 @@ where
match me.inner.project() {
InnerProj::Future(fut) => fut.poll(cx).map_err(Into::into),
InnerProj::Error(e) => {
let e = e.take().expect("Polled after ready.").into();
let e = e.take().expect("Polled after ready.");
Poll::Ready(Err(e))
}
}
Expand Down