Skip to content

Commit

Permalink
feat: ucan stream consumer store remove size
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Mar 16, 2023
1 parent a29403d commit 9deb620
Show file tree
Hide file tree
Showing 13 changed files with 435 additions and 6 deletions.
3 changes: 3 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion stacks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ export default function (app) {
})
app.stack(BusStack)
app.stack(UploadDbStack)
app.stack(UcanInvocationStack)
app.stack(CarparkStack)
app.stack(UcanInvocationStack)
app.stack(SatnavStack)
app.stack(UploadApiStack)
app.stack(ReplicatorStack)
Expand Down
28 changes: 25 additions & 3 deletions stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
import { Duration } from 'aws-cdk-lib'

import { BusStack } from './bus-stack.js'
import { CarparkStack } from './carpark-stack.js'
import { UploadDbStack } from './upload-db-stack.js'
import {
getBucketConfig,
Expand All @@ -26,8 +27,9 @@ export function UcanInvocationStack({ stack, app }) {
// Setup app monitoring with Sentry
setupSentry(app, stack)

// Get eventBus reference
// Get dependent stack references
const { eventBus } = use(BusStack)
const { carparkBucket } = use(CarparkStack)
const { adminMetricsTable, spaceMetricsTable } = use(UploadDbStack)

const ucanBucket = new Bucket(stack, 'ucan-store', {
Expand Down Expand Up @@ -96,6 +98,18 @@ export function UcanInvocationStack({ stack, app }) {
deadLetterQueue: metricsUploadAddTotalDLQ.cdk.queue,
})

// store/remove size
const metricsStoreRemoveSizeTotalDLQ = new Queue(stack, 'metrics-store-remove-size-total-dlq')
const metricsStoreRemoveSizeTotalConsumer = new Function(stack, 'metrics-store-remove-size-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName,
STORE_BUCKET_NAME: carparkBucket.bucketName,
},
permissions: [adminMetricsTable, carparkBucket],
handler: 'functions/metrics-store-remove-size-total.consumer',
deadLetterQueue: metricsStoreRemoveSizeTotalDLQ.cdk.queue,
})

// metrics upload/remove count
const metricsUploadRemoveTotalDLQ = new Queue(stack, 'metrics-upload-remove-total-dlq')
const metricsUploadRemoveTotalConsumer = new Function(stack, 'metrics-upload-remove-total-consumer', {
Expand Down Expand Up @@ -174,6 +188,16 @@ export function UcanInvocationStack({ stack, app }) {
}
}
},
metricsStoreRemoveSizeTotalConsumer: {
function: metricsStoreRemoveSizeTotalConsumer,
// TODO: Set kinesis filters when supported by SST
// https://github.com/serverless-stack/sst/issues/1407
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
},
metricsUploadAddTotalConsumer: {
function: metricsUploadAddTotalConsumer,
cdk: {
Expand All @@ -192,8 +216,6 @@ export function UcanInvocationStack({ stack, app }) {
},
spaceMetricsUploadAddTotalConsumer: {
function: spaceMetricsUploadAddTotalConsumer,
// TODO: Set kinesis filters when supported by SST
// https://github.com/serverless-stack/sst/issues/1407
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
Expand Down
50 changes: 50 additions & 0 deletions ucan-invocation/buckets/car-store.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import {
S3Client,
HeadObjectCommand,
} from '@aws-sdk/client-s3'

/**
* Abstraction layer with Factory to perform operations on bucket storing CAR files.
*
* @param {string} region
* @param {string} bucketName
* @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options]
*/
export function createCarStore(region, bucketName, options) {
const s3 = new S3Client({
region,
...options,
})
return useCarStore(s3, bucketName)
}

/**
* @param {S3Client} s3
* @param {string} bucketName
* @returns {import('../types').CarStoreBucket}
*/
export function useCarStore(s3, bucketName) {
return {
/**
* @param {import('multiformats').UnknownLink} link
*/
getSize: async (link) => {
const cid = link.toString()
const cmd = new HeadObjectCommand({
Key: `${cid}/${cid}.car`,
Bucket: bucketName,
})
let res
try {
res = await s3.send(cmd)
} catch (cause) {
// @ts-expect-error
if (cause?.$metadata?.httpStatusCode === 404) {
return 0
}
throw new Error('Failed to check if car-store', { cause })
}
return res.ContentLength || 0
},
}
}
1 change: 1 addition & 0 deletions ucan-invocation/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ export const SPACE_METRICS_NAMES = {
STORE_ADD_TOTAL: `${STORE_ADD}-total`,
STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`,
STORE_REMOVE_TOTAL: `${STORE_REMOVE}-total`,
STORE_REMOVE_SIZE_TOTAL: `${STORE_REMOVE}-size-total`,
}
55 changes: 55 additions & 0 deletions ucan-invocation/functions/metrics-store-remove-size-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import * as Sentry from '@sentry/serverless'

import { createCarStore } from '../buckets/car-store.js'
import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
STORE_BUCKET_NAME: storeBucketName = '',
} = process.env

await updateRemoveSizeTotal(ucanInvocations, {
metricsTable: createMetricsTable(AWS_REGION, tableName),
carStoreBucket: createCarStore(AWS_REGION, storeBucketName)
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').RemoveSizeCtx} ctx
*/
export async function updateRemoveSizeTotal (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
).flatMap(inv => inv.value.att)

// TODO: once we have receipts for store/remove, replace this
// Temporary adaptor to set size in invocation
for (const inv of invocationsWithStoreRemove) {
// @ts-ignore remove invocations always have link
const size = await ctx.carStoreBucket.getSize(inv.nb?.link)

// @ts-ignore
inv.nb.size = size
}

await ctx.metricsTable.incrementStoreRemoveSizeTotal(invocationsWithStoreRemove)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
2 changes: 2 additions & 0 deletions ucan-invocation/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
"uint8arrays": "^4.0.2"
},
"devDependencies": {
"@aws-sdk/client-dynamodb": "^3.226.0",
"@aws-sdk/client-s3": "^3.226.0",
"@serverless-stack/resources": "*",
"@ucanto/interface": "^4.2.3",
"ava": "^4.3.3",
Expand Down
22 changes: 22 additions & 0 deletions ucan-invocation/tables/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,28 @@ export function createMetricsTable (region, tableName, options = {}) {
name: METRICS_NAMES.STORE_REMOVE_TOTAL
})
})
await dynamoDb.send(updateCmd)
},
/**
* Increment total value from new given operations.
*
* @param {Capabilities} operationsInv
*/
incrementStoreRemoveSizeTotal: async (operationsInv) => {
// @ts-expect-error
const invTotalSize = operationsInv.reduce((acc, c) => acc + c.nb?.size, 0)

const updateCmd = new UpdateItemCommand({
TableName: tableName,
UpdateExpression: `ADD #value :value`,
ExpressionAttributeNames: {'#value': 'value'},
ExpressionAttributeValues: {
':value': { N: String(invTotalSize) },
},
Key: marshall({
name: METRICS_NAMES.STORE_REMOVE_SIZE_TOTAL
})
})

await dynamoDb.send(updateCmd)
},
Expand Down
Loading

0 comments on commit 9deb620

Please sign in to comment.