Skip to content

Commit

Permalink
feat: finished initial SocketIOApiClient + unit test
Browse files Browse the repository at this point in the history
Closes: hyperledger-cacti#123
Signed-off-by: Michal Bajer <michal.bajer@fujitsu.com>
  • Loading branch information
outSH committed Dec 13, 2021
1 parent 7a87b9d commit e257146
Show file tree
Hide file tree
Showing 2 changed files with 519 additions and 91 deletions.
132 changes: 126 additions & 6 deletions packages/cactus-api-client/src/main/typescript/socketio-api-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@
* SocketIOApiClient.ts
*/

const defaultMaxCounterRequestID = 100;
const defaultSyncFunctionTimeoutMillisecond = 5000;

import { Logger, Checks } from "@hyperledger/cactus-common";
import { LogLevelDesc, LoggerProvider } from "@hyperledger/cactus-common";

import { Socket, SocketOptions, ManagerOptions, io } from "socket.io-client";
import { readFileSync } from "fs";
import { resolve as resolvePath } from "path";
import { verify, VerifyOptions } from "jsonwebtoken";
import { Observable, ReplaySubject } from "rxjs";
import { finalize } from "rxjs/operators";

//////////////////////////////////////////////////
// COMMON CODE
//////////////////////////////////////////////////

const defaultMaxCounterRequestID = 100;
const defaultSyncFunctionTimeoutMillisecond = 5000;

export type VerifierSocketIOOptions = {
readonly maxCounterRequestID?: number;
readonly syncFunctionTimeoutMillisecond?: number;
Expand All @@ -34,6 +36,12 @@ export type SocketIOApiClientOptions = {
readonly verifier?: VerifierSocketIOOptions;
};

export class SocketIOLedgerEvent {
id = "";
verifierId = "";
data: Record<string, unknown> | null = null;
}

//////////////////////////////////////////////////
// verifyValidatorJwt
//////////////////////////////////////////////////
Expand Down Expand Up @@ -66,6 +74,8 @@ export function verifyValidatorJwt(
export class SocketIOApiClient {
private readonly log: Logger;
private readonly socket: Socket;
// TODO - Why replay only last one? Maybe make it configurable?
private monitorSubject: ReplaySubject<SocketIOLedgerEvent> | undefined;

readonly className: string;
counterReqID = 1;
Expand All @@ -77,7 +87,6 @@ export class SocketIOApiClient {
constructor(public readonly options: SocketIOApiClientOptions) {
this.className = this.constructor.name;

// TODO - Checks decorator??
Checks.nonBlankString(
options.validatorID,
`${this.className}::constructor() arg validatorID`,
Expand All @@ -87,6 +96,7 @@ export class SocketIOApiClient {
`${this.className}::constructor() arg validatorURL`,
);
Checks.nonBlankString(
// todo - checks path exists?
options.validatorKeyPath,
`${this.className}::constructor() arg validatorKeyPath`,
);
Expand All @@ -103,6 +113,7 @@ export class SocketIOApiClient {
this.socket = io(options.validatorURL, options.socketOptions);
}

// TODO - Better interface in all functions (no Record and any)
public sendAsyncRequest(
contract: Record<string, unknown>,
method: Record<string, unknown>,
Expand Down Expand Up @@ -148,13 +159,11 @@ export class SocketIOApiClient {

this.socket.on("connect_error", (err: Error) => {
this.log.error("##connect_error:", err);
// end communication
this.socket.disconnect();
reject(err);
});
this.socket.on("connect_timeout", (err: Record<string, unknown>) => {
this.log.error("####Error:", err);
// end communication
this.socket.disconnect();
reject(err);
});
Expand Down Expand Up @@ -218,6 +227,117 @@ export class SocketIOApiClient {
});
}

// TODO - To entire class - analyze and handle broken connection / socket disconnected scenarios
public watchBlocksV1(
monitorOptions?: Record<string, unknown>,
): Observable<SocketIOLedgerEvent> {
if (this.monitorSubject) {
this.log.debug("Reuse observable subject from previous call...");
return this.monitorSubject;
} else {
this.log.debug("Create new observable subject...");

this.monitorSubject = new ReplaySubject<SocketIOLedgerEvent>(0);

this.log.debug("call : startMonitor");
try {
this.log.debug(
`##in startMonitor, validatorUrl = ${this.options.validatorURL}`,
);

this.socket.on("connect_error", (err: Error) => {
this.log.error("##connect_error:", err);
this.socket.disconnect();
if (this.monitorSubject) {
this.monitorSubject.error(err);
}
});

this.socket.on("connect_timeout", (err: Record<string, unknown>) => {
this.log.error("####Error:", err);
this.socket.disconnect();
if (this.monitorSubject) {
this.monitorSubject.error(err);
}
});

this.socket.on("error", (err: Record<string, unknown>) => {
this.log.error("####Error:", err);
this.socket.disconnect();
if (this.monitorSubject) {
this.monitorSubject.error(err);
}
});

this.socket.on("monitor_error", (err: Record<string, unknown>) => {
this.log.error("#### Monitor Error:", err);
if (this.monitorSubject) {
this.monitorSubject.error(err);
}
});

this.socket.on("eventReceived", (res: any) => {
// output the data received from the client
this.log.debug("#[recv]eventReceived, res:", res);

this.checkValidator(this.options.validatorKeyPath, res.blockData)
.then((decodedData) => {
const resultObj = {
status: res.status,
blockData: decodedData.blockData,
};
this.log.debug("resultObj =", resultObj);
const event = new SocketIOLedgerEvent();
event.verifierId = this.options.validatorID;
this.log.debug(`##event.verifierId: ${event.verifierId}`);
event.data = resultObj;
if (this.monitorSubject) {
this.monitorSubject.next(event);
}
})
.catch((err) => {
this.log.error(err);
});
});

const emitStartMonitor = () => {
this.log.debug("##emit: startMonitor");
if (!monitorOptions || Object.keys(monitorOptions).length === 0) {
this.socket.emit("startMonitor");
} else {
this.socket.emit("startMonitor", monitorOptions);
}
};

if (this.socket.connected) {
emitStartMonitor();
} else {
this.socket.on("connect", () => {
this.log.debug("#connect");
emitStartMonitor();
});
}
} catch (err) {
this.log.error(`##Error: startMonitor, ${err}`);
this.monitorSubject.error(err);
}

return this.monitorSubject.pipe(
finalize(() => {
if (
this.monitorSubject &&
this.monitorSubject.observers.length === 1
) {
// Last observer finished
this.log.debug("##emit: stopMonitor");
this.socket.emit("stopMonitor");
this.monitorSubject = undefined;
}
}),
);
}
}

// Request ID generation
private genarateReqID(): string {
const maxCounterRequestID =
Expand Down
Loading

0 comments on commit e257146

Please sign in to comment.