-
Notifications
You must be signed in to change notification settings - Fork 320
Add Websocket#subscribe() and #unsubscribe() #213
Changes from 3 commits
85f714a
187d67c
2fd0dda
e8ea4fe
d99da97
c0fd6ca
df6d38c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,13 +17,11 @@ class WebsocketClient extends EventEmitter { | |
{ channels = null } = {} | ||
) { | ||
super(); | ||
this.productIDs = Utils.determineProductIDs(productIDs); | ||
this.productIDs = new Set(Utils.determineProductIDs(productIDs)); | ||
this.websocketURI = websocketURI; | ||
this.auth = Utils.checkAuth(auth); | ||
this.channels = channels || ['full']; | ||
if (!this.channels.includes('heartbeat')) { | ||
this.channels.push('heartbeat'); | ||
} | ||
this.channels = new Set(channels || ['full']); | ||
this.channels.add('heartbeat'); | ||
this.connect(); | ||
} | ||
|
||
|
@@ -48,26 +46,37 @@ class WebsocketClient extends EventEmitter { | |
this.socket = null; | ||
} | ||
|
||
onOpen() { | ||
this.emit('open'); | ||
_sendSubscription(type, { product_ids, channels }) { | ||
const message = { type }; | ||
|
||
if (channels) { | ||
message.channels = [...channels]; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we do this in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the copies were because Sets were being used. I'll kill 'em. |
||
} | ||
|
||
const subscribeMessage = { | ||
type: 'subscribe', | ||
product_ids: this.productIDs, | ||
channels: this.channels, | ||
}; | ||
if (product_ids) { | ||
message.product_ids = [...product_ids]; | ||
} | ||
|
||
// Add Signature | ||
if (this.auth.secret) { | ||
let sig = signRequest( | ||
this.auth, | ||
'GET', | ||
this.channels ? '/users/self/verify' : '/users/self' | ||
); | ||
Object.assign(subscribeMessage, sig); | ||
const sig = signRequest(this.auth, 'GET', '/users/self/verify'); | ||
Object.assign(message, sig); | ||
} | ||
|
||
this.socket.send(JSON.stringify(subscribeMessage)); | ||
this.socket.send(JSON.stringify(message)); | ||
} | ||
|
||
subscribe({ product_ids, channels }) { | ||
this._sendSubscription('subscribe', { product_ids, channels }); | ||
} | ||
|
||
unsubscribe({ product_ids, channels }) { | ||
this._sendSubscription('unsubscribe', { product_ids, channels }); | ||
} | ||
|
||
onOpen() { | ||
this.emit('open'); | ||
this.subscribe({ product_ids: this.productIDs, channels: this.channels }); | ||
} | ||
|
||
onClose() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,15 +31,17 @@ class OrderbookSync extends WebsocketClient { | |
this._client = new PublicClient(this.apiURI); | ||
} | ||
|
||
this.productIDs.forEach(productID => { | ||
this._queues[productID] = []; | ||
this._sequences[productID] = -2; | ||
this.books[productID] = new Orderbook(); | ||
}); | ||
this.productIDs.forEach(this._newProduct, this); | ||
|
||
this.on('message', this.processMessage.bind(this)); | ||
} | ||
|
||
_newProduct(productID) { | ||
this._queues[productID] = []; | ||
this._sequences[productID] = -2; | ||
this.books[productID] = new Orderbook(); | ||
} | ||
|
||
loadOrderbook(productID) { | ||
if (!this.books[productID]) { | ||
return; | ||
|
@@ -67,8 +69,21 @@ class OrderbookSync extends WebsocketClient { | |
.catch(problems); | ||
} | ||
|
||
// subscriptions changed -- possible new products | ||
_newSubscription(data) { | ||
const channel = data.channels.find(c => c.name === 'full'); | ||
channel && channel.product_ids | ||
.filter(productID => !(productID in this.books)) | ||
.forEach(productID => this._newProduct(productID)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use same pattern as above ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I dig it. Good eye. Done. |
||
} | ||
|
||
processMessage(data) { | ||
const { product_id } = data; | ||
const { type, product_id } = data; | ||
|
||
if (type === 'subscriptions') { | ||
this._newSubscription(data); | ||
return; | ||
} | ||
|
||
if (this._sequences[product_id] < 0) { | ||
// Orderbook snapshot not loaded yet | ||
|
@@ -100,7 +115,7 @@ class OrderbookSync extends WebsocketClient { | |
this._sequences[product_id] = data.sequence; | ||
const book = this.books[product_id]; | ||
|
||
switch (data.type) { | ||
switch (type) { | ||
case 'open': | ||
book.add(data); | ||
break; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -99,6 +99,117 @@ suite('WebsocketClient', () => { | |
}); | ||
}); | ||
|
||
test('subscribes to additional products', done => { | ||
var client; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. I'll fix. |
||
const server = testserver(++port, () => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Didn't we stop incrementing the port? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. True. This must have been left over from multiple rebases. I'll fix. |
||
client = new Gdax.WebsocketClient([], 'ws://localhost:' + port); | ||
}); | ||
server.on('connection', socket => { | ||
socket.once('message', data => { | ||
const msg = JSON.parse(data); | ||
assert.equal(msg.type, 'subscribe'); | ||
|
||
socket.on('message', data => { | ||
const msg = JSON.parse(data); | ||
assert.equal(msg.type, 'subscribe'); | ||
assert.ok(msg.product_ids.includes('ETH-BTC')); | ||
assert.ok(msg.product_ids.includes('ETH-USD')); | ||
|
||
server.close(); | ||
done(); | ||
}); | ||
client.subscribe({ product_ids: ['ETH-BTC', 'ETH-USD'] }); | ||
}); | ||
}); | ||
}); | ||
|
||
test('unsubscribes from product', done => { | ||
var client; | ||
const server = testserver(++port, () => { | ||
client = new Gdax.WebsocketClient(['BTC-USD'], 'ws://localhost:' + port); | ||
}); | ||
server.on('connection', socket => { | ||
socket.once('message', data => { | ||
const msg = JSON.parse(data); | ||
assert.equal(msg.type, 'subscribe'); | ||
|
||
socket.on('message', data => { | ||
const msg = JSON.parse(data); | ||
assert.equal(msg.type, 'unsubscribe'); | ||
assert.deepEqual(msg.product_ids, ['BTC-USD']); | ||
assert.deepEqual(msg.channels, ['full']); | ||
|
||
server.close(); | ||
done(); | ||
}); | ||
client.unsubscribe({ product_ids: ['BTC-USD'], channels: ['full'] }); | ||
}); | ||
}); | ||
}); | ||
|
||
test('subscribes to additional channels', done => { | ||
var client; | ||
const server = testserver(++port, () => { | ||
client = new Gdax.WebsocketClient( | ||
['BTC-USD'], | ||
'ws://localhost:' + port, | ||
null, | ||
{ channels: ['heartbeat'] } | ||
); | ||
}); | ||
server.on('connection', socket => { | ||
socket.once('message', data => { | ||
const msg = JSON.parse(data); | ||
assert.equal(msg.type, 'subscribe'); | ||
|
||
socket.on('message', data => { | ||
const msg = JSON.parse(data); | ||
assert.equal(msg.type, 'subscribe'); | ||
assert.deepEqual(msg.channels, [ | ||
{ | ||
name: 'ticker', | ||
product_ids: ['LTC-USD'], | ||
}, | ||
]); | ||
|
||
server.close(); | ||
done(); | ||
}); | ||
client.subscribe({ | ||
channels: [{ name: 'ticker', product_ids: ['LTC-USD'] }], | ||
}); | ||
}); | ||
}); | ||
}); | ||
|
||
test('unsubscribes from channel', done => { | ||
var client; | ||
const server = testserver(++port, () => { | ||
client = new Gdax.WebsocketClient( | ||
['BTC-USD'], | ||
'ws://localhost:' + port, | ||
null, | ||
{ channels: ['ticker'] } | ||
); | ||
}); | ||
server.on('connection', socket => { | ||
socket.once('message', data => { | ||
const msg = JSON.parse(data); | ||
assert.equal(msg.type, 'subscribe'); | ||
|
||
socket.on('message', data => { | ||
const msg = JSON.parse(data); | ||
assert.equal(msg.type, 'unsubscribe'); | ||
assert.deepEqual(msg.channels, ['ticker']); | ||
|
||
server.close(); | ||
done(); | ||
}); | ||
client.unsubscribe({ channels: ['ticker'] }); | ||
}); | ||
}); | ||
}); | ||
|
||
test('passes authentication details through', done => { | ||
const server = testserver(port, () => { | ||
new Gdax.WebsocketClient('ETH-USD', 'ws://localhost:' + port, { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what these sets are used for — why aren't we using plain arrays?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sets are a better construct for unique lists (obviously), but if you don't like them, I'll clean them out.