Skip to content

Commit

Permalink
Merge pull request #35 from axone-protocol/fix/websocket-reconnect-logic
Browse files Browse the repository at this point in the history
fix: added reconnect logic for web sockets
  • Loading branch information
yevhen-burkovskyi authored Jun 19, 2024
2 parents f76bc26 + 334a93f commit 8b6754c
Showing 1 changed file with 34 additions and 40 deletions.
74 changes: 34 additions & 40 deletions src/core/lib/okp4/okp4.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ import { GetProposalResponse } from "@core/lib/okp4/responses/get-proposal.respo
@Injectable()
export class Okp4Service {
private BASE_URL = config.okp4.url;
private socket!: WebSocket;
private reconnectAttempts = 1;

constructor(
private readonly httpService: HttpService,
private eventEmitter: EventEmitter2
private readonly httpService: HttpService,
private eventEmitter: EventEmitter2
) {}

private constructUrl(endpoint: string, params?: string): string {
Expand All @@ -44,10 +46,7 @@ export class Okp4Service {

async getSupplyByDenom(denom: string): Promise<SupplyByDenomResponse> {
return this.getWithErrorHandling(
this.constructUrl(
Endpoints.SUPPLY_BY_DENOM,
createUrlParams({ denom })
)
this.constructUrl(Endpoints.SUPPLY_BY_DENOM, createUrlParams({ denom }))
);
}

Expand All @@ -70,22 +69,15 @@ export class Okp4Service {
);
}

async getDelegatorsRewards(
addr: string
): Promise<DelegatorsRewardsResponse> {
async getDelegatorsRewards(addr: string): Promise<DelegatorsRewardsResponse> {
return this.getWithErrorHandling(
this.constructUrl(
Endpoints.DELEGATORS_REWARDS.replace(
RouteParam.DELEGATOR_ADDRES,
addr
)
Endpoints.DELEGATORS_REWARDS.replace(RouteParam.DELEGATOR_ADDRES, addr)
)
);
}

async getSpendableBalances(
addr: string
): Promise<SpendableBalancesResponse> {
async getSpendableBalances(addr: string): Promise<SpendableBalancesResponse> {
return this.getWithErrorHandling(
this.constructUrl(`${Endpoints.SPENDABLE_BALANCE}/${addr}`)
);
Expand Down Expand Up @@ -142,28 +134,25 @@ export class Okp4Service {
async getBlocksByHeight(height: number): Promise<BlocksResponse> {
return this.getWithErrorHandling(
this.constructUrl(
Endpoints.BLOCKS_BY_HEIGHT.replace(
RouteParam.HEIGHT,
height.toString()
)
Endpoints.BLOCKS_BY_HEIGHT.replace(RouteParam.HEIGHT, height.toString())
)
);
}

apiPubkeyToAddr(pubkey: string) {
return toBase64(
fromHex(toHex(sha256(fromBase64(pubkey))).slice(0, 40))
);
return toBase64(fromHex(toHex(sha256(fromBase64(pubkey))).slice(0, 40)));
}

wssPubkeyToAddr(pubkey: string) {
return toHex(sha256(fromBase64(pubkey))).slice(0, 40);
}

async connectToNewBlockSocket(event: string) {
const client = new WebSocket(config.okp4.wss);
client.on("open", () => {
client.send(
this.socket = new WebSocket(config.okp4.wss);

this.socket.on("open", () => {
Log.default("Connected to Okp4 WebSocket");
this.socket.send(
JSON.stringify({
jsonrpc: "2.0",
method: "subscribe",
Expand All @@ -172,30 +161,37 @@ export class Okp4Service {
})
);
});
client.on("message", (data) => {

this.socket.on("message", (data) => {
if (Buffer.isBuffer(data)) {
const message = data.toString("utf-8");
try {
const jsonData = JSON.parse(message);
if (
jsonData &&
jsonData?.result &&
jsonData?.result?.query === "tm.event='NewBlock'"
jsonData?.result &&
jsonData?.result?.query === "tm.event='NewBlock'"
) {
this.eventEmitter.emit(
event,
jsonData?.result?.data?.value
);
this.eventEmitter.emit(event, jsonData?.result?.data?.value);
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (e: any) {
Log.warn(
"[OKP4] Problem with parsing data from wss\n" +
e.message
);
Log.warn("[OKP4] Problem with parsing data from wss\n" + e.message);
}
}
});

this.socket.on("close", this.socketReconnect.bind(this, event));
this.socket.on("error", this.socketReconnect.bind(this, event));
}

private socketReconnect(event: string) {
Log.warn(
`Okp4 Socket Connection closed, reconnect attempt: ${this.reconnectAttempts}`
);
this.reconnectAttempts++;
const timeout = Math.min(10000, 1000 * 2 ** this.reconnectAttempts); // Cap at 10 seconds
setTimeout(this.connectToNewBlockSocket.bind(this, event), timeout);
}

async getGovParams(type = GovType.VOTING): Promise<GovParamsResponse> {
Expand All @@ -212,9 +208,7 @@ export class Okp4Service {
);
}

async getProposal(
proposalId: string | number
): Promise<GetProposalResponse> {
async getProposal(proposalId: string | number): Promise<GetProposalResponse> {
return this.getWithErrorHandling(
this.constructUrl(
Endpoints.GOV_PROPOSAL.replace(
Expand Down

0 comments on commit 8b6754c

Please sign in to comment.