Skip to content

Commit

Permalink
feat: ucan stream consumer store/add and store/remove count
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Mar 13, 2023
1 parent ac2a4a7 commit f2b3080
Show file tree
Hide file tree
Showing 11 changed files with 575 additions and 48 deletions.
40 changes: 39 additions & 1 deletion stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,29 @@ export function UcanInvocationStack({ stack, app }) {
}
})

// metrics store/add count
const metricsStoreAddTotalDLQ = new Queue(stack, 'metrics-store-add-total-dlq')
const metricsStoreAddTotalConsumer = new Function(stack, 'metrics-store-add-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-store-add-total.consumer',
deadLetterQueue: metricsStoreAddTotalDLQ.cdk.queue,
})

// metrics store/remove count
const metricsStoreRemoveTotalDLQ = new Queue(stack, 'metrics-store-remove-total-dlq')
const metricsStoreRemoveTotalConsumer = new Function(stack, 'metrics-store-remove-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-store-remove-total.consumer',
deadLetterQueue: metricsStoreRemoveTotalDLQ.cdk.queue,
})

// metrics store/add size total
const metricsStoreAddSizeTotalDLQ = new Queue(stack, 'metrics-store-add-size-total-dlq')
const metricsStoreAddSizeTotalConsumer = new Function(stack, 'metrics-store-add-size-total-consumer', {
environment: {
Expand All @@ -70,7 +93,22 @@ export function UcanInvocationStack({ stack, app }) {
}
},
consumers: {
// consumer1: 'functions/consumer1.handler'
metricsStoreAddTotalConsumer: {
function: metricsStoreAddTotalConsumer,
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
},
metricsStoreRemoveTotalConsumer: {
function: metricsStoreRemoveTotalConsumer,
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
},
metricsStoreAddSizeTotalConsumer: {
function: metricsStoreAddSizeTotalConsumer,
// TODO: Set kinesis filters when supported by SST
Expand Down
12 changes: 10 additions & 2 deletions test/integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ test('w3infra integration flow', async t => {

// Get metrics before upload
const beforeOperationMetrics = await getMetrics(t)
const beforeStoreAddTotal = beforeOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_TOTAL)
const beforeStoreAddSizeTotal = beforeOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_SIZE_TOTAL)

const s3Client = getAwsBucketClient()
Expand Down Expand Up @@ -159,13 +160,20 @@ test('w3infra integration flow', async t => {
beforeStoreAddSizeTotal && await pWaitFor(async () => {
const afterOperationMetrics = await getMetrics(t)
const afterStoreAddSizeTotal = afterOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
const afterStoreAddTotal = afterOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_TOTAL)

// If staging accept more broad condition given multiple parallel tests can happen there
if (stage === 'staging') {
return afterStoreAddSizeTotal?.value >= beforeStoreAddSizeTotal.value + carSize
return (
afterStoreAddSizeTotal?.value >= beforeStoreAddSizeTotal.value + carSize &&
afterStoreAddTotal?.value >= beforeStoreAddTotal?.value + carSize
)
}

return afterStoreAddSizeTotal?.value === beforeStoreAddSizeTotal.value + carSize
return (
afterStoreAddSizeTotal?.value === beforeStoreAddSizeTotal.value + carSize &&
afterStoreAddTotal?.value === beforeStoreAddTotal?.value + 1
)
})
})

Expand Down
7 changes: 5 additions & 2 deletions ucan-invocation/constants.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// UCAN protocol
export const STORE_ADD = 'store/add'
export const STORE_REMOVE = 'store/remove'

// Metrics
export const METRICS_NAMES = {
STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`
}
STORE_ADD_TOTAL: `${STORE_ADD}-total`,
STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`,
STORE_REMOVE_TOTAL: `${STORE_REMOVE}-total`,
}
46 changes: 46 additions & 0 deletions ucan-invocation/functions/metrics-store-add-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_ADD } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
} = process.env

await updateStoreAddTotal(ucanInvocations, {
metricsTable: createMetricsTable(AWS_REGION, tableName, {
endpoint: dbEndpoint
})
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').TotalSizeCtx} ctx
*/
export async function updateStoreAddTotal (ucanInvocations, ctx) {
const invocationsWithStoreAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_ADD)
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementStoreAddTotal(invocationsWithStoreAdd)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
46 changes: 46 additions & 0 deletions ucan-invocation/functions/metrics-store-remove-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
} = process.env

await updateStoreRemoveTotal(ucanInvocations, {
metricsTable: createMetricsTable(AWS_REGION, tableName, {
endpoint: dbEndpoint
})
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').TotalSizeCtx} ctx
*/
export async function updateStoreRemoveTotal (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementStoreRemoveTotal(invocationsWithStoreRemove)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
44 changes: 44 additions & 0 deletions ucan-invocation/tables/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,28 @@ export function createMetricsTable (region, tableName, options = {}) {
})

return {
/**
* Increment total count from store/add operations.
*
* @param {Capabilities} operationsInv
*/
incrementStoreAddTotal: async (operationsInv) => {
const invTotalSize = operationsInv.length

const updateCmd = new UpdateItemCommand({
TableName: tableName,
UpdateExpression: `ADD #value :value`,
ExpressionAttributeNames: {'#value': 'value'},
ExpressionAttributeValues: {
':value': { N: String(invTotalSize) },
},
Key: marshall({
name: METRICS_NAMES.STORE_ADD_TOTAL
})
})

await dynamoDb.send(updateCmd)
},
/**
* Increment total value from new given operations.
*
Expand All @@ -52,6 +74,28 @@ export function createMetricsTable (region, tableName, options = {}) {
})
})

await dynamoDb.send(updateCmd)
},
/**
* Increment total count from store/remove operations.
*
* @param {Capabilities} operationsInv
*/
incrementStoreRemoveTotal: async (operationsInv) => {
const invTotalSize = operationsInv.length

const updateCmd = new UpdateItemCommand({
TableName: tableName,
UpdateExpression: `ADD #value :value`,
ExpressionAttributeNames: {'#value': 'value'},
ExpressionAttributeValues: {
':value': { N: String(invTotalSize) },
},
Key: marshall({
name: METRICS_NAMES.STORE_REMOVE_TOTAL
})
})

await dynamoDb.send(updateCmd)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import { testConsumer as test } from '../helpers/context.js'

import { customAlphabet } from 'nanoid'
import { CreateTableCommand, GetItemCommand } from '@aws-sdk/client-dynamodb'
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb'
import * as Signer from '@ucanto/principal/ed25519'
import * as StoreCapabilities from '@web3-storage/capabilities/store'

import { adminMetricsTableProps } from '../../tables/index.js'
import { createDynamodDb, dynamoDBTableConfig } from '../helpers/resources.js'
import { createDynamodDb } from '../helpers/resources.js'
import { createSpace } from '../helpers/ucanto.js'
import { randomCAR } from '../helpers/random.js'
import { createDynamoAdminMetricsTable, getItemFromTable} from '../helpers/tables.js'

import { updateSizeTotal } from '../../functions/metrics-store-add-size-total.js'
import { createMetricsTable } from '../../tables/metrics.js'
Expand Down Expand Up @@ -62,7 +59,9 @@ test('handles a batch of single invocation with store/add', async t => {
metricsTable
})

const item = await getItemFromTable(t.context.dynamoClient, tableName, METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
const item = await getItemFromTable(t.context.dynamoClient, tableName, {
name: METRICS_NAMES.STORE_ADD_SIZE_TOTAL
})
t.truthy(item)
t.is(item?.name, METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
t.is(item?.value, car.size)
Expand Down Expand Up @@ -103,7 +102,10 @@ test('handles batch of single invocations with multiple store/add attributes', a
metricsTable
})

const item = await getItemFromTable(t.context.dynamoClient, tableName, METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
const item = await getItemFromTable(t.context.dynamoClient, tableName, {
name: METRICS_NAMES.STORE_ADD_SIZE_TOTAL
})

t.truthy(item)
t.is(item?.name, METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
t.is(item?.value, cars.reduce((acc, c) => acc + c.size, 0))
Expand All @@ -114,45 +116,10 @@ test('handles batch of single invocations with multiple store/add attributes', a
*/
async function prepareResources (dynamoClient) {
const [ tableName ] = await Promise.all([
createDynamouploadTable(dynamoClient),
createDynamoAdminMetricsTable(dynamoClient),
])

return {
tableName
}
}

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamo
*/
async function createDynamouploadTable(dynamo) {
const id = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 10)
const tableName = id()

await dynamo.send(new CreateTableCommand({
TableName: tableName,
...dynamoDBTableConfig(adminMetricsTableProps),
ProvisionedThroughput: {
ReadCapacityUnits: 1,
WriteCapacityUnits: 1
}
}))

return tableName
}

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamo
* @param {string} tableName
* @param {string} name
*/
async function getItemFromTable(dynamo, tableName, name) {
const params = {
TableName: tableName,
Key: marshall({
name,
})
}
const response = await dynamo.send(new GetItemCommand(params))
return response?.Item && unmarshall(response?.Item)
}
Loading

0 comments on commit f2b3080

Please sign in to comment.