Setting "return immediately" flag #1157

Closed
omaray opened this issue Aug 12, 2016 · 19 comments
Labels: api: core, api: pubsub

@omaray

omaray commented Aug 12, 2016

Here's some interesting feedback from a customer:

http://stackoverflow.com/questions/38628150/gcloud-java-pubsub-api-how-to-set-return-immediately-flag

@mziccard @garrettjonesgoogle

Marco, what are your thoughts? And can you elaborate in this issue where this shows up in the code? It seems like this might cause some unnecessary polling.

omaray changed the title from Setting return immediately flag to Setting "return immediately" flag on Aug 12, 2016
@mziccard
Contributor

Hi Omar!

The answer to that question is not entirely accurate. When MessageConsumer is used (see MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options)), returnImmediately is set to false (see here). In fact, MessageConsumer does the continuous pulling on behalf of the user and handles the timeout-related exceptions that might occur when using returnImmediately=false.

As for the other pull methods:

  • Iterator<ReceivedMessage> pull(String subscription, int maxMessages)
  • Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages)

The use of returnImmediately was discussed here. A returnImmediately option could be added to both methods; I am just not sure how much it is needed, or whether it might lead to unexpected behavior (e.g. with returnImmediately=false the synchronous method might hang until timeout). /cc @aozarov

mziccard added the api: pubsub label on Aug 12, 2016
@aozarov
Contributor

aozarov commented Aug 12, 2016

I think the problem with returnImmediately=false, which could potentially block forever if no new messages are sent, is that the notion of "The client may cancel the request if it does not wish to wait any longer for the response." (as described here) is vague.

How do we do that? Seems like a challenge for synchronous calls.

It could probably be added for asynchronous calls using Future#cancel, hoping that the cancel propagated to the generated spi layer would do the right thing and cancel the underlying RPC call. This was left out for simplicity and consistency with the synchronous call. It should be fairly simple to create an Iterator from MessageProcessor if desired (and maybe we can provide a utility for it); however, I don't strongly object to breaking the symmetry with the synchronous call in this case.
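
To illustrate the Iterator-from-MessageProcessor idea, here is a minimal sketch. It assumes nothing about the library: CallbackIterator, callback() and close() are hypothetical names, and a plain Consumer stands in for MessageProcessor. The callback side pushes into a blocking queue, and the iterator side blocks in hasNext() until a message or an end marker arrives.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical adapter; none of these names come from the library.
class CallbackIterator<T> implements Iterator<T> {
  private static final Object END = new Object(); // end-of-stream marker
  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
  private Object next; // element fetched by hasNext() but not yet returned

  // Hand this to whatever delivers messages (the role MessageProcessor plays).
  Consumer<T> callback() {
    return queue::add;
  }

  // Signals that no more messages will arrive.
  void close() {
    queue.add(END);
  }

  @Override
  public boolean hasNext() {
    if (next == null) {
      try {
        next = queue.take(); // blocks until a message or END arrives
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        next = END; // treat interruption as end of stream
      }
    }
    return next != END;
  }

  @SuppressWarnings("unchecked")
  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T value = (T) next;
    next = null;
    return value;
  }

  public static void main(String[] args) {
    CallbackIterator<String> messages = new CallbackIterator<>();
    Consumer<String> processor = messages.callback();
    processor.accept("first");
    processor.accept("second");
    messages.close();
    while (messages.hasNext()) {
      System.out.println(messages.next());
    }
  }
}
```

A real utility would also have to decide when each message is acked, which this sketch leaves out.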

@evmin

evmin commented Nov 5, 2016

The original stackoverflow question was from me.

I would like to confirm that with our use cases, where we are replacing incumbent tech, we do need to be able to setReturnImmediately to false even for the synchronous pull. So I am very much interested in having this feature implemented. Once done I will be able to plan moving away from the existing REST API.

@mziccard
Contributor

mziccard commented Nov 5, 2016

Hi @evmin! We are progressing on this, in fact I hope we will be able to release something by the end of next week.

we do need to be able to setReturnImmediately to false even for the synchronous pull.

We are considering making returnImmediately=false the default behavior of our:

Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages)

that way you can obtain the sync behavior you are seeking by doing:

Future<Iterator<ReceivedMessage>> future = pubsub.pullAsync("mysubscription", 1000);
Iterator<ReceivedMessage> messageIterator = future.get();

How does this sound? The reason we don't want to add the returnImmediately option to the synchronous version pull(String subscription, int maxMessages) is that this request can hang indefinitely with no way for the user to terminate it. On the other hand, with the Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages) method you can still terminate the request by calling cancel() on the future.
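
As a rough illustration of that difference, the sketch below waits on a future for a bounded time and then cancels it. It is not the library's code: pullAsync() here is a stand-in that returns a future which never completes (as an empty subscription would with returnImmediately=false), and pullWithTimeout is a hypothetical helper. Whether cancel() actually aborts the underlying RPC depends on the library propagating it.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CancellablePull {
  // Stand-in for a pull on an empty subscription with returnImmediately=false:
  // the future stays pending because no message ever arrives.
  static CompletableFuture<Iterator<String>> pullAsync() {
    return new CompletableFuture<>();
  }

  // Hypothetical helper: waits up to timeoutMillis, then cancels the request
  // and reports "no messages" instead of blocking forever.
  static Iterator<String> pullWithTimeout(
      CompletableFuture<Iterator<String>> future, long timeoutMillis) {
    try {
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // give up waiting
      return Collections.emptyIterator();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      future.cancel(true);
      return Collections.emptyIterator();
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  public static void main(String[] args) {
    CompletableFuture<Iterator<String>> future = pullAsync();
    Iterator<String> messages = pullWithTimeout(future, 100);
    System.out.println("has messages: " + messages.hasNext());
    System.out.println("cancelled: " + future.isCancelled());
  }
}
```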

We would love to hear your thoughts!

@evmin

evmin commented Nov 6, 2016

From my perspective there are four considerations here.

  1. Flexibility. By not providing an option to configure returnImmediately for synchronous pull, the decision is effectively made for the developer, including for future needs. I would err here on the side of flexibility. I would make it available and let the devs decide down the line what fits their model. Make it a non-default option, make it time out, make the timeout configurable, but provide the option.
  2. The case I am working on accepts the fact that a thread might never return. It is a framework where polling is part of the built-in mechanism. The idea is that if it never returns, then there is no message. This is a valid use case, as the executor pools are dedicated.
  3. The current implementation does have a synchronous pull. Regardless of the development of this library, it looks like time pressure will make us go with the currently available Services version. Retrofitting synchronous pull to async is probably not a good idea if my understanding that pullAsync creates yet another thread on execution is correct. Is this the case? Would the thread be created on each poll? If so, creating a thread from within yet another dedicated executor pool seems counterproductive, and the framework we are using expects to manage the threads itself.
  4. Python version does provide synchronous Pull. Would be a good idea to maintain parity and consistency, so cross platform implementations (such as ours) would be coding to the same standard.

Having said all this, I think that future.get() is still a good option to include.

This is my thinking at the moment. Am I missing something here that invalidates the train of thought?

@mziccard
Contributor

mziccard commented Nov 7, 2016

@evmin Let me answer, but in a different order :)

Retrofitting synchronous pull to async is probably not a good idea if my understanding that pullAsync creates yet another thread on execution is correct. Is this the case?

No, this is not the case. gRPC is inherently async: we have a channel associated with an executor that runs the async IO. Roughly speaking, gRPC responses are received via async callbacks, and our async methods just convert results and do not start any thread other than the ones natively used by gRPC. This is important: our sync methods are just syntactic sugar on top of the async ones:

public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
  try {
    return Uninterruptibles.getUninterruptibly(pullAsync(subscription, maxMessages));
  } catch (ExecutionException ex) {
    throw Throwables.propagate(ex.getCause());
  }
}
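
For readers without Guava at hand, here is a plain-JDK sketch of roughly what that wrapper does. pullAsync() is a stand-in returning an already completed future, and getUninterruptibly is a simplified re-implementation, so treat it as an approximation rather than the library's code. It also shows why the sync call cannot be abandoned by interrupting the caller: the interrupt is remembered and only restored once the future completes.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class SyncOverAsync {
  // Stand-in for the async pull; completes immediately with two fake messages.
  static Future<Iterator<String>> pullAsync() {
    return CompletableFuture.completedFuture(Arrays.asList("m1", "m2").iterator());
  }

  // Simplified version of an uninterruptible get: keeps waiting even if the
  // calling thread is interrupted, and restores the interrupt flag afterwards.
  static <T> T getUninterruptibly(Future<T> future) throws ExecutionException {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return future.get();
        } catch (InterruptedException e) {
          interrupted = true; // remember and retry
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  // The sync method blocks the caller's own thread; it starts no new thread.
  static Iterator<String> pull() {
    try {
      return getUninterruptibly(pullAsync());
    } catch (ExecutionException ex) {
      throw new RuntimeException(ex.getCause());
    }
  }

  public static void main(String[] args) {
    Iterator<String> messages = pull();
    while (messages.hasNext()) {
      System.out.println(messages.next());
    }
  }
}
```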

The case I am working on accepts the fact that a thread might never return. It is a framework where polling is part of built in mechanism. The idea is that if it never returns, then there is no message. Valid use case as executor pools are dedicated.

I know very little of your architecture, but I believe that polling via user threads + sync methods is not the best option to reach maximum throughput when using the gRPC transport. I would consider instantiating multiple PubSub service objects (each one uses a separate channel) and giving them enough "executing power" (depending on your application needs) via PubSubOptions.Builder.setExecutorFactory() (by default we use an 8-thread executor, shared across all channels/service instances). Knowing very little, these are just random thoughts, though.

Flexibility. With not providing an option to configure returnImmediately for synchronous pull the decision is effectively made for the developer, for the future needs.

I see your point on flexibility. However, if pullAsync uses returnImmediately=false by default, you can achieve your desired sync method with pullAsync("mysubscription", 1000).get() (at no extra cost, as I explained above).

Python version does provide synchronous Pull. Would be a good idea to maintain parity and consistency, so cross platform implementations (such as ours) would be coding to the same standard.

AFAIK Python provides only synchronous pull at the moment (please correct me if I am wrong), so they have to provide all the available options on the sync method.

@aozarov In this comment we have some more user thoughts in favor of adding returnImmediately option to all pull methods. I would like to hear your thoughts on this!

@evmin

evmin commented Nov 7, 2016

That's a very long, detailed and comprehensive description - I am truly appreciative of your effort. Thank you!

If no extra threads are created, then .get() is indeed a workable solution. Great.

Though I am a bit confused here.

If I check https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java#L159

I see
private final ExecutorService executor = Executors.newSingleThreadExecutor();

If we check newSingleThreadExecutor, I can see
public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService(
      new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

And that's where it is probably too deep for me - I was not able to find confirmation if the Single Thread Executor does create a new thread or not. It looks like it does, doesn't it?
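
One way to check this empirically is a small experiment like the one below (a sketch using only the JDK; SingleThreadCheck and workerNames are made-up names). It records which thread runs two submitted tasks. On a standard JDK the tasks run on one worker thread that is separate from the caller and is reused across tasks.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SingleThreadCheck {
  // Submits two tasks and returns the names of the threads that ran them.
  static String[] workerNames() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Callable<String> whoAmI = () -> Thread.currentThread().getName();
    try {
      String first = executor.submit(whoAmI).get();
      String second = executor.submit(whoAmI).get();
      return new String[] {first, second};
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown(); // let the worker thread exit
    }
  }

  public static void main(String[] args) {
    String[] names = workerNames();
    System.out.println("caller thread: " + Thread.currentThread().getName());
    System.out.println("task 1 ran on: " + names[0]);
    System.out.println("task 2 ran on: " + names[1]);
    System.out.println("same worker reused: " + names[0].equals(names[1]));
  }
}
```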

Apologies for the potentially ignorant question, I might be missing something, just being paranoid.

P.S. You are correct on the performance compromise. It is the penalty for adapting one framework for another - the benefits are the speed of implementation.

@mziccard
Contributor

mziccard commented Nov 7, 2016

Apologies for the potentially ignorant question, I might be missing something, just being paranoid.

It's no problem really, there's a value in going through this and we are always happy to receive feedback and questions from people actually using (or planning to use) the library.

We have 3 different methods that do pull in different ways:

  1. Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages)
    • async pull method, sends just one pull request to subscription and returns up to maxMessages (this is the method we are planning to make always use returnImmediately=false).
    • Message deadlines are renewed automatically by our library until you consume the message with iterator.next(), after that you are on your own :) and you must call message.modifyAckDeadline(...) if you need to postpone message deadline.
    • You must ack and nack messages yourself, with message.ack() or message.nack().
  2. Iterator<ReceivedMessage> pull(String subscription, int maxMessages);
    • sync pull method, sends just one pull request to subscription and returns up to maxMessages (in our idea this method should always use returnImmediately=true).
    • Message deadlines are renewed automatically by our library until you consume the message with iterator.next(), after that you are on your own :) and you must call message.modifyAckDeadline(...) if you need to postpone message deadline.
    • You must ack and nack messages yourself, with message.ack() or message.nack().
  3. MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options)
    • Implements continuous pulling on your behalf. For each received message the callback is executed.
    • By default all callbacks are executed on the same single-threaded executor. If you need more "executing power" you can pass the PullOption.executorFactory(ExecutorFactory) option to the method.
    • To control the maximum number of "pending" messages your application can afford (i.e. messages pulled but whose callback has not yet terminated) you can pass the PullOption.maxQueuedCallbacks(int) option to the method; by default we set this value to 100.
    • Message deadlines are automatically renewed by our library until the callback is executed on the message (either failing or succeeding).
    • Messages are automatically "acked" if the callback terminates successfully, and "nacked" if the callback throws an exception.
    • You can stop pulling by calling close() on the MessageConsumer.
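
To make the semantics of method 3 more concrete, here is a self-contained simulation. It does not use the library: ConsumerSketch.consume is a hypothetical function, a List stands in for the subscription, a Semaphore plays the role of maxQueuedCallbacks, and strings in a log stand in for the ack and nack calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ConsumerSketch {
  // Dispatches each message to the callback on a single-threaded executor,
  // allowing at most maxQueuedCallbacks messages to be pending at once.
  // Returns a log of "ack:<msg>" / "nack:<msg>" entries in processing order.
  static List<String> consume(
      List<String> messages, Consumer<String> callback, int maxQueuedCallbacks) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Semaphore pending = new Semaphore(maxQueuedCallbacks);
    List<String> log = Collections.synchronizedList(new ArrayList<>());
    try {
      for (String message : messages) {
        pending.acquire(); // stop "pulling" while too many callbacks are outstanding
        executor.execute(() -> {
          try {
            callback.accept(message);
            log.add("ack:" + message); // callback returned normally
          } catch (RuntimeException e) {
            log.add("nack:" + message); // callback threw
          } finally {
            pending.release();
          }
        });
      }
      executor.shutdown();
      executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      executor.shutdownNow(); // no-op if the executor already terminated
    }
    return log;
  }

  public static void main(String[] args) {
    Consumer<String> callback = message -> {
      if (message.startsWith("bad")) {
        throw new IllegalArgumentException("cannot process " + message);
      }
    };
    System.out.println(consume(Arrays.asList("a", "bad", "c"), callback, 2));
  }
}
```

Because the executor is single-threaded, callbacks run one at a time in submission order; passing a larger executor would trade that ordering for throughput.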

Long story short, the single threaded executor you see in https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java#L159 is just used when you use method 3) which does continuous pulling for you - and you could even avoid this by passing your own ExecutorFactory.
In your scenario you would use method 1) and then call get() on the returned future, so no extra threads for you :)

@evmin

evmin commented Nov 7, 2016

Thank you. Indeed, option no.1 with get() would serve the purpose. Also, your explanation of option no 3 highlights a few opportunities. I have tried it, but there were a couple of exceptions now and then that I could not decipher, but this is a topic for another thread, possibly.

I will copy the explanation and stick it on our internal wiki - it is a very good intro. It would be great if the text written above could make its way into some official documentation - my opinion is that it would benefit both the devs starting on the lib and the penetration of the tech in general.

Thank you again.

@garrettjonesgoogle
Member

Remaining to do: add the extended explanation by mziccard@ into the docs somewhere.

@garrettjonesgoogle
Member

@pongad : could you triage this w.r.t. the Pub/Sub high-performance rewrite? Basically, determine if there is any use case in this issue not addressed in the rewrite, or if the noted use cases need to be discussed in documentation in the rewrite.

@pongad
Contributor

pongad commented Feb 3, 2017

@garrettjonesgoogle From what I understand, @evmin still needs the synchronous pull method. We have made a decision to deprecate, but not remove, the previous hand-written client. All users who use these clients can continue doing so, so I don't think there's immediate cause of worry.

That said, I think we should revisit our decision to make the SubscriberClient::pull method package-private. Even though the new Subscriber is the way "things should be", I think there's value in keeping a non-deprecated synchronous version to help our users migrate from old code bases that assume synchronous calls.

@evmin

evmin commented Feb 3, 2017

+1 for the synchronous call. Please check Apache Camel - it does manage the consumer threads at the framework level, which is critical to leave exposed to developers in certain scenarios.

@garrettjonesgoogle
Member

@pongad, I think the understanding in this thread was that @evmin could achieve synchronous functionality through calling get() on a future returned from an async call.

@evmin , I'm curious why you are bringing up the fact that Apache Camel manages consumer threads; there still remains the fact that gRPC uses a thread pool of its own, thus anything that seems synchronous still does execute on a separate gRPC thread.

@evmin what is the motivation for direct access to SubscriberClient::pull from the system design perspective? Doing such a call on your own requires some logic to make it work right, e.g. extending deadlines when the work is taking longer, and it also consumes a user thread. I could see the argument that the new Subscriber class that we are writing doesn't fit a user's needs and they need to write their own pulling logic. Although in that case, I would really like to know what use cases are missing. If the only remaining use case is migrating current direct callers of pull, then maybe letting them sit on the deprecated code for a couple months is sufficient.

Let me know if I'm missing something!

@evmin

evmin commented Feb 3, 2017

@garrettjonesgoogle

TL;DR

Current GA library does have a sync pull. Feature consistency across libraries is paramount for stability of the solutions on top of them. Sync pull has been confirmed earlier and was one of the key factors for us to move ahead.

Longer version.

A bit of a background. I am a customer advocate for the GCP tech. I am currently implementing three major projects where the core component is Google PubSub in a traditional, non tech organisation. Exactly the market Google aims to capture on the large scale, if I am not mistaken. My aim is to ensure that the projects are a success.

As with any non-technical company, the skill level of the devs varies. One of the risk mitigation strategies is to use abstraction frameworks, where complex integration patterns are reduced to a mere statement. Camel would be an example.

That comes at the price of having the framework stable and resilient: it should be able to handle the abuse of the developers and the end user requirements. In my career I have seen a fair bit of bending the tech: ETL solutions implemented on a service bus and databases used as an HTTP proxy. I know, I know. But if that's what the customer is willing to pay for, the technology should be able to handle it.

Non-tech companies have little capability to redevelop existing solutions: it is considered a risk, and they are generally very risk averse.

@evmin , I'm curious why you are bringing up the fact that Apache Camel manages consumer threads; there still remains the fact that gRPC uses a thread pool of its own, thus anything that seems synchronous still does execute on a separate gRPC thread.

Well, we build on Camel to hide the complexity, and I do not completely understand all the implications of how the new model would work for the solutions we have built on top of it. We use Camel's capability to manage threads heavily to scale up and down to align with the downstream processing. For example, we use a single thread to pull just a single message synchronously to test whether the end system (proprietary, non-HTTP) is back online and can accept calls. If it is, we scale up the number of threads to pump the data. If not, we scale down to a single thread with back-off intervals.
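
The probe-and-scale behaviour described above could be expressed as a small policy function, sketched below. All names and numbers (ScalingPolicy, baseBackoffMillis, the doubling rule) are assumptions for illustration; the actual Camel configuration is not shown in this thread.

```java
// Hypothetical policy object: thread count plus pause before the next probe.
class ScalingPolicy {
  final int threads;
  final long backoffMillis;

  ScalingPolicy(int threads, long backoffMillis) {
    this.threads = threads;
    this.backoffMillis = backoffMillis;
  }

  // Computes the next state from the result of a single-message probe.
  static ScalingPolicy next(
      ScalingPolicy current,
      boolean downstreamUp,
      int maxThreads,
      long baseBackoffMillis,
      long maxBackoffMillis) {
    if (downstreamUp) {
      return new ScalingPolicy(maxThreads, 0); // pump data with all threads
    }
    // Downstream is unavailable: double the pause, capped at maxBackoffMillis.
    long backoff = current.backoffMillis == 0
        ? baseBackoffMillis
        : Math.min(maxBackoffMillis, current.backoffMillis * 2);
    return new ScalingPolicy(1, backoff); // one probing thread, growing pause
  }

  public static void main(String[] args) {
    ScalingPolicy state = new ScalingPolicy(1, 0);
    boolean[] probes = {false, false, false, true};
    for (boolean up : probes) {
      state = next(state, up, 8, 1000, 3000);
      System.out.println(state.threads + " thread(s), backoff " + state.backoffMillis + " ms");
    }
  }
}
```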

That's why migration to the new, self-managed gRPC-based library remains a risk factor from my perspective, and having the sync pull allows me to avoid retrofitting the existing logic on top of the Camel / GA library combo. When the time comes I essentially would like to be able to swap the old library for the new with the minimum fuss. So I do need the synchronous pull and the ability to define the number of threads.

Also, a larger chunk of the third party devs we are working with - they all use synchronous pulls - though I have not done any research as to why. If it works for them - that's great.

@evmin what is the motivation for direct access to SubscriberClient::pull from the system design perspective? Doing such a call on your own requires some logic to make it work right, e.g. extending deadlines when the work is taking longer, and it also consumes a user thread.

That's correct. And we are totally fine with that - current GA library expects that, and we came to rely on the ability to manage it ourselves. Abandoning the message to be redelivered after the default timeout is another working scenario for one of the implementations.

From our earlier discussion I understood that pull will be there no matter what. That was essentially one of the key factors that allowed me to convince the team to go ahead - that when the new library hits the ground we would only need to do a swap, and that has been priced into the maintenance costs.

The discussion to remove sync pull is worrisome - as reworking the solutions to async would attract additional effort from our vendors, and, hence, invoices.

If the only remaining use case is migrating current direct callers of pull, then maybe letting them sit on the deprecated code for a couple months is sufficient.

Please consider that the new library will need to supersede the existing GA, where the pull capability is present. So by the time it is released, the GA based client might be embedded so deeply, that reworking these models would simply not be possible.

Generally speaking, more diverse capability set is always better than a narrow one. And if a capability needs to be removed there should be PLENTY of time for end projects to accommodate.

To give you a feel from the enterprise world. We have a solution that has been out of support by vendor for 7 years. It works and there is no inclination to upgrade. Salesforce took one and a half years to deprecate TLS 1.0. RedHat supports the old releases for at least 5. And there is a reason for that - things in the enterprise world are slow. Which is generally good for the vendor, as the tech becomes incumbent and generates a stable income stream.

So when the new library is released and stabilised as GA it would take at least 6 months for it to make its way into a Camel release. And then there is no guarantee that the clients using the previous version will be able to quickly move onto the new version and adapt to the new scheme of things. At least we wouldn’t and that would preclude us from moving on.

Camel backward compatibility and the feature stability were one of the core selling points. With PubSub component being a part of the offering Google instantly gets a wider market, but the feature stability is paramount.

Hope I was able to bring my point across.

@garrettjonesgoogle
Member

TL;DR

Current GA library does have a sync pull. Feature consistency across libraries is paramount for stability of the solutions on top of them. Sync pull has been confirmed earlier and was one of the key factors for us to move ahead.

The earlier confirmation coupled with your dependency on it is reason enough to keep it (reason 1).

Longer version.

A bit of a background. I am a customer advocate for the GCP tech. I am currently implementing three major projects where the core component is Google PubSub in a traditional, non tech organisation. Exactly the market Google aims to capture on the large scale, if I am not mistaken. My aim is to ensure that the projects are a success.

As with any non-technical company, the skill level of the devs varies. One of the risk mitigation strategies is to use abstraction frameworks, where complex integration patterns are reduced to a mere statement. Camel would be an example.

That comes at the price of having the framework stable and resilient: it should be able to handle the abuse of the developers and the end user requirements. In my career I have seen a fair bit of bending the tech: ETL solutions implemented on a service bus and databases used as an HTTP proxy. I know, I know. But if that's what the customer is willing to pay for, the technology should be able to handle it.

Non-tech companies have little capability to redevelop existing solutions: it is considered a risk, and they are generally very risk averse.

For my understanding: is the PubSub interface in google-cloud-java exposed directly to any of your target population of users? Or is everyone going through Apache Camel? If so, I'm assuming Apache Camel would completely abstract away the PubSub support?

I'm mostly curious to know exactly who will be coding against PubSub - it helps to know who our users are.

@evmin , I'm curious why you are bringing up the fact that Apache Camel manages consumer threads; there still remains the fact that gRPC uses a thread pool of its own, thus anything that seems synchronous still does execute on a separate gRPC thread.

Well, we build on Camel to hide the complexity, and I do not completely understand all the implications of how the new model would work for the solutions we have built on top of it. We use Camel's capability to manage threads heavily to scale up and down to align with the downstream processing. For example, we use a single thread to pull just a single message synchronously to test whether the end system (proprietary, non-HTTP) is back online and can accept calls. If it is, we scale up the number of threads to pump the data. If not, we scale down to a single thread with back-off intervals.

I wonder if such up/down scaling support could be added to the google-cloud-java PubSub library. If so, we couldn't promise it in short order, which is probably reason 2 for us to expose the synchronous pull.

That's why migration to the new, self-managed gRPC-based library remains a risk factor from my perspective, and having the sync pull allows me to avoid retrofitting the existing logic on top of the Camel / GA library combo. When the time comes I essentially would like to be able to swap the old library for the new with the minimum fuss. So I do need the synchronous pull and the ability to define the number of threads.

For my understanding: will the Camel part be removed, where users will switch directly to the PubSub interface?

Also, a larger chunk of the third party devs we are working with - they all use synchronous pulls - though I have not done any research as to why. If it works for them - that's great.

@evmin what is the motivation for direct access to SubscriberClient::pull from the system design perspective? Doing such a call on your own requires some logic to make it work right, e.g. extending deadlines when the work is taking longer, and it also consumes a user thread.

That's correct. And we are totally fine with that - current GA library expects that, and we came to rely on the ability to manage it ourselves. Abandoning the message to be redelivered after the default timeout is another working scenario for one of the implementations.

From our earlier discussion I understood that pull will be there no matter what. That was essentially one of the key factors that allowed me to convince the team to go ahead - that when the new library hits the ground we would only need to do a swap, and that has been priced into the maintenance costs.

The discussion to remove sync pull is worrisome - as reworking the solutions to async would attract additional effort from our vendors, and, hence, invoices.

If the only remaining use case is migrating current direct callers of pull, then maybe letting them sit on the deprecated code for a couple months is sufficient.

Please consider that the new library will need to supersede the existing GA, where the pull capability is present. So by the time it is released, the GA based client might be embedded so deeply, that reworking these models would simply not be possible.

Generally speaking, more diverse capability set is always better than a narrow one. And if a capability needs to be removed there should be PLENTY of time for end projects to accommodate.

Technically, it isn't being removed from the GA library; the proposal was just to not add it to the new one.

I would argue that a more diverse capability set isn't always better; that generally means more to maintain, which can be wasteful if you implement features that only a few users might want. This definitely doesn't apply to your case though.

To give you a feel from the enterprise world. We have a solution that has been out of support by vendor for 7 years. It works and there is no inclination to upgrade. Salesforce took one and a half years to deprecate TLS 1.0. RedHat supports the old releases for at least 5. And there is a reason for that - things in the enterprise world are slow. Which is generally good for the vendor, as the tech becomes incumbent and generates a stable income stream.

Understood!

So when the new library is released and stabilised as GA it would take at least 6 months for it to make its way into a Camel release. And then there is no guarantee that the clients using the previous version will be able to quickly move onto the new version and adapt to the new scheme of things. At least we wouldn’t and that would preclude us from moving on.

Camel backward compatibility and the feature stability were one of the core selling points. With PubSub component being a part of the offering Google instantly gets a wider market, but the feature stability is paramount.

Hope I was able to bring my point across.

Indeed. You have made clear the extent of the user base that depends on this feature, so we will definitely retain the feature in google-cloud-java.

@evmin

evmin commented Feb 6, 2017

The earlier confirmation coupled with your dependency on it is reason enough to keep it (reason 1).

Thank you. I truly appreciate this.

For my understanding: is the PubSub interface in google-cloud-java exposed directly to any of your target population of users? Or is everyone going through Apache Camel?

Actually both. External partners go directly through the API - we are trying to standardise on PubSub as the integration layer. We have two external vendors on the Java client. To my understanding one of them is using synchronous calls as well.

Internal teams - use Camel.

If so, I'm assuming Apache Camel would completely abstract away the PubSub support?

Kind of. The intention is to get Camel to abstract the implementation details of PubSub specifics, where PubSub is just yet another producer or consumer unified within the Camel framework.

I'm mostly curious to know exactly who will be coding against PubSub - it helps to know who our users are.

In our case - external vendors. I am happy to go into the details, but would prefer to do that by email.

For my understanding: will the Camel part be removed, where users will switch directly to the PubSub interface?

I hoped for exactly the opposite - Apache Camel is an open-source routing and mediation framework that is used at the core of quite a number of commercial products - JBoss FUSE, Talend ESB, etc. The PubSub component will naturally flow to these products, hopefully leading to additional uptake of Google PubSub.

Indeed. You have made clear the extent of the user base that depends on this feature, so we will definitely retain the feature in google-cloud-java.

Thank you very much for this.

@garrettjonesgoogle
Member

The Pub/Sub rewrite has been merged and released. Documenting returnImmediately as commented previously is no longer relevant. Doing a synchronous pull in SubscriberClient is supported. Closing out the issue.

@anguillanneuf
Contributor

anguillanneuf commented Sep 12, 2018

The doc says that when return_immediately is set to False, the system waits (for a bounded amount of time) until at least one message is available. Do we know what this "bounded amount of time" is?
