TypeError: exceptions must derive from BaseException #163

Closed
mjhuber opened this issue Jul 21, 2020 · 1 comment · Fixed by #170
Labels

  • api: pubsub (Issues related to the googleapis/python-pubsub API.)
  • priority: p2 (Moderately-important priority. Fix may not be included in next release.)
  • type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns.)

Comments

mjhuber commented Jul 21, 2020

When using the subscriber client API, a TypeError is often raised from pubsub_v1/futures.py. Consider the following:

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(CONFIG.GCP_PROJECT, CONFIG.SUBSCRIPTION_NAME)

flow_control = pubsub_v1.types.FlowControl(max_bytes=100 * 1024 * 1024, max_messages=1, max_lease_duration=60 * 10)

future = subscriber.subscribe(
    subscription_path,
    callback=self._tornado_sync_wrapper,
    flow_control=flow_control,
    scheduler=self._make_single_threaded_scheduler(),
)

try:
    future.result()
except Exception as ex:
    logger.error("Exception while trying to run queue listener", exc_info=True)
    future.cancel()

This code in pubsub_v1/futures.py can raise "TypeError: exceptions must derive from BaseException":

    def result(self, timeout=None):
        """Resolve the future and return a value where appropriate.

        Args:
            timeout (Union[int, float]): The number of seconds before this call
                times out and raises TimeoutError.

        Raises:
            concurrent.futures.TimeoutError: If the request times out.
            Exception: For undefined exceptions in the underlying
                call execution.
        """
        # Attempt to get the exception if there is one.
        # If there is not one, then we know everything worked, and we can
        # return an appropriate value.
        err = self.exception(timeout=timeout)
        if err is None:
            return self._result
        raise err

    def exception(self, timeout=None):
        """Return the exception raised by the call, if any.

        Args:
            timeout (Union[int, float]): The number of seconds before this call
                times out and raises TimeoutError.

        Raises:
            concurrent.futures.TimeoutError: If the request times out.

        Returns:
            Exception: The exception raised by the call, if any.
        """
        # Wait until the future is done.
        if not self._completed.wait(timeout=timeout):
            raise exceptions.TimeoutError("Timed out waiting for result.")

        # If the batch completed successfully, this should return None.
        if self._result != self._SENTINEL:
            return None

        # Okay, this batch had an error; this should return it.
        return self._exception

The value returned from self.exception() can be an <opencensus.ext.grpc.utils.WrappedResponseIterator object>, which does not derive from BaseException, so the `raise err` statement in result() fails:

TypeError: exceptions must derive from BaseException
  File "triggers/trigger_pushd.py", line 186, in run_listener
    future.result()
  File "/usr/local/lib/python3.7/dist-packages/google/cloud/pubsub_v1/futures.py", line 107, in result
    raise err

Environment details

  • OS type and version: Ubuntu 18.04
  • Python version: Python 3.7
  • pip version: 9.0.1
  • google-cloud-pubsub version: 1.7.0
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/python-pubsub API. label Jul 21, 2020
@plamut plamut added the type: question Request for information or clarification. Not an issue. label Jul 21, 2020
@plamut plamut self-assigned this Jul 21, 2020
Contributor

plamut commented Jul 22, 2020

@mjhuber Since the code sample is not self-contained, I could not run it, and modifying it by providing e.g. a generic callback unfortunately did not reproduce the problem. I had to resort to tracing through the code and guessing at the reason.

The result() method of the StreamingPullFuture instance (returned by subscriber.subscribe()) either returns the result or raises the exception that was set on the future by invoking future.set_exception(exception). If the exception argument is not actually an exception instance, raising it fails with a TypeError.
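To illustrate the failure mode without Pub/Sub or opencensus installed, here is a minimal sketch. `SketchFuture` and `NotAnException` are hypothetical stand-ins, not library classes; the sketch only mirrors the `raise err` pattern from the futures.py excerpt quoted above.

```python
class NotAnException:
    """Hypothetical stand-in for an object such as WrappedResponseIterator."""


class SketchFuture:
    """Simplified future: stores whatever is passed to set_exception()."""

    def __init__(self):
        self._exception = None

    def set_exception(self, exception):
        # No type check here, so any object can be stored.
        self._exception = exception

    def result(self):
        if self._exception is None:
            return True
        # Raising a non-exception object fails with a TypeError.
        raise self._exception


message = None
future = SketchFuture()
future.set_exception(NotAnException())
try:
    future.result()
except TypeError as exc:
    message = str(exc)

print(message)
```

On Python 3 this prints the same message as in the reported traceback, which is consistent with a non-exception object having been stored on the future.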

Now, an exception can be set on the future in a callback that the future registers on the streaming pull manager, and that is invoked when the manager is shut down:

    def __init__(self, manager):
        super(StreamingPullFuture, self).__init__()
        self._manager = manager
        self._manager.add_close_callback(self._on_close_callback)
        self._cancelled = False

    def _on_close_callback(self, manager, result):
        if self.done():
            # The future has already been resolved in a different thread,
            # nothing to do on the streaming pull manager shutdown.
            return

        if result is None:
            self.set_result(True)
        else:
            self.set_exception(result)
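The callback wiring above can be sketched in isolation. This is only an approximation under stated assumptions: `FakeManager` and `SketchPullFuture` are hypothetical names, and `concurrent.futures.Future` is swapped in for the library's own future class.

```python
import concurrent.futures


class FakeManager:
    """Hypothetical stand-in for the streaming pull manager."""

    def __init__(self):
        self._close_callbacks = []

    def add_close_callback(self, callback):
        self._close_callbacks.append(callback)

    def close(self, reason=None):
        # On shutdown, hand the close reason to every registered callback.
        for callback in self._close_callbacks:
            callback(self, reason)


class SketchPullFuture(concurrent.futures.Future):
    """Resolves itself when the manager it is attached to shuts down."""

    def __init__(self, manager):
        super().__init__()
        manager.add_close_callback(self._on_close_callback)

    def _on_close_callback(self, manager, result):
        if self.done():
            return
        if result is None:
            self.set_result(True)
        else:
            # The reason is stored as-is; nothing checks its type.
            self.set_exception(result)


clean_manager = FakeManager()
clean_future = SketchPullFuture(clean_manager)
clean_manager.close()  # no reason: the future resolves successfully

failed_manager = FakeManager()
failed_future = SketchPullFuture(failed_manager)
failed_manager.close(reason=ValueError("stream broke"))
```

Whatever object is passed as `reason` ends up on the future unchanged, so the close reason needs to be a real exception by the time `close()` is called.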

This shutdown can be triggered, for instance, when the underlying RPC terminates without recovery:

    def _on_rpc_done(self, future):
        """Triggered whenever the underlying RPC terminates without recovery.

        This is typically triggered from one of two threads: the background
        consumer thread (when calling ``recv()`` produces a non-recoverable
        error) or the grpc management thread (when cancelling the RPC).

        This method is *non-blocking*. It will start another thread to deal
        with shutting everything down. This is to prevent blocking in the
        background consumer and preventing it from being ``joined()``.
        """
        _LOGGER.info("RPC termination has signaled streaming pull manager shutdown.")
        future = _maybe_wrap_exception(future)
        thread = threading.Thread(
            name=_RPC_ERROR_THREAD_NAME, target=self.close, kwargs={"reason": future}
        )
        thread.daemon = True
        thread.start()

The reason for the shutdown is passed as an argument to the future's on-close callback shown above and should be an exception instance. The following line should convert the RPC's future to an exception, but it seems that this does not happen:

future = _maybe_wrap_exception(future)

The helper function can convert grpc.RpcError instances, but it seems that opencensus wraps RPC futures in its own class opencensus.ext.grpc.utils.WrappedResponseIterator, which is why the future is not converted to an exception.

As I said, I could not reproduce the error, but could you do a quick debug session (or add additional logging) and confirm this theory? Does the error occur in the thread Thread-OnRpcTerminated right at the end of the StreamingPullManager.close() method?

If proved correct, the fix should be straightforward - the _maybe_wrap_exception() function should be modified to always convert the given future to an exception of some kind.
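As a rough sketch of that idea (not the change that was eventually merged, and not the library's actual helper), a conversion function could fall back to a generic exception whenever it receives something that does not derive from BaseException. The name `always_wrap_exception` and the choice of RuntimeError are assumptions made for illustration only.

```python
def always_wrap_exception(reason):
    """Hypothetical helper: always return an exception instance."""
    if isinstance(reason, BaseException):
        # Already raisable, pass it through unchanged.
        return reason
    # Anything else (e.g. a wrapped response iterator) is converted so
    # that a later ``raise`` cannot fail with a TypeError.
    return RuntimeError(
        "Streaming pull terminated with a non-exception reason: {!r}".format(reason)
    )


class NotAnException:
    """Hypothetical stand-in for an object such as WrappedResponseIterator."""


wrapped = always_wrap_exception(NotAnException())
passthrough = always_wrap_exception(ValueError("stream broke"))
```

A real fix would presumably still translate grpc.RpcError instances first and use this kind of fallback only for unrecognized objects.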

Thanks in advance!

@plamut plamut added priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. and removed type: question Request for information or clarification. Not an issue. labels Jul 22, 2020