Skip to content

Commit

Permalink
[DBEX] create Form526DocumentUploadPollingJob
Browse files Browse the repository at this point in the history
  • Loading branch information
freeheeling authored Jul 17, 2024
1 parent 29233b6 commit 32d7b1f
Show file tree
Hide file tree
Showing 9 changed files with 786 additions and 2 deletions.
7 changes: 7 additions & 0 deletions app/models/lighthouse526_document_upload.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
class Lighthouse526DocumentUpload < ApplicationRecord
include AASM

POLLING_WINDOW = 1.hour
VETERAN_UPLOAD_DOCUMENT_TYPE = 'Veteran Upload'
BDD_INSTRUCTIONS_DOCUMENT_TYPE = 'BDD Instructions'
FORM_0781_DOCUMENT_TYPE = 'Form 0781'
Expand All @@ -24,6 +25,12 @@ class Lighthouse526DocumentUpload < ApplicationRecord
# Veteran Uploads must reference a FormAttachment record, where a Veteran-submitted file is stored
validates :form_attachment, presence: true, if: :veteran_upload?

# Window for polling Lighthouse for the status of an upload
scope :status_update_required, lambda {
where(arel_table[:status_last_polled_at].lt(POLLING_WINDOW.ago.utc))
.or(where(status_last_polled_at: nil))
}

aasm do
state :pending, initial: true
state :completed, :failed
Expand Down
84 changes: 84 additions & 0 deletions app/sidekiq/lighthouse/form526_document_upload_polling_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# frozen_string_literal: true

require 'lighthouse/benefits_documents/form526/documents_status_polling_service'
require 'lighthouse/benefits_documents/form526/update_documents_status_service'

module Lighthouse
class Form526DocumentUploadPollingJob
include Sidekiq::Job
# Job runs every hour; ensure retries happen within the same window to prevent duplicate polling of documents
# 7 retries = retry for ~42 minutes
# See Sidekiq documentation for exponential retry formula:
# https://github.com/sidekiq/sidekiq/wiki/Error-Handling#automatic-job-retry
sidekiq_options retry: 7

POLLED_BATCH_DOCUMENT_COUNT = 100
STATSD_KEY_PREFIX = 'worker.lighthouse.poll_form526_document_uploads'

sidekiq_retries_exhausted do |msg, _ex|
job_id = msg['jid']
error_class = msg['error_class']
error_message = msg['error_message']

StatsD.increment("#{STATSD_KEY_PREFIX}.exhausted")

Rails.logger.warn(
'Lighthouse::Form526DocumentUploadPollingJob retries exhausted',
{ job_id:, error_class:, error_message:, timestamp: Time.now.utc }
)
rescue => e
Rails.logger.error(
'Failure in Form526DocumentUploadPollingJob#sidekiq_retries_exhausted',
{
messaged_content: e.message,
job_id:,
pre_exhaustion_failure: {
error_class:,
error_message:
}
}
)
end

def perform
Lighthouse526DocumentUpload.pending.status_update_required.in_batches(
of: POLLED_BATCH_DOCUMENT_COUNT
) do |document_batch|
lighthouse_document_request_ids = document_batch.pluck(:lighthouse_document_request_id)
response = BenefitsDocuments::Form526::DocumentsStatusPollingService.call(lighthouse_document_request_ids)

if response.status == 200
result = BenefitsDocuments::Form526::UpdateDocumentsStatusService.call(document_batch, response.body)

if result && !result[:success]
response_struct = OpenStruct.new(result[:response])

handle_error(response_struct, response_struct.unknown_ids.map(&:to_s))
end
else
handle_error(response, lighthouse_document_request_ids)
end
rescue Faraday::ResourceNotFound => e
response_struct = OpenStruct.new(e.response)

handle_error(response_struct, lighthouse_document_request_ids)
end
end

private

def handle_error(response, lighthouse_document_request_ids)
StatsD.increment("#{STATSD_KEY_PREFIX}.polling_error")

Rails.logger.warn(
'Lighthouse::Form526DocumentUploadPollingJob status endpoint error',
{
response_status: response.status,
response_body: response.body,
lighthouse_document_request_ids:,
timestamp: Time.now.utc
}
)
end
end
end
5 changes: 3 additions & 2 deletions lib/lighthouse/benefits_documents/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ class Configuration < Common::Client::Configuration::REST

