diff --git a/packages/cactus-api-client/package.json b/packages/cactus-api-client/package.json index a294a149879..a6ab296e814 100644 --- a/packages/cactus-api-client/package.json +++ b/packages/cactus-api-client/package.json @@ -63,6 +63,10 @@ "@hyperledger/cactus-common": "1.0.0-rc.3", "@hyperledger/cactus-core": "1.0.0-rc.3", "@hyperledger/cactus-core-api": "1.0.0-rc.3", - "@hyperledger/cactus-plugin-consortium-manual": "1.0.0-rc.3" + "@hyperledger/cactus-plugin-consortium-manual": "1.0.0-rc.3", + "rxjs": "7.3.0" + }, + "devDependencies": { + "@hyperledger/cactus-test-tooling": "1.0.0-rc.3" } } diff --git a/packages/cactus-api-client/src/main/typescript/index.web.ts b/packages/cactus-api-client/src/main/typescript/index.web.ts index 87cb558397c..7d8b42d7859 100644 --- a/packages/cactus-api-client/src/main/typescript/index.web.ts +++ b/packages/cactus-api-client/src/main/typescript/index.web.ts @@ -1 +1,2 @@ -export * from "./public-api"; +export { ApiClient } from "./api-client"; +export { DefaultConsortiumProvider } from "./default-consortium-provider"; diff --git a/packages/cactus-api-client/src/main/typescript/public-api.ts b/packages/cactus-api-client/src/main/typescript/public-api.ts index 7d8b42d7859..d1079aeba48 100644 --- a/packages/cactus-api-client/src/main/typescript/public-api.ts +++ b/packages/cactus-api-client/src/main/typescript/public-api.ts @@ -1,2 +1,8 @@ export { ApiClient } from "./api-client"; export { DefaultConsortiumProvider } from "./default-consortium-provider"; +export { + SocketIOApiClient, + SocketLedgerEvent, + SocketIOApiClientOptions, +} from "./socketio-api-client"; +export { Verifier, VerifierEventListener } from "./verifier"; diff --git a/packages/cactus-api-client/src/main/typescript/socketio-api-client.ts b/packages/cactus-api-client/src/main/typescript/socketio-api-client.ts new file mode 100644 index 00000000000..c6e7b39a3ec --- /dev/null +++ b/packages/cactus-api-client/src/main/typescript/socketio-api-client.ts @@ -0,0 +1,395 @@ +/** + * Copyright 2020-2021 Hyperledger Cactus Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * @cactus-api-client/socketio-api-client.ts + */ + +const defaultMaxCounterRequestID = 100; +const defaultSyncFunctionTimeoutMillisecond = 5 * 1000; // 5 seconds + +import { Logger, Checks } from "@hyperledger/cactus-common"; +import { LogLevelDesc, LoggerProvider } from "@hyperledger/cactus-common"; +import { ISocketApiClient } from "@hyperledger/cactus-core-api"; + +import { Socket, SocketOptions, ManagerOptions, io } from "socket.io-client"; +import { readFile } from "fs"; +import { resolve as resolvePath } from "path"; +import { verify, VerifyOptions, VerifyErrors, JwtPayload } from "jsonwebtoken"; +import { Observable, ReplaySubject } from "rxjs"; +import { finalize } from "rxjs/operators"; + +/** + * Default logic for validating responses from socketio connector (validator). + * Assumes that message is JWT signed with validator private key. + * @param keyPath - Absolute or relative path to validator public key. + * @param targetData - Signed JWT message to be decoded. + * @returns Promise resolving to decoded JwtPayload. + */ +export function verifyValidatorJwt( + keyPath: string, + targetData: string, +): Promise { + return new Promise((resolve, reject) => { + readFile( + resolvePath(__dirname, keyPath), + (fileError: Error | null, publicKey: Buffer) => { + if (fileError) { + reject(fileError); + } + + const option: VerifyOptions = { + algorithms: ["ES256"], + }; + + verify( + targetData, + publicKey, + option, + (err: VerifyErrors | null, decoded: JwtPayload | undefined) => { + if (err) { + reject(err); + } else if (decoded === undefined) { + reject(Error("Decoded message is undefined")); + } else { + resolve(decoded); + } + }, + ); + }, + ); + }); +} + +/** + * Input parameters for SocketIOApiClient construction. + */ +export type SocketIOApiClientOptions = { + readonly validatorID: string; + readonly validatorURL: string; + readonly validatorKeyPath: string; + readonly logLevel?: LogLevelDesc; + readonly maxCounterRequestID?: number; + readonly syncFunctionTimeoutMillisecond?: number; + readonly socketOptions?: Partial; +}; + +/** + * Type of the message emitted from ledger monitoring. + */ +export class SocketLedgerEvent { + id = ""; + verifierId = ""; + data: Record | null = null; +} + +/** + * Client for sending requests to some socketio ledger connectors (validators) using socketio protocol. + * + * @todo Analyze and handle broken connection / socket disconnected scenarios + */ +export class SocketIOApiClient implements ISocketApiClient { + private readonly log: Logger; + private readonly socket: Socket; + // @todo - Why replay only last one? Maybe make it configurable? + private monitorSubject: ReplaySubject | undefined; + + readonly className: string; + counterReqID = 1; + checkValidator: ( + key: string, + data: string, + ) => Promise = verifyValidatorJwt; + + /** + * @param validatorID - (required) ID of validator. + * @param validatorURL - (required) URL to validator socketio endpoint. + * @param validatorKeyPath - (required) Path to validator public key in local storage. + */ + constructor(public readonly options: SocketIOApiClientOptions) { + this.className = this.constructor.name; + + Checks.nonBlankString( + options.validatorID, + `${this.className}::constructor() validatorID`, + ); + Checks.nonBlankString( + options.validatorURL, + `${this.className}::constructor() validatorURL`, + ); + Checks.nonBlankString( + // TODO - checks path exists? + options.validatorKeyPath, + `${this.className}::constructor() validatorKeyPath`, + ); + + const level = this.options.logLevel || "INFO"; + const label = this.className; + this.log = LoggerProvider.getOrCreate({ level, label }); + + this.log.info( + `Created ApiClient for Validator ID: ${options.validatorID}, URL ${options.validatorURL}, KeyPath ${options.validatorKeyPath}`, + ); + this.log.debug("socketOptions:", options.socketOptions); + + this.socket = io(options.validatorURL, options.socketOptions); + } + + /** + * Immediately sends request to the validator, doesn't report any error or responses. + * @param contract - contract to execute on the ledger. + * @param method - function / method to be executed by validator. + * @param args - arguments. + */ + public sendAsyncRequest( + contract: Record, + method: Record, + args: any, + ): void { + try { + const requestData = { + contract: contract, + method: method, + args: args, + }; + + this.log.debug("sendAsyncRequest() Request:", requestData); + this.socket.emit("request2", requestData); + } catch (err) { + this.log.error("sendAsyncRequest() EXCEPTION", err); + throw err; + } + } + + /** + * Sends request to be executed on the ledger, watches and reports any error and the response from a ledger. + * @param contract - contract to execute on the ledger. + * @param method - function / method to be executed by validator. + * @param args - arguments. + * @returns Promise that will resolve with response from the ledger, or reject when error occurred. + * @todo Refactor to RxJS + */ + public sendSyncRequest( + contract: Record, + method: Record, + args: any, + ): Promise { + return new Promise((resolve, reject) => { + this.log.debug("call : sendSyncRequest"); + + try { + this.log.debug( + "##in sendSyncRequest, contract:", + contract, + "method:", + method, + "args:", + args, + ); + let responseFlag = false; + + // reqID generation + const reqID = this.genarateReqID(); + this.log.debug(`##sendSyncRequest, reqID = ${reqID}`); + + this.socket.on("connect_error", (err: Error) => { + this.log.error("##connect_error:", err); + this.socket.disconnect(); + reject(err); + }); + this.socket.on("connect_timeout", (err: Record) => { + this.log.error("####Error:", err); + this.socket.disconnect(); + reject(err); + }); + this.socket.on("error", (err: Record) => { + this.log.error("####Error:", err); + this.socket.disconnect(); + reject(err); + }); + this.socket.on("response", (result: any) => { + this.log.debug("#[recv]response, res:", result); + if (reqID === result.id) { + responseFlag = true; + + this.checkValidator( + this.options.validatorKeyPath, + result.resObj.data, + ) + .then((decodedData) => { + this.log.debug("checkValidator decodedData:", decodedData); + const resultObj = { + status: result.resObj.status, + data: decodedData.result, + }; + this.log.debug("resultObj =", resultObj); + // Result reply + resolve(resultObj); + }) + .catch((err) => { + responseFlag = false; + this.log.debug("checkValidator error:", err); + this.log.error(err); + }); + } + }); + + // Call Validator + const requestData = { + contract: contract, + method: method, + args: args, + reqID: reqID, + }; + this.log.debug("requestData:", requestData); + this.socket.emit("request2", requestData); + this.log.debug("set timeout"); + + // Time-out setting + const timeoutMilliseconds = + this.options.syncFunctionTimeoutMillisecond || + defaultSyncFunctionTimeoutMillisecond; + setTimeout(() => { + if (responseFlag === false) { + this.log.debug("requestTimeout reqID:", reqID); + resolve({ status: 504 }); + } + }, timeoutMilliseconds); + } catch (err) { + this.log.error("##Error: sendSyncRequest:", err); + reject(err); + } + }); + } + + /** + * Start monitoring for new blocks on the ledger associated with given connector. + * @param monitorOptions - Options to be passed to validator `startMonitoring` procedure. + * @returns RxJs Observable, `next` - new block, `error` - any error from the validator. + */ + public watchBlocksV1( + monitorOptions?: Record, + ): Observable { + if (this.monitorSubject) { + this.log.debug("Reuse observable subject from previous call..."); + if (monitorOptions) { + this.log.info( + "Passed monitorOptions will be ignored since monitoring is already in progress!", + ); + } + return this.monitorSubject; + } else { + this.log.debug("Create new observable subject..."); + + this.monitorSubject = new ReplaySubject(0); + + this.log.debug("call : startMonitor"); + try { + this.log.debug( + `##in startMonitor, validatorUrl = ${this.options.validatorURL}`, + ); + + this.socket.on("connect_error", (err: Error) => { + this.log.error("##connect_error:", err); + this.socket.disconnect(); + if (this.monitorSubject) { + this.monitorSubject.error(err); + } + }); + + this.socket.on("connect_timeout", (err: Record) => { + this.log.error("####Error:", err); + this.socket.disconnect(); + if (this.monitorSubject) { + this.monitorSubject.error(err); + } + }); + + this.socket.on("error", (err: Record) => { + this.log.error("####Error:", err); + this.socket.disconnect(); + if (this.monitorSubject) { + this.monitorSubject.error(err); + } + }); + + this.socket.on("monitor_error", (err: Record) => { + this.log.error("#### Monitor Error:", err); + if (this.monitorSubject) { + this.monitorSubject.error(err); + } + }); + + this.socket.on("eventReceived", (res: any) => { + // output the data received from the client + this.log.debug("#[recv]eventReceived, res:", res); + + this.checkValidator(this.options.validatorKeyPath, res.blockData) + .then((decodedData) => { + const resultObj = { + status: res.status, + blockData: decodedData.blockData, + }; + this.log.debug("resultObj =", resultObj); + const event = new SocketLedgerEvent(); + event.verifierId = this.options.validatorID; + this.log.debug(`##event.verifierId: ${event.verifierId}`); + event.data = resultObj; + if (this.monitorSubject) { + this.monitorSubject.next(event); + } + }) + .catch((err) => { + this.log.error(err); + }); + }); + + const emitStartMonitor = () => { + this.log.debug("##emit: startMonitor"); + if (!monitorOptions || Object.keys(monitorOptions).length === 0) { + this.socket.emit("startMonitor"); + } else { + this.socket.emit("startMonitor", monitorOptions); + } + }; + + if (this.socket.connected) { + emitStartMonitor(); + } else { + this.socket.on("connect", () => { + this.log.debug("#connect"); + emitStartMonitor(); + }); + } + } catch (err) { + this.log.error(`##Error: startMonitor, ${err}`); + this.monitorSubject.error(err); + } + + return this.monitorSubject.pipe( + finalize(() => { + if (this.monitorSubject && !this.monitorSubject.observed) { + // Last observer finished + this.log.debug("##emit: stopMonitor"); + this.socket.emit("stopMonitor"); + this.monitorSubject = undefined; + } + }), + ); + } + } + + /** + * Generated sync request id used to track and match responses from the validator. + * @returns ID lower than maxCounterRequestID. + */ + private genarateReqID(): string { + const maxCounterRequestID = + this.options.maxCounterRequestID || defaultMaxCounterRequestID; + if (this.counterReqID > maxCounterRequestID) { + // Counter initialization + this.counterReqID = 1; + } + return `${this.options.validatorID}_${this.counterReqID++}`; + } +} diff --git a/packages/cactus-api-client/src/main/typescript/verifier.ts b/packages/cactus-api-client/src/main/typescript/verifier.ts new file mode 100644 index 00000000000..1acfddd5793 --- /dev/null +++ b/packages/cactus-api-client/src/main/typescript/verifier.ts @@ -0,0 +1,123 @@ +/* + * Copyright 2020-2021 Hyperledger Cactus Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * Verifier.ts + */ + +import { Subscription } from "rxjs"; + +import { Logger } from "@hyperledger/cactus-common"; +import { LogLevelDesc, LoggerProvider } from "@hyperledger/cactus-common"; +import { ISocketApiClient } from "@hyperledger/cactus-core-api"; + +/** + * Interface required for monitoring ledger event using callback to Verifier.startMonitor() + */ +export interface VerifierEventListener { + onEvent(ledgerEvent: BlockType): void; + onError?(err: any): void; +} + +/** + * Utility type for retrieving monitoring event / new block type from generic ISocketApiClient interface. + */ +type BlockTypeFromSocketApi = T extends ISocketApiClient + ? U + : never; + +/** + * Extends ledger connector ApiClient with additional monitoring methods (using callbacks, instead of reactive). + * + * @remarks + * Migrated from cmd-socketio-server for merging the codebases. + */ +export class Verifier> { + private readonly log: Logger; + readonly className: string; + readonly runningMonitors = new Map(); + + /** + * @param ledgerApi - ApiClient for communicating with ledger connector plugin (must implement `ISocketApiClient`) + * @param logLevel - Log level used by the Verifier. + */ + constructor( + public readonly ledgerApi: LedgerApiType, + logLevel: LogLevelDesc = "INFO", + ) { + this.className = this.constructor.name; + this.log = LoggerProvider.getOrCreate({ + level: logLevel, + label: this.className, + }); + this.log.debug("Created Verifier for ledger API"); + } + + /** + * Start monitoring for new events / blocks from underlying ledger. + * + * @param appId - Used to track different apps that use the monitoring. + * Each app has one subscription to common monitoring subject returned by the ApiClient watch method. + * @param eventListener - Type that supplies callbacks called when new event / error was encountered. + * @param monitorOptions - Options passed to the validator. + */ + startMonitor( + appId: string, + eventListener: VerifierEventListener>, + monitorOptions?: Record, + ): void { + if (this.runningMonitors.has(appId)) { + throw new Error(`Monitor with appId '${appId}' is already running!`); + } + + this.log.debug("call : startMonitor appId =", appId); + + try { + const blocksObservable = this.ledgerApi.watchBlocksV1(monitorOptions); + + const watchBlocksSub = blocksObservable.subscribe({ + next: (blockData: unknown) => { + eventListener.onEvent( + blockData as BlockTypeFromSocketApi, + ); + }, + error: (err) => { + this.log.error("Error when watching for new blocks, err:", err); + if (eventListener.onError) { + eventListener.onError(err); + } + }, + complete: () => { + this.log.info("Watch completed"); + }, + }); + + this.runningMonitors.set(appId, watchBlocksSub); + this.log.debug( + "New monitor added, runningMonitors.size ==", + this.runningMonitors.size, + ); + } catch (err) { + this.log.error(`##Error: startMonitor, ${err}`); + this.runningMonitors.delete(appId); + } + } + + /** + * Stops the monitor for specified app, removes it's subscription from internal storage. + * + * @param appId - ID of application that requested the monitoring. + */ + stopMonitor(appId: string): void { + const watchBlocksSub = this.runningMonitors.get(appId); + if (!watchBlocksSub) { + throw new Error("No monitor running with appId: " + appId); + } + watchBlocksSub.unsubscribe(); + this.runningMonitors.delete(appId); + this.log.debug( + "Monitor removed, runningMonitors.size ==", + this.runningMonitors.size, + ); + } +} diff --git a/packages/cactus-api-client/src/test/typescript/unit/socketio-api-client.test.ts b/packages/cactus-api-client/src/test/typescript/unit/socketio-api-client.test.ts new file mode 100644 index 00000000000..6a5f88f552e --- /dev/null +++ b/packages/cactus-api-client/src/test/typescript/unit/socketio-api-client.test.ts @@ -0,0 +1,726 @@ +/** Base Class: packages/cactus-api-client/src/main/typescript/socketio-api-client.ts + * Note: + * Don't use jest timer mocks here, they do not work well with node http module. + * With timer mocks tests will either hang or report open timeout handle. + * Tests: + * - verifyValidatorJwt(), + * - SocketIOApiClient construction, + * - SocketIOApiClient.sendAsyncRequest(), + * - SocketIOApiClient.sendSyncRequest(), + * - SocketIOApiClient.watchBlocksV1(), + */ + +const testLogLevel: LogLevelDesc = "info"; +const sutLogLevel: LogLevelDesc = "info"; + +import "jest-extended"; +import { cloneDeep } from "lodash"; +import { sign, JwtPayload } from "jsonwebtoken"; + +// Unit Test logger setup +import { + Logger, + LoggerProvider, + LogLevelDesc, +} from "@hyperledger/cactus-common"; +const log: Logger = LoggerProvider.getOrCreate({ + label: "socketio-api-client.test", + level: testLogLevel, +}); + +// Default SocketIOApiClient config input +const defaultConfigOptions = { + validatorID: "123validatorId321", + validatorURL: "https://example:1234", + validatorKeyPath: "./nonexistent/path/somekey.crt", + logLevel: sutLogLevel, + maxCounterRequestID: 3, + syncFunctionTimeoutMillisecond: 50, + socketOptions: { + rejectUnauthorized: false, + reconnection: false, + timeout: 55555, + }, +}; + +// Generate private / public keys for test purposes +import { generateKeyPairSync } from "crypto"; +const { publicKey, privateKey } = generateKeyPairSync("ec", { + namedCurve: "P-256", +}); + +import { SocketIOTestSetupHelpers } from "@hyperledger/cactus-test-tooling"; + +// Mock public key reading +import fs from "fs"; +jest + .spyOn(fs, "readFile") + .mockImplementation((_: unknown, callback: any) => + callback( + null, + Buffer.from(publicKey.export({ type: "spki", format: "pem" })), + ), + ); + +import { + SocketIOApiClient, + verifyValidatorJwt, + SocketLedgerEvent, +} from "../../../main/typescript/socketio-api-client"; + +////////////////////////////////// +// Test Timeout +////////////////////////////////// + +jest.setTimeout(3000); + +////////////////////////////////// +// verifyValidatorJwt() +////////////////////////////////// + +describe("verifyValidatorJwt tests", () => { + const mockKeyPath = "someKeyPath.pem"; + const message = { + message: "Hello", + from: "Someone", + }; + let signedMessage = ""; + + beforeAll(() => { + log.debug("input message:", message); + + // Encrypt the message (from validator) + signedMessage = sign( + message, + privateKey.export({ type: "sec1", format: "pem" }), + { + algorithm: "ES256", + expiresIn: "1 day", + }, + ); + expect(signedMessage).toBeTruthy(); + log.debug("signedMessage:", signedMessage); + }); + + test("Decrypts the payload from the validator using it's public key", async () => { + // Verify (decrypt) + const decryptedMessage = await verifyValidatorJwt( + mockKeyPath, + signedMessage, + ); + + // Assert decrypted message + log.debug("decryptedMessage:", decryptedMessage); + expect(decryptedMessage).toMatchObject(message); + const decryptedJwt = decryptedMessage as JwtPayload; + expect(decryptedJwt.iat).toBeNumber(); + expect(decryptedJwt.exp).toBeNumber(); + + // Assert reading correct public key + expect(((fs.readFile as unknown) as jest.Mock).mock.calls.length).toBe(1); + expect(((fs.readFile as unknown) as jest.Mock).mock.calls[0][0]).toContain( + mockKeyPath, + ); + }); + + test("Rejects malicious message", () => { + // Reverse original message to produce wrong input + const maliciousMessage = signedMessage.split("").reverse().join(""); + log.debug("maliciousMessage", maliciousMessage); + + // Verify (decrypt) + return expect(verifyValidatorJwt(mockKeyPath, maliciousMessage)).toReject(); + }); + + test("Rejects expired message", (done) => { + // Encrypt the message (from validator) with short expiration time + signedMessage = sign( + message, + privateKey.export({ type: "sec1", format: "pem" }), + { + algorithm: "ES256", + expiresIn: "1", + }, + ); + expect(signedMessage).toBeTruthy(); + + setTimeout(async () => { + // Verify after short timeout + await expect(verifyValidatorJwt(mockKeyPath, signedMessage)).toReject(); + done(); + }, 100); + }); +}); + +////////////////////////////////// +// SocketIOApiClient Constructor +////////////////////////////////// + +describe("Construction Tests", () => { + let sut: SocketIOApiClient; + + beforeAll(async () => { + sut = new SocketIOApiClient(defaultConfigOptions); + }); + + test("Sets options field from constructor argument", () => { + expect(sut.options).toEqual(defaultConfigOptions); + }); + + test("Sets className field for the class", () => { + expect(sut.className).toEqual("SocketIOApiClient"); + }); + + test("Throws on empty required options fields", () => { + // Empty validatorID + let configOptions = cloneDeep(defaultConfigOptions); + configOptions.validatorID = ""; + expect(() => new SocketIOApiClient(configOptions)).toThrow(); + + // Empty validatorURL + configOptions = cloneDeep(defaultConfigOptions); + configOptions.validatorURL = ""; + expect(() => new SocketIOApiClient(configOptions)).toThrow(); + + // Empty validatorKeyPath + configOptions = cloneDeep(defaultConfigOptions); + configOptions.validatorKeyPath = ""; + expect(() => new SocketIOApiClient(configOptions)).toThrow(); + }); +}); + +////////////////////////////////// +// SocketIOApiClient Logic +////////////////////////////////// + +describe("SocketIOApiClient Tests", function () { + const reqContract = {}; + const reqMethod = { type: "web3Eth", command: "getBalance" }; + const reqArgs = ["06fc56347d91c6ad2dae0c3ba38eb12ab0d72e97"]; + + let testServer: SocketIOTestSetupHelpers.Server; + let testServerPort: string; + let clientSocket: SocketIOTestSetupHelpers.ClientSocket; + let serverSocket: SocketIOTestSetupHelpers.ServerSocket; + let sut: SocketIOApiClient; + + beforeAll(async () => { + [ + testServer, + testServerPort, + ] = await SocketIOTestSetupHelpers.createListeningMockServer(); + }); + + afterAll((done) => { + testServer.close(() => { + log.debug("Test server closed"); + done(); + }); + }); + + beforeEach(async () => { + clientSocket = SocketIOTestSetupHelpers.createClientSocket(testServerPort); + + // Mock client socket in verifier + sut = new SocketIOApiClient(defaultConfigOptions); + (sut as any)["socket"] = clientSocket; + }); + + afterEach(() => { + if (clientSocket) { + clientSocket.close(); + } + + if (serverSocket) { + serverSocket.disconnect(true); + } + + testServer.sockets.removeAllListeners(); + }); + + ////////////////////////////////// + // sendAsyncRequest() + ////////////////////////////////// + + describe("Send SocketIO Async Request Tests", function () { + beforeEach(async () => { + // Connect client and server sockets + testServer.on("connection", (socket) => { + log.debug("Server socket connected", socket.id); + serverSocket = socket; + }); + + await SocketIOTestSetupHelpers.connectTestClient(clientSocket); + expect(clientSocket.connected).toBeTrue(); + expect(serverSocket.connected).toBeTrue(); + }); + + test("Sends request2 with valid args for socketio verifier", () => { + const reqReceived = new Promise((resolve) => { + serverSocket.on("request2", (req: any) => resolve(req)); + }); + + sut.sendAsyncRequest(reqContract, reqMethod, reqArgs); + + return reqReceived.then((req: any) => { + expect(req.contract).toEqual(reqContract); + expect(req.method).toEqual(reqMethod); + expect(req.args).toEqual(reqArgs); + }); + }); + }); + + ////////////////////////////////// + // sendSyncRequest() + ////////////////////////////////// + + describe("Sync Request Tests", function () { + beforeEach(async () => { + // Connect client and server sockets + testServer.on("connection", (socket) => { + log.debug("Server socket connected", socket.id); + serverSocket = socket; + }); + + await SocketIOTestSetupHelpers.connectTestClient(clientSocket); + expect(clientSocket.connected).toBeTrue(); + expect(serverSocket.connected).toBeTrue(); + }); + + test("Returns 504 on request timeout", async () => { + const reqPromise = sut.sendSyncRequest(reqContract, reqMethod, reqArgs); + await expect(reqPromise).resolves.toEqual({ status: 504 }); // timeout + }); + + test("Sends request2 with valid arguments", () => { + const reqReceived = new Promise((resolve) => { + serverSocket.on("request2", (req: any) => resolve(req)); + }); + + const responseReceived = sut.sendSyncRequest( + reqContract, + reqMethod, + reqArgs, + ); + + return Promise.all([ + expect(responseReceived).resolves.toEqual({ status: 504 }), // timeout + reqReceived.then((req: any) => { + expect(req.contract).toEqual(reqContract); + expect(req.method).toEqual(reqMethod); + expect(req.args).toEqual(reqArgs); + expect(req.reqID).toBeString(); + expect(req.reqID.length).toBeGreaterThan(0); + }), + ]); + }); + + test("Sends unique request id until maxCounterRequestID", (done) => { + const seenReqID = new Set(); + const maxCounterRequestID = defaultConfigOptions.maxCounterRequestID; + let calledTimes = 0; + + serverSocket.on("request2", (req: any) => { + expect(seenReqID).not.toContain(req.reqID); + seenReqID.add(req.reqID); + calledTimes++; + if (calledTimes === maxCounterRequestID) { + expect(seenReqID.size).toEqual(maxCounterRequestID); + done(); + } + }); + + // Send maxCounterRequestID requests + for (let i = 0; i < maxCounterRequestID; i++) { + expect( + sut.sendSyncRequest(reqContract, reqMethod, reqArgs), + ).resolves.toEqual({ status: 504 }); // timeout + } + }); + + test("Returns decoded response from the validator", () => { + const encryptedData = "abcXXX123"; + const decryptedData = "1234"; + const responseStatus = 200; + + serverSocket.on("request2", (req: any) => { + serverSocket.emit("response", { + id: req.reqID, + resObj: { + data: encryptedData, + status: responseStatus, + }, + }); + }); + + const verifyMock = jest.fn(); + verifyMock.mockResolvedValue({ + result: decryptedData, + iat: 3333333333, + exp: 7777777777, + }); + sut.checkValidator = verifyMock; + + return expect(sut.sendSyncRequest(reqContract, reqMethod, reqArgs)) + .resolves.toEqual({ status: responseStatus, data: decryptedData }) + .then(() => { + expect(verifyMock).toHaveBeenCalledTimes(1); + expect(verifyMock).toBeCalledWith( + defaultConfigOptions.validatorKeyPath, + encryptedData, + ); + }); + }); + + test("Doesn't process message from not verified validator", () => { + serverSocket.on("request2", (req: any) => { + serverSocket.emit("response", { + id: req.reqID, + resObj: { + data: "abcXXX123", + status: 200, + }, + }); + }); + + const verifyMock = jest.fn(); + verifyMock.mockRejectedValue({ message: "mock verify error" }); + sut.checkValidator = verifyMock; + + return expect( + sut.sendSyncRequest(reqContract, reqMethod, reqArgs), + ).resolves.toEqual({ status: 504 }); // timeout + }); + + test("Process only requests with matching ID", () => { + serverSocket.on("request2", () => { + serverSocket.emit("response", { + id: "not_existing_id", + resObj: { + data: "abcXXX123", + status: 200, + }, + }); + }); + + const verifyMock = jest.fn(); + sut.checkValidator = verifyMock; + + return expect(sut.sendSyncRequest(reqContract, reqMethod, reqArgs)) + .resolves.toEqual({ status: 504 }) // timeout + .then(() => { + expect(verifyMock).not.toBeCalled(); + }); + }); + }); + + ////////////////////////////////// + // watchBlocksV1() + ////////////////////////////////// + + describe("Monitoring Tests", function () { + const options = { opt: "yes" }; + const monitorError = { + code: 404, + message: "Mock Validator Monitor Error", + }; + + test("Sends stopMonitor and report error on monitor error", (done) => { + expect.assertions(1); + + // Connect client and server sockets + testServer.on("connection", (socket) => { + log.debug("Server socket connected", socket.id); + serverSocket = socket; + + serverSocket.on("stopMonitor", () => { + log.info("stopMonitor received - OK"); + done(); + }); + + serverSocket.emit("monitor_error", monitorError); + }); + + sut.watchBlocksV1().subscribe({ + next() { + done(new Error("Shouldn't trigger new block message")); + }, + error(err) { + log.info("Monitor_error received on the Verifier side."); + expect(err).toEqual(monitorError); // race-condition? Will it always happen before emit? + }, + }); + }); + + test("Sends stopMonitor on unsubscribe", (done) => { + // Connect client and server sockets + testServer.on("connection", (socket) => { + log.debug("Server socket connected", socket.id); + serverSocket = socket; + + serverSocket.on("stopMonitor", () => { + log.info("stopMonitor received - OK"); + done(); + }); + }); + + const observer = sut.watchBlocksV1().subscribe({ + next() { + done(new Error("Shouldn't trigger new block message")); + }, + error() { + done(new Error("Shouldn't report any errors")); + }, + }); + + observer.unsubscribe(); + }); + + test("Sends startMonitor to the validator", (done) => { + expect.assertions(1); + + // Connect client and server sockets + testServer.on("connection", (socket) => { + log.debug("Server socket connected", socket.id); + serverSocket = socket; + + serverSocket.on("startMonitor", (inOpts: typeof options) => { + // Assert options passed + expect(inOpts).toEqual(options); + + // Force premature exit + serverSocket.emit("monitor_error", monitorError); + }); + }); + + sut.watchBlocksV1(options).subscribe({ + next() { + done(new Error("Shouldn't trigger new block message")); + }, + error(err) { + log.info("watchBlocksV1 returned error:", err); + done(); + }, + }); + }); + + test("Pushes new blocks from the validator", (done) => { + const decryptedBlockData = "fooDecrypted321"; + const eventStatus = 200; + + // Server side logic for sending events + testServer.on("connection", (socket) => { + log.debug("Server socket connected", socket.id); + serverSocket = socket; + + serverSocket.once("startMonitor", () => { + serverSocket.emit("eventReceived", { + status: eventStatus, + blockData: "fooSignedBlockData123xxx", + }); + }); + }); + + // Setup checkValidator results (success) + const checkValidatorMock = jest.fn(); + checkValidatorMock.mockResolvedValue({ + blockData: decryptedBlockData, + }); + sut.checkValidator = checkValidatorMock; + + // Receive events from the validator + sut.watchBlocksV1(options).subscribe({ + next(ev: SocketLedgerEvent) { + expect(ev.id).toEqual(""); + expect(ev.verifierId).toEqual(defaultConfigOptions.validatorID); + + if (!ev.data) { + done("Event data is empty or null!"); + } else { + expect((ev.data as { [key: string]: number })["status"]).toEqual( + eventStatus, + ); + expect((ev.data as { [key: string]: string })["blockData"]).toEqual( + decryptedBlockData, + ); + + done(); + } + }, + error(err) { + done(err); + }, + }); + }); + + test("Doesn't push new blocks from not verified validator", (done) => { + // Server side logic for sending events + testServer.on("connection", (socket) => { + log.debug("Server socket connected", socket.id); + serverSocket = socket; + + serverSocket.on("startMonitor", () => { + serverSocket.emit("eventReceived", { + status: 200, + blockData: "fooSignedBlockData123xxx", + }); + + // Force finish the test after a while + setTimeout( + () => serverSocket.emit("monitor_error", monitorError), + 150, + ); + }); + }); + + // Setup checkValidator results (failure) + const checkValidatorMock = jest.fn(); + checkValidatorMock.mockRejectedValue("Mock verify error"); + sut.checkValidator = checkValidatorMock; + + // Receive events from the validator + sut.watchBlocksV1(options).subscribe({ + next(ev: SocketLedgerEvent) { + done("New block was pushed but it shouldn't, event:", ev); + }, + error(err) { + expect(err).toEqual(monitorError); + done(); + }, + }); + }); + + test("Replies last message in multiple subscribers", async () => { + const decryptedBlockData = "fooDecrypted321"; + const eventStatus = 200; + + // Server side logic for sending events + testServer.on("connection", (socket) => { + log.debug("Server socket connected", socket.id); + serverSocket = socket; + + serverSocket.once("startMonitor", () => { + serverSocket.emit("eventReceived", { + status: eventStatus, + blockData: "fooSignedBlockData123xxx", + }); + }); + }); + + // Setup checkValidator results (success) + const checkValidatorMock = jest.fn(); + checkValidatorMock.mockResolvedValue({ + blockData: decryptedBlockData, + }); + sut.checkValidator = checkValidatorMock; + + const blockObservable = sut.watchBlocksV1(options); + + // Receive event on first observer + const firstResults = await new Promise( + (resolve, reject) => { + blockObservable.subscribe({ + next(ev: SocketLedgerEvent) { + if (!ev.data) { + reject("First event data is empty or null!"); // todo - test negative + } else { + log.info("First observer received event:", ev); + resolve(ev); + } + }, + error(err) { + reject(err); + }, + }); + }, + ); + + // Receive the same event on second observer + const secondResults = await new Promise( + (resolve, reject) => { + blockObservable.subscribe({ + next(ev: SocketLedgerEvent) { + if (!ev.data) { + reject("Second event data is empty or null!"); + } else { + log.info("Second observer received event:", ev); + resolve(ev); + } + }, + error(err) { + reject(err); + }, + }); + }, + ); + + expect(secondResults).toEqual(firstResults); + }); + + test("Repeat watchBlocksV1 after previous monitor observable finished", async () => { + // Server side logic for sending events + testServer.on("connection", (socket) => { + log.debug("Server socket connected", socket.id); + serverSocket = socket; + + serverSocket.on("startMonitor", () => { + serverSocket.emit("eventReceived", { + status: 200, + blockData: "fooSignedBlockData123xxx", + }); + }); + }); + + // Setup checkValidator results (success) + const checkValidatorMock = jest.fn(); + checkValidatorMock.mockResolvedValue({ + blockData: "fooDecrypted321", + }); + sut.checkValidator = checkValidatorMock; + + const firstResults = await new Promise( + (resolve, reject) => { + const sub = sut.watchBlocksV1(options).subscribe({ + next(ev: SocketLedgerEvent) { + if (!ev.data) { + sub.unsubscribe(); + reject("First event data is empty or null!"); // todo - test negative + } else { + log.info("First observer received event:", ev); + sub.unsubscribe(); + resolve(ev); + } + }, + error(err) { + sub.unsubscribe(); + reject(err); + }, + }); + }, + ); + + const secondResults = await new Promise( + (resolve, reject) => { + const sub = sut.watchBlocksV1(options).subscribe({ + next(ev: SocketLedgerEvent) { + if (!ev.data) { + sub.unsubscribe(); + reject("2nd event data is empty or null!"); // todo - test negative + } else { + log.info("2nd observer received event:", ev); + sub.unsubscribe(); + resolve(ev); + } + }, + error(err) { + sub.unsubscribe(); + reject(err); + }, + }); + }, + ); + + expect(secondResults).toEqual(firstResults); + }); + }); +}); diff --git a/packages/cactus-api-client/src/test/typescript/unit/verifier.test.ts b/packages/cactus-api-client/src/test/typescript/unit/verifier.test.ts new file mode 100644 index 00000000000..3f8c6f7e371 --- /dev/null +++ b/packages/cactus-api-client/src/test/typescript/unit/verifier.test.ts @@ -0,0 +1,169 @@ +/* Base Class: packages/cactus-api-client/src/main/typescript/verifier.ts + */ + +const testLogLevel: LogLevelDesc = "info"; +const sutLogLevel: LogLevelDesc = "info"; + +import "jest-extended"; +import { Observable } from "rxjs"; + +import { + Logger, + LoggerProvider, + LogLevelDesc, +} from "@hyperledger/cactus-common"; +const log: Logger = LoggerProvider.getOrCreate({ + label: "verifier.test", + level: testLogLevel, +}); + +import { ISocketApiClient } from "@hyperledger/cactus-core-api"; +import { + Verifier, + VerifierEventListener, +} from "../../../main/typescript/verifier"; + +////////////////////////////////// +// Test Timeout +////////////////////////////////// + +jest.setTimeout(3000); + +////////////////////////////////// +// Mocks +////////////////////////////////// + +class MockApiClient implements ISocketApiClient { + // isWatchBlocksRunning = jest + // .fn() + // .mockName("isWatchBlocksRunning") + // .mockReturnValueOnce(true) + // .mockReturnValue(false); + sendAsyncRequest = jest.fn().mockName("sendAsyncRequest"); + sendSyncRequest = jest.fn().mockName("sendSyncRequest"); + watchBlocksV1 = jest.fn().mockName("watchBlocksV1"); +} + +class MockEventListener implements VerifierEventListener { + onEvent = jest.fn().mockName("onEvent"); + onError = jest.fn().mockName("onError"); +} + +////////////////////////////// +// Monitoring Tests +////////////////////////////// + +describe("Monitoring Tests", () => { + // Assume block data format is string + let apiClientMock: MockApiClient; + let sut: Verifier>; + let eventListenerMock: MockEventListener; + + beforeEach(() => { + apiClientMock = new MockApiClient(); + apiClientMock.watchBlocksV1.mockReturnValue( + new Observable(() => log.debug("Mock subscribe called")), + ); + sut = new Verifier(apiClientMock, sutLogLevel); + eventListenerMock = new MockEventListener(); + }); + + test("Entry is added to runningMonitors for new monitoring requests", () => { + const monitorOptions = { test: true }; + expect(sut.runningMonitors.size).toEqual(0); + sut.startMonitor("someId", eventListenerMock, monitorOptions); + expect(sut.runningMonitors.size).toEqual(1); + expect(apiClientMock.watchBlocksV1).toBeCalledWith(monitorOptions); + }); + + // test("ApiClient is called without monitorOptions if monitor was already started", () => { + // const monitorOptions = { test: true }; + // sut.startMonitor("someId", eventListenerMock, monitorOptions); + // sut.startMonitor("anotherId", eventListenerMock, monitorOptions); + // expect(apiClientMock.watchBlocksV1).lastCalledWith(undefined); + // }); + + test("Running multiple monitors with same ID throws an Error", () => { + sut.startMonitor("someId", eventListenerMock); + expect(() => sut.startMonitor("someId", eventListenerMock)).toThrow(); + }); + + test("In case of ApiClient exception runningMonitors is not updated", () => { + apiClientMock = new MockApiClient(); + apiClientMock.watchBlocksV1.mockImplementation(() => { + throw Error("Some mock error in watchBlocks"); + }); + sut = new Verifier(apiClientMock, sutLogLevel); + + expect(sut.runningMonitors.size).toEqual(0); + sut.startMonitor("someId", eventListenerMock); + expect(sut.runningMonitors.size).toEqual(0); + }); + + test("stopMonitor throws error when called with unknown monitor id", () => { + expect(() => sut.stopMonitor("someId")).toThrow(); + }); + + test("onEvent callback is called when new block was received", (done) => { + // Our mock block data is just a string + const mockBlockData = "MockBlockData"; + apiClientMock = new MockApiClient(); + apiClientMock.watchBlocksV1.mockReturnValueOnce( + new Observable((subscriber) => { + log.debug("Observable has started..."); + subscriber.next(mockBlockData); + }), + ); + sut = new Verifier(apiClientMock, sutLogLevel); + + eventListenerMock = new MockEventListener(); + eventListenerMock.onEvent.mockImplementation((blockData: string) => { + log.debug("onEvent() called with blockData:", blockData); + expect(blockData).toEqual(mockBlockData); + done(); + }); + + sut.startMonitor("someId", eventListenerMock); + }); + + test("onError callback is called in when monitoring failed", (done) => { + const mockError = Error("Something terrible"); + apiClientMock = new MockApiClient(); + apiClientMock.watchBlocksV1.mockReturnValueOnce( + new Observable((subscriber) => { + log.debug("Observable has started..."); + subscriber.error(mockError); + }), + ); + sut = new Verifier(apiClientMock, sutLogLevel); + + eventListenerMock = new MockEventListener(); + eventListenerMock.onError.mockImplementation((err: any) => { + log.debug("onError() called with error:", err); + expect(err).toEqual(mockError); + done(); + }); + + sut.startMonitor("someId", eventListenerMock); + }); + + test("stopMonitor unsubscribes and deletes entry from runningMonitors", () => { + const thisAppId = "someId"; + + // Start monitor + expect(sut.runningMonitors.size).toEqual(0); + sut.startMonitor(thisAppId, eventListenerMock); + expect(sut.runningMonitors.size).toEqual(1); + + // Assert monitor is running + const mon = sut.runningMonitors.get(thisAppId); + expect(mon?.closed).toBeFalse(); + + // Stop monitor + sut.stopMonitor(thisAppId); + + // Assert monitor closed and removed from runningMonitors + expect(sut.runningMonitors.size).toEqual(0); + expect(mon?.closed).toBeTrue(); + }); +}); diff --git a/packages/cactus-api-client/tsconfig.json b/packages/cactus-api-client/tsconfig.json index b3fccc28ace..24b536ef9d2 100644 --- a/packages/cactus-api-client/tsconfig.json +++ b/packages/cactus-api-client/tsconfig.json @@ -22,6 +22,9 @@ }, { "path": "../cactus-plugin-consortium-manual/tsconfig.json" + }, + { + "path": "../cactus-test-tooling/tsconfig.json" } ] } \ No newline at end of file diff --git a/packages/cactus-core-api/package.json b/packages/cactus-core-api/package.json index 80af4cf66d2..12ddccacb89 100644 --- a/packages/cactus-core-api/package.json +++ b/packages/cactus-core-api/package.json @@ -81,6 +81,7 @@ }, "dependencies": { "@hyperledger/cactus-common": "1.0.0-rc.3", - "axios": "0.21.4" + "axios": "0.21.4", + "rxjs": "7.3.0" } } diff --git a/packages/cactus-core-api/src/main/typescript/plugin/ledger-connector/i-socket-api-client.ts b/packages/cactus-core-api/src/main/typescript/plugin/ledger-connector/i-socket-api-client.ts new file mode 100644 index 00000000000..1b60f9cc1ac --- /dev/null +++ b/packages/cactus-core-api/src/main/typescript/plugin/ledger-connector/i-socket-api-client.ts @@ -0,0 +1,25 @@ +import { Observable } from "rxjs"; + +/** + * Each plugin-ledger-connector supporting socketio protocol must implement this interface. + * @todo Generic methods signatures (similar to IPluginLedgerConnector) + * @todo Maybe more granular interface segregation for better mixins? For instance, Verifier only needs watchBlocksV1. + * It's possible there's no need for send requests abstractions + */ +export interface ISocketApiClient { + sendAsyncRequest?( + contract: Record, + method: Record, + args: any, + ): void; + + sendSyncRequest?( + contract: Record, + method: Record, + args: any, + ): Promise; + + watchBlocksV1( + monitorOptions?: Record, + ): Observable; +} diff --git a/packages/cactus-core-api/src/main/typescript/public-api.ts b/packages/cactus-core-api/src/main/typescript/public-api.ts index c0dcc1f1940..ce2ffe7d3af 100755 --- a/packages/cactus-core-api/src/main/typescript/public-api.ts +++ b/packages/cactus-core-api/src/main/typescript/public-api.ts @@ -5,6 +5,7 @@ export * from "./generated/openapi/typescript-axios/index"; export * from "./generated/openapi/typescript-axios/base"; export { IPluginLedgerConnector } from "./plugin/ledger-connector/i-plugin-ledger-connector"; +export { ISocketApiClient } from "./plugin/ledger-connector/i-socket-api-client"; export { IPluginConsortium } from "./plugin/consortium/i-plugin-consortium"; export { IPluginKeychain } from "./plugin/keychain/i-plugin-keychain"; export { isIPluginKeychain } from "./plugin/keychain/is-i-plugin-keychain"; diff --git a/packages/cactus-plugin-ledger-connector-besu/src/main/typescript/api-client/besu-api-client.ts b/packages/cactus-plugin-ledger-connector-besu/src/main/typescript/api-client/besu-api-client.ts index eedfcd4a5b7..484273292a0 100644 --- a/packages/cactus-plugin-ledger-connector-besu/src/main/typescript/api-client/besu-api-client.ts +++ b/packages/cactus-plugin-ledger-connector-besu/src/main/typescript/api-client/besu-api-client.ts @@ -3,7 +3,7 @@ import { finalize } from "rxjs/operators"; import { Socket, io } from "socket.io-client"; import { Logger, Checks } from "@hyperledger/cactus-common"; import { LogLevelDesc, LoggerProvider } from "@hyperledger/cactus-common"; -import { Constants } from "@hyperledger/cactus-core-api"; +import { Constants, ISocketApiClient } from "@hyperledger/cactus-core-api"; import { DefaultApi, WatchBlocksV1, @@ -17,7 +17,9 @@ export class BesuApiClientOptions extends Configuration { readonly wsApiPath?: string; } -export class BesuApiClient extends DefaultApi { +export class BesuApiClient + extends DefaultApi + implements ISocketApiClient { public static readonly CLASS_NAME = "BesuApiClient"; private readonly log: Logger; @@ -45,7 +47,7 @@ export class BesuApiClient extends DefaultApi { this.log.debug(`basePath=${this.options.basePath}`); } - public async watchBlocksV1(): Promise> { + public watchBlocksV1(): Observable { const socket: Socket = io(this.wsApiHost, { path: this.wsApiPath }); const subject = new ReplaySubject(0); diff --git a/packages/cactus-test-api-client/src/test/typescript/integration/verifier-integration-with-openapi-connectors.test.ts b/packages/cactus-test-api-client/src/test/typescript/integration/verifier-integration-with-openapi-connectors.test.ts new file mode 100644 index 00000000000..09fac3aaed8 --- /dev/null +++ b/packages/cactus-test-api-client/src/test/typescript/integration/verifier-integration-with-openapi-connectors.test.ts @@ -0,0 +1,235 @@ +// Besu setup code based on: +// packages/cactus-plugin-ledger-connector-besu/src/test/typescript/integration/plugin-ledger-connector-besu/deploy-contract/v21-deploy-contract-from-json.test.ts + +////////////////////////////////// +// Constants +////////////////////////////////// + +const testLogLevel: LogLevelDesc = "info"; +const sutLogLevel: LogLevelDesc = "info"; +const containerImageName = "ghcr.io/hyperledger/cactus-besu-21-1-6-all-in-one"; +const containerImageVersion = "2021-08-24--feat-1244"; + +import "jest-extended"; +import { v4 as uuidv4 } from "uuid"; +import { Server as SocketIoServer } from "socket.io"; +import { PluginRegistry } from "@hyperledger/cactus-core"; +import { + Web3SigningCredentialType, + PluginLedgerConnectorBesu, + ReceiptType, + BesuApiClient, + WatchBlocksV1Progress, +} from "@hyperledger/cactus-plugin-ledger-connector-besu"; +import { PluginKeychainMemory } from "@hyperledger/cactus-plugin-keychain-memory"; +import { + BesuTestLedger, + pruneDockerAllIfGithubAction, +} from "@hyperledger/cactus-test-tooling"; +import { + Logger, + LoggerProvider, + LogLevelDesc, + IListenOptions, + Servers, +} from "@hyperledger/cactus-common"; +import Web3 from "web3"; +import { Account } from "web3-core"; +import { Constants } from "@hyperledger/cactus-core-api"; +import express from "express"; +import http from "http"; +import { AddressInfo } from "net"; +import { BesuApiClientOptions } from "@hyperledger/cactus-plugin-ledger-connector-besu"; +import { + Verifier, + VerifierEventListener, +} from "@hyperledger/cactus-api-client"; + +// Unit Test logger setup +const log: Logger = LoggerProvider.getOrCreate({ + label: "verifier-integration-with-openapi-connectors.test", + level: testLogLevel, +}); +log.info("Test started"); + +////////////////////////////////// +// Test Timeout (default) +////////////////////////////////// + +jest.setTimeout(15 * 1000); // 10 seconds (per test) + +describe("Verifier integration with openapi connectors tests", () => { + let besuTestLedger: BesuTestLedger; + let server: http.Server; + let connector: PluginLedgerConnectorBesu; + let apiClient: BesuApiClient; + let sourceEthAccountPubKey: string; + let sourceEthAccountPrivKey: { privateKey: string }; + let targetEthAccount: Account; + + ////////////////////////////////// + // Environment Setup + ////////////////////////////////// + + beforeAll(async () => { + log.info("Prune Docker..."); + await pruneDockerAllIfGithubAction({ logLevel: testLogLevel }); + + log.info("Start BesuTestLedger..."); + log.debug("Besu image:", containerImageName); + log.debug("Besu version:", containerImageVersion); + besuTestLedger = new BesuTestLedger({ + containerImageName, + containerImageVersion, + }); + await besuTestLedger.start(); + + const rpcApiHttpHost = await besuTestLedger.getRpcApiHttpHost(); + const rpcApiWsHost = await besuTestLedger.getRpcApiWsHost(); + log.debug("rpcApiHttpHost:", rpcApiHttpHost); + log.debug("rpcApiWsHost:", rpcApiWsHost); + + // Source account - genesis account + sourceEthAccountPubKey = besuTestLedger.getGenesisAccountPubKey(); + sourceEthAccountPrivKey = { + privateKey: besuTestLedger.getGenesisAccountPrivKey(), + }; + + // Target account - create new + const web3 = new Web3(rpcApiHttpHost); + targetEthAccount = web3.eth.accounts.create(uuidv4()); + const keychainEntryKey = uuidv4(); + const keychainEntryValue = targetEthAccount.privateKey; + const keychainPlugin = new PluginKeychainMemory({ + instanceId: uuidv4(), + keychainId: uuidv4(), + // pre-provision keychain with mock backend holding the private key of the + // test account that we'll reference while sending requests with the + // signing credential pointing to this keychain entry. + backend: new Map([[keychainEntryKey, keychainEntryValue]]), + logLevel: sutLogLevel, + }); + + log.info("Create PluginLedgerConnectorBesu..."); + connector = new PluginLedgerConnectorBesu({ + rpcApiHttpHost, + rpcApiWsHost, + logLevel: sutLogLevel, + instanceId: uuidv4(), + pluginRegistry: new PluginRegistry({ plugins: [keychainPlugin] }), + }); + + log.info("Start HTTP and WS servers..."); + const expressApp = express(); + expressApp.use(express.json({ limit: "250mb" })); + server = http.createServer(expressApp); + const wsApi = new SocketIoServer(server, { + path: Constants.SocketIoConnectionPathV1, + }); + + const listenOptions: IListenOptions = { + hostname: "localhost", + port: 0, + server, + }; + const addressInfo = (await Servers.listen(listenOptions)) as AddressInfo; + const { address, port } = addressInfo; + const apiHost = `http://${address}:${port}`; + log.info( + `Metrics URL: ${apiHost}/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-besu/get-prometheus-exporter-metrics`, + ); + await connector.getOrCreateWebServices(); + await connector.registerWebServices(expressApp, wsApi); + + log.info("Create BesuApiClientOptions..."); + const besuApiClientOptions = new BesuApiClientOptions({ + basePath: apiHost, + }); + apiClient = new BesuApiClient(besuApiClientOptions); + }, 60 * 1000); // 60s timeout + + afterAll(async () => { + log.info("Shutdown the server..."); + await Servers.shutdown(server); + log.info("Stop and destroy the test ledger..."); + await besuTestLedger.stop(); + await besuTestLedger.destroy(); + log.info("Prune docker..."); + await pruneDockerAllIfGithubAction({ logLevel: testLogLevel }); + }, 60 * 1000); // 60s timeout + + ////////////////////////////////// + // Functional Tests + ////////////////////////////////// + + test("Verifier is constructed on BesuApiClient", async () => { + const sut = new Verifier(apiClient, sutLogLevel); + expect(sut.ledgerApi).toBe(apiClient); + }); + + function sendTransactionOnBesuLedger() { + return connector.transact({ + web3SigningCredential: { + ethAccount: sourceEthAccountPubKey, + secret: sourceEthAccountPrivKey.privateKey, + type: Web3SigningCredentialType.PrivateKeyHex, + }, + transactionConfig: { + from: sourceEthAccountPubKey, + to: targetEthAccount.address, + value: 10e9, + gas: 1000000, + }, + consistencyStrategy: { + blockConfirmations: 0, + receiptType: ReceiptType.NodeTxPoolAck, + timeoutMs: 10000, + }, + }); + } + + test("Sanity check that BesuApiClient watchBlocksV1 works", async () => { + const newBlock = new Promise((resolve, reject) => { + const subscription = apiClient + .watchBlocksV1() + .subscribe((res: WatchBlocksV1Progress) => { + log.debug("Received block number", res.blockHeader.number); + if (!res.blockHeader) { + reject("Empty block received"); + } + subscription.unsubscribe(); + resolve(res); + }); + }); + + await sendTransactionOnBesuLedger(); + return expect(newBlock).toResolve(); + }); + + test("Verifier works with BesuApiClient", async () => { + const newBlock = new Promise((resolve, reject) => { + const appId = "testMonitor"; + const sut = new Verifier(apiClient, sutLogLevel); + + const monitor: VerifierEventListener = { + onEvent(ledgerEvent: WatchBlocksV1Progress): void { + log.info( + "Listener received ledgerEvent, block number", + ledgerEvent.blockHeader.number, + ); + sut.stopMonitor(appId); + resolve(ledgerEvent); + }, + onError(err: any): void { + log.error("Ledger monitoring error:", err); + reject(err); + }, + }; + + sut.startMonitor(appId, monitor); + }); + + await sendTransactionOnBesuLedger(); + return expect(newBlock).toResolve(); + }); +}); diff --git a/packages/cactus-test-tooling/src/main/typescript/public-api.ts b/packages/cactus-test-tooling/src/main/typescript/public-api.ts index 1b13ee38fab..676da718629 100755 --- a/packages/cactus-test-tooling/src/main/typescript/public-api.ts +++ b/packages/cactus-test-tooling/src/main/typescript/public-api.ts @@ -156,3 +156,4 @@ export { IDockerPullProgressDetail } from "./common/i-docker-pull-progress"; export { envNodeToDocker } from "./common/env-node-to-docker"; export { envMapToDocker } from "./common/env-map-to-docker"; export { envNodeToMap } from "./common/env-node-to-map"; +export * as SocketIOTestSetupHelpers from "./socketio-test-setup-helpers/socketio-test-setup-helpers"; diff --git a/packages/cactus-test-tooling/src/main/typescript/socketio-test-setup-helpers/socketio-test-setup-helpers.ts b/packages/cactus-test-tooling/src/main/typescript/socketio-test-setup-helpers/socketio-test-setup-helpers.ts new file mode 100644 index 00000000000..31e1ccd1e14 --- /dev/null +++ b/packages/cactus-test-tooling/src/main/typescript/socketio-test-setup-helpers/socketio-test-setup-helpers.ts @@ -0,0 +1,79 @@ +/** + * Helper module for setting up client/server test sockets. + */ + +import { Server, Socket as ServerSocket } from "socket.io"; +import { io, Socket as ClientSocket } from "socket.io-client"; +import { createServer } from "http"; + +export { Server, ServerSocket, ClientSocket }; + +/** + * Create a socket.io server listening on random local port for test purposes. + * + * @returns [socketio Server, port] + */ +export function createListeningMockServer(): Promise<[Server, string]> { + return new Promise((resolve, reject) => { + const httpServer = createServer(); + httpServer.unref(); + + const testServer = new Server(httpServer, { + transports: ["websocket"], + cookie: false, + }); + + httpServer.listen(0, () => { + const addrInfo = httpServer.address(); + + if (addrInfo && typeof addrInfo == "object") { + const port = addrInfo.port.toString(); + resolve([testServer, port]); + } else { + reject(Error("Couldn't create mock server")); + } + }); + }); +} + +/** + * Create client socket to localhost. + * + * @port - Localhost port to connect to. + */ +export function createClientSocket(port: string): ClientSocket { + return io(`http://localhost:${port}`, { + reconnectionAttempts: 10, + reconnectionDelay: 1000, + transports: ["websocket"], + }); +} + +/** + * Connects supplied client to the test server. + * @returns connected client socket + */ +export function connectTestClient(socket: ClientSocket): Promise { + return new Promise((resolve, reject) => { + // Install setup-time error handlers + const errorHandlerFactory = (event: string) => { + // TODO - Better logging / Remove this + return (err: Record | Error) => { + if (socket) { + console.log("connect error - event", event); + socket.close(); + reject(err); + } + }; + }; + + socket.on("error", errorHandlerFactory("error")); + socket.on("connect_error", errorHandlerFactory("connect_error")); + socket.on("connect_timeout", errorHandlerFactory("connect_timeout")); + + socket.on("connect", () => { + socket.removeAllListeners(); + resolve(socket); + }); + }); +}