
PubSub subscriber not processing messages after starting sometimes #16089

Closed
stevesng opened this issue Nov 23, 2021 · 13 comments
Labels: api: pubsub, priority: p2, type: bug, 🚨 This issue needs some love.

Comments

@stevesng

We have a very similar problem to #8415, but our case is more specific: sometimes our worker (subscriber) does not process a single message after being started by autoscaling (both time-based and load-based). Most of the time the worker starts and processes messages without any problem, but once every few days or weeks a worker starts and processes no messages at all, so much so that we now have a name for it: the zombie worker.

Below are the logs and metrics captured for a recent zombie worker occurrence.

Oldest unacked message age:
(screenshot attached)

Number of undelivered messages:
(screenshot attached)

Expired ack deadlines count:
(screenshot attached)

GRPC warnings in logs:

W, [2021-11-11T06:15:25.531965 #1] WARN -- : bidi: read-loop failed
W, [2021-11-11T06:15:25.532056 #1] WARN -- : 4:Deadline Exceeded. debug_error_string:{"created":"@1636611325.531511420","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":81,"grpc_status":4} (GRPC::DeadlineExceeded)
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/active_call.rb:29:in `check_status'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:209:in `block in read_loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:195:in `loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:195:in `read_loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/bin/rake:1:in `each'
W, [2021-11-11T06:15:25.532766 #1] WARN -- : bidi: read-loop failed
W, [2021-11-11T06:15:25.532830 #1] WARN -- : 4:Deadline Exceeded. debug_error_string:{"created":"@1636611325.531513935","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":81,"grpc_status":4} (GRPC::DeadlineExceeded)
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/active_call.rb:29:in `check_status'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:209:in `block in read_loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:195:in `loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:195:in `read_loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/bin/rake:1:in `each'
W, [2021-11-11T06:15:25.533273 #1] WARN -- : bidi-write-loop: send close failed
W, [2021-11-11T06:15:25.534701 #1] WARN -- : call#run_batch failed somehow (GRPC::Core::CallError)
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:165:in `run_batch'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:165:in `write_loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:75:in `block in run_on_client'
W, [2021-11-11T06:15:25.535033 #1] WARN -- : bidi-write-loop: send close failed
W, [2021-11-11T06:15:25.535089 #1] WARN -- : call#run_batch failed somehow (GRPC::Core::CallError)
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:165:in `run_batch'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:165:in `write_loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:75:in `block in run_on_client'
W, [2021-11-11T06:30:25.532826 #1] WARN -- : bidi: read-loop failed
W, [2021-11-11T06:30:25.532899 #1] WARN -- : 4:Deadline Exceeded. debug_error_string:{"created":"@1636612225.532517910","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":81,"grpc_status":4} (GRPC::DeadlineExceeded)
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/active_call.rb:29:in `check_status'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:209:in `block in read_loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:195:in `loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:195:in `read_loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/bin/rake:1:in `each'
W, [2021-11-11T06:30:25.533315 #1] WARN -- : bidi-write-loop: send close failed
W, [2021-11-11T06:30:25.533533 #1] WARN -- : call#run_batch failed somehow (GRPC::Core::CallError)
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:165:in `run_batch'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:165:in `write_loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:75:in `block in run_on_client'
W, [2021-11-11T06:30:25.535103 #1] WARN -- : bidi: read-loop failed
W, [2021-11-11T06:30:25.535164 #1] WARN -- : 4:Deadline Exceeded. debug_error_string:{"created":"@1636612225.534486605","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":81,"grpc_status":4} (GRPC::DeadlineExceeded)
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/active_call.rb:29:in `check_status'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:209:in `block in read_loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:195:in `loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:195:in `read_loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/bin/rake:1:in `each'
W, [2021-11-11T06:30:25.536260 #1] WARN -- : bidi-write-loop: send close failed
W, [2021-11-11T06:30:25.536758 #1] WARN -- : call#run_batch failed somehow (GRPC::Core::CallError)
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:165:in `run_batch'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:165:in `write_loop'
/var/www/app/rails_shared/bundle/ruby/2.5.0/gems/grpc-1.36.0-x86_64-linux/src/ruby/lib/grpc/generic/bidi_call.rb:75:in `block in run_on_client'

At the time of the occurrence, there were 31 other workers running, and all of them were completing jobs except the zombie worker:

(screenshot attached)

We are not sure whether this is a client (subscriber) issue or a server issue, but we are quite sure it is not caused by our application: after killing the zombie worker, the messages are redelivered to another (normal) worker, which has no problem processing them.

During another occurrence, we managed to capture the thread status of the zombie worker’s process too:

(screenshot attached)

Environment details

  • OS: GKE (Linux)
  • Ruby version: 2.5.0
  • Gem name and version: google-cloud-pubsub (2.6.1)

Steps to reproduce

  1. Use a local copy of google-cloud-pubsub and remove @inventory.add response.received_messages and register_callback rec_msg in stream.rb.
  2. Start only 1 worker (subscriber) that subscribes to a topic.
  3. Publish a batch of 80 messages to the same topic.
  4. Wait for 15 minutes.

The reason I removed these two lines of code is that our logs show no jobs completed and no errors thrown, and our metrics show no messages added to the inventory, so the lines were never executed anyway. And indeed, with these steps I am able to reproduce locally the oldest unacked message age and undelivered messages metrics, as well as the GRPC warnings in the logs:

W, [2021-11-22T15:23:51.262995 #98511]  WARN -- : bidi: read-loop failed
W, [2021-11-22T15:23:51.263064 #98511]  WARN -- : 4:Deadline Exceeded. debug_error_string:{"created":"@1637565831.262731000","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":81,"grpc_status":4} (GRPC::DeadlineExceeded)
/Users/steve/.rbenv/versions/2.5.8/lib/ruby/gems/2.5.0/gems/grpc-1.36.0-universal-darwin/src/ruby/lib/grpc/generic/active_call.rb:29:in `check_status'
/Users/steve/.rbenv/versions/2.5.8/lib/ruby/gems/2.5.0/gems/grpc-1.36.0-universal-darwin/src/ruby/lib/grpc/generic/bidi_call.rb:209:in `block in read_loop'
/Users/steve/.rbenv/versions/2.5.8/lib/ruby/gems/2.5.0/gems/grpc-1.36.0-universal-darwin/src/ruby/lib/grpc/generic/bidi_call.rb:195:in `loop'
/Users/steve/.rbenv/versions/2.5.8/lib/ruby/gems/2.5.0/gems/grpc-1.36.0-universal-darwin/src/ruby/lib/grpc/generic/bidi_call.rb:195:in `read_loop'
/Users/steve/.rbenv/versions/2.5.8/bin/rake:1:in `each'
W, [2021-11-22T15:23:51.263597 #98511]  WARN -- : bidi: read-loop failed
W, [2021-11-22T15:23:51.264036 #98511]  WARN -- : 4:Deadline Exceeded. debug_error_string:{"created":"@1637565831.263365000","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":81,"grpc_status":4} (GRPC::DeadlineExceeded)
/Users/steve/.rbenv/versions/2.5.8/lib/ruby/gems/2.5.0/gems/grpc-1.36.0-universal-darwin/src/ruby/lib/grpc/generic/active_call.rb:29:in `check_status'
/Users/steve/.rbenv/versions/2.5.8/lib/ruby/gems/2.5.0/gems/grpc-1.36.0-universal-darwin/src/ruby/lib/grpc/generic/bidi_call.rb:209:in `block in read_loop'
/Users/steve/.rbenv/versions/2.5.8/lib/ruby/gems/2.5.0/gems/grpc-1.36.0-universal-darwin/src/ruby/lib/grpc/generic/bidi_call.rb:195:in `loop'
/Users/steve/.rbenv/versions/2.5.8/lib/ruby/gems/2.5.0/gems/grpc-1.36.0-universal-darwin/src/ruby/lib/grpc/generic/bidi_call.rb:195:in `read_loop'
/Users/steve/.rbenv/versions/2.5.8/bin/rake:1:in `each'
W, [2021-11-22T15:23:51.264693 #98511]  WARN -- : bidi-write-loop: send close failed
W, [2021-11-22T15:23:51.265109 #98511]  WARN -- : call#run_batch failed somehow (GRPC::Core::CallError)
/Users/steve/.rbenv/versions/2.5.8/lib/ruby/gems/2.5.0/gems/grpc-1.36.0-universal-darwin/src/ruby/lib/grpc/generic/bidi_call.rb:165:in `run_batch'
/Users/steve/.rbenv/versions/2.5.8/lib/ruby/gems/2.5.0/gems/grpc-1.36.0-universal-darwin/src/ruby/lib/grpc/generic/bidi_call.rb:165:in `write_loop'
/Users/steve/.rbenv/versions/2.5.8/lib/ruby/gems/2.5.0/gems/grpc-1.36.0-universal-darwin/src/ruby/lib/grpc/generic/bidi_call.rb:75:in `block in run_on_client'
W, [2021-11-22T15:23:51.264293 #98511]  WARN -- : bidi-write-loop: send close failed
W, [2021-11-22T15:23:51.265388 #98511]  WARN -- : call#run_batch failed somehow (GRPC::Core::CallError)
/Users/steve/.rbenv/versions/2.5.8/lib/ruby/gems/2.5.0/gems/grpc-1.36.0-universal-darwin/src/ruby/lib/grpc/generic/bidi_call.rb:165:in `run_batch'
/Users/steve/.rbenv/versions/2.5.8/lib/ruby/gems/2.5.0/gems/grpc-1.36.0-universal-darwin/src/ruby/lib/grpc/generic/bidi_call.rb:165:in `write_loop'
/Users/steve/.rbenv/versions/2.5.8/lib/ruby/gems/2.5.0/gems/grpc-1.36.0-universal-darwin/src/ruby/lib/grpc/generic/bidi_call.rb:75:in `block in run_on_client'

If I do not remove @inventory.add response.received_messages, GRPC::Unavailable warnings are thrown sporadically for the empty request made every 30 seconds. If I leave both lines in place, no GRPC warnings are thrown and there is no problem at all, even when no messages are published.
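
For reference, this is roughly where the two removed calls sit in the gem's lib/google/cloud/pubsub/subscriber/stream.rb; the surrounding loop is paraphrased from the 2.6.x source and may not match it exactly:

# Inside the stream's background read loop (paraphrased sketch)
response = enum.next

synchronize do
  # @inventory.add response.received_messages   # removed for the repro
end

response.received_messages.each do |rec_msg_grpc|
  rec_msg = ReceivedMessage.from_grpc rec_msg_grpc, self
  synchronize do
    # register_callback rec_msg                  # removed for the repro
  end
end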

However, I am not able to reproduce the unusual expired ack deadlines metric, which leads us to believe the issue might be on the server side or in the gRPC communication in between. Is that possible?

Code example

This is the code I use to publish a batch of 80 messages to the topic for testing:

require 'json'
require 'faker'
require 'google/cloud/pubsub'

# Build 80 JSON payloads, each with a unique id and some filler text
messages = []
80.times do |n|
  message = {}
  message['id'] = "#{Time.now.to_i}#{n}"
  message['content'] = Faker::Lorem.paragraph(10)
  messages << message.to_json
end

pubsub = Google::Cloud::PubSub.new(
  project_id: 'PROJECT_ID',
  credentials: "/path/to/keyfile.json"
)

topic = pubsub.topic 'TOPIC_NAME'

topic.publish do |batch_publisher|
  messages.each do |message|
    batch_publisher.publish message
  end
end

Our worker's subscriber operates similarly to the example provided, but with the following configuration:

configuration = {
  deadline: 10,
  streams: 2,
  inventory: {
    max_outstanding_messages: 80,
    max_total_lease_duration: 20
  },
  threads: { callback: 8, push: 4 }
}

pubsub.subscription('TOPIC_NAME', skip_lookup: true).listen configuration do |received_message|
  process received_message
end
@quartzmo quartzmo self-assigned this Nov 23, 2021
@quartzmo quartzmo added api: pubsub Issues related to the Pub/Sub API. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. priority: p2 Moderately-important priority. Fix may not be included in next release. labels Nov 23, 2021
@quartzmo (Member)

@stevesng Thank you for opening this superbly-documented issue. I will spend some time on this and follow your steps to reproduce today.

@kamalaboulhosn If you are available and have time, can you please review this issue and consider the possibilities on the service side?

@kamalaboulhosn (Contributor) commented Nov 23, 2021

Determining whether it is a server-side issue would require a support request put in via the Cloud Console, as we would have to look up information about the subscriber on the backend. If the messages are getting delivered to the subscriber client, and upon shutting down the subscriber the messages are successfully processed by other subscribers, that points more to a client-side issue than a server-side issue (which is possibly what the "expired acks" graph is showing).

@quartzmo (Member)

I attempted to reproduce the error by following the steps above, but after 1 hour my streams are still working as expected (abridged output of gRPC logging plus some added print statements):

D, [2021-11-23T15:24:19.925557 #97633] DEBUG -- : bidi-read-loop: 30
D, [2021-11-23T15:24:20.069178 #97633] DEBUG -- : calling pubsub.googleapis.com:/google.pubsub.v1.Subscriber/ModifyAckDeadline
stream_pool: [#<Google::Cloud::PubSub::Subscriber::Stream (inventory: 0, status: running, thread: sleep)>, #<Google::Cloud::PubSub::Subscriber::Stream (inventory: 0, status: running, thread: sleep)>]
stream_pool: [#<Google::Cloud::PubSub::Subscriber::Stream (inventory: 0, status: running, thread: sleep)>, #<Google::Cloud::PubSub::Subscriber::Stream (inventory: 0, status: running, thread: sleep)>]
modified Stream: skipping @inventory.add response.received_messages
modified Stream: skipping register_callback rec_msg
modified Stream: skipping register_callback rec_msg

@stevesng (Author)

Thanks @quartzmo and @kamalaboulhosn for the quick response.

Unfortunately, we are currently on the basic support plan and cannot file a technical support request from the Cloud Console. Is there another way we can make the request?

What we find particularly unusual about the expired ack deadlines is that they appear as thin spikes every 15 minutes, which coincide with the timeout of the streaming pull request and with the GRPC warnings.

Here is another period of about 3 hours during which a zombie worker was running:
(screenshot attached)

Is it possible the messages are stuck in a "sending" state, whereby the client has not fully received them due to some network issue, and the server cannot deliver them to another subscriber because they are still "sending"? And only after the client gives up does the server deliver the messages to another subscriber?

quartzmo added a commit to quartzmo/google-cloud-ruby that referenced this issue Nov 24, 2021
@quartzmo (Member)

@stevesng My repro files are available on my fork as deadlock_pub.rb and deadlock_sub.rb, if you have time to check whether I missed anything. A couple of hours of runtime yesterday in my local environment did not reproduce the call#run_batch failed somehow (GRPC::Core::CallError) warning. If you think it's worthwhile, I can try running on GKE as well.

@quartzmo (Member)

@stevesng You provided this example:

pubsub.subscription('TOPIC_NAME', skip_lookup: true).listen configuration do |received_message|
  process received_message
end

Unlike the example you also referenced, it is missing received_message.acknowledge!. Should the ack be included in my repro or not?
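
With the ack, it would presumably read something like:

pubsub.subscription('TOPIC_NAME', skip_lookup: true).listen configuration do |received_message|
  process received_message
  received_message.acknowledge!
end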

@stevesng (Author)

My apologies; our worker runs from a rake task within the Rails app environment, so it is more complicated than the code example I previously gave.

I am able to simplify the rake task to the following:

# frozen_string_literal: true

require 'logger'

# Route gRPC's internal logging to stderr at WARN level so the bidi warnings are visible
module GRPCLogger
  LOGGER = Logger.new $stderr, level: Logger::WARN
  def logger
    LOGGER
  end
end

module GRPC
  extend GRPCLogger
end

namespace :worker do
  task :run do
    require 'google/cloud/pubsub'

    puts 'Starting worker'

    pubsub = Google::Cloud::PubSub.new(
      project: 'PROJECT_NAME',
      keyfile: '/path/to/keyfile.json'
    )

    configuration = {
      deadline: 10,
      streams: 2,
      inventory: {
        max_outstanding_messages: 80,
        max_total_lease_duration: 20
      },
      threads: { callback: 8, push: 4 }
    }

    subscriber = pubsub.subscription('TOPIC_NAME', skip_lookup: true).listen configuration do |received_message|
      # process received_message
      puts "Data: #{received_message.message.data}, published at #{received_message.message.published_at}"
      received_message.acknowledge!
    end

    subscriber.on_error do |exception|
      puts "Exception: #{exception.class} #{exception.message}"
    end

    at_exit do
      subscriber.stop!(10)
    end

    subscriber.start

    sleep
  end
end
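
(Given the namespace above, the task is started with bundle exec rake worker:run.)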

I tested it locally and it still gave the GRPC warnings exactly 15 minutes after starting.

@stevesng (Author)

@quartzmo I pulled your fork and ran deadlock_sub.rb; it did give GRPC::Core::CallError and GRPC::DeadlineExceeded at the same time after 15 minutes. However, I'm not so sure about these errors/warnings now, as I noticed some of our normal workers gave them 15 minutes after starting too.

@tseaver commented Nov 30, 2021

FWIW, the Python Firestore client has an apparent auth-related Bidi stream issue which might be relevant.

@quartzmo (Member) commented Nov 30, 2021

Possibly related python-pubsub issue:

Without any apparent incident, the pod disconnects from the stream, then seems to reconnect... However it does not process any message afterwards

and python-api-core gRPC auth refresh issue.

@quartzmo quartzmo removed their assignment Dec 20, 2021
@meredithslota (Contributor)

googleapis/python-api-core#223 is still open, re-classified as a feature request.
googleapis/python-pubsub#504 is still open, user filed a support request. I will see if I can dig up anything useful from that.

@stevesng (Author)

We would like to share our workaround, which has worked surprisingly well since we started employing it about 3 months ago.

Our original goal was to automatically detect and kill the zombie workers but it turned out none was detected (and thus none required killing).

There are 2 parts to the workaround. The first part is to monitor the liveness of the worker and kill it if it does not process any messages. The second part is for the worker itself to publish a message to the same topic it is subscribed to whenever it is not processing any messages.

My suspicion is that establishing a gRPC connection to publish a message before receiving messages prevents the worker from becoming a zombie.
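
If that suspicion holds, a smaller experiment (purely hypothetical; we have not deployed it) would be to warm the publish path once before starting the subscriber, reusing the names from the task below:

# Hypothetical warm-up: establish the publish channel up front by sending one
# throwaway echo message before the streaming pull begins.
warmup_topic = pubsub.subscription(topic_name, skip_lookup: true).topic
warmup_topic.publish({ job_class: ECHO_JOB_CLASS, worker_id: worker_id, sent_at: Time.now.to_i }.to_json)
subscriber.start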

For those who are interested, here is our code:

# frozen_string_literal: true

namespace :worker do
  task :run do
    require 'google/cloud/pubsub'
    require 'concurrent'
    require 'json'
    require 'socket'
    require 'fileutils'

    puts 'Starting worker'

    pubsub = Google::Cloud::PubSub.new(
      project: 'PROJECT_NAME',
      keyfile: '/path/to/keyfile.json'
    )

    configuration = {
      deadline: 10,
      streams: 2,
      inventory: {
        max_outstanding_messages: 80,
        max_total_lease_duration: 20
      },
      threads: { callback: 8, push: 4 }
    }

    topic_name = 'TOPIC_NAME'
    worker_id = "#{Socket.gethostname}-#{Process.pid}"

    ECHO_JOB_CLASS = 'PubsubEchoJob'

    subscriber = pubsub.subscription(topic_name, skip_lookup: true).listen configuration do |received_message|
      job_data = JSON.parse(received_message.data)
      if job_data['job_class'] == ECHO_JOB_CLASS
        puts 'Echo message was received.'
        # acknowledge echo message if it is sent by this worker or sent more than 10 seconds ago
        if job_data['worker_id'] == worker_id || (Time.now.to_i - job_data['sent_at'] > 10)
          received_message.acknowledge!
        else
          received_message.reject!
        end
      else
        # process received_message
        puts "Data: #{received_message.message.data}, published at #{received_message.message.published_at}"
        received_message.acknowledge!
      end
    end

    subscriber.on_error do |exception|
      puts "Exception: #{exception.class} #{exception.message}"
    end

    at_exit do
      subscriber.stop!(10)
    end

    subscriber.start

    # Periodically check whether this worker has completed or is processing any
    # messages; if not, remove the health file and publish an echo message.
    liveness_check_task = begin
      @last_completed_messages = 0

      task = Concurrent::TimerTask.new(
        execution_interval: (ENV['LIVENESS_CHECK_INTERVAL'] || 60).to_i
      ) do
        total_processing_messages = 0
        total_completed_messages = 0
        # Sum scheduled, completed, and queued callback counts across all streams' thread pools
        subscriber.stream_pool.each do |stream|
          thread_pool = stream.send(:callback_thread_pool)
          scheduled_messages = thread_pool.scheduled_task_count
          completed_messages = thread_pool.completed_task_count
          awaiting_messages = thread_pool.queue_length
          total_processing_messages += (scheduled_messages - completed_messages - awaiting_messages)
          total_completed_messages += completed_messages
        end
        last_completed_messages = @last_completed_messages
        latest_completed_messages = @last_completed_messages = total_completed_messages
        (latest_completed_messages > last_completed_messages) || (total_processing_messages > 0)
      end

      task.with_observer do |_time, working|
        liveness_check_filename = ENV['LIVENESS_CHECK_FILENAME'] || '/tmp/healthy'
        puts "Worker has #{'not ' unless working}been working (completed #{@last_completed_messages} jobs)"
        if working
          FileUtils.touch(liveness_check_filename)
        else
          FileUtils.rm_f(liveness_check_filename)
          @topic ||= pubsub.subscription(topic_name, skip_lookup: true).topic
          @topic.publish({
            job_class: ECHO_JOB_CLASS,
            worker_id: worker_id,
            sent_at: Time.now.to_i
          }.to_json)
          puts 'Echo message was sent.'
        end
      end
    end

    liveness_check_task.execute

    sleep
  end
end
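
The first part of the workaround (the external monitor that restarts a worker whose health file disappears) is not shown above; presumably it is wired up as a Kubernetes liveness probe on GKE. A purely hypothetical Ruby sketch of such a check, reusing the LIVENESS_CHECK_FILENAME convention from the task, could be as simple as:

#!/usr/bin/env ruby
# liveness_check.rb (hypothetical; not part of the original workaround):
# exit non-zero when the health file is missing so the orchestrator restarts the pod.
health_file = ENV['LIVENESS_CHECK_FILENAME'] || '/tmp/healthy'
exit(File.exist?(health_file) ? 0 : 1)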

@yoshi-automation yoshi-automation added the 🚨 This issue needs some love. label May 22, 2022
@NivedhaSenthil (Member)

Tried to replicate with the latest version and do not see this happening. Closing this issue for now; please feel free to reopen if the issue still persists.
