
Publish messages > 'batch_setting.max_size' in a separate batch. #4608

Closed
kir-titievsky opened this issue Dec 18, 2017 · 22 comments
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release. triaged for GA type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@kir-titievsky

kir-titievsky commented Dec 18, 2017

Running a simple no-batching publisher with Pub/Sub fails when attempting to publish 100 messages. The code works with 10 messages. Code sample:

import time
from google.cloud import pubsub_v1

batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=0,  # no batching 
    max_latency=1.0,  # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path('${PROJECT}', 'saxby')

for n in range(100):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    t = time.clock()
    def dt(x):
        print(time.clock() - t)
    data = data.encode('utf-8')
    publisher.publish(topic_path, data=data).add_done_callback(dt)

Errors:

E1218 11:01:17.394871000 123145784201216 wakeup_fd_pipe.cc:38]         pipe creation failed (24): Too many open files
E1218 11:01:17.394877000 123145586491392 wakeup_fd_pipe.cc:38]         pipe creation failed (24): Too many open files
E1218 11:01:17.394900000 123145594904576 wakeup_fd_pipe.cc:38]         pipe creation failed (24): Too many open files
E1218 11:01:17.394910000 123145784201216 ev_poll_posix.cc:905]         pollset_work: {"created":"@1513612877.394894000","description":"OS Error","errno":24,"file":"src/core/lib/iomgr/wakeup_fd_pipe.cc","file_line":39,"os_error":"Too many open files","syscall":"pipe"}
E1218 11:01:17.394930000 123145586491392 ev_poll_posix.cc:905]         pollset_work: {"created":"@1513612877.394915000","description":"OS Error","errno":24,"file":"src/core/lib/iomgr/wakeup_fd_pipe.cc","file_line":39,"os_error":"Too many open files","syscall":"pipe"}
E1218 11:01:17.394936000 123145594904576 ev_poll_posix.cc:905]         pollset_work: {"created":"@1513612877.394923000","description":"OS Error","errno":24,"file":"src/core/lib/iomgr/wakeup_fd_pipe.cc","file_line":39,"os_error":"Too many open files","syscall":"pipe"}
E1218 11:01:17.394945000 123145784201216 completion_queue.cc:958]      Completion queue next failed: {"created":"@1513612877.394894000","description":"OS Error","errno":24,"file":"src/core/lib/iomgr/wakeup_fd_pipe.cc","file_line":39,"os_error":"Too many open files","syscall":"pipe"}
E1218 11:01:17.394959000 123145586491392 completion_queue.cc:958]      Completion queue next failed: {"created":"@1513612877.394915000","description":"OS Error","errno":24,"file":"src/core/lib/iomgr/wakeup_fd_pipe.cc","file_line":39,"os_error":"Too many open files","syscall":"pipe"}
E1218 11:01:17.394967000 123145594904576 completion_queue.cc:958]      Completion queue next failed: {"created":"@1513612877.394923000","description":"OS Error","errno":24,"file":"src/core/lib/iomgr/wakeup_fd_pipe.cc","file_line":39,"os_error":"Too many open files","syscall":"pipe"}
E1218 11:01:17.394980000 123145784201216 wakeup_fd_pipe.cc:38]         pipe creation failed (24): Too many open files
E1218 11:01:17.395000000 123145586491392 wakeup_fd_pipe.cc:38]         pipe creation failed (24): Too many open files
$ pip show google-cloud-pubsub
Name: google-cloud-pubsub
Version: 0.29.4
Summary: Python Client for Google Cloud Pub/Sub
Home-page: https://github.com/GoogleCloudPlatform/google-cloud-python
Author: Google Cloud Platform
Author-email: googleapis-packages@google.com
License: Apache 2.0
Location: /Users/kir/cloud/env/lib/python2.7/site-packages
Requires: psutil, google-auth, google-api-core, grpc-google-iam-v1

OS: macOS Sierra (10.12.6)

There is probably some way to make this work with non-default file limits, but it seems like publishing 100 messages should work on any sane machine.
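As a workaround for the symptom (not the underlying loop), the per-process descriptor limit can be inspected and raised from Python itself; a sketch using the standard-library resource module (Unix-only; 4096 is an arbitrary example value):

```python
import resource

# Each publisher batch spawns a worker thread, and each gRPC wakeup
# pipe costs file descriptors, so the repro above exhausts the
# per-process open-file limit.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print("soft limit:", soft, "hard limit:", hard)

# Raise (or cap) the soft limit for this process; the hard limit
# bounds what an unprivileged process may request.
target = 4096 if hard == resource.RLIM_INFINITY else min(4096, hard)
resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
```

This only postpones the failure, since the runaway batch creation described below will eventually exhaust any limit.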

@dhermes dhermes added api: pubsub Issues related to the Pub/Sub API. status: investigating The issue is under investigation, which is determined to be non-trivial. labels Dec 18, 2017
@dhermes
Contributor

dhermes commented Dec 18, 2017

Thanks for filing. I'm currently fixing #4575, which seems related.

@dhermes
Contributor

dhermes commented Dec 19, 2017

@kir-titievsky The issue is that max_bytes=0 makes it impossible to ever add a message to a batch. You wanted max_messages=1.


The very first publish call loops, creating batches (each with its own worker thread) until the process can no longer start threads, and then fails:

>>> import google.auth
>>> from google.cloud import pubsub_v1
>>> batch_settings = pubsub_v1.types.BatchSettings(
...     max_bytes=0,  # no batching
...     max_latency=1.0,  # One second
... )
>>> publisher = pubsub_v1.PublisherClient(batch_settings)
E1219 10:27:12.768400187   29067 ev_epollex_linux.cc:1482]   Skipping epollex becuase GRPC_LINUX_EPOLL is not defined.
E1219 10:27:12.768413837   29067 ev_epoll1_linux.cc:1261]    Skipping epoll1 becuase GRPC_LINUX_EPOLL is not defined.
E1219 10:27:12.768441213   29067 ev_epollsig_linux.cc:1761]  Skipping epollsig becuase GRPC_LINUX_EPOLL is not defined.
>>> _, project = google.auth.default()
>>> topic_path = publisher.topic_path(project, 'saxby')
>>> data = b'Message number 0'
>>> publisher.publish(topic_path, data=data)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "${SITE_PACKAGES}/google/cloud/pubsub_v1/publisher/client.py", line 201, in publish
    batch = self.batch(topic, create=True)
  File "${SITE_PACKAGES}/google/cloud/pubsub_v1/publisher/client.py", line 133, in batch
    topic=topic,
  File "${SITE_PACKAGES}/google/cloud/pubsub_v1/publisher/batch/thread.py", line 91, in __init__
    self._thread.start()
  File "${HOME}/.pyenv/versions/3.6.3/lib/python3.6/threading.py", line 846, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread

If you use a custom Batch class you can see the number of (totally unused) batches grow:

>>> from google.cloud.pubsub_v1.publisher.batch import thread
>>>
>>> class CustomBatch(thread.Batch):
...     COUNT = 0
...     def __init__(self, *args, **kwargs):
...         CustomBatch.COUNT += 1
...         super(CustomBatch, self).__init__(*args, **kwargs)
...
>>>
>>> publisher = pubsub_v1.PublisherClient(batch_settings, batch_class=CustomBatch)
>>> publisher.publish(topic_path, data=data)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "${SITE_PACKAGES}/google/cloud/pubsub_v1/publisher/client.py", line 201, in publish
    batch = self.batch(topic, create=True)
  File "${SITE_PACKAGES}/google/cloud/pubsub_v1/publisher/client.py", line 133, in batch
    topic=topic,
  File "<stdin>", line 5, in __init__
  File "${SITE_PACKAGES}/google/cloud/pubsub_v1/publisher/batch/thread.py", line 91, in __init__
    self._thread.start()
  File "${HOME}/.pyenv/versions/3.6.3/lib/python3.6/threading.py", line 846, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
>>> CustomBatch.COUNT
11207

The code is stuck in an infinite loop.
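The runaway loop can be modeled in isolation: with max_bytes=0 a batch can never accept a message, so the client keeps creating fresh batches (each spawning a thread) forever. A simplified stand-in, not the library's actual code:

```python
def will_accept(batch_size, max_bytes, message_size):
    # A batch accepts a message only if it fits under max_bytes.
    return batch_size + message_size <= max_bytes

def publish(message_size, max_bytes, give_up_after=10_000):
    """Model the client's batch-creation loop; returns batches created."""
    batches_created = 0
    while True:
        batches_created += 1          # each new batch also spawns a thread
        if will_accept(0, max_bytes, message_size):
            return batches_created
        if batches_created >= give_up_after:
            return batches_created    # the real code has no such guard

print(publish(message_size=16, max_bytes=0))   # hits the guard: 10000
print(publish(message_size=16, max_bytes=64))  # succeeds on batch 1
```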

It seems we could add some validation to the BatchSettings named tuple. @lukesneeringer and I were just discussing the need to make sure that max_latency is a float and non-negative.
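A minimal sketch of that kind of eager validation, using a plain namedtuple subclass (the field names mirror pubsub_v1.types.BatchSettings; the class and defaults here are illustrative, not the library's):

```python
import collections

_BatchSettingsBase = collections.namedtuple(
    "BatchSettings", ["max_bytes", "max_latency", "max_messages"]
)

class BatchSettings(_BatchSettingsBase):
    """Batch settings that validate each field at construction time."""

    def __new__(cls, max_bytes=1_000_000, max_latency=0.05, max_messages=100):
        if not isinstance(max_bytes, int) or max_bytes <= 0:
            raise ValueError("max_bytes must be a positive integer")
        if not isinstance(max_latency, (int, float)) or max_latency < 0:
            raise ValueError("max_latency must be a non-negative number")
        if not isinstance(max_messages, int) or max_messages <= 0:
            raise ValueError("max_messages must be a positive integer")
        return super().__new__(cls, max_bytes, max_latency, max_messages)
```

With validation like this, max_bytes=0 would fail immediately with a clear error instead of looping until the thread limit is hit.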

@dhermes dhermes added priority: p2 Moderately-important priority. Fix may not be included in next release. and removed status: investigating The issue is under investigation, which is determined to be non-trivial. labels Dec 19, 2017
@kir-titievsky
Author

kir-titievsky commented Dec 19, 2017 via email

@danoscarmike
Contributor

@dhermes please update when you get a chance. Thanks!

@dhermes dhermes changed the title from "Pub/Sub publish 100 messages: Pipe creation failed (24): Too many files open" to "Pub/Sub BatchSettings constructor should have validation" Jan 5, 2018
@chemelnucfin chemelnucfin added the type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. label Jan 9, 2018
@danoscarmike danoscarmike added release blocking Required feature/issue must be fixed prior to next release. triaged for GA labels Jan 19, 2018
@danoscarmike danoscarmike assigned chemelnucfin and unassigned dhermes Feb 8, 2018
@danoscarmike
Contributor

@chemelnucfin please add this one to the priority list.

Any single message whose byte size is greater than max_bytes should raise an error.

@chemelnucfin
Contributor

on it.

@chemelnucfin
Contributor

@danoscarmike Is there a reason you want to raise a ValueError? As the code currently stands, it already ignores the message. Should we use warnings instead?

@theacodes
Contributor

theacodes commented Feb 9, 2018 via email

@chemelnucfin
Contributor

chemelnucfin commented Feb 9, 2018

@jonparrott But is that the desired behavior, or should we just fix the infinite-loop situation while continuing to ignore the message?

@theacodes
Contributor

We should never just drop messages on the floor. Based on the behavior of the client in other languages, we should treat it as an error.

@theacodes theacodes changed the title from "Pub/Sub BatchSettings constructor should have validation" to "Pub/Sub should reject messages that are over the BatchSetting's max_size." Feb 12, 2018
@chemelnucfin
Contributor

chemelnucfin commented Feb 13, 2018

[x] #4872 Rejects the message.
[ ] #4870 Send the message off immediately.

@theacodes theacodes removed the release blocking Required feature/issue must be fixed prior to next release. label Feb 13, 2018
@theacodes
Contributor

Since #4872 is merged, this is no longer release blocking.

@tseaver
Contributor

tseaver commented Feb 27, 2018

@jonparrott ISTM that merging #4872 should have closed this issue?

@theacodes
Contributor

No, we need to actually match Java and Go's strategy here and submit the oversized message in its own batch.

@chemelnucfin
Contributor

@tseaver was working on this in #4870. Forgot about it.

@chemelnucfin chemelnucfin added the status: blocked Resolving the issue is dependent on other work. label Mar 7, 2018
@tseaver
Contributor

tseaver commented Mar 16, 2018

@chemelnucfin I'm not sure why this is marked as status: blocked.

@tseaver tseaver changed the title from "Pub/Sub should reject messages that are over the BatchSetting's max_size." to "Publish messages > 'batch_setting.max_size' in a separate batch." Mar 16, 2018
@tseaver tseaver removed the status: blocked Resolving the issue is dependent on other work. label Mar 16, 2018
@tseaver
Contributor

tseaver commented Apr 10, 2018

Note to self: use work from PR #4870 to address this issue.

@tseaver tseaver assigned tseaver and unassigned chemelnucfin Apr 10, 2018
@tseaver
Contributor

tseaver commented Apr 10, 2018

@jonparrott ISTM that there are a couple of questions here:

  • If a single message is larger than the max_bytes setting, should any existing batched messages be flushed first before publishing the over-large message in its own batch? I don't know whether we care at all about order-of-publishing.
  • Likewise, if a message itself is smaller than the max_bytes setting, but appending it to the batch would cause the batch's aggregate size to overflow, should the existing messages be flushed, and the new one added to the newly-emptied list?

@kir-titievsky
Author

@jonparrott's comment on Feb 27 says that oversize messages should be published in their own batch.

PM perspective here: we do care about order of publishing. Client library should not re-order messages if possible.

@chemelnucfin
Contributor

chemelnucfin commented Apr 10, 2018

@kir-titievsky But it looked like, from the Java client, that the batch gets sent off immediately?

@kir-titievsky
Author

Ah, I see what you mean now. I, as a user, would expect that the large message would cause the existing buffer of small messages to get flushed first.

@tseaver
Contributor

tseaver commented May 17, 2018

@kir-titievsky, @theacodes We currently test for an overflow of the message count after appending the message in publisher.batch.thread.Batch.publish. I think the "overflow" logic needs to be normalized for both max_bytes and max_messages:

  • First, remove the exceptions for individual messages exceeding max_bytes in both publisher.batch.base.Batch.will_accept and publisher.client.Client.publish.

  • Then, inside publisher.batch.thread.Batch.publish

    1. Test for overflow of either bytes or message count before appending the message to the batch.
    2. If either is true, and we already have messages, spawn the commit thread for the current batch and return None (forcing the client to create a new batch). The client will then publish the message via the new batch.
    3. If there is overflow, but there are no prior messages in the batch, go ahead and append the message, trigger the commit thread, and return the future for the message (the overflowing message will go out in its own request).
    4. If there is no overflow, append the message and return its future.

As an alternative to step 3, we could just remove the existing post-append overflow test (currently only for count), and let the monitor thread or the next call to publish trigger the commit.
This still leaves in place
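The four steps above can be sketched as follows; the Batch class and its string "futures" are illustrative stand-ins, not the library's actual API:

```python
class Batch:
    """Toy batch that mirrors the proposed publish() overflow handling."""

    def __init__(self, max_bytes, max_messages):
        self.max_bytes = max_bytes
        self.max_messages = max_messages
        self.messages = []
        self.size = 0
        self.committed = False

    def _would_overflow(self, message):
        # Step 1: test byte and count overflow BEFORE appending.
        return (self.size + len(message) > self.max_bytes
                or len(self.messages) + 1 > self.max_messages)

    def commit(self):
        # Stand-in for spawning the commit thread.
        self.committed = True

    def _append(self, message):
        self.messages.append(message)
        self.size += len(message)
        return "future-for-%d" % (len(self.messages) - 1)

    def publish(self, message):
        if self._would_overflow(message):
            if self.messages:
                # Step 2: flush the current batch; returning None tells the
                # client to retry on a fresh batch, preserving publish order.
                self.commit()
                return None
            # Step 3: an oversized message with no prior messages goes out
            # in its own single-message request.
            future = self._append(message)
            self.commit()
            return future
        # Step 4: no overflow; append and return the message's future.
        return self._append(message)
```

Under this sketch, an oversized message arriving at a non-empty batch flushes the batch first (matching the ordering expectation above), while an oversized message arriving at an empty batch is committed alone.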

tseaver added a commit that referenced this issue May 21, 2018
* Remove exceptions for oversize messages.

Toward #4608.

* Normalize overflow handling for max count and bytes.

Closes #4608.