-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pubsub large message #4870
pubsub large message #4870
Conversation
cf8eada
to
5117f56
Compare
5117f56
to
d307000
Compare
d307000
to
bbdf265
Compare
@@ -97,7 +97,7 @@ def make_lock(): | |||
Returns: | |||
_thread.Lock: A newly created lock. | |||
""" | |||
return threading.Lock() | |||
return threading.RLock() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -204,6 +204,8 @@ def _commit(self): | |||
self._messages, | |||
) | |||
end = time.time() | |||
self._client._publish_count += len(self._messages) | |||
self._client._commit_count += 1 |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
return self._commit() | ||
|
||
if self._size: | ||
return self._commit() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -278,7 +281,8 @@ def publish(self, message): | |||
|
|||
# Try to commit, but it must be **without** the lock held, since | |||
# ``commit()`` will try to obtain the lock. | |||
if num_messages >= self._settings.max_messages: | |||
if (num_messages >= self._settings.max_messages | |||
or self._size >= self._settings.max_bytes): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
~.pubsub_v1.batch.Batch: The batch object. | ||
""" | ||
return self._batch(topic, create, autocommit) | ||
|
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
session.run('py.test', '--quiet', 'tests/system.py') | ||
session.run('py.test', | ||
'--quiet', | ||
'tests/system.py') |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
future = None | ||
while future is None: | ||
future = batch.publish(message) | ||
self._batches[topic] = backup |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -72,7 +72,7 @@ def test_init_infinite_latency(): | |||
assert batch._thread is None | |||
|
|||
|
|||
@mock.patch.object(threading, 'Lock') | |||
@mock.patch.object(threading, 'RLock') | |||
def test_make_lock(Lock): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -203,6 +203,7 @@ def test_blocking__commit_wrong_messageid_length(): | |||
|
|||
def test_monitor(): | |||
batch = create_batch(max_latency=5.0) | |||
batch._size = 1 |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This PR doesn't address the order-of-publishing concerns expressed by @kir-titievsky:
|
No description provided.