Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pinning API #36

Merged
merged 5 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions benchmarks/gc/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"name": "benchmarks-gc",
"version": "1.0.0",
"main": "index.js",
"private": true,
"type": "module",
"scripts": {
"clean": "aegir clean",
"build": "aegir build --bundle false",
"lint": "aegir lint",
"dep-check": "aegir dep-check",
"start": "npm run build && node dist/src/index.js"
},
"devDependencies": {
"@chainsafe/libp2p-noise": "^11.0.0",
"@chainsafe/libp2p-yamux": "^3.0.5",
"@ipld/dag-pb": "^4.0.2",
"@libp2p/websockets": "^5.0.3",
"aegir": "^38.1.5",
"blockstore-datastore-adapter": "^5.0.0",
"datastore-core": "^8.0.4",
"datastore-fs": "^8.0.0",
"datastore-level": "^9.0.4",
"execa": "^7.0.0",
"go-ipfs": "^0.18.1",
"helia": "~0.0.0",
"ipfs-core": "^0.18.0",
"ipfsd-ctl": "^13.0.0",
"it-all": "^2.0.0",
"it-drain": "^2.0.0",
"kubo-rpc-client": "^3.0.1",
"libp2p": "^0.42.2",
"multiformats": "^11.0.1",
"tinybench": "^2.3.1"
}
}
47 changes: 47 additions & 0 deletions benchmarks/gc/src/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# GC Benchmark

Benchmarks Helia GC performance against js-ipfs and Kubo

- Removes any existing pins
- Creates 10000 DAGs with two nodes linked to by a root node that is pinned
- Creates 10000 unpinned blocks
- Runs GC to delete the unpinned blocks leaving the others intact

All three implementations use on-disk block/datastores to ensure a reasonable basis for comparison.

Warning! It can take a long time with realistic pinset sizes - on the order of a whole day.

To run:

1. Add `benchmarks/*` to the `workspaces` entry in the root `package.json` of this repo
3. Run
```console
$ npm run reset
$ npm i
$ npm run build
$ cd benchmarks/gc
$ npm start

> benchmarks-gc@1.0.0 start
> npm run build && node dist/src/index.js


> benchmarks-gc@1.0.0 build
> aegir build --bundle false

[14:51:28] tsc [started]
[14:51:33] tsc [completed]
generating Ed25519 keypair...
┌─────────┬────────────────┬─────────┬───────────┬──────┐
│ (index) │ Implementation │ ops/s │ ms/op │ runs │
├─────────┼────────────────┼─────────┼───────────┼──────┤
//... results here
```

## Graph

To output stats for a graph run:

```console
$ npm run build && node dist/src/graph.js
```
18 changes: 18 additions & 0 deletions benchmarks/gc/src/graph.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { execa } from 'execa'

const ITERATIONS = 2
const INCREMENT = 1000
const MAX = 10000

for (let i = 1; i <= MAX / INCREMENT; i ++) {
await execa('node', ['dist/src/index.js'], {
env: {
...process.env,
INCREMENT: (i * INCREMENT).toString(),
ITERATIONS: ITERATIONS.toString(),
ITERATION: i.toString()
},
stdout: 'inherit',
stderr: 'inherit'
})
}
86 changes: 86 additions & 0 deletions benchmarks/gc/src/helia.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { createHelia, DAGWalker } from 'helia'
import { createLibp2p } from 'libp2p'
import { webSockets } from '@libp2p/websockets'
import { noise } from '@chainsafe/libp2p-noise'
import { yamux } from '@chainsafe/libp2p-yamux'
import type { GcBenchmark } from './index.js'
import * as dagPb from '@ipld/dag-pb'
import all from 'it-all'
import os from 'node:os'
import path from 'node:path'
import { LevelDatastore } from 'datastore-level'
import { BlockstoreDatastoreAdapter } from 'blockstore-datastore-adapter'
import { ShardingDatastore } from 'datastore-core/sharding'
import { NextToLast } from 'datastore-core/shard'
import { FsDatastore } from 'datastore-fs'
import drain from 'it-drain'

