Pub/Sub 0.28: batch support? #4141

Closed
krelst opened this issue Oct 9, 2017 · 7 comments
Labels
api: pubsub Issues related to the Pub/Sub API. type: question Request for information or clarification. Not an issue.

Comments

krelst commented Oct 9, 2017

It seems that the new google-cloud-pubsub library version 0.28 does not include any support for pulling and processing messages in batches. Several of our production systems rely on processing Pub/Sub messages in batches in order to reduce the load on other systems (e.g. to lower the number of queries made against our databases). Is there any ETA on re-adding batch pulling to the library?

@lukesneeringer lukesneeringer added the api: pubsub Issues related to the Pub/Sub API. label Oct 12, 2017
@lukesneeringer (Contributor)

Hi @krelst,
The new library relies exclusively on Pub/Sub's new streaming pull feature, in order to maximize performance in high-throughput situations.

I am afraid I am unsure what you mean by "batch pulling". Can you explain your use case?

@lukesneeringer lukesneeringer added the type: question Request for information or clarification. Not an issue. label Oct 12, 2017
krelst (Author) commented Oct 12, 2017

Hi @lukesneeringer

Thanks for the response! In versions of the google-cloud-pubsub library below 0.28, the subscription.pull() method returns a list of (ack_id, message) tuples. We set the max_messages argument of that method to 1000, and consequently, for most of our pipelines, the returned list contains between 100 and 200 messages. Instead of processing each message separately, we parse all messages in that list at once, so that we can, e.g., perform all required database updates in a single query instead of one query per message. As version 0.28 of the library works with per-message callbacks, this seems a lot harder to do. So for our use case it would be useful to have callbacks for a list of messages instead of for a single message.

I hope this clarifies my question!
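To make the per-batch versus per-message difference concrete, here is a minimal sketch that writes a whole list of messages to a database with one `executemany` call inside a single transaction. It uses the standard-library `sqlite3` module as a stand-in for a production database; the `events` table, its columns, and the payload format (`b"<id>,<value>"` in each message's `data`) are hypothetical.

```python
import sqlite3
from types import SimpleNamespace


def apply_batch(conn, messages):
    """Write all messages with one statement instead of one query per message.

    Assumes (hypothetically) that each message.data is b"<id>,<value>".
    """
    rows = []
    for message in messages:
        event_id, value = message.data.decode("utf-8").split(",", 1)
        rows.append((int(event_id), value))
    # One transaction and one executemany call for the whole batch.
    with conn:
        conn.executemany(
            "INSERT OR REPLACE INTO events (id, value) VALUES (?, ?)", rows
        )
    return len(rows)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, value TEXT)")
    # Fake messages: only the .data attribute is used by apply_batch.
    fake = [SimpleNamespace(data=b"1,alpha"), SimpleNamespace(data=b"2,beta")]
    print(apply_batch(conn, fake))  # 2
```

Only after a call like this succeeds would the messages be acknowledged, so a failed write leads to redelivery rather than data loss.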

lukesneeringer (Contributor) commented Oct 12, 2017

That does clarify it, thank you.

I can see how this would be a useful feature in principle, although when I will have time to implement it is a separate question. The Pub/Sub team is primarily concerned with high-throughput situations; they explicitly wanted "polling" pull removed so that people would not use it by accident.

There are a few options here.

First: You could use the old polling-based pull method. It is not part of the documented surface, but you can get at it:

from google.cloud.pubsub_v1.gapic.subscriber_client import SubscriberClient

That class (which should look mostly like google.cloud.pubsub_v1.subscriber.Client) has a pull method that performs the old polling behavior.
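A minimal sketch of that polling-style flow, assuming the request-dict calling convention of current google-cloud-pubsub releases: `SubscriberClient.pull(request={...})` returns a response whose `received_messages` each carry an `ack_id` and a `message`, and `SubscriberClient.acknowledge(request={...})` acks by ID. The 0.28-era GAPIC client reached through the import above may take keyword arguments instead, so check the signature of your installed version. The client is passed in as a parameter so the logic can be exercised without network access; `handle_batch` is a hypothetical placeholder for your own processing.

```python
def pull_and_process(subscriber, subscription_path, handle_batch, max_messages=1000):
    """Synchronously pull up to max_messages, process them together, then ack.

    `subscriber` is expected to behave like google.cloud.pubsub_v1.SubscriberClient.
    `handle_batch` (hypothetical) receives a list of PubsubMessage objects.
    Returns the number of messages processed.
    """
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": max_messages}
    )
    received = list(response.received_messages)
    if not received:
        return 0

    # Process the whole list at once (e.g. a single database query).
    handle_batch([r.message for r in received])

    # Acknowledge only after processing succeeded; if handle_batch raises,
    # the messages are redelivered once their ack deadline expires.
    subscriber.acknowledge(
        request={
            "subscription": subscription_path,
            "ack_ids": [r.ack_id for r in received],
        }
    )
    return len(received)
```

With a real client you would call this in a loop, for example with `subscriber = pubsub_v1.SubscriberClient()` and `path = subscriber.subscription_path("my-project", "my-subscription")`, where the project and subscription IDs are placeholders.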

Second: You could implement the batching yourself. In fact, it would probably look pretty similar to the batching class we provide on the publisher side. Here is a quick pass at a plausible implementation (note: written in the GitHub text editor; may have some silly bugs):

import threading
import time


def do_something_with(messages):
    """Placeholder: YOUR BATCH PROCESSING HERE (e.g. a single DB query)."""
    raise NotImplementedError


class Batch(object):
    def __init__(self, max_messages=250, max_seconds=30):
        self._max_messages = max_messages
        self._max_seconds = max_seconds
        self._accepting_messages = True
        self._lock = threading.Lock()
        self._messages = []

        # Fire off a timer to commit when time elapses.
        self._countdown_thread = threading.Thread(target=self.countdown)
        self._countdown_thread.start()

    @property
    def accepting_messages(self):
        return self._accepting_messages

    def countdown(self):
        time.sleep(self._max_seconds)
        self._commit()

    def commit(self):
        # Commit in the background so the caller is not blocked.
        threading.Thread(target=self._commit).start()

    def _commit(self):
        # Sanity check: Do not multiply-commit. Closing the batch and taking a
        # snapshot of its messages happen under the same lock as add_message.
        with self._lock:
            if not self._accepting_messages:
                return
            self._accepting_messages = False
            messages = list(self._messages)

        # Actually do the work and then ack the messages.
        do_something_with(messages)  # <-- YOUR PROCESSING HERE.
        for message in messages:
            message.ack()

    def add_message(self, message):
        with self._lock:
            if not self._accepting_messages:
                # Committed concurrently (e.g. by the timer): let Pub/Sub
                # redeliver the message instead of silently dropping it.
                message.nack()
                return
            self._messages.append(message)
            is_full = len(self._messages) >= self._max_messages
        if is_full:
            self.commit()

And then, your callback becomes something like:

import threading

from (somewhere) import Batch


lock = threading.Lock()
batch = None


def my_callback(message):
    global batch  # Rebind the module-level batch rather than a local name.
    with lock:
        if batch is None or not batch.accepting_messages:
            batch = Batch()
        batch.add_message(message)

A few thoughts:

  • Refer to our publisher batch for an implementation of an extremely similar concept. We were concerned about pluggability, so what we did is more complex than what you need, but the basic principles are the same.
  • Obviously do not rely on the scoping I did in my callback block. :-)
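A different way to get the same effect, sketched under the assumption that a single worker thread should do all batch processing: the subscriber callback only puts each message on a `queue.Queue`, and one consumer thread drains up to `max_messages` messages or waits at most `max_seconds` before processing whatever it has. This swaps the per-batch timer threads and locks for a standard producer/consumer queue. `QueueBatcher` and `process` are hypothetical names, and the only thing assumed about a message is that it has an `ack()` method.

```python
import queue
import threading
import time


class QueueBatcher(object):
    """Collects messages from a callback and processes them in batches."""

    def __init__(self, process, max_messages=250, max_seconds=30.0):
        self._process = process  # Called with a list of messages.
        self._max_messages = max_messages
        self._max_seconds = max_seconds
        self._queue = queue.Queue()
        self._stopped = threading.Event()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def callback(self, message):
        # Pass this method as the subscriber callback; it never blocks.
        self._queue.put(message)

    def stop(self):
        # Ask the worker to flush what is queued, then wait for it to exit.
        self._stopped.set()
        self._worker.join()

    def _next_batch(self):
        batch = []
        deadline = time.monotonic() + self._max_seconds
        while len(batch) < self._max_messages:
            remaining = deadline - time.monotonic()
            if remaining <= 0 or (self._stopped.is_set() and self._queue.empty()):
                break
            try:
                # Short timeout so a stop request is noticed promptly.
                batch.append(self._queue.get(timeout=min(remaining, 0.1)))
            except queue.Empty:
                continue
        return batch

    def _run(self):
        while not (self._stopped.is_set() and self._queue.empty()):
            batch = self._next_batch()
            if batch:
                self._process(batch)
                for message in batch:
                    message.ack()
```

With the real client you would pass `batcher.callback` to `SubscriberClient.subscribe(subscription_path, callback=...)`. Queued messages are still unacknowledged, so they count against the subscriber's flow-control limits; a batch size larger than the flow-control `max_messages` could only ever be flushed by the timer.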

krelst (Author) commented Oct 12, 2017

Indeed, this would solve our issue :) As I don't think we are the only ones relying on message batching, we really hope this will be included in one of the next releases of the Pub/Sub library.

@lukesneeringer (Contributor)

Hi @krelst,
It is something we will discuss and consider. If the need is common, we will definitely want to find a way to meet it. Taking something roughly like what I wrote above and incorporating it into the library proper could be an approach. It really depends on what people need. :-)

We are committed to the new Pub/Sub surface, so you can implement the above without worrying about the surface breaking. If we do add built-in batching, it will be in an additive way that does not break this code. :-)

As you have said this is a workable solution for you, I am going to close this issue. Thanks for reaching out!

@john1625b

Hi, this thread seems to solve the issue by batching on message count; however, the documentation suggests there is also a way to batch by byte size:

https://cloud.google.com/pubsub/docs/publisher#batching-configuration

Messages can be batched based on request size (in bytes)

What would I pass in the batch configuration object? Thanks.

@chemelnucfin (Contributor)

Hello, I'm currently working on a problem where batches can end up larger than the specified byte limit, but this is the setting you would use:

from google.cloud import pubsub_v1

batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=1024,  # One kilobyte
)
publisher = pubsub_v1.PublisherClient(batch_settings)
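To illustrate what a byte-size limit means for batching, here is a minimal, library-independent sketch that groups message payloads so that each batch's total size stays at or below `max_bytes`, starting a new batch whenever the next payload would overflow the current one. It is not the client library's internal algorithm (which may also account for request overhead and its other limits); `split_by_bytes` is a hypothetical helper, and a single payload larger than `max_bytes` is simply placed in a batch of its own.

```python
def split_by_bytes(payloads, max_bytes=1024):
    """Group byte strings into batches whose total size is <= max_bytes.

    A payload that is larger than max_bytes on its own gets a batch to itself.
    """
    batches = []
    current, current_size = [], 0
    for payload in payloads:
        size = len(payload)
        # Close the current batch if adding this payload would overflow it.
        if current and current_size + size > max_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(payload)
        current_size += size
    if current:
        batches.append(current)
    return batches
```

For example, three 400-byte payloads with `max_bytes=1024` end up as one batch of two payloads (800 bytes) followed by a batch of one.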

parthea pushed a commit that referenced this issue Oct 21, 2023
4 participants