Skip to content

Commit

Permalink
Merge branch 'staging' into 10285-design-debt
Browse files Browse the repository at this point in the history
  • Loading branch information
jimlerza authored Dec 20, 2024
2 parents d48fe23 + 49803be commit 4c0d271
Show file tree
Hide file tree
Showing 68 changed files with 1,584 additions and 354 deletions.
2 changes: 1 addition & 1 deletion scripts/env/environments/00-common
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ REGION="${DEFAULT_REGION}"
CURRENT_COLOR=$(./scripts/dynamo/get-current-color.sh "${ENV}")
SOURCE_TABLE=$(./scripts/dynamo/get-source-table.sh "${ENV}")
SOURCE_TABLE_VERSION="${SOURCE_TABLE//efcms-${ENV}-/}"
DB_HOST=$(./scripts/postgres/get-host.sh -h)
DB_HOST=$(./scripts/postgres/get-host.sh -h -w)

# region hard-coded; all ES domains and Cognito user pools are in us-east-1
ELASTICSEARCH_ENDPOINT=$(aws es describe-elasticsearch-domain \
Expand Down
244 changes: 244 additions & 0 deletions scripts/run-once-scripts/cleanup-corrupt-messages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
#!/usr/bin/env npx ts-node --transpile-only

import { type ScriptConfig, parseArguments } from '../reports/reportUtils';
import {
type ServerApplicationContext,
createApplicationContext,
} from '../../web-api/src/applicationContext';
import { requireEnvVars } from '../../shared/admin-tools/util';
import { connect } from '../../web-api/src/database';
import PQueue from 'p-queue';
import fs from 'fs';
import { queryFull } from '../../web-api/src/persistence/dynamodbClientService';
import { Signer } from '@aws-sdk/rds-signer';
import path from 'path';
import { Kysely } from 'kysely';
import { Database } from '../../web-api/src/database-types';
import { RawCorrespondence } from '../../shared/src/business/entities/Correspondence';

// Check up front that the environment variables this script depends on are set.
// NOTE(review): presumably requireEnvVars aborts when one is missing — confirm in shared/admin-tools/util.
// REGION is required here but is not read directly in this script; the RDS Signer below hard-codes 'us-east-1'.
requireEnvVars(['REGION', 'DB_NAME', 'DB_HOST', 'DB_USER']);
const { DB_NAME, DB_HOST, DB_USER } = process.env;
// Port handed to both the RDS Signer and the database connection config further down.
const DB_PORT = 5432;

// Command-line options for this script; handed to parseArguments at startup.
const scriptConfig: ScriptConfig = {
  parameters: {
    // `--live-run`: off by default, so the script only reports what it would
    // change unless the flag is explicitly supplied.
    liveRun: {
      default: false,
      description:
        'If true, will proceed with removing the attachments from the impacted messages.',
      long: 'live-run',
      required: false,
      type: 'boolean',
    },
  },
};

/**
 * The columns of a `dwMessage` row that this script selects and works with.
 */
type MessageFragment = {
  /** Identifier used to locate the row when writing the cleaned attachments back. */
  messageId: string;
  /** Docket number of the case the message belongs to. */
  docketNumber: string;
  /** Attachment objects for the message; the script reads `documentId` from each. May be absent. */
  attachments: any[] | undefined;
};

/**
 * Queries (via `queryFull`) every record stored under a case's partition key
 * (`case|<docketNumber>`) whose sort key begins with `prefix`.
 */
const queryCaseRecordsBySortKeyPrefix = ({
  applicationContext,
  docketNumber,
  prefix,
}: {
  applicationContext: ServerApplicationContext;
  docketNumber: string;
  prefix: string;
}) =>
  queryFull({
    ExpressionAttributeNames: {
      '#pk': 'pk',
      '#sk': 'sk',
    },
    ExpressionAttributeValues: {
      ':pk': `case|${docketNumber}`,
      ':prefix': prefix,
    },
    KeyConditionExpression: '#pk = :pk AND begins_with(#sk, :prefix)',
    applicationContext,
  });

/**
 * Builds two lookup tables keyed by docket number: the docket entry ids and
 * the correspondence ids that exist for each case.
 *
 * Each docket number is handled by one task that runs its two queries one
 * after the other; tasks are run through a queue with a concurrency of 50.
 *
 * @param applicationContext - context forwarded to `queryFull`
 * @param docketNumbers - docket numbers to look up
 * @returns both lookup tables; every requested docket number has an entry in
 *   each (an empty array when the case has no matching records)
 */
const getDocketEntryIdsByDocketNumbers = async ({
  applicationContext,
  docketNumbers,
}: {
  applicationContext: ServerApplicationContext;
  docketNumbers: string[];
}): Promise<{
  correspondenceIdsByDocketNumber: Record<string, string[]>;
  docketEntryIdsByDocketNumber: Record<string, string[]>;
}> => {
  console.log(`Fetching docket entries for each docket number...`);

  const priorityQueue = new PQueue({ concurrency: 50 });

  const docketEntryIdsByDocketNumber: Record<string, string[]> = {};
  const correspondenceIdsByDocketNumber: Record<string, string[]> = {};

  const getDocketEntriesFunctions = docketNumbers.map(
    docketNumber => async () => {
      const docketEntries = (await queryCaseRecordsBySortKeyPrefix({
        applicationContext,
        docketNumber,
        prefix: 'docket-entry|',
      })) as RawDocketEntry[];

      const correspondence = (await queryCaseRecordsBySortKeyPrefix({
        applicationContext,
        docketNumber,
        prefix: 'correspondence|',
      })) as RawCorrespondence[];

      docketEntryIdsByDocketNumber[docketNumber] = docketEntries.map(
        docketEntry => docketEntry.docketEntryId,
      );

      correspondenceIdsByDocketNumber[docketNumber] = correspondence.map(
        item => item.correspondenceId,
      );
    },
  );

  await priorityQueue.addAll(getDocketEntriesFunctions);
  return { docketEntryIdsByDocketNumber, correspondenceIdsByDocketNumber };
};

/**
 * Strips from each message every attachment whose `documentId` is neither a
 * docket entry id nor a correspondence id of the message's case.
 *
 * A message whose docket number is missing from both lookup tables has all of
 * its attachments removed. Messages without an `attachments` array are skipped.
 *
 * Note: affected messages are updated in place (`attachments` is reassigned to
 * the filtered array) and the same objects are returned.
 *
 * @param messageFragments - messages to inspect
 * @param docketEntryIdsByDocketNumber - docket entry ids per docket number
 * @param correspondenceIdsByDocketNumber - correspondence ids per docket number
 * @returns one audit record per removed attachment, and each affected message
 *   exactly once (even when several of its attachments were removed)
 */
const removePoisonAttachmentsFromMessages = async ({
  messageFragments,
  docketEntryIdsByDocketNumber,
  correspondenceIdsByDocketNumber,
}: {
  messageFragments: MessageFragment[];
  docketEntryIdsByDocketNumber: Record<string, string[]>;
  correspondenceIdsByDocketNumber: Record<string, string[]>;
}): Promise<{
  deletedAttachmentAuditRecords: {
    messageId: string;
    docketEntryId: string;
  }[];
  updatedMessageFragments: MessageFragment[];
}> => {
  const updatedMessageFragments: MessageFragment[] = [];
  const deletedAttachmentAuditRecords: {
    messageId: string;
    docketEntryId: string;
  }[] = [];

  // Valid document ids per docket number, built lazily so each case's id lists
  // are turned into a Set only once no matter how many messages reference it.
  const validIdsByDocketNumber = new Map<string, Set<string>>();
  const getValidIds = (docketNumber: string): Set<string> => {
    let validIds = validIdsByDocketNumber.get(docketNumber);
    if (!validIds) {
      validIds = new Set<string>([
        ...(docketEntryIdsByDocketNumber[docketNumber] ?? []),
        ...(correspondenceIdsByDocketNumber[docketNumber] ?? []),
      ]);
      validIdsByDocketNumber.set(docketNumber, validIds);
    }
    return validIds;
  };

  for (const message of messageFragments) {
    if (!message.attachments) {
      continue;
    }

    const validIds = getValidIds(message.docketNumber);
    const keptAttachments: any[] = [];

    for (const attachment of message.attachments) {
      if (validIds.has(attachment.documentId)) {
        keptAttachments.push(attachment);
        continue;
      }

      deletedAttachmentAuditRecords.push({
        messageId: message.messageId,
        docketEntryId: attachment.documentId,
      });
      console.log(
        `Removing attachment ${attachment.documentId} from message ${message.messageId}`,
      );
    }

    // Record the message once, regardless of how many attachments it lost, so
    // the impacted-message count and the DB updates are not duplicated.
    if (keptAttachments.length !== message.attachments.length) {
      message.attachments = keptAttachments;
      updatedMessageFragments.push(message);
    }
  }

  return {
    deletedAttachmentAuditRecords,
    updatedMessageFragments,
  };
};

/**
 * Writes the cleaned `attachments` of every given message back to the
 * `dwMessage` table. All updates are issued one at a time inside a single
 * transaction.
 *
 * @param db - database handle used to open the transaction
 * @param updatedMessageFragments - messages whose attachments should be saved
 */
const udpateMessagesInDb = async (
  db: Kysely<Database>,
  updatedMessageFragments: MessageFragment[],
) => {
  await db.transaction().execute(async transaction => {
    for (const { attachments, messageId } of updatedMessageFragments) {
      // The column is written as a JSON string of the remaining attachments.
      const serializedAttachments = JSON.stringify(attachments);

      await transaction
        .updateTable('dwMessage')
        .set({ attachments: serializedAttachments })
        .where('messageId', '=', messageId)
        .execute();
    }
  });
};

// Script entry point:
//   1. connect to the database using a signed RDS auth token as the password,
//   2. load every message and the docket entry / correspondence ids of its case,
//   3. work out which attachments reference neither kind of document,
//   4. when --live-run is set, persist the cleaned attachments,
//   5. write an audit file of the removed attachments and print a summary.
// eslint-disable-next-line @typescript-eslint/no-floating-promises
(async () => {
  const applicationContext: ServerApplicationContext = createApplicationContext(
    {},
  );

  const { liveRun } = parseArguments(scriptConfig);

  const sourceSigner = new Signer({
    hostname: DB_HOST!,
    port: DB_PORT,
    region: 'us-east-1',
    username: DB_USER!,
  });
  const sourcePassword = await sourceSigner.getAuthToken();

  const config = {
    database: DB_NAME,
    host: DB_HOST,
    idleTimeoutMillis: 1000,
    max: 1,
    password: sourcePassword,
    port: DB_PORT,
    user: DB_USER,
    ssl: {
      // Read relative to the current working directory.
      ca: fs.readFileSync('global-bundle.pem').toString(),
    },
  };

  const db = await connect(config);

  // The query below has no filter: every row of dwMessage is loaded.
  console.log('Fetching all messages...');
  const messageFragments = await db
    .selectFrom('dwMessage')
    .select(['attachments', 'messageId', 'docketNumber'])
    .execute();

  // collect all unique docket numbers from messages
  console.log('Collecting unique docket numbers from messages...');
  const docketNumbers = Array.from(
    new Set(messageFragments.map(message => message.docketNumber)),
  );

  const { docketEntryIdsByDocketNumber, correspondenceIdsByDocketNumber } =
    await getDocketEntryIdsByDocketNumbers({
      applicationContext,
      docketNumbers,
    });

  const { deletedAttachmentAuditRecords, updatedMessageFragments } =
    await removePoisonAttachmentsFromMessages({
      docketEntryIdsByDocketNumber,
      messageFragments,
      correspondenceIdsByDocketNumber,
    });

  if (liveRun) {
    console.log(`Updating ${updatedMessageFragments.length} messages in DB...`);
    await udpateMessagesInDb(db, updatedMessageFragments);
  }

  // The audit file is written relative to the current working directory, so
  // resolve the reported path the same way rather than against __dirname.
  const auditFilename = 'corruptMessageCleanupAudit.json';
  const auditFilePath = path.resolve(auditFilename);
  fs.writeFileSync(
    auditFilePath,
    JSON.stringify(deletedAttachmentAuditRecords, null, 2),
  );

  console.log(
    '------------------------------------------------------------------',
  );
  console.log(
    `A log of attachments removed can be found here: ${auditFilePath}`,
  );
  console.log(
    'Removed attachments count: ',
    deletedAttachmentAuditRecords.length,
  );
  console.log('Impacted messages count: ', updatedMessageFragments.length);
})();
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import {
BatchWriteCommand,
DynamoDBDocumentClient,
} from '@aws-sdk/lib-dynamodb';

/**
 * Deletes the given items from a DynamoDB table in chunks of 25 using
 * `BatchWriteCommand`.
 *
 * Items the service reports back as unprocessed are re-sent after a 5 second
 * pause, with at most 5 sends per chunk. An error thrown while handling a
 * chunk is logged and the function moves on to the next chunk.
 *
 * @param itemsToDelete - delete requests, each carrying the `pk`/`sk` key
 * @param client - document client used to send the batch writes
 * @param tableNameInput - name of the table to delete from
 * @returns the number of items counted as deleted (sent minus unprocessed)
 */
export async function batchDeleteDynamoItems(
  itemsToDelete: { DeleteRequest: { Key: { pk: string; sk: string } } }[],
  client: DynamoDBDocumentClient,
  tableNameInput: string,
): Promise<number> {
  const BATCH_SIZE = 25;
  const MAX_RETRIES = 5;
  const RETRY_DELAY_MS = 5000; // pause between sends of the same chunk (ms)

  const pause = (ms: number) =>
    new Promise(resolve => setTimeout(resolve, ms));

  let totalItemsDeleted = 0;

  for (let start = 0; start < itemsToDelete.length; start += BATCH_SIZE) {
    try {
      let pending: any[] = itemsToDelete.slice(start, start + BATCH_SIZE);
      let attempt = 0;

      while (attempt < MAX_RETRIES && pending.length > 0) {
        const { UnprocessedItems } = await client.send(
          new BatchWriteCommand({
            RequestItems: {
              [tableNameInput]: pending,
            },
          }),
        );

        const leftover: any[] = UnprocessedItems?.[tableNameInput] ?? [];

        // Whatever was sent and did not come back as unprocessed was deleted.
        totalItemsDeleted += pending.length - leftover.length;
        pending = leftover;

        if (pending.length === 0) {
          break;
        }

        attempt += 1;
        console.log(
          `Retrying unprocessed items: ${pending.length}, attempt ${attempt}`,
        );

        // Wait before sending the leftovers again.
        await pause(RETRY_DELAY_MS);
      }

      if (pending.length > 0) {
        console.error(
          `Failed to delete ${pending.length} items after ${MAX_RETRIES} retries.`,
        );
      }
    } catch (error) {
      console.error('Error in batch delete:', error);
    }
  }

  return totalItemsDeleted;
}
60 changes: 60 additions & 0 deletions scripts/run-once-scripts/postgres-migration/delete-case-notes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* HOW TO RUN
* npx ts-node --transpileOnly scripts/run-once-scripts/postgres-migration/delete-case-notes.ts
*/

import { DynamoDBDocumentClient } from '@aws-sdk/lib-dynamodb';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { getDbReader } from '../../../web-api/src/database';
import { isEmpty } from 'lodash';
import { batchDeleteDynamoItems } from './batch-delete-dynamo-items';
import { environment } from '../../../web-api/src/environment';

// Number of rows read per page; used as both the query limit and the offset step.
const caseUserNotesPageSize = 10000;
// DynamoDB clients used for the batch deletes; the region is hard-coded.
const dynamoDbClient = new DynamoDBClient({ region: 'us-east-1' });
const dynamoDbDocClient = DynamoDBDocumentClient.from(dynamoDbClient);

// We set the environment as 'production' (= "a deployed environment") to get the RDS connection to work properly
environment.nodeEnv = 'production';

/**
 * Reads one page of (docketNumber, userId) pairs from `dwUserCaseNote`.
 * Rows are ordered by docketNumber then userId so successive pages line up.
 *
 * @param offset - number of rows to skip before the page starts
 * @returns up to `caseUserNotesPageSize` rows
 */
const getCaseNotesToDelete = async (offset: number) => {
  return await getDbReader(reader => {
    const pageQuery = reader
      .selectFrom('dwUserCaseNote')
      .select(['docketNumber', 'userId'])
      .orderBy(['docketNumber', 'userId'])
      .limit(caseUserNotesPageSize)
      .offset(offset);

    return pageQuery.execute();
  });
};

// Running total of items deleted from Dynamo across all pages.
let totalItemsDeleted = 0;

/**
 * Pages through the case notes returned by getCaseNotesToDelete and deletes
 * the corresponding `user-case-note|<docketNumber>` / `user|<userId>` items
 * from the Dynamo table, logging progress after each page. Stops at the first
 * empty page.
 */
async function main() {
  let offset = 0;

  for (;;) {
    const page = await getCaseNotesToDelete(offset);
    if (isEmpty(page)) {
      break;
    }

    const deleteRequests = page.map(({ docketNumber, userId }) => ({
      DeleteRequest: {
        Key: {
          pk: `user-case-note|${docketNumber}`,
          sk: `user|${userId}`,
        },
      },
    }));

    const deletedInPage = await batchDeleteDynamoItems(
      deleteRequests,
      dynamoDbDocClient,
      environment.dynamoDbTableName,
    );
    totalItemsDeleted += deletedInPage;
    console.log(`Total case notes deleted so far: ${totalItemsDeleted}`);

    offset += caseUserNotesPageSize;
  }

  console.log('Done deleting case notes from Dynamo');
}

main().catch(console.error);
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
ScanCommand,
} from '@aws-sdk/lib-dynamodb';
import { DynamoDBClient, ScanCommandInput } from '@aws-sdk/client-dynamodb';
import { requireEnvVars } from '../../shared/admin-tools/util';
import { requireEnvVars } from '../../../shared/admin-tools/util';

requireEnvVars(['TABLE_NAME']);

Expand Down
Loading

0 comments on commit 4c0d271

Please sign in to comment.