Skip to content

Commit

Permalink
feat: ucan stream consumer store/add and store/remove count (#151)
Browse files Browse the repository at this point in the history
Part of metrics work #117

Adds system total metrics for `store/add` and `store/remove`. With
these, we can track number of CARs that we commited to store, and how
many were removed.

Other notes:
- we will still need to collect the effective number of CARs that are
stored given this metrics is just number of signed URLs we provided in
`store/add`.
- helper functions for database were moved around to be re-used in all
tests for metrics
  • Loading branch information
vasco-santos authored Mar 14, 2023
1 parent 9461e93 commit 57842a1
Show file tree
Hide file tree
Showing 11 changed files with 573 additions and 45 deletions.
39 changes: 39 additions & 0 deletions stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,29 @@ export function UcanInvocationStack({ stack, app }) {
}
})

// metrics store/add count
const metricsStoreAddTotalDLQ = new Queue(stack, 'metrics-store-add-total-dlq')
const metricsStoreAddTotalConsumer = new Function(stack, 'metrics-store-add-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-store-add-total.consumer',
deadLetterQueue: metricsStoreAddTotalDLQ.cdk.queue,
})

// metrics store/remove count
const metricsStoreRemoveTotalDLQ = new Queue(stack, 'metrics-store-remove-total-dlq')
const metricsStoreRemoveTotalConsumer = new Function(stack, 'metrics-store-remove-total-consumer', {
environment: {
TABLE_NAME: adminMetricsTable.tableName
},
permissions: [adminMetricsTable],
handler: 'functions/metrics-store-remove-total.consumer',
deadLetterQueue: metricsStoreRemoveTotalDLQ.cdk.queue,
})

// metrics store/add size total
const metricsStoreAddSizeTotalDLQ = new Queue(stack, 'metrics-store-add-size-total-dlq')
const metricsStoreAddSizeTotalConsumer = new Function(stack, 'metrics-store-add-size-total-consumer', {
environment: {
Expand Down Expand Up @@ -83,6 +106,22 @@ export function UcanInvocationStack({ stack, app }) {
}
},
consumers: {
metricsStoreAddTotalConsumer: {
function: metricsStoreAddTotalConsumer,
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
},
metricsStoreRemoveTotalConsumer: {
function: metricsStoreRemoveTotalConsumer,
cdk: {
eventSource: {
...(getKinesisEventSourceConfig(stack))
}
}
},
metricsStoreAddSizeTotalConsumer: {
function: metricsStoreAddSizeTotalConsumer,
// TODO: Set kinesis filters when supported by SST
Expand Down
4 changes: 4 additions & 0 deletions test/integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ test('w3infra integration flow', async t => {

// Get metrics before upload
const beforeOperationMetrics = await getMetrics(t)
const beforeStoreAddTotal = beforeOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_TOTAL)
const beforeStoreAddSizeTotal = beforeOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_SIZE_TOTAL)

const s3Client = getAwsBucketClient()
Expand Down Expand Up @@ -173,18 +174,21 @@ test('w3infra integration flow', async t => {
if (beforeStoreAddSizeTotal && spaceBeforeUploadAddMetrics) {
await pWaitFor(async () => {
const afterOperationMetrics = await getMetrics(t)
const afterStoreAddTotal = afterOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_TOTAL)
const afterStoreAddSizeTotal = afterOperationMetrics.find(row => row.name === METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
const spaceAfterUploadAddMetrics = await getSpaceMetrics(t, spaceDid, SPACE_METRICS_NAMES.UPLOAD_ADD_TOTAL)

// If staging accept more broad condition given multiple parallel tests can happen there
if (stage === 'staging') {
return (
afterStoreAddTotal?.value >= beforeStoreAddTotal?.value + carSize &&
afterStoreAddSizeTotal?.value >= beforeStoreAddSizeTotal.value + carSize &&
spaceAfterUploadAddMetrics?.value >= spaceBeforeUploadAddMetrics?.value + 1
)
}

return (
afterStoreAddTotal?.value === beforeStoreAddTotal?.value + 1 &&
afterStoreAddSizeTotal?.value === beforeStoreAddSizeTotal.value + carSize &&
spaceAfterUploadAddMetrics?.value === spaceBeforeUploadAddMetrics?.value + 1
)
Expand Down
11 changes: 9 additions & 2 deletions ucan-invocation/constants.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
// UCAN protocol
export const STORE_ADD = 'store/add'
export const STORE_REMOVE = 'store/remove'
export const UPLOAD_ADD = 'upload/add'

// Admin Metrics
export const METRICS_NAMES = {
STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`
UPLOAD_ADD_TOTAL: `${UPLOAD_ADD}-total`,
STORE_ADD_TOTAL: `${STORE_ADD}-total`,
STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`,
STORE_REMOVE_TOTAL: `${STORE_REMOVE}-total`,
}

// Spade Metrics
export const SPACE_METRICS_NAMES = {
UPLOAD_ADD_TOTAL: `${UPLOAD_ADD}-total`
UPLOAD_ADD_TOTAL: `${UPLOAD_ADD}-total`,
STORE_ADD_TOTAL: `${STORE_ADD}-total`,
STORE_ADD_SIZE_TOTAL: `${STORE_ADD}-size-total`,
STORE_REMOVE_TOTAL: `${STORE_REMOVE}-total`,
}
46 changes: 46 additions & 0 deletions ucan-invocation/functions/metrics-store-add-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_ADD } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
} = process.env

await updateStoreAddTotal(ucanInvocations, {
metricsTable: createMetricsTable(AWS_REGION, tableName, {
endpoint: dbEndpoint
})
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').TotalSizeCtx} ctx
*/
export async function updateStoreAddTotal (ucanInvocations, ctx) {
const invocationsWithStoreAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_ADD)
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementStoreAddTotal(invocationsWithStoreAdd)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
46 changes: 46 additions & 0 deletions ucan-invocation/functions/metrics-store-remove-total.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* @param {import('aws-lambda').KinesisStreamEvent} event
*/
async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)

const {
TABLE_NAME: tableName = '',
// set for testing
DYNAMO_DB_ENDPOINT: dbEndpoint,
} = process.env

await updateStoreRemoveTotal(ucanInvocations, {
metricsTable: createMetricsTable(AWS_REGION, tableName, {
endpoint: dbEndpoint
})
})
}

/**
* @param {import('../types').UcanInvocation[]} ucanInvocations
* @param {import('../types').TotalSizeCtx} ctx
*/
export async function updateStoreRemoveTotal (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementStoreRemoveTotal(invocationsWithStoreRemove)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
44 changes: 44 additions & 0 deletions ucan-invocation/tables/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,28 @@ export function createMetricsTable (region, tableName, options = {}) {
})

return {
/**
* Increment total count from store/add operations.
*
* @param {Capabilities} operationsInv
*/
incrementStoreAddTotal: async (operationsInv) => {
const invTotalSize = operationsInv.length

const updateCmd = new UpdateItemCommand({
TableName: tableName,
UpdateExpression: `ADD #value :value`,
ExpressionAttributeNames: {'#value': 'value'},
ExpressionAttributeValues: {
':value': { N: String(invTotalSize) },
},
Key: marshall({
name: METRICS_NAMES.STORE_ADD_TOTAL
})
})

await dynamoDb.send(updateCmd)
},
/**
* Increment total value from new given operations.
*
Expand All @@ -52,6 +74,28 @@ export function createMetricsTable (region, tableName, options = {}) {
})
})

await dynamoDb.send(updateCmd)
},
/**
* Increment total count from store/remove operations.
*
* @param {Capabilities} operationsInv
*/
incrementStoreRemoveTotal: async (operationsInv) => {
const invTotalSize = operationsInv.length

const updateCmd = new UpdateItemCommand({
TableName: tableName,
UpdateExpression: `ADD #value :value`,
ExpressionAttributeNames: {'#value': 'value'},
ExpressionAttributeValues: {
':value': { N: String(invTotalSize) },
},
Key: marshall({
name: METRICS_NAMES.STORE_REMOVE_TOTAL
})
})

await dynamoDb.send(updateCmd)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import { testConsumer as test } from '../helpers/context.js'

import { customAlphabet } from 'nanoid'
import { CreateTableCommand, GetItemCommand } from '@aws-sdk/client-dynamodb'
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb'
import * as Signer from '@ucanto/principal/ed25519'
import * as StoreCapabilities from '@web3-storage/capabilities/store'

import { adminMetricsTableProps } from '../../tables/index.js'
import { createDynamodDb, dynamoDBTableConfig } from '../helpers/resources.js'
import { createDynamodDb } from '../helpers/resources.js'
import { createSpace } from '../helpers/ucanto.js'
import { randomCAR } from '../helpers/random.js'
import { createDynamoAdminMetricsTable, getItemFromTable} from '../helpers/tables.js'

import { updateSizeTotal } from '../../functions/metrics-store-add-size-total.js'
import { createMetricsTable } from '../../tables/metrics.js'
Expand Down Expand Up @@ -62,7 +59,9 @@ test('handles a batch of single invocation with store/add', async t => {
metricsTable
})

const item = await getItemFromTable(t.context.dynamoClient, tableName, METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
const item = await getItemFromTable(t.context.dynamoClient, tableName, {
name: METRICS_NAMES.STORE_ADD_SIZE_TOTAL
})
t.truthy(item)
t.is(item?.name, METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
t.is(item?.value, car.size)
Expand Down Expand Up @@ -103,7 +102,10 @@ test('handles batch of single invocations with multiple store/add attributes', a
metricsTable
})

const item = await getItemFromTable(t.context.dynamoClient, tableName, METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
const item = await getItemFromTable(t.context.dynamoClient, tableName, {
name: METRICS_NAMES.STORE_ADD_SIZE_TOTAL
})

t.truthy(item)
t.is(item?.name, METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
t.is(item?.value, cars.reduce((acc, c) => acc + c.size, 0))
Expand All @@ -114,45 +116,10 @@ test('handles batch of single invocations with multiple store/add attributes', a
*/
async function prepareResources (dynamoClient) {
const [ tableName ] = await Promise.all([
createDynamouploadTable(dynamoClient),
createDynamoAdminMetricsTable(dynamoClient),
])

return {
tableName
}
}

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamo
*/
async function createDynamouploadTable(dynamo) {
const id = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 10)
const tableName = id()

await dynamo.send(new CreateTableCommand({
TableName: tableName,
...dynamoDBTableConfig(adminMetricsTableProps),
ProvisionedThroughput: {
ReadCapacityUnits: 1,
WriteCapacityUnits: 1
}
}))

return tableName
}

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamo
* @param {string} tableName
* @param {string} name
*/
async function getItemFromTable(dynamo, tableName, name) {
const params = {
TableName: tableName,
Key: marshall({
name,
})
}
const response = await dynamo.send(new GetItemCommand(params))
return response?.Item && unmarshall(response?.Item)
}
Loading

0 comments on commit 57842a1

Please sign in to comment.