Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
feat: add locking for concurrent pin operations
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jun 11, 2019
1 parent 280f987 commit 5348a50
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 136 deletions.
41 changes: 18 additions & 23 deletions src/core/components/pin.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ function toB58String (hash) {

module.exports = (self) => {
const dag = self.dag
const pinManager = new PinManager(self._repo, dag, self.log)
const pinManager = new PinManager(self._repo, dag, self._options.repoOwner, self.log)

const pin = {
add: promisify((paths, options, callback) => {
Expand All @@ -43,7 +43,7 @@ module.exports = (self) => {
if (recursive) {
if (pinManager.recursivePins.has(key)) {
// it's already pinned recursively
return cb(null, key)
return cb(null, null)
}

// entire graph of nested links should be pinned,
Expand All @@ -60,7 +60,7 @@ module.exports = (self) => {
}
if (pinManager.directPins.has(key)) {
// already directly pinned
return cb(null, key)
return cb(null, null)
}

// make sure we have the object
Expand All @@ -73,15 +73,20 @@ module.exports = (self) => {
}, (err, results) => {
if (err) { return pinComplete(err) }

// update the pin sets in memory
const pinset = recursive ? pinManager.recursivePins : pinManager.directPins
results.forEach(key => pinset.add(key))

// persist updated pin sets to datastore
pinManager.flushPins((err, root) => {
const flushComplete = (err) => {
if (err) { return pinComplete(err) }
pinComplete(null, results.map(hash => ({ hash })))
})
pinComplete(null, mhs.map(mh => ({ hash: toB58String(mh) })))
}

// each result is either a key or null if there is already a pin
results = results.filter(Boolean)
if (!results.length) { return flushComplete() }

if (recursive) {
pinManager.addRecursivePins(results, flushComplete)
} else {
pinManager.addDirectPins(results, flushComplete)
}
})
}

Expand Down Expand Up @@ -143,20 +148,10 @@ module.exports = (self) => {
}, (err, results) => {
if (err) { return lockCb(err) }

// update the pin sets in memory
results.forEach(key => {
if (recursive && pinManager.recursivePins.has(key)) {
pinManager.recursivePins.delete(key)
} else {
pinManager.directPins.delete(key)
}
})

// persist updated pin sets to datastore
pinManager.flushPins((err, root) => {
pinManager.rmPins(results, recursive, (err) => {
if (err) { return lockCb(err) }
self.log(`Removed pins: ${results}`)
lockCb(null, results.map(hash => ({ hash })))
lockCb(null, mhs.map(mh => ({ hash: toB58String(mh) })))
})
})
}, callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,20 @@
const mortice = require('mortice')
const pull = require('pull-stream')
const EventEmitter = require('events')
const log = require('debug')('ipfs:gc:lock')
const debug = require('debug')

class GCLock extends EventEmitter {
constructor (repoOwner) {
class Lock extends EventEmitter {
constructor (repoOwner, debugName) {
super()

// Ensure that we get a different mutex for each instance of GCLock
// (There should only be one GCLock instance per IPFS instance, but
// there may be multiple IPFS instances, eg in unit tests)
// Ensure that we get a different mutex for each instance of Lock
const randId = (~~(Math.random() * 1e9)).toString(36) + Date.now()
this.mutex = mortice(randId, {
singleProcess: repoOwner
})

this.lockId = 0
this.log = debug(debugName || 'lock')
}

readLock (lockedFn, cb) {
Expand All @@ -37,14 +36,14 @@ class GCLock extends EventEmitter {
}

const lockId = this.lockId++
log(`[${lockId}] ${type} requested`)
this.log(`[${lockId}] ${type} requested`)
this.emit(`${type} request`, lockId)
const locked = () => new Promise((resolve, reject) => {
this.emit(`${type} start`, lockId)
log(`[${lockId}] ${type} started`)
this.log(`[${lockId}] ${type} started`)
lockedFn((err, res) => {
this.emit(`${type} release`, lockId)
log(`[${lockId}] ${type} released`)
this.log(`[${lockId}] ${type} released`)
err ? reject(err) : resolve(res)
})
})
Expand All @@ -62,7 +61,7 @@ class GCLock extends EventEmitter {
}

pullLock (type, lockedPullFn) {
const pullLocker = new PullLocker(this, this.mutex, type, this.lockId++)
const pullLocker = new PullLocker(this, this.mutex, type, this.lockId++, this.log)

return pull(
pullLocker.take(),
Expand All @@ -73,11 +72,12 @@ class GCLock extends EventEmitter {
}

class PullLocker {
constructor (emitter, mutex, type, lockId) {
constructor (emitter, mutex, type, lockId, log) {
this.emitter = emitter
this.mutex = mutex
this.type = type
this.lockId = lockId
this.log = log

// This Promise resolves when the mutex gives us permission to start
// running the locked piece of code
Expand All @@ -91,7 +91,7 @@ class PullLocker {
return new Promise((resolve, reject) => {
this.releaseLock = (err) => err ? reject(err) : resolve()

log(`[${this.lockId}] ${this.type} (pull) started`)
this.log(`[${this.lockId}] ${this.type} (pull) started`)
this.emitter.emit(`${this.type} start`, this.lockId)

// The locked piece of code is ready to start, so resolve the
Expand All @@ -106,7 +106,7 @@ class PullLocker {
return pull(
pull.asyncMap((i, cb) => {
if (!this.lock) {
log(`[${this.lockId}] ${this.type} (pull) requested`)
this.log(`[${this.lockId}] ${this.type} (pull) requested`)
this.emitter.emit(`${this.type} request`, this.lockId)
// Request the lock
this.lock = this.mutex[this.type](() => this.locked())
Expand All @@ -125,11 +125,11 @@ class PullLocker {
// Releases the lock
release () {
return pull.through(null, (err) => {
log(`[${this.lockId}] ${this.type} (pull) released`)
this.log(`[${this.lockId}] ${this.type} (pull) released`)
this.emitter.emit(`${this.type} release`, this.lockId)
this.releaseLock(err)
})
}
}

module.exports = GCLock
module.exports = Lock
Loading

0 comments on commit 5348a50

Please sign in to comment.