Skip to content

Commit

Permalink
feat: factor notifications out
Browse files Browse the repository at this point in the history
fix: getMany call
feat: add id to loggers
feat(engine): improve message sending
test: skip webworker tests
  • Loading branch information
dignifiedquire authored and daviddias committed Aug 25, 2017
1 parent 84add4d commit f17ecfa
Show file tree
Hide file tree
Showing 25 changed files with 529 additions and 318 deletions.
31 changes: 31 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
version: 2
jobs:
build:
working_directory: ~/js-ipfs-bitswap
docker:
- image: circleci/node:6-browsers
environment:
CHROME_BIN: "/usr/bin/google-chrome"
steps:
- checkout
- restore_cache:
key: dependency-cache-{{ checksum "package.json" }}
- run:
name: install-deps
command: npm install
- save_cache:
key: dependency-cache-{{ checksum "package.json" }}
paths:
- ./node_modules
- run:
name: lint
command: npm run lint
- run:
name: test:node
command: npm run test:node
- run:
name: test:browser
command: npm run test:browser
- run:
name: coverage
command: npm run coverage
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# While testing new npm
package-lock.json
yarn.lock

# Logs
logs
Expand Down
33 changes: 1 addition & 32 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,32 +1 @@
sudo: false
language: node_js

matrix:
include:
- node_js: 6
env: CXX=g++-4.8
- node_js: 8
env: CXX=g++-4.8
# - node_js: stable
# env: CXX=g++-4.8

script:
- npm run lint
- npm run test
- npm run coverage
- make test

before_script:
- export DISPLAY=:99.0
- sh -e /etc/init.d/xvfb start

after_success:
- npm run coverage-publish

addons:
firefox: 'latest'
apt:
sources:
- ubuntu-toolchain-r-test
packages:
- g++-4.8
NO, JUST NO
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ src
│   └── ledger.js
├── index.js
├── network.js # Handles peerSet and open new conns
├── notifications.js # Handles tracking of incomning blocks and wants/unwants.
├─── want-manager # Keeps track of all blocks the peer (self) wants
│   ├── index.js
│   └── msg-queue.js # Messages to send queue, one per peer
Expand Down
14 changes: 0 additions & 14 deletions circle.yml

This file was deleted.

23 changes: 11 additions & 12 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@
"./test/utils/create-temp-repo-nodejs.js": "./test/utils/create-temp-repo-browser.js"
},
"scripts": {
"test": "aegir-test",
"test:browser": "aegir-test browser",
"test:node": "aegir-test node",
"lint": "aegir-lint",
"release": "aegir-release --docs",
"release-minor": "aegir-release --type minor --docs",
"release-major": "aegir-release --type major --docs",
"test": "aegir test --target node --target browser",
"test:browser": "aegir test --target browser",
"test:node": "aegir test --target node",
"lint": "aegir lint",
"release": "aegir release",
"release-minor": "aegir release --type minor",
"release-major": "aegir release --type major",
"bench": "node benchmarks/index",
"build": "aegir-build",
"coverage": "aegir-coverage",
"coverage-publish": "aegir-coverage publish",
"docs": "aegir-docs"
"build": "aegir build",
"coverage": "aegir coverage -u",
"docs": "aegir docs"
},
"repository": {
"type": "git",
Expand All @@ -38,7 +37,7 @@
},
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
"aegir": "^11.0.2",
"aegir": "ipfs/aegir",
"benchmark": "^2.1.4",
"chai": "^4.1.1",
"dirty-chai": "^2.0.1",
Expand Down
62 changes: 30 additions & 32 deletions src/decision-engine/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict'

const debug = require('debug')
const each = require('async/each')
const eachSeries = require('async/eachSeries')
const waterfall = require('async/waterfall')
Expand All @@ -14,17 +13,16 @@ const values = require('lodash.values')
const groupBy = require('lodash.groupby')
const pullAllWith = require('lodash.pullallwith')

const log = debug('bitswap:engine')
log.error = debug('bitswap:engine:error')

const Message = require('../types/message')
const Wantlist = require('../types/wantlist')
const Ledger = require('./ledger')
const logger = require('../utils').logger

const MAX_MESSAGE_SIZE = 512 * 1024

class DecisionEngine {
constructor (blockstore, network) {
constructor (peerId, blockstore, network) {
this._log = logger(peerId, 'engine')
this.blockstore = blockstore
this.network = network

Expand All @@ -38,28 +36,38 @@ class DecisionEngine {
this._outbox = debounce(this._processTasks.bind(this), 100)
}

_sendBlocks (env, cb) {
_sendBlocks (peer, blocks, cb) {
// split into messges of max 512 * 1024 bytes
const blocks = env.blocks
const total = blocks.reduce((acc, b) => {
return acc + b.data.byteLength
}, 0)

if (total < MAX_MESSAGE_SIZE) {
return this._sendSafeBlocks(env.peer, blocks, cb)
return this._sendSafeBlocks(peer, blocks, cb)
}

let size = 0
let batch = []
let outstanding = blocks.length

eachSeries(blocks, (b, cb) => {
outstanding--
batch.push(b)
size += b.data.byteLength

if (size >= MAX_MESSAGE_SIZE) {
if (size >= MAX_MESSAGE_SIZE ||
// need to ensure the last remaining items get sent
outstanding === 0) {
const nextBatch = batch.slice()
batch = []
this._sendSafeBlocks(env.peer, nextBatch, cb)
this._sendSafeBlocks(peer, nextBatch, (err) => {
if (err) {
this._log('sendblock error: %s', err.message)
}
// not returning the error, so we send as much as we can
// as otherwise `eachSeries` would cancel
cb()
})
} else {
cb()
}
Expand All @@ -68,18 +76,9 @@ class DecisionEngine {

_sendSafeBlocks (peer, blocks, cb) {
const msg = new Message(false)
blocks.forEach((b) => msg.addBlock(b))

blocks.forEach((b) => {
msg.addBlock(b)
})

// console.log('sending %s blocks', msg.blocks.size)
this.network.sendMessage(peer, msg, (err) => {
if (err) {
log('sendblock error: %s', err.message)
}
cb()
})
this.network.sendMessage(peer, msg, cb)
}

_processTasks () {
Expand All @@ -105,23 +104,22 @@ class DecisionEngine {
return find(blocks, (b) => b.cid.equals(cid))
})

this._sendBlocks({
peer: peer,
blocks: blockList
}, (err) => {
this._sendBlocks(peer, blockList, (err) => {
if (err) {
log.error('failed to send', err)
// `_sendBlocks` actually doesn't return any errors
this._log.error('should never happen: ', err)
} else {
blockList.forEach((block) => this.messageSent(peer, block))
}
blockList.forEach((block) => {
this.messageSent(peer, block)
})

cb()
})
})
], (err) => {
this._tasks = []

if (err) {
log.error(err)
this._log.error(err)
}
})
}
Expand Down Expand Up @@ -208,7 +206,7 @@ class DecisionEngine {
// If we already have the block, serve it
this.blockstore.has(entry.cid, (err, exists) => {
if (err) {
log.error('failed existence check')
this._log.error('failed existence check')
} else if (exists) {
this._tasks.push({
entry: entry.entry,
Expand All @@ -226,7 +224,7 @@ class DecisionEngine {
_processBlocks (blocks, ledger, callback) {
const cids = []
blocks.forEach((b, cidStr) => {
log('got block (%s bytes)', b.data.length)
this._log('got block (%s bytes)', b.data.length)
ledger.receivedBytes(b.data.length)
cids.push(b.cid)
})
Expand Down
Loading

0 comments on commit f17ecfa

Please sign in to comment.