Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

refactor: convert repo API to async/await #2674

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/core/components/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
exports.add = require('./add')
exports.config = require('./config')
exports.init = require('./init')
exports.repo = {
gc: require('./repo/gc'),
stat: require('./repo/stat'),
version: require('./repo/version')
}
exports.start = require('./start')
exports.stop = require('./stop')

Expand Down
10 changes: 10 additions & 0 deletions src/core/components/init.js
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,16 @@ function createApi ({
add,
config: Commands.config({ repo }),
init: () => { throw new AlreadyInitializedError() },
repo: {
// TODO: gc should be available after init
// `resolve` (passed to `refs` API) which is a dependency for `gc` API
// needs to be altered to allow `name` API dependency to be optional, so
// that `resolve` can also be available when not started, and so `gc` can
// be run when not started.
// gc: Commands.repo.gc({ gcLock, pin, pinManager, refs, repo }),
stat: Commands.repo.stat({ repo }),
version: Commands.repo.version({ repo })
},
start
}

Expand Down
153 changes: 0 additions & 153 deletions src/core/components/pin/gc.js

This file was deleted.

114 changes: 114 additions & 0 deletions src/core/components/repo/gc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
'use strict'

const CID = require('cids')
const { cidToString } = require('../../../utils/cid')
const log = require('debug')('ipfs:repo:gc')
const { MFS_ROOT_KEY } = require('ipfs-mfs')
const Repo = require('ipfs-repo')
const { Errors } = require('interface-datastore')
const ERR_NOT_FOUND = Errors.notFoundError().code
const { parallelMerge, transform } = require('streaming-iterables')

// Limit on the number of parallel block remove operations
const BLOCK_RM_CONCURRENCY = 256

// Perform mark and sweep garbage collection
module.exports = ({ gcLock, pin, pinManager, refs, repo }) => {
return async function * gc () {
const start = Date.now()
log('Creating set of marked blocks')

const release = await gcLock.writeLock()

try {
// Mark all blocks that are being used
const markedSet = await createMarkedSet({ pin, pinManager, repo })
// Get all blocks keys from the blockstore
const blockKeys = repo.blocks.query({ keysOnly: true })

// Delete blocks that are not being used
yield * deleteUnmarkedBlocks({ repo, refs }, markedSet, blockKeys)

log(`Complete (${Date.now() - start}ms)`)
} finally {
release()
}
}
}

// Get Set of CIDs of blocks to keep
async function createMarkedSet ({ pin, pinManager, refs, repo }) {
const pinsSource = async function * () {
for await (const { hash } of pin.ls()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use streaming-iterables map here?

yield hash
}
}

const pinInternalsSource = async function * () {
const cids = await pinManager.getInternalBlocks()
yield * cids
}

const mfsSource = async function * () {
const mh = await repo.root.get(MFS_ROOT_KEY)
const rootCid = new CID(mh)
yield rootCid
try {
for await (const { ref } of refs(rootCid, { recursive: true })) {
yield new CID(ref)
}
} catch (err) {
if (err.code === ERR_NOT_FOUND) {
log('No blocks in MFS')
return
}
throw err
}
}

const output = new Set()
for await (const cid of parallelMerge(pinsSource, pinInternalsSource, mfsSource)) {
output.add(cidToString(cid, { base: 'base32' }))
}
return output
}

// Delete all blocks that are not marked as in use
async function * deleteUnmarkedBlocks ({ repo, refs }, markedSet, blockKeys) {
// Iterate through all blocks and find those that are not in the marked set
// blockKeys yields { key: Key() }
let blocksCount = 0
let removedBlocksCount = 0

const removeBlock = async ({ key: k }) => {
blocksCount++

try {
const cid = Repo.utils.blockstore.keyToCid(k)
const b32 = cid.toV1().toString('base32')
if (markedSet.has(b32)) return null
const res = { cid }

try {
await repo.blocks.delete(cid)
removedBlocksCount++
} catch (err) {
res.err = new Error(`Could not delete block with CID ${cid}: ${err.message}`)
}

return res
} catch (err) {
const msg = `Could not convert block with key '${k}' to CID`
log(msg, err)
return { err: new Error(msg + `: ${err.message}`) }
}
}

for await (const res of transform(BLOCK_RM_CONCURRENCY, removeBlock, blockKeys)) {
// filter nulls (blocks that were retained)
if (res) yield res
}

log(`Marked set has ${markedSet.size} unique blocks. Blockstore has ${blocksCount} blocks. ` +
`Deleted ${removedBlocksCount} blocks.`)
}
15 changes: 15 additions & 0 deletions src/core/components/repo/stat.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict'

module.exports = ({ repo }) => {
return async function stat () {
const stats = await repo.stat()

return {
numObjects: stats.numObjects,
repoSize: stats.repoSize,
repoPath: stats.repoPath,
version: stats.version.toString(),
storageMax: stats.storageMax
}
}
}
33 changes: 33 additions & 0 deletions src/core/components/repo/version.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict'

const { repoVersion } = require('ipfs-repo')

module.exports = ({ repo }) => {
/**
* If the repo has been initialized, report the current version.
* Otherwise report the version that would be initialized.
*
* @returns {number}
*/
return async function version () {
try {
await repo._checkInitialized()
} catch (err) {
// TODO: (dryajov) This is really hacky, there must be a better way
const match = [
/Key not found in database \[\/version\]/,
/ENOENT/,
/repo is not initialized yet/
].some((m) => {
return m.test(err.message)
})
if (match) {
// this repo has not been initialized
return repoVersion
}
throw err
}

return repo.version.get()
}
}
8 changes: 8 additions & 0 deletions src/core/components/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ function createApi ({
add,
config: Commands.config({ repo }),
init: () => { throw new AlreadyInitializedError() },
repo: {
// TODO: this PR depends on `refs` refactor and the `pins` refactor
// https://github.com/ipfs/js-ipfs/pull/2658
// https://github.com/ipfs/js-ipfs/pull/2660
// gc: Commands.repo.gc({ gcLock, pin, pinManager, refs, repo }),
stat: Commands.repo.stat({ repo }),
version: Commands.repo.version({ repo })
},
start: () => apiManager.api,
stop
}
Expand Down
10 changes: 10 additions & 0 deletions src/core/components/stop.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ function createApi ({
add,
config: Commands.config({ repo }),
init: () => { throw new AlreadyInitializedError() },
repo: {
// TODO: gc should be available when stopped
// `resolve` (passed to `refs` API) which is a dependency for `gc` API
// needs to be altered to allow `name` API dependency to be optional, so
// that `resolve` can also be available when not started, and so `gc` can
// be run when not started.
// gc: Commands.repo.gc({ gcLock, pin, pinManager, refs, repo }),
stat: Commands.repo.stat({ repo }),
version: Commands.repo.version({ repo })
},
start,
stop: () => apiManager.api
}
Expand Down