Skip to content
This repository has been archived by the owner on Jan 20, 2020. It is now read-only.

Commit

Permalink
Add Websocket#subscribe() and #unsubscribe() (#213)
Browse files Browse the repository at this point in the history
* Add subscribe/unsubscribe functions to websocket.

OrderbookSync now tracks new product subscriptions.

* Updated websocket client usage in README.

* Add WebsocketClient#subscribe() and #unsubscribe()

* Avoid incrementing the port in test suite

* Remove var keyword in tests

* Remove use of Sets in WebsocketClient

* Prefer `thisArg` where possible while using Array.prototype.forEach
  • Loading branch information
rmm5t authored and fb55 committed Jan 22, 2018

Unverified

This user has not yet uploaded their public signing key.
1 parent 8943010 commit c14bdab
Showing 4 changed files with 188 additions and 23 deletions.
32 changes: 30 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -474,7 +474,9 @@ websocket.on('close', () => {
});
```

Optionally set the heartbeat mode or websocket URI.
The client will automatically subscribe to the `heartbeat` channel. By
default, the `full` channel will be subscribed to unless other channels are
requested.

```javascript
const websocket = new Gdax.WebsocketClient(
@@ -485,8 +487,34 @@ const websocket = new Gdax.WebsocketClient(
secret: 'suchsecret',
passphrase: 'muchpassphrase',
},
{ heartbeat: true }
{ channels: ['full', 'level2'] }
);

```

Optionally, [change subscriptions at runtime](https://docs.gdax.com/#subscribe):

```javascript
websocket.unsubscribe({ channels: ['full'] });

websocket.subscribe({ product_ids: ['LTC-USD'], channels: ['ticker', 'user'] });

websocket.subscribe({
channels: [{
name: 'user',
product_ids: ['ETH-USD']
}]
});

websocket.unsubscribe({
channels: [{
name: 'user',
product_ids: ['LTC-USD']
}, {
name: 'user',
product_ids: ['ETH-USD']
}]
});
```

The following events can be emitted from the `WebsocketClient`:
39 changes: 25 additions & 14 deletions lib/clients/websocket.js
Original file line number Diff line number Diff line change
@@ -48,26 +48,37 @@ class WebsocketClient extends EventEmitter {
this.socket = null;
}

onOpen() {
this.emit('open');
_sendSubscription(type, { product_ids, channels }) {
const message = { type };

const subscribeMessage = {
type: 'subscribe',
product_ids: this.productIDs,
channels: this.channels,
};
if (channels) {
message.channels = channels;
}

if (product_ids) {
message.product_ids = product_ids;
}

// Add Signature
if (this.auth.secret) {
let sig = signRequest(
this.auth,
'GET',
this.channels ? '/users/self/verify' : '/users/self'
);
Object.assign(subscribeMessage, sig);
const sig = signRequest(this.auth, 'GET', '/users/self/verify');
Object.assign(message, sig);
}

this.socket.send(JSON.stringify(subscribeMessage));
this.socket.send(JSON.stringify(message));
}

subscribe({ product_ids, channels }) {
this._sendSubscription('subscribe', { product_ids, channels });
}

unsubscribe({ product_ids, channels }) {
this._sendSubscription('unsubscribe', { product_ids, channels });
}

onOpen() {
this.emit('open');
this.subscribe({ product_ids: this.productIDs, channels: this.channels });
}

onClose() {
29 changes: 22 additions & 7 deletions lib/orderbook_sync.js
Original file line number Diff line number Diff line change
@@ -31,15 +31,17 @@ class OrderbookSync extends WebsocketClient {
this._client = new PublicClient(this.apiURI);
}

this.productIDs.forEach(productID => {
this._queues[productID] = [];
this._sequences[productID] = -2;
this.books[productID] = new Orderbook();
});
this.productIDs.forEach(this._newProduct, this);

this.on('message', this.processMessage.bind(this));
}

_newProduct(productID) {
this._queues[productID] = [];
this._sequences[productID] = -2;
this.books[productID] = new Orderbook();
}

loadOrderbook(productID) {
if (!this.books[productID]) {
return;
@@ -67,8 +69,21 @@ class OrderbookSync extends WebsocketClient {
.catch(problems);
}

// subscriptions changed -- possible new products
_newSubscription(data) {
const channel = data.channels.find(c => c.name === 'full');
channel && channel.product_ids
.filter(productID => !(productID in this.books))
.forEach(this._newProduct, this);
}

processMessage(data) {
const { product_id } = data;
const { type, product_id } = data;

if (type === 'subscriptions') {
this._newSubscription(data);
return;
}

if (this._sequences[product_id] < 0) {
// Orderbook snapshot not loaded yet
@@ -100,7 +115,7 @@ class OrderbookSync extends WebsocketClient {
this._sequences[product_id] = data.sequence;
const book = this.books[product_id];

switch (data.type) {
switch (type) {
case 'open':
book.add(data);
break;
111 changes: 111 additions & 0 deletions tests/websocket.spec.js
Original file line number Diff line number Diff line change
@@ -99,6 +99,117 @@ suite('WebsocketClient', () => {
});
});

test('subscribes to additional products', done => {
let client;
const server = testserver(port, () => {
client = new Gdax.WebsocketClient([], 'ws://localhost:' + port);
});
server.on('connection', socket => {
socket.once('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'subscribe');

socket.on('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'subscribe');
assert.ok(msg.product_ids.includes('ETH-BTC'));
assert.ok(msg.product_ids.includes('ETH-USD'));

server.close();
done();
});
client.subscribe({ product_ids: ['ETH-BTC', 'ETH-USD'] });
});
});
});

test('unsubscribes from product', done => {
let client;
const server = testserver(port, () => {
client = new Gdax.WebsocketClient(['BTC-USD'], 'ws://localhost:' + port);
});
server.on('connection', socket => {
socket.once('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'subscribe');

socket.on('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'unsubscribe');
assert.deepEqual(msg.product_ids, ['BTC-USD']);
assert.deepEqual(msg.channels, ['full']);

server.close();
done();
});
client.unsubscribe({ product_ids: ['BTC-USD'], channels: ['full'] });
});
});
});

test('subscribes to additional channels', done => {
let client;
const server = testserver(port, () => {
client = new Gdax.WebsocketClient(
['BTC-USD'],
'ws://localhost:' + port,
null,
{ channels: ['heartbeat'] }
);
});
server.on('connection', socket => {
socket.once('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'subscribe');

socket.on('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'subscribe');
assert.deepEqual(msg.channels, [
{
name: 'ticker',
product_ids: ['LTC-USD'],
},
]);

server.close();
done();
});
client.subscribe({
channels: [{ name: 'ticker', product_ids: ['LTC-USD'] }],
});
});
});
});

test('unsubscribes from channel', done => {
let client;
const server = testserver(port, () => {
client = new Gdax.WebsocketClient(
['BTC-USD'],
'ws://localhost:' + port,
null,
{ channels: ['ticker'] }
);
});
server.on('connection', socket => {
socket.once('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'subscribe');

socket.on('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'unsubscribe');
assert.deepEqual(msg.channels, ['ticker']);

server.close();
done();
});
client.unsubscribe({ channels: ['ticker'] });
});
});
});

test('passes authentication details through', done => {
const server = testserver(port, () => {
new Gdax.WebsocketClient('ETH-USD', 'ws://localhost:' + port, {

0 comments on commit c14bdab

Please sign in to comment.