
Release the state lock before calling the publish api #7686

Closed
84 changes: 42 additions & 42 deletions pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
@@ -193,50 +193,50 @@ def _commit(self):
                 self._status = base.BatchStatus.SUCCESS
                 return

-            # Begin the request to publish these messages.
-            # Log how long the underlying request takes.
-            start = time.time()
-
-            try:
-                response = self._client.api.publish(self._topic, self._messages)
-            except google.api_core.exceptions.GoogleAPIError as exc:
-                # We failed to publish, set the exception on all futures and
-                # exit.
-                self._status = base.BatchStatus.ERROR
-
-                for future in self._futures:
-                    future.set_exception(exc)
-
-                _LOGGER.exception("Failed to publish %s messages.", len(self._futures))
-                return
+        # Begin the request to publish these messages.
+        # Log how long the underlying request takes.
+        start = time.time()
+
+        try:
+            response = self._client.api.publish(self._topic, self._messages)
+        except google.api_core.exceptions.GoogleAPIError as exc:
+            # We failed to publish, set the exception on all futures and
+            # exit.
+            self._status = base.BatchStatus.ERROR
Contributor

The self._state_lock is designed to protect setting self._status: this change undoes that protection in the case that an error occurs, which introduces a race condition AFAICT.

Author

Hi Tres, we have covered that in the PR description:

We only need to hold the state lock for the transition from ACCEPTING_MESSAGES / STARTING to IN_PROGRESS. After that, since only one thread is able to transition to IN_PROGRESS, we can safely release the state lock before calling the publish api and eventually transitioning to SUCCESS / ERROR.

Maybe you can elaborate on the race condition? Thanks
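
To make the argument concrete, here is a minimal sketch of the locking pattern the description refers to (a toy stand-in with illustrative names, not the library's actual code): the lock guards only the state transition, and everything after the with block runs on at most one thread.

import threading

_CAN_COMMIT = ("ACCEPTING_MESSAGES", "STARTING")


class Batch(object):
    """Toy stand-in for the pubsub Batch class."""

    def __init__(self):
        self._state_lock = threading.Lock()
        self._status = "ACCEPTING_MESSAGES"

    def _commit(self):
        with self._state_lock:
            if self._status not in _CAN_COMMIT:
                # Another thread already won the transition; no-op.
                return
            self._status = "IN_PROGRESS"

        # The lock is released here. Only the single winning thread reaches
        # this point, so the writes to self._status below cannot race with
        # another committer.
        self._publish()  # stands in for the (slow) api.publish() RPC
        self._status = "SUCCESS"

    def _publish(self):
        pass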

Contributor

TL;DR: Not holding the lock after changing the state to IN_PROGRESS indeed seems safe. However, I would still like to see the effect of this change covered by a test before merging. I can help with this, if needed.


Long reply:

I checked the Batch code, and from what I can tell, the following threads are relevant:

  • MonitorBatchPublisher (if autocommit == True): Triggers _commit() after max_latency time elapses.
  • CommitBatchPublisher: If somebody calls commit() and the batch is in ACCEPTING_MESSAGES state, _commit() gets called in this thread.
  • User thread(s): When calling publisher.publish(), batch.publish() gets called. If that causes an overflow, commit() is called in the CommitBatchPublisher thread (see the sketch after this list for where these threads are started).
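
Condensed and paraphrased from thread.py (an approximation, not a verbatim excerpt), the entry points behind those threads look roughly like this:

import threading
import time


class _Settings(object):
    max_latency = 0.05  # illustrative value, in seconds


class BatchEntryPoints(object):
    """Paraphrased shapes of Batch.commit() and Batch.monitor()."""

    _settings = _Settings()

    def commit(self):
        # CommitBatchPublisher: _commit() runs on a background thread, so
        # callers of commit() never block on the publish RPC.
        threading.Thread(
            name="Thread-CommitBatchPublisher", target=self._commit
        ).start()

    def monitor(self):
        # MonitorBatchPublisher: sleep for max_latency, then trigger the
        # same _commit() path as an explicit commit.
        time.sleep(self._settings.max_latency)
        return self._commit()

    def _commit(self):
        pass  # the method shown in the diff above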

The first thing that _commit() does after acquiring the lock is changing the batch status to IN_PROGRESS.

In that state, any further calls to publish() become a no-op, because the will_accept() check returns False for any state that is not ACCEPTING_MESSAGES.

Additionally, any other _commit() calls also become a no-op, because the batch status is now IN_PROGRESS.

Only the thread that changed the state to IN_PROGRESS can proceed, so shielding the rest of the _commit() method with _state_lock, including the call to self._client.api.publish(), indeed seems unnecessary.

Without the _state_lock held, further calls to batch.publish() will not block longer than needed. The batch will simply not accept a new message, return False, and publisher.publish() will create an entirely new batch as a result.
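
A rough sketch of the kind of test being asked for, written against the toy Batch from the earlier sketch rather than the real class (so names and assertions are illustrative): spawn several committers at once and assert that exactly one of them reaches the publish call.

import threading


def test_only_one_thread_commits():
    batch = Batch()
    publish_calls = []
    # Record every thread that makes it past the state transition.
    batch._publish = lambda: publish_calls.append(threading.get_ident())

    threads = [threading.Thread(target=batch._commit) for _ in range(10)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # Exactly one thread should have won the transition to IN_PROGRESS and
    # published; the other nine must have no-opped.
    assert len(publish_calls) == 1
    assert batch._status == "SUCCESS"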


+
+            for future in self._futures:
+                future.set_exception(exc)
+
+            _LOGGER.exception("Failed to publish %s messages.", len(self._futures))
+            return

-            end = time.time()
-            _LOGGER.debug("gRPC Publish took %s seconds.", end - start)
-
-            if len(response.message_ids) == len(self._futures):
-                # Iterate over the futures on the queue and return the response
-                # IDs. We are trusting that there is a 1:1 mapping, and raise
-                # an exception if not.
-                self._status = base.BatchStatus.SUCCESS
-                zip_iter = six.moves.zip(response.message_ids, self._futures)
-                for message_id, future in zip_iter:
-                    future.set_result(message_id)
-            else:
-                # Sanity check: If the number of message IDs is not equal to
-                # the number of futures I have, then something went wrong.
-                self._status = base.BatchStatus.ERROR
-                exception = exceptions.PublishError(
-                    "Some messages were not successfully published."
-                )
-
-                for future in self._futures:
-                    future.set_exception(exception)
-
-                _LOGGER.error(
-                    "Only %s of %s messages were published.",
-                    len(response.message_ids),
-                    len(self._futures),
-                )
+        end = time.time()
+        _LOGGER.debug("gRPC Publish took %s seconds.", end - start)
+
+        if len(response.message_ids) == len(self._futures):
+            # Iterate over the futures on the queue and return the response
+            # IDs. We are trusting that there is a 1:1 mapping, and raise
+            # an exception if not.
+            self._status = base.BatchStatus.SUCCESS
+            zip_iter = six.moves.zip(response.message_ids, self._futures)
+            for message_id, future in zip_iter:
+                future.set_result(message_id)
+        else:
+            # Sanity check: If the number of message IDs is not equal to
+            # the number of futures I have, then something went wrong.
+            self._status = base.BatchStatus.ERROR
+            exception = exceptions.PublishError(
+                "Some messages were not successfully published."
+            )
+
+            for future in self._futures:
+                future.set_exception(exception)
+
+            _LOGGER.error(
+                "Only %s of %s messages were published.",
+                len(response.message_ids),
+                len(self._futures),
+            )

     def monitor(self):
         """Commit this batch after sufficient time has elapsed.
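
For context, a sketch of how the futures populated in _commit() surface to a caller of the public API (the project and topic names are placeholders):

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

future = publisher.publish(topic_path, b"payload")
try:
    # result() returns the message ID attached via future.set_result() on
    # success, or raises whatever future.set_exception() stored on failure.
    message_id = future.result(timeout=60)
    print("published message", message_id)
except Exception as exc:
    print("publish failed:", exc)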