Skip to content

Commit

Permalink
feat(lambda): functionality for updating ID, values, and deleting pac…
Browse files Browse the repository at this point in the history
…kages (#923)

* wip

* check sinkmain

* remove deleted code

* rename to handler

* add env config

* fix parse

* fix event body type

* add origin

* add logs

* log

* fix test and add log

* temp bypass record event

* change order of sinkmain if

* push into sinkmain

* add id into doc

* log in sinkmain

* format deleted package

* add sink change log

* change order of sinklog changelog

* fix schema

* add logs changelog

* try adding delete property

* fix property

* log query

* change term

* letter s

* must not

* wip update package fields

* add log

* omg

* try fix test and changelog update fields

* transform updated values?

* typo

* again

* try

* remove origin

* add timestamp?

* fix changelog not showing

* edit changelog text

* meg

* changelog admin changes updates, language TBD

* edit schema parsing for sinkmain

* change schema parsing in changelog

* rm commented out code

* fix eslint

* clean up

* remove more logs

* readd comment

* cleanup zod schema logic

* change event name

* test adminchange for delete

* attachments is undefined for soft deleted packages, add optional

* wip

* revert safeparse

* remove empty test

* set default attachments value

* move default attachments default value

* delete test, will be handled by sinkmain tests

* edit todo msg

* rm unused code

* remove origin property

* test

* check for existing package id

* add update id schema

* add admin change type

* typo

* again

* add log

* edit log

* edit changelog

* exclude _index

* fix destructuring

* source?

* test log

* hide changelog

* ew test

* add changemade and types to Document type

* add zod to event body

* update response status codes

* merge and rm console log

* rm comment

* wip id validation

* test logs

* again

* add logs

* logs

* fix

* test format

* log

* test moving responses into funcs

* test

* fix punctuation

* wip address comments

* rm old code

* add comment

* rm eslint ignore

* move topicname logic

* test package event type in changelog

* log

* logs

* fix

* move event

* move logic

* logs

* typo

* what

* log

* add mako origin to get to transforms

* umm touching opensearch

* typo

* logs

* revert

* try

* revert

* revert changelog

* hi

* logs

* add old id to changelog

* offset stuff

* logs

* logs

* duplicate offset

* fix

* log hits

* logs

* test

* fix

* fix

* idToBeUpdated

* add onemac

* fix

* clean

* move around

* clean

* remove casting

* casting throws type error

* fix import and add test

---------

Co-authored-by: asharonbaltazar <58940073+asharonbaltazar@users.noreply.github.com>
  • Loading branch information
tiffanyvu and asharonbaltazar authored Dec 19, 2024
1 parent 75dfb41 commit d0b74dd
Show file tree
Hide file tree
Showing 12 changed files with 502 additions and 14 deletions.
2 changes: 2 additions & 0 deletions lib/lambda/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ export const getSearchData = async (event: APIGatewayEvent) => {
query.query = query?.query || {};
query.query.bool = query.query?.bool || {};
query.query.bool.must = query.query.bool?.must || [];
query.query.bool.must_not = query.query.bool?.must_not || [];
query.query.bool.must_not.push({ term: { deleted: true } });

const stateFilter = await getStateFilter(event);
if (stateFilter) {
Expand Down
49 changes: 46 additions & 3 deletions lib/lambda/sinkChangelog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ import { Handler } from "aws-lambda";
import { decodeBase64WithUtf8 } from "shared-utils";
import { KafkaEvent, KafkaRecord, opensearch } from "shared-types";
import { ErrorType, bulkUpdateDataWrapper, getTopic, logError } from "../libs/sink-lib";
import {
transformUpdateValuesSchema,
transformDeleteSchema,
transformedUpdateIdSchema,
} from "./update/adminChangeSchemas";
import { getPackageChangelog } from "lib/libs/api/package";

// One notable difference between this handler and sinkMain's...
// The order in which records are processed for the changelog doesn't matter.
Expand Down Expand Up @@ -48,6 +54,7 @@ const processAndIndex = async ({
for (const kafkaRecord of kafkaRecords) {
console.log(JSON.stringify(kafkaRecord, null, 2));
const { value, offset } = kafkaRecord;

try {
// If a legacy tombstone, continue
if (!value) {
Expand All @@ -57,16 +64,52 @@ const processAndIndex = async ({
// Parse the kafka record's value
const record = JSON.parse(decodeBase64WithUtf8(value));

// query all changelog entries for this ID and create copies of all entries with new ID
if (record.isAdminChange) {
const schema = transformDeleteSchema(offset).or(
transformUpdateValuesSchema(offset).or(transformedUpdateIdSchema),
);

const result = schema.safeParse(record);

if (result.success) {
if (result.data.adminChangeType === "update-id") {
docs.forEach((log) => {
const recordOffset = log.id.split("-").at(-1);

docs.push({
...log,
id: `${result.data.id}-${recordOffset}`,
packageId: result.data.id,
});
});
const packageChangelogs = await getPackageChangelog(result.data.idToBeUpdated);

packageChangelogs.hits.hits.forEach((log) => {
const recordOffset = log._id.split("-").at(-1);
docs.push({
...log._source,
id: `${result.data.id}-${recordOffset}`,
packageId: result.data.id,
});
});
} else {
docs.push(result.data);
}
} else {
console.log(
`Skipping package with invalid format for type "${record.adminChangeType}"`,
result.error.message,
);
}
}
// If we're not a mako event, continue
// TODO: handle legacy. for now, just continue
if (!record.event || record?.origin !== "mako") {
continue;
}

// If the event is a supported event, transform and push to docs array for indexing
console.log("event below");
console.log(record.event);

if (record.event in transforms) {
const transformForEvent = transforms[record.event as keyof typeof transforms];

Expand Down
81 changes: 81 additions & 0 deletions lib/lambda/sinkMainProcessors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,87 @@ describe("insertOneMacRecordsFromKafkaIntoMako", () => {
);
});

it("handles valid kafka admin records", () => {
insertOneMacRecordsFromKafkaIntoMako(
[
createKafkaRecord({
topic: TOPIC,
key: "TUQtMjQtMjMwMA==",
value: convertObjToBase64({
id: "MD-24-2301",
submitterName: "George Harrison",
submitterEmail: "george@example.com",
changeMade: "ID has been updated.",
isAdminChange: true,
adminChangeType: "update-id",
idToBeUpdated: "MD-24-2300",
}),
}),
createKafkaRecord({
topic: TOPIC,
key: "TUQtMjQtMjMwMA==",
value: convertObjToBase64({
id: "MD-24-2301",
submitterName: "George Harrison",
submitterEmail: "george@example.com",
changeMade: "title has been updated.",
isAdminChange: true,
adminChangeType: "update-values",
title: "updated title",
}),
}),
createKafkaRecord({
topic: TOPIC,
key: "TUQtMjQtMjMwMA==",
value: convertObjToBase64({
id: "MD-24-2301",
submitterName: "George Harrison",
submitterEmail: "george@example.com",
isAdminChange: true,
adminChangeType: "delete",
deleted: true,
}),
}),
],
TOPIC,
);

expect(spiedOnBulkUpdateDataWrapper).toBeCalledWith(
[
// record deleted
{
id: "MD-24-2301",
submitterName: "George Harrison",
submitterEmail: "george@example.com",
changeMade: "ID has been updated.",
isAdminChange: true,
adminChangeType: "update-id",
idToBeUpdated: "MD-24-2300",
},
// property updated
{
id: "MD-24-2301",
submitterName: "George Harrison",
submitterEmail: "george@example.com",
changeMade: "title has been updated.",
isAdminChange: true,
adminChangeType: "update-values",
title: "updated title",
},
// id updated
{
id: "MD-24-2301",
submitterName: "George Harrison",
submitterEmail: "george@example.com",
isAdminChange: true,
adminChangeType: "delete",
deleted: true,
},
],
"main",
);
});

it("skips value-less kafka records", () => {
insertOneMacRecordsFromKafkaIntoMako(
[
Expand Down
53 changes: 47 additions & 6 deletions lib/lambda/sinkMainProcessors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,73 @@ import { KafkaRecord, opensearch, SeatoolRecordWithUpdatedDate } from "shared-ty
import { Document, transforms } from "shared-types/opensearch/main";
import { decodeBase64WithUtf8 } from "shared-utils";
import { isBefore } from "date-fns";
import {
deleteAdminChangeSchema,
updateValuesAdminChangeSchema,
updateIdAdminChangeSchema,
} from "./update/adminChangeSchemas";

const removeDoubleQuotesSurroundingString = (str: string) => str.replace(/^"|"$/g, "");
const adminRecordSchema = deleteAdminChangeSchema
.or(updateValuesAdminChangeSchema)
.or(updateIdAdminChangeSchema);

type OneMacRecord = {
id: string;
makoChangedDate: string | null;
[key: string]: unknown | undefined;
};

type ParsedRecordFromKafka = Partial<{
event: string;
origin: string;
isAdminChange: boolean;
adminChangeType: string;
}>;

const isRecordAOneMacRecord = (
record: Partial<{
event: string;
origin: string;
}>,
record: ParsedRecordFromKafka,
): record is { event: keyof typeof transforms } =>
typeof record === "object" &&
record?.event !== undefined &&
record.event in transforms &&
record?.origin === "mako";

const isRecordAnAdminOneMacRecord = (
record: ParsedRecordFromKafka,
): record is { adminChangeType: string; isAdminChange: boolean } =>
typeof record === "object" &&
record?.isAdminChange === true &&
record?.adminChangeType !== undefined;

const getOneMacRecordWithAllProperties = (
value: string,
topicPartition: string,
kafkaRecord: KafkaRecord,
): OneMacRecord | undefined => {
const record = JSON.parse(decodeBase64WithUtf8(value));

if (isRecordAnAdminOneMacRecord(record)) {
const safeRecord = adminRecordSchema.safeParse(record);

if (safeRecord.success === false) {
console.log(`Skipping package with invalid format for type "${record.adminChangeType}"`);

logError({
type: ErrorType.VALIDATION,
error: safeRecord.error.errors,
metadata: { topicPartition, kafkaRecord, record },
});

return;
}

const { data: oneMacAdminRecord } = safeRecord;

console.log(`admin record: ${JSON.stringify(oneMacAdminRecord, null, 2)}`);

return oneMacAdminRecord;
}

if (isRecordAOneMacRecord(record)) {
const transformForEvent = transforms[record.event];

Expand All @@ -37,7 +78,7 @@ const getOneMacRecordWithAllProperties = (
if (safeEvent.success === false) {
logError({
type: ErrorType.VALIDATION,
error: safeEvent.error,
error: safeEvent.error.errors,
metadata: { topicPartition, kafkaRecord, record },
});

Expand Down
50 changes: 50 additions & 0 deletions lib/lambda/update/adminChangeSchemas.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { z } from "zod";

export const deleteAdminChangeSchema = z
.object({
id: z.string(),
deleted: z.boolean(),
adminChangeType: z.literal("delete"),
})
.and(z.record(z.string(), z.any()));

export const updateValuesAdminChangeSchema = z
.object({
id: z.string(),
adminChangeType: z.literal("update-values"),
})
.and(z.record(z.string(), z.any()));

export const updateIdAdminChangeSchema = z
.object({
id: z.string(),
adminChangeType: z.literal("update-id"),
idToBeUpdated: z.string(),
})
.and(z.record(z.string(), z.any()));

export const transformDeleteSchema = (offset: number) =>
deleteAdminChangeSchema.transform((data) => ({
...data,
event: "delete",
packageId: data.id,
id: `${data.id}-${offset}`,
timestamp: Date.now(),
}));

export const transformUpdateValuesSchema = (offset: number) =>
updateValuesAdminChangeSchema.transform((data) => ({
...data,
event: "update-values",
packageId: data.id,
id: `${data.id}-${offset}`,
timestamp: Date.now(),
}));

export const transformedUpdateIdSchema = updateIdAdminChangeSchema.transform((data) => ({
...data,
event: "update-id",
packageId: data.id,
id: `${data.id}`,
timestamp: Date.now(),
}));
26 changes: 26 additions & 0 deletions lib/lambda/update/getPackageType.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { response } from "lib/libs/handler-lib";
import { events } from "lib/packages/shared-types";
import { getPackageChangelog } from "lib/libs/api/package";

export const getPackageType = async (packageId: string) => {
// use event of current package to determine how ID should be formatted
try {
const packageChangelog = await getPackageChangelog(packageId);
const packageSubmissionType = packageChangelog.hits.hits.find(
(pkg) => pkg._source.event in events,
);

if (!packageSubmissionType) {
throw new Error("The type of package could not be determined.");
}

return packageSubmissionType._source.event;
} catch (error) {
return response({
statusCode: 500,
body: {
message: error.message || "An error occurred determining the package submission type.",
},
});
}
};
Loading

0 comments on commit d0b74dd

Please sign in to comment.