Skip to content

Commit

Permalink
websockets support
Browse files Browse the repository at this point in the history
  • Loading branch information
worryg0d committed Jul 18, 2023
1 parent d29e631 commit e550dc9
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 22 deletions.
29 changes: 28 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,32 @@ client.on('log', (level, loggerName, message, furtherInfo) => {
The `level` being passed to the listener can be `verbose`, `info`, `warning` or `error`. Visit the [logging
documentation][doc-logging] for more information.

## WebSockets

You can use websocket as transport. But Cassandra doesn't support this protocol
so some proxy should be deployed in front of Cassandra, which can handle this transport protocol.

```javascript
const client = new cassandra.Client({
transport: 'WebSocket',
contactPoints: [
// some proxies that support websocket transport
'127.0.0.1:9043',
'localhost:9044'
],
webSocketOptions: {
// some client websocket options
protocolVersion: 13,
...
}
});
```

You can configure your websocket client with `webSocketOptions`.
To properly configure it follow [websocket/ws doc][ws-doc].

You also can use websockets over SSL by passing `transport: 'SecureWebSocket'`.

## Compatibility

- Apache Cassandra versions 2.1 and above.
Expand Down Expand Up @@ -291,4 +317,5 @@ Unless required by applicable law or agreed to in writing, software distributed
[streams2]: https://nodejs.org/api/stream.html#stream_class_stream_readable
[cql-udt]: https://cassandra.apache.org/doc/latest/cql/types.html#udts
[dse]: https://www.datastax.com/products/datastax-enterprise
[astra]: https://www.datastax.com/products/datastax-astra
[astra]: https://www.datastax.com/products/datastax-astra
[ws-doc]: https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketaddress-protocols-options
6 changes: 6 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { metrics } from './lib/metrics';
import { tracker } from './lib/tracker';
import { metadata } from './lib/metadata';
import { datastax } from './lib/datastax/';
import { ClientRequestArgs } from 'http';
import Long = types.Long;
import Uuid = types.Uuid;
import graph = datastax.graph;
Expand Down Expand Up @@ -191,7 +192,11 @@ export interface ExecutionOptions {
setHints(hints: string[]): void;
}

export type WebSocketClientOptions = (ClientOptions | ClientRequestArgs)
& {protocols?: string | string[] | undefined};

export interface ClientOptions {
transport?: 'SecureWebSocket' | 'WebSocket' | undefined
contactPoints?: string[];
localDataCenter?: string;
keyspace?: string;
Expand Down Expand Up @@ -253,6 +258,7 @@ export interface ClientOptions {
tcpNoDelay?: boolean;
};
sslOptions?: tls.ConnectionOptions;
webSocketOptions?: WebSocketClientOptions;
}

export interface QueryOptions {
Expand Down
81 changes: 61 additions & 20 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const StreamIdStack = require('./stream-id-stack');
const OperationState = require('./operation-state');
const promiseUtils = require('./promise-utils');
const { ExecutionOptions } = require('./execution-options');
const { WebSocketWrapper } = require('./websocket');

/**
* Represents a connection to a Cassandra node
Expand Down Expand Up @@ -171,30 +172,70 @@ class Connection extends events.EventEmitter {
const self = this;
this.log('info', `Connecting to ${this.endpointFriendlyName}`);

if (!this.options.sslOptions) {
this.netClient = new net.Socket({ highWaterMark: this.options.socketOptions.coalescingThreshold });
this.netClient.connect(this.port, this.address, function connectCallback() {
self.log('verbose', `Socket connected to ${self.endpointFriendlyName}`);
self.bindSocketListeners();
self.startup(callback);
});
}
else {
// Use TLS
const sslOptions = utils.extend({ rejectUnauthorized: false }, this.options.sslOptions);
if (this.options.transport) {
if (this.options.transport.toLowerCase() === 'securewebsocket') {
// Use secure WebSocket
const options = utils.extend({ rejectUnauthorized: false, transport: this.options.transport },
this.options.webSocketOptions);

if (!options.protocols) {
options.protocols = ['cql'];
}

this.netClient = new WebSocketWrapper(options);

this.netClient.connect(this.port, this.address, function connectCallback() {
self.log('verbose', `Secure WebSocket to ${self.endpointFriendlyName}`);
self.bindSocketListeners();
self.startup(callback);
});
} else {
// Use WebSocket
const options = utils.extend({
transport: this.options.transport,
highWaterMark: this.options.socketOptions.coalescingThreshold,
handshakeTimeout: this.options.socketOptions.connectTimeout,
}, this.options.webSocketOptions);

if (!options.protocols) {
options.protocols = ['cql'];
}

if (this.options.sni) {
sslOptions.servername = this._serverName;
this.netClient = new WebSocketWrapper(options);

this.netClient.connect(this.port, this.address, function connectCallback() {
self.log('verbose', `WebSocket connected to ${self.endpointFriendlyName}`);
self.bindSocketListeners();
self.startup(callback);
});
}
} else {
// Use Socket
if (!this.options.sslOptions) {
this.netClient = new net.Socket({ highWaterMark: this.options.socketOptions.coalescingThreshold });

this.netClient.connect(this.port, this.address, function connectCallback() {
self.log('verbose', `Socket connected to ${self.endpointFriendlyName}`);
self.bindSocketListeners();
self.startup(callback);
});
} else {
// Use Socket with TLS
const sslOptions = utils.extend({ rejectUnauthorized: false }, this.options.sslOptions);

this.netClient = tls.connect(this.port, this.address, sslOptions, function tlsConnectCallback() {
self.log('verbose', `Secure socket connected to ${self.endpointFriendlyName} with protocol ${self.netClient.getProtocol()}`);
self.bindSocketListeners();
self.startup(callback);
});
if (this.options.sni) {
sslOptions.servername = this._serverName;
}

// TLSSocket will validate for values from 512 to 16K (depending on the SSL protocol version)
this.netClient.setMaxSendFragment(this.options.socketOptions.coalescingThreshold);
this.netClient = tls.connect(this.port, this.address, sslOptions, function tlsConnectCallback() {
self.log('verbose', `Secure socket connected to ${self.endpointFriendlyName} with protocol ${self.netClient.getProtocol()}`);
self.bindSocketListeners();
self.startup(callback);
});

// TLSSocket will validate for values from 512 to 16K (depending on the SSL protocol version)
this.netClient.setMaxSendFragment(this.options.socketOptions.coalescingThreshold);
}
}

this.netClient.once('error', function socketError(err) {
Expand Down
91 changes: 91 additions & 0 deletions lib/websocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const { EventEmitter } = require('events');
const { WebSocket } = require('ws');

/**
* WebSocketWrapper is a wrapper on the `ws.Websocket` which implements
* `net.Socket` interface to be used by the `cassandra.Connection`
*/
class WebSocketWrapper extends EventEmitter {
/**
* Creates a websocket wrapper instance. To connect use `connect` method
* @param {object} options client options for a websocket
*/
constructor(options) {
super();
this.options = options;
}

/**
* Creates an instance of a websocket and connects
* @param {String} port
* @param {String} address
* @param {() => void} connectionCallback is called when connection is successfully established
* @returns {WebSocketWrapper} wrapper itself
*/
connect(port, address, connectionCallback) {
const schema = this.options.transport.toLowerCase() === 'securewebsocket' ? 'wss' : 'ws';

this.ws = new WebSocket(schema+'://'+address+':'+port, this.options.protocols, this.options);

if (connectionCallback) {
this.ws.on('open', connectionCallback);
}

const stream = WebSocket.createWebSocketStream(this.ws, this.options);

stream.on('error', err => {
this.emit('error', err);
});
stream.on('drain', () => {
this.emit('drain');
});
stream.on('close', () => {
this.emit('close');
});
stream.on('end', () => {
this.emit('end');
});

this.write = stream.write.bind(stream);
this.pipe = stream.pipe.bind(stream);
this.end = stream.end.bind(stream);
this.destroy = stream.destroy.bind(stream);

return this;
}

/**
* It is not implemented because `ws` lib doesn't provide API to work with
*/
setTimeout() {}

/**
* It is not implemented because `ws` lib doesn't provide API to work with
*/
setKeepAlive() {}

/**
* It is not implemented because `ws` lib doesn't provide API to work with
*/
setNoDelay() {}
}

module.exports.WebSocketWrapper = WebSocketWrapper;
6 changes: 6 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
"@types/long": "^4.0.0",
"@types/node": ">=8",
"adm-zip": "^0.5.3",
"long": "^2.2.0"
"long": "^2.2.0",
"ws": "^8.13.0"
},
"devDependencies": {
"chai": "4.2.0",
Expand Down

0 comments on commit e550dc9

Please sign in to comment.