From 8e39f9622beb1abec9b296cf08cfd3704bede7eb Mon Sep 17 00:00:00 2001 From: Travis Semple Date: Tue, 5 Nov 2024 13:59:16 -0800 Subject: [PATCH] Sentry fixes + Disbursement job fix (#1811) --- jobs/payment-jobs/services/data_warehouse.py | 15 ++++----- .../tasks/bcol_refund_confirmation_task.py | 10 +++--- .../tasks/cfs_create_invoice_task.py | 2 +- .../tasks/ejv_partner_distribution_task.py | 25 +++++++++++---- .../test_ejv_partner_distribution_task.py | 31 +++++++++++++++++++ .../test_data_warehouse_connection.py | 10 +++--- pay-api/src/pay_api/resources/v1/account.py | 2 ++ pay-api/tests/unit/api/test_account.py | 23 +++++++++++--- .../services/payment_reconciliations.py | 2 +- 9 files changed, 91 insertions(+), 29 deletions(-) diff --git a/jobs/payment-jobs/services/data_warehouse.py b/jobs/payment-jobs/services/data_warehouse.py index 79dbeb84f..15de87eff 100644 --- a/jobs/payment-jobs/services/data_warehouse.py +++ b/jobs/payment-jobs/services/data_warehouse.py @@ -18,11 +18,12 @@ # services/data_warehouse.py +from dataclasses import dataclass + import pg8000 +from google.cloud.sql.connector import Connector from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session, sessionmaker -from google.cloud.sql.connector import Connector -from dataclasses import dataclass @dataclass @@ -49,14 +50,14 @@ def getconn(connector: Connector, db_config: DBConfig) -> object: """ if db_config.unix_sock: # Use Unix socket connection with the Connector for deployment - instance_connection_string = db_config.unix_sock.replace('/cloudsql/', '') + instance_connection_string = db_config.unix_sock.replace("/cloudsql/", "") return connector.connect( instance_connection_string=instance_connection_string, - ip_type='private', + ip_type="private", user=db_config.user, password=db_config.password, db=db_config.database, - driver='pg8000', + driver="pg8000", ) else: conn = pg8000.connect( @@ -64,7 +65,7 @@ def getconn(connector: Connector, db_config: DBConfig) -> object: user=db_config.user, password=db_config.password, host=db_config.host, - port=db_config.port + port=db_config.port, ) return conn @@ -98,7 +99,7 @@ def init_app(self, app): max_overflow=2, pool_timeout=10, pool_recycle=1800, - connect_args={"use_native_uuid": False} + connect_args={"use_native_uuid": False}, ) app.teardown_appcontext(self.teardown) diff --git a/jobs/payment-jobs/tasks/bcol_refund_confirmation_task.py b/jobs/payment-jobs/tasks/bcol_refund_confirmation_task.py index cf1fa5677..43fd02027 100644 --- a/jobs/payment-jobs/tasks/bcol_refund_confirmation_task.py +++ b/jobs/payment-jobs/tasks/bcol_refund_confirmation_task.py @@ -72,7 +72,7 @@ def _get_data_warehouse_bcol_records_for_invoices(cls, invoice_refs: List[Invoic # Split invoice refs into groups of 5000 invoice_ref_chunks = [] for i in range(0, len(invoice_refs), 5000): - invoice_ref_chunks.append(invoice_refs[i: i + 5000]) + invoice_ref_chunks.append(invoice_refs[i : i + 5000]) bcol_refunds_all = {} current_app.logger.debug("Connecting to data_warehouse...") @@ -81,17 +81,19 @@ def _get_data_warehouse_bcol_records_for_invoices(cls, invoice_refs: List[Invoic invoice_numbers_str = ", ".join("'" + str(x.invoice_number) + "'" for x in invoice_ref_grp) current_app.logger.debug("Collecting Data Warehouse BCOL refund records...") - query = text(f""" + query = text( + f""" SELECT key, total_amt FROM colin.bconline_billing_record WHERE key IN ({invoice_numbers_str}) AND qty = -1 - """) + """ + ) results = session.execute(query).fetchall() # Convert float from the database to Decimal - bcol_refunds_all.update({row['key']: Decimal(str(row['total_amt'])) for row in results}) + bcol_refunds_all.update({row["key"]: Decimal(str(row["total_amt"])) for row in results}) # set invoice_number as the key (makes it easier map against) return bcol_refunds_all diff --git a/jobs/payment-jobs/tasks/cfs_create_invoice_task.py b/jobs/payment-jobs/tasks/cfs_create_invoice_task.py index cbd3daff5..bc2f814a6 100644 --- a/jobs/payment-jobs/tasks/cfs_create_invoice_task.py +++ b/jobs/payment-jobs/tasks/cfs_create_invoice_task.py @@ -355,7 +355,7 @@ def _create_pad_invoices(cls): # pylint: disable=too-many-locals continue # This is synced after receiving a CSV file at 9:30 AM each day. credit_remaining_total = CreditModel.find_remaining_by_account_id(account.id) - current_app.logger.info('credit_remaining_total: %s', credit_remaining_total) + current_app.logger.info("credit_remaining_total: %s", credit_remaining_total) credit_total = min(credit_remaining_total, invoice_total) additional_params = { "credit_total": float(credit_total), diff --git a/jobs/payment-jobs/tasks/ejv_partner_distribution_task.py b/jobs/payment-jobs/tasks/ejv_partner_distribution_task.py index c640c4b49..34d72ccc7 100644 --- a/jobs/payment-jobs/tasks/ejv_partner_distribution_task.py +++ b/jobs/payment-jobs/tasks/ejv_partner_distribution_task.py @@ -32,7 +32,7 @@ from pay_api.models import Receipt as ReceiptModel from pay_api.models import db from pay_api.utils.enums import DisbursementStatus, EjvFileType, EJVLinkType, InvoiceStatus, PaymentMethod -from sqlalchemy import Date, and_, cast +from sqlalchemy import Date, and_, cast, or_ from tasks.common.cgi_ejv import CgiEjv from tasks.common.dataclasses import Disbursement, DisbursementLineItem @@ -160,6 +160,15 @@ def get_disbursement_by_distribution_for_partner(partner): .filter(PartnerDisbursementsModel.partner_code == partner.code) .filter(DistributionCodeModel.stop_ejv.is_(False) | DistributionCodeModel.stop_ejv.is_(None)) .filter(~InvoiceModel.receipts.any(cast(ReceiptModel.receipt_date, Date) >= disbursement_date.date())) + .filter( + or_( + and_( + PartnerDisbursementsModel.is_reversal.is_(False), + InvoiceModel.invoice_status_code == InvoiceStatus.PAID.value, + ), + PartnerDisbursementsModel.is_reversal.is_(True), + ) + ) .order_by(DistributionCodeModel.distribution_code_id, PaymentLineItemModel.id) .all() ) @@ -311,11 +320,15 @@ def _update_disbursement_status_and_ejv_link( raise NotImplementedError("Unknown disbursement type") # Possible this could already be created, eg two PLI. - if db.session.query(EjvLinkModel).filter( - EjvLinkModel.link_id == disbursement.line_item.identifier, - EjvLinkModel.link_type == disbursement.line_item.target_type, - EjvLinkModel.ejv_header_id == ejv_header_model.id, - ).first(): + if ( + db.session.query(EjvLinkModel) + .filter( + EjvLinkModel.link_id == disbursement.line_item.identifier, + EjvLinkModel.link_type == disbursement.line_item.target_type, + EjvLinkModel.ejv_header_id == ejv_header_model.id, + ) + .first() + ): return db.session.add( diff --git a/jobs/payment-jobs/tests/jobs/test_ejv_partner_distribution_task.py b/jobs/payment-jobs/tests/jobs/test_ejv_partner_distribution_task.py index 9d3075e47..3e3469b4e 100644 --- a/jobs/payment-jobs/tests/jobs/test_ejv_partner_distribution_task.py +++ b/jobs/payment-jobs/tests/jobs/test_ejv_partner_distribution_task.py @@ -126,6 +126,33 @@ def test_disbursement_for_partners(session, monkeypatch, client_code, batch_type target_type=EJVLinkType.INVOICE.value, ).save() + eft_invoice_approved = factory_invoice( + payment_account=pad_account, + corp_type_code=corp_type.code, + total=11.5, + payment_method_code=PaymentMethod.EFT.value, + status_code="APPROVED", + ) + + factory_payment_line_item( + invoice_id=eft_invoice_approved.id, + fee_schedule_id=fee_schedule.fee_schedule_id, + filing_fees=10, + total=10, + service_fees=1.5, + fee_dist_id=fee_distribution.distribution_code_id, + ) + + inv_ref = factory_invoice_reference(invoice_id=eft_invoice_approved.id) + partner_disbursement_approved = PartnerDisbursementsModel( + amount=10, + is_reversal=False, + partner_code=eft_invoice.corp_type_code, + status_code=DisbursementStatus.WAITING_FOR_JOB.value, + target_id=eft_invoice_approved.id, + target_type=EJVLinkType.INVOICE.value, + ).save() + EjvPartnerDistributionTask.create_ejv_file() # Lookup invoice and assert disbursement status @@ -141,6 +168,9 @@ def test_disbursement_for_partners(session, monkeypatch, client_code, batch_type invoice = Invoice.find_by_id(invoice.id) assert invoice.disbursement_status_code == DisbursementStatus.UPLOADED.value + eft_invoice_approved = Invoice.find_by_id(eft_invoice_approved.id) + assert eft_invoice_approved.disbursement_status_code is None + ejv_inv_link = db.session.query(EjvLink).filter(EjvLink.link_id == invoice.id).first() assert ejv_inv_link @@ -154,6 +184,7 @@ def test_disbursement_for_partners(session, monkeypatch, client_code, batch_type assert partner_disbursement.status_code == DisbursementStatus.UPLOADED.value assert partner_disbursement.processed_on + assert partner_disbursement_approved.status_code == DisbursementStatus.WAITING_FOR_JOB.value # Reverse those payments and assert records. # Set the status of invoice as disbursement completed, so that reversal can kick start. diff --git a/jobs/payment-jobs/tests/services/test_data_warehouse_connection.py b/jobs/payment-jobs/tests/services/test_data_warehouse_connection.py index eeb2fa9c0..fd709d012 100644 --- a/jobs/payment-jobs/tests/services/test_data_warehouse_connection.py +++ b/jobs/payment-jobs/tests/services/test_data_warehouse_connection.py @@ -26,11 +26,11 @@ def app(): """Create a Flask app instance configured for testing.""" app = Flask(__name__) - app.config['DW_HOST'] = 'mock_host' - app.config['DW_PORT'] = 5432 - app.config['DW_NAME'] = 'mock_database' - app.config['DW_USER'] = 'mock_user' - app.config['DW_PASSWORD'] = 'mock_password' + app.config["DW_HOST"] = "mock_host" + app.config["DW_PORT"] = 5432 + app.config["DW_NAME"] = "mock_database" + app.config["DW_USER"] = "mock_user" + app.config["DW_PASSWORD"] = "mock_password" return app diff --git a/pay-api/src/pay_api/resources/v1/account.py b/pay-api/src/pay_api/resources/v1/account.py index a196e46b2..2ccda50ac 100644 --- a/pay-api/src/pay_api/resources/v1/account.py +++ b/pay-api/src/pay_api/resources/v1/account.py @@ -258,6 +258,8 @@ def put_account_fee_product(account_number: str, product: str): def post_search_purchase_history(account_number: str): """Search purchase history.""" current_app.logger.info("