Skip to content
This repository has been archived by the owner on Jan 20, 2020. It is now read-only.

Add Websocket#subscribe() and #unsubscribe() #213

Merged
merged 7 commits into from
Jan 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,9 @@ websocket.on('close', () => {
});
```

Optionally set the heartbeat mode or websocket URI.
The client will automatically subscribe to the `heartbeat` channel. By
default, the `full` channel will be subscribed to unless other channels are
requested.

```javascript
const websocket = new Gdax.WebsocketClient(
Expand All @@ -485,8 +487,34 @@ const websocket = new Gdax.WebsocketClient(
secret: 'suchsecret',
passphrase: 'muchpassphrase',
},
{ heartbeat: true }
{ channels: ['full', 'level2'] }
);

```

Optionally, [change subscriptions at runtime](https://docs.gdax.com/#subscribe):

```javascript
websocket.unsubscribe({ channels: ['full'] });

websocket.subscribe({ product_ids: ['LTC-USD'], channels: ['ticker', 'user'] });

websocket.subscribe({
channels: [{
name: 'user',
product_ids: ['ETH-USD']
}]
});

websocket.unsubscribe({
channels: [{
name: 'user',
product_ids: ['LTC-USD']
}, {
name: 'user',
product_ids: ['ETH-USD']
}]
});
```

The following events can be emitted from the `WebsocketClient`:
Expand Down
39 changes: 25 additions & 14 deletions lib/clients/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,37 @@ class WebsocketClient extends EventEmitter {
this.socket = null;
}

onOpen() {
this.emit('open');
_sendSubscription(type, { product_ids, channels }) {
const message = { type };

const subscribeMessage = {
type: 'subscribe',
product_ids: this.productIDs,
channels: this.channels,
};
if (channels) {
message.channels = channels;
}

if (product_ids) {
message.product_ids = product_ids;
}

// Add Signature
if (this.auth.secret) {
let sig = signRequest(
this.auth,
'GET',
this.channels ? '/users/self/verify' : '/users/self'
);
Object.assign(subscribeMessage, sig);
const sig = signRequest(this.auth, 'GET', '/users/self/verify');
Object.assign(message, sig);
}

this.socket.send(JSON.stringify(subscribeMessage));
this.socket.send(JSON.stringify(message));
}

subscribe({ product_ids, channels }) {
this._sendSubscription('subscribe', { product_ids, channels });
}

unsubscribe({ product_ids, channels }) {
this._sendSubscription('unsubscribe', { product_ids, channels });
}

onOpen() {
this.emit('open');
this.subscribe({ product_ids: this.productIDs, channels: this.channels });
}

onClose() {
Expand Down
29 changes: 22 additions & 7 deletions lib/orderbook_sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,17 @@ class OrderbookSync extends WebsocketClient {
this._client = new PublicClient(this.apiURI);
}

this.productIDs.forEach(productID => {
this._queues[productID] = [];
this._sequences[productID] = -2;
this.books[productID] = new Orderbook();
});
this.productIDs.forEach(this._newProduct, this);

this.on('message', this.processMessage.bind(this));
}

_newProduct(productID) {
this._queues[productID] = [];
this._sequences[productID] = -2;
this.books[productID] = new Orderbook();
}

loadOrderbook(productID) {
if (!this.books[productID]) {
return;
Expand Down Expand Up @@ -67,8 +69,21 @@ class OrderbookSync extends WebsocketClient {
.catch(problems);
}

// subscriptions changed -- possible new products
_newSubscription(data) {
const channel = data.channels.find(c => c.name === 'full');
channel && channel.product_ids
.filter(productID => !(productID in this.books))
.forEach(this._newProduct, this);
}

processMessage(data) {
const { product_id } = data;
const { type, product_id } = data;

if (type === 'subscriptions') {
this._newSubscription(data);
return;
}

if (this._sequences[product_id] < 0) {
// Orderbook snapshot not loaded yet
Expand Down Expand Up @@ -100,7 +115,7 @@ class OrderbookSync extends WebsocketClient {
this._sequences[product_id] = data.sequence;
const book = this.books[product_id];

switch (data.type) {
switch (type) {
case 'open':
book.add(data);
break;
Expand Down
111 changes: 111 additions & 0 deletions tests/websocket.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,117 @@ suite('WebsocketClient', () => {
});
});

test('subscribes to additional products', done => {
let client;
const server = testserver(port, () => {
client = new Gdax.WebsocketClient([], 'ws://localhost:' + port);
});
server.on('connection', socket => {
socket.once('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'subscribe');

socket.on('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'subscribe');
assert.ok(msg.product_ids.includes('ETH-BTC'));
assert.ok(msg.product_ids.includes('ETH-USD'));

server.close();
done();
});
client.subscribe({ product_ids: ['ETH-BTC', 'ETH-USD'] });
});
});
});

test('unsubscribes from product', done => {
let client;
const server = testserver(port, () => {
client = new Gdax.WebsocketClient(['BTC-USD'], 'ws://localhost:' + port);
});
server.on('connection', socket => {
socket.once('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'subscribe');

socket.on('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'unsubscribe');
assert.deepEqual(msg.product_ids, ['BTC-USD']);
assert.deepEqual(msg.channels, ['full']);

server.close();
done();
});
client.unsubscribe({ product_ids: ['BTC-USD'], channels: ['full'] });
});
});
});

test('subscribes to additional channels', done => {
let client;
const server = testserver(port, () => {
client = new Gdax.WebsocketClient(
['BTC-USD'],
'ws://localhost:' + port,
null,
{ channels: ['heartbeat'] }
);
});
server.on('connection', socket => {
socket.once('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'subscribe');

socket.on('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'subscribe');
assert.deepEqual(msg.channels, [
{
name: 'ticker',
product_ids: ['LTC-USD'],
},
]);

server.close();
done();
});
client.subscribe({
channels: [{ name: 'ticker', product_ids: ['LTC-USD'] }],
});
});
});
});

test('unsubscribes from channel', done => {
let client;
const server = testserver(port, () => {
client = new Gdax.WebsocketClient(
['BTC-USD'],
'ws://localhost:' + port,
null,
{ channels: ['ticker'] }
);
});
server.on('connection', socket => {
socket.once('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'subscribe');

socket.on('message', data => {
const msg = JSON.parse(data);
assert.equal(msg.type, 'unsubscribe');
assert.deepEqual(msg.channels, ['ticker']);

server.close();
done();
});
client.unsubscribe({ channels: ['ticker'] });
});
});
});

test('passes authentication details through', done => {
const server = testserver(port, () => {
new Gdax.WebsocketClient('ETH-USD', 'ws://localhost:' + port, {
Expand Down