Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Epic/attribution lambda generalization #1022

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cclf-import-test-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
CGO_ENABLED: 0
run: |
go build -o bin/bootstrap ./lambda/cclf/main.go
zip -j function.zip bin/bootstrap
zip -j function.zip bin/bootstrap ../conf/configs/dev.yml ../conf/configs/test.yml ../conf/configs/prod.yml
- uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: ${{ vars.AWS_REGION }}
Expand Down
40 changes: 36 additions & 4 deletions .github/workflows/cclf-import-test-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ jobs:
run:
working-directory: bcda
outputs:
filename: ${{ steps.createfile.outputs.FILENAME }}
cclffilename: ${{ steps.createfile.outputs.cclffilename }}
csvfilename: ${{ steps.createfile.outputs.csvfilename }}
steps:
- uses: actions/checkout@v4
- uses: aws-actions/configure-aws-credentials@v4
Expand All @@ -53,9 +54,15 @@ jobs:
year=$(date +'%y')
date=$(date +'%y%m%d')
time=$(date +'%H%M%S')

fname=T.BCD.A0001.ZCY24.D${date}.T${time}1
cclf8_fname=T.BCD.A0001.ZC8Y24.D${date}.T${time}1
echo "FILENAME=$cclf8_fname" >> "$GITHUB_OUTPUT"
echo "CCLFFILENAME=$cclf8_fname" >> "$GITHUB_OUTPUT"

csvname=T.PCPB.M${year}11.D${date}.T${time}1
echo "CSVFILENAME=$csvname" >> "$GITHUB_OUTPUT"

mv ../shared_files/cclf/archives/csv/P.PCPB.M2411.D181120.T1000000 ${csvname}

unzip ../shared_files/cclf/archives/valid/T.BCD.A0001.ZCY18.D181120.T1000000

Expand All @@ -70,6 +77,9 @@ jobs:
aws s3 cp --no-progress $fname \
s3://bfd-test-eft/bfdeft01/bcda/in/test/$fname

aws s3 cp --no-progress ${csvname} \
s3://bfd-test-eft/bfdeft01/bcda/in/test/${csvname}

verify:
needs: trigger
runs-on: self-hosted
Expand All @@ -93,15 +103,16 @@ jobs:
CONNECTION_INFO=/bcda/test/api/DATABASE_URL
- name: Verify CCLF file was ingested
env:
FILENAME: ${{needs.trigger.outputs.filename}}
CCLFFILENAME: ${{needs.trigger.outputs.cclffilename}}
CSVFILENAME: ${{needs.trigger.outputs.csvfilename}}
PGSSLMODE: require
# CAUTION: if changing the script below, validate that sensitive information is not printed in the workflow
run: |
HOST=$(aws rds describe-db-instances --db-instance-identifier bcda-test-rds 2>&1 | jq -r '.DBInstances[0].Endpoint.Address' 2>&1)
CONNECTION_URL=$(echo $CONNECTION_INFO 2>&1 | sed -E "s/@.*\/bcda/\@$HOST\/bcda/" 2>&1)

# Verify that we have a record of the CCLF file in the database
CCLF_FILE=`psql -t "$CONNECTION_URL" -c "SELECT id FROM cclf_files WHERE name = '$FILENAME' LIMIT 1" 2>&1`
CCLF_FILE=`psql -t "$CONNECTION_URL" -c "SELECT id FROM cclf_files WHERE name = '$CCLFFILENAME' LIMIT 1" 2>&1`
if [[ $? -ne 0 || -z $CCLF_FILE ]]; then
echo "cclf_file query returned zero results or command failed"
exit 1
Expand All @@ -118,3 +129,24 @@ jobs:
exit 1
fi
fi

# Verify that we have a record of the CSV file in the database
CSV_FILE=`psql -t "$CONNECTION_URL" -c "SELECT id FROM cclf_files WHERE name = '$CSVFILENAME' LIMIT 1" 2>&1`
if [[ $? -ne 0 || -z $CSV_FILE ]]; then
echo "csv_file query returned zero results or command failed"
exit 1
else

# Verify that the correct number of benes were imported into the database.
CSV_BENES=`psql -t "$CONNECTION_URL" -c "SELECT count(mbi) FROM cclf_beneficiaries WHERE file_id = $CSV_FILE" 2>&1`
if [[ $? -ne 0 || -z $CSV_BENES ]]; then
echo "CSV beneficiaries query returned zero results or command failed"
exit 1
fi
if [[ $(echo $CSV_BENES | xargs) != "5" ]]; then
echo "expected 5 beneficiaries imported from file, received $CSV_BENES".
exit 1
fi
fi


8 changes: 4 additions & 4 deletions bcda/cclf/cclf.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (importer CclfImporter) importCCLF0(ctx context.Context, zipMetadata *cclfZ
return validator, nil
}

