Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allowing over-ride for lock used in publisher client. #4628

Merged
merged 3 commits into from
Dec 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pubsub/google/cloud/pubsub_v1/publisher/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ def __len__(self):
"""Return the number of messages currently in the batch."""
return len(self.messages)

@staticmethod
@abc.abstractmethod
def make_lock():
"""Return a lock in the chosen concurrency model.

Returns:
ContextManager: A newly created lock.
"""
raise NotImplementedError

@property
@abc.abstractmethod
def messages(self):
Expand Down
9 changes: 9 additions & 0 deletions pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ def __init__(self, client, topic, settings, autocommit=True):
)
self._thread.start()

@staticmethod
def make_lock():
"""Return a threading lock.

Returns:
_thread.Lock: A newly created lock.
"""
return threading.Lock()

@property
def client(self):
"""~.pubsub_v1.client.PublisherClient: A publisher client."""
Expand Down
12 changes: 8 additions & 4 deletions pubsub/google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import copy
import os
import pkg_resources
import threading

import grpc
import six
Expand All @@ -44,16 +43,21 @@ class Client(object):
Args:
batch_settings (~google.cloud.pubsub_v1.types.BatchSettings): The
settings for batch publishing.
batch_class (class): A class that describes how to handle
batch_class (Optional[type]): A class that describes how to handle
batches. You may subclass the
:class:`.pubsub_v1.publisher.batch.base.BaseBatch` class in
order to define your own batcher. This is primarily provided to
allow use of different concurrency models; the default
is based on :class:`threading.Thread`.
is based on :class:`threading.Thread`. This class should also have
a class method (or static method) that takes no arguments and
produces a lock that can be used as a context manager.
kwargs (dict): Any additional arguments provided are sent as keyword
arguments to the underlying
:class:`~.gapic.pubsub.v1.publisher_client.PublisherClient`.
Generally, you should not need to set additional keyword arguments.
Before being passed along to the GAPIC constructor, a channel may
be added if ``credentials`` are passed explicitly or if the
Pub / Sub emulator is detected as running.
"""
def __init__(self, batch_settings=(), batch_class=thread.Batch, **kwargs):
# Sanity check: Is our goal to use the emulator?
Expand Down Expand Up @@ -86,7 +90,7 @@ def __init__(self, batch_settings=(), batch_class=thread.Batch, **kwargs):
# The batches on the publisher client are responsible for holding
# messages. One batch exists for each topic.
self._batch_class = batch_class
self._batch_lock = threading.Lock()
self._batch_lock = batch_class.make_lock()
self._batches = {}

@property
Expand Down
7 changes: 7 additions & 0 deletions pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ def test_init_infinite_latency():
assert batch._thread is None


@mock.patch.object(threading, 'Lock')
def test_make_lock(Lock):
lock = Batch.make_lock()
assert lock is Lock.return_value
Lock.assert_called_once_with()


def test_client():
client = create_client()
settings = types.BatchSettings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import os

from google.auth import credentials
import mock

import mock
import pytest

from google.cloud.pubsub_v1.gapic import publisher_client
Expand Down