const dagPbWalker: DAGWalker = {
codec: dagPb.code,
async * walk (block) {
const node = dagPb.decode(block)

yield * node.Links.map(l => l.Hash)
}
}

export async function createHeliaBenchmark (): Promise<GcBenchmark> {
const repoPath = path.join(os.tmpdir(), `helia-${Math.random()}`)

const helia = await createHelia({
blockstore: new BlockstoreDatastoreAdapter(
new ShardingDatastore(
new FsDatastore(`${repoPath}/blocks`, {
extension: '.data'
}),
new NextToLast(2)
)
),
datastore: new LevelDatastore(`${repoPath}/data`),
libp2p: await createLibp2p({
transports: [
webSockets()
],
connectionEncryption: [
noise()
],
streamMuxers: [
yamux()
]
}),
dagWalkers: [
dagPbWalker
],
start: false
})

return {
async gc () {
await helia.gc()
},
async putBlocks (blocks) {
await drain(helia.blockstore.putMany(blocks))
},
async pin (cid) {
await helia.pins.add(cid)
},
async teardown () {
await helia.stop()
},
async clearPins () {
const pins = await all(helia.pins.ls())

for (const pin of pins) {
await helia.pins.rm(pin.cid)
}

return pins.length
},
isPinned: (cid) => {
return helia.pins.isPinned(cid)
},
hasBlock: (cid) => {
return helia.blockstore.has(cid)
}
}
}
193 changes: 193 additions & 0 deletions benchmarks/gc/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import { Bench } from 'tinybench'
import { CID } from 'multiformats/cid'
import { createHeliaBenchmark } from './helia.js'
import { createIpfsBenchmark } from './ipfs.js'
import { createKuboBenchmark } from './kubo.js'
import * as dagPb from '@ipld/dag-pb'
import crypto from 'node:crypto'
import { sha256 } from 'multiformats/hashes/sha2'

const PINNED_DAG_COUNT = parseInt(process.env.INCREMENT ?? '10000')
const GARBAGE_BLOCK_COUNT = parseInt(process.env.INCREMENT ?? '10000')
const ITERATIONS = parseInt(process.env.ITERATIONS ?? '5')
const RESULT_PRECISION = 2

export interface GcBenchmark {
gc: () => Promise<void>
teardown: () => Promise<void>
pin: (cid: CID ) => Promise<void>
putBlocks: (blocks: Array<{ key: CID, value: Uint8Array }>) => Promise<void>
clearPins: () => Promise<number>
isPinned: (cid: CID) => Promise<boolean>
hasBlock: (cid: CID) => Promise<boolean>
}

const blocks: Array<{ key: CID, value: Uint8Array }> = []
const garbageBlocks: Array<{ key: CID, value: Uint8Array }> = []
const pins: CID[] = []

/**
* Create blocks that will be pinned and/or deleted
*/
async function generateBlocks (): Promise<void> {
// generate DAGs of two blocks linked by a root that will be pinned
for (let i = 0; i < PINNED_DAG_COUNT; i++) {
const block1 = dagPb.encode({
Data: crypto.randomBytes(5),
Links: []
})
const mh1 = await sha256.digest(block1)
const cid1 = CID.createV1(dagPb.code, mh1)

const block2 = dagPb.encode({
Data: crypto.randomBytes(5),
Links: []
})
const mh2 = await sha256.digest(block2)
const cid2 = CID.createV1(dagPb.code, mh2)

const block3 = dagPb.encode({
Links: [{
Hash: cid1,
Tsize: block1.length
}, {
Hash: cid2,
Tsize: block2.length
}]
})
const mh3 = await sha256.digest(block3)
const cid3 = CID.createV1(dagPb.code, mh3)

blocks.push({ key: cid1, value: block1 })
blocks.push({ key: cid2, value: block2 })
blocks.push({ key: cid3, value: block3 })
pins.push(cid3)
}

// generate garbage blocks that will be deleted
for (let i = 0; i < GARBAGE_BLOCK_COUNT; i++) {
const block = dagPb.encode({
Data: crypto.randomBytes(5),
Links: []
})
const mh = await sha256.digest(block)
const cid = CID.createV1(dagPb.code, mh)

garbageBlocks.push({ key: cid, value: block })
}
}