err = fmt.Errorf("failed to parse CCLF8 from CCLF0 file %s", fileMetadata)
err = fmt.Errorf("failed to parse CCLF8 from CCLF0 file %s", fileMetadata.name)
importer.Logger.Error(err)
return nil, err
}
Expand Down Expand Up @@ -214,17 +214,17 @@ func (importer CclfImporter) importCCLF8(ctx context.Context, zipMetadata *cclfZ

err = rtx.UpdateCCLFFileImportStatus(ctx, fileMetadata.fileID, constants.ImportComplete)
if err != nil {
err = errors.Wrapf(err, "could not update cclf file record for file: %s.", fileMetadata)
err = errors.Wrapf(err, "could not update cclf file record for file: %s.", fileMetadata.name)
importer.Logger.Error(err)
}

if err = tx.Commit(); err != nil {
importer.Logger.Error(err.Error())
failMsg := fmt.Sprintf("failed to commit transaction for CCLF%d import file %s", fileMetadata.cclfNum, fileMetadata)
failMsg := fmt.Sprintf("failed to commit transaction for CCLF%d import file %s", fileMetadata.cclfNum, fileMetadata.name)
return errors.Wrap(err, failMsg)
}

successMsg := fmt.Sprintf("Successfully imported %d records from CCLF%d file %s.", importedCount, fileMetadata.cclfNum, fileMetadata)
successMsg := fmt.Sprintf("Successfully imported %d records from CCLF%d file %s.", importedCount, fileMetadata.cclfNum, fileMetadata.name)
importer.Logger.WithFields(logrus.Fields{"imported_count": importedCount}).Info(successMsg)

return nil
Expand Down
218 changes: 218 additions & 0 deletions bcda/cclf/csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package cclf

import (
"bytes"
"context"
"database/sql"
"encoding/csv"
"errors"
"fmt"
"io"
f "path/filepath"
"time"

"github.com/jackc/pgx"
"github.com/jackc/pgx/stdlib"
"github.com/sirupsen/logrus"

"github.com/CMSgov/bcda-app/bcda/constants"
ers "github.com/CMSgov/bcda-app/bcda/errors"
"github.com/CMSgov/bcda-app/bcda/models"
"github.com/CMSgov/bcda-app/bcda/models/postgres"
"github.com/CMSgov/bcda-app/bcda/utils"
"github.com/CMSgov/bcda-app/optout"
)

// FileProcessors for attribution are created as interfaces so that they can be passed in place of the implementation; local development and other envs will require different processors.
// This interface has two implementations; one for ingesting and testing locally, and one for ingesting in s3.
type CSVFileProcessor interface {
	// Fetch the csv attribution file to be imported.
	LoadCSV(path string) (*bytes.Reader, func(), error)
	// Remove csv attribution file that was successfully imported.
	CleanUpCSV(file csvFile) (err error)
}

// csvFile bundles a loaded attribution file's contents with its parsed
// metadata and original location on disk / in the bucket.
type csvFile struct {
	metadata csvFileMetadata // parsed from the file name (see GetCSVMetadata)
	data     *bytes.Reader   // raw file contents as returned by LoadCSV
	imported bool            // set once the file has been ingested successfully
	filepath string          // original path the file was loaded from
}

// csvFileMetadata holds the attributes encoded in an attribution CSV
// file name plus bookkeeping assigned during import.
type csvFileMetadata struct {
	name         string // file name as recorded in cclf_files
	env          string // deployment environment the file targets
	acoID        string // ACO CMS identifier the attribution belongs to
	cclfNum      int    // CCLF file number used for the database record
	perfYear     int    // performance year parsed from the file name
	timestamp    time.Time
	deliveryDate time.Time
	fileID       uint // cclf_files primary key, set after CreateCCLFFile
	fileType     models.CCLFFileType
}

// CSVImporter wires together the logger, file processor, and database
// handle needed to ingest CSV attribution files.
type CSVImporter struct {
	Logger        logrus.FieldLogger
	FileProcessor CSVFileProcessor
	Database      *sql.DB
}

// ImportCSV ingests a single CSV attribution file located at filepath:
// it rejects opt-out files, parses the metadata encoded in the file
// name, loads the file contents, writes attribution records to the
// database via ProcessCSV, and finally removes the source file.
//
// It returns *ers.IsOptOutFile for opt-out files, *ers.InvalidCSVMetadata
// for unparseable file names, and nil (after logging) when the file
// belongs to a different environment.
func (importer CSVImporter) ImportCSV(filepath string) error {
	// A parse error from IsOptOut simply means "not an opt-out file",
	// so the error is intentionally ignored.
	optOut, _ := optout.IsOptOut(filepath)
	if optOut {
		return &ers.IsOptOutFile{}
	}

	metadata, err := GetCSVMetadata(f.Base(filepath))
	if err != nil {
		return &ers.InvalidCSVMetadata{Msg: err.Error()}
	}

	data, cleanup, err := importer.FileProcessor.LoadCSV(filepath)
	if err != nil {
		// Files for other environments are expected in shared buckets;
		// log and skip rather than failing the whole import run.
		if errors.Is(err, &ers.AttributionFileMismatchedEnv{}) {
			importer.Logger.WithFields(logrus.Fields{"file": filepath}).Info(err)
			return nil
		}
		return err
	}
	// Fix: the cleanup function returned by LoadCSV was previously
	// discarded, leaking whatever resources the processor holds open.
	if cleanup != nil {
		defer cleanup()
	}

	file := csvFile{
		filepath: filepath,
		metadata: metadata,
		data:     data,
	}

	if err := importer.ProcessCSV(file); err != nil {
		return err
	}

	return importer.FileProcessor.CleanUpCSV(file)
}

// ProcessCSV() will take provided metadata and write a new record to the cclf_files table and the contents of the file and write new record(s) to the cclf_beneficiaries table.
// If any step of writing to the database should fail, the whole transaction will fail. If the new records are written successfully, then the new record in the cclf_files
// table will have it's import status updated.
func (importer CSVImporter) ProcessCSV(csv csvFile) error {
ctx := context.Background()
repository := postgres.NewRepository(importer.Database)
exists, err := repository.GetCCLFFileExistsByName(ctx, csv.metadata.name)
if err != nil {
return fmt.Errorf("database query returned an error: %s", err)
}
if exists {
return &ers.AttributionFileAlreadyExists{Filename: csv.metadata.name}
}

importer.Logger.Infof("Importing CSV file %s...", csv.metadata.name)
conn, err := stdlib.AcquireConn(importer.Database)
if err != nil {
return err
}

defer utils.CloseAndLog(logrus.WarnLevel, func() error { return stdlib.ReleaseConn(importer.Database, conn) })

tx, err := conn.BeginEx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to start transaction: %w", err)
}

rtx := postgres.NewRepositoryPgxTx(tx)
var records int
defer func() {
if err != nil {
if err1 := tx.Rollback(); err1 != nil {
importer.Logger.Errorf("Failed to rollback transaction: %s, %s", err.Error(), err1.Error())
}
return
}
}()

// CCLF model corresponds with a database record
record := models.CCLFFile{
CCLFNum: csv.metadata.cclfNum,
Name: csv.metadata.name,
ACOCMSID: csv.metadata.acoID,
Timestamp: csv.metadata.timestamp,
PerformanceYear: csv.metadata.perfYear,
ImportStatus: constants.ImportInprog,
Type: csv.metadata.fileType,
}

record.ID, err = rtx.CreateCCLFFile(ctx, record)
if err != nil {
err := fmt.Errorf("database error when calling CreateCCLFFile(): %s", err)
return err
}

csv.metadata.fileID = record.ID

rows, count, err := importer.prepareCSVData(csv.data, record.ID)
if err != nil {
return err
}

records, err = tx.CopyFrom(pgx.Identifier{"cclf_beneficiaries"}, []string{"file_id", "mbi"}, pgx.CopyFromRows(rows))
if count != records {
return fmt.Errorf("unexpected number of records imported (expected: %d, actual: %d)", count, records)
}
if err != nil {
return errors.New("failed to write attribution beneficiaries to database using CopyFrom.")
}

err = rtx.UpdateCCLFFileImportStatus(ctx, csv.metadata.fileID, constants.ImportComplete)
if err != nil {
return fmt.Errorf("database error when calling UpdateCCLFFileImportStatus(): %s", csv.metadata.name)
carlpartridge marked this conversation as resolved.
Show resolved Hide resolved
}

if err = tx.Commit(); err != nil {
return fmt.Errorf("failed to commit database transaction: %s", err)
}

successMsg := fmt.Sprintf("successfully imported %d records from csv file %s.", records, csv.metadata.name)
importer.Logger.WithFields(logrus.Fields{"imported_count": records}).Info(successMsg)
return nil
}

func (importer CSVImporter) prepareCSVData(csvfile *bytes.Reader, id uint) ([][]interface{}, int, error) {
var rows [][]interface{}
r := csv.NewReader(csvfile)

_, err := r.Read()
if err == io.EOF {
return nil, 0, errors.New("empty attribution file")
}
if err != nil {
return nil, 0, fmt.Errorf("failed to read csv attribution header: %s", err)
}

count := 0

for {
var record []string
record, err = r.Read()
carlpartridge marked this conversation as resolved.
Show resolved Hide resolved
if err == io.EOF {
err = nil
break
}
if err != nil {
return nil, 0, fmt.Errorf("failed to read csv attribution file: %s", err)
}
row := make([]interface{}, 2)
row[0] = id
row[1] = record[0]

rows = append(rows, row)
count++
}
return rows, count, err

}
Loading
Loading