Commit 9e01878
fix: parallelise loading of dag-pb links in directories when exporting (#286)

This PR extends the work done in #249 to also include directories and sharded directories.
Alan Shaw authored Mar 10, 2023
1 parent 999a539 commit 9e01878
Showing 3 changed files with 41 additions and 19 deletions.
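The approach in both resolvers is the same one introduced in #249: wrap each dag-pb link in a function that resolves it, hand those functions to it-parallel so several blocks are fetched at once, and rely on { ordered: true } to keep entries in link order. Below is a minimal standalone sketch of that pipeline; the it-* packages are the ones added in this diff, while Link, loadEntry and the sample data are hypothetical stand-ins for dag-pb links and the exporter's resolve()/blockstore calls.

import parallel from 'it-parallel'
import map from 'it-map'
import filter from 'it-filter'
import { pipe } from 'it-pipe'

interface Link { name: string }

// hypothetical loader standing in for resolve()/blockstore.get()
async function loadEntry (link: Link): Promise<string | undefined> {
  return `entry for ${link.name}`
}

const links: Link[] = [{ name: 'a.txt' }, { name: 'b.txt' }, { name: 'c.txt' }]

const entries = pipe(
  links,
  // wrap each link in a () => Promise thunk so it-parallel can schedule it
  source => map(source, link => async () => loadEntry(link)),
  // load several links at once; { ordered: true } keeps the input order
  source => parallel(source, { ordered: true }),
  // drop links that did not resolve to an entry
  source => filter(source, entry => entry != null)
)

for await (const entry of entries) {
  console.log(entry)
}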
packages/ipfs-unixfs-exporter/package.json (1 addition & 0 deletions)

@@ -142,6 +142,7 @@
     "hamt-sharding": "^3.0.0",
     "interface-blockstore": "^4.0.0",
     "ipfs-unixfs": "^11.0.0",
+    "it-filter": "^2.0.0",
     "it-last": "^2.0.0",
     "it-map": "^2.0.0",
     "it-parallel": "^3.0.0",
packages/ipfs-unixfs-exporter (second changed file: the UnixFS v1 directory content resolver)

@@ -1,3 +1,7 @@
+import parallel from 'it-parallel'
+import { pipe } from 'it-pipe'
+import map from 'it-map'
+import filter from 'it-filter'
 import type { ExporterOptions, UnixfsV1DirectoryContent, UnixfsV1Resolver } from '../../../index.js'
 
 const directoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {

@@ -6,13 +10,19 @@ const directoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
     const length = options.length ?? node.Links.length
     const links = node.Links.slice(offset, length)
 
-    for (const link of links) {
-      const result = await resolve(link.Hash, link.Name ?? '', `${path}/${link.Name ?? ''}`, [], depth + 1, blockstore, options)
-
-      if (result.entry != null) {
-        yield result.entry
-      }
-    }
+    yield * pipe(
+      links,
+      source => map(source, link => {
+        return async () => {
+          const linkName = link.Name ?? ''
+          const linkPath = `${path}/${linkName}`
+          const result = await resolve(link.Hash, linkName, linkPath, [], depth + 1, blockstore, options)
+          return result.entry
+        }
+      }),
+      source => parallel(source, { ordered: true }),
+      source => filter(source, entry => entry != null)
+    )
   }
 
   return yieldDirectoryContent
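From the outside nothing changes except speed: directory listings still come back in link order. A hedged usage sketch follows, assuming the existing ipfs-unixfs-exporter entry points exporter() and entry.content(); listDir, cid and blockstore are illustrative names.

import { exporter } from 'ipfs-unixfs-exporter'
import type { CID } from 'multiformats/cid'
import type { Blockstore } from 'interface-blockstore'

async function listDir (cid: CID, blockstore: Blockstore): Promise<void> {
  const dir = await exporter(cid, blockstore)

  if (dir.type !== 'directory') {
    throw new Error('not a directory')
  }

  // entries still arrive in link order, but their blocks are now
  // fetched concurrently instead of one await at a time
  for await (const entry of dir.content()) {
    console.log(entry.name)
  }
}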
packages/ipfs-unixfs-exporter (third changed file: the HAMT-sharded directory content resolver)

@@ -1,3 +1,6 @@
+import parallel from 'it-parallel'
+import { pipe } from 'it-pipe'
+import map from 'it-map'
 import { decode, PBNode } from '@ipld/dag-pb'
 import type { Blockstore } from 'interface-blockstore'
 import type { ExporterOptions, Resolve, UnixfsV1DirectoryContent, UnixfsV1Resolver } from '../../../index.js'

@@ -22,22 +25,30 @@ const hamtShardedDirectoryContent: UnixfsV1Resolver = (cid, node, unixfs, path, resolve, depth, blockstore) => {
 async function * listDirectory (node: PBNode, path: string, resolve: Resolve, depth: number, blockstore: Blockstore, options: ExporterOptions): UnixfsV1DirectoryContent {
   const links = node.Links
 
-  for (const link of links) {
-    const name = link.Name != null ? link.Name.substring(2) : null
+  const results = pipe(
+    links,
+    source => map(source, link => {
+      return async () => {
+        const name = link.Name != null ? link.Name.substring(2) : null
 
-    if (name != null && name !== '') {
-      const result = await resolve(link.Hash, name, `${path}/${name}`, [], depth + 1, blockstore, options)
+        if (name != null && name !== '') {
+          const result = await resolve(link.Hash, name, `${path}/${name}`, [], depth + 1, blockstore, options)
 
-      yield result.entry
-    } else {
-      // descend into subshard
-      const block = await blockstore.get(link.Hash)
-      node = decode(block)
+          return { entries: result.entry == null ? [] : [result.entry] }
+        } else {
+          // descend into subshard
+          const block = await blockstore.get(link.Hash)
+          node = decode(block)
 
-      for await (const file of listDirectory(node, path, resolve, depth, blockstore, options)) {
-        yield file
-      }
-    }
-  }
+          return { entries: listDirectory(node, path, resolve, depth, blockstore, options) }
+        }
+      }
+    }),
+    source => parallel(source, { ordered: true })
+  )
+
+  for await (const { entries } of results) {
+    yield * entries
+  }
 }
 
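The sharded case has one twist: a link can point at a real entry or at a nested subshard (the link name is empty after its two-character HAMT prefix), so each parallel task returns { entries } holding either a one-element array or the lazy listing produced by a recursive call, and the outer loop flattens the results in order with yield *. A minimal sketch of that shape, with ShardNode, isSubshard, loadChildNode and resolveEntry as hypothetical stand-ins for the dag-pb node, the prefix check, blockstore.get()/decode() and resolve():

import parallel from 'it-parallel'
import map from 'it-map'
import { pipe } from 'it-pipe'

interface ShardNode { links: string[] }

// toy stand-ins for the real HAMT/subshard logic
const isSubshard = (link: string): boolean => link.startsWith('shard:')
const loadChildNode = async (_link: string): Promise<ShardNode> => ({ links: ['x.txt', 'y.txt'] })
const resolveEntry = async (link: string): Promise<string> => `entry ${link}`

async function * listShard (node: ShardNode): AsyncGenerator<string> {
  const results = pipe(
    node.links,
    source => map(source, link => async () => {
      if (isSubshard(link)) {
        // descend into the subshard and return its (lazy) recursive listing
        return { entries: listShard(await loadChildNode(link)) }
      }

      // a plain entry becomes a one-element array
      return { entries: [await resolveEntry(link)] }
    }),
    source => parallel(source, { ordered: true })
  )

  // flatten in order: yield * accepts both arrays and async generators
  for await (const { entries } of results) {
    yield * entries
  }
}

for await (const entry of listShard({ links: ['a.txt', 'shard:1', 'b.txt'] })) {
  console.log(entry)
}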
