Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub/Sub: return typefuture is not concurrent.futures.Future #6201

Closed
anguillanneuf opened this issue Oct 12, 2018 · 8 comments
Closed

Pub/Sub: return typefuture is not concurrent.futures.Future #6201

anguillanneuf opened this issue Oct 12, 2018 · 8 comments
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API. type: question Request for information or clarification. Not an issue.

Comments

@anguillanneuf
Copy link
Contributor

anguillanneuf commented Oct 12, 2018

~concurrent.futures.Future: An object conforming to the

Contrary to what's in the reference, the correct return type for publish() should be google.cloud.pubsub_v1.publisher.futures.Future. I didn't find a reference for it but only some reference for google.cloud.pubsub_v1.subscriber.futures.StreamingPullFuture.

People won't be able to use of Python's concurrent library's wait() method on google.cloud.pubsub_v1.publisher.futures.Future. But our doc implies they can because we say the return type is concurrent.futures.Future.

from concurrent.futures import wait
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# future has type `google.cloud.pubsub_v1.publisher.futures.Future`
future = publisher.publish('projects/{PROJECT_ID}/topics/{TOPIC_NAME}', data=b'rain')

# wait(fs, timeout=None, return_when='ALL_COMPLETED') expects a sequence of `concurrent.futures.Future`.  
wait([future])

Here is the error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 257, in wait
    with _AcquireFutures(fs):
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 146, in __enter__
    future._condition.acquire()
AttributeError: 'Future' object has no attribute '_condition'

I tried in both Python 2 and 3.

@tseaver tseaver added type: question Request for information or clarification. Not an issue. api: pubsub Issues related to the Pub/Sub API. labels Oct 12, 2018
@tseaver
Copy link
Contributor

tseaver commented Oct 12, 2018

@theacodes can you comment?

@theacodes
Copy link
Contributor

Yeah we should update it. It should additionally say that it conforms to the concurrent.futures.Future interface and can be used in similar ways, but it not completely compatible with concurrent.futures tools such as wait().

@anguillanneuf
Copy link
Contributor Author

anguillanneuf commented Oct 12, 2018

@himikof
Copy link

himikof commented Nov 21, 2018

I have actually got another case where the returned future not being a concurrent.futures.Future hurts: asyncio in the standard library has some utilities for using blocking futures together with asyncio async futures, like asyncio.wrap_future, which check on isinstance(f, concurrent.futures.Future) to distinguish the future types.
I currently have to resort to monkeypatching the base classes list of google.cloud.pubsub_v1.publisher.futures.Future to make it pass the checks, and the method itself works fine. If actually using/inheriting from concurrent.futures.Future is not possible, can at least some utilities for working together with asyncio be provided together with the custom future implementation?

@theacodes
Copy link
Contributor

@himikof we can't inherit from concurent.futures.Future because it brings in a ton of stuff. I'm happy for us to add utilities to make working across this stuff possible. It would be relatively low priority for us right now, but we would more than welcome contributions to get it done sooner.

Thanks!

@sam-writer
Copy link

If you end up here trying to figure out how to make a PubSub future act like a concurrent Future, hopefully this code snippet will help

if __name__ == "__main__":
    asyncio.set_event_loop(uvloop.new_event_loop())
    s1 = app.create_server(host="0.0.0.0", port=8090)
    task = asyncio.ensure_future(s1)

    print('creating subscription to pubsub')
    pubsub_future = subscriber.subscribe(subscription_path, callback=callback)
    # this is not really a future
    # so monkey patch
    pubsub_future._asyncio_future_blocking = True
    pubsub_future.__class__._asyncio_future_blocking = True
    s2 = asyncio.wrap_future(pubsub_future)

    task2 = asyncio.ensure_future(s2)
    try:
        loop.run_forever()
    except:
        loop.stop()

@ChameleonTartu
Copy link

If you end up here trying to figure out how to make a PubSub future act like a concurrent Future, hopefully this code snippet will help

if __name__ == "__main__":
    asyncio.set_event_loop(uvloop.new_event_loop())
    s1 = app.create_server(host="0.0.0.0", port=8090)
    task = asyncio.ensure_future(s1)

    print('creating subscription to pubsub')
    pubsub_future = subscriber.subscribe(subscription_path, callback=callback)
    # this is not really a future
    # so monkey patch
    pubsub_future._asyncio_future_blocking = True
    pubsub_future.__class__._asyncio_future_blocking = True
    s2 = asyncio.wrap_future(pubsub_future)

    task2 = asyncio.ensure_future(s2)
    try:
        loop.run_forever()
    except:
        loop.stop()

@sam-qordoba What is a loop in your example?

@sam-writer
Copy link

Ahh sorry, that was some code I had for using asyncio to run a Sanic web server and pubsub listener in the same process. The main part is

        pubsub_future = subscriber.subscribe(
            subscription_path,
            callback=callback)

        # Google implemented a custom, psuedo-future
        # need monkey patch for it to work with asyncio
        pubsub_future._asyncio_future_blocking = True
        pubsub_future.__class__._asyncio_future_blocking = True
        real_pubsub_future = asyncio.wrap_future(pubsub_future)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the Pub/Sub API. type: question Request for information or clarification. Not an issue.
Projects
None yet
Development

No branches or pull requests

6 participants