async function addBlocks (benchmark: GcBenchmark): Promise<void> {
// add all the blocks
await benchmark.putBlocks(blocks)
await benchmark.putBlocks(garbageBlocks)
}

async function pinBlocks (benchmark: GcBenchmark): Promise<void> {
// add all of the pins
for (const pin of pins) {
await benchmark.pin(pin)
}
}

const impls: Array<{ name: string, create: () => Promise<GcBenchmark>, results: { gc: number[], clearedPins: number[], addedBlocks: number[], pinnedBlocks: number[] }}> = [{
name: 'helia',
create: () => createHeliaBenchmark(),
results: {
gc: [],
clearedPins: [],
addedBlocks: [],
pinnedBlocks: []
}
}, {
name: 'ipfs',
create: () => createIpfsBenchmark(),
results: {
gc: [],
clearedPins: [],
addedBlocks: [],
pinnedBlocks: []
}
}, {
name: 'kubo',
create: () => createKuboBenchmark(),
results: {
gc: [],
clearedPins: [],
addedBlocks: [],
pinnedBlocks: []
}
}]

async function main (): Promise<void> {
let subject: GcBenchmark

await generateBlocks()

const suite = new Bench({
iterations: ITERATIONS,
time: 1
})

for (const impl of impls) {
suite.add(impl.name, async () => {
const start = Date.now()
await subject.gc()
impl.results.gc.push(Date.now() - start)
}, {
beforeAll: async () => {
subject = await impl.create()
},
beforeEach: async () => {
let start = Date.now()
const pinCount = await subject.clearPins()

if (pinCount > 0) {
impl.results.clearedPins.push(Date.now() - start)
}

start = Date.now()
await addBlocks(subject)
impl.results.addedBlocks.push(Date.now() - start)

start = Date.now()
await pinBlocks(subject)
impl.results.pinnedBlocks.push(Date.now() - start)
},
afterAll: async () => {
await subject.teardown()
}
})
}

await suite.run()

if (process.env.INCREMENT != null) {
if (process.env.ITERATION === '1') {
console.info('implementation, count, clear pins (ms), add blocks (ms), add pins (ms), gc (ms)')
}

for (const impl of impls) {
console.info(
`${impl.name},`,
`${process.env.INCREMENT},`,
`${(impl.results.clearedPins.reduce((acc, curr) => acc + curr, 0) / impl.results.clearedPins.length).toFixed(RESULT_PRECISION)},`,
`${(impl.results.addedBlocks.reduce((acc, curr) => acc + curr, 0) / impl.results.addedBlocks.length).toFixed(RESULT_PRECISION)},`,
`${(impl.results.pinnedBlocks.reduce((acc, curr) => acc + curr, 0) / impl.results.pinnedBlocks.length).toFixed(RESULT_PRECISION)},`,
`${(impl.results.gc.reduce((acc, curr) => acc + curr, 0) / impl.results.gc.length).toFixed(RESULT_PRECISION)}`,
)
}
} else {
console.table(suite.tasks.map(({ name, result }) => ({
'Implementation': name,
'ops/s': result?.hz.toFixed(RESULT_PRECISION),
'ms/op': result?.period.toFixed(RESULT_PRECISION),
'runs': result?.samples.length
})))
}
}

main().catch(err => {
console.error(err) // eslint-disable-line no-console
process.exit(1)
})
Loading