Skip to content

Commit

Permalink
feat: Add ws proxy server and client (#75)
Browse files Browse the repository at this point in the history
* feat: Add ws proxy server and client

* minor clenup

* fix token refresh flow

* refactor server and connection handling

* break down server proxy

* add disposable interface

* fix tests

* bump deps
  • Loading branch information
felipecsl authored Dec 10, 2023
1 parent b985574 commit 80cc0a2
Show file tree
Hide file tree
Showing 13 changed files with 2,045 additions and 1,434 deletions.
19 changes: 19 additions & 0 deletions cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDATCCAekCFANi/vgvNCE/QMwmMvTqYzJ8KUdSMA0GCSqGSIb3DQEBCwUAMD0x
CzAJBgNVBAYTAlVTMQswCQYDVQQIDAJXQTEQMA4GA1UEBwwHU2VhdHRsZTEPMA0G
A1UECgwGSHVza2x5MB4XDTIzMTIwNzAxMzQyMVoXDTI0MDEwNjAxMzQyMVowPTEL
MAkGA1UEBhMCVVMxCzAJBgNVBAgMAldBMRAwDgYDVQQHDAdTZWF0dGxlMQ8wDQYD
VQQKDAZIdXNrbHkwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDaRoG5
iMYVNQBLQtyc/98xisbWT7XEbRhWkxSgGep/G/1by/Y0YiI79fFK9Bw7mlabSmmj
VnW5fI/iSGXp7WO1CWSwT5QNXpUnJ3igal38YhR1H3OCgi9VlJOiKLUi0omsKy48
fPp7mGUOxR4dSGdXMEobrOPH0PxpZpOUXmK6KKe+/iOxhw2uqN7B88vcSd9dwOEP
JgSrXU5iTrcNqqHEr+/Dj8tKXKETNly5Gi6233owqDUggcZ0wjXeysR2uM0GV25c
l7Hpar1csx6sYWfccmHoXdhMmAdyCS4dAGwN+J/jxqtb97jWqUSe8R4o/sIzu/fq
bpTB8T/Dx1fJqxXrAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAALXmKSzQNzuS9mj
FyUf9duuRPGcnRG95BBup0WQyyh8oQY8yEGPd1GnKmvBYIrLbdcuGA84IEUUC1ih
MuWrfT1KZ+7KKC6cRD7AsXHZl1hh4xlssiz6muBrhJzSvs6jhBaHnQM72FVizMjC
ET/B8GukZh2ba9nbg0JPn+IxgrDbMfBBtOPXIfZDpHsGHzoBPjrZjkHAXV8PaWWA
k57FDqOjnl/h26vujBQgOZScTvLfvcIG+h/GQwd5VRAH9wJdo6WFQlBZ9SUvWOH1
nWLbx0ohVFAOE7RV1d50bQjoASBz6zHIhR3Odd2oovtqb+cHAlhbv+I9Mkr6U6QW
VdXNs7M=
-----END CERTIFICATE-----
28 changes: 28 additions & 0 deletions key.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDaRoG5iMYVNQBL
Qtyc/98xisbWT7XEbRhWkxSgGep/G/1by/Y0YiI79fFK9Bw7mlabSmmjVnW5fI/i
SGXp7WO1CWSwT5QNXpUnJ3igal38YhR1H3OCgi9VlJOiKLUi0omsKy48fPp7mGUO
xR4dSGdXMEobrOPH0PxpZpOUXmK6KKe+/iOxhw2uqN7B88vcSd9dwOEPJgSrXU5i
TrcNqqHEr+/Dj8tKXKETNly5Gi6233owqDUggcZ0wjXeysR2uM0GV25cl7Hpar1c
sx6sYWfccmHoXdhMmAdyCS4dAGwN+J/jxqtb97jWqUSe8R4o/sIzu/fqbpTB8T/D
x1fJqxXrAgMBAAECggEAI6btjHD3LcKU9DYNE8XFXnG07Y9ieJ17IrTuYwIop69a
MDK92auHvPR8f4okzGV2rPG4FHpMS0o5tDOwFcf1B75riFLPM2nWAem0Dbuh81XP
0pubAd+ivJ9Ch/OPNotd+lWpPS8KuMJZC1MOCqlnW7ni+OdB40LS36JmC49pH8+z
u2+pGq7H77nPkuk72Ma1sAqcyZmqungmpoa0q2x3EBOqq//njk74PJb+V7qcvR9g
lUwdM4Sh4HdhvmZ/C01YPC/4UcztJgowTBbKK6WdpsSwjhP60I3yPBloJr+lYMAX
YQVTrePK4JWqltvL+Bdjlz2VTd8YiXrwW7sknuXRWQKBgQD+kzw0nym9Rpg68Fa1
McWnfxXaFEh/50XY1cK3fPO5J7pdrHnZLHIBi/1q0nCo0YKb/P/8OjObhTnCa4J9
NtGhmju7QU1yTWGgNG+o2NlGrNrAvVwcWUmRyuyNczN3QCak86oteZk3jrDKL8CW
j48buonk7MP5oMp6U6Wj5PGbYwKBgQDbf0KMeSSJX05eqar9MjjHV8eAm4EvrCrZ
/Q8rGYq81dwklB/YZyf26bALSyqznbvp9sCdo58x4XPnHwwe+zWWKrsthZxsF7Fi
jW9BM9TRhvhe5IQWHywxtWvyv5eZNhpkbNLyogm7NOWwUIuE4z3j8bxi883GgoK9
rGjp56fV2QKBgQCPafKo0mF5N6Pa0DqIqRloWre8u2B8bZVzqjiflczXqgHbc6bR
KbCwHmUNILBG6oBh0A2F0mPwYQVA+b/xOkiueWzc+NTgZ6dv0Rp2THNa1VYG7qZN
ch93+pF4vkVoEMO0eXCNXctq+P+vZ2dfalB8loHIbXmZz3NBpo3R3tAdcQKBgHjb
MFRSW5i7/lXHDBwPrA2uum2IsfAC1zFh0hlEHgztoCIP4RzxZ6Lfdwww3hk6D366
W8IwlnYLkhq/EJh6bz3410kwWTl3Ljd6crivBk48B8OQBV582YOhRgfKEHnOvWdw
OBJawArxDEsxfjC0Qp6gur6tSS81KzGunbG02Me5AoGAKwTP+5uB0kdE24AvEDND
1ozkss9Dl4wjF0ucTzqwOT2iPlyM6sYEQzbjeMkpyGLVJbIWpNTSvzHj389mImRt
i2URj3r/EIrdgSLKXzxMyJc+ToM705v4OgZYAuSLQSTaSBH6C1Kuw+l8UCuWK6/p
i4X9qUqQUd9bT/Zh9nGs9G0=
-----END PRIVATE KEY-----
15 changes: 11 additions & 4 deletions src/client/realWsJsonClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
RawPayloadResponse,
WsJsonRawMessage,
} from "./tdaWsJsonTypes";
import { Constructor, debugLog, findByTypeOrThrow } from "./util";
import { Constructor, debugLog, findByTypeOrThrow, throwError } from "./util";
import MulticastIterator from "obgen/multicastIterator";
import BufferedIterator from "obgen/bufferedIterator";
import { isConnectionResponse, isLoginResponse } from "./messageTypeHelpers";
Expand Down Expand Up @@ -92,7 +92,7 @@ export enum ChannelState {
ERROR,
}

