Skip to content

Commit

Permalink
feat: store and upload list with pagination support (#28)
Browse files Browse the repository at this point in the history
Adds pagination for `store/list` and `upload/list` relying on a `cursor`
and `size` capability invocation parameters.

Updates `access-client` to 6.0.0
https://github.com/web3-storage/w3protocol/releases/tag/access-v6.0.0 .
Most notably, account was renamed to space and these changes are
reflected here as well

Closes #16
  • Loading branch information
vasco-santos authored Nov 22, 2022
1 parent ca450ac commit 42c6d0c
Show file tree
Hide file tree
Showing 11 changed files with 7,926 additions and 3,630 deletions.
2 changes: 1 addition & 1 deletion api/access.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { info } from '@web3-storage/access/capabilities/account'
import { info } from '@web3-storage/access/capabilities/space'
import { connect } from '@ucanto/client'
import { CAR, CBOR, HTTP } from '@ucanto/transport'
import fetch from '@web-std/fetch'
Expand Down
2 changes: 1 addition & 1 deletion api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"@ucanto/server": "^3.0.4",
"@ucanto/transport": "^3.0.2",
"@web-std/fetch": "^4.1.0",
"@web3-storage/access": "^5.0.2",
"@web3-storage/access": "^6.0.1",
"@web3-storage/sigv4": "^1.0.2",
"multiformats": "^10.0.2"
},
Expand Down
9 changes: 6 additions & 3 deletions api/service/store/list.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ export function storeListProvider(context) {
return Server.provide(
Store.list,
async ({ capability }) => {
const { cursor, size } = capability.nb

// Only use capability account for now to check if account is registered.
// This must change to access account/info!!
// We need to use https://github.com/web3-storage/w3protocol/blob/9d4b5bec1f0e870233b071ecb1c7a1e09189624b/packages/access/src/agent.js#L270
const account = capability.with

// TODO: Page, Size

return await context.storeTable.list(account)
return await context.storeTable.list(account, {
size,
cursor
})
}
)
}
11 changes: 6 additions & 5 deletions api/service/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ export interface StoreTable {
exists: (uploaderDID: string, payloadCID: string) => Promise<boolean>
insert: (item: StoreItemInput) => Promise<StoreItemOutput>
remove: (uploaderDID: string, payloadCID: string) => Promise<void>
list: (uploaderDID: string) => Promise<ListResponse<StoreListResult>>
list: (uploaderDID: string, options?: ListOptions) => Promise<ListResponse<StoreListResult>>
}

export interface UploadTable {
exists: (uploaderDID: string, dataCID: string) => Promise<boolean>
insert: (uploaderDID: string, item: UploadItemInput) => Promise<UploadItemOutput[]>
remove: (uploaderDID: string, dataCID: string) => Promise<void>
list: (uploaderDID: string) => Promise<ListResponse<UploadItemOutput>>
list: (uploaderDID: string, options?: ListOptions) => Promise<ListResponse<UploadItemOutput>>
}

