diff --git a/filecoin/functions/handle-filecoin-submit-message.js b/filecoin/functions/handle-filecoin-submit-message.js index 3706f2b5..c29d69c7 100644 --- a/filecoin/functions/handle-filecoin-submit-message.js +++ b/filecoin/functions/handle-filecoin-submit-message.js @@ -3,6 +3,7 @@ import * as Sentry from '@sentry/serverless' import * as storefrontEvents from '@web3-storage/filecoin-api/storefront/events' import { createPieceTable } from '../store/piece.js' +import { createDataStore, composeDataStoresWithOrderedStream } from '../store/data.js' import { decodeMessage } from '../queue/filecoin-submit-queue.js' import { mustGetEnv } from './utils.js' @@ -13,6 +14,7 @@ Sentry.AWSLambda.init({ }) const AWS_REGION = process.env.AWS_REGION || 'us-west-2' +const R2_REGION = process.env.R2_REGION || 'auto' /** * Get EventRecord from the SQS Event triggering the handler. @@ -34,12 +36,30 @@ async function handleFilecoinSubmitMessage (sqsEvent) { }) // create context - const { pieceTableName } = getEnv() + const { + pieceTableName, + s3BucketName, + r2BucketName, + r2BucketEndpoint, + r2BucketAccessKeyId, + r2BucketSecretAccessKey + } = getEnv() const context = { - pieceStore: createPieceTable(AWS_REGION, pieceTableName) + pieceStore: createPieceTable(AWS_REGION, pieceTableName), + dataStore: composeDataStoresWithOrderedStream( + createDataStore(AWS_REGION, s3BucketName), + createDataStore(R2_REGION, r2BucketName, { + endpoint: r2BucketEndpoint, + credentials: { + accessKeyId: r2BucketAccessKeyId, + secretAccessKey: r2BucketSecretAccessKey, + }, + }) + ) } const { ok, error } = await storefrontEvents.handleFilecoinSubmitMessage(context, record) + console.log('handleFilecoinSubmitMessage - ok, error', ok, error) if (error) { return { statusCode: 500, @@ -59,6 +79,12 @@ async function handleFilecoinSubmitMessage (sqsEvent) { function getEnv () { return { pieceTableName: mustGetEnv('PIECE_TABLE_NAME'), + // carpark buckets - CAR file bytes may be found here with keys like {cid}/{cid}.car + s3BucketName: mustGetEnv('STORE_BUCKET_NAME'), + r2BucketName: mustGetEnv('R2_CARPARK_BUCKET_NAME'), + r2BucketEndpoint: mustGetEnv('R2_ENDPOINT'), + r2BucketAccessKeyId: mustGetEnv('R2_ACCESS_KEY_ID'), + r2BucketSecretAccessKey: mustGetEnv('R2_SECRET_ACCESS_KEY'), } } diff --git a/filecoin/functions/handle-piece-insert-to-content-claim.js b/filecoin/functions/handle-piece-insert-to-content-claim.js index 25081b6d..f62f9f26 100644 --- a/filecoin/functions/handle-piece-insert-to-content-claim.js +++ b/filecoin/functions/handle-piece-insert-to-content-claim.js @@ -1,13 +1,13 @@ import * as Sentry from '@sentry/serverless' import { Config } from 'sst/node/config' import { unmarshall } from '@aws-sdk/util-dynamodb' -import { Piece } from '@web3-storage/data-segment' -import { CID } from 'multiformats/cid' import * as Delegation from '@ucanto/core/delegation' import { fromString } from 'uint8arrays/from-string' import * as DID from '@ipld/dag-ucan/did' -import { reportPieceCid } from '../index.js' +import * as storefrontEvents from '@web3-storage/filecoin-api/storefront/events' + +import { decodeRecord } from '../store/piece.js' import { getServiceConnection, getServiceSigner } from '../service.js' import { mustGetEnv } from './utils.js' @@ -35,11 +35,10 @@ async function pieceCidReport (event) { /** @type {PieceStoreRecord} */ // @ts-expect-error can't figure out type of new - const pieceRecord = unmarshall(records[0].new) - const piece = Piece.fromString(pieceRecord.piece).link - const content = CID.parse(pieceRecord.content) + const storeRecord = unmarshall(records[0].new) + const record = decodeRecord(storeRecord) - const claimsServiceConnection = getServiceConnection({ + const connection = getServiceConnection({ did: contentClaimsDid, url: contentClaimsUrl }) @@ -56,24 +55,24 @@ async function pieceCidReport (event) { claimsIssuer = claimsIssuer.withDID(DID.parse(contentClaimsDid).did()) } - const { ok, error } = await reportPieceCid({ - piece, - content, - claimsServiceConnection, - claimsInvocationConfig: /** @type {import('../types.js').ClaimsInvocationConfig} */ ({ - issuer: claimsIssuer, - audience: claimsServiceConnection.id, - with: claimsIssuer.did(), - proofs: claimsProofs - }) - }) + const context = { + claimsService: { + connection, + invocationConfig: { + issuer: claimsIssuer, + audience: connection.id, + with: claimsIssuer.did(), + proofs: claimsProofs + }, + }, + } + const { ok, error } = await storefrontEvents.handlePieceInsertToEquivalencyClaim(context, record) if (error) { console.error(error) - return { statusCode: 500, - body: error.message || 'failed to add aggregate' + body: error.message || 'failed to handle piece insert event to content claim' } } diff --git a/filecoin/functions/piece-cid-compute.js b/filecoin/functions/piece-cid-compute.js index 483bfccc..89cb64ec 100644 --- a/filecoin/functions/piece-cid-compute.js +++ b/filecoin/functions/piece-cid-compute.js @@ -1,9 +1,14 @@ -import { S3Client } from '@aws-sdk/client-s3' import * as Sentry from '@sentry/serverless' +import { S3Client } from '@aws-sdk/client-s3' +import { Config } from 'sst/node/config' +import { Storefront } from '@web3-storage/filecoin-client' +import * as Delegation from '@ucanto/core/delegation' +import { fromString } from 'uint8arrays/from-string' +import * as DID from '@ipld/dag-ucan/did' import { computePieceCid } from '../index.js' +import { getServiceConnection, getServiceSigner } from '../service.js' import { mustGetEnv } from './utils.js' -import { createPieceTable } from '../store/piece.js' Sentry.AWSLambda.init({ environment: process.env.SST_STAGE, @@ -11,19 +16,15 @@ Sentry.AWSLambda.init({ tracesSampleRate: 1.0, }) -const AWS_REGION = process.env.AWS_REGION || 'us-west-2' - /** - * Get EventRecord from the SQS Event triggering the handler + * Get EventRecord from the SQS Event triggering the handler. + * Trigger `filecoin/offer` from bucket event * * @param {import('aws-lambda').SQSEvent} event */ async function computeHandler (event) { - const { - pieceTableName, - disablePieceCidCompute, - did - } = getEnv() + const { PRIVATE_KEY: privateKey } = Config + const { storefrontDid, storefrontUrl, did, storefrontProof, disablePieceCidCompute } = getEnv() if (disablePieceCidCompute) { const body = 'piece cid computation is disabled' @@ -34,21 +35,46 @@ async function computeHandler (event) { } } + // Create context + let storefrontSigner = getServiceSigner({ + privateKey + }) + const connection = getServiceConnection({ + did: storefrontDid, + url: storefrontUrl + }) + const storefrontServiceProofs = [] + if (storefrontProof) { + const proof = await Delegation.extract(fromString(storefrontProof, 'base64pad')) + if (!proof.ok) throw new Error('failed to extract proof', { cause: proof.error }) + storefrontServiceProofs.push(proof.ok) + } else { + // if no proofs, we must be using the service private key to sign + storefrontSigner = storefrontSigner.withDID(DID.parse(did).did()) + } + const storefrontService = { + connection, + invocationConfig: { + issuer: storefrontSigner, + with: storefrontSigner.did(), + audience: storefrontSigner, + proofs: storefrontServiceProofs + }, + } + + // Decode record const record = parseEvent(event) if (!record) { throw new Error('Unexpected sqs record format') } const s3Client = new S3Client({ region: record.bucketRegion }) - const pieceTable = createPieceTable(AWS_REGION, pieceTableName) + // Compute piece for record const { error, ok } = await computePieceCid({ record, s3Client, - pieceTable, - group: did }) - if (error) { console.error(error) @@ -58,9 +84,24 @@ async function computeHandler (event) { } } + // Invoke `filecoin/offer` + const filecoinSubmitInv = await Storefront.filecoinOffer( + storefrontService.invocationConfig, + ok.content, + ok.piece, + { connection: storefrontService.connection } + ) + console.log('piece cid compute', record.key, filecoinSubmitInv.out.error, filecoinSubmitInv.out.ok) + + if (filecoinSubmitInv.out.error) { + return { + statusCode: 500, + body: filecoinSubmitInv.out.error, + } + } + return { statusCode: 200, - body: ok } } @@ -71,8 +112,10 @@ export const handler = Sentry.AWSLambda.wrapHandler(computeHandler) */ function getEnv () { return { - pieceTableName: mustGetEnv('PIECE_TABLE_NAME'), did: mustGetEnv('DID'), + storefrontDid: mustGetEnv('STOREFRONT_DID'), + storefrontUrl: mustGetEnv('STOREFRONT_URL'), + storefrontProof: process.env.PROOF, disablePieceCidCompute: process.env.DISABLE_PIECE_CID_COMPUTE === 'true' } } diff --git a/filecoin/index.js b/filecoin/index.js index f26d9ee1..5c596ecd 100644 --- a/filecoin/index.js +++ b/filecoin/index.js @@ -4,7 +4,6 @@ import * as Hasher from 'fr32-sha2-256-trunc254-padded-binary-tree-multihash' import * as Digest from 'multiformats/hashes/digest' import { Piece } from '@web3-storage/data-segment' import { CID } from 'multiformats/cid' -import { Assert } from '@web3-storage/content-claims/capability' import { GetCarFailed, ComputePieceFailed } from './errors.js' @@ -27,14 +26,10 @@ import { GetCarFailed, ComputePieceFailed } from './errors.js' * @param {object} props * @param {EventRecord} props.record * @param {S3Client} props.s3Client - * @param {string} props.group - * @param {import('@web3-storage/filecoin-api/storefront/api').PieceStore} props.pieceTable */ export async function computePieceCid({ record, - s3Client, - pieceTable, - group + s3Client }) { const key = record.key // CIDs in carpark are in format `${carCid}/${carCid}.car` @@ -80,57 +75,10 @@ export async function computePieceCid({ } } - // Write to table - const insertedAt = (new Date()).toISOString() - const { ok, error } = await pieceTable.put({ - content: CID.parse(cidString), - piece: piece.link, - status: 'submitted', - insertedAt, - updatedAt: insertedAt, - group - }) - - return { - ok, - error - } -} - -/** - * @param {object} props - * @param {import('@web3-storage/data-segment').PieceLink} props.piece - * @param {import('multiformats').CID} props.content - * @param {import('@ucanto/principal/ed25519').ConnectionView} props.claimsServiceConnection - * @param {import('./types.js').ClaimsInvocationConfig} props.claimsInvocationConfig - */ -export async function reportPieceCid ({ - piece, - content, - claimsServiceConnection, - claimsInvocationConfig -}) { - // Add claim for reading - const claimResult = await Assert.equals - .invoke({ - issuer: claimsInvocationConfig.issuer, - audience: claimsInvocationConfig.audience, - with: claimsInvocationConfig.with, - nb: { - content, - equals: piece - }, - expiration: Infinity, - proofs: claimsInvocationConfig.proofs - }) - .execute(claimsServiceConnection) - if (claimResult.out.error) { - return { - error: claimResult.out.error - } - } - return { - ok: {}, + ok: { + content: CID.parse(cidString), + piece: piece.link + }, } } diff --git a/filecoin/package.json b/filecoin/package.json index 07ca7ba6..d287a057 100644 --- a/filecoin/package.json +++ b/filecoin/package.json @@ -9,22 +9,21 @@ "@aws-sdk/client-dynamodb": "^3.515.0", "@aws-sdk/client-s3": "^3.515.0", "@aws-sdk/client-sqs": "^3.515.0", + "@ipld/car": "^5.2.6", "@sentry/serverless": "^7.74.1", "@ucanto/client": "^9.0.0", "@ucanto/core": "^9.0.1", "@ucanto/interface": "^9.0.0", "@ucanto/principal": "^9.0.0", "@ucanto/transport": "^9.0.0", - "@web3-storage/content-claims": "^3.2.0", "@web3-storage/data-segment": "^5.0.0", - "@web3-storage/filecoin-api": "^4.2.0", + "@web3-storage/filecoin-api": "^4.6.0", "@web3-storage/filecoin-client": "^3.1.3", "fr32-sha2-256-trunc254-padded-binary-tree-multihash": "^3.1.1", "multiformats": "^13.1.0", "uint8arrays": "4.0.6" }, "devDependencies": { - "@ipld/car": "^5.2.6", "@web-std/blob": "3.0.5", "ava": "^4.3.3", "constructs": "*", diff --git a/filecoin/store/data.js b/filecoin/store/data.js new file mode 100644 index 00000000..4e4c2285 --- /dev/null +++ b/filecoin/store/data.js @@ -0,0 +1,161 @@ +import { + S3Client, + GetObjectCommand, + PutObjectCommand +} from '@aws-sdk/client-s3' +import * as CAR from '@ucanto/transport/car' +import { sha256 } from 'multiformats/hashes/sha2' +import { CID } from 'multiformats/cid' +import * as raw from 'multiformats/codecs/raw' +import { CarWriter } from '@ipld/car' +import pRetry from 'p-retry' +import { StoreOperationFailed, RecordNotFound } from '@web3-storage/filecoin-api/errors' + +/** + * Abstraction layer with Factory to perform operations on bucket storing + * data receipts. + * + * @param {string} region + * @param {string} bucketName + * @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options] + */ +export function createDataStore(region, bucketName, options = {}) { + const s3client = new S3Client({ + region, + ...options, + }) + return useDataStore(s3client, bucketName) +} + +/** + * @param {S3Client} s3client + * @param {string} bucketName + * @returns {import('@web3-storage/filecoin-api/storefront/api').DataStore} + */ +export const useDataStore = (s3client, bucketName) => { + return { + // Only used for testing storing a CAR + // until we hook up claims to look for data + put: async (bytes) => { + const hash = await sha256.digest(bytes) + const root = CID.create(1, raw.code, hash) + + const { writer, out } = CarWriter.create(root) + writer.put({ cid: root, bytes }) + writer.close() + + const chunks = [] + for await (const chunk of out) { + chunks.push(chunk) + } + const blob = new Blob(chunks) + const cid = await CAR.codec.link(new Uint8Array(await blob.arrayBuffer())) + + const putCmd = new PutObjectCommand({ + Bucket: bucketName, + Key: `${cid.toString()}/${cid.toString()}.car`, + Body: bytes + }) + await s3client.send(putCmd) + + return { + ok: {} + } + }, + /** + * Stream Blob bytes for a given invocation. + */ + /** + * + * @param {import('@ucanto/interface').UnknownLink} cid + */ + // @ts-expect-error aws Readable stream types are not good + stream: async (cid) => { + // TODO: probably get from Roundabout from R2? + const getObjectCmd = new GetObjectCommand({ + Bucket: bucketName, + Key: `${cid.toString()}/${cid.toString()}.car`, + }) + let res + + try { + res = await pRetry(() => s3client.send(getObjectCmd), { retries: 3 }) + } catch (/** @type {any} */ error) { + if (error?.$metadata?.httpStatusCode === 404) { + return { + error: new RecordNotFound(`blob ${cid.toString()} not found in store`) + } + } + return { + error: new StoreOperationFailed(error.message) + } + } + + const stream = res.Body + if (!stream) { + return { + error: new RecordNotFound(`blob ${cid.toString()} not found in store`) + } + } + + return { + ok: stream + } + }, + has: async () => { + return { + error: new StoreOperationFailed('no blob should checked by storefront') + } + } + } +} + +/** + * compose many data stores. + * store#stream will check stores in order until 0-1 `ok` result is found. + * + * @param {import('@web3-storage/filecoin-api/storefront/api').DataStore} dataStore + * @param {Array} moreDataStores + * @returns {import('@web3-storage/filecoin-api/storefront/api').DataStore} + */ +export function composeDataStoresWithOrderedStream(dataStore, ...moreDataStores) { + return { + ...dataStore, + stream: composeSome(dataStore.stream, ...moreDataStores.map(s => s.stream.bind(s))), + } +} + +/** + * @typedef {AsyncIterable} Rec + * @typedef {import('@web3-storage/filecoin-api/types').StoreGetError} StoreGetError + * @typedef {import('@ucanto/interface').Result} Result + */ + +/** + * compose async functions that return Promise>. + * The returned function will have the same signature, + * but will try the composed functions in order until one (or none) returns 'ok'. + * + * @template T + * @param {Array<(e: T) => Promise>} streamFunctions + * + */ +function composeSome(...streamFunctions) { + /** + * @param {T} e + */ + return async function (e) { + /** @type {Result | undefined} */ + let result + for (const stream of streamFunctions) { + result = await stream(e) + if (result.ok) { + return result + } + } + if (result === undefined) { + throw new Error('no result received') + } + return result + } +} \ No newline at end of file diff --git a/filecoin/test/compute-piece-cid.test.js b/filecoin/test/compute-piece-cid.test.js index b0388c08..8b527883 100644 --- a/filecoin/test/compute-piece-cid.test.js +++ b/filecoin/test/compute-piece-cid.test.js @@ -2,33 +2,19 @@ import { test } from './helpers/context.js' import { PutObjectCommand } from '@aws-sdk/client-s3' -import { createS3, createBucket, createDynamodDb } from './helpers/resources.js' -import { createDynamoTable, getItemsFromTable } from './helpers/tables.js' +import { createS3, createBucket } from './helpers/resources.js' import { createCar } from './helpers/car.js' import { computePieceCid } from '../index.js' -import { pieceTableProps } from '../store/index.js' -import { createPieceTable } from '../store/piece.js' - -const AWS_REGION = 'us-west-2' test.before(async t => { // S3 const { client, stop: s3Stop } = await createS3({ port: 9000 }) - // DynamoDB - const { - client: dynamoClient, - endpoint: dbEndpoint, - stop: dynamoStop - } = await createDynamodDb({ port: 8000 }) Object.assign(t.context, { s3Client: client, - dbEndpoint, - dynamoClient, stop: async () => { await s3Stop() - await dynamoStop() } }) }) @@ -38,12 +24,8 @@ test.after(async t => { }) test('computes piece CID from a CAR file in the bucket', async t => { - const { tableName, bucketName } = await prepareResources(t.context.dynamoClient, t.context.s3Client) + const { bucketName } = await prepareResources(t.context.s3Client) const { body, checksum, key, piece, link } = await createCar() - const pieceTable = createPieceTable(AWS_REGION, tableName, { - endpoint: t.context.dbEndpoint - }) - await t.context.s3Client.send( new PutObjectCommand({ Bucket: bucketName, @@ -52,7 +34,6 @@ test('computes piece CID from a CAR file in the bucket', async t => { ChecksumSHA256: checksum, }) ) - const group = 'this-group' const record = { bucketName, bucketRegion: 'us-west-2', @@ -62,39 +43,23 @@ test('computes piece CID from a CAR file in the bucket', async t => { const { ok, error } = await computePieceCid({ record, s3Client: t.context.s3Client, - pieceTable, - group }) t.truthy(ok) t.falsy(error) - const storedItems = await getItemsFromTable(t.context.dynamoClient, tableName, { - content: { - ComparisonOperator: 'EQ', - AttributeValueList: [{ S: link.toString() }] - } - }, { - indexName: 'content' - }) - - t.truthy(storedItems) - t.is(storedItems?.length, 1) - t.is(storedItems?.[0].piece, piece.toString()) - t.is(storedItems?.[0].group, group) + t.is(ok?.piece.toString(), piece.toString()) + t.is(ok?.content.toString(), link.toString()) }) /** - * @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoClient * @param {import("@aws-sdk/client-s3").S3Client} s3Client */ -async function prepareResources (dynamoClient, s3Client) { - const [ tableName, bucketName ] = await Promise.all([ - createDynamoTable(dynamoClient, pieceTableProps), +async function prepareResources (s3Client) { + const [ bucketName ] = await Promise.all([ createBucket(s3Client) ]) return { - tableName, bucketName } } diff --git a/filecoin/test/filecoin-events.test.js b/filecoin/test/filecoin-events.test.js index e260dfb9..dfd5663a 100644 --- a/filecoin/test/filecoin-events.test.js +++ b/filecoin/test/filecoin-events.test.js @@ -104,6 +104,8 @@ for (const [title, unit] of Object.entries(filecoinApiTest.events.storefront)) { // context const storefrontSigner = await Signer.generate() const aggregatorSigner = await Signer.generate() + const claimsSigner = await Signer.generate() + const service = getMockService() const storefrontConnection = getConnection( storefrontSigner, @@ -113,6 +115,7 @@ for (const [title, unit] of Object.entries(filecoinApiTest.events.storefront)) { aggregatorSigner, service ).connection + const claimsConnection = getConnection(claimsSigner, service).connection await unit( { @@ -144,6 +147,14 @@ for (const [title, unit] of Object.entries(filecoinApiTest.events.storefront)) { audience: aggregatorSigner, }, }, + claimsService: { + connection: claimsConnection, + invocationConfig: { + issuer: storefrontSigner, + with: storefrontSigner.did(), + audience: claimsSigner, + }, + }, errorReporter: { catch(error) { t.fail(error.message) diff --git a/filecoin/test/helpers/mocks.js b/filecoin/test/helpers/mocks.js deleted file mode 100644 index 30418dd6..00000000 --- a/filecoin/test/helpers/mocks.js +++ /dev/null @@ -1,34 +0,0 @@ -import * as Server from '@ucanto/server' - -const notImplemented = () => { - throw new Server.Failure('not implemented') -} - -/** - * @param {Partial< - * { assert: Partial } - * >} impl - */ -export function mockService(impl) { - return { - assert: { - equals: withCallCount(impl.assert?.equals ?? notImplemented) - } - } -} - -/** - * @template {Function} T - * @param {T} fn - */ -function withCallCount(fn) { - /** @param {T extends (...args: infer A) => any ? A : never} args */ - const countedFn = (...args) => { - countedFn.called = true - countedFn.callCount++ - return fn(...args) - } - countedFn.called = false - countedFn.callCount = 0 - return countedFn -} diff --git a/filecoin/test/helpers/resources.js b/filecoin/test/helpers/resources.js index e34645e4..b09ecda2 100644 --- a/filecoin/test/helpers/resources.js +++ b/filecoin/test/helpers/resources.js @@ -140,7 +140,7 @@ export const createSQS = async (opts = {}) => { const port = opts.port || 9324 const queue = await pRetry(() => - new Container('softwaremill/elasticmq') + new Container('softwaremill/elasticmq:1.5.4') .withExposedPorts(port) .start() ) diff --git a/filecoin/test/helpers/service-context.js b/filecoin/test/helpers/service-context.js index e093206a..3700383c 100644 --- a/filecoin/test/helpers/service-context.js +++ b/filecoin/test/helpers/service-context.js @@ -12,6 +12,7 @@ import { encodeAgentMessage } from './ucan.js' import { pieceTableProps } from '../../store/index.js' // store clients +import { useDataStore as createDataStoreClient } from '../../store/data.js' import { usePieceTable as createPieceStoreClient } from '../../store/piece.js' import { useTaskStore as createTaskStoreClient } from '../../store/task.js' import { useReceiptStore as createReceiptStoreClient } from '../../store/receipt.js' @@ -26,15 +27,17 @@ import { createClient as createFilecoinSubmitQueueClient } from '../../queue/fil export async function getStores (ctx) { const { dynamoClient, s3Client } = ctx const pieceStore = await createDynamoTable(dynamoClient, pieceTableProps) - const [ invocationBucketName, workflowBucketName ] = await Promise.all([ + const [ invocationBucketName, workflowBucketName, dataStoreBucketName ] = await Promise.all([ + createBucket(s3Client), + createBucket(s3Client), createBucket(s3Client), - createBucket(s3Client) ]) return { pieceStore: createPieceStoreClient(dynamoClient, pieceStore), taskStore: getTaskStoreClient(s3Client, invocationBucketName, workflowBucketName), receiptStore: getReceiptStoreClient(s3Client, invocationBucketName, workflowBucketName), + dataStore: createDataStoreClient(s3Client, dataStoreBucketName) } } diff --git a/filecoin/test/helpers/ucanto.js b/filecoin/test/helpers/ucanto.js index ffe21b8d..c87dadcc 100644 --- a/filecoin/test/helpers/ucanto.js +++ b/filecoin/test/helpers/ucanto.js @@ -1,12 +1,7 @@ import * as Signer from '@ucanto/principal/ed25519' import * as UcantoClient from '@ucanto/client' import { Message } from '@ucanto/core' -import * as Server from '@ucanto/server' import * as CAR from '@ucanto/transport/car' -import { Assert } from '@web3-storage/content-claims/capability' - -import { OperationFailed } from './errors.js' -import { mockService } from './mocks.js' /** * @typedef {import('@ucanto/interface').IssuedInvocation} IssuedInvocation @@ -15,89 +10,6 @@ import { mockService } from './mocks.js' * @typedef {import('@ucanto/interface').Tuple} TupleIssuedInvocation */ -const nop = (/** @type {any} */ invCap) => {} - -/** - * @param {any} serviceProvider - * @param {object} [options] - * @param {(inCap: any) => void} [options.onCall] - * @param {boolean} [options.mustFail] - */ -export async function getClaimsServiceServer (serviceProvider, options = {}) { - const onCall = options.onCall || nop - const equalsStore = new Map() - - const service = mockService({ - assert: { - equals: Server.provide(Assert.equals, async ({ capability, invocation }) => { - const invCap = invocation.capabilities[0] - const { content, equals } = capability.nb - - if (options.mustFail) { - return { - error: new OperationFailed( - 'failed to add to aggregate', - // @ts-ignore wrong dep - invCap.nb?.content - ) - } - } - - equalsStore.set(content.toString(), equals.toString()) - equalsStore.set(equals.toString(), content.toString()) - - onCall(invCap) - - return { - ok: {} - } - }) - } - }) - - const server = Server.create({ - id: serviceProvider, - service, - codec: CAR.inbound, - validateAuthorization: () => ({ ok: {} }) - }) - const connection = UcantoClient.connect({ - id: serviceProvider, - codec: CAR.outbound, - channel: server, - }) - - return { - service, - connection - } -} - -export async function getServiceCtx () { - const storefront = await Signer.generate() - const aggregator = await Signer.generate() - const claims = await Signer.generate() - - return { - storefront: { - did: storefront.did(), - privateKey: Signer.format(storefront), - raw: storefront - }, - aggregator: { - did: aggregator.did(), - privateKey: Signer.format(aggregator), - raw: aggregator - }, - claims: { - did: claims.did(), - privateKey: Signer.format(claims), - raw: claims - } - } -} - - /** * @param {object} source * @param {IssuedInvocation[]} [source.invocations] diff --git a/filecoin/test/report-piece-cid.test.js b/filecoin/test/report-piece-cid.test.js deleted file mode 100644 index 225c91bb..00000000 --- a/filecoin/test/report-piece-cid.test.js +++ /dev/null @@ -1,86 +0,0 @@ -import { test } from './helpers/context.js' - -import pDefer from 'p-defer' - -import { reportPieceCid } from '../index.js' -import { getServiceSigner } from '../service.js' - -import { getClaimsServiceServer, getServiceCtx } from './helpers/ucanto.js' -import { createCar } from './helpers/car.js' - -test('reports piece cid from a piece written to the piece table', async t => { - const { piece, link } = await createCar() - const claimsEqualsCall = pDefer() - const { claimsInvocationConfig, claimsService } = await getService({ - claims: { - onCall: claimsEqualsCall - } - }) - - const reportPieceCidResponse = await reportPieceCid({ - piece, - content: link, - claimsInvocationConfig, - claimsServiceConnection: claimsService.connection, - }) - - t.truthy(reportPieceCidResponse.ok) - t.falsy(reportPieceCidResponse.error) - - // Validate ucanto server calls - t.is(claimsService.service.assert.equals.callCount, 1) - const invCapClaims = await claimsEqualsCall.promise - t.is(invCapClaims.can, 'assert/equals') -}) - -test('fails reporting piece cid if fails to claim equals', async t => { - const { piece, link } = await createCar() - const claimEqualsCall = pDefer() - const { claimsInvocationConfig, claimsService } = await getService({ - claims: { - onCall: claimEqualsCall, - mustFail: true - } - }) - - const reportPieceCidResponse = await reportPieceCid({ - piece, - content: link, - claimsInvocationConfig, - claimsServiceConnection: claimsService.connection, - }) - - t.falsy(reportPieceCidResponse.ok) - t.truthy(reportPieceCidResponse.error) - - // Validate ucanto server calls - t.is(claimsService.service.assert.equals.callCount, 1) -}) - -/** - * @typedef {object} Props - * @property {import('p-defer').DeferredPromise} onCall - * @property {boolean} [mustFail] - * - * @param {Record<'claims', Props>} options - */ -async function getService (options) { - const { storefront, claims } = await getServiceCtx() - const claimsService = await getClaimsServiceServer(claims.raw, { - onCall: (invCap) => { - options.claims.onCall.resolve(invCap) - }, - mustFail: options.claims.mustFail - }) - - const issuer = getServiceSigner(storefront) - - return { - claimsInvocationConfig:/** @type {import('../types').ClaimsInvocationConfig} */({ - issuer, - audience: claimsService.connection.id, - with: issuer.did(), - }), - claimsService, - } -} diff --git a/package-lock.json b/package-lock.json index ed33a30c..972f77ec 100644 --- a/package-lock.json +++ b/package-lock.json @@ -114,9 +114,8 @@ "@ucanto/interface": "^9.0.0", "@ucanto/principal": "^9.0.0", "@ucanto/transport": "^9.0.0", - "@web3-storage/content-claims": "^3.2.0", "@web3-storage/data-segment": "^5.0.0", - "@web3-storage/filecoin-api": "^4.2.0", + "@web3-storage/filecoin-api": "^4.6.0", "@web3-storage/filecoin-client": "^3.1.3", "fr32-sha2-256-trunc254-padded-binary-tree-multihash": "^3.1.1", "multiformats": "^13.1.0", @@ -5470,9 +5469,9 @@ } }, "node_modules/@ucanto/transport": { - "version": "9.0.2", - "resolved": "https://registry.npmjs.org/@ucanto/transport/-/transport-9.0.2.tgz", - "integrity": "sha512-I3VbqLb+q++59JHGbkexDc3u1BiIyNfBkX4JuApjiU7iiDfXoFg/T8jvJuZW36oazCut/eARLYCORfHLkPRVOQ==", + "version": "9.1.0", + "resolved": "https://registry.npmjs.org/@ucanto/transport/-/transport-9.1.0.tgz", + "integrity": "sha512-3pLXEg9YIH0NN1faBh0Xaioxbb2JtPL+4AFtQtmO8LnRyqGnTahZwwaM8XFL5eMBAp0pYDoZaQ6wdMce0t1cAQ==", "dependencies": { "@ucanto/core": "^9.0.1", "@ucanto/interface": "^9.0.0" @@ -5599,14 +5598,14 @@ } }, "node_modules/@web3-storage/capabilities": { - "version": "13.1.1", - "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-13.1.1.tgz", - "integrity": "sha512-yQwrjhqwXGc1z8FCs7dCMsNp+G1LCrPq8RWCrflHA0rlISyMez6DQQpOJrCfao/MSk30nzPSzIm+FX/k3+8knw==", + "version": "13.2.0", + "resolved": "https://registry.npmjs.org/@web3-storage/capabilities/-/capabilities-13.2.0.tgz", + "integrity": "sha512-KYqyW4RNrDPdogSANcmME0+Ci+FLQfTCkYUMfpp67K5hymCz25e8KWSHBSR+lCO9ggSuD4q49zi1LEid8ISIQg==", "dependencies": { "@ucanto/core": "^9.0.1", "@ucanto/interface": "^9.0.0", "@ucanto/principal": "^9.0.0", - "@ucanto/transport": "^9.0.0", + "@ucanto/transport": "^9.1.0", "@ucanto/validator": "^9.0.1", "@web3-storage/data-segment": "^3.2.0" } @@ -5679,24 +5678,48 @@ } }, "node_modules/@web3-storage/filecoin-api": { - "version": "4.4.0", - "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-api/-/filecoin-api-4.4.0.tgz", - "integrity": "sha512-mT3QRC1Cbu0SDHMQjo1ZJFEH8dL4+8H6++wZOIm401bFSHXEamTkydQT3DfX9IHXAFSab/7kwkQ93yDwOgtXiA==", + "version": "4.6.0", + "resolved": "https://registry.npmjs.org/@web3-storage/filecoin-api/-/filecoin-api-4.6.0.tgz", + "integrity": "sha512-9uy5aXxERE/DvJgFlljqgiGkmj3HCWcj6PqjMOAaemiyI0wPKoIdfO0P7sH4d0wz87Iiz7y9Kxwf3VGhGqoB2g==", "dependencies": { "@ipld/dag-ucan": "^3.4.0", "@ucanto/client": "^9.0.0", "@ucanto/core": "^9.0.1", "@ucanto/interface": "^9.0.0", "@ucanto/server": "^9.0.1", - "@ucanto/transport": "^9.0.0", - "@web3-storage/capabilities": "^13.1.1", + "@ucanto/transport": "^9.1.0", + "@web3-storage/capabilities": "^13.2.0", + "@web3-storage/content-claims": "^4.0.2", "@web3-storage/data-segment": "^4.0.0", + "fr32-sha2-256-trunc254-padded-binary-tree-multihash": "^3.3.0", "p-map": "^6.0.0" }, "engines": { "node": ">=16.15" } }, + "node_modules/@web3-storage/filecoin-api/node_modules/@web3-storage/content-claims": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/@web3-storage/content-claims/-/content-claims-4.0.2.tgz", + "integrity": "sha512-k6tIc7YjQtdKWi01r7+5stp2lo13ztwpIz+7NQYEbu5fZEsKKes5B4FKRqPWkZYO17+rPaihOY6sICT498c9EA==", + "dependencies": { + "@ucanto/client": "^9.0.0", + "@ucanto/interface": "^9.0.0", + "@ucanto/server": "^9.0.1", + "@ucanto/transport": "^9.0.0", + "carstream": "^1.0.2", + "multiformats": "^12.0.1" + } + }, + "node_modules/@web3-storage/filecoin-api/node_modules/@web3-storage/content-claims/node_modules/multiformats": { + "version": "12.1.3", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-12.1.3.tgz", + "integrity": "sha512-eajQ/ZH7qXZQR2AgtfpmSMizQzmyYVmCql7pdhldPuYQi4atACekbJaQplk6dWyIi10jCaFnd6pqvcEFXjbaJw==", + "engines": { + "node": ">=16.0.0", + "npm": ">=7.0.0" + } + }, "node_modules/@web3-storage/filecoin-api/node_modules/@web3-storage/data-segment": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/@web3-storage/data-segment/-/data-segment-4.0.0.tgz", @@ -5780,20 +5803,20 @@ } }, "node_modules/@web3-storage/upload-api": { - "version": "8.3.0", - "resolved": "https://registry.npmjs.org/@web3-storage/upload-api/-/upload-api-8.3.0.tgz", - "integrity": "sha512-TI1IopAA3K+/QFkw9ukJ0bUete3nGsd0mIF3eeBvwVaZhl4nE8OWtdYQX+AcGvoeUoww6eheDWmynBRsaedqJQ==", + "version": "9.0.0", + "resolved": "https://registry.npmjs.org/@web3-storage/upload-api/-/upload-api-9.0.0.tgz", + "integrity": "sha512-zpCeRZtHCbqhodUp/DPXM3Majnb02koP0j02RutqTy2VGKzvlC0Jbx5T4AhLKVj2zLzyZy9jm7eqjJbsXwH+qA==", "dependencies": { "@ucanto/client": "^9.0.0", "@ucanto/interface": "^9.0.0", "@ucanto/principal": "^9.0.0", "@ucanto/server": "^9.0.1", - "@ucanto/transport": "^9.0.0", + "@ucanto/transport": "^9.1.0", "@ucanto/validator": "^9.0.1", "@web3-storage/access": "^18.2.0", - "@web3-storage/capabilities": "^13.1.1", + "@web3-storage/capabilities": "^13.2.0", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/filecoin-api": "^4.3.1", + "@web3-storage/filecoin-api": "^4.6.0", "multiformats": "^12.1.2", "p-retry": "^5.1.2" }, @@ -18270,7 +18293,7 @@ "@web3-storage/access": "^18.2.0", "@web3-storage/capabilities": "^13.1.1", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/upload-api": "^8.3.0", + "@web3-storage/upload-api": "^9.0.0", "multiformats": "^13.1.0", "nanoid": "^5.0.2", "preact": "^10.14.1", diff --git a/stacks/filecoin-stack.js b/stacks/filecoin-stack.js index 960e25bb..a1e26223 100644 --- a/stacks/filecoin-stack.js +++ b/stacks/filecoin-stack.js @@ -48,7 +48,6 @@ export function FilecoinStack({ stack, app }) { * 1st processor queue - filecoin submit * On filecoin submit queue messages, validate piece for given content and store it in store. */ - // TODO: This will ONLY run when we validate pieces provided by user const filecoinSubmitQueueName = getCdkNames('filecoin-submit-queue', stack.stage) const filecoinSubmitQueueDLQ = new Queue(stack, `${filecoinSubmitQueueName}-dlq`, { cdk: { queue: { retentionPeriod: Duration.days(14) } } @@ -59,8 +58,17 @@ export function FilecoinStack({ stack, app }) { handler: 'filecoin/functions/handle-filecoin-submit-message.main', environment : { PIECE_TABLE_NAME: pieceTable.tableName, + // Setup both buckets + STORE_BUCKET_NAME: carparkBucket.bucketName, + R2_ACCESS_KEY_ID: process.env.R2_ACCESS_KEY_ID ?? '', + R2_SECRET_ACCESS_KEY: process.env.R2_SECRET_ACCESS_KEY ?? '', + R2_REGION: process.env.R2_REGION ?? '', + R2_ENDPOINT: process.env.R2_ENDPOINT ?? '', + R2_CARPARK_BUCKET_NAME: process.env.R2_CARPARK_BUCKET_NAME ?? '', }, permissions: [pieceTable], + // piece is computed in this lambda + timeout: 15 * 60, }, deadLetterQueue: filecoinSubmitQueueDLQ.cdk.queue, cdk: { @@ -230,19 +238,22 @@ export function FilecoinStack({ stack, app }) { }) // piece-cid compute - // Shortcut from system that goes directly into submitted status + // Shortcut from system that offers piece anyway on bucket event const pieceCidComputeHandler = new Function( stack, 'piece-cid-compute-handler', { environment : { - PIECE_TABLE_NAME: pieceTable.tableName, + DISABLE_PIECE_CID_COMPUTE, DID: UPLOAD_API_DID, - DISABLE_PIECE_CID_COMPUTE + STOREFRONT_DID: UPLOAD_API_DID, + STOREFRONT_URL: storefrontCustomDomain?.domainName ? `https://${storefrontCustomDomain?.domainName}` : '', }, + bind: [ + privateKey + ], permissions: [pieceTable, carparkBucket], handler: 'filecoin/functions/piece-cid-compute.handler', - timeout: 15 * 60, }, ) diff --git a/test/filecoin.test.js b/test/filecoin.test.js index e27b1b67..cdeca146 100644 --- a/test/filecoin.test.js +++ b/test/filecoin.test.js @@ -32,7 +32,8 @@ test.before(t => { } }) -test('w3filecoin integration flow', async t => { +test.only('w3filecoin integration flow', async t => { + console.log('test') const stage = getStage() const s3Client = getAwsBucketClient() const s3ClientFilecoin = getAwsBucketClient('us-east-2') @@ -77,6 +78,7 @@ test('w3filecoin integration flow', async t => { } // Check filecoin pieces computed after leaving queue + // Bucket event given client is not invoking this await Promise.all(uploads.map(async (upload) => { const pieces = await getPiecesByContent(t, upload.content.toString()) t.assert(pieces) @@ -85,6 +87,8 @@ test('w3filecoin integration flow', async t => { t.is(pieces?.[0].piece, upload.piece.toString()) })) + console.log('pieces in table') + // we only care about one making its way to the finish, as based on timings an individual piece may need to wait for a new batch await Promise.race(uploads.map(async testUpload => { // Check roundabout can redirect from pieceCid to signed bucket url for car diff --git a/upload-api/functions/ucan-invocation-router.js b/upload-api/functions/ucan-invocation-router.js index a722d641..2d8635b4 100644 --- a/upload-api/functions/ucan-invocation-router.js +++ b/upload-api/functions/ucan-invocation-router.js @@ -228,7 +228,7 @@ export async function ucanInvocationRouter(request) { options: { // TODO: we compute and put all pieces into the queue on bucket event // We may change this to validate user provided piece - skipFilecoinSubmitQueue: true + skipFilecoinSubmitQueue: false }, plansStorage, requirePaymentPlan, diff --git a/upload-api/package.json b/upload-api/package.json index 4a7db365..5d297000 100644 --- a/upload-api/package.json +++ b/upload-api/package.json @@ -25,7 +25,7 @@ "@web3-storage/access": "^18.2.0", "@web3-storage/capabilities": "^13.1.1", "@web3-storage/did-mailto": "^2.1.0", - "@web3-storage/upload-api": "^8.3.0", + "@web3-storage/upload-api": "^9.0.0", "multiformats": "^13.1.0", "nanoid": "^5.0.2", "preact": "^10.14.1",