Skip to content

Commit

Permalink
feat(cactus-api-client): add support for plain socketio validators in…
Browse files Browse the repository at this point in the history
… api-server and api-client

Full description of changes and planning log available in hyperledger-cacti#1602. In
general, this commit adds two new components - one for communicating
with plain socketio validators using socketio interface, and verifier
that implements similar features as socketio-server verifier.

Closes: hyperledger-cacti#1602
Signed-off-by: Michal Bajer <michal.bajer@fujitsu.com>
  • Loading branch information
outSH committed Dec 17, 2021
1 parent 7deaa22 commit 8670e68
Show file tree
Hide file tree
Showing 14 changed files with 1,698 additions and 6 deletions.
3 changes: 2 additions & 1 deletion packages/cactus-api-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
"@hyperledger/cactus-common": "1.0.0-rc.3",
"@hyperledger/cactus-core": "1.0.0-rc.3",
"@hyperledger/cactus-core-api": "1.0.0-rc.3",
"@hyperledger/cactus-plugin-consortium-manual": "1.0.0-rc.3"
"@hyperledger/cactus-plugin-consortium-manual": "1.0.0-rc.3",
"rxjs": "7.3.0"
}
}
3 changes: 2 additions & 1 deletion packages/cactus-api-client/src/main/typescript/index.web.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from "./public-api";
export { ApiClient } from "./api-client";
export { DefaultConsortiumProvider } from "./default-consortium-provider";
6 changes: 6 additions & 0 deletions packages/cactus-api-client/src/main/typescript/public-api.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
export { ApiClient } from "./api-client";
export { DefaultConsortiumProvider } from "./default-consortium-provider";
export {
SocketIOApiClient,
SocketLedgerEvent,
SocketIOApiClientOptions,
} from "./socketio-api-client";
export { Verifier, VerifierEventListener } from "./verifier";
358 changes: 358 additions & 0 deletions packages/cactus-api-client/src/main/typescript/socketio-api-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,358 @@
/*
* Copyright 2020-2021 Hyperledger Cactus Contributors
* SPDX-License-Identifier: Apache-2.0
*
* SocketIOApiClient.ts
*/

// TODO - Document it

const defaultMaxCounterRequestID = 100;
const defaultSyncFunctionTimeoutMillisecond = 5000;

import { Logger, Checks } from "@hyperledger/cactus-common";
import { LogLevelDesc, LoggerProvider } from "@hyperledger/cactus-common";
import { ISocketApiClient } from "@hyperledger/cactus-core-api";

import { Socket, SocketOptions, ManagerOptions, io } from "socket.io-client";
import { readFile } from "fs";
import { resolve as resolvePath } from "path";
import { verify, VerifyOptions, VerifyErrors, JwtPayload } from "jsonwebtoken";
import { Observable, ReplaySubject } from "rxjs";
import { finalize } from "rxjs/operators";

export function verifyValidatorJwt(
keyPath: string,
targetData: string,
): Promise<JwtPayload> {
return new Promise((resolve, reject) => {
readFile(
resolvePath(__dirname, keyPath),
(fileError: Error | null, publicKey: Buffer) => {
if (fileError) {
reject(fileError);
}

const option: VerifyOptions = {
algorithms: ["ES256"],
};

verify(
targetData,
publicKey,
option,
(err: VerifyErrors | null, decoded: JwtPayload | undefined) => {
if (err) {
reject(err);
} else if (decoded === undefined) {
reject(Error("Decoded message is undefined"));
} else {
resolve(decoded);
}
},
);
},
);
});
}

export type SocketIOApiClientOptions = {
readonly validatorID: string;
readonly validatorURL: string;
readonly validatorKeyPath: string;
readonly logLevel?: LogLevelDesc;
readonly maxCounterRequestID?: number;
readonly syncFunctionTimeoutMillisecond?: number;
readonly socketOptions?: Partial<ManagerOptions & SocketOptions>;
};

