Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

WIP: feat/pubsub #471

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
},
"dependencies": {
"async": "^2.1.4",
"bl": "^1.1.2",
"bs58": "^3.1.0",
"concat-stream": "^1.6.0",
"detect-node": "^2.0.3",
Expand All @@ -43,6 +42,7 @@
"peer-id": "^0.8.1",
"peer-info": "^0.8.1",
"promisify-es6": "^1.0.2",
"pump": "^1.0.1",
"qs": "^6.3.0",
"readable-stream": "1.1.14",
"stream-http": "^2.5.0",
Expand All @@ -64,7 +64,7 @@
"gulp": "^3.9.1",
"hapi": "^16.0.1",
"interface-ipfs-core": "^0.22.1",
"ipfsd-ctl": "^0.17.0",
"@haad/ipfsd-ctl": "^0.18.0-beta.5",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to be updated

"pre-commit": "^1.2.0",
"socket.io": "^1.7.1",
"socket.io-client": "^1.7.1",
Expand Down Expand Up @@ -117,4 +117,4 @@
"url": "https://github.com/ipfs/js-ipfs-api/issues"
},
"homepage": "https://github.com/ipfs/js-ipfs-api"
}
}
18 changes: 10 additions & 8 deletions src/api/add.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
'use strict'

const isStream = require('isstream')
const addToDagNodesTransform = require('../add-to-dagnode-transform')
const promisify = require('promisify-es6')
const DAGNodeStream = require('../dagnode-stream')

module.exports = (send) => {
return promisify((files, callback) => {
const good = Buffer.isBuffer(files) ||
const ok = Buffer.isBuffer(files) ||
isStream.isReadable(files) ||
Array.isArray(files)

if (!good) {
callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
if (!ok) {
return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
}

const sendWithTransform = send.withTransform(addToDagNodesTransform)

return sendWithTransform({
const request = {
path: 'add',
files: files
}, callback)
}

// Transform the response stream to DAGNode values
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
send.andTransform(request, transform, callback)
})
}
55 changes: 30 additions & 25 deletions src/api/block.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
'use strict'

const promisify = require('promisify-es6')
const bl = require('bl')
const Block = require('ipfs-block')
const multihash = require('multihashes')
const CID = require('cids')
const streamToValue = require('../stream-to-value')

module.exports = (send) => {
return {
Expand All @@ -21,25 +21,27 @@ module.exports = (send) => {
opts = {}
}

return send({
path: 'block/get',
args: args,
qs: opts
}, (err, res) => {
if (err) {
return callback(err)
}
// Transform the response from Buffer or a Stream to a Block
const transform = (res, callback) => {
if (Buffer.isBuffer(res)) {
callback(null, new Block(res))
} else {
res.pipe(bl((err, data) => {
streamToValue(res, (err, data) => {
if (err) {
return callback(err)
}
callback(null, new Block(data))
}))
})
}
})
}

const request = {
path: 'block/get',
args: args,
qs: opts
}

send.andTransform(request, transform, callback)
}),
stat: promisify((args, opts, callback) => {
// TODO this needs to be adjusted with the new go-ipfs http-api
Expand All @@ -51,19 +53,22 @@ module.exports = (send) => {
callback = opts
opts = {}
}
return send({

const request = {
path: 'block/stat',
args: args,
qs: opts
}, (err, stats) => {
if (err) {
return callback(err)
}
}

// Transform the response from { Key, Size } objects to { key, size } objects
const transform = (stats, callback) => {
callback(null, {
key: stats.Key,
size: stats.Size
})
})
}

send.andTransform(request, transform, callback)
}),
put: promisify((block, cid, callback) => {
// TODO this needs to be adjusted with the new go-ipfs http-api
Expand All @@ -81,15 +86,15 @@ module.exports = (send) => {
block = block.data
}

return send({
const request = {
path: 'block/put',
files: block
}, (err, blockInfo) => {
if (err) {
return callback(err)
}
callback(null, new Block(block))
})
}

// Transform the response to a Block
const transform = (blockInfo, callback) => callback(null, new Block(block))

send.andTransform(request, transform, callback)
})
}
}
7 changes: 5 additions & 2 deletions src/api/dht.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const promisify = require('promisify-es6')
const streamToValue = require('../stream-to-value')

module.exports = (send) => {
return {
Expand All @@ -19,11 +20,13 @@ module.exports = (send) => {
opts = {}
}

send({
const request = {
path: 'dht/findprovs',
args: args,
qs: opts
}, callback)
}

send.andTransform(request, streamToValue, callback)
}),
get: promisify((key, opts, callback) => {
if (typeof opts === 'function' &&
Expand Down
15 changes: 8 additions & 7 deletions src/api/get.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
'use strict'

const tarStreamToObjects = require('../tar-stream-to-objects')
const cleanMultihash = require('../clean-multihash')
const promisify = require('promisify-es6')
const cleanMultihash = require('../clean-multihash')
const TarStreamToObjects = require('../tar-stream-to-objects')

module.exports = (send) => {
return promisify(function get (path, opts, callback) {
return promisify((path, opts, callback) => {
if (typeof opts === 'function' &&
!callback) {
callback = opts
Expand All @@ -26,12 +26,13 @@ module.exports = (send) => {
return callback(err)
}

var sendWithTransform = send.withTransform(tarStreamToObjects)

sendWithTransform({
const request = {
path: 'get',
args: path,
qs: opts
}, callback)
}

// Convert the response stream to TarStream objects
send.andTransform(request, TarStreamToObjects, callback)
})
}
4 changes: 3 additions & 1 deletion src/api/log.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'

const pump = require('pump')
const ndjson = require('ndjson')
const promisify = require('promisify-es6')

Expand All @@ -12,7 +13,8 @@ module.exports = (send) => {
if (err) {
return callback(err)
}
callback(null, response.pipe(ndjson.parse()))
const outputStream = pump(response, ndjson.parse())
callback(null, outputStream)
})
})
}
Expand Down
4 changes: 2 additions & 2 deletions src/api/object.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const DAGNode = dagPB.DAGNode
const DAGLink = dagPB.DAGLink
const promisify = require('promisify-es6')
const bs58 = require('bs58')
const bl = require('bl')
const streamToValue = require('../stream-to-value')
const cleanMultihash = require('../clean-multihash')
const LRU = require('lru-cache')
const lruOptions = {
Expand Down Expand Up @@ -188,7 +188,7 @@ module.exports = (send) => {
}

if (typeof result.pipe === 'function') {
result.pipe(bl(callback))
streamToValue(result, callback)
} else {
callback(null, result)
}
Expand Down
32 changes: 25 additions & 7 deletions src/api/ping.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
'use strict'

const promisify = require('promisify-es6')
const streamToValue = require('../stream-to-value')

module.exports = (send) => {
return promisify((id, callback) => {
send({
const request = {
path: 'ping',
args: id,
qs: { n: 1 }
}, function (err, res) {
if (err) {
return callback(err, null)
}
callback(null, res[1])
})
}

// Transform the response stream to a value:
// { Success: <boolean>, Time: <number>, Text: <string> }
const transform = (res, callback) => {
streamToValue(res, (err, res) => {
if (err) {
return callback(err)
}

// go-ipfs http api currently returns 3 lines for a ping.
// they're a little messed, so take the correct values from each lines.
const pingResult = {
Success: res[1].Success,
Time: res[1].Time,
Text: res[2].Text
}

callback(null, pingResult)
})
}

send.andTransform(request, transform, callback)
})
}
Loading