fix: simplify shard handling (#5)
Reduces duplication between adding and removing links to sharded
directories by making them use the same code to trace a path to
the operation target without loading the entire shard.
achingbrain authored Feb 27, 2023
1 parent 3f55208 commit 52d4786
Showing 16 changed files with 545 additions and 588 deletions.
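
The gist of the change shows up in src/commands/utils/add-link.ts below: rather than rebuilding a DirSharded structure and flushing the whole shard, the code now asks recreateShardedDirectory to trace only the blocks on the path from the root shard to the target bucket. A condensed sketch of the resulting happy path (no name conflict), using only the types and helper signatures that appear in this diff (AddLinkOptions and AddLinkResult are declared in the same file); the overwrite and conflict branches are omitted:

```typescript
import { recreateShardedDirectory, updateShardedDirectory } from './hamt-utils.js'
import type { Directory } from './cid-to-directory.js'
import type { Blockstore } from 'interface-blockstore'
import type { PBLink } from '@ipld/dag-pb/interface'

async function addToShard (parent: Directory, child: Required<PBLink>, blockstore: Blockstore, options: AddLinkOptions): Promise<AddLinkResult> {
  // walk from the root shard block towards child.Name, loading only
  // the sub-shard blocks on that path instead of the entire shard
  const { path } = await recreateShardedDirectory(parent.cid, child.Name, blockstore, options)
  const finalSegment = path[path.length - 1]

  if (finalSegment == null) {
    throw new Error('Invalid HAMT, could not generate path')
  }

  // the final segment's prefix is the two-character hex bucket
  // position the new link occupies at this level of the HAMT
  const index = parseInt(finalSegment.prefix, 16)

  // links inside a shard are named <prefix><file name>
  child.Name = `${finalSegment.prefix}${child.Name}`
  finalSegment.node.Links.push(child)
  finalSegment.children.set(index, true)

  // write the updated blocks on the path back to the blockstore
  return await updateShardedDirectory(path, blockstore, options)
}
```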
package.json (4 changes: 2 additions & 2 deletions)
@@ -139,7 +139,6 @@
"release": "aegir release"
},
"dependencies": {
"@helia/interface": "next",
"@ipld/dag-pb": "^4.0.0",
"@libp2p/interfaces": "^3.3.1",
"@libp2p/logger": "^2.0.5",
@@ -152,7 +151,8 @@
"it-last": "^2.0.0",
"it-pipe": "^2.0.5",
"merge-options": "^3.0.4",
"multiformats": "^11.0.1"
"multiformats": "^11.0.1",
"sparse-array": "^1.3.2"
},
"devDependencies": {
"aegir": "^38.1.0",
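The only new runtime dependency is sparse-array, used in the rewritten add-link.ts to track which bucket positions of a shard node are occupied. The package ships without type definitions, hence the @ts-expect-error on its import in the diff. A minimal usage sketch; the diff itself only calls set(index, true), and bitField() is an assumption based on how the package is typically used to build a HAMT occupancy bitmap:

```typescript
// @ts-expect-error no types
import SparseArray from 'sparse-array'

const children = new SparseArray()

// mark two 8-bit bucket positions as occupied
children.set(parseInt('1B', 16), true)
children.set(parseInt('4F', 16), true)

console.log(children.get(0x1b)) // true
console.log(children.get(0x1c)) // undefined, position is empty

// assumed API: bitField() returns the occupancy bitmap a HAMT
// shard node persists alongside its links
console.log(children.bitField())
```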
src/commands/utils/add-link.ts (277 changes: 107 additions & 170 deletions)
@@ -2,32 +2,31 @@ import * as dagPB from '@ipld/dag-pb'
import { CID, Version } from 'multiformats/cid'
import { logger } from '@libp2p/logger'
import { UnixFS } from 'ipfs-unixfs'
import { DirSharded } from './dir-sharded.js'
import {
updateHamtDirectory,
recreateHamtLevel,
recreateInitialHamtLevel,
createShard,
recreateShardedDirectory,
toPrefix,
addLinksToHamtBucket
updateShardedDirectory
} from './hamt-utils.js'
import last from 'it-last'
import type { PBNode, PBLink } from '@ipld/dag-pb/interface'
import { sha256 } from 'multiformats/hashes/sha2'
import type { Bucket } from 'hamt-sharding'
import { AlreadyExistsError, InvalidParametersError, InvalidPBNodeError } from './errors.js'
import type { ImportResult } from 'ipfs-unixfs-importer'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Directory } from './cid-to-directory.js'
import type { Blockstore } from 'interface-blockstore'
import { isOverShardThreshold } from './is-over-shard-threshold.js'
import { hamtBucketBits, hamtHashFn } from './hamt-constants.js'
// @ts-expect-error no types
import SparseArray from 'sparse-array'
import { wrapHash } from './consumable-hash.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'

const log = logger('helia:unixfs:components:utils:add-link')

export interface AddLinkResult {
node: PBNode
cid: CID
size: number
}

export interface AddLinkOptions extends AbortOptions {
Expand Down Expand Up @@ -81,7 +80,7 @@ const convertToShardedDirectory = async (parent: Directory, blockstore: Blocksto
cidVersion: parent.cid.version
})

log(`Converted directory to sharded directory ${result.cid}`)
log(`converted directory to sharded directory ${result.cid}`)

return result
}
Expand Down Expand Up @@ -134,187 +133,125 @@ const addToDirectory = async (parent: Directory, child: PBLink, blockstore: Bloc

return {
node: parent.node,
cid,
size: buf.length
cid
}
}

const addToShardedDirectory = async (parent: Directory, child: Required<PBLink>, blockstore: Blockstore, options: AddLinkOptions): Promise<AddLinkResult> => {
const {
shard, path
} = await addFileToShardedDirectory(parent, child, blockstore, options)
const result = await last(shard.flush(blockstore))
const { path, hash } = await recreateShardedDirectory(parent.cid, child.Name, blockstore, options)
const finalSegment = path[path.length - 1]

if (result == null) {
throw new Error('No result from flushing shard')
if (finalSegment == null) {
throw new Error('Invalid HAMT, could not generate path')
}

const block = await blockstore.get(result.cid)
const node = dagPB.decode(block)
// find the next prefix
// const index = await hash.take(hamtBucketBits)
const prefix = finalSegment.prefix
const index = parseInt(prefix, 16)

// we have written out the shard, but only one sub-shard will have been written so replace it in the original shard
const parentLinks = parent.node.Links.filter((link) => {
return (link.Name ?? '').substring(0, 2) !== path[0].prefix
})

const newLink = node.Links
.find(link => (link.Name ?? '').substring(0, 2) === path[0].prefix)

if (newLink == null) {
throw new Error(`No link found with prefix ${path[0].prefix}`)
}

parentLinks.push(newLink)

return await updateHamtDirectory({
Data: parent.node.Data,
Links: parentLinks
}, blockstore, path[0].bucket, options)
}
log('next prefix for %s is %s', child.Name, prefix)

const addFileToShardedDirectory = async (parent: Directory, child: Required<PBLink>, blockstore: Blockstore, options: AddLinkOptions): Promise<{ shard: DirSharded, path: BucketPath[] }> => {
if (parent.node.Data == null) {
throw new InvalidPBNodeError('Parent node with no data passed to addFileToShardedDirectory')
}
const linkName = `${prefix}${child.Name}`
const existingLink = finalSegment.node.Links.find(l => (l.Name ?? '').startsWith(prefix))

// start at the root bucket and descend, loading nodes as we go
const rootBucket = await recreateInitialHamtLevel(parent.node.Links)
const node = UnixFS.unmarshal(parent.node.Data)

const shard = new DirSharded({
root: true,
dir: true,
parent: undefined,
parentKey: undefined,
path: '',
dirty: true,
flat: false,
mode: node.mode
}, {
...options,
cidVersion: parent.cid.version
})
shard._bucket = rootBucket

if (node.mtime != null) {
// update mtime if previously set
shard.mtime = {
secs: BigInt(Math.round(Date.now() / 1000))
}
}

// load subshards until the bucket & position no longer changes
const position = await rootBucket._findNewBucketAndPos(child.Name)
const path = toBucketPath(position)
path[0].node = parent.node
let index = 0

while (index < path.length) {
const segment = path[index]
index++
const node = segment.node

if (node == null) {
throw new Error('Segment had no node')
}

const link = node.Links
.find(link => (link.Name ?? '').substring(0, 2) === segment.prefix)

if (link == null) {
// prefix is new, file will be added to the current bucket
log(`Link ${segment.prefix}${child.Name} will be added`)
index = path.length

break
}
if (existingLink != null) {
log('link %s was present in shard', linkName)
// link is already present in shard

if (link.Name === `${segment.prefix}${child.Name}`) {
if (existingLink.Name === linkName) {
// file with same name is already present in shard
if (!options.allowOverwriting) {
throw new AlreadyExistsError()
}

// file already existed, file will be added to the current bucket
log(`Link ${segment.prefix}${child.Name} will be replaced`)
index = path.length

break
}

if ((link.Name ?? '').length > 2) {
// another file had the same prefix, will be replaced with a subshard
log(`Link ${link.Name} ${link.Hash} will be replaced with a subshard`)
index = path.length

break
}

// load sub-shard
log(`Found subshard ${segment.prefix}`)
const block = await blockstore.get(link.Hash)
const subShard = dagPB.decode(block)

// subshard hasn't been loaded, descend to the next level of the HAMT
if (path[index] == null) {
log(`Loaded new subshard ${segment.prefix}`)
await recreateHamtLevel(blockstore, subShard.Links, rootBucket, segment.bucket, parseInt(segment.prefix, 16), options)

const position = await rootBucket._findNewBucketAndPos(child.Name)

path.push({
bucket: position.bucket,
prefix: toPrefix(position.pos),
node: subShard
log('overwriting %s in subshard', child.Name)
finalSegment.node.Links = finalSegment.node.Links.filter(l => l.Name !== linkName)
finalSegment.node.Links.push({
Name: linkName,
Hash: child.Hash,
Tsize: child.Tsize
})
} else if (existingLink.Name?.length === 2) {
throw new Error('Existing link was subshard?!')
} else {
// conflict, add a new HAMT segment
log('prefix %s already exists, creating new subshard', prefix)
// find the sibling we are going to replace
const index = finalSegment.node.Links.findIndex(l => l.Name?.startsWith(prefix))
const sibling = finalSegment.node.Links.splice(index, 1)[0]

// give the sibling a new HAMT prefix
const siblingName = (sibling.Name ?? '').substring(2)
const wrapped = wrapHash(hamtHashFn)
const siblingHash = wrapped(uint8ArrayFromString(siblingName))

// discard hash bits until we reach the subshard depth
for (let i = 0; i < path.length; i++) {
await siblingHash.take(hamtBucketBits)
}

break
while (true) {
const siblingIndex = await siblingHash.take(hamtBucketBits)
const siblingPrefix = toPrefix(siblingIndex)
sibling.Name = `${siblingPrefix}${siblingName}`

// calculate the target file's HAMT prefix in the new sub-shard
const newIndex = await hash.take(hamtBucketBits)
const newPrefix = toPrefix(newIndex)

if (siblingPrefix === newPrefix) {
// the two sibling names have caused another conflict - add an intermediate node to
// the HAMT and try again

// create the child locations
const children = new SparseArray()
children.set(newIndex, true)

path.push({
prefix: newPrefix,
children,
node: {
Links: []
}
})

continue
}

// create the child locations
const children = new SparseArray()
children.set(newIndex, true)
children.set(siblingIndex, true)

// add our new segment
path.push({
prefix,
children,
node: {
Links: [
sibling, {
Name: `${newPrefix}${child.Name}`,
Hash: child.Hash,
Tsize: child.Tsize
}
]
}
})

break
}
}
} else {
log('link %s was not present in sub-shard', linkName)

const nextSegment = path[index]

// add next levels worth of links to bucket
await addLinksToHamtBucket(blockstore, subShard.Links, nextSegment.bucket, rootBucket, options)

nextSegment.node = subShard
}

// finally add the new file into the shard
await shard._bucket.put(child.Name, {
size: BigInt(child.Tsize),
cid: child.Hash
})

return {
shard, path
}
}

export interface BucketPath {
bucket: Bucket<any>
prefix: string
node?: PBNode
}

const toBucketPath = (position: { pos: number, bucket: Bucket<any> }): BucketPath[] => {
const path = [{
bucket: position.bucket,
prefix: toPrefix(position.pos)
}]

let bucket = position.bucket._parent
let positionInBucket = position.bucket._posAtParent

while (bucket != null) {
path.push({
bucket,
prefix: toPrefix(positionInBucket)
})
// add new link to shard
child.Name = linkName
finalSegment.node.Links.push(child)
finalSegment.children.set(index, true)

positionInBucket = bucket._posAtParent
bucket = bucket._parent
log('adding %s to existing sub-shard', linkName)
}

path.reverse()

return path
return await updateShardedDirectory(path, blockstore, options)
}
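
The conflict branch above turns on a small amount of HAMT arithmetic: a name's position in the tree comes from consuming its infinite hash hamtBucketBits (8) bits at a time, and each consumed slice becomes a two-character hex prefix at one level of the tree. When two names share a prefix, the code keeps taking slices for both until they diverge, inserting an intermediate shard node per level. A sketch of the prefix derivation, reusing wrapHash, hamtHashFn and hamtBucketBits exactly as the diff does; the toPrefix body is an assumption matching its conventional ipfs-unixfs implementation:

```typescript
import { wrapHash } from './consumable-hash.js'
import { hamtBucketBits, hamtHashFn } from './hamt-constants.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'

// assumed to match hamt-utils.js: bucket position -> zero-padded,
// upper-case, two-character hex prefix
const toPrefix = (position: number): string => {
  return position.toString(16).toUpperCase().padStart(2, '0').substring(0, 2)
}

// derive the prefix a name would use at a given depth in the shard
async function prefixAtDepth (name: string, depth: number): Promise<string> {
  const hash = wrapHash(hamtHashFn)(uint8ArrayFromString(name))

  // each level above this one has already consumed one slice
  for (let i = 0; i < depth; i++) {
    await hash.take(hamtBucketBits)
  }

  return toPrefix(await hash.take(hamtBucketBits))
}

// two names conflict at a level when their prefixes there match
console.log(await prefixAtDepth('file-1.txt', 0))
console.log(await prefixAtDepth('file-2.txt', 0))
```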