export interface Signer {
Expand Down Expand Up @@ -65,7 +65,8 @@ export interface StoreAddSuccessResult {
export type StoreAddResult = StoreAddSuccessResult | MalformedCapability

export type ListOptions = {
pageSize?: number,
size?: number,
cursor?: string
}

export interface StoreListResult {
Expand All @@ -76,8 +77,8 @@ export interface StoreListResult {
}

export interface ListResponse<R> {
cursorID?: string,
pageSize: number,
cursor?: string,
size: number,
results: R[]
}

Expand Down
9 changes: 6 additions & 3 deletions api/service/upload/list.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ export function uploadListProvider(context) {
return Server.provide(
Upload.list,
async ({ capability }) => {
const { cursor, size } = capability.nb

// Only use capability account for now to check if account is registered.
// This must change to access account/info!!
// We need to use https://github.com/web3-storage/w3protocol/blob/9d4b5bec1f0e870233b071ecb1c7a1e09189624b/packages/access/src/agent.js#L270
const account = capability.with

// TODO: Page, Size

return await context.uploadTable.list(account)
return await context.uploadTable.list(account, {
size,
cursor
})
})
}
20 changes: 12 additions & 8 deletions api/tables/store.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,21 @@ export function createStoreTable (region, tableName, options = {}) {
* @param {import('../service/types').ListOptions} [options]
*/
list: async (uploaderDID, options = {}) => {
const exclusiveStartKey = options.cursor ? marshall({
uploaderDID,
payloadCID: options.cursor
}) : undefined

const cmd = new QueryCommand({
TableName: tableName,
Limit: options.pageSize || 20,
Limit: options.size || 20,
KeyConditions: {
uploaderDID: {
ComparisonOperator: 'EQ',
AttributeValueList: [{ S: uploaderDID }],
},
},
ExclusiveStartKey: exclusiveStartKey,
AttributesToGet: ['payloadCID', 'size', 'origin', 'uploadedAt'],
})
const response = await dynamoDb.send(cmd)
Expand All @@ -120,16 +126,14 @@ export function createStoreTable (region, tableName, options = {}) {
return item
}) || []

/*
// TODO: cursor integrate with capabilities
// Get cursor of last key payload CID
// Get cursor of the item where list operation stopped (inclusive).
// This value can be used to start a new operation to continue listing.
const lastKey = response.LastEvaluatedKey && unmarshall(response.LastEvaluatedKey)
const cursorID = lastKey ? lastKey.payloadCID : undefined
*/
const cursor = lastKey ? lastKey.payloadCID : undefined

return {
pageSize: results.length,
// cursorID,
size: results.length,
cursor,
results
}
}
Expand Down
18 changes: 14 additions & 4 deletions api/tables/upload.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,21 @@ export function createUploadTable (region, tableName, options = {}) {
* @param {import('../service/types').ListOptions} [options]
*/
list: async (uploaderDID, options = {}) => {
const exclusiveStartKey = options.cursor ? marshall({
uploaderDID,
sk: options.cursor
}) : undefined

const cmd = new QueryCommand({
TableName: tableName,
Limit: options.pageSize || 20,
Limit: options.size || 20,
KeyConditions: {
uploaderDID: {
ComparisonOperator: 'EQ',
AttributeValueList: [{ S: uploaderDID }],
},
},
ExclusiveStartKey: exclusiveStartKey,
AttributesToGet: ['dataCID', 'carCID', 'uploadedAt'],
})
const response = await dynamoDb.send(cmd)
Expand All @@ -153,11 +159,15 @@ export function createUploadTable (region, tableName, options = {}) {
// @ts-expect-error
const results = response.Items?.map(i => unmarshall(i)) || []

// TODO: cursor integrate with capabilities
// Get cursor of the item where list operation stopped (inclusive).
// This value can be used to start a new operation to continue listing.
const lastKey = response.LastEvaluatedKey && unmarshall(response.LastEvaluatedKey)
const cursor = lastKey ? lastKey.sk : undefined

return {
pageSize: results.length,
results
size: results.length,
results,
cursor
}
},
}
Expand Down
77 changes: 69 additions & 8 deletions api/test/service/store.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ test('store/add allowed if invocation passes access verification', async (t) =>
t.is(storeAdd.headers['x-amz-checksum-sha256'], base64pad.baseEncode(link.multihash.digest))

const { service } = t.context.access.server
t.true(service.account.info.called)
t.is(service.account.info.callCount, 1)
t.true(service.space.info.called)
t.is(service.space.info.callCount, 1)
})

test('store/add disallowed if invocation fails access verification', async (t) => {
Expand Down Expand Up @@ -193,8 +193,8 @@ test('store/add disallowed if invocation fails access verification', async (t) =
t.is(storeAdd.error, true)

const { service } = t.context.access.server
t.true(service.account.info.called)
t.is(service.account.info.callCount, 1)
t.true(service.space.info.called)
t.is(service.space.info.callCount, 1)
})

test('store/remove does not fail for non existent link', async (t) => {
Expand Down Expand Up @@ -285,11 +285,12 @@ test('store/list does not fail for empty list', async (t) => {
issuer: alice,
audience: uploadService,
with: spaceDid,
proofs: [ proof ]
proofs: [ proof ],
nb: {}
// @ts-expect-error ʅʕ•ᴥ•ʔʃ
}).execute(connection)

t.like(storeList, { results: [], pageSize: 0 })
t.like(storeList, { results: [], size: 0 })
})

test('store/list returns items previously stored by the user', async (t) => {
Expand Down Expand Up @@ -318,11 +319,12 @@ test('store/list returns items previously stored by the user', async (t) => {
issuer: alice,
audience: uploadService,
with: spaceDid,
proofs: [ proof ]
proofs: [ proof ],
nb: {}
// @ts-expect-error ʅʕ•ᴥ•ʔʃ
}).execute(connection)

t.is(storeList.pageSize, links.length)
t.is(storeList.size, links.length)

// list order last-in-first-out
links.reverse()
Expand All @@ -333,6 +335,65 @@ test('store/list returns items previously stored by the user', async (t) => {
}
})

test('store/list can be paginated with custom size', async (t) => {
const uploadService = await Signer.generate()
const alice = await Signer.generate()
const { proof, spaceDid } = await createSpace(alice)
const connection = await getClientConnection(uploadService, t.context)

const data = [ new Uint8Array([11, 22, 34, 44, 55]), new Uint8Array([22, 34, 44, 55, 66]) ]
const links = []

for (const datum of data) {
const storeAdd = await StoreCapabilities.add.invoke({
issuer: alice,
audience: uploadService,
with: spaceDid,
nb: { link: await CAR.codec.link(datum) , size: datum.byteLength },
proofs: [proof]
// @ts-expect-error ʅʕ•ᴥ•ʔʃ
}).execute(connection)
t.not(storeAdd.error, true, storeAdd.message)
links.push(storeAdd.link)
}

// Get list with page size 1 (two pages)
const size = 1
const listPages = []
let cursor

do {
/** @type {import('../../service/types').ListResponse<any>} */
const storeList = await StoreCapabilities.list.invoke({
issuer: alice,
audience: uploadService,
with: spaceDid,
proofs: [ proof ],
nb: {
size,
cursor
}
// @ts-expect-error ʅʕ•ᴥ•ʔʃ
}).execute(connection)

cursor = storeList.cursor
// Add page if it has size
storeList.size && listPages.push(storeList.results)
} while (cursor)

t.is(listPages.length, data.length, 'has number of pages of added CARs')

// Inspect content
const storeList = listPages.flat()
// list order last-in-first-out
links.reverse()
let i = 0
for (const entry of storeList) {
t.like(entry, { payloadCID: links[i].toString(), size: 5 })
i++
}
})

/**
* @param {import("@aws-sdk/client-dynamodb").DynamoDBClient} dynamo
*/
Expand Down
67 changes: 63 additions & 4 deletions api/test/service/upload.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,12 @@ test('store/list does not fail for empty list', async (t) => {
issuer: alice,
audience: uploadService,
with: spaceDid,
proofs: [ proof ]
proofs: [ proof ],
nb: {}
// @ts-expect-error ʅʕ•ᴥ•ʔʃ
}).execute(connection)

t.like(uploadList, { results: [], pageSize: 0 })
t.like(uploadList, { results: [], size: 0 })
})

test('store/list returns entries previously uploaded by the user', async (t) => {
Expand Down Expand Up @@ -295,18 +296,76 @@ test('store/list returns entries previously uploaded by the user', async (t) =>
issuer: alice,
audience: uploadService,
with: spaceDid,
proofs: [ proof ]
proofs: [ proof ],
nb: {}
// @ts-expect-error ʅʕ•ᴥ•ʔʃ
}).execute(connection)

t.is(uploadList.pageSize, cars.length)
t.is(uploadList.size, cars.length)

// Validate entries have given CARs
for (const entry of uploadList.results) {
t.truthy(cars.find(car => car.roots[0].toString() === entry.dataCID ))
}
})

test('upload/list can be paginated with custom size', async (t) => {
const uploadService = await Signer.generate()
const alice = await Signer.generate()
const { proof, spaceDid } = await createSpace(alice)
const connection = await getClientConnection(uploadService, t.context)

// invoke multiple upload/add with proof
const cars = [
await randomCAR(128),
await randomCAR(128)
]

for (const car of cars) {
await UploadCapabilities.add.invoke({
issuer: alice,
audience: uploadService,
with: spaceDid,
nb: { root: car.roots[0], shards: [car.cid] },
proofs: [proof]
// @ts-expect-error ʅʕ•ᴥ•ʔʃ
}).execute(connection)
}

// Get list with page size 1 (two pages)
const size = 1
const listPages = []
let cursor

do {
/** @type {import('../../service/types').ListResponse<any>} */
const uploadList = await UploadCapabilities.list.invoke({
issuer: alice,
audience: uploadService,
with: spaceDid,
proofs: [ proof ],
nb: {
size,
// @ts-ignore let's do an interface for service!!
cursor
}
// @ts-expect-error ʅʕ•ᴥ•ʔʃ
}).execute(connection)

cursor = uploadList.cursor
// Add page if it has size
uploadList.size && listPages.push(uploadList.results)
} while (cursor)

t.is(listPages.length, cars.length, 'has number of pages of added CARs')

// Inspect content
const uploadList = listPages.flat()
for (const entry of uploadList) {
t.truthy(cars.find(car => car.roots[0].toString() === entry.dataCID ))
}
})

/**
* @param {import("@aws-sdk/client-dynamodb").DynamoDBClient} dynamo
*/
Expand Down
Loading

0 comments on commit 42c6d0c

Please sign in to comment.