[Pub/Sub] Publisher stops publishing after RetryError Deadline of 600 exceeded error not being surfaced #7822
Comments
@jam182 Just as a sanity check, what does your publisher client code look like? Does it use the [...] Something like the following:

    import time

    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path("my-project", "my-topic")

    while True:
        msg = b"some message"
        future = publisher.publish(topic_path, msg)
        try:
            result = future.result()
        except Exception as ex:
            pass  # handle exception
        time.sleep(3)

I was able to replicate the reported behavior (an error not surfacing to the publisher code) if I faked an error in the underlying channel, and also ignored the [...]

I also tried an alternative approach that uses [...]:

    def my_callback(future):
        result = future.result()
        # do something with result
        ...

    while True:
        ...
        future = publisher.publish(topic_path, msg)
        future.add_done_callback(my_callback)
        ...

In this case, the exception occurred in the callback (in [...]). Could either of these two scenarios be applicable to your case?
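The callback scenario can be sketched with the standard library's `concurrent.futures.Future`, used here only as a stand-in for the future returned by `publisher.publish()`. This assumes both expose `result()`, `exception()`, and `add_done_callback()`. The names `on_done` and `failures` are hypothetical. With `concurrent.futures`, an exception raised inside a done-callback is logged and ignored, so this callback asks the future for its exception instead of letting `result()` raise:

```python
import logging
from concurrent.futures import Future

logger = logging.getLogger(__name__)
failures = []  # hypothetical place to record failed publishes


def on_done(future):
    """Done-callback that records a failure instead of raising."""
    exc = future.exception()  # the exception, or None on success
    if exc is not None:
        logger.error("Publish failed: %r", exc)
        failures.append(exc)
        return
    logger.debug("Published message %s", future.result())


# Simulate one failed and one successful publish.
failed = Future()
failed.add_done_callback(on_done)
failed.set_exception(RuntimeError("simulated publish failure"))

succeeded = Future()
succeeded.add_done_callback(on_done)
succeeded.set_result("message-id-1")
```

Recording the failure somewhere the main code can see it (a list here, a queue or metric in practice) is what makes the error visible outside the callback.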
I'd say it's a combination of both. The code would look something like this:

    import logging
    from functools import partial
    from Queue import Full, Queue  # Python 2 module name
    from threading import Thread

    logging.basicConfig()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.WARNING)

    events = Queue(maxsize=100000)

    def check_response():
        while True:
            logger.debug('Checking the queue')
            message = events.get(block=True)
            logger.debug('Checking the message')
            _callback = partial(callback, message)
            message.response.add_done_callback(_callback)

    def callback(message, future):
        try:
            logger.debug('Callback is waiting for the future')
            message_id = future.result(timeout=2)
            logger.debug('Future resolved: %s', message_id)
        except Exception:
            logger.exception('Failed to publish message. Retry.')
            message.retry()

    class Message(object):
        """This represents one item in the events queue."""
        __slots__ = ('response', 'publish_func', 'record')

        def __init__(self, response, publish_func, record):
            self.response = response
            self.publish_func = publish_func
            self.record = record

        def retry(self):
            self.response = self.publish_func(self.record)
            queue_size = events.qsize()
            try:
                events.put_nowait(self)
            except Full:
                logger.error('Events queue is full. Messages are not being checked anymore.')

    class PubSubLogger(object):
        def __init__(self, config):
            # This returns the Google publisher client object, which always
            # publishes to a specific project/topic.
            self.pub_sub_client = PubSubTopicClient(config['topic_name'])
            Thread(target=check_response).start()

        def log_event(self, event):
            try:
                events.put_nowait(
                    Message(
                        self.pub_sub_client.publish_message(event),
                        self.pub_sub_client.publish_message,
                        event,
                    )
                )
            except Full:
                logger.error('Clicks queue is full. Messages are not being checked anymore.')

we don't really see the [...]
UPDATE: in the [...] I updated our pubsub version to [...] I think the way you reproduced the error was without calling the [...] The only concern is that, in our retry logic, we reuse the same client object in case of failure to issue a new [...]
@jam182 Glad to hear that updating to [...]

Indeed, as the only way I managed to reproduce the bug was to not call [...]

    try:
        client.publish(...)
    except Exception as exc:
        pass  # does not work this way...

If repeating the call with the same client instance, the same underlying channel will be used (the channel does not get recreated on additional [...]). FWIW, a [...]

On the other hand, if a non-retryable error occurs, the underlying machinery will not attempt to retry the request, and a different exception will be propagated up to the custom script. "Manually" retrying on such errors is probably futile.
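Since manual retries on non-retryable errors are probably futile, one option is to cap them so that a persistent failure is raised to the caller instead of being re-enqueued forever. The following is only a sketch under that assumption: `publish_with_limit` and `PublishGaveUp` are hypothetical names, and `concurrent.futures.Future` stands in for the future a real publish call would return.

```python
import time
from concurrent.futures import Future


class PublishGaveUp(Exception):
    """Hypothetical error raised once manual retries are exhausted."""


def publish_with_limit(publish_func, record, max_attempts=3, delay=0.0, timeout=2):
    """Call publish_func(record) and wait on its future, at most max_attempts times."""
    last_exc = None
    for _ in range(max_attempts):
        try:
            return publish_func(record).result(timeout=timeout)
        except Exception as exc:  # broad on purpose: this is only a sketch
            last_exc = exc
            time.sleep(delay)
    raise PublishGaveUp("gave up after %d attempts: %r" % (max_attempts, last_exc))


# Stand-in publish functions for demonstration.
calls = []


def flaky_publish(record):
    """Fails twice, then succeeds."""
    calls.append(record)
    future = Future()
    if len(calls) < 3:
        future.set_exception(RuntimeError("simulated failure"))
    else:
        future.set_result("message-id-3")
    return future


def always_failing_publish(record):
    future = Future()
    future.set_exception(RuntimeError("simulated failure"))
    return future


message_id = publish_with_limit(flaky_publish, b"some message")
```

Raising once the limit is reached gives the application a definite failure it can log, alert on, or use to fail a health check.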
Closing this, as it has been resolved by upgrading [...] @jam182 Thank you for the effort on your side, too, and for providing all the details. Should the issue re-emerge, however, feel free to come back and re-open it. Thanks again!
Often, the publisher client stops working without surfacing the stack trace, which never reaches our code. The result is that the application hangs without failing any health check. For now we have had to set up an alert for when we see the log in Stackdriver, but that is a poor workaround.
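One way to make such a hang visible to a health check, sketched under the assumption that the application publishes regularly, is to track the time of the last successful publish and report unhealthy when it becomes too old. `PublishHealth` and its parameters are hypothetical names, not part of any library:

```python
import time


class PublishHealth(object):
    """Tracks the last successful publish so a health check can detect a stall."""

    def __init__(self, max_silence_seconds, clock=time.time):
        self._max_silence = max_silence_seconds
        self._clock = clock  # injectable so the check can be tested
        self._last_success = clock()

    def record_success(self):
        """Call this whenever a publish future resolves successfully."""
        self._last_success = self._clock()

    def is_healthy(self):
        """True while the last success is recent enough."""
        return (self._clock() - self._last_success) <= self._max_silence


# Demonstration with a fake clock.
now = [100.0]
health = PublishHealth(max_silence_seconds=30, clock=lambda: now[0])
healthy_at_start = health.is_healthy()

now[0] = 140.0  # 40 seconds pass with no successful publish
healthy_after_silence = health.is_healthy()

health.record_success()
healthy_after_success = health.is_healthy()
```

A health endpoint could return a failure whenever `is_healthy()` is false, which would replace the log-based alert with a direct signal.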
Stacktrace
OS type and version
Alpine 3.8
Python version and virtual environment information:
python --version
2.7.14
pip freeze
Extra
It mostly happens in one particular region
asia-southeast1-b
but in general it happens in all the regions (US, EU, etc.).