Skip to content

Commit

Permalink
chore: address review
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Mar 26, 2024
1 parent 3aa4686 commit 20f8bfd
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 20 deletions.
6 changes: 3 additions & 3 deletions packages/filecoin-api/src/storefront/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
import {
Store,
UpdatableAndQueryableStore,
StreammableStore,
Queue,
ServiceConfig,
} from '../types.js'
Expand All @@ -29,7 +30,7 @@ export type PieceStore = UpdatableAndQueryableStore<
>
export type FilecoinSubmitQueue = Queue<FilecoinSubmitMessage>
export type PieceOfferQueue = Queue<PieceOfferMessage>
export type DataStore = Store<UnknownLink, AsyncIterable<Uint8Array>>
export type DataStore = StreammableStore<UnknownLink, Uint8Array>
export type TaskStore = Store<UnknownLink, Invocation>
export type ReceiptStore = Store<UnknownLink, Receipt>

Expand Down Expand Up @@ -123,8 +124,7 @@ export interface ClaimsClientContext {
*/
claimsService: {
invocationConfig: ClaimsInvocationConfig
// eslint-disable-next-line @typescript-eslint/no-explicit-any
connection: ConnectionView<any>
connection: ConnectionView<import('@web3-storage/content-claims/server/service/api').Service>
}
}

Expand Down
8 changes: 4 additions & 4 deletions packages/filecoin-api/src/storefront/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ export const handleFilecoinSubmitMessage = async (context, message) => {

// read and compute piece for content
// TODO: needs to be hooked with location claims
const contentGetRes = await context.dataStore.get(message.content)
if (contentGetRes.error) {
return { error: new BlobNotFound(contentGetRes.error.message) }
const contentStreamRes = await context.dataStore.stream(message.content)
if (contentStreamRes.error) {
return { error: new BlobNotFound(contentStreamRes.error.message) }
}

const computedPieceCid = await computePieceCid(contentGetRes.ok)
const computedPieceCid = await computePieceCid(contentStreamRes.ok)
if (computedPieceCid.error) {
return computedPieceCid
}
Expand Down
11 changes: 11 additions & 0 deletions packages/filecoin-api/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ export interface UpdatableStore<RecKey, Rec> extends Store<RecKey, Rec> {
) => Promise<Result<Rec, StoreGetError>>
}

export interface StreammableStore<RecKey, Rec> {
/**
* Puts a record in the store.
*/
put: (record: Rec) => Promise<Result<Unit, StorePutError>>
/**
* Gets a record from the store.
*/
stream: (key: RecKey) => Promise<Result<AsyncIterable<Rec>, StoreGetError>>
}

export interface QueryableStore<RecKey, Rec, Query> extends Store<RecKey, Rec> {
/**
* Queries for record matching a given criterium.
Expand Down
25 changes: 19 additions & 6 deletions packages/filecoin-api/test/context/store-implementations.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { UpdatableStore } from './store.js'
import { UpdatableStore, StreammableStore } from './store.js'

/**
* @typedef {import('@ucanto/interface').Link} Link
Expand All @@ -18,7 +18,8 @@ import { UpdatableStore } from './store.js'
* @typedef {import('../../src/deal-tracker/api.js').DealRecordKey} DealRecordKey
*/
export const getStoreImplementations = (
StoreImplementation = UpdatableStore
StoreImplementation = UpdatableStore,
StreammableStoreImplementation = StreammableStore
) => ({
storefront: {
pieceStore: new StoreImplementation({
Expand Down Expand Up @@ -76,12 +77,24 @@ export const getStoreImplementations = (
return Array.from(items).find((i) => i.ran.link().equals(record))
},
}),
dataStore: new StoreImplementation({
getFn: (
/** @type {Set<AsyncIterable<Uint8Array>>} */ items,
dataStore: new StreammableStore({
streamFn: (
/** @type {Set<Uint8Array>} */ items,
/** @type {import('@ucanto/interface').UnknownLink} */ record
) => {
return Array.from(items).pop()
const item = Array.from(items).pop()
if (!item) {
return undefined
}
const asyncIterableRes = {
[Symbol.asyncIterator]: async function* () {
// Yield the Uint8Array asynchronously
if (item) {
yield item
}
},
}
return asyncIterableRes
},
}),
},
Expand Down
47 changes: 47 additions & 0 deletions packages/filecoin-api/test/context/store.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,53 @@ export class Store {
}
}

/**
* @template K
* @template V
* @implements {API.StreammableStore<K,V>}
*/
export class StreammableStore {
/**
* @param {import('./types.js').StreammableStoreOptions<K, V>} options
*/
constructor(options) {
/** @type {Set<V>} */
this.items = new Set()
this.streamFn = options.streamFn
}

/**
* @param {V} record
* @returns {Promise<import('@ucanto/interface').Result<{}, StorePutError>>}
*/
async put(record) {
this.items.add(record)

return Promise.resolve({
ok: {},
})
}

/**
* @param {K} item
* @returns {Promise<import('@ucanto/interface').Result<AsyncIterable<V>, StoreGetError>>}
*/
async stream(item) {
if (!this.streamFn) {
throw new Error('get not supported')
}
const t = this.streamFn(this.items, item)
if (!t) {
return {
error: new RecordNotFound('not found'),
}
}
return {
ok: t,
}
}
}

/**
* @template K
* @template V
Expand Down
4 changes: 4 additions & 0 deletions packages/filecoin-api/test/context/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ export interface StoreOptions<K, V> {
export interface UpdatableStoreOptions<K, V> extends StoreOptions<K, V> {
updateFn?: (items: Set<V>, key: K, item: Partial<V>) => V
}

export interface StreammableStoreOptions<K, V> extends StoreOptions<K, V> {
streamFn?: (items: Set<V>, item: K) => AsyncIterable<V> | undefined
}
8 changes: 1 addition & 7 deletions packages/filecoin-api/test/events/storefront.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,7 @@ export const test = {
}

// Store bytes on datastore
const asyncIterableMock = {
[Symbol.asyncIterator]: async function* () {
// Yield the Uint8Array asynchronously
yield cargo.bytes
},
}
await context.dataStore.put(asyncIterableMock)
await context.dataStore.put(cargo.bytes)

// Handle message
const handledMessageRes =
Expand Down
4 changes: 4 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 20f8bfd

Please sign in to comment.