Skip to content

Commit

Permalink
feat: add blob protocol to infra (#353)
Browse files Browse the repository at this point in the history
This PR creates stores and wires up new `upload-api` running `blob/add`,
`web3.storage/blob/allocate`, `web3.storage/blob/accept` and
`ucan/conclude` capabilities. Tests are also imported from `upload-api`
implementation and run here.

As agreed on storacha/w3up#1343 , there
won't be any deduping between old world and new world. Therefore, we
have new `allocations` table, and use different key schema in `carpark`.
We are writing blobs keyed as `base58btc` as previously discussed as
`${base58btcEncodedMultihash}/${base58btcEncodedMultihash}.blob`. I
added `.blob` suffix but I am happy to other suggestions. Depending on
how we progress with the reads side, we can consider creating a new
bucket to fully isolate new content?

The `receipts` and `tasks` storage end up being more complicated as they
need to follow
https://github.com/web3-storage/w3infra/blob/main/docs/ucan-invocation-stream.md#buckets,
and is essentially the same as what happens on
https://github.com/web3-storage/w3infra/blob/main/upload-api/ucan-invocation.js#L66
but at a different level as this is a proactive write of tasks and
receipts.
  • Loading branch information
vasco-santos committed Apr 18, 2024
1 parent 50b266f commit 13566fe
Show file tree
Hide file tree
Showing 22 changed files with 1,415 additions and 376 deletions.
2 changes: 1 addition & 1 deletion billing/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"@sentry/serverless": "^7.74.1",
"@ucanto/interface": "^10.0.1",
"@ucanto/server": "^10.0.0",
"@web3-storage/capabilities": "^13.3.0",
"@web3-storage/capabilities": "^13.3.1",
"big.js": "^6.2.1",
"multiformats": "^13.1.0",
"p-retry": "^6.2.0",
Expand Down
568 changes: 219 additions & 349 deletions package-lock.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
"@ipld/dag-ucan": "^3.0.1",
"@tsconfig/node16": "^1.0.3",
"@types/git-rev-sync": "^2.0.0",
"@ucanto/client": "^9.0.0",
"@ucanto/core": "^9.0.1",
"@ucanto/interface": "^9.0.0",
"@ucanto/principal": "^9.0.0",
"@ucanto/transport": "^9.0.0",
"@ucanto/validator": "^9.0.1",
"@ucanto/client": "^9.0.1",
"@ucanto/core": "^10.0.1",
"@ucanto/interface": "^10.0.1",
"@ucanto/principal": "^9.0.1",
"@ucanto/transport": "^9.1.1",
"@ucanto/validator": "^9.0.2",
"@web-std/blob": "^3.0.4",
"@web-std/fetch": "^4.1.0",
"@web3-storage/data-segment": "5.0.0",
Expand Down
5 changes: 4 additions & 1 deletion stacks/upload-api-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export function UploadApiStack({ stack, app }) {

// Get references to constructs created in other stacks
const { carparkBucket } = use(CarparkStack)
const { storeTable, uploadTable, delegationBucket, delegationTable, revocationTable, adminMetricsTable, spaceMetricsTable, consumerTable, subscriptionTable, rateLimitTable, pieceTable, privateKey } = use(UploadDbStack)
const { allocationTable, storeTable, uploadTable, delegationBucket, delegationTable, revocationTable, adminMetricsTable, spaceMetricsTable, consumerTable, subscriptionTable, rateLimitTable, pieceTable, privateKey } = use(UploadDbStack)
const { invocationBucket, taskBucket, workflowBucket, ucanStream } = use(UcanInvocationStack)
const { customerTable, spaceDiffTable, spaceSnapshotTable, stripeSecretKey } = use(BillingDbStack)
const { pieceOfferQueue, filecoinSubmitQueue } = use(FilecoinStack)
Expand All @@ -41,6 +41,7 @@ export function UploadApiStack({ stack, app }) {
defaults: {
function: {
permissions: [
allocationTable,
storeTable,
uploadTable,
customerTable,
Expand All @@ -66,6 +67,7 @@ export function UploadApiStack({ stack, app }) {
environment: {
DID: process.env.UPLOAD_API_DID ?? '',
AGGREGATOR_DID,
ALLOCATION_TABLE_NAME: allocationTable.tableName,
STORE_TABLE_NAME: storeTable.tableName,
STORE_BUCKET_NAME: carparkBucket.bucketName,
UPLOAD_TABLE_NAME: uploadTable.tableName,
Expand All @@ -92,6 +94,7 @@ export function UploadApiStack({ stack, app }) {
COMMIT: git.commmit,
STAGE: stack.stage,
ACCESS_SERVICE_URL: getServiceURL(stack, customDomain) ?? '',
UPLOAD_SERVICE_URL: getServiceURL(stack, customDomain) ?? '',
POSTMARK_TOKEN: process.env.POSTMARK_TOKEN ?? '',
PROVIDERS: process.env.PROVIDERS ?? '',
R2_ACCESS_KEY_ID: process.env.R2_ACCESS_KEY_ID ?? '',
Expand Down
8 changes: 8 additions & 0 deletions stacks/upload-db-stack.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Table, Bucket, Config } from 'sst/constructs'

import {
allocationTableProps,
storeTableProps,
uploadTableProps,
consumerTableProps,
Expand Down Expand Up @@ -31,6 +32,12 @@ export function UploadDbStack({ stack, app }) {
// TODO: we should look into creating a trust layer for content claims
const contentClaimsPrivateKey = new Config.Secret(stack, 'CONTENT_CLAIMS_PRIVATE_KEY')

/**
* The allocation table tracks allocated multihashes per space.
* Used by the blob/* service capabilities.
*/
const allocationTable = new Table(stack, 'allocation', allocationTableProps)

/**
* This table takes a stored CAR and makes an entry in the store table
* Used by the store/* service capabilities.
Expand Down Expand Up @@ -99,6 +106,7 @@ export function UploadDbStack({ stack, app }) {
const spaceMetricsTable = new Table(stack, 'space-metrics', spaceMetricsTableProps)

return {
allocationTable,
storeTable,
uploadTable,
pieceTable,
Expand Down
45 changes: 42 additions & 3 deletions upload-api/functions/ucan-invocation-router.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import { createUcantoServer } from '../service.js'
import { Config } from 'sst/node/config'
import { CAR, Legacy, Codec } from '@ucanto/transport'
import { Email } from '../email.js'
import { createTasksStorage } from '../stores/tasks.js'
import { createReceiptsStorage } from '../stores/receipts.js'
import { createAllocationsStorage } from '../stores/allocations.js'
import { createBlobsStorage, composeblobStoragesWithOrderedHas } from '../stores/blobs.js'
import { useProvisionStore } from '../stores/provisions.js'
import { useSubscriptionsStore } from '../stores/subscriptions.js'
import { createDelegationsTable } from '../tables/delegations.js'
Expand All @@ -39,6 +43,7 @@ import { createSpaceDiffStore } from '@web3-storage/w3infra-billing/tables/space
import { createSpaceSnapshotStore } from '@web3-storage/w3infra-billing/tables/space-snapshot.js'
import { useUsageStore } from '../stores/usage.js'
import { createStripeBillingProvider } from '../billing.js'
import { createTasksScheduler } from '../scheduler.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -99,6 +104,7 @@ export async function ucanInvocationRouter(request) {
storeTableName,
storeBucketName,
uploadTableName,
allocationTableName,
consumerTableName,
customerTableName,
subscriptionTableName,
Expand All @@ -122,6 +128,7 @@ export async function ucanInvocationRouter(request) {
aggregatorDid,
dealTrackerDid,
dealTrackerUrl,
uploadServiceURL,
pieceOfferQueueUrl,
filecoinSubmitQueueUrl,
requirePaymentPlan,
Expand All @@ -144,6 +151,22 @@ export async function ucanInvocationRouter(request) {
const { PRIVATE_KEY, STRIPE_SECRET_KEY } = Config
const serviceSigner = getServiceSigner({ did: UPLOAD_API_DID, privateKey: PRIVATE_KEY })

const tasksStorage = createTasksStorage(AWS_REGION, invocationBucketName, workflowBucketName)
const receiptsStorage = createReceiptsStorage(AWS_REGION, taskBucketName, invocationBucketName, workflowBucketName)
const allocationsStorage = createAllocationsStorage(AWS_REGION, allocationTableName, {
endpoint: dbEndpoint,
})
const blobsStorage = composeblobStoragesWithOrderedHas(
createBlobsStorage(R2_REGION, carparkBucketName, {
endpoint: carparkBucketEndpoint,
credentials: {
accessKeyId: carparkBucketAccessKeyId,
secretAccessKey: carparkBucketSecretAccessKey,
},
}),
createBlobsStorage(AWS_REGION, storeBucketName),
)

const invocationBucket = createInvocationStore(
AWS_REGION,
invocationBucketName
Expand Down Expand Up @@ -172,16 +195,29 @@ export async function ucanInvocationRouter(request) {
const spaceSnapshotStore = createSpaceSnapshotStore({ region: AWS_REGION }, { tableName: spaceSnapshotTableName })
const usageStorage = useUsageStore({ spaceDiffStore, spaceSnapshotStore })

const connection = getServiceConnection({
const dealTrackerConnection = getServiceConnection({
did: dealTrackerDid,
url: dealTrackerUrl
})
const selfConnection = getServiceConnection({
did: serviceSigner.did(),
url: uploadServiceURL
})
const tasksScheduler = createTasksScheduler(() => selfConnection)

const server = createUcantoServer(serviceSigner, {
codec,
allocationsStorage,
blobsStorage,
tasksStorage,
receiptsStorage,
tasksScheduler,
getServiceConnection: () => selfConnection,
// TODO: to be deprecated with `store/*` protocol
storeTable: createStoreTable(AWS_REGION, storeTableName, {
endpoint: dbEndpoint,
}),
// TODO: to be deprecated with `store/*` protocol
carStoreBucket: composeCarStoresWithOrderedHas(
createCarStore(AWS_REGION, storeBucketName),
createCarStore(R2_REGION, carparkBucketName, {
Expand All @@ -192,6 +228,7 @@ export async function ucanInvocationRouter(request) {
},
}),
),
// TODO: to be deprecated with `store/*` protocol
dudewhereBucket: createDudewhereStore(R2_REGION, R2_DUDEWHERE_BUCKET_NAME, {
endpoint: R2_ENDPOINT,
credentials: {
Expand All @@ -218,10 +255,10 @@ export async function ucanInvocationRouter(request) {
pieceOfferQueue: createPieceOfferQueueClient({ region: AWS_REGION }, { queueUrl: pieceOfferQueueUrl }),
filecoinSubmitQueue: createFilecoinSubmitQueueClient({ region: AWS_REGION }, { queueUrl: filecoinSubmitQueueUrl }),
dealTrackerService: {
connection,
connection: dealTrackerConnection,
invocationConfig: {
issuer: serviceSigner,
audience: connection.id,
audience: dealTrackerConnection.id,
with: serviceSigner.did()
}
},
Expand Down Expand Up @@ -316,6 +353,7 @@ function getLambdaEnv () {
storeTableName: mustGetEnv('STORE_TABLE_NAME'),
storeBucketName: mustGetEnv('STORE_BUCKET_NAME'),
uploadTableName: mustGetEnv('UPLOAD_TABLE_NAME'),
allocationTableName: mustGetEnv('ALLOCATION_TABLE_NAME'),
consumerTableName: mustGetEnv('CONSUMER_TABLE_NAME'),
customerTableName: mustGetEnv('CUSTOMER_TABLE_NAME'),
subscriptionTableName: mustGetEnv('SUBSCRIPTION_TABLE_NAME'),
Expand All @@ -339,6 +377,7 @@ function getLambdaEnv () {
postmarkToken: mustGetEnv('POSTMARK_TOKEN'),
providers: mustGetEnv('PROVIDERS'),
accessServiceURL: mustGetEnv('ACCESS_SERVICE_URL'),
uploadServiceURL: mustGetEnv('UPLOAD_SERVICE_URL'),
aggregatorDid: mustGetEnv('AGGREGATOR_DID'),
requirePaymentPlan: (process.env.REQUIRE_PAYMENT_PLAN === 'true'),
dealTrackerDid: mustGetEnv('DEAL_TRACKER_DID'),
Expand Down
9 changes: 5 additions & 4 deletions upload-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
"@ucanto/transport": "^9.1.1",
"@ucanto/validator": "^9.0.2",
"@web-std/fetch": "^4.1.0",
"@web3-storage/access": "^18.3.0",
"@web3-storage/capabilities": "^13.3.0",
"@web3-storage/access": "^18.3.1",
"@web3-storage/capabilities": "^13.3.1",
"@web3-storage/did-mailto": "^2.1.0",
"@web3-storage/upload-api": "^9.0.1",
"@web3-storage/upload-api": "^9.1.5",
"multiformats": "^13.1.0",
"nanoid": "^5.0.2",
"preact": "^10.14.1",
Expand Down Expand Up @@ -65,7 +65,8 @@
"eslintConfig": {
"rules": {
"unicorn/consistent-destructuring": "off",
"unicorn/prefer-array-flat-map": "off"
"unicorn/prefer-array-flat-map": "off",
"unicorn/no-useless-undefined": "off"
}
}
}
46 changes: 46 additions & 0 deletions upload-api/scheduler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* @typedef {import('@web3-storage/upload-api/types').TasksScheduler} TasksSchedulerInterface
* @typedef {import('@web3-storage/upload-api/types').Service} Service
* @typedef {import('@ucanto/interface').ConnectionView<Service>} Connection
* @typedef {import('@ucanto/interface').ServiceInvocation} ServiceInvocation
* @typedef {import('@ucanto/interface').Failure} Failure
* @typedef {import('@ucanto/interface').Unit} Unit
* @typedef {import('@ucanto/interface').Result<Unit, Failure>} Result
*/

/**
* @param {() => Connection} getServiceConnection
*/
export const createTasksScheduler = (getServiceConnection) => new TasksScheduler(getServiceConnection)

/**
* @implements {TasksSchedulerInterface}
*/
export class TasksScheduler {
/**
*
* @param {() => Connection} getServiceConnection
*/
constructor (getServiceConnection) {
this.getServiceConnection = getServiceConnection
}

/**
* @param {ServiceInvocation} invocation
* @returns {Promise<Result>}
*/
async schedule(invocation) {
const connection = this.getServiceConnection()
// This performs a HTTP Request to the Service URL.
// upload-api service URL stores received invocations and produced
// receipts on the server.
const [res] = await connection.execute(invocation)

if (res.out.error) {
return res.out
}
return {
ok: {},
}
}
}
Loading

0 comments on commit 13566fe

Please sign in to comment.