export class SocketLedgerEvent {
id = "";
verifierId = "";
data: Record<string, unknown> | null = null;
}

// TODO - To entire class - analyze and handle broken connection / socket disconnected scenarios
export class SocketIOApiClient implements ISocketApiClient<SocketLedgerEvent> {
private readonly log: Logger;
private readonly socket: Socket;
// TODO - Why replay only last one? Maybe make it configurable?
private monitorSubject: ReplaySubject<SocketLedgerEvent> | undefined;

readonly className: string;
counterReqID = 1;
checkValidator: (
key: string,
data: string,
) => Promise<JwtPayload> = verifyValidatorJwt;

constructor(public readonly options: SocketIOApiClientOptions) {
this.className = this.constructor.name;

Checks.nonBlankString(
options.validatorID,
`${this.className}::constructor() validatorID`,
);
Checks.nonBlankString(
options.validatorURL,
`${this.className}::constructor() validatorURL`,
);
Checks.nonBlankString(
// TODO - checks path exists?
options.validatorKeyPath,
`${this.className}::constructor() validatorKeyPath`,
);

const level = this.options.logLevel || "INFO";
const label = this.className;
this.log = LoggerProvider.getOrCreate({ level, label });

this.log.info(
`Created ApiClient for Validator ID: ${options.validatorID}, URL ${options.validatorURL}, KeyPath ${options.validatorKeyPath}`,
);
this.log.debug("socketOptions:", options.socketOptions);

this.socket = io(options.validatorURL, options.socketOptions);
}

public sendAsyncRequest(
contract: Record<string, unknown>,
method: Record<string, unknown>,
args: any,
): void {
try {
const requestData = {
contract: contract,
method: method,
args: args,
};

this.log.debug("sendAsyncRequest() Request:", requestData);
this.socket.emit("request2", requestData);
} catch (err) {
this.log.error("sendAsyncRequest() EXCEPTION", err);
throw err;
}
}

// TODO - Refactor to RxJS
public sendSyncRequest(
contract: Record<string, unknown>,
method: Record<string, unknown>,
args: any,
): Promise<any> {
return new Promise((resolve, reject) => {
this.log.debug("call : sendSyncRequest");

try {
this.log.debug(
"##in sendSyncRequest, contract:",
contract,
"method:",
method,
"args:",
args,
);
let responseFlag = false;

// reqID generation
const reqID = this.genarateReqID();
this.log.debug(`##sendSyncRequest, reqID = ${reqID}`);

this.socket.on("connect_error", (err: Error) => {
this.log.error("##connect_error:", err);
this.socket.disconnect();
reject(err);
});
this.socket.on("connect_timeout", (err: Record<string, unknown>) => {
this.log.error("####Error:", err);
this.socket.disconnect();
reject(err);
});
this.socket.on("error", (err: Record<string, unknown>) => {
this.log.error("####Error:", err);
this.socket.disconnect();
reject(err);
});
this.socket.on("response", (result: any) => {
this.log.debug("#[recv]response, res:", result);
if (reqID === result.id) {
responseFlag = true;

this.checkValidator(
this.options.validatorKeyPath,
result.resObj.data,
)
.then((decodedData) => {
this.log.debug("checkValidator decodedData:", decodedData);
const resultObj = {
status: result.resObj.status,
data: decodedData.result,
};
this.log.debug("resultObj =", resultObj);
// Result reply
resolve(resultObj);
})
.catch((err) => {
responseFlag = false;
this.log.debug("checkValidator error:", err);
this.log.error(err);
});
}
});

// Call Validator
const requestData = {
contract: contract,
method: method,
args: args,
reqID: reqID,
};
this.log.debug("requestData:", requestData);
this.socket.emit("request2", requestData);
this.log.debug("set timeout");

// Time-out setting
const timeoutMilliseconds =
this.options.syncFunctionTimeoutMillisecond ||
defaultSyncFunctionTimeoutMillisecond;
setTimeout(() => {
if (responseFlag === false) {
this.log.debug("requestTimeout reqID:", reqID);
resolve({ status: 504 });
}
}, timeoutMilliseconds);
} catch (err) {
this.log.error("##Error: sendSyncRequest:", err);
reject(err);
}
});
}

public isWatchBlocksRunning(): boolean {
return this.monitorSubject !== undefined && !this.monitorSubject.closed;
}

public watchBlocksV1(
monitorOptions?: Record<string, unknown>,
): Observable<SocketLedgerEvent> {
if (this.monitorSubject) {
this.log.debug("Reuse observable subject from previous call...");
if (monitorOptions) {
this.log.info(
"Passed monitorOptions will be ignored since monitoring is already in progress!",
);
}
return this.monitorSubject;
} else {
this.log.debug("Create new observable subject...");

this.monitorSubject = new ReplaySubject<SocketLedgerEvent>(0);

this.log.debug("call : startMonitor");
try {
this.log.debug(
`##in startMonitor, validatorUrl = ${this.options.validatorURL}`,
);

this.socket.on("connect_error", (err: Error) => {
this.log.error("##connect_error:", err);
this.socket.disconnect();
if (this.monitorSubject) {
this.monitorSubject.error(err);
}
});

this.socket.on("connect_timeout", (err: Record<string, unknown>) => {
this.log.error("####Error:", err);
this.socket.disconnect();
if (this.monitorSubject) {
this.monitorSubject.error(err);
}
});

this.socket.on("error", (err: Record<string, unknown>) => {
this.log.error("####Error:", err);
this.socket.disconnect();
if (this.monitorSubject) {
this.monitorSubject.error(err);
}
});

this.socket.on("monitor_error", (err: Record<string, unknown>) => {
this.log.error("#### Monitor Error:", err);
if (this.monitorSubject) {
this.monitorSubject.error(err);
}
});

this.socket.on("eventReceived", (res: any) => {
// output the data received from the client
this.log.debug("#[recv]eventReceived, res:", res);

this.checkValidator(this.options.validatorKeyPath, res.blockData)
.then((decodedData) => {
const resultObj = {
status: res.status,
blockData: decodedData.blockData,
};
this.log.debug("resultObj =", resultObj);
const event = new SocketLedgerEvent();
event.verifierId = this.options.validatorID;
this.log.debug(`##event.verifierId: ${event.verifierId}`);
event.data = resultObj;
if (this.monitorSubject) {
this.monitorSubject.next(event);
}
})
.catch((err) => {
this.log.error(err);
});
});

const emitStartMonitor = () => {
this.log.debug("##emit: startMonitor");
if (!monitorOptions || Object.keys(monitorOptions).length === 0) {
this.socket.emit("startMonitor");
} else {
this.socket.emit("startMonitor", monitorOptions);
}
};

if (this.socket.connected) {
emitStartMonitor();
} else {
this.socket.on("connect", () => {
this.log.debug("#connect");
emitStartMonitor();
});
}
} catch (err) {
this.log.error(`##Error: startMonitor, ${err}`);
this.monitorSubject.error(err);
}

return this.monitorSubject.pipe(
finalize(() => {
if (this.monitorSubject && !this.monitorSubject.observed) {
// Last observer finished
this.log.debug("##emit: stopMonitor");
this.socket.emit("stopMonitor");
this.monitorSubject = undefined;
}
}),
);
}
}

// Request ID generation
private genarateReqID(): string {
const maxCounterRequestID =
this.options.maxCounterRequestID || defaultMaxCounterRequestID;
if (this.counterReqID > maxCounterRequestID) {
// Counter initialization
this.counterReqID = 1;
}
return `${this.options.validatorID}_${this.counterReqID++}`;
}
}
Loading

0 comments on commit 8670e68

Please sign in to comment.