From 350791877d2830e6683c04299d5d2c034adab3d2 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 13 Mar 2023 09:38:02 +0100 Subject: [PATCH] feat: ucan stream consumer store remove size --- package-lock.json | 3 + stacks/index.js | 2 +- stacks/ucan-invocation-stack.js | 28 ++- ucan-invocation/buckets/car-store.js | 50 ++++ ucan-invocation/constants.js | 1 + .../metrics-store-remove-size-total.js | 55 +++++ ucan-invocation/package.json | 2 + ucan-invocation/tables/metrics.js | 22 ++ .../metrics-store-remove-size-total.test.js | 216 ++++++++++++++++++ .../metrics-store-remove-total.test.js | 2 +- ucan-invocation/test/helpers/context.js | 8 + ucan-invocation/test/helpers/resources.js | 42 ++++ ucan-invocation/types.ts | 11 +- 13 files changed, 436 insertions(+), 6 deletions(-) create mode 100644 ucan-invocation/buckets/car-store.js create mode 100644 ucan-invocation/functions/metrics-store-remove-size-total.js create mode 100644 ucan-invocation/test/functions/metrics-store-remove-size-total.test.js diff --git a/package-lock.json b/package-lock.json index 3dfd85fc..3508982b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15118,6 +15118,8 @@ "uint8arrays": "^4.0.2" }, "devDependencies": { + "@aws-sdk/client-dynamodb": "^3.226.0", + "@aws-sdk/client-s3": "^3.226.0", "@serverless-stack/resources": "*", "@ucanto/interface": "^4.2.3", "ava": "^4.3.3", @@ -19405,6 +19407,7 @@ "requires": { "@aws-sdk/client-dynamodb": "^3.226.0", "@aws-sdk/client-eventbridge": "^3.218.0", + "@aws-sdk/client-s3": "^3.226.0", "@sentry/serverless": "^7.22.0", "@serverless-stack/resources": "*", "@ucanto/interface": "^4.2.3", diff --git a/stacks/index.js b/stacks/index.js index 3b8ee33f..526b962a 100644 --- a/stacks/index.js +++ b/stacks/index.js @@ -29,8 +29,8 @@ export default function (app) { }) app.stack(BusStack) app.stack(UploadDbStack) - app.stack(UcanInvocationStack) app.stack(CarparkStack) + app.stack(UcanInvocationStack) app.stack(SatnavStack) app.stack(UploadApiStack) app.stack(ReplicatorStack) diff --git a/stacks/ucan-invocation-stack.js b/stacks/ucan-invocation-stack.js index d8967c92..8d279cea 100644 --- a/stacks/ucan-invocation-stack.js +++ b/stacks/ucan-invocation-stack.js @@ -8,6 +8,7 @@ import { import { Duration } from 'aws-cdk-lib' import { BusStack } from './bus-stack.js' +import { CarparkStack } from './carpark-stack.js' import { UploadDbStack } from './upload-db-stack.js' import { getBucketConfig, @@ -26,8 +27,9 @@ export function UcanInvocationStack({ stack, app }) { // Setup app monitoring with Sentry setupSentry(app, stack) - // Get eventBus reference + // Get dependent stack references const { eventBus } = use(BusStack) + const { carparkBucket } = use(CarparkStack) const { adminMetricsTable, spaceMetricsTable } = use(UploadDbStack) const ucanBucket = new Bucket(stack, 'ucan-store', { @@ -96,6 +98,18 @@ export function UcanInvocationStack({ stack, app }) { deadLetterQueue: metricsUploadAddTotalDLQ.cdk.queue, }) + // store/remove size + const metricsStoreRemoveSizeTotalDLQ = new Queue(stack, 'metrics-store-remove-size-total-dlq') + const metricsStoreRemoveSizeTotalConsumer = new Function(stack, 'metrics-store-remove-size-total-consumer', { + environment: { + TABLE_NAME: adminMetricsTable.tableName, + STORE_BUCKET_NAME: carparkBucket.bucketName, + }, + permissions: [adminMetricsTable, carparkBucket], + handler: 'functions/metrics-store-remove-size-total.consumer', + deadLetterQueue: metricsStoreRemoveSizeTotalDLQ.cdk.queue, + }) + // metrics upload/remove count const metricsUploadRemoveTotalDLQ = new Queue(stack, 'metrics-upload-remove-total-dlq') const metricsUploadRemoveTotalConsumer = new Function(stack, 'metrics-upload-remove-total-consumer', { @@ -174,6 +188,16 @@ export function UcanInvocationStack({ stack, app }) { } } }, + metricsStoreRemoveSizeTotalConsumer: { + function: metricsStoreRemoveSizeTotalConsumer, + // TODO: Set kinesis filters when supported by SST + // https://github.com/serverless-stack/sst/issues/1407 + cdk: { + eventSource: { + ...(getKinesisEventSourceConfig(stack)) + } + } + }, metricsUploadAddTotalConsumer: { function: metricsUploadAddTotalConsumer, cdk: { @@ -192,8 +216,6 @@ export function UcanInvocationStack({ stack, app }) { }, spaceMetricsUploadAddTotalConsumer: { function: spaceMetricsUploadAddTotalConsumer, - // TODO: Set kinesis filters when supported by SST - // https://github.com/serverless-stack/sst/issues/1407 cdk: { eventSource: { ...(getKinesisEventSourceConfig(stack)) diff --git a/ucan-invocation/buckets/car-store.js b/ucan-invocation/buckets/car-store.js new file mode 100644 index 00000000..b54b3793 --- /dev/null +++ b/ucan-invocation/buckets/car-store.js @@ -0,0 +1,50 @@ +import { + S3Client, + HeadObjectCommand, +} from '@aws-sdk/client-s3' + +/** + * Abstraction layer with Factory to perform operations on bucket storing CAR files. + * + * @param {string} region + * @param {string} bucketName + * @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options] + */ +export function createCarStore(region, bucketName, options) { + const s3 = new S3Client({ + region, + ...options, + }) + return useCarStore(s3, bucketName) +} + +/** + * @param {S3Client} s3 + * @param {string} bucketName + * @returns {import('../types').CarStoreBucket} + */ +export function useCarStore(s3, bucketName) { + return { + /** + * @param {import('multiformats').UnknownLink} link + */ + getSize: async (link) => { + const cid = link.toString() + const cmd = new HeadObjectCommand({ + Key: `${cid}/${cid}.car`, + Bucket: bucketName, + }) + let res + try { + res = await s3.send(cmd) + } catch (cause) { + // @ts-expect-error + if (cause?.$metadata?.httpStatusCode === 404) { + return 0 + } + throw new Error('Failed to check if car-store', { cause }) + } + return res.ContentLength || 0 + }, + } +} diff --git a/ucan-invocation/constants.js b/ucan-invocation/constants.js index 3c803434..07f4719a 100644 --- a/ucan-invocation/constants.js +++ b/ucan-invocation/constants.js @@ -30,4 +30,5 @@ export const SPACE_METRICS_NAMES = { STORE_ADD_TOTAL: `${STORE_ADD}-total`, STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`, STORE_REMOVE_TOTAL: `${STORE_REMOVE}-total`, + STORE_REMOVE_SIZE_TOTAL: `${STORE_REMOVE}-size-total`, } diff --git a/ucan-invocation/functions/metrics-store-remove-size-total.js b/ucan-invocation/functions/metrics-store-remove-size-total.js new file mode 100644 index 00000000..8952b67a --- /dev/null +++ b/ucan-invocation/functions/metrics-store-remove-size-total.js @@ -0,0 +1,55 @@ +import * as Sentry from '@sentry/serverless' + +import { createCarStore } from '../buckets/car-store.js' +import { createMetricsTable } from '../tables/metrics.js' +import { parseKinesisEvent } from '../utils/parse-kinesis-event.js' +import { STORE_REMOVE } from '../constants.js' + +Sentry.AWSLambda.init({ + environment: process.env.SST_STAGE, + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 1.0, +}) + +const AWS_REGION = process.env.AWS_REGION || 'us-west-2' + +/** + * @param {import('aws-lambda').KinesisStreamEvent} event + */ +async function handler(event) { + const ucanInvocations = parseKinesisEvent(event) + + const { + TABLE_NAME: tableName = '', + STORE_BUCKET_NAME: storeBucketName = '', + } = process.env + + await updateRemoveSizeTotal(ucanInvocations, { + metricsTable: createMetricsTable(AWS_REGION, tableName), + carStoreBucket: createCarStore(AWS_REGION, storeBucketName) + }) +} + +/** + * @param {import('../types').UcanInvocation[]} ucanInvocations + * @param {import('../types').RemoveSizeCtx} ctx + */ +export async function updateRemoveSizeTotal (ucanInvocations, ctx) { + const invocationsWithStoreRemove = ucanInvocations.filter( + inv => inv.value.att.find(a => a.can === STORE_REMOVE) + ).flatMap(inv => inv.value.att) + + // TODO: once we have receipts for store/remove, replace this + // Temporary adaptor to set size in invocation + for (const inv of invocationsWithStoreRemove) { + // @ts-ignore remove invocations always have link + const size = await ctx.carStoreBucket.getSize(inv.nb?.link) + + // @ts-ignore + inv.nb.size = size + } + + await ctx.metricsTable.incrementStoreRemoveSizeTotal(invocationsWithStoreRemove) +} + +export const consumer = Sentry.AWSLambda.wrapHandler(handler) diff --git a/ucan-invocation/package.json b/ucan-invocation/package.json index caa77f8c..e641c65f 100644 --- a/ucan-invocation/package.json +++ b/ucan-invocation/package.json @@ -13,6 +13,8 @@ "uint8arrays": "^4.0.2" }, "devDependencies": { + "@aws-sdk/client-dynamodb": "^3.226.0", + "@aws-sdk/client-s3": "^3.226.0", "@serverless-stack/resources": "*", "@ucanto/interface": "^4.2.3", "ava": "^4.3.3", diff --git a/ucan-invocation/tables/metrics.js b/ucan-invocation/tables/metrics.js index 43877832..813cb125 100644 --- a/ucan-invocation/tables/metrics.js +++ b/ucan-invocation/tables/metrics.js @@ -95,6 +95,28 @@ export function createMetricsTable (region, tableName, options = {}) { name: METRICS_NAMES.STORE_REMOVE_TOTAL }) }) + await dynamoDb.send(updateCmd) + }, + /** + * Increment total value from new given operations. + * + * @param {Capabilities} operationsInv + */ + incrementStoreRemoveSizeTotal: async (operationsInv) => { + // @ts-expect-error + const invTotalSize = operationsInv.reduce((acc, c) => acc + c.nb?.size, 0) + + const updateCmd = new UpdateItemCommand({ + TableName: tableName, + UpdateExpression: `ADD #value :value`, + ExpressionAttributeNames: {'#value': 'value'}, + ExpressionAttributeValues: { + ':value': { N: String(invTotalSize) }, + }, + Key: marshall({ + name: METRICS_NAMES.STORE_REMOVE_SIZE_TOTAL + }) + }) await dynamoDb.send(updateCmd) }, diff --git a/ucan-invocation/test/functions/metrics-store-remove-size-total.test.js b/ucan-invocation/test/functions/metrics-store-remove-size-total.test.js new file mode 100644 index 00000000..67292e12 --- /dev/null +++ b/ucan-invocation/test/functions/metrics-store-remove-size-total.test.js @@ -0,0 +1,216 @@ +import { testConsumerWithBucket as test } from '../helpers/context.js' + +import * as Signer from '@ucanto/principal/ed25519' +import * as StoreCapabilities from '@web3-storage/capabilities/store' +import { PutObjectCommand } from '@aws-sdk/client-s3' + +import { createSpace } from '../helpers/ucanto.js' +import { randomCAR } from '../helpers/random.js' +import { createDynamoTable, getItemFromTable} from '../helpers/tables.js' +import { adminMetricsTableProps } from '../../tables/index.js' +import { + createDynamodDb, + createS3, + createBucket, +} from '../helpers/resources.js' + +import { updateRemoveSizeTotal } from '../../functions/metrics-store-remove-size-total.js' +import { createMetricsTable } from '../../tables/metrics.js' +import { createCarStore } from '../../buckets/car-store.js' +import { METRICS_NAMES } from '../../constants.js' + +const REGION = 'us-west-2' + +test.before(async t => { + // Dynamo DB + const { + client: dynamo, + endpoint: dbEndpoint + } = await createDynamodDb({ port: 8000 }) + t.context.dbEndpoint = dbEndpoint + t.context.dynamoClient = dynamo + + // S3 + const { client, clientOpts } = await createS3() + t.context.s3 = client + t.context.s3Opts = clientOpts +}) + +test('handles a batch of single invocation with store/remove', async t => { + const { tableName, bucketName } = await prepareResources(t.context.dynamoClient, t.context.s3) + const uploadService = await Signer.generate() + const alice = await Signer.generate() + const { spaceDid } = await createSpace(alice) + const car = await randomCAR(128) + + // Put CAR to bucket + const putObjectCmd = new PutObjectCommand({ + Key: `${car.cid.toString()}/${car.cid.toString()}.car`, + Bucket: bucketName, + Body: Buffer.from( + await car.arrayBuffer() + ) + }) + await t.context.s3.send(putObjectCmd) + + const metricsTable = createMetricsTable(REGION, tableName, { + endpoint: t.context.dbEndpoint + }) + const carStoreBucket = createCarStore(REGION, bucketName, t.context.s3Opts) + const invocations = [{ + carCid: car.cid.toString(), + value: { + att: [ + StoreCapabilities.remove.create({ + with: spaceDid, + nb: { + link: car.cid + } + }) + ], + aud: uploadService.did(), + iss: alice.did() + }, + ts: Date.now() + }] + + // @ts-expect-error + await updateRemoveSizeTotal(invocations, { + metricsTable, + carStoreBucket + }) + + const item = await getItemFromTable(t.context.dynamoClient, tableName, { + name: METRICS_NAMES.STORE_REMOVE_SIZE_TOTAL + }) + t.truthy(item) + t.is(item?.name, METRICS_NAMES.STORE_REMOVE_SIZE_TOTAL) + t.is(item?.value, car.size) +}) + +test('handles batch of single invocations with multiple store/remove attributes', async t => { + const { tableName, bucketName } = await prepareResources(t.context.dynamoClient, t.context.s3) + const uploadService = await Signer.generate() + const alice = await Signer.generate() + const { spaceDid } = await createSpace(alice) + + const cars = await Promise.all( + Array.from({ length: 10 }).map(() => randomCAR(128)) + ) + // Put CARs to bucket + await Promise.all(cars.map(async car => { + const putObjectCmd = new PutObjectCommand({ + Key: `${car.cid.toString()}/${car.cid.toString()}.car`, + Bucket: bucketName, + Body: Buffer.from( + await car.arrayBuffer() + ) + }) + return t.context.s3.send(putObjectCmd) + })) + + const metricsTable = createMetricsTable(REGION, tableName, { + endpoint: t.context.dbEndpoint + }) + const carStoreBucket = createCarStore(REGION, bucketName, t.context.s3Opts) + + const invocations = [{ + carCid: cars[0].cid.toString(), + value: { + att: cars.map((car) => StoreCapabilities.remove.create({ + with: spaceDid, + nb: { + link: car.cid + } + })), + aud: uploadService.did(), + iss: alice.did() + }, + ts: Date.now() + }] + + // @ts-expect-error + await updateRemoveSizeTotal(invocations, { + metricsTable, + carStoreBucket + }) + + const item = await getItemFromTable(t.context.dynamoClient, tableName, { + name: METRICS_NAMES.STORE_REMOVE_SIZE_TOTAL + }) + + t.truthy(item) + t.is(item?.name, METRICS_NAMES.STORE_REMOVE_SIZE_TOTAL) + t.is(item?.value, cars.reduce((acc, c) => acc + c.size, 0)) +}) + +test('handles a batch of single invocation without store/remove', async t => { + const { tableName, bucketName } = await prepareResources(t.context.dynamoClient, t.context.s3) + const uploadService = await Signer.generate() + const alice = await Signer.generate() + const { spaceDid } = await createSpace(alice) + const car = await randomCAR(128) + + // Put CAR to bucket + const putObjectCmd = new PutObjectCommand({ + Key: `${car.cid.toString()}/${car.cid.toString()}.car`, + Bucket: bucketName, + Body: Buffer.from( + await car.arrayBuffer() + ) + }) + await t.context.s3.send(putObjectCmd) + + const metricsTable = createMetricsTable(REGION, tableName, { + endpoint: t.context.dbEndpoint + }) + const carStoreBucket = createCarStore(REGION, bucketName, t.context.s3Opts) + + const invocations = [{ + carCid: car.cid.toString(), + value: { + att: [ + StoreCapabilities.add.create({ + with: spaceDid, + nb: { + link: car.cid, + size: car.size + } + }) + ], + aud: uploadService.did(), + iss: alice.did() + }, + ts: Date.now() + }] + + // @ts-expect-error + await updateRemoveSizeTotal(invocations, { + metricsTable, + carStoreBucket + }) + + const item = await getItemFromTable(t.context.dynamoClient, tableName, { + name: METRICS_NAMES.STORE_REMOVE_SIZE_TOTAL + }) + + t.truthy(item) + t.is(item?.name, METRICS_NAMES.STORE_REMOVE_SIZE_TOTAL) + t.is(item?.value, 0) +}) + +/** + * @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoClient + * @param {import('@aws-sdk/client-s3').S3Client} s3Client + */ +async function prepareResources (dynamoClient, s3Client) { + const [ tableName, bucketName ] = await Promise.all([ + createDynamoTable(dynamoClient, adminMetricsTableProps), + createBucket(s3Client) + ]) + + return { + bucketName, + tableName + } +} diff --git a/ucan-invocation/test/functions/metrics-store-remove-total.test.js b/ucan-invocation/test/functions/metrics-store-remove-total.test.js index c231b33a..e7f41458 100644 --- a/ucan-invocation/test/functions/metrics-store-remove-total.test.js +++ b/ucan-invocation/test/functions/metrics-store-remove-total.test.js @@ -164,4 +164,4 @@ async function prepareResources (dynamoClient) { return { tableName } -} +} \ No newline at end of file diff --git a/ucan-invocation/test/helpers/context.js b/ucan-invocation/test/helpers/context.js index 9270ae65..4ed06423 100644 --- a/ucan-invocation/test/helpers/context.js +++ b/ucan-invocation/test/helpers/context.js @@ -8,8 +8,13 @@ import anyTest from 'ava' * @property {string} dbEndpoint * @property {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoClient * + * @typedef {object} S3Context + * @property {import('@aws-sdk/client-s3').S3Client} s3 + * @property {import('@aws-sdk/client-s3').ServiceInputTypes} s3Opts + * * @typedef {import("ava").TestFn>} TestAnyFn * @typedef {import("ava").TestFn>} TestConsumerFn + * @typedef {import("ava").TestFn>} TestConsumerWithBucketFn */ // eslint-disable-next-line unicorn/prefer-export-from @@ -17,3 +22,6 @@ export const test = /** @type {TestAnyFn} */ (anyTest) // eslint-disable-next-line unicorn/prefer-export-from export const testConsumer = /** @type {TestConsumerFn} */ (anyTest) + +// eslint-disable-next-line unicorn/prefer-export-from +export const testConsumerWithBucket = /** @type {TestConsumerWithBucketFn} */ (anyTest) diff --git a/ucan-invocation/test/helpers/resources.js b/ucan-invocation/test/helpers/resources.js index f11706c2..ba66c848 100644 --- a/ucan-invocation/test/helpers/resources.js +++ b/ucan-invocation/test/helpers/resources.js @@ -1,4 +1,6 @@ import { GenericContainer as Container } from 'testcontainers' +import { customAlphabet } from 'nanoid' +import { S3Client, CreateBucketCommand } from '@aws-sdk/client-s3' import { DynamoDBClient } from '@aws-sdk/client-dynamodb' /** @@ -54,3 +56,43 @@ export function dynamoDBTableConfig ({ fields, primaryIndex }) { KeySchema } } + +/** + * @param {object} [opts] + * @param {number} [opts.port] + * @param {string} [opts.region] + */ +export async function createS3(opts = {}) { + const region = opts.region || 'us-west-2' + const port = opts.port || 9000 + + const minio = await new Container('quay.io/minio/minio') + .withCmd(['server', '/data']) + .withExposedPorts(port) + .start() + + const clientOpts = { + endpoint: `http://${minio.getHost()}:${minio.getMappedPort(port)}`, + forcePathStyle: true, + region, + credentials: { + accessKeyId: 'minioadmin', + secretAccessKey: 'minioadmin', + }, + } + + return { + client: new S3Client(clientOpts), + clientOpts, + } +} + +/** + * @param {S3Client} s3 + */ +export async function createBucket(s3) { + const id = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 10) + const Bucket = id() + await s3.send(new CreateBucketCommand({ Bucket })) + return Bucket +} diff --git a/ucan-invocation/types.ts b/ucan-invocation/types.ts index b26540c1..67939b02 100644 --- a/ucan-invocation/types.ts +++ b/ucan-invocation/types.ts @@ -6,10 +6,15 @@ export interface MetricsTable { incrementStoreAddTotal: (incrementSizeTotal: Capability[]) => Promise incrementStoreAddSizeTotal: (incrementSizeTotal: Capability[]) => Promise incrementStoreRemoveTotal: (incrementSizeTotal: Capability[]) => Promise + incrementStoreRemoveSizeTotal: (incrementSizeTotal: Capability[]) => Promise incrementUploadAddTotal: (incrementSizeTotal: Capability[]) => Promise incrementUploadRemoveTotal: (incrementSizeTotal: Capability[]) => Promise } +export interface CarStoreBucket { + getSize: (link: UnknownLink) => Promise +} + export interface TotalSizeCtx { metricsTable: MetricsTable } @@ -23,11 +28,15 @@ export interface SpaceMetricsTable { incrementStoreRemoveCount: (storeRemoveInv: Capability[]) => Promise incrementUploadAddCount: (uploadAddInv: Capability[]) => Promise } - export interface SpaceMetricsTableCtx { spaceMetricsTable: SpaceMetricsTable } +export interface RemoveSizeCtx { + metricsTable: MetricsTable + carStoreBucket: CarStoreBucket +} + export interface UcanInvocation { carCid: string, value: UcanInvocationValue,