From d0b74ddcdf5d92b74f0f607c78cd643d80db781a Mon Sep 17 00:00:00 2001 From: tiffanyvu Date: Thu, 19 Dec 2024 11:07:42 -0800 Subject: [PATCH] feat(lambda): functionality for updating ID, values, and deleting packages (#923) * wip * check sinkmain * remove deleted code * rename to handler * add env config * fix parse * fix event body type * add origin * add logs * log * fix test and add log * temp bypass record event * change order of sinkmain if * push into sinkmain * add id into doc * log in sinkmain * format deleted package * add sink change log * change order of sinklog changelog * fix schema * add logs changelog * try adding delete property * fix property * log query * change term * letter s * must not * wip update package fields * add log * omg * try fix test and changelog update fields * transform updated values? * typo * again * try * remove origin * add timestamp? * fix changelog not showing * edit changelog text * meg * changelog admin changes updates, language TBD * edit schema parsing for sinkmain * change schema parsing in changelog * rm commented out code * fix eslint * clean up * remove more logs * readd comment * cleanup zod schema logic * change event name * test adminchange for delete * attachments is undefined for soft deleted packages, add optional * wip * revert safeparse * remove empty test * set default attachments value * move default attachments default value * delete test, will be handled by sinkmain tests * edit todo msg * rm unused code * remove origin property * test * check for existing package id * add update id schema * add admin change type * typo * again * add log * edit log * edit changelog * exclude _index * fix destructuring * source? * test log * hide changelog * ew test * add changemade and types to Document type * add zod to event body * update response status codes * merge and rm console log * rm comment * wip id validation * test logs * again * add logs * logs * fix * test format * log * test moving responses into funcs * test * fix punctuation * wip address comments * rm old code * add comment * rm eslint ignore * move topicname logic * test package event type in changelog * log * logs * fix * move event * move logic * logs * typo * what * log * add mako origin to get to transforms * umm touching opensearch * typo * logs * revert * try * revert * revert changelog * hi * logs * add old id to changelog * offset stuff * logs * logs * duplicate offset * fix * log hits * logs * test * fix * fix * idToBeUpdated * add onemac * fix * clean * move around * clean * remove casting * casting throws type error * fix import and add test --------- Co-authored-by: asharonbaltazar <58940073+asharonbaltazar@users.noreply.github.com> --- lib/lambda/search.ts | 2 + lib/lambda/sinkChangelog.ts | 49 +++- lib/lambda/sinkMainProcessors.test.ts | 81 ++++++ lib/lambda/sinkMainProcessors.ts | 53 +++- lib/lambda/update/adminChangeSchemas.ts | 50 ++++ lib/lambda/update/getPackageType.ts | 26 ++ lib/lambda/update/updatePackage.ts | 232 ++++++++++++++++++ .../opensearch/changelog/index.ts | 5 +- .../shared-types/opensearch/main/index.ts | 3 + lib/stacks/api.ts | 10 + .../features/package/admin-changes/index.tsx | 3 - .../package/package-activity/index.tsx | 2 +- 12 files changed, 502 insertions(+), 14 deletions(-) create mode 100644 lib/lambda/update/adminChangeSchemas.ts create mode 100644 lib/lambda/update/getPackageType.ts create mode 100644 lib/lambda/update/updatePackage.ts diff --git a/lib/lambda/search.ts b/lib/lambda/search.ts index a4af6bfa05..f823579122 100644 --- a/lib/lambda/search.ts +++ b/lib/lambda/search.ts @@ -24,6 +24,8 @@ export const getSearchData = async (event: APIGatewayEvent) => { query.query = query?.query || {}; query.query.bool = query.query?.bool || {}; query.query.bool.must = query.query.bool?.must || []; + query.query.bool.must_not = query.query.bool?.must_not || []; + query.query.bool.must_not.push({ term: { deleted: true } }); const stateFilter = await getStateFilter(event); if (stateFilter) { diff --git a/lib/lambda/sinkChangelog.ts b/lib/lambda/sinkChangelog.ts index 9c2fcae00b..d2fe78a00c 100644 --- a/lib/lambda/sinkChangelog.ts +++ b/lib/lambda/sinkChangelog.ts @@ -2,6 +2,12 @@ import { Handler } from "aws-lambda"; import { decodeBase64WithUtf8 } from "shared-utils"; import { KafkaEvent, KafkaRecord, opensearch } from "shared-types"; import { ErrorType, bulkUpdateDataWrapper, getTopic, logError } from "../libs/sink-lib"; +import { + transformUpdateValuesSchema, + transformDeleteSchema, + transformedUpdateIdSchema, +} from "./update/adminChangeSchemas"; +import { getPackageChangelog } from "lib/libs/api/package"; // One notable difference between this handler and sinkMain's... // The order in which records are processed for the changelog doesn't matter. @@ -48,6 +54,7 @@ const processAndIndex = async ({ for (const kafkaRecord of kafkaRecords) { console.log(JSON.stringify(kafkaRecord, null, 2)); const { value, offset } = kafkaRecord; + try { // If a legacy tombstone, continue if (!value) { @@ -57,6 +64,45 @@ const processAndIndex = async ({ // Parse the kafka record's value const record = JSON.parse(decodeBase64WithUtf8(value)); + // query all changelog entries for this ID and create copies of all entries with new ID + if (record.isAdminChange) { + const schema = transformDeleteSchema(offset).or( + transformUpdateValuesSchema(offset).or(transformedUpdateIdSchema), + ); + + const result = schema.safeParse(record); + + if (result.success) { + if (result.data.adminChangeType === "update-id") { + docs.forEach((log) => { + const recordOffset = log.id.split("-").at(-1); + + docs.push({ + ...log, + id: `${result.data.id}-${recordOffset}`, + packageId: result.data.id, + }); + }); + const packageChangelogs = await getPackageChangelog(result.data.idToBeUpdated); + + packageChangelogs.hits.hits.forEach((log) => { + const recordOffset = log._id.split("-").at(-1); + docs.push({ + ...log._source, + id: `${result.data.id}-${recordOffset}`, + packageId: result.data.id, + }); + }); + } else { + docs.push(result.data); + } + } else { + console.log( + `Skipping package with invalid format for type "${record.adminChangeType}"`, + result.error.message, + ); + } + } // If we're not a mako event, continue // TODO: handle legacy. for now, just continue if (!record.event || record?.origin !== "mako") { @@ -64,9 +110,6 @@ const processAndIndex = async ({ } // If the event is a supported event, transform and push to docs array for indexing - console.log("event below"); - console.log(record.event); - if (record.event in transforms) { const transformForEvent = transforms[record.event as keyof typeof transforms]; diff --git a/lib/lambda/sinkMainProcessors.test.ts b/lib/lambda/sinkMainProcessors.test.ts index 83e63c8b30..4b289f4f07 100644 --- a/lib/lambda/sinkMainProcessors.test.ts +++ b/lib/lambda/sinkMainProcessors.test.ts @@ -125,6 +125,87 @@ describe("insertOneMacRecordsFromKafkaIntoMako", () => { ); }); + it("handles valid kafka admin records", () => { + insertOneMacRecordsFromKafkaIntoMako( + [ + createKafkaRecord({ + topic: TOPIC, + key: "TUQtMjQtMjMwMA==", + value: convertObjToBase64({ + id: "MD-24-2301", + submitterName: "George Harrison", + submitterEmail: "george@example.com", + changeMade: "ID has been updated.", + isAdminChange: true, + adminChangeType: "update-id", + idToBeUpdated: "MD-24-2300", + }), + }), + createKafkaRecord({ + topic: TOPIC, + key: "TUQtMjQtMjMwMA==", + value: convertObjToBase64({ + id: "MD-24-2301", + submitterName: "George Harrison", + submitterEmail: "george@example.com", + changeMade: "title has been updated.", + isAdminChange: true, + adminChangeType: "update-values", + title: "updated title", + }), + }), + createKafkaRecord({ + topic: TOPIC, + key: "TUQtMjQtMjMwMA==", + value: convertObjToBase64({ + id: "MD-24-2301", + submitterName: "George Harrison", + submitterEmail: "george@example.com", + isAdminChange: true, + adminChangeType: "delete", + deleted: true, + }), + }), + ], + TOPIC, + ); + + expect(spiedOnBulkUpdateDataWrapper).toBeCalledWith( + [ + // record deleted + { + id: "MD-24-2301", + submitterName: "George Harrison", + submitterEmail: "george@example.com", + changeMade: "ID has been updated.", + isAdminChange: true, + adminChangeType: "update-id", + idToBeUpdated: "MD-24-2300", + }, + // property updated + { + id: "MD-24-2301", + submitterName: "George Harrison", + submitterEmail: "george@example.com", + changeMade: "title has been updated.", + isAdminChange: true, + adminChangeType: "update-values", + title: "updated title", + }, + // id updated + { + id: "MD-24-2301", + submitterName: "George Harrison", + submitterEmail: "george@example.com", + isAdminChange: true, + adminChangeType: "delete", + deleted: true, + }, + ], + "main", + ); + }); + it("skips value-less kafka records", () => { insertOneMacRecordsFromKafkaIntoMako( [ diff --git a/lib/lambda/sinkMainProcessors.ts b/lib/lambda/sinkMainProcessors.ts index 3615a3cf06..c95359a25d 100644 --- a/lib/lambda/sinkMainProcessors.ts +++ b/lib/lambda/sinkMainProcessors.ts @@ -3,25 +3,44 @@ import { KafkaRecord, opensearch, SeatoolRecordWithUpdatedDate } from "shared-ty import { Document, transforms } from "shared-types/opensearch/main"; import { decodeBase64WithUtf8 } from "shared-utils"; import { isBefore } from "date-fns"; +import { + deleteAdminChangeSchema, + updateValuesAdminChangeSchema, + updateIdAdminChangeSchema, +} from "./update/adminChangeSchemas"; const removeDoubleQuotesSurroundingString = (str: string) => str.replace(/^"|"$/g, ""); +const adminRecordSchema = deleteAdminChangeSchema + .or(updateValuesAdminChangeSchema) + .or(updateIdAdminChangeSchema); type OneMacRecord = { id: string; - makoChangedDate: string | null; + [key: string]: unknown | undefined; }; +type ParsedRecordFromKafka = Partial<{ + event: string; + origin: string; + isAdminChange: boolean; + adminChangeType: string; +}>; + const isRecordAOneMacRecord = ( - record: Partial<{ - event: string; - origin: string; - }>, + record: ParsedRecordFromKafka, ): record is { event: keyof typeof transforms } => typeof record === "object" && record?.event !== undefined && record.event in transforms && record?.origin === "mako"; +const isRecordAnAdminOneMacRecord = ( + record: ParsedRecordFromKafka, +): record is { adminChangeType: string; isAdminChange: boolean } => + typeof record === "object" && + record?.isAdminChange === true && + record?.adminChangeType !== undefined; + const getOneMacRecordWithAllProperties = ( value: string, topicPartition: string, @@ -29,6 +48,28 @@ const getOneMacRecordWithAllProperties = ( ): OneMacRecord | undefined => { const record = JSON.parse(decodeBase64WithUtf8(value)); + if (isRecordAnAdminOneMacRecord(record)) { + const safeRecord = adminRecordSchema.safeParse(record); + + if (safeRecord.success === false) { + console.log(`Skipping package with invalid format for type "${record.adminChangeType}"`); + + logError({ + type: ErrorType.VALIDATION, + error: safeRecord.error.errors, + metadata: { topicPartition, kafkaRecord, record }, + }); + + return; + } + + const { data: oneMacAdminRecord } = safeRecord; + + console.log(`admin record: ${JSON.stringify(oneMacAdminRecord, null, 2)}`); + + return oneMacAdminRecord; + } + if (isRecordAOneMacRecord(record)) { const transformForEvent = transforms[record.event]; @@ -37,7 +78,7 @@ const getOneMacRecordWithAllProperties = ( if (safeEvent.success === false) { logError({ type: ErrorType.VALIDATION, - error: safeEvent.error, + error: safeEvent.error.errors, metadata: { topicPartition, kafkaRecord, record }, }); diff --git a/lib/lambda/update/adminChangeSchemas.ts b/lib/lambda/update/adminChangeSchemas.ts new file mode 100644 index 0000000000..274c8b4af0 --- /dev/null +++ b/lib/lambda/update/adminChangeSchemas.ts @@ -0,0 +1,50 @@ +import { z } from "zod"; + +export const deleteAdminChangeSchema = z + .object({ + id: z.string(), + deleted: z.boolean(), + adminChangeType: z.literal("delete"), + }) + .and(z.record(z.string(), z.any())); + +export const updateValuesAdminChangeSchema = z + .object({ + id: z.string(), + adminChangeType: z.literal("update-values"), + }) + .and(z.record(z.string(), z.any())); + +export const updateIdAdminChangeSchema = z + .object({ + id: z.string(), + adminChangeType: z.literal("update-id"), + idToBeUpdated: z.string(), + }) + .and(z.record(z.string(), z.any())); + +export const transformDeleteSchema = (offset: number) => + deleteAdminChangeSchema.transform((data) => ({ + ...data, + event: "delete", + packageId: data.id, + id: `${data.id}-${offset}`, + timestamp: Date.now(), + })); + +export const transformUpdateValuesSchema = (offset: number) => + updateValuesAdminChangeSchema.transform((data) => ({ + ...data, + event: "update-values", + packageId: data.id, + id: `${data.id}-${offset}`, + timestamp: Date.now(), + })); + +export const transformedUpdateIdSchema = updateIdAdminChangeSchema.transform((data) => ({ + ...data, + event: "update-id", + packageId: data.id, + id: `${data.id}`, + timestamp: Date.now(), +})); \ No newline at end of file diff --git a/lib/lambda/update/getPackageType.ts b/lib/lambda/update/getPackageType.ts new file mode 100644 index 0000000000..7383efae94 --- /dev/null +++ b/lib/lambda/update/getPackageType.ts @@ -0,0 +1,26 @@ +import { response } from "lib/libs/handler-lib"; +import { events } from "lib/packages/shared-types"; +import { getPackageChangelog } from "lib/libs/api/package"; + +export const getPackageType = async (packageId: string) => { + // use event of current package to determine how ID should be formatted + try { + const packageChangelog = await getPackageChangelog(packageId); + const packageSubmissionType = packageChangelog.hits.hits.find( + (pkg) => pkg._source.event in events, + ); + + if (!packageSubmissionType) { + throw new Error("The type of package could not be determined."); + } + + return packageSubmissionType._source.event; + } catch (error) { + return response({ + statusCode: 500, + body: { + message: error.message || "An error occurred determining the package submission type.", + }, + }); + } +}; diff --git a/lib/lambda/update/updatePackage.ts b/lib/lambda/update/updatePackage.ts new file mode 100644 index 0000000000..24592a66a6 --- /dev/null +++ b/lib/lambda/update/updatePackage.ts @@ -0,0 +1,232 @@ +import { response } from "libs/handler-lib"; +import { APIGatewayEvent } from "aws-lambda"; +import { getPackage } from "libs/api/package"; +import { produceMessage } from "libs/api/kafka"; +import { ItemResult } from "shared-types/opensearch/main"; +import { getPackageType } from "./getPackageType"; +import { events } from "lib/packages/shared-types"; +import { z } from "zod"; + +const sendDeleteMessage = async (packageId: string) => { + const topicName = process.env.topicName as string; + if (!topicName) { + throw new Error("Topic name is not defined"); + } + await produceMessage( + topicName, + packageId, + JSON.stringify({ + id: packageId, + deleted: true, + isAdminChange: true, + adminChangeType: "delete", + }), + ); + + return response({ + statusCode: 200, + body: { message: `${packageId} has been deleted.` }, + }); +}; + +const sendUpdateValuesMessage = async ({ + currentPackage, + updatedFields, + changeReason, +}: { + currentPackage: ItemResult; + updatedFields: object; + changeReason?: string; +}) => { + const topicName = process.env.topicName as string; + if (!topicName) { + throw new Error("Topic name is not defined"); + } + const invalidFields = Object.keys(updatedFields).filter( + (field) => !(field in currentPackage._source), + ); + if (invalidFields.length > 0) { + return response({ + statusCode: 400, + body: { message: `Cannot update invalid field(s): ${invalidFields.join(", ")}` }, + }); + } + + if ("id" in updatedFields) { + return response({ + statusCode: 400, + body: { message: "ID is not a valid field to update" }, + }); + } + + const fieldNames = Object.keys(updatedFields).join(", "); + const changeMadeText = `${fieldNames} ${ + Object.keys(updatedFields).length > 1 ? "have" : "has" + } been updated`; + + await produceMessage( + topicName, + currentPackage._id, + JSON.stringify({ + id: currentPackage._id, + ...updatedFields, + isAdminChange: true, + adminChangeType: "update-values", + changeMade: changeMadeText, + changeReason, + }), + ); + + return response({ + statusCode: 200, + body: { message: `${changeMadeText} in package ${currentPackage._id}.` }, + }); +}; + +const sendUpdateIdMessage = async ({ + currentPackage, + updatedId, +}: { + currentPackage: ItemResult; + updatedId: string; +}) => { + const topicName = process.env.topicName as string; + if (!topicName) { + throw new Error("Topic name is not defined"); + } + // ID and changeMade are excluded; the rest of the object has to be spread into the new package + const { + id: _id, + changeMade: _changeMade, + origin: _origin, + ...remainingFields + } = currentPackage._source; + + if (!updatedId) { + return response({ + statusCode: 400, + body: { message: "New ID required to update package" }, + }); + } + + // check if a package with this new ID already exists + const packageExists = await getPackage(updatedId); + if (packageExists) { + return response({ + statusCode: 400, + body: { message: "This ID already exists" }, + }); + } + // use event of current package to determine how ID should be formatted + const packageEvent = await getPackageType(currentPackage._id); + const packageSubmissionTypeSchema = events[packageEvent as keyof typeof events].baseSchema; + + if (!packageSubmissionTypeSchema) { + return response({ + statusCode: 500, + body: { message: "Could not validate the ID of this type of package." }, + }); + } + + const idSchema = packageSubmissionTypeSchema.shape.id; + const parsedId = idSchema.safeParse(updatedId); + + if (!parsedId.success) { + return response({ + statusCode: 400, + body: parsedId.error.message, + }); + } + + await sendDeleteMessage(currentPackage._id); + await produceMessage( + topicName, + updatedId, + JSON.stringify({ + id: updatedId, + idToBeUpdated: currentPackage._id, + ...remainingFields, + origin: "OneMAC", + changeMade: "ID has been updated.", + isAdminChange: true, + adminChangeType: "update-id", + }), + ); + + return response({ + statusCode: 200, + body: { message: `The ID of package ${currentPackage._id} has been updated to ${updatedId}.` }, + }); +}; + +const updatePackageEventBodySchema = z.object({ + packageId: z.string(), + action: z.enum(["update-values", "update-id", "delete"]), + updatedId: z.string().optional(), + updatedFields: z.record(z.unknown()).optional(), + changeReason: z.string().optional(), +}); + +export const handler = async (event: APIGatewayEvent) => { + if (!event.body) { + return response({ + statusCode: 400, + body: { message: "Event body required" }, + }); + } + try { + const parseEventBody = (body: unknown) => { + return updatePackageEventBodySchema.parse(typeof body === "string" ? JSON.parse(body) : body); + }; + + const { + packageId, + action, + updatedId = packageId, + updatedFields = {}, + changeReason, + } = parseEventBody(event.body); + + if (!packageId || !action) { + return response({ + statusCode: 400, + body: { message: "Package ID and action are required" }, + }); + } + + const currentPackage = await getPackage(packageId); + + if (!currentPackage) { + return response({ + statusCode: 404, + body: { message: "No record found for the given id" }, + }); + } + + if (action === "delete") { + return await sendDeleteMessage(packageId); + } + + if (action === "update-id") { + return await sendUpdateIdMessage({ currentPackage, updatedId }); + } + + if (action === "update-values") { + return await sendUpdateValuesMessage({ + currentPackage, + updatedFields, + changeReason, + }); + } + return response({ + statusCode: 400, + body: { message: "Could not update package." }, + }); + } catch (err) { + console.error("Error has occured modifying package:", err); + return response({ + statusCode: 500, + body: { message: err.message || "Internal Server Error" }, + }); + } +}; diff --git a/lib/packages/shared-types/opensearch/changelog/index.ts b/lib/packages/shared-types/opensearch/changelog/index.ts index 91cea308f2..f6137b4981 100644 --- a/lib/packages/shared-types/opensearch/changelog/index.ts +++ b/lib/packages/shared-types/opensearch/changelog/index.ts @@ -79,7 +79,10 @@ export type Document = Omit & | "toggle-withdraw-rai" | "upload-subsequent-documents" | "withdraw-package" - | "withdraw-rai"; + | "withdraw-rai" + | "update-values" + | "update-id" + | "delete"; }; export type Response = Res; diff --git a/lib/packages/shared-types/opensearch/main/index.ts b/lib/packages/shared-types/opensearch/main/index.ts index 672d7945b0..131c807321 100644 --- a/lib/packages/shared-types/opensearch/main/index.ts +++ b/lib/packages/shared-types/opensearch/main/index.ts @@ -63,6 +63,9 @@ export type Document = AppkDocument & changelog?: Changelog[]; appkChildren?: Omit[]; deleted?: boolean; + adminChangeType?: string; + changeMade?: string; + idToBeUpdated?: string; }; export type Response = Res; diff --git a/lib/stacks/api.ts b/lib/stacks/api.ts index c6d8fd2ce1..f8cee59b1b 100644 --- a/lib/stacks/api.ts +++ b/lib/stacks/api.ts @@ -281,6 +281,16 @@ export class Api extends cdk.NestedStack { entry: join(__dirname, "../lambda/getAllForms.ts"), environment: {}, }, + { + id: "updatePackage", + entry: join(__dirname, "../lambda/update/updatePackage.ts"), + environment: { + topicName, + brokerString, + osDomain: `https://${openSearchDomainEndpoint}`, + indexNamespace, + }, + }, ]; const lambdas = lambdaDefinitions.reduce((acc, lambdaDef) => { diff --git a/react-app/src/features/package/admin-changes/index.tsx b/react-app/src/features/package/admin-changes/index.tsx index 5bf8503ccb..403827f0ca 100644 --- a/react-app/src/features/package/admin-changes/index.tsx +++ b/react-app/src/features/package/admin-changes/index.tsx @@ -59,9 +59,6 @@ export const AdminChange: FC = (props) => { } return ["Disable formal RAI response withdraw", AC_WithdrawDisabled]; } - - // case "update-id": - // return ["Package ID Update", AC_UpdateId]; case "legacy-admin-change": return [props.changeType || "Manual Update", AC_LegacyAdminChange]; default: diff --git a/react-app/src/features/package/package-activity/index.tsx b/react-app/src/features/package/package-activity/index.tsx index 9ae69d9b45..29479345cf 100644 --- a/react-app/src/features/package/package-activity/index.tsx +++ b/react-app/src/features/package/package-activity/index.tsx @@ -48,7 +48,7 @@ type SubmissionProps = { }; const Submission = ({ packageActivity }: SubmissionProps) => { - const { attachments, id, packageId, additionalInformation } = packageActivity; + const { attachments = [], id, packageId, additionalInformation } = packageActivity; const { onUrl, loading, onZip } = useAttachmentService({ packageId }); return (