feat: add block session support to @helia/interface #398

Merged
merged 15 commits into from
Apr 4, 2024

Member

@achingbrain achingbrain commented Jan 19, 2024

There are no implementations yet but the usage pattern will be something like:

// unixfs cat command
export async function * cat (cid: CID, blockstore: Blocks, options: Partial<CatOptions> = {}): AsyncIterable<Uint8Array> {
  // create a session for the CID if support is available
  const blocks = await (blockstore.createSession != null ? blockstore.createSession(cid, options) : blockstore)
  const opts: CatOptions = mergeOptions(defaultOptions, options)

  // resolve and export using the session, if created, otherwise fall back to regular blockstore access
  const resolved = await resolve(cid, opts.path, blocks, opts)
  const result = await exporter(resolved.cid, blocks, opts)

  if (result.type !== 'file' && result.type !== 'raw') {
    throw new NotAFileError()
  }

  if (result.content == null) {
    throw new NoContentError()
  }

  yield * result.content(opts)
}

Alternatively the user can control session creation:

import { unixfs } from '@helia/unixfs'
import { createHelia } from 'helia' // or http
import { CID } from 'multiformats/cid'

const node = await createHelia()
const rootCid = CID.parse('Qmfoo')
const sessionBlockstore = await node.blockstore.createSession(rootCid, {
  signal: AbortSignal.timeout(5000)
})

// all operations will use the same session
const fs = unixfs({ blockstore: sessionBlockstore })

for await (const entry of fs.ls(rootCid)) {
  if (entry.type !== 'file') {
    continue
  }

  for await (const buf of fs.cat(entry.cid)) {
    // ...
  }
}

Removes the split of BlockBroker into the single-method BlockAnnouncer and BlockRetriever interfaces, because sessions would need yet another BlockSessionFactory interface, which starts getting unwieldy. Instead, all BlockBroker methods are optional and the brokers are filtered before use.
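
A minimal sketch of what "all methods optional, filter before use" could look like. The `SketchBroker` shape and the `canRetrieve` and `retrieveFromBrokers` helpers are hypothetical names for illustration, not the interface added in this PR:

```typescript
// Hypothetical, simplified broker shape: every capability is optional.
interface SketchBroker {
  name: string
  retrieve?: (key: string) => Promise<Uint8Array>
  announce?: (key: string, block: Uint8Array) => Promise<void>
}

type RetrievingBroker = SketchBroker & Required<Pick<SketchBroker, 'retrieve'>>

// Type guard so that `filter` narrows to brokers that can retrieve blocks.
function canRetrieve (broker: SketchBroker): broker is RetrievingBroker {
  return typeof broker.retrieve === 'function'
}

// Try each capable broker in turn and return the first block found.
async function retrieveFromBrokers (brokers: SketchBroker[], key: string): Promise<Uint8Array> {
  const retrievers = brokers.filter(canRetrieve)

  for (const broker of retrievers) {
    try {
      return await broker.retrieve(key)
    } catch {
      // this broker could not supply the block, try the next one
    }
  }

  throw new Error(`No broker could retrieve ${key}`)
}
```

Brokers that only announce are skipped for retrieval, so no separate single-method interface is needed to tell the two kinds apart.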

@achingbrain achingbrain requested a review from a team as a code owner January 19, 2024 11:24
Member

@SgtPooki SgtPooki left a comment

+1 to combining blockBroker method types.

Could you explain how a session would become aware of providers obtained from a delegated-routing call?

* This method is optional to maintain compatibility with existing
* blockstores that do not support sessions.
*/
createSession?(root: CID, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<Blockstore>
Member

To be clear, the signal passed here is for the entire retrieval session?

E.g. Your example would abort if the entire session lasted longer than 5000ms, correct?

const sessionBlockstore = await node.blockstore.createSession(rootCid, {
  signal: AbortSignal.timeout(5000)
})

Member

Also, do we want to return Blockstore explicitly and not Blocks?

Member Author

To be clear, the signal passed here is for the entire retrieval session?

No, the signal is just for the session creation, e.g. when we do a findProvs and send WANTHAVE to create the initial set of providers.

Also, do we want to return Blockstore explicitly and not Blocks?

It returns Blockstore intentionally so that you can't create a session from a session.
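
The two answers above can be sketched roughly as follows. All names here (`SketchBlockstore`, `SketchBlocks`, `createMemoryBlocks`) are hypothetical stand-ins and not the real `@helia/interface` types, and the in-memory lookup is only a placeholder for provider discovery:

```typescript
// Hypothetical minimal shapes, not the real @helia/interface types.
interface SketchAbortOptions {
  signal?: AbortSignal
}

interface SketchBlockstore {
  get: (key: string, options?: SketchAbortOptions) => Promise<Uint8Array>
}

// Only the top-level type offers createSession, and it returns the plain
// SketchBlockstore, so a session cannot be used to open another session.
interface SketchBlocks extends SketchBlockstore {
  createSession: (root: string, options?: SketchAbortOptions) => Promise<SketchBlockstore>
}

function throwIfAborted (signal?: AbortSignal): void {
  if (signal?.aborted === true) {
    throw new Error('Operation aborted')
  }
}

function createMemoryBlocks (blocks: Record<string, Uint8Array>): SketchBlocks {
  const get = async (key: string, options?: SketchAbortOptions): Promise<Uint8Array> => {
    throwIfAborted(options?.signal)
    const block = blocks[key]
    if (block == null) {
      throw new Error(`Block ${key} not found`)
    }
    return block
  }

  return {
    get,
    async createSession (root: string, options?: SketchAbortOptions): Promise<SketchBlockstore> {
      // This signal only covers session setup (a stand-in for finding the
      // initial providers); it is not kept for later reads.
      throwIfAborted(options?.signal)
      if (!(root in blocks)) {
        throw new Error(`No providers found for ${root}`)
      }
      // Each later read takes its own, independent signal.
      return { get }
    }
  }
}
```

Under these assumptions, aborting the creation signal after the session exists has no effect on later `get` calls, which need their own signal if they should time out.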

packages/utils/src/utils/networked-storage.ts (outdated, resolved)
Comment on lines +202 to +206
if (broker.createSession == null) {
  return broker
}

return broker.createSession(root, options)
Member

Not really related to this section of code but: how do these session blockstores get providers from delegated-routing to the sessions?

Member Author

Similar to #398 (comment) - you accept an instance of the Helia Routing as a component and query it:

E.g.:

import { createHeliaHttp } from '@helia/http'
import { createDelegatedRoutingV1HttpApiClient } from '@helia/delegated-routing-v1-http-api-client'
import type { BlockBroker, Routing } from '@helia/interface'
import type { CID } from 'multiformats/cid'

interface MyBlockBrokerComponents {
  routing: Routing
}

interface MyBlockBrokerInit {
  // broker-specific config
}

class MyBlockBroker {
  private routing: Routing

  constructor (components: MyBlockBrokerComponents, init: MyBlockBrokerInit) {
    this.routing = components.routing
  }

  // async because the body iterates the routing results with `for await`
  async createSession (root: CID): Promise<BlockBroker> {
    for await (const prov of this.routing.findProviders(root)) {
      if (prov.protocols.includes('my-block-broker-protocol')) {
        // do block broker stuff
      }
    }

    // ...more code here
  }
}

function createMyBlockBroker (init: MyBlockBrokerInit) {
  return (components: MyBlockBrokerComponents) => new MyBlockBroker(components, init)
}

const node = await createHeliaHttp({
  blockBrokers: [
    createMyBlockBroker({
      // ...init stuff here
    })
  ],
  routers: [
    createDelegatedRoutingV1HttpApiClient('https://example.com')
  ]
})

// this will call the broker's createSession method
const sessionBlockstore = await node.blockstore.createSession(cid)

Comment on lines +60 to +62
* A session blockstore is a special blockstore that only pulls content from a
* subset of network peers which respond as having the block for the initial
* root CID.
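
To illustrate the doc comment above, here is a rough sketch of a session that keeps only the peers which report having the root block and then sends every later request to that subset. The `SketchPeer` and `SketchSession` shapes and the `createSketchSession` function are hypothetical and much simpler than a real broker:

```typescript
// Hypothetical peer shape for illustration only.
interface SketchPeer {
  id: string
  has: (key: string) => Promise<boolean>
  get: (key: string) => Promise<Uint8Array>
}

interface SketchSession {
  providers: string[]
  get: (key: string) => Promise<Uint8Array>
}

async function createSketchSession (root: string, peers: SketchPeer[]): Promise<SketchSession> {
  // Ask every known peer whether it has the root block; a peer that
  // errors is treated as not having it.
  const answers = await Promise.all(
    peers.map(async peer => ({
      peer,
      hasRoot: await peer.has(root).catch(() => false)
    }))
  )
  const providers = answers
    .filter(answer => answer.hasRoot)
    .map(answer => answer.peer)

  if (providers.length === 0) {
    throw new Error(`No providers found for ${root}`)
  }

  return {
    providers: providers.map(peer => peer.id),
    get: async (key: string): Promise<Uint8Array> => {
      // Later requests only go to the peers selected for this session.
      for (const provider of providers) {
        try {
          return await provider.get(key)
        } catch {
          // this provider could not supply the block, try the next one
        }
      }

      throw new Error(`Session providers could not supply ${key}`)
    }
  }
}
```

In this sketch a peer that did not answer for the root CID is never asked for other blocks, even if it happens to hold them.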

Out of curiosity, what's the reason for putting the session information in the blockstore rather than passing it along with, say, the signals?

My concern here (which might not be valid) is that it feels like we did something similar in the Go code and needing users to remember to create new block fetching stacks for every session ended up leading to the sessions not being plumbed through the stack well and therefore not having one "code session" per "logical session".

We're currently trying to alter how we do this to allow it to happen via signal (or "context" in Go). See ipfs/boxo#567 and ipfs/kubo#7198.

Every language/environment is different so maybe these concerns would be less likely to surface here.

Member Author

My feeling is that if people are using the API incorrectly it's probably because the correct usage isn't obvious enough, either through basic things like argument names and positions or documentation or both.

We already have people not passing AbortSignals into long-lived operations so I think a session identifier there would also get missed.

The primary interaction point for people with DAGs of various shapes are the @helia/* modules. Since these are (mostly?) stateless they are essentially disposable so I think pushing the session/no session decision to the user (e.g. do I use helia.blockstore or helia.blockstore.createSession(rootCid)) and having them interact with separate @helia/unixfs instances for each session will create a smaller amount of cognitive load.

Member Author

Could you explain how a session would become aware of providers obtained from a delegated-routing call?

HTTP Delegated Routers are configured as a router, and when you use the routing methods it'll query them.

E.g.:

import { createHeliaHttp } from '@helia/http'
import { createDelegatedRoutingV1HttpApiClient } from '@helia/delegated-routing-v1-http-api-client'

const node = await createHeliaHttp({
  routers: [
    createDelegatedRoutingV1HttpApiClient('https://example.com')
  ]
})

// this will call the http router internally:
for await (const prov of node.routing.findProviders(cid)) {
  if (prov.protocols.includes('transport-ipfs-gateway-http')) {
    // do http things
  }

  if (prov.protocols.includes('transport-bitswap')) {
    // do bitswap things
  }

  // etc
}

Member

@SgtPooki SgtPooki left a comment

approved, but left some comments for improvement

packages/interface/src/blocks.ts (outdated, resolved)
packages/interface/src/blocks.ts (outdated, resolved)
packages/interface/src/blocks.ts (outdated, resolved)
packages/utils/src/utils/networked-storage.ts (outdated, resolved)
packages/utils/src/utils/networked-storage.ts (resolved)
achingbrain and others added 2 commits March 1, 2024 08:32
Co-authored-by: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com>
Comment on lines +87 to +89
await Promise.all(
  this.components.blockBrokers.map(async broker => broker.announce?.(cid, block, options))
)
Member

do we want allSettled instead of all? do we want a put to fail if a single announcement fails?

Member Author

Perhaps allSettled. Maybe we'd want to throw if all block brokers failed to announce?
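
One possible reading of this suggestion, sketched under the assumption that a single failed announcement should be tolerated and that only a total failure should surface as an error. `SketchAnnouncer` and `announceToAll` are hypothetical names, and whether a put should fail when no broker supports announcing is left open here (this sketch does not throw in that case):

```typescript
// Hypothetical broker shape; `announce` is optional.
interface SketchAnnouncer {
  announce?: (key: string, block: Uint8Array) => Promise<void>
}

// Announce to every capable broker and resolve to the number of successes.
async function announceToAll (brokers: SketchAnnouncer[], key: string, block: Uint8Array): Promise<number> {
  const announcers = brokers.filter(broker => broker.announce != null)

  // allSettled waits for every announcement instead of rejecting on the first failure
  const results = await Promise.allSettled(
    announcers.map(async broker => broker.announce?.(key, block))
  )
  const failures = results.filter(result => result.status === 'rejected')

  // Fail only when there were announcers and every one of them failed.
  if (announcers.length > 0 && failures.length === announcers.length) {
    throw new Error(`All ${failures.length} brokers failed to announce ${key}`)
  }

  return announcers.length - failures.length
}
```

With `Promise.all` the first rejected announcement would reject the whole call; here the caller can still learn how many announcements succeeded.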

Member

SgtPooki commented Apr 3, 2024

@achingbrain anything blocking the merging of this? were you able to test this successfully in helia-http-gateway or should I take a look at that?

@achingbrain achingbrain merged commit 5cf216b into main Apr 4, 2024
18 checks passed
@achingbrain achingbrain deleted the feat/add-sessions-to-interface branch April 4, 2024 10:27
@achingbrain achingbrain mentioned this pull request Apr 4, 2024