const logger = debug("ws");
const logger = debug("realWsJsonClient");

const messageHandlers: WebSocketApiMessageHandler<never, any>[] = [
new CancelAlertMessageHandler(),
Expand Down Expand Up @@ -121,9 +121,9 @@ export default class RealWsJsonClient implements WsJsonClient {
private buffer = new BufferedIterator<ParsedWebSocketResponse>();
private iterator = new MulticastIterator(this.buffer);
private state = ChannelState.DISCONNECTED;
private accessToken?: string;

constructor(
private readonly accessToken: string,
private readonly socket = new WebSocket(
"wss://services.thinkorswim.com/Services/WsJson",
{
Expand All @@ -148,7 +148,10 @@ export default class RealWsJsonClient implements WsJsonClient {
)
) {}

async authenticate(): Promise<RawLoginResponseBody | null> {
async authenticate(
accessToken: string
): Promise<RawLoginResponseBody | null> {
this.accessToken = accessToken;
const { state } = this;
switch (state) {
case ChannelState.DISCONNECTED:
Expand Down Expand Up @@ -189,6 +192,9 @@ export default class RealWsJsonClient implements WsJsonClient {
logger("⬅️\treceived %O", message);
if (isConnectionResponse(message)) {
const handler = findByTypeOrThrow(messageHandlers, LoginMessageHandler);
if (!accessToken) {
throwError("access token is required, cannot authenticate");
}
this.sendMessage(handler.buildRequest(accessToken));
} else if (isLoginResponse(message)) {
this.handleLoginResponse(message, resolve, reject);
Expand Down Expand Up @@ -356,6 +362,7 @@ export default class RealWsJsonClient implements WsJsonClient {
} else {
this.state = ChannelState.ERROR;
reject(`Login failed: ${body.message}`);
this.disconnect();
}
}

Expand Down
7 changes: 3 additions & 4 deletions src/client/wsJsonClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ import {
} from "./types/alertTypes";
import { MarketDepthResponse } from "./services/marketDepthMessageHandler";
import { GetWatchlistResponse } from "./services/getWatchlistMessageHandler";
import { Disposable } from "../server/disposable";

export interface WsJsonClient {
authenticate(): Promise<RawLoginResponseBody | null>;
export interface WsJsonClient extends Disposable {
authenticate(accessToken: string): Promise<RawLoginResponseBody | null>;

isConnected(): boolean;

Expand Down Expand Up @@ -81,8 +82,6 @@ export interface WsJsonClient {

userProperties(): Promise<UserPropertiesResponse>;

disconnect(): void;

marketDepth(symbol: string): AsyncIterable<MarketDepthResponse>;

watchlist(watchlistId: number): Promise<GetWatchlistResponse>;
Expand Down
14 changes: 9 additions & 5 deletions src/client/wsJsonClientAuth.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { OAuth2Client, OAuth2Token } from "@badgateway/oauth2-client";
import { WsJsonClient } from "./wsJsonClient";
import debug from "debug";

const logger = debug("wsJsonClientAuth");

export default class WsJsonClientAuth {
private readonly oauthClient: OAuth2Client;

constructor(
private readonly wsJsonClientFactory: (accessToken: string) => WsJsonClient,
private readonly wsJsonClientFactory: () => WsJsonClient,
clientId: string,
originalFetch: typeof fetch
) {
Expand All @@ -22,21 +25,22 @@ export default class WsJsonClientAuth {
}

async authenticateWithRetry(token: OAuth2Token): Promise<AuthResult> {
const client = this.wsJsonClientFactory(token.accessToken);
const client = this.wsJsonClientFactory();
try {
await client.authenticate();
await client.authenticate(token.accessToken);
return { token, client };
} catch (e) {
return await this.refreshToken(token);
}
}

async refreshToken(token: OAuth2Token): Promise<AuthResult> {
logger("attempting token refresh");
const { oauthClient } = this;
try {
const newToken = await oauthClient.refreshToken(token);
const client = this.wsJsonClientFactory(newToken.accessToken);
await client.authenticate();
const client = this.wsJsonClientFactory();
await client.authenticate(newToken.accessToken);
// oauthClient.refreshToken() doesn't return the refresh token so we need to re-add it
const refreshedToken = { ...newToken, refreshToken: token.refreshToken };
return { token: refreshedToken, client };
Expand Down
226 changes: 226 additions & 0 deletions src/client/wsJsonClientProxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
import { WsJsonClient } from "./wsJsonClient";
import { PositionsResponse } from "./services/positionsMessageHandler";
import { RawLoginResponseBody } from "./services/loginMessageHandler";
import {
CancelAlertResponse,
CreateAlertResponse,
LookupAlertsResponse,
} from "./types/alertTypes";
import { CancelOrderResponse } from "./services/cancelOrderMessageHandler";
import {
ChartRequestParams,
ChartResponse,
} from "./services/chartMessageHandler";
import { CreateAlertRequestParams } from "./services/createAlertMessageHandler";
import { MarketDepthResponse } from "./services/marketDepthMessageHandler";
import { OptionChainResponse } from "./services/optionSeriesMessageHandler";
import {
OptionChainDetailsRequest,
OptionChainDetailsResponse,
} from "./services/optionChainDetailsMessageHandler";
import { OptionSeriesQuotesResponse } from "./services/optionSeriesQuotesMessageHandler";
import {
OptionQuotesRequestParams,
OptionQuotesResponse,
} from "./services/optionQuotesMessageHandler";
import {
PlaceLimitOrderRequestParams,
PlaceOrderSnapshotResponse,
} from "./services/placeOrderMessageHandler";
import { QuotesResponse } from "./services/quotesMessageHandler";
import { OrderEventsResponse } from "./services/orderEventsMessageHandler";
import { InstrumentSearchResponse } from "./services/instrumentSearchMessageHandler";
import { UserPropertiesResponse } from "./services/userPropertiesMessageHandler";
import { GetWatchlistResponse } from "./services/getWatchlistMessageHandler";
import WebSocket from "isomorphic-ws";
import MulticastIterator from "obgen/multicastIterator";
import BufferedIterator from "obgen/bufferedIterator";
import { deferredWrap } from "obgen";
import { throwError } from "./util";
import debug from "debug";
import { ChannelState } from "./realWsJsonClient";
import { isString } from "lodash";

const logger = debug("wsClientProxy");

export const ALL_REQUESTS = [
"authenticate",
"optionChainQuotes",
"disconnect",
] as const;
type RequestType = typeof ALL_REQUESTS;
type Request = RequestType[number];

export type ProxiedRequest = {
request: Request;
args?: any[];
};

export type ProxiedResponse = ProxiedRequest & { response: unknown };

// A WsJsonClient proxy implementation that proxies requests to a WebSocket server using the provided `proxyUrl`.
export default class WsJsonClientProxy implements WsJsonClient {
private state = ChannelState.DISCONNECTED;
private buffer = new BufferedIterator<ProxiedResponse>();
private iterator = new MulticastIterator(this.buffer);
private socket?: WebSocket;

constructor(
private readonly proxyUrl: string,
private readonly options?: any
) {}

accountPositions(_: string): AsyncIterable<PositionsResponse> {
throwError("not implemented");
}

async authenticate(
accessToken: string
): Promise<RawLoginResponseBody | null> {
this.socket = new WebSocket(this.proxyUrl, this.options);
this.state = ChannelState.CONNECTING;
this.buffer = new BufferedIterator<ProxiedResponse>();
this.iterator = new MulticastIterator(this.buffer);
const { buffer, socket } = this;
return new Promise((resolve, reject) => {
socket.onmessage = ({ data }) => {
buffer.emit(JSON.parse(data as string) as ProxiedResponse);
};
socket.onopen = () => {
logger("proxy ws connection opened");
this.state = ChannelState.CONNECTED;
this.doAuthenticate(accessToken).then((res) => {
logger("proxy ws authentication response: %O", res);
if (isString(res) && res.includes("NOT_AUTHORIZED")) {
reject(res);
} else {
resolve(res);
}
});
};
socket.onclose = (event) => {
this.state = ChannelState.DISCONNECTED;
logger("proxy ws connection closed: ", event?.reason);
reject(event?.reason);
};
socket.onerror = (err) => {
this.state = ChannelState.ERROR;
logger("proxy ws socket error: %O", err);
reject(err);
};
});
}

private doAuthenticate(
accessToken: string
): Promise<RawLoginResponseBody | null> {
this.sendMessage({ request: "authenticate", args: [accessToken] });
return deferredWrap(() => this.iterator)
.filter(({ request }) => request === "authenticate")
.map(({ response }) => response)
.promise() as Promise<RawLoginResponseBody | null>;
}

cancelAlert(_: number): Promise<CancelAlertResponse> {
throwError("not implemented");
}

cancelOrder(_: number): Promise<CancelOrderResponse> {
throwError("not implemented");
}

chart(_: ChartRequestParams): AsyncIterable<ChartResponse> {
throwError("not implemented");
}

createAlert(_: CreateAlertRequestParams): Promise<CreateAlertResponse> {
throwError("not implemented");
}

disconnect(): void {
this.sendMessage({ request: "disconnect" });
}

ensureConnected(): void {
if (this.state !== ChannelState.CONNECTED) {
throw new Error("Not connected");
}
}

isConnected(): boolean {
return this.state === ChannelState.CONNECTED;
}

isConnecting(): boolean {
return this.state === ChannelState.CONNECTING;
}

lookupAlerts(): AsyncIterable<LookupAlertsResponse> {
throwError("not implemented");
}

marketDepth(_: string): AsyncIterable<MarketDepthResponse> {
throwError("not implemented");
}

optionChain(_: string): Promise<OptionChainResponse> {
throwError("not implemented");
}

optionChainDetails(
_: OptionChainDetailsRequest
): Promise<OptionChainDetailsResponse> {
throwError("not implemented");
}

optionChainQuotes(symbol: string): AsyncIterable<OptionSeriesQuotesResponse> {
this.sendMessage({ request: "optionChainQuotes", args: [symbol] });
return deferredWrap(() => this.iterator)
.filter(({ request }) => request === "optionChainQuotes")
.map(({ response }) => response)
.iterable() as AsyncIterable<OptionSeriesQuotesResponse>;
}

optionQuotes(
_: OptionQuotesRequestParams
): AsyncIterable<OptionQuotesResponse> {
throwError("not implemented");
}

placeOrder(
_: PlaceLimitOrderRequestParams
): Promise<PlaceOrderSnapshotResponse> {
throwError("not implemented");
}

quotes(_: string[]): AsyncIterable<QuotesResponse> {
throwError("not implemented");
}

replaceOrder(
_: Required<PlaceLimitOrderRequestParams>
): Promise<OrderEventsResponse> {
throwError("not implemented");
}

searchInstruments(_: string): Promise<InstrumentSearchResponse> {
throwError("not implemented");
}

userProperties(): Promise<UserPropertiesResponse> {
throwError("not implemented");
}

watchlist(_: number): Promise<GetWatchlistResponse> {
throwError("not implemented");
}

workingOrders(_: string): AsyncIterable<OrderEventsResponse> {
throwError("not implemented");
}

private sendMessage(request: ProxiedRequest) {
this.ensureConnected();
this.socket!.send(JSON.stringify(request));
}
}
Loading

0 comments on commit 80cc0a2

Please sign in to comment.