
Cloud spanner client does not seem to be retrying broken streams #3775

Closed

vkedia opened this issue Aug 9, 2017 · 39 comments

Labels: api: spanner, priority: p1, release blocking

@vkedia commented Aug 9, 2017

Documentation for snapshot transactions mentions:

If streaming a chunk fails due to a “resumable” error, Session.read() retries the StreamingRead API request, passing the resume_token from the last partial result streamed.

This is the correct thing to do, but the client does not actually seem to be doing it. I looked around a little bit in the code and could not find any place where it retries the read with the resume token.
This is the place where the stream would throw an error if it broke:
https://github.com/GoogleCloudPlatform/google-cloud-python/blob/9d9b6c0708ae6553458ad6107e5ad8efb23762e8/spanner/google/cloud/spanner/streamed.py#L132

I don't see any exception handling happening here, and it looks like the error would just be propagated back to the user.

@vkedia (Author) commented Aug 9, 2017

@lukesneeringer added the api: spanner and priority: p1 labels Aug 9, 2017
@lukesneeringer (Contributor)

Our retry logic is in GAX, but leaving this open for @tseaver to confirm that it is applicable here.

@vkedia (Author) commented Aug 9, 2017

This needs to be done at a higher level than GAX, since we send resume tokens along with the responses. So if a stream breaks, instead of retrying the whole stream, the library should just pass the resume token back in the request, which will cause the stream to resume from that point onwards. It is further complicated because Cloud Spanner might not send resume tokens with every response, so the client library needs to buffer responses until it sees one with a resume token and only then yield them to the user. This allows it to resume safely if the stream breaks.
This is the Java implementation of the same logic:
https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java#L2166

What does GAX do for streaming calls? Does it retry the whole stream if it fails? What if the stream had yielded all but the last response before failing? In that case, retrying the whole stream is very wasteful. If GAX does retry streaming calls, we need to disable that.

@theacodes (Contributor)

> What does GAX do for streaming calls? Does it retry the whole stream if it fails? What if the stream had yielded all but the last response before failing? In that case, retrying the whole stream is very wasteful. If GAX does retry streaming calls, we need to disable that.

gax is pretty naive in what it does for anything other than simple RPCs. I'm writing a doc internally on this now. :)

@bjwatson commented Aug 9, 2017

@lukesneeringer @vkedia Does this need to be Beta blocking?

@vkedia (Author) commented Aug 10, 2017

Until this is resolved, can we fix the documentation to indicate that users need to resume the stream themselves by passing back the resume token, if available?

@tseaver (Contributor) commented Aug 10, 2017

Snapshot.read and Snapshot.execute_sql return an instance of StreamedResultSet. That instance does not contain the information needed to reconstruct the original call inside its consume_next, which means that we cannot make handling such errors seamless to the user out of the box.

We could attempt to add enough information to the StreamedResultSet (a curried partial, maybe?) to allow such retries. E.g., from read():

    retry = functools.partial(self.read, table, columns, keyset, index, limit)
    return StreamedResultSet(iterator, retry=retry)

or from execute_sql:

    retry = functools.partial(self.execute_sql, sql, params, param_types, query_mode)
    return StreamedResultSet(iterator, retry=retry)

The only other way I can see to hide such errors from the user would be to add callback-based methods which took the arguments to be passed to read / execute_sql, executed them, and then iterated the result set, invoking the callback for each row successfully pulled from the stream.

E.g.:

    def handle_row(row):
        ...

    snapshot.read_cb(table, keyset, ..., callback=handle_row)

    snapshot.execute_sql_cb(sql, ..., callback=handle_row)

@vkedia (Author) commented Aug 10, 2017

I don't understand how the callback solution fixes this. Can you please elaborate? How would you retry in that case?
Your option 1 is similar to what we do in Java as well. Note that when you retry you will have to pass back the resume_token, so the retry function could possibly take a resume token as an argument.

Also note this is more complex because we do not guarantee that every PartialResultSet will have a resume token. So if you yield a row to the user but the PartialResultSet that contained that row did not have a resume_token, you cannot retry safely. What you need to do is store PartialResultSets in a buffer until you see one with a resume_token, and only then yield them to the user. Essentially, you guarantee that every time you yield something to the user, it is at a resumable boundary.
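
A minimal sketch of that buffering invariant (the helper name is illustrative, not the library's):

    def _buffer_until_resumable(response_iterator):
        """Yield PartialResultSets only once a resume_token-bearing one arrives."""
        buffered = []
        for partial_result_set in response_iterator:
            buffered.append(partial_result_set)
            if partial_result_set.resume_token:
                # Resumable boundary reached: everything buffered so far can be
                # yielded safely, because a restart would resume after this point.
                for item in buffered:
                    yield item
                buffered = []
        # The stream ended cleanly, so any trailing items are safe to yield too.
        for item in buffered:
            yield item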

@theacodes (Contributor)

The curried partial is the pattern we'll use in google.api.core. Instead of passing the curried partial to StreamedResultSet, can you construct an intermediary iterator wrapper for the request?

import functools

import grpc
import six


class _RetryingStreamIterator(object):
    """Wrap a streaming response iterator, restarting from the last resume_token."""

    def __init__(self, iterator, request):
        self._iter = iterator
        self._request = request
        self._resume_token = None

    def __iter__(self):
        return self

    def __next__(self):
        try:
            response = six.next(self._iter)
            if response.resume_token:  # not every response carries a token
                self._resume_token = response.resume_token
            return response
        except grpc.RpcError as exc:
            # UNAVAILABLE is the resumable case; restart from the last token.
            if (exc.code() == grpc.StatusCode.UNAVAILABLE
                    and self._resume_token is not None):
                self._iter = self._request(resume_token=self._resume_token)
                return six.next(self._iter)
            else:
                raise

    next = __next__  # Python 2 compatibility

...


# read/execute_sql:

request = functools.partial(
    api.streaming_read,
    self._session.name, table, columns, keyset.to_pb(),
    transaction=transaction, index=index, limit=limit,
    resume_token=resume_token, options=options)

iterator = _RetryingStreamIterator(request(), request)

if self._multi_use:
    return StreamedResultSet(iterator, source=self)
else:
    return StreamedResultSet(iterator)

@bjwatson added the release blocking label Aug 11, 2017
@bjwatson

I applied the release blocking label temporarily, to make sure we reach some kind of resolution on this before Beta. Either there's a quick and easy fix, or we update the docs so that we're not misleading users. In the latter case, we'll want to fix this shortly after declaring Beta.

@tseaver removed the release blocking label Aug 11, 2017
@tseaver (Contributor) commented Aug 11, 2017

@bjwatson I've removed the "release blocking" label because #3790 documents the current, application-level retry semantics.

@bjwatson

Great, thanks @tseaver!

@vkedia (Author) commented Aug 11, 2017

I left a comment on that PR. The documentation is not completely correct, and as I mention there, it is really tricky to tell the user what they need to do in case of a broken stream (which is why we handle that in the library). So I think it is better to fix this.

@vkedia (Author) commented Aug 14, 2017

I thought about this a bit more, and it is really hard for users to build their own retry logic around the current API; it would be error prone. It is better to just fix it in the client code.
@bjwatson Can you add back the release blocking label to this?

@dhermes (Contributor) commented Aug 14, 2017

/cc @jonparrott, who has been working on the retry design

@lukesneeringer (Contributor)

If it is release blocking, the minimum that this will push us back is two weeks, since @tseaver will be mostly out.

@lukesneeringer added the release blocking label Aug 14, 2017
@vkedia (Author) commented Aug 14, 2017

I see. I also realized now that this might be a breaking change as well. That is because once we implement retries in the client, there is no reason for us to expose the concept of resume_token to the users. It will just be confusing. So we should remove the resume_token method from StreamedIterator.
We should also remove resume_token as an argument to read/query.

@theacodes (Contributor)

With #3819 in, we have the core retry functionality needed for this. I do not plan on adding a general-purpose iterator retry decorator to api.core at the moment.

landrito pushed a commit to landrito/google-cloud-python that referenced this issue Aug 21, 2017
landrito pushed a commit to landrito/google-cloud-python that referenced this issue Aug 22, 2017
landrito pushed a commit to landrito/google-cloud-python that referenced this issue Aug 22, 2017
@tseaver (Contributor) commented Aug 28, 2017

Looking at using the retry features from #3819 to implement this. @vkedia, is there a specification for the set of "retryable" exceptions which would be raised during streaming iteration?

@tseaver (Contributor) commented Aug 28, 2017

Also, is there a spanner-specific deadline after which retries should abort?

@tseaver (Contributor) commented Aug 28, 2017

@jonparrott Are we doing anything about "sensible defaults" for the initial and maximum arguments to google.api.core.retry.exponential_sleep_generator?

@theacodes (Contributor)

@tseaver the defaults to the high-level Retry are sensible. The expectation for gapic clients is that they'll populate those values based on the gapic client config json file. I haven't yet sent a PR for that code, but here's a gist from my prototype.

@lukesneeringer can help with the gapic config part.

@theacodes (Contributor)

One possibly valid approach in the near term is to use Retry's defaults until I'm done replacing gax, at which point using the gapic config should be trivial. WDYT, @lukesneeringer?

@tseaver (Contributor) commented Aug 28, 2017

@jonparrott I missed seeing the Retry class, which looks to have been added after #3819 (update, added in #3835).

Also, I'm unsure how GAPIC config fits in with retries for the streaming-result-set iteration, where code in this library is responsible for tracking / passing back the resume_token to "restart" the result set after a transient failure.

@theacodes (Contributor)

> Also, I'm unsure how GAPIC config fits in with retries for the streaming-result-set iteration, where code in this library is responsible for tracking / passing back the resume_token to "restart" the result set after a transient failure.

Yeah, in general I wouldn't sweat that for your purposes here - sorry to mislead you on that (it's Monday...).

Basically, two levels of retries will be involved here: the "iterator" retry and the "method" retry. E.g., calling next(iter) will catch errors and retry calling the method to re-establish the stream. The method itself is wrapped with its own retry.

It's turtles all the way down.

I don't think you need to worry about the gapic config for the iterator retry - you can just use the default retry or customize it as you feel is appropriate.

@tseaver (Contributor) commented Aug 28, 2017

So I need to know what the "transient" errors are which may be propagated from the "iterator" retry, and which the "method" (StreamedResultSet.consume_next) will need to trap in order to restart the API call, passing the previously-captured resume_token. I'm also unclear on whether there is supposed to be any backoff / jitter associated with the "restart" (vs. invisible-to-me retries of the "iterator" API calls).

@tseaver (Contributor) commented Aug 28, 2017

Per @blowmage: the Ruby implementation appears to only do the "restart" for UNAVAILABLE.

@vkedia (Author) commented Aug 28, 2017 via email

@tseaver (Contributor) commented Aug 28, 2017

@jonparrott the "stock" bits in google.api.core.retry don't really fit this case: I need to inject a "fixup" when the appropriate exception (UNAVAILABLE) is raised, before retrying the stream iteration. Something like:

    def consume_next(self):
        """Consume the next partial result set from the stream.

        Parse the result set into new/existing rows in :attr:`_rows`.
        """
        sleep_generator = retry.exponential_sleep_generator(
            initial=retry._DEFAULT_INITIAL_DELAY,
            maximum=retry._DEFAULT_MAXIMUM_DELAY)

        for sleep in sleep_generator:
            try:
                response = six.next(self._response_iterator)
            except exceptions.ServiceUnavailable:
                if self._resume_token in (None, b''):
                    raise
                # Back off, then restart the stream from the last resume_token.
                time.sleep(sleep)
                self._response_iterator = self._retry(
                    resume_token=self._resume_token)
            else:
                break

@theacodes (Contributor)

@tseaver that seems pretty reasonable. (it's one of the reasons I made exponential_sleep_generator public)

You should set a deadline of some sort, though.
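
For illustration, a deadline-bounded variant of the restart loop sketched above might look like the following (the helper name, its signature, and the 30-second figure are assumptions, not part of the library or of Spanner's defaults; module paths are as referenced in this thread):

    import time

    import six

    from google.api.core import exceptions, retry

    # Illustrative sketch only: `restart` is assumed to be a callable accepting a
    # resume_token keyword that returns a fresh response iterator.
    def _next_with_restart(response_iterator, restart, resume_token, deadline=30.0):
        """Return (response, iterator), restarting on UNAVAILABLE until ``deadline``."""
        give_up_at = time.time() + deadline
        sleep_generator = retry.exponential_sleep_generator(
            initial=retry._DEFAULT_INITIAL_DELAY,
            maximum=retry._DEFAULT_MAXIMUM_DELAY)
        for sleep in sleep_generator:
            try:
                return six.next(response_iterator), response_iterator
            except exceptions.ServiceUnavailable:
                if not resume_token or time.time() >= give_up_at:
                    raise
                time.sleep(sleep)
                response_iterator = restart(resume_token=resume_token)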

@vkedia (Author) commented Aug 28, 2017 via email

@tseaver (Contributor) commented Aug 28, 2017

And of course I have to add my own deadline handling.

BTW, why does google.api.core.retry.retry_target use the more convoluted datetime stuff to compute the deadline, rather than just time.time plus a constant number of seconds?

@theacodes (Contributor) commented Aug 28, 2017

> BTW, why does google.api.core.retry.retry_target use the more convoluted datetime stuff to compute the deadline, rather than just time.time plus a constant number of seconds?

No particular reason, other than being functionally equivalent and having a single source of truth for time in tests (datetime_helpers.utcnow()).

> And of course I have to add my own deadline handling.

How so?

@tseaver (Contributor) commented Aug 28, 2017

@jonparrott:

> And of course I have to add my own deadline handling.

> How so?

As you pointed out above, I'm doing the low level work myself.

ISTM that the retry_target (and therefore Retry) could grow some feature to signal a "fixup" to happen between the retries, to accommodate this kind of stateful behavior.

@theacodes (Contributor)

I'm still not sure what you mean? Just pass the user's deadline into retry_target?

@tseaver (Contributor) commented Aug 28, 2017

I can't use retry_target: it doesn't let me issue the "restart" request and capture the new iterator. And so I need to do my own deadline handling.

@theacodes (Contributor)

Thanks for the PR - I think I see now, so it seems like if you did this:

retry_target(functools.partial(six.next, self._response_iterator))

It wouldn't let you catch the error and replace self._response_iterator.

I could imagine a hack using the predicate and some getattr magic, but that sounds gross to me.

Is there a minimal change you could make to retry_target to make it work for you - perhaps an optional on_exception callback? If so, I'm happy to review and merge that.
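
For concreteness, a rough sketch of what such a hook might look like (this function and its on_exception parameter are hypothetical and do not exist in google.api.core; the shape only loosely mirrors retry_target):

    import time

    def retry_target_with_hook(target, predicate, sleep_generator, deadline,
                               on_exception=None):
        """Call ``target`` until it succeeds, a non-retryable error is raised
        (per ``predicate``), or ``deadline`` seconds elapse. ``on_exception``
        lets the caller fix up state (e.g. re-open a stream from a
        resume_token) between attempts."""
        give_up_at = time.time() + deadline
        last_exc = None
        for sleep in sleep_generator:
            try:
                return target()
            except Exception as exc:
                if not predicate(exc):
                    raise
                last_exc = exc
                if on_exception is not None:
                    on_exception(exc)
            if time.time() + sleep > give_up_at:
                raise last_exc
            time.sleep(sleep)

Used with something like target=lambda: six.next(self._response_iterator), the on_exception hook could replace self._response_iterator before the next attempt, which the functools.partial form above cannot do because the partial binds the old iterator.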

@theacodes (Contributor)

Also - I've thought about a retry_iterator function - maybe it makes more sense to just go ahead and add that to core?

tseaver added a commit that referenced this issue Sep 21, 2017
* Add '_restart_on_unavailable' iterator wrapper.

  Tracks the 'resume_token', and issues restart after a 503.

* Strip knowledge of 'resume_token' from 'StreamedResultSet'.

* Remove 'resume_token' args from 'Snapshot' and 'Session' API surface:

  Retry handling will be done behind the scenes.

* Use '_restart_on_unavailable' wrapper in 'SRS.{read,execute_sql}'.

Closes #3775.
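
A simplified, illustrative sketch of an iterator wrapper along those lines (not the merged implementation; `restart` is assumed to be a callable, e.g. a functools.partial over the streaming call, that accepts a resume_token keyword; the exceptions module path is as referenced in this thread):

    from google.api.core import exceptions

    def _restart_on_unavailable(restart):
        """Yield streamed items, restarting from the last resume_token on 503."""
        resume_token = b''
        item_buffer = []  # items not yet covered by a resume_token
        iterator = restart()
        while True:
            try:
                for item in iterator:
                    item_buffer.append(item)
                    if item.resume_token:
                        # Resumable boundary: flush everything buffered so far.
                        resume_token = item.resume_token
                        for buffered in item_buffer:
                            yield buffered
                        del item_buffer[:]
            except exceptions.ServiceUnavailable:
                # Drop unflushed items; the restarted stream re-sends them.
                del item_buffer[:]
                iterator = restart(resume_token=resume_token)
                continue
            # Stream finished cleanly; flush any remaining items.
            for buffered in item_buffer:
                yield buffered
            return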
@bjwatson

Woo hoo! Thanks @tseaver!