SYSTEM_NAME = 'VA.gov'
API_SCOPES = %w[documents.read documents.write].freeze
DOCUMENTS_PATH = 'services/benefits-documents/v1/documents'
DOCUMENTS_STATUS_PATH = 'services/benefits-documents/v1/uploads/status'
BASE_PATH = 'services/benefits-documents/v1'
DOCUMENTS_PATH = "#{BASE_PATH}/documents".freeze
DOCUMENTS_STATUS_PATH = "#{BASE_PATH}/uploads/status".freeze
TOKEN_PATH = 'oauth2/benefits-documents/system/v1/token'
QA_TESTING_DOMAIN = 'https://dev-api.va.gov'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ def initialize(lighthouse526_document_uploads, lighthouse_status_response)

def call
update_documents_status

unknown_ids = @lighthouse_status_response.dig('data', 'requestIdsNotFound')

return { success: true } if unknown_ids.blank?

{ success: false, response: { status: 404, body: 'Upload Request Async Status Not Found', unknown_ids: } }
end

private
Expand Down
215 changes: 215 additions & 0 deletions spec/sidekiq/lighthouse/form526_document_upload_polling_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
# frozen_string_literal: true

require 'rails_helper'

RSpec.describe Lighthouse::Form526DocumentUploadPollingJob, type: :job do
before do
Sidekiq::Job.clear_all
# NOTE: to re-record the VCR cassettes for these tests:
# 1. Comment out the line below stubbing the token
# 2. Include both a valid Lighthouse client_id and rsa_key in config/settings/test.local.yml:
# lighthouse:
# auth:
# ccg:
# client_id: <MY CLIENT ID>
# rsa_key: <MY RSA KEY PATH>
# To generate the above credentials refer to this tutorial:
# https://developer.va.gov/explore/api/benefits-documents/client-credentials
allow_any_instance_of(BenefitsDocuments::Configuration).to receive(:access_token).and_return('abcd1234')
end

describe '#perform' do
shared_examples 'document status updates' do |state, request_id, cassette|
around { |example| VCR.use_cassette(cassette) { example.run } }

let!(:document) { create(:lighthouse526_document_upload, lighthouse_document_request_id: request_id) }

it 'updates document status' do
described_class.new.perform
expect(document.reload.aasm_state).to eq(state)
expect(document.reload.lighthouse_processing_ended_at).not_to be_nil
expect(document.reload.last_status_response).not_to be_nil
end

it 'saves the status_last_polled_at time' do
polling_time = DateTime.new(1985, 10, 26).utc
Timecop.freeze(polling_time) do
described_class.new.perform
expect(document.reload.status_last_polled_at).to eq(polling_time)
end
end
end

# End-to-end integration test - completion
context 'for a document that has completed' do
# Completed Lighthouse QA environment document requestId provided by Lighthouse for end-to-end testing
it_behaves_like 'document status updates', 'completed', 22,
'lighthouse/benefits_claims/documents/form526_document_upload_status_complete'
end

context 'for a document that has failed' do
# Failed Lighthouse QA environment document requestId provided by Lighthouse for end-to-end testing
it_behaves_like 'document status updates', 'failed', 16_819,
'lighthouse/benefits_claims/documents/form526_document_upload_status_failed'
end

context 'for a single document request whose status is not found' do
# Non-existent Lighthouse QA environment document requestId
let!(:unknown_document) { create(:lighthouse526_document_upload, lighthouse_document_request_id: 21) }
let(:error_body) do
{ 'errors' => [{ 'detail' => 'Upload Request Async Status Not Found', 'status' => 404,
'title' => 'Not Found', 'instance' => '062dd917-a229-42d7-ad39-741eb81766a8',
'diagnostics' => '7YODuWbVvC0k+iFgaQC0SrlARmYKPKz4' }] }
end

around do |example|
VCR.use_cassette('lighthouse/benefits_claims/documents/form526_document_upload_status_not_found') do
example.run
end
end

it 'increments a StatsD counter and logs error' do
expect(StatsD).to receive(:increment).with('worker.lighthouse.poll_form526_document_uploads.polling_error')

Timecop.freeze(Time.new(1985, 10, 26).utc) do
expect(Rails.logger).to receive(:warn).with(
'Lighthouse::Form526DocumentUploadPollingJob status endpoint error',
hash_including(response_status: 404, response_body: error_body,
lighthouse_document_request_ids: [unknown_document.lighthouse_document_request_id])
)
described_class.new.perform
end
end
end

context 'for a document with status and another document whose request id is not found' do
let!(:complete_document) { create(:lighthouse526_document_upload, lighthouse_document_request_id: 22) }
let!(:unknown_document) { create(:lighthouse526_document_upload, lighthouse_document_request_id: 21) }

around do |example|
VCR.use_cassette('lighthouse/benefits_claims/documents/form526_document_upload_with_request_ids_not_found') do
example.run
end
end

