Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DBEX] create Form526DocumentUploadPollingJob #17473

Merged
7 changes: 7 additions & 0 deletions app/models/lighthouse526_document_upload.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
class Lighthouse526DocumentUpload < ApplicationRecord
include AASM

POLLING_WINDOW = 1.hour
VETERAN_UPLOAD_DOCUMENT_TYPE = 'Veteran Upload'
BDD_INSTRUCTIONS_DOCUMENT_TYPE = 'BDD Instructions'
FORM_0781_DOCUMENT_TYPE = 'Form 0781'
Expand All @@ -24,6 +25,12 @@ class Lighthouse526DocumentUpload < ApplicationRecord
# Veteran Uploads must reference a FormAttachment record, where a Veteran-submitted file is stored
validates :form_attachment, presence: true, if: :veteran_upload?

# Uploads due a status check with Lighthouse: those never polled, or last polled
# longer ago than POLLING_WINDOW
scope :status_update_required, lambda {
  polling_cutoff = POLLING_WINDOW.ago.utc
  never_polled = where(status_last_polled_at: nil)

  never_polled.or(where(arel_table[:status_last_polled_at].lt(polling_cutoff)))
}

aasm do
state :pending, initial: true
state :completed, :failed
Expand Down
84 changes: 84 additions & 0 deletions app/sidekiq/lighthouse/form526_document_upload_polling_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# frozen_string_literal: true

require 'lighthouse/benefits_documents/form526/documents_status_polling_service'
require 'lighthouse/benefits_documents/form526/update_documents_status_service'

module Lighthouse
  # Polls Lighthouse for the processing status of pending Form 526 document uploads.
  #
  # Each run selects Lighthouse526DocumentUpload records that are pending and due a
  # status update, requests their statuses in batches of POLLED_BATCH_DOCUMENT_COUNT,
  # and passes each 200 response to UpdateDocumentsStatusService. Non-200 responses,
  # Faraday::ResourceNotFound errors and request ids reported as unknown are counted
  # in StatsD and logged as warnings.
  class Form526DocumentUploadPollingJob
    include Sidekiq::Job
    # Job runs every hour; ensure retries happen within the same window to prevent duplicate polling of documents
    # 7 retries = retry for ~42 minutes
    # See Sidekiq documentation for exponential retry formula:
    # https://github.com/sidekiq/sidekiq/wiki/Error-Handling#automatic-job-retry
    sidekiq_options retry: 7

    POLLED_BATCH_DOCUMENT_COUNT = 100
    STATSD_KEY_PREFIX = 'worker.lighthouse.poll_form526_document_uploads'

    # Records a metric and a warning once retries are exhausted. The rescue keeps a
    # failure inside this handler from escaping; it is logged as an error instead.
    sidekiq_retries_exhausted do |msg, _ex|
      job_id = msg['jid']
      error_class = msg['error_class']
      error_message = msg['error_message']

      StatsD.increment("#{STATSD_KEY_PREFIX}.exhausted")

      Rails.logger.warn(
        'Lighthouse::Form526DocumentUploadPollingJob retries exhausted',
        { job_id:, error_class:, error_message:, timestamp: Time.now.utc }
      )
    rescue => e
      Rails.logger.error(
        'Failure in Form526DocumentUploadPollingJob#sidekiq_retries_exhausted',
        {
          messaged_content: e.message,
          job_id:,
          pre_exhaustion_failure: {
            error_class:,
            error_message:
          }
        }
      )
    end

    # Polls every pending upload that is due a status update, one batch at a time.
    def perform
      Lighthouse526DocumentUpload.pending.status_update_required.in_batches(
        of: POLLED_BATCH_DOCUMENT_COUNT
      ) do |document_batch|
        poll_batch(document_batch)
      end
    end

    private

    # Requests statuses for one batch of uploads and applies the response.
    #
    # @param document_batch [ActiveRecord::Relation] batch of Lighthouse526DocumentUpload records
    def poll_batch(document_batch)
      lighthouse_document_request_ids = document_batch.pluck(:lighthouse_document_request_id)
      response = BenefitsDocuments::Form526::DocumentsStatusPollingService.call(lighthouse_document_request_ids)

      return handle_error(response, lighthouse_document_request_ids) unless response.status == 200

      result = BenefitsDocuments::Form526::UpdateDocumentsStatusService.call(document_batch, response.body)
      return unless result && !result[:success]

      # The update service reports request ids Lighthouse did not recognise under result[:response]
      response_struct = OpenStruct.new(result[:response])
      handle_error(response_struct, response_struct.unknown_ids.map(&:to_s))
    rescue Faraday::ResourceNotFound => e
      # Wrap the error's response so handle_error can read #status and #body from it
      handle_error(OpenStruct.new(e.response), lighthouse_document_request_ids)
    end

    # Increments the polling error counter and logs the failed response.
    #
    # @param response [#status, #body] the failed response, or a stand-in exposing the same readers
    # @param lighthouse_document_request_ids [Array] request ids affected by the failure
    def handle_error(response, lighthouse_document_request_ids)
      StatsD.increment("#{STATSD_KEY_PREFIX}.polling_error")

      Rails.logger.warn(
        'Lighthouse::Form526DocumentUploadPollingJob status endpoint error',
        {
          response_status: response.status,
          response_body: response.body,
          lighthouse_document_request_ids:,
          timestamp: Time.now.utc
        }
      )
    end
  end
end
5 changes: 3 additions & 2 deletions lib/lighthouse/benefits_documents/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ class Configuration < Common::Client::Configuration::REST

SYSTEM_NAME = 'VA.gov'
API_SCOPES = %w[documents.read documents.write].freeze
# Shared prefix for the Benefits Documents v1 endpoints below; each path is defined exactly once
BASE_PATH = 'services/benefits-documents/v1'
DOCUMENTS_PATH = "#{BASE_PATH}/documents".freeze
DOCUMENTS_STATUS_PATH = "#{BASE_PATH}/uploads/status".freeze
TOKEN_PATH = 'oauth2/benefits-documents/system/v1/token'
QA_TESTING_DOMAIN = 'https://dev-api.va.gov'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ def initialize(lighthouse526_document_uploads, lighthouse_status_response)

# Updates the document statuses, then reports whether Lighthouse recognised every request id.
#
# @return [Hash] { success: true } when the response lists no unknown request ids;
#   otherwise { success: false, response: { status:, body:, unknown_ids: } }
def call
  update_documents_status

  not_found_request_ids = @lighthouse_status_response.dig('data', 'requestIdsNotFound')

  if not_found_request_ids.blank?
    { success: true }
  else
    {
      success: false,
      response: {
        status: 404,
        body: 'Upload Request Async Status Not Found',
        unknown_ids: not_found_request_ids
      }
    }
  end
end

private
Expand Down
215 changes: 215 additions & 0 deletions spec/sidekiq/lighthouse/form526_document_upload_polling_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
# frozen_string_literal: true

require 'rails_helper'

# Specs for Lighthouse::Form526DocumentUploadPollingJob. They cover status updates
# replayed from VCR cassettes, error logging and metrics, retry exhaustion, and
# which documents are sent for polling.
RSpec.describe Lighthouse::Form526DocumentUploadPollingJob, type: :job do
before do
Sidekiq::Job.clear_all
# NOTE: to re-record the VCR cassettes for these tests:
# 1. Comment out the line below stubbing the token
# 2. Include both a valid Lighthouse client_id and rsa_key in config/settings/test.local.yml:
# lighthouse:
# auth:
# ccg:
# client_id: <MY CLIENT ID>
# rsa_key: <MY RSA KEY PATH>
# To generate the above credentials refer to this tutorial:
# https://developer.va.gov/explore/api/benefits-documents/client-credentials
allow_any_instance_of(BenefitsDocuments::Configuration).to receive(:access_token).and_return('abcd1234')
end

describe '#perform' do
# Shared behavior: with the given cassette replayed, one run leaves the document in
# `state` and records its polling metadata.
shared_examples 'document status updates' do |state, request_id, cassette|
around { |example| VCR.use_cassette(cassette) { example.run } }

let!(:document) { create(:lighthouse526_document_upload, lighthouse_document_request_id: request_id) }

it 'updates document status' do
described_class.new.perform
expect(document.reload.aasm_state).to eq(state)
expect(document.reload.lighthouse_processing_ended_at).not_to be_nil
expect(document.reload.last_status_response).not_to be_nil
end

it 'saves the status_last_polled_at time' do
# Freeze time so the stored timestamp can be compared exactly
polling_time = DateTime.new(1985, 10, 26).utc
Timecop.freeze(polling_time) do
described_class.new.perform
expect(document.reload.status_last_polled_at).to eq(polling_time)
end
end
end

# End-to-end integration test - completion
context 'for a document that has completed' do
# Completed Lighthouse QA environment document requestId provided by Lighthouse for end-to-end testing
it_behaves_like 'document status updates', 'completed', 22,
'lighthouse/benefits_claims/documents/form526_document_upload_status_complete'
end

context 'for a document that has failed' do
# Failed Lighthouse QA environment document requestId provided by Lighthouse for end-to-end testing
it_behaves_like 'document status updates', 'failed', 16_819,
'lighthouse/benefits_claims/documents/form526_document_upload_status_failed'
end

context 'for a single document request whose status is not found' do
# Non-existent Lighthouse QA environment document requestId
let!(:unknown_document) { create(:lighthouse526_document_upload, lighthouse_document_request_id: 21) }
# Error body the job is expected to log for the not-found response
let(:error_body) do
{ 'errors' => [{ 'detail' => 'Upload Request Async Status Not Found', 'status' => 404,
'title' => 'Not Found', 'instance' => '062dd917-a229-42d7-ad39-741eb81766a8',
'diagnostics' => '7YODuWbVvC0k+iFgaQC0SrlARmYKPKz4' }] }
end

around do |example|
VCR.use_cassette('lighthouse/benefits_claims/documents/form526_document_upload_status_not_found') do
example.run
end
end

it 'increments a StatsD counter and logs error' do
expect(StatsD).to receive(:increment).with('worker.lighthouse.poll_form526_document_uploads.polling_error')

Timecop.freeze(Time.new(1985, 10, 26).utc) do
expect(Rails.logger).to receive(:warn).with(
'Lighthouse::Form526DocumentUploadPollingJob status endpoint error',
hash_including(response_status: 404, response_body: error_body,
lighthouse_document_request_ids: [unknown_document.lighthouse_document_request_id])
)
described_class.new.perform
end
end
end

# One known and one unknown request id polled together; only the unknown id should be
# reported in the logged error.
context 'for a document with status and another document whose request id is not found' do
let!(:complete_document) { create(:lighthouse526_document_upload, lighthouse_document_request_id: 22) }
let!(:unknown_document) { create(:lighthouse526_document_upload, lighthouse_document_request_id: 21) }

around do |example|
VCR.use_cassette('lighthouse/benefits_claims/documents/form526_document_upload_with_request_ids_not_found') do
example.run
end
end

it 'increments StatsD counters for both documents and logs unknown document error' do
# Ordered: the completion metric must be emitted before the polling error metric
expect(StatsD).to receive(:increment)
.with('api.form526.lighthouse_document_upload_processing_status.bdd_instructions.complete').ordered
expect(StatsD).to receive(:increment)
.with('worker.lighthouse.poll_form526_document_uploads.polling_error').ordered

Timecop.freeze(Time.new(1985, 10, 26).utc) do
expect(Rails.logger).to receive(:warn).with(
'Lighthouse::Form526DocumentUploadPollingJob status endpoint error',
hash_including(response_status: 404, response_body: 'Upload Request Async Status Not Found',
lighthouse_document_request_ids: [unknown_document.lighthouse_document_request_id])
)
described_class.new.perform
end
end
end

context 'non-200 failure response from Lighthouse' do
let!(:pending_document) { create(:lighthouse526_document_upload) }
# Error body example from: https://dev-developer.va.gov/explore/api/benefits-documents/docs?version=current
let(:error_body) { { 'errors' => [{ 'detail' => 'Code must match \'^[A-Z]{2}$\'', 'status' => 400 }] } }
let(:error_response) { Faraday::Response.new(response_body: error_body, status: 400) }

before do
# No cassette here: the polling service is stubbed to return the 400 response directly
allow(BenefitsDocuments::Form526::DocumentsStatusPollingService).to receive(:call).and_return(error_response)
end

it 'increments a StatsD counter and logs error' do
expect(StatsD).to receive(:increment).with('worker.lighthouse.poll_form526_document_uploads.polling_error')

Timecop.freeze(Time.new(1985, 10, 26).utc) do
expect(Rails.logger).to receive(:warn).with(
'Lighthouse::Form526DocumentUploadPollingJob status endpoint error',
hash_including(response_status: 400, response_body: error_body,
lighthouse_document_request_ids: [pending_document.lighthouse_document_request_id])
)
described_class.new.perform
end
end
end

context 'retries exhausted' do
it 'updates the exhaustion StatsD counter' do
described_class.within_sidekiq_retries_exhausted_block do
expect(StatsD).to receive(:increment).with('worker.lighthouse.poll_form526_document_uploads.exhausted')
end
end

it 'logs exhaustion metadata to the Rails logger' do
exhaustion_time = DateTime.new(1985, 10, 26).utc
# Stand-in for the job message hash passed to the exhaustion handler
sidekiq_exhaustion_metadata = { 'jid' => 8_675_309, 'error_class' => 'BROKESKI',
'error_message' => 'We are going to need a bigger boat' }
Timecop.freeze(exhaustion_time) do
described_class.within_sidekiq_retries_exhausted_block(sidekiq_exhaustion_metadata) do
expect(Rails.logger).to receive(:warn).with(
'Lighthouse::Form526DocumentUploadPollingJob retries exhausted',
{
job_id: 8_675_309,
error_class: 'BROKESKI',
error_message: 'We are going to need a bigger boat',
timestamp: exhaustion_time
}
)
end
end
end
end

# Both services are stubbed in this group, so the examples only check which request
# ids are passed to the polling service.
describe 'Documents Polling' do
let(:faraday_response) { instance_double(Faraday::Response, body: {}, status: 200) }
let(:polling_service) { BenefitsDocuments::Form526::DocumentsStatusPollingService }
let(:polling_time) { DateTime.new(1985, 10, 26).utc }

before do
# Verifies correct info is being passed to both services
allow(BenefitsDocuments::Form526::DocumentsStatusPollingService).to receive(:call).and_return(faraday_response)
allow(BenefitsDocuments::Form526::UpdateDocumentsStatusService)
.to receive(:call).and_return(success: true, response: { status: 200 })
end

context 'for a pending document' do
around { |example| Timecop.freeze(polling_time) { example.run } }

it 'polls for unpolled and repoll documents' do
# One document never polled, one last polled two hours before the frozen time
documents = [
create(:lighthouse526_document_upload),
create(:lighthouse526_document_upload, status_last_polled_at: polling_time - 2.hours)
]
document_request_ids = documents.map(&:lighthouse_document_request_id)

expect(polling_service).to receive(:call).with(document_request_ids)
described_class.new.perform
end

it 'does not poll for recently polled documents' do
# Polled 42 minutes before the frozen time, so it must not be polled again
recently_polled_document = create(:lighthouse526_document_upload,
status_last_polled_at: polling_time - 42.minutes)
expect(polling_service).not_to receive(:call).with([recently_polled_document.lighthouse_document_request_id])
described_class.new.perform
end
end

context 'for completed and failed documents' do
# Both were last polled two hours ago, so only their state keeps them from being polled
let!(:documents) do
[
create(:lighthouse526_document_upload, aasm_state: 'completed',
status_last_polled_at: polling_time - 2.hours),
create(:lighthouse526_document_upload, aasm_state: 'failed', status_last_polled_at: polling_time - 2.hours)
]
end

it 'does not poll for completed or failed documents' do
documents.each do |doc|
expect(polling_service).not_to receive(:call).with([doc.lighthouse_document_request_id])
end
described_class.new.perform
end
end
end
end
end
Loading
Loading