Add delay on entity re-ingestion on failed attempt #298
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -148,6 +148,21 @@ def parse_options(argv) | |
opts[:queue_name] = queue | ||
end | ||
|
||
o.on('-i', '--ingestion_prefix PREFIX', | ||
'Prefix for keys used in counting the number of failed ingestion attempts') do |prefix| | ||
opts[:ingestion_prefix] = prefix | ||
end | ||
|
||
o.on('-x', '--ingestion_attempts NUMBER', Integer, | ||
'Max number of attempts to try ingesting an entity') do |ingestion_attempts| | ||
opts[:ingestion_attempts] = ingestion_attempts | ||
end | ||
|
||
o.on('-f', '--first_failed_wait NUMBER', Integer, | ||
'Time in seconds to wait after first failed deposit. Time will double every failed attempt') do |failed_wait| | ||
opts[:first_failed_wait] = failed_wait | ||
end | ||
|
||
o.separator '' | ||
o.separator 'Common options:' | ||
|
||
|
@@ -183,16 +198,10 @@ def rotate_logs | |
|
||
def run_preservation_cycle | ||
begin | ||
entity_json = queue.wait_next_item | ||
return unless entity_json | ||
|
||
# jupiter is submitting the entries to reddis in a hash format using fat arrows. We need to change them to colons | ||
# in order to parse them correctly from json | ||
entity = JSON.parse(entity_json.gsub('=>', ':'), { symbolize_names: true }) | ||
Comment on lines -189 to -191
Is this no longer an issue?
To my understanding, this was an old bug: items were being added to the queue incorrectly, which introduced the fat arrows. The new versions of jupiter and pmpy will add correct JSON. |
||
return unless entity[:type].present? && entity[:uuid].present? | ||
entity = queue.wait_next_item | ||
return unless entity && entity[:type].present? && entity[:uuid].present? | ||
rescue StandardError => e | ||
Rollbar.error(e) | ||
logger.error(e) | ||
log_exception(e) | ||
end | ||
|
||
# add additional information about the error context to errors that occur while processing this item. | ||
|
@@ -209,7 +218,11 @@ def run_preservation_cycle | |
# readding it to the queue as it will always fail | ||
rescue PushmiPullyu::AIP::EntityInvalid => e | ||
rescue StandardError => e | ||
queue.add_entity_json(entity_json) | ||
begin | ||
queue.add_entity_in_timeframe(entity) | ||
rescue MaxDepositAttemptsReached => e | ||
log_exception(e) | ||
end | ||
Comment on lines +221 to +225
This bit of code will log when an entity reaches the max number of attempts for ingestion.
@ConnorSheremeta do you think this would be enough to keep track of the failed items? It would mean going through the logs, though, so I'm not sure this is the best approach. Always open to ideas!
Filtering through the logs could suffice, although it may not be ideal. These failures could be logged to another file, but that may not be worth doing, since this information can be filtered as needed. |
||
|
||
# rubocop:disable Lint/RescueException | ||
# Something other than a StandardError exception means something happened which we were not expecting! | ||
|
@@ -218,8 +231,7 @@ def run_preservation_cycle | |
raise e | ||
# rubocop:enable Lint/RescueException | ||
ensure | ||
Rollbar.error(e) | ||
logger.error(e) | ||
log_exception(e) | ||
end | ||
end | ||
|
||
|
@@ -294,4 +306,9 @@ def start_server_as_daemon | |
end | ||
end | ||
|
||
def log_exception(exception) | ||
Rollbar.error(exception) | ||
logger.error(exception) | ||
end | ||
|
||
end |
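The three new flags can be tried in isolation with a minimal sketch like the one below. It assumes only Ruby's standard `optparse`; the helper name `parse_retry_options` and the sample values (`pmpy_attempts:`, `3`, `10`) are hypothetical and are not taken from the project.

```ruby
require 'optparse'

# Hypothetical helper: parses only the three retry-related flags shown in the diff.
def parse_retry_options(argv)
  opts = {}
  parser = OptionParser.new do |o|
    # Prefix for the keys that count failed ingestion attempts
    o.on('-i', '--ingestion_prefix PREFIX') { |prefix| opts[:ingestion_prefix] = prefix }
    # Max number of attempts to try ingesting an entity (coerced to Integer)
    o.on('-x', '--ingestion_attempts NUMBER', Integer) { |n| opts[:ingestion_attempts] = n }
    # Base wait in seconds used by the backoff calculation (coerced to Integer)
    o.on('-f', '--first_failed_wait NUMBER', Integer) { |n| opts[:first_failed_wait] = n }
  end
  parser.parse(argv) # non-destructive; argv is left untouched
  opts
end

# Sample values are illustrative only.
retry_opts = parse_retry_options(%w[-i pmpy_attempts: -x 3 -f 10])
```

Passing `Integer` to `o.on` makes OptionParser reject non-numeric input for `-x` and `-f` instead of storing a string.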
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
class PushmiPullyu::PreservationQueue | ||
|
||
class ConnectionError < StandardError; end | ||
class MaxDepositAttemptsReached < StandardError; end | ||
|
||
def initialize(redis_url: 'redis://localhost:6379', | ||
pool_opts: { size: 1, timeout: 5 }, | ||
|
@@ -50,7 +51,8 @@ def next_item | |
rd.multi do |tx| | ||
tx.zrem(@queue_name, element) # remove the top element transactionally | ||
end | ||
return element | ||
|
||
return JSON.parse(element, { symbolize_names: true }) | ||
else | ||
rd.unwatch # cancel the transaction since there was nothing in the queue | ||
return nil | ||
|
@@ -68,12 +70,27 @@ def wait_next_item | |
end | ||
end | ||
|
||
def add_entity_json(entity_json) | ||
def add_entity_in_timeframe(entity) | ||
entity_attempts_key = "#{PushmiPullyu.options[:ingestion_prefix]}#{entity[:uuid]}" | ||
|
||
@redis.with do |connection| | ||
connection.zadd @queue_name, Time.now.to_f, entity_json | ||
# separate information for priority information and queue | ||
deposit_attempt = connection.incr entity_attempts_key | ||
What would happen if we deploy this and there are items already in the queue which wouldn't have an
from https://devdocs.io/redis/incr
That is correct, the normal behaviour would continue |
||
|
||
if deposit_attempt <= PushmiPullyu.options[:ingestion_attempts] | ||
connection.zadd @queue_name, Time.now.to_f + self.class.extra_wait_time(deposit_attempt), | ||
entity.slice(:uuid, :type).to_json | ||
else | ||
connection.del entity_attempts_key | ||
raise MaxDepositAttemptsReached | ||
end | ||
end | ||
end | ||
|
||
def self.extra_wait_time(deposit_attempt) | ||
(2**deposit_attempt) * PushmiPullyu.options[:first_failed_wait] | ||
👍 |
||
end | ||
|
||
protected | ||
|
||
def connected? | ||
|
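The retry logic in `add_entity_in_timeframe` and `extra_wait_time` can be sketched without Redis as follows. This is an in-memory stand-in, not the project's class: `RetrySchedule`, `record_failure`, and the sample values are hypothetical. A `Hash` with a default of 0 plays the role of the attempts counter, mirroring how Redis `INCR` treats a missing key as 0 before incrementing.

```ruby
# In-memory stand-in for the Redis-backed retry bookkeeping (hypothetical class).
class RetrySchedule
  class MaxDepositAttemptsReached < StandardError; end

  def initialize(max_attempts:, first_failed_wait:)
    @max_attempts = max_attempts
    @first_failed_wait = first_failed_wait
    @attempts = Hash.new(0) # missing keys start at 0, like Redis INCR
  end

  # Same formula as extra_wait_time in the diff.
  def extra_wait_time(deposit_attempt)
    (2**deposit_attempt) * @first_failed_wait
  end

  # Returns the delay in seconds before the next attempt,
  # or raises (and clears the counter) once the limit is exceeded.
  def record_failure(uuid)
    deposit_attempt = (@attempts[uuid] += 1)
    return extra_wait_time(deposit_attempt) if deposit_attempt <= @max_attempts

    @attempts.delete(uuid)
    raise MaxDepositAttemptsReached, "#{uuid} failed #{deposit_attempt} times"
  end
end

# Sample values are illustrative only.
schedule = RetrySchedule.new(max_attempts: 3, first_failed_wait: 10)
delays = Array.new(3) { schedule.record_failure('example-uuid') }
# delays is [20, 40, 80]; a fourth failure raises MaxDepositAttemptsReached
```

Because the counter starts at 1 on the first failure, the first delay under this formula is twice `first_failed_wait`, and each later delay doubles the previous one.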
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,20 +8,24 @@ | |
|
||
before do | ||
PushmiPullyu.server_running = true | ||
redis.zadd 'test:pmpy_queue', 1, 'noid1' | ||
redis.zadd 'test:pmpy_queue', 3, 'noid3' | ||
redis.zadd 'test:pmpy_queue', 4, 'noid2' | ||
Comment on lines -11 to -13
Are
These values were just used for this test. |
||
redis.zadd 'test:pmpy_queue', 10, 'noid1' | ||
redis.zadd 'test:pmpy_queue', 1, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c1","type":"items"}' | ||
redis.zadd 'test:pmpy_queue', 3, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c3","type":"items"}' | ||
redis.zadd 'test:pmpy_queue', 4, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c2","type":"items"}' | ||
redis.zadd 'test:pmpy_queue', 10, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c1","type":"items"}' | ||
end | ||
|
||
after do | ||
redis.del 'test:pmpy_queue' | ||
end | ||
|
||
it 'retrieves 3 items in priority order' do | ||
expect(queue.wait_next_item).to eq 'noid3' | ||
expect(queue.wait_next_item).to eq 'noid2' | ||
expect(queue.wait_next_item).to eq 'noid1' | ||
next_item = queue.wait_next_item | ||
|
||
expect(next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c3' | ||
next_item = queue.wait_next_item | ||
expect(next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c2' | ||
next_item = queue.wait_next_item | ||
expect(next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c1' | ||
end | ||
end | ||
|
||
|
@@ -32,7 +36,7 @@ | |
let!(:redis) { Redis.new } | ||
|
||
before do | ||
redis.zadd 'test:pmpy_queue', Time.now.to_f, 'noid1' | ||
redis.zadd 'test:pmpy_queue', Time.now.to_f, '{"uuid":"9e3be94f-a5de-4589-96ca-b18efba280c1","type":"items"}' | ||
end | ||
|
||
after do | ||
|
@@ -50,7 +54,7 @@ | |
expect(queue.next_item).to be_nil | ||
|
||
Timecop.travel(now + 15.minutes) | ||
expect(queue.next_item).to eq 'noid1' | ||
expect(queue.next_item[:uuid]).to eq '9e3be94f-a5de-4589-96ca-b18efba280c1' | ||
end | ||
end | ||
end |
Probably worth adding these new options to the README
They are there; do let me know if I can clarify it, though.
https://github.com/ualbertalib/pushmi_pullyu/pull/298/files#diff-b335630551682c19a781afebcf4d07bf978fb1f8ac04c6bf87428ed5106870f5