it 'increments StatsD counters for both documents and logs unknown document error' do
expect(StatsD).to receive(:increment)
.with('api.form526.lighthouse_document_upload_processing_status.bdd_instructions.complete').ordered
expect(StatsD).to receive(:increment)
.with('worker.lighthouse.poll_form526_document_uploads.polling_error').ordered

Timecop.freeze(Time.new(1985, 10, 26).utc) do
expect(Rails.logger).to receive(:warn).with(
'Lighthouse::Form526DocumentUploadPollingJob status endpoint error',
hash_including(response_status: 404, response_body: 'Upload Request Async Status Not Found',
lighthouse_document_request_ids: [unknown_document.lighthouse_document_request_id])
)
described_class.new.perform
end
end
end

context 'non-200 failure response from Lighthouse' do
let!(:pending_document) { create(:lighthouse526_document_upload) }
# Error body example from: https://dev-developer.va.gov/explore/api/benefits-documents/docs?version=current
let(:error_body) { { 'errors' => [{ 'detail' => 'Code must match \'^[A-Z]{2}$\'', 'status' => 400 }] } }
let(:error_response) { Faraday::Response.new(response_body: error_body, status: 400) }

before do
allow(BenefitsDocuments::Form526::DocumentsStatusPollingService).to receive(:call).and_return(error_response)
end

it 'increments a StatsD counter and logs error' do
expect(StatsD).to receive(:increment).with('worker.lighthouse.poll_form526_document_uploads.polling_error')

Timecop.freeze(Time.new(1985, 10, 26).utc) do
expect(Rails.logger).to receive(:warn).with(
'Lighthouse::Form526DocumentUploadPollingJob status endpoint error',
hash_including(response_status: 400, response_body: error_body,
lighthouse_document_request_ids: [pending_document.lighthouse_document_request_id])
)
described_class.new.perform
end
end
end

context 'retries exhausted' do
it 'updates the exhaustion StatsD counter' do
described_class.within_sidekiq_retries_exhausted_block do
expect(StatsD).to receive(:increment).with('worker.lighthouse.poll_form526_document_uploads.exhausted')
end
end

it 'logs exhaustion metadata to the Rails logger' do
exhaustion_time = DateTime.new(1985, 10, 26).utc
sidekiq_exhaustion_metadata = { 'jid' => 8_675_309, 'error_class' => 'BROKESKI',
'error_message' => 'We are going to need a bigger boat' }
Timecop.freeze(exhaustion_time) do
described_class.within_sidekiq_retries_exhausted_block(sidekiq_exhaustion_metadata) do
expect(Rails.logger).to receive(:warn).with(
'Lighthouse::Form526DocumentUploadPollingJob retries exhausted',
{
job_id: 8_675_309,
error_class: 'BROKESKI',
error_message: 'We are going to need a bigger boat',
timestamp: exhaustion_time
}
)
end
end
end
end

describe 'Documents Polling' do
let(:faraday_response) { instance_double(Faraday::Response, body: {}, status: 200) }
let(:polling_service) { BenefitsDocuments::Form526::DocumentsStatusPollingService }
let(:polling_time) { DateTime.new(1985, 10, 26).utc }

before do
# Verifies correct info is being passed to both services
allow(BenefitsDocuments::Form526::DocumentsStatusPollingService).to receive(:call).and_return(faraday_response)
allow(BenefitsDocuments::Form526::UpdateDocumentsStatusService)
.to receive(:call).and_return(success: true, response: { status: 200 })
end

context 'for a pending document' do
around { |example| Timecop.freeze(polling_time) { example.run } }

it 'polls for unpolled and repoll documents' do
documents = [
create(:lighthouse526_document_upload),
create(:lighthouse526_document_upload, status_last_polled_at: polling_time - 2.hours)
]
document_request_ids = documents.map(&:lighthouse_document_request_id)

expect(polling_service).to receive(:call).with(document_request_ids)
described_class.new.perform
end

it 'does not poll for recently polled documents' do
recently_polled_document = create(:lighthouse526_document_upload,
status_last_polled_at: polling_time - 42.minutes)
expect(polling_service).not_to receive(:call).with([recently_polled_document.lighthouse_document_request_id])
described_class.new.perform
end
end

context 'for completed and failed documents' do
let!(:documents) do
[
create(:lighthouse526_document_upload, aasm_state: 'completed',
status_last_polled_at: polling_time - 2.hours),
create(:lighthouse526_document_upload, aasm_state: 'failed', status_last_polled_at: polling_time - 2.hours)
]
end

it 'does not poll for completed or failed documents' do
documents.each do |doc|
expect(polling_service).not_to receive(:call).with([doc.lighthouse_document_request_id])
end
described_class.new.perform
end
end
end
end
end
Loading

0 comments on commit 32d7b1f

Please sign in to comment.