Skip to content

Commit

Permalink
feat: upgrade storefront api with filecoin offer from bucket event in…
Browse files Browse the repository at this point in the history
…stead of directly compute (#343)

This PR intends to
[upgrade](storacha/w3up#1332) storefront
package. This enables `filecoin/offer` invocations lead to messages to
the submit queue, where message consumer will compute Piece for the
content. The main goal of this is to have the client to compute Filecoin
Pieces and offer them, instead of pieces being directly computed from
bucket event and written into the DB (skipping the submit validation).

Note that:
- given we will need to deal with client upgrades and old clients still
running for the time being, I opted in this PR to still keep a version
of the bucket event in place. However, instead of it running computation
of piece, it also triggers `filecoin/offer` so that we can keep old
clients running where `filecoin/offer` is not invoked. This of course
means that we may need to run Piece computation twice for the old path.
However, looking at our AWS bill, we paid 200 USD in
February for lambda execution of whole w3infra. This assumes then that we will
not incur a lot of additional cost while making migration much simpler
- datastore currently implements a composed store with S3+R2, which
allows us to move on an iterative approach where we can deploy first
step without client upgrade, while we can iterate on writes to anywhere
with the overall picture.
  • Loading branch information
vasco-santos authored Apr 2, 2024
1 parent 5d71aaa commit 3fd05a8
Show file tree
Hide file tree
Showing 19 changed files with 368 additions and 381 deletions.
29 changes: 27 additions & 2 deletions filecoin/functions/handle-filecoin-submit-message.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as Sentry from '@sentry/serverless'
import * as storefrontEvents from '@web3-storage/filecoin-api/storefront/events'

import { createPieceTable } from '../store/piece.js'
import { createDataStore, composeDataStoresWithOrderedStream } from '../store/data.js'
import { decodeMessage } from '../queue/filecoin-submit-queue.js'
import { mustGetEnv } from './utils.js'

Expand All @@ -13,6 +14,7 @@ Sentry.AWSLambda.init({
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'
const R2_REGION = process.env.R2_REGION || 'auto'

/**
* Get EventRecord from the SQS Event triggering the handler.
Expand All @@ -34,9 +36,26 @@ async function handleFilecoinSubmitMessage (sqsEvent) {
})

// create context
const { pieceTableName } = getEnv()
const {
pieceTableName,
s3BucketName,
r2BucketName,
r2BucketEndpoint,
r2BucketAccessKeyId,
r2BucketSecretAccessKey
} = getEnv()
const context = {
pieceStore: createPieceTable(AWS_REGION, pieceTableName)
pieceStore: createPieceTable(AWS_REGION, pieceTableName),
dataStore: composeDataStoresWithOrderedStream(
createDataStore(AWS_REGION, s3BucketName),
createDataStore(R2_REGION, r2BucketName, {
endpoint: r2BucketEndpoint,
credentials: {
accessKeyId: r2BucketAccessKeyId,
secretAccessKey: r2BucketSecretAccessKey,
},
})
)
}

const { ok, error } = await storefrontEvents.handleFilecoinSubmitMessage(context, record)
Expand All @@ -59,6 +78,12 @@ async function handleFilecoinSubmitMessage (sqsEvent) {
function getEnv () {
return {
pieceTableName: mustGetEnv('PIECE_TABLE_NAME'),
// carpark buckets - CAR file bytes may be found here with keys like {cid}/{cid}.car
s3BucketName: mustGetEnv('STORE_BUCKET_NAME'),
r2BucketName: mustGetEnv('R2_CARPARK_BUCKET_NAME'),
r2BucketEndpoint: mustGetEnv('R2_ENDPOINT'),
r2BucketAccessKeyId: mustGetEnv('R2_ACCESS_KEY_ID'),
r2BucketSecretAccessKey: mustGetEnv('R2_SECRET_ACCESS_KEY'),
}
}

Expand Down
39 changes: 19 additions & 20 deletions filecoin/functions/handle-piece-insert-to-content-claim.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import * as Sentry from '@sentry/serverless'
import { Config } from 'sst/node/config'
import { unmarshall } from '@aws-sdk/util-dynamodb'
import { Piece } from '@web3-storage/data-segment'
import { CID } from 'multiformats/cid'
import * as Delegation from '@ucanto/core/delegation'
import { fromString } from 'uint8arrays/from-string'
import * as DID from '@ipld/dag-ucan/did'

import { reportPieceCid } from '../index.js'
import * as storefrontEvents from '@web3-storage/filecoin-api/storefront/events'

import { decodeRecord } from '../store/piece.js'
import { getServiceConnection, getServiceSigner } from '../service.js'
import { mustGetEnv } from './utils.js'

Expand Down Expand Up @@ -35,11 +35,10 @@ async function pieceCidReport (event) {

/** @type {PieceStoreRecord} */
// @ts-expect-error can't figure out type of new
const pieceRecord = unmarshall(records[0].new)
const piece = Piece.fromString(pieceRecord.piece).link
const content = CID.parse(pieceRecord.content)
const storeRecord = unmarshall(records[0].new)
const record = decodeRecord(storeRecord)

const claimsServiceConnection = getServiceConnection({
const connection = getServiceConnection({
did: contentClaimsDid,
url: contentClaimsUrl
})
Expand All @@ -56,24 +55,24 @@ async function pieceCidReport (event) {
claimsIssuer = claimsIssuer.withDID(DID.parse(contentClaimsDid).did())
}

const { ok, error } = await reportPieceCid({
piece,
content,
claimsServiceConnection,
claimsInvocationConfig: /** @type {import('../types.js').ClaimsInvocationConfig} */ ({
issuer: claimsIssuer,
audience: claimsServiceConnection.id,
with: claimsIssuer.did(),
proofs: claimsProofs
})
})
const context = {
claimsService: {
connection,
invocationConfig: {
issuer: claimsIssuer,
audience: connection.id,
with: claimsIssuer.did(),
proofs: claimsProofs
},
},
}

const { ok, error } = await storefrontEvents.handlePieceInsertToEquivalencyClaim(context, record)
if (error) {
console.error(error)

return {
statusCode: 500,
body: error.message || 'failed to add aggregate'
body: error.message || 'failed to handle piece insert event to content claim'
}
}

Expand Down
74 changes: 58 additions & 16 deletions filecoin/functions/piece-cid-compute.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
import { S3Client } from '@aws-sdk/client-s3'
import * as Sentry from '@sentry/serverless'
import { S3Client } from '@aws-sdk/client-s3'
import { Config } from 'sst/node/config'
import { Storefront } from '@web3-storage/filecoin-client'
import * as Delegation from '@ucanto/core/delegation'
import { fromString } from 'uint8arrays/from-string'
import * as DID from '@ipld/dag-ucan/did'

import { computePieceCid } from '../index.js'
import { getServiceConnection, getServiceSigner } from '../service.js'
import { mustGetEnv } from './utils.js'
import { createPieceTable } from '../store/piece.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
* Get EventRecord from the SQS Event triggering the handler
* Get EventRecord from the SQS Event triggering the handler.
* Trigger `filecoin/offer` from bucket event
*
* @param {import('aws-lambda').SQSEvent} event
*/
async function computeHandler (event) {
const {
pieceTableName,
disablePieceCidCompute,
did
} = getEnv()
const { PRIVATE_KEY: privateKey } = Config
const { storefrontDid, storefrontUrl, did, storefrontProof, disablePieceCidCompute } = getEnv()

if (disablePieceCidCompute) {
const body = 'piece cid computation is disabled'
Expand All @@ -34,21 +35,46 @@ async function computeHandler (event) {
}
}

// Create context
let storefrontSigner = getServiceSigner({
privateKey
})
const connection = getServiceConnection({
did: storefrontDid,
url: storefrontUrl
})
const storefrontServiceProofs = []
if (storefrontProof) {
const proof = await Delegation.extract(fromString(storefrontProof, 'base64pad'))
if (!proof.ok) throw new Error('failed to extract proof', { cause: proof.error })
storefrontServiceProofs.push(proof.ok)
} else {
// if no proofs, we must be using the service private key to sign
storefrontSigner = storefrontSigner.withDID(DID.parse(did).did())
}
const storefrontService = {
connection,
invocationConfig: {
issuer: storefrontSigner,
with: storefrontSigner.did(),
audience: storefrontSigner,
proofs: storefrontServiceProofs
},
}

// Decode record
const record = parseEvent(event)
if (!record) {
throw new Error('Unexpected sqs record format')
}

const s3Client = new S3Client({ region: record.bucketRegion })
const pieceTable = createPieceTable(AWS_REGION, pieceTableName)

// Compute piece for record
const { error, ok } = await computePieceCid({
record,
s3Client,
pieceTable,
group: did
})

if (error) {
console.error(error)

Expand All @@ -58,9 +84,23 @@ async function computeHandler (event) {
}
}

// Invoke `filecoin/offer`
const filecoinSubmitInv = await Storefront.filecoinOffer(
storefrontService.invocationConfig,
ok.content,
ok.piece,
{ connection: storefrontService.connection }
)

if (filecoinSubmitInv.out.error) {
return {
statusCode: 500,
body: filecoinSubmitInv.out.error,
}
}

return {
statusCode: 200,
body: ok
}
}

Expand All @@ -71,8 +111,10 @@ export const handler = Sentry.AWSLambda.wrapHandler(computeHandler)
*/
function getEnv () {
return {
pieceTableName: mustGetEnv('PIECE_TABLE_NAME'),
did: mustGetEnv('DID'),
storefrontDid: mustGetEnv('STOREFRONT_DID'),
storefrontUrl: mustGetEnv('STOREFRONT_URL'),
storefrontProof: process.env.PROOF,
disablePieceCidCompute: process.env.DISABLE_PIECE_CID_COMPUTE === 'true'
}
}
Expand Down
62 changes: 5 additions & 57 deletions filecoin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import * as Hasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash'
import * as Digest from 'multiformats/hashes/digest'
import { Piece } from '@web3-storage/data-segment'
import { CID } from 'multiformats/cid'
import { Assert } from '@web3-storage/content-claims/capability'

import { GetCarFailed, ComputePieceFailed } from './errors.js'

Expand All @@ -27,14 +26,10 @@ import { GetCarFailed, ComputePieceFailed } from './errors.js'
* @param {object} props
* @param {EventRecord} props.record
* @param {S3Client} props.s3Client
* @param {string} props.group
* @param {import('@web3-storage/filecoin-api/storefront/api').PieceStore} props.pieceTable
*/
export async function computePieceCid({
record,
s3Client,
pieceTable,
group
s3Client
}) {
const key = record.key
// CIDs in carpark are in format `${carCid}/${carCid}.car`
Expand Down Expand Up @@ -80,57 +75,10 @@ export async function computePieceCid({
}
}

// Write to table
const insertedAt = (new Date()).toISOString()
const { ok, error } = await pieceTable.put({
content: CID.parse(cidString),
piece: piece.link,
status: 'submitted',
insertedAt,
updatedAt: insertedAt,
group
})

return {
ok,
error
}
}

/**
* @param {object} props
* @param {import('@web3-storage/data-segment').PieceLink} props.piece
* @param {import('multiformats').CID} props.content
* @param {import('@ucanto/principal/ed25519').ConnectionView<any>} props.claimsServiceConnection
* @param {import('./types.js').ClaimsInvocationConfig} props.claimsInvocationConfig
*/
export async function reportPieceCid ({
piece,
content,
claimsServiceConnection,
claimsInvocationConfig
}) {
// Add claim for reading
const claimResult = await Assert.equals
.invoke({
issuer: claimsInvocationConfig.issuer,
audience: claimsInvocationConfig.audience,
with: claimsInvocationConfig.with,
nb: {
content,
equals: piece
},
expiration: Infinity,
proofs: claimsInvocationConfig.proofs
})
.execute(claimsServiceConnection)
if (claimResult.out.error) {
return {
error: claimResult.out.error
}
}

return {
ok: {},
ok: {
content: CID.parse(cidString),
piece: piece.link
},
}
}
6 changes: 3 additions & 3 deletions filecoin/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@
"@aws-sdk/client-dynamodb": "^3.515.0",
"@aws-sdk/client-s3": "^3.515.0",
"@aws-sdk/client-sqs": "^3.515.0",
"@ipld/car": "^5.2.6",
"@sentry/serverless": "^7.74.1",
"@ucanto/client": "^9.0.0",
"@ucanto/core": "^9.0.1",
"@ucanto/interface": "^9.0.0",
"@ucanto/principal": "^9.0.0",
"@ucanto/transport": "^9.0.0",
"@web3-storage/content-claims": "^3.2.0",
"@web3-storage/data-segment": "^5.0.0",
"@web3-storage/filecoin-api": "^4.2.0",
"@web3-storage/filecoin-api": "^4.6.0",
"@web3-storage/filecoin-client": "^3.1.3",
"fr32-sha2-256-trunc254-padded-binary-tree-multihash": "^3.1.1",
"multiformats": "^13.1.0",
"p-retry": "^6.2.0",
"uint8arrays": "4.0.6"
},
"devDependencies": {
"@ipld/car": "^5.2.6",
"@web-std/blob": "3.0.5",
"ava": "^4.3.3",
"constructs": "*",
Expand Down
Loading

0 comments on commit 3fd05a8

Please sign in to comment.