diff --git a/packages/cactus-core-api/src/main/typescript/plugin/ledger-connector/i-socket-api-client.ts b/packages/cactus-core-api/src/main/typescript/plugin/ledger-connector/i-socket-api-client.ts index 48d30acf73..18554855ba 100644 --- a/packages/cactus-core-api/src/main/typescript/plugin/ledger-connector/i-socket-api-client.ts +++ b/packages/cactus-core-api/src/main/typescript/plugin/ledger-connector/i-socket-api-client.ts @@ -22,4 +22,8 @@ export interface ISocketApiClient { watchBlocksV1?( monitorOptions?: Record, ): Observable; + + watchBlocksAsyncV1?( + monitorOptions?: Record, + ): Promise>; } diff --git a/packages/cactus-plugin-ledger-connector-corda/README.md b/packages/cactus-plugin-ledger-connector-corda/README.md index 05de3d3afb..6cb9b9452d 100644 --- a/packages/cactus-plugin-ledger-connector-corda/README.md +++ b/packages/cactus-plugin-ledger-connector-corda/README.md @@ -293,6 +293,37 @@ const res = await apiClient.invokeContractV1({ }); ``` +### Transaction Monitoring +- There are two interfaces to monitor changes of vault states - reactive `watchBlocksV1` method, and low-level HTTP API calls. +- Note: The monitoring APIs are implemented only on kotlin-server connector (`main-server`), not typescript connector! +- For usage examples review the functional test file: `packages/cactus-plugin-ledger-connector-corda/src/test/typescript/integration/monitor-transactions-v4.8.test.ts` +- Because transactions read from corda are stored on the connector, they will be lost if connector is closed/killed before transaction were read by the clients. +- Each client has own set of state monitors that are managed independently. After starting the monitoring, each new transaction is queued on the connector until read and explicitly cleared by `watchBlocksV1` or direct HTTP API call. +- Client monitors can be periodically removed by the connector, if there was no action from the client for specified amount of time. +- Client expiration delay can be configured with `cactus.sessionExpireMinutes` option. It default to 30 minutes. +- Each transaction has own index assigned by the corda connector. Index is unique for each client monitoring session. For instance: + - Stopping monitoring for given state will reset the transaction index counter for given client. After restart, it will report first transaction with index 0. + - Each client can see tha same transaction with different index. + - Index can be used to determine the transaction order for given client session. + +#### watchBlocksV1 +- `watchBlocksV1(options: watchBlocksV1Options): Observable` +- Reactive (RxJS) interface to observe state changes. +- Internally, it uses polling of low-level HTTP APIs. +- Watching block should return each block at least once, no blocks should be missed after startMonitor has started. The only case when transaction is lost is when connector we were connected to died. +- Transactions can be duplicated in case internal `ClearMonitorTransactionsV1` call was not successful (for instance, because of connection problems). +- Options: + - `stateFullClassName: string`: state to monitor. + - `pollRate?: number`: how often poll the kotlin server for changes (default 5 seconds). + +#### Low-level HTTP API +- These should not be used when watchBlocks API is sufficient. +- Consists of the following methods: + - `startMonitorV1`: Start monitoring for specified state changes. All changes after calling this function will be stored in internal kotlin-server buffer, ready to be read by calls to `GetMonitorTransactionsV1`. Transactions occuring before the call to startMonitorV1 will not be reported. + - `GetMonitorTransactionsV1`: Read all transactions for given state name still remaining in internal buffer. + - `ClearMonitorTransactionsV1`: Remove transaction for given state name with specified index number from internal buffer. Should be used to acknowledge receiving specified transactions in user code, so that transactions are not reported multiple times. + - `stopMonitorV1`: Don't watch for transactions changes anymore, remove any transactions that were not read until now. + ### Custom Configuration via Env Variables ```json diff --git a/packages/cactus-plugin-ledger-connector-corda/package.json b/packages/cactus-plugin-ledger-connector-corda/package.json index f24dc1c776..0b1a108689 100644 --- a/packages/cactus-plugin-ledger-connector-corda/package.json +++ b/packages/cactus-plugin-ledger-connector-corda/package.json @@ -63,6 +63,7 @@ "joi": "17.4.2", "node-ssh": "12.0.0", "prom-client": "13.2.0", + "rxjs": "7.3.0", "temp": "0.9.4", "typescript-optional": "2.0.1" }, diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/.openapi-generator/FILES b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/.openapi-generator/FILES index c5163f9e93..8bd804e7e6 100644 --- a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/.openapi-generator/FILES +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/.openapi-generator/FILES @@ -4,6 +4,8 @@ settings.gradle src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiPluginLedgerConnectorCorda.kt src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiPluginLedgerConnectorCordaService.kt src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiUtil.kt +src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/ClearMonitorTransactionsV1Request.kt +src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/ClearMonitorTransactionsV1Response.kt src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/CordaNodeSshCredentials.kt src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/CordaRpcCredentials.kt src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/CordaX500Name.kt @@ -15,6 +17,9 @@ src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/mode src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/DiagnoseNodeV1Request.kt src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/DiagnoseNodeV1Response.kt src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/FlowInvocationType.kt +src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1Request.kt +src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1Response.kt +src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1ResponseTx.kt src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/InvokeContractV1Request.kt src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/InvokeContractV1Response.kt src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/JarFile.kt @@ -29,5 +34,9 @@ src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/mode src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/Party.kt src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/PublicKey.kt src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/SHA256.kt +src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StartMonitorV1Request.kt +src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StartMonitorV1Response.kt +src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StopMonitorV1Request.kt +src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StopMonitorV1Response.kt src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/X500Principal.kt src/main/resources/application.yaml diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/Application.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/Application.kt index e4942f9052..c32725a706 100644 --- a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/Application.kt +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/Application.kt @@ -10,10 +10,12 @@ import org.springframework.context.annotation.ComponentScan import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.context.annotation.Bean import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter +import org.springframework.scheduling.annotation.EnableScheduling @SpringBootApplication @ComponentScan(basePackages = ["org.hyperledger.cactus.plugin.ledger.connector.corda.server", "org.hyperledger.cactus.plugin.ledger.connector.corda.server.api", "org.hyperledger.cactus.plugin.ledger.connector.corda.server.model"]) +@EnableScheduling class Application { /** * Spring Bean that binds a Corda Jackson object-mapper to HTTP message types used in Spring. diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiPluginLedgerConnectorCorda.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiPluginLedgerConnectorCorda.kt index 312e113d98..d6accc6a8d 100644 --- a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiPluginLedgerConnectorCorda.kt +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiPluginLedgerConnectorCorda.kt @@ -1,15 +1,23 @@ package org.hyperledger.cactus.plugin.ledger.connector.corda.server.api +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ClearMonitorTransactionsV1Request +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ClearMonitorTransactionsV1Response import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsBadRequestV1Response import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsSuccessV1Response import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsV1Request import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DiagnoseNodeV1Request import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DiagnoseNodeV1Response +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.GetMonitorTransactionsV1Request +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.GetMonitorTransactionsV1Response import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.InvokeContractV1Request import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.InvokeContractV1Response import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ListFlowsV1Request import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ListFlowsV1Response import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.NodeInfo +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StartMonitorV1Request +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StartMonitorV1Response +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StopMonitorV1Request +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StopMonitorV1Response import org.springframework.http.HttpStatus import org.springframework.http.MediaType import org.springframework.http.ResponseEntity @@ -37,6 +45,17 @@ import kotlin.collections.Map class ApiPluginLedgerConnectorCordaController(@Autowired(required = true) val service: ApiPluginLedgerConnectorCordaService) { + @DeleteMapping( + value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/clear-monitor-transactions"], + produces = ["application/json"], + consumes = ["application/json"] + ) + fun clearMonitorTransactionsV1( @Valid @RequestBody(required = false) clearMonitorTransactionsV1Request: ClearMonitorTransactionsV1Request? +): ResponseEntity { + return ResponseEntity(service.clearMonitorTransactionsV1(clearMonitorTransactionsV1Request), HttpStatus.valueOf(200)) + } + + @PostMapping( value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/deploy-contract-jars"], produces = ["application/json"], @@ -59,6 +78,17 @@ class ApiPluginLedgerConnectorCordaController(@Autowired(required = true) val se } + @GetMapping( + value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/get-monitor-transactions"], + produces = ["application/json"], + consumes = ["application/json"] + ) + fun getMonitorTransactionsV1( @Valid @RequestBody(required = false) getMonitorTransactionsV1Request: GetMonitorTransactionsV1Request? +): ResponseEntity { + return ResponseEntity(service.getMonitorTransactionsV1(getMonitorTransactionsV1Request), HttpStatus.valueOf(200)) + } + + @GetMapping( value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/get-prometheus-exporter-metrics"], produces = ["text/plain"] @@ -99,4 +129,26 @@ class ApiPluginLedgerConnectorCordaController(@Autowired(required = true) val se ): ResponseEntity> { return ResponseEntity(service.networkMapV1(body), HttpStatus.valueOf(200)) } + + + @PostMapping( + value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/start-monitor"], + produces = ["application/json"], + consumes = ["application/json"] + ) + fun startMonitorV1( @Valid @RequestBody(required = false) startMonitorV1Request: StartMonitorV1Request? +): ResponseEntity { + return ResponseEntity(service.startMonitorV1(startMonitorV1Request), HttpStatus.valueOf(200)) + } + + + @DeleteMapping( + value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/stop-monitor"], + produces = ["application/json"], + consumes = ["application/json"] + ) + fun stopMonitorV1( @Valid @RequestBody(required = false) stopMonitorV1Request: StopMonitorV1Request? +): ResponseEntity { + return ResponseEntity(service.stopMonitorV1(stopMonitorV1Request), HttpStatus.valueOf(200)) + } } diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiPluginLedgerConnectorCordaService.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiPluginLedgerConnectorCordaService.kt index 462daaf871..4c987f50a0 100644 --- a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiPluginLedgerConnectorCordaService.kt +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiPluginLedgerConnectorCordaService.kt @@ -1,22 +1,34 @@ package org.hyperledger.cactus.plugin.ledger.connector.corda.server.api +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ClearMonitorTransactionsV1Request +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ClearMonitorTransactionsV1Response import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsBadRequestV1Response import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsSuccessV1Response import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsV1Request import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DiagnoseNodeV1Request import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DiagnoseNodeV1Response +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.GetMonitorTransactionsV1Request +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.GetMonitorTransactionsV1Response import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.InvokeContractV1Request import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.InvokeContractV1Response import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ListFlowsV1Request import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ListFlowsV1Response import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.NodeInfo +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StartMonitorV1Request +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StartMonitorV1Response +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StopMonitorV1Request +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StopMonitorV1Response interface ApiPluginLedgerConnectorCordaService { + fun clearMonitorTransactionsV1(clearMonitorTransactionsV1Request: ClearMonitorTransactionsV1Request?): ClearMonitorTransactionsV1Response + fun deployContractJarsV1(deployContractJarsV1Request: DeployContractJarsV1Request?): DeployContractJarsSuccessV1Response fun diagnoseNodeV1(diagnoseNodeV1Request: DiagnoseNodeV1Request?): DiagnoseNodeV1Response + fun getMonitorTransactionsV1(getMonitorTransactionsV1Request: GetMonitorTransactionsV1Request?): GetMonitorTransactionsV1Response + fun getPrometheusMetricsV1(): kotlin.String fun invokeContractV1(invokeContractV1Request: InvokeContractV1Request?): InvokeContractV1Response @@ -24,4 +36,8 @@ interface ApiPluginLedgerConnectorCordaService { fun listFlowsV1(listFlowsV1Request: ListFlowsV1Request?): ListFlowsV1Response fun networkMapV1(body: kotlin.Any?): List + + fun startMonitorV1(startMonitorV1Request: StartMonitorV1Request?): StartMonitorV1Response + + fun stopMonitorV1(stopMonitorV1Request: StopMonitorV1Request?): StopMonitorV1Response } diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/impl/ApiPluginLedgerConnectorCordaServiceImpl.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/impl/ApiPluginLedgerConnectorCordaServiceImpl.kt index 4944d16fe0..6f532a6808 100644 --- a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/impl/ApiPluginLedgerConnectorCordaServiceImpl.kt +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/impl/ApiPluginLedgerConnectorCordaServiceImpl.kt @@ -23,11 +23,13 @@ import org.hyperledger.cactus.plugin.ledger.connector.corda.server.api.ApiPlugin import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.* import java.io.IOException import java.io.InputStream -import java.lang.Exception import java.lang.RuntimeException import java.util.* import java.util.concurrent.TimeUnit import kotlin.IllegalArgumentException +import net.corda.core.contracts.ContractState +import org.springframework.web.util.HtmlUtils.htmlEscape +import kotlin.Exception // TODO Look into this project for powering the connector of ours: // https://github.com/180Protocol/codaptor @@ -37,7 +39,8 @@ class ApiPluginLedgerConnectorCordaServiceImpl( // to be hardcoded like this. Not even sure if these magic strings here actually get used at all or if spring just // overwrites the bean property with whatever it constructed internally based on the configuration. // Either way, these magic strings gotta go. - val rpc: NodeRPCConnection + val rpc: NodeRPCConnection, + val monitorManager: StateMonitorSessionsManager ) : ApiPluginLedgerConnectorCordaService { companion object { @@ -109,7 +112,7 @@ class ApiPluginLedgerConnectorCordaServiceImpl( "requiredSigningKeys" to returnValue.requiredSigningKeys, "sigs" to returnValue.sigs ); - + } else if (returnValue != null) { callOutput = try { val returnValueJson = writer.writeValueAsString(returnValue); @@ -346,4 +349,144 @@ class ApiPluginLedgerConnectorCordaServiceImpl( logger.info("Returning {} NodeInfo elements in response.", nodeInfoList.size) return nodeInfoList } + + /** + * Start monitoring state changes for clientAppID of stateClass specified in the request body. + */ + override fun startMonitorV1(startMonitorV1Request: StartMonitorV1Request?): StartMonitorV1Response { + val clientAppId = startMonitorV1Request?.clientAppId + val stateName = startMonitorV1Request?.stateFullClassName + + if (clientAppId.isNullOrEmpty()) { + val message = "Request rejected because missing client app ID" + logger.info(message) + return StartMonitorV1Response(false, message) + } + + if (stateName.isNullOrEmpty()) { + val message = "Request rejected because missing state class name" + logger.info(message) + return StartMonitorV1Response(false, message) + } + + try { + @Suppress("UNCHECKED_CAST") + val contractState = jsonJvmObjectDeserializer.getOrInferType(stateName) as Class + + monitorManager.withClient(clientAppId) { + startMonitor(stateName, contractState) + return StartMonitorV1Response(true, "OK") + } + } catch (ex: ClassNotFoundException) { + val message = "Unknown corda state name to monitor: ${htmlEscape(stateName)}" + logger.warn("startMonitorV1 error: {}", message) + return StartMonitorV1Response(false, message) + } catch (ex: Throwable) { + logger.warn("startMonitorV1 error: {}, cause: {}", ex.toString(), ex.cause.toString()) + return StartMonitorV1Response(false, htmlEscape(ex.toString())) + } + } + + /** + * Read all transactions that were not read by its client yet. + * Must be called after startMonitorV1 and before stopMonitorV1. + * Transactions buffer must be explicitly cleared with clearMonitorTransactionsV1 + */ + override fun getMonitorTransactionsV1(getMonitorTransactionsV1Request: GetMonitorTransactionsV1Request?): GetMonitorTransactionsV1Response { + val clientAppId = getMonitorTransactionsV1Request?.clientAppId + val stateName = getMonitorTransactionsV1Request?.stateFullClassName + + if (clientAppId.isNullOrEmpty()) { + val message = "Request rejected because missing client app ID" + logger.info(message) + return GetMonitorTransactionsV1Response(false, message) + } + + if (stateName.isNullOrEmpty()) { + val message = "Request rejected because missing state class name" + logger.info(message) + return GetMonitorTransactionsV1Response(false, message) + } + + try { + monitorManager.withClient(clientAppId) { + return GetMonitorTransactionsV1Response(true, "OK", stateName, getTransactions(stateName).toList()) + } + } + catch (ex: Throwable) { + logger.warn("getMonitorTransactionsV1 error: {}, cause: {}", ex.toString(), ex.cause.toString()) + return GetMonitorTransactionsV1Response(false, htmlEscape(ex.toString())) + } + } + + /** + * Clear monitored transactions based on index from internal client buffer. + * Any future call to getMonitorTransactionsV1 will not return transactions removed by this call. + */ + override fun clearMonitorTransactionsV1(clearMonitorTransactionsV1Request: ClearMonitorTransactionsV1Request?): ClearMonitorTransactionsV1Response { + val clientAppId = clearMonitorTransactionsV1Request?.clientAppId + val stateName = clearMonitorTransactionsV1Request?.stateFullClassName + val indexesToRemove = clearMonitorTransactionsV1Request?.txIndexes + + if (clientAppId.isNullOrEmpty()) { + val message = "Request rejected because missing client app ID" + logger.info(message) + return ClearMonitorTransactionsV1Response(false, message) + } + + if (stateName.isNullOrEmpty()) { + val message = "Request rejected because missing state class name" + logger.info(message) + return ClearMonitorTransactionsV1Response(false, message) + } + + if (indexesToRemove.isNullOrEmpty()) { + val message = "No indexes to remove" + logger.info(message) + return ClearMonitorTransactionsV1Response(true, message) + } + + try { + monitorManager.withClient(clientAppId) { + clearTransactions(stateName, indexesToRemove) + return ClearMonitorTransactionsV1Response(true, "OK") + } + } + catch (ex: Throwable) { + logger.warn("clearMonitorTransactionsV1 error: {}, cause: {}", ex.toString(), ex.cause.toString()) + return ClearMonitorTransactionsV1Response(false, htmlEscape(ex.toString())) + } + } + + /** + * Stop monitoring state changes for clientAppID of stateClass specified in the request body. + * Removes all transactions that were not read yet, unsubscribes from the monitor. + */ + override fun stopMonitorV1(stopMonitorV1Request: StopMonitorV1Request?): StopMonitorV1Response { + val clientAppId = stopMonitorV1Request?.clientAppId + val stateName = stopMonitorV1Request?.stateFullClassName + + if (clientAppId.isNullOrEmpty()) { + val message = "Request rejected because missing client app ID" + logger.info(message) + return StopMonitorV1Response(false, message) + } + + if (stateName.isNullOrEmpty()) { + val message = "Request rejected because missing state class name" + logger.info(message) + return StopMonitorV1Response(false, message) + } + + try { + monitorManager.withClient(clientAppId) { + stopMonitor(stateName) + return StopMonitorV1Response(true, "OK") + } + } + catch (ex: Throwable) { + logger.warn("clearMonitorTransactionsV1 error: {}, cause: {}", ex.toString(), ex.cause.toString()) + return StopMonitorV1Response(false, htmlEscape(ex.toString())) + } + } } diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/impl/StateMonitorClientSession.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/impl/StateMonitorClientSession.kt new file mode 100644 index 0000000000..2fdfec7538 --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/impl/StateMonitorClientSession.kt @@ -0,0 +1,139 @@ +package org.hyperledger.cactus.plugin.ledger.connector.corda.server.impl + +import net.corda.core.contracts.ContractState +import net.corda.core.utilities.loggerFor +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.GetMonitorTransactionsV1ResponseTx +import rx.Subscription +import java.math.BigInteger +import java.time.LocalDateTime +import javax.annotation.PreDestroy + +/** + * Monitoring session for single client, can track multiple state changes. + * + * @param rpc The Corda RPC connection + * @param sessionExpireMinutes Period after which the session will become expired + * + * @author michal.bajer@fujitsu.com + */ +class StateMonitorClientSession(private val rpc: NodeRPCConnection, private val sessionExpireMinutes: Long) : + AutoCloseable { + /** + * Simple data class for monitor reactive `subscription`, and queue of `stateChanges` received from corda + */ + private data class StateMonitor( + val stateChanges: MutableSet, + val subscription: Subscription + ) + + private val monitors = mutableMapOf() + private var sessionExpireTime = LocalDateTime.now().plusMinutes(sessionExpireMinutes) + + companion object { + val logger = loggerFor() + } + + /** + * Start monitoring of corda state changes. + * + * Changes can be later read by `getTransactions`. + * When monitoring is already running, this function does nothing. + * + * @param stateName String representation of corda state to monitor. + * @param cordaState ContractState object of state to monitor. + */ + fun startMonitor(stateName: String, cordaState: Class) { + if (monitors.containsKey(stateName)) { + logger.info("Monitoring of state {} is already running", stateName) + return + } + + // FIXME: "valutTrack(xxx).updates" occurs an error if Corda ledger has already over 200 transactions using the stateName that you set above. + // Please refer to "https://r3-cev.atlassian.net/browse/CORDA-2956" to get more infomation. + val stateObservable = this.rpc.proxy.vaultTrack(cordaState).updates + var indexCounter = BigInteger.valueOf(0) + val stateChanges = mutableSetOf() + val monitorSub = stateObservable.subscribe { update -> + update.produced.forEach { change -> + val txResponse = GetMonitorTransactionsV1ResponseTx(indexCounter.toString(), change.toString()) + indexCounter = indexCounter.add(BigInteger.valueOf(1)) + logger.debug("Pushing new transaction for state '{}', index {}", stateName, indexCounter) + stateChanges.add(txResponse) + } + } + monitors[stateName] = StateMonitor(stateChanges, monitorSub) + logger.info("Monitoring for changes of state '{}' started.", stateName) + } + + /** + * Get transactions (state changes) from internal buffer. + * + * Throws an error if there's no monitor for specified state. Make sure startMonitor was already called first. + * + * @param stateName String representation of corda state to monitor. + * @return Set of corda state changes + */ + fun getTransactions(stateName: String): MutableSet { + if (!monitors.containsKey(stateName)) { + throw Exception("No monitor running for corda state $stateName on requested client") + } + + return monitors[stateName]?.stateChanges ?: mutableSetOf() + } + + /** + * Remove transactions with specified indexes from internal buffer. + * + * Throws an error if there's no monitor for specified state. Make sure startMonitor was already called first. + * + * @param stateName String representation of corda state to monitor. + * @param indexesToRemove List of string indexes of transactions to remove. + */ + fun clearTransactions(stateName: String, indexesToRemove: List) { + val transactions = this.getTransactions(stateName) + logger.debug("Transactions before remove: {}", transactions.size) + transactions.removeAll { it.index in indexesToRemove } + logger.debug("Transactions after remove: {}", transactions.size) + } + + /** + * Stop monitoring of corda state changes. + * + * Clears the transactions that were not read yet. + * When there is no monitor running for specified state, it does nothing. + * + * @param stateName String representation of corda state to monitor. + */ + fun stopMonitor(stateName: String) { + monitors[stateName]?.subscription?.unsubscribe() + monitors.remove(stateName) + logger.info("Monitoring for state '{}' stopped.", stateName) + } + + /** + * Removes all active monitors from this session + */ + @PreDestroy + override fun close() { + monitors.forEach { it.value.subscription.unsubscribe() } + monitors.clear() + } + + /** + * Increase time until this session is marked as expired. + * Make sure to call from time to time when the session is still in use. + */ + fun refreshExpireTime() { + this.sessionExpireTime = LocalDateTime.now().plusMinutes(sessionExpireMinutes) + } + + /** + * Return true if this session is expired + */ + fun isExpired() = LocalDateTime.now().isAfter(sessionExpireTime) + + /** + * Return true if there are active monitors in this session + */ + fun hasMonitorRunning() = monitors.isNotEmpty() +} \ No newline at end of file diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/impl/StateMonitorSessionsManager.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/impl/StateMonitorSessionsManager.kt new file mode 100644 index 0000000000..6b67d3d5af --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/impl/StateMonitorSessionsManager.kt @@ -0,0 +1,89 @@ +package org.hyperledger.cactus.plugin.ledger.connector.corda.server.impl + +import net.corda.core.utilities.loggerFor +import org.springframework.beans.factory.annotation.Value +import org.springframework.scheduling.annotation.Scheduled +import org.springframework.stereotype.Component +import javax.annotation.PreDestroy + +private const val SessionExpireMinutes = "cactus.sessionExpireMinutes" +private const val SessionExpireMinutesDefault = "30" +private const val SessionExpireCheckInterval: Long = 60 * 1000 // every minute + +/** + * Client session manager for corda state monitoring. + * + * Manages monitoring session for each client calling this connector. + * Endpoint handlers can use withClient function to execute monitoring-session code in context of particular client. + * Sessions of clients who do not perform any operation for specified amount of time (cactus.sessionExpireMinutes) are removed periodically. + * + * @param sessionExpireMinutes How long to wait for action until removing the client session (config: cactus.sessionExpireMinutes) + * @param rpc The Corda RPC connection + * + * @property clientSessions Map of client sessions. Should not be accessed directly - it's public only because `withClient` is inlined in caller code. + * + * @author michal.bajer@fujitsu.com + */ +@Component +class StateMonitorSessionsManager( + @Value("\${$SessionExpireMinutes:${SessionExpireMinutesDefault}}") val sessionExpireMinutes: Long, + val rpc: NodeRPCConnection +) : AutoCloseable { + companion object { + val logger = loggerFor() + } + + val clientSessions = mutableMapOf() + + /** + * Remove all client sessions that has no state monitors running or are expired (there was no action on them for some time) + * + * It's executed periodically by the framework, but can be run manually as well when needed. + * Polling period taken from const `$SessionExpireCheckInterval` + */ + @Scheduled(fixedDelay = SessionExpireCheckInterval) + fun cleanInvalidClientSessions() { + logger.info("Remove all invalid client sessions. Before - {}", clientSessions.size) + clientSessions.entries.removeAll { !it.value.hasMonitorRunning() || it.value.isExpired() } + logger.info("Remove all invalid client sessions. After - {}", clientSessions.size) + } + + /** + * Remove all running client monitors. + */ + @PreDestroy + override fun close() { + logger.info("StateMonitorQueuesManager close - stop all running monitors.") + clientSessions.forEach { it.value.close() } + } + + /** + * Run StateMonitorClientSession functions in specified client session context. + * + * When client with specified ID is not found, then new client session is created. + * Function is inlined in callers code, it's possible to return values from this block. + * After each call the access time for given client is refreshed. + * If client has no running monitors after the user-defined block, then it's session is removed. + * + * @param clientAppId string representation of client ID + * @param block lambda function to be executed in given client session context + */ + final inline fun withClient(clientAppId: String, block: StateMonitorClientSession.() -> T): T { + // Get client session and update it's expire time + val clientSession = + this.clientSessions.getOrPut(clientAppId) { StateMonitorClientSession(rpc, sessionExpireMinutes) } + clientSession.refreshExpireTime() + + // Run the caller logic on specific client session + val results = clientSession.block() + logger.debug("Monitor withClient block response: {}", results) + + // Check if client session still valid + if (!clientSession.hasMonitorRunning()) { + logger.info("Client session {} not valid anymore - remove.", clientAppId) + this.clientSessions.remove(clientAppId) + } + + return results + } +} \ No newline at end of file diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/ClearMonitorTransactionsV1Request.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/ClearMonitorTransactionsV1Request.kt new file mode 100644 index 0000000000..48d4de133f --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/ClearMonitorTransactionsV1Request.kt @@ -0,0 +1,32 @@ +package org.hyperledger.cactus.plugin.ledger.connector.corda.server.model + +import java.util.Objects +import com.fasterxml.jackson.annotation.JsonProperty +import javax.validation.constraints.DecimalMax +import javax.validation.constraints.DecimalMin +import javax.validation.constraints.Max +import javax.validation.constraints.Min +import javax.validation.constraints.NotNull +import javax.validation.constraints.Pattern +import javax.validation.constraints.Size +import javax.validation.Valid + +/** + * + * @param clientAppId ID of a client application that wants to monitor the state changes + * @param stateFullClassName The fully qualified name of the Corda state to monitor + * @param txIndexes + */ +data class ClearMonitorTransactionsV1Request( + + @get:Size(min=1,max=1024) + @field:JsonProperty("clientAppId", required = true) val clientAppId: kotlin.String, + + @get:Size(min=1,max=1024) + @field:JsonProperty("stateFullClassName", required = true) val stateFullClassName: kotlin.String, + + @field:JsonProperty("txIndexes", required = true) val txIndexes: kotlin.collections.List +) { + +} + diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/ClearMonitorTransactionsV1Response.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/ClearMonitorTransactionsV1Response.kt new file mode 100644 index 0000000000..3d22addb69 --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/ClearMonitorTransactionsV1Response.kt @@ -0,0 +1,27 @@ +package org.hyperledger.cactus.plugin.ledger.connector.corda.server.model + +import java.util.Objects +import com.fasterxml.jackson.annotation.JsonProperty +import javax.validation.constraints.DecimalMax +import javax.validation.constraints.DecimalMin +import javax.validation.constraints.Max +import javax.validation.constraints.Min +import javax.validation.constraints.NotNull +import javax.validation.constraints.Pattern +import javax.validation.constraints.Size +import javax.validation.Valid + +/** + * + * @param success Flag set to true if operation completed correctly. + * @param msg Message describing operation status or any errors that occurred. + */ +data class ClearMonitorTransactionsV1Response( + + @field:JsonProperty("success", required = true) val success: kotlin.Boolean, + + @field:JsonProperty("msg", required = true) val msg: kotlin.String +) { + +} + diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1Request.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1Request.kt new file mode 100644 index 0000000000..1990872221 --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1Request.kt @@ -0,0 +1,29 @@ +package org.hyperledger.cactus.plugin.ledger.connector.corda.server.model + +import java.util.Objects +import com.fasterxml.jackson.annotation.JsonProperty +import javax.validation.constraints.DecimalMax +import javax.validation.constraints.DecimalMin +import javax.validation.constraints.Max +import javax.validation.constraints.Min +import javax.validation.constraints.NotNull +import javax.validation.constraints.Pattern +import javax.validation.constraints.Size +import javax.validation.Valid + +/** + * + * @param clientAppId ID of a client application that wants to monitor the state changes + * @param stateFullClassName The fully qualified name of the Corda state to monitor + */ +data class GetMonitorTransactionsV1Request( + + @get:Size(min=1,max=1024) + @field:JsonProperty("clientAppId", required = true) val clientAppId: kotlin.String, + + @get:Size(min=1,max=1024) + @field:JsonProperty("stateFullClassName", required = true) val stateFullClassName: kotlin.String +) { + +} + diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1Response.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1Response.kt new file mode 100644 index 0000000000..587efada36 --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1Response.kt @@ -0,0 +1,36 @@ +package org.hyperledger.cactus.plugin.ledger.connector.corda.server.model + +import java.util.Objects +import com.fasterxml.jackson.annotation.JsonProperty +import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.GetMonitorTransactionsV1ResponseTx +import javax.validation.constraints.DecimalMax +import javax.validation.constraints.DecimalMin +import javax.validation.constraints.Max +import javax.validation.constraints.Min +import javax.validation.constraints.NotNull +import javax.validation.constraints.Pattern +import javax.validation.constraints.Size +import javax.validation.Valid + +/** + * + * @param success Flag set to true if operation completed correctly. + * @param msg Message describing operation status or any errors that occurred. + * @param stateFullClassName The fully qualified name of the Corda state to monitor + * @param tx + */ +data class GetMonitorTransactionsV1Response( + + @field:JsonProperty("success", required = true) val success: kotlin.Boolean, + + @field:JsonProperty("msg", required = true) val msg: kotlin.String, + + @get:Size(min=1,max=1024) + @field:JsonProperty("stateFullClassName") val stateFullClassName: kotlin.String? = null, + + @field:Valid + @field:JsonProperty("tx") val tx: kotlin.collections.List? = null +) { + +} + diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1ResponseTx.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1ResponseTx.kt new file mode 100644 index 0000000000..f8aad73032 --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1ResponseTx.kt @@ -0,0 +1,27 @@ +package org.hyperledger.cactus.plugin.ledger.connector.corda.server.model + +import java.util.Objects +import com.fasterxml.jackson.annotation.JsonProperty +import javax.validation.constraints.DecimalMax +import javax.validation.constraints.DecimalMin +import javax.validation.constraints.Max +import javax.validation.constraints.Min +import javax.validation.constraints.NotNull +import javax.validation.constraints.Pattern +import javax.validation.constraints.Size +import javax.validation.Valid + +/** + * + * @param index + * @param data + */ +data class GetMonitorTransactionsV1ResponseTx( + + @field:JsonProperty("index") val index: kotlin.String? = null, + + @field:JsonProperty("data") val data: kotlin.String? = null +) { + +} + diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StartMonitorV1Request.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StartMonitorV1Request.kt new file mode 100644 index 0000000000..3955d04202 --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StartMonitorV1Request.kt @@ -0,0 +1,29 @@ +package org.hyperledger.cactus.plugin.ledger.connector.corda.server.model + +import java.util.Objects +import com.fasterxml.jackson.annotation.JsonProperty +import javax.validation.constraints.DecimalMax +import javax.validation.constraints.DecimalMin +import javax.validation.constraints.Max +import javax.validation.constraints.Min +import javax.validation.constraints.NotNull +import javax.validation.constraints.Pattern +import javax.validation.constraints.Size +import javax.validation.Valid + +/** + * + * @param clientAppId ID of a client application that wants to monitor the state changes + * @param stateFullClassName The fully qualified name of the Corda state to monitor + */ +data class StartMonitorV1Request( + + @get:Size(min=1,max=1024) + @field:JsonProperty("clientAppId", required = true) val clientAppId: kotlin.String, + + @get:Size(min=1,max=1024) + @field:JsonProperty("stateFullClassName", required = true) val stateFullClassName: kotlin.String +) { + +} + diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StartMonitorV1Response.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StartMonitorV1Response.kt new file mode 100644 index 0000000000..c320125102 --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StartMonitorV1Response.kt @@ -0,0 +1,27 @@ +package org.hyperledger.cactus.plugin.ledger.connector.corda.server.model + +import java.util.Objects +import com.fasterxml.jackson.annotation.JsonProperty +import javax.validation.constraints.DecimalMax +import javax.validation.constraints.DecimalMin +import javax.validation.constraints.Max +import javax.validation.constraints.Min +import javax.validation.constraints.NotNull +import javax.validation.constraints.Pattern +import javax.validation.constraints.Size +import javax.validation.Valid + +/** + * + * @param success Flag set to true if monitoring started correctly. + * @param msg Message describing operation status or any errors that occurred. + */ +data class StartMonitorV1Response( + + @field:JsonProperty("success", required = true) val success: kotlin.Boolean, + + @field:JsonProperty("msg", required = true) val msg: kotlin.String +) { + +} + diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StopMonitorV1Request.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StopMonitorV1Request.kt new file mode 100644 index 0000000000..95aaa62bb2 --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StopMonitorV1Request.kt @@ -0,0 +1,29 @@ +package org.hyperledger.cactus.plugin.ledger.connector.corda.server.model + +import java.util.Objects +import com.fasterxml.jackson.annotation.JsonProperty +import javax.validation.constraints.DecimalMax +import javax.validation.constraints.DecimalMin +import javax.validation.constraints.Max +import javax.validation.constraints.Min +import javax.validation.constraints.NotNull +import javax.validation.constraints.Pattern +import javax.validation.constraints.Size +import javax.validation.Valid + +/** + * + * @param clientAppId ID of a client application that wants to monitor the state changes + * @param stateFullClassName The fully qualified name of the Corda state to monitor + */ +data class StopMonitorV1Request( + + @get:Size(min=1,max=1024) + @field:JsonProperty("clientAppId", required = true) val clientAppId: kotlin.String, + + @get:Size(min=1,max=1024) + @field:JsonProperty("stateFullClassName", required = true) val stateFullClassName: kotlin.String +) { + +} + diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StopMonitorV1Response.kt b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StopMonitorV1Response.kt new file mode 100644 index 0000000000..7c0413fac4 --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-corda/src/main-server/kotlin/gen/kotlin-spring/src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StopMonitorV1Response.kt @@ -0,0 +1,27 @@ +package org.hyperledger.cactus.plugin.ledger.connector.corda.server.model + +import java.util.Objects +import com.fasterxml.jackson.annotation.JsonProperty +import javax.validation.constraints.DecimalMax +import javax.validation.constraints.DecimalMin +import javax.validation.constraints.Max +import javax.validation.constraints.Min +import javax.validation.constraints.NotNull +import javax.validation.constraints.Pattern +import javax.validation.constraints.Size +import javax.validation.Valid + +/** + * + * @param success Flag set to true if operation completed correctly. + * @param msg Message describing operation status or any errors that occurred. + */ +data class StopMonitorV1Response( + + @field:JsonProperty("success", required = true) val success: kotlin.Boolean, + + @field:JsonProperty("msg", required = true) val msg: kotlin.String +) { + +} + diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main/json/openapi.json b/packages/cactus-plugin-ledger-connector-corda/src/main/json/openapi.json index ed235b71ab..67fd6036a5 100644 --- a/packages/cactus-plugin-ledger-connector-corda/src/main/json/openapi.json +++ b/packages/cactus-plugin-ledger-connector-corda/src/main/json/openapi.json @@ -456,7 +456,7 @@ "required": [ "success", "callOutput", - "flowId" + "flowId" ], "properties": { "success": { @@ -490,6 +490,220 @@ } } }, + "StartMonitorV1Request": { + "type": "object", + "required": [ + "clientAppId", + "stateFullClassName" + ], + "additionalProperties": false, + "properties": { + "clientAppId": { + "description": "ID of a client application that wants to monitor the state changes", + "example": "frond_v1_x_7Hdg6s", + "type": "string", + "minLength": 1, + "maxLength": 1024, + "nullable": false + }, + "stateFullClassName": { + "description": "The fully qualified name of the Corda state to monitor", + "example": "net.corda.samples.example.states.IOUState", + "type": "string", + "minLength": 1, + "maxLength": 1024, + "nullable": false + } + } + }, + "StartMonitorV1Response": { + "type": "object", + "required": [ + "success", + "msg" + ], + "properties": { + "success": { + "type": "boolean", + "description": "Flag set to true if monitoring started correctly.", + "nullable": false + }, + "msg": { + "type": "string", + "description": "Message describing operation status or any errors that occurred.", + "nullable": false + } + } + }, + "GetMonitorTransactionsV1Request": { + "type": "object", + "required": [ + "clientAppId", + "stateFullClassName" + ], + "additionalProperties": false, + "properties": { + "clientAppId": { + "description": "ID of a client application that wants to monitor the state changes", + "example": "frond_v1_x_7Hdg6s", + "type": "string", + "minLength": 1, + "maxLength": 1024, + "nullable": false + }, + "stateFullClassName": { + "description": "The fully qualified name of the Corda state to monitor", + "example": "net.corda.samples.example.states.IOUState", + "type": "string", + "minLength": 1, + "maxLength": 1024, + "nullable": false + } + } + }, + "GetMonitorTransactionsV1Response": { + "type": "object", + "required": [ + "success", + "msg" + ], + "properties": { + "success": { + "type": "boolean", + "description": "Flag set to true if operation completed correctly.", + "nullable": false + }, + "msg": { + "type": "string", + "description": "Message describing operation status or any errors that occurred.", + "nullable": false + }, + "stateFullClassName": { + "description": "The fully qualified name of the Corda state to monitor", + "example": "net.corda.samples.example.states.IOUState", + "type": "string", + "minLength": 1, + "maxLength": 1024, + "nullable": false + }, + "tx": { + "type": "array", + "default": [], + "items": { + "type": "object", + "properties": { + "index": { + "type": "string" + }, + "data": { + "type": "string", + "minItems": 0, + "maxItems": 10e6 + } + } + } + } + } + }, + "ClearMonitorTransactionsV1Request": { + "type": "object", + "required": [ + "clientAppId", + "stateFullClassName", + "txIndexes" + ], + "additionalProperties": false, + "properties": { + "clientAppId": { + "description": "ID of a client application that wants to monitor the state changes", + "example": "frond_v1_x_7Hdg6s", + "type": "string", + "minLength": 1, + "maxLength": 1024, + "nullable": false + }, + "stateFullClassName": { + "description": "The fully qualified name of the Corda state to monitor", + "example": "net.corda.samples.example.states.IOUState", + "type": "string", + "minLength": 1, + "maxLength": 1024, + "nullable": false + }, + "txIndexes": { + "type": "array", + "default": [], + "items": { + "type": "string" + } + } + } + }, + "ClearMonitorTransactionsV1Response": { + "type": "object", + "required": [ + "success", + "msg" + ], + "additionalProperties": false, + "properties": { + "success": { + "type": "boolean", + "description": "Flag set to true if operation completed correctly.", + "nullable": false + }, + "msg": { + "type": "string", + "description": "Message describing operation status or any errors that occurred.", + "nullable": false + } + } + }, + "StopMonitorV1Request": { + "type": "object", + "required": [ + "clientAppId", + "stateFullClassName" + ], + "additionalProperties": false, + "properties": { + "clientAppId": { + "description": "ID of a client application that wants to monitor the state changes", + "example": "frond_v1_x_7Hdg6s", + "type": "string", + "minLength": 1, + "maxLength": 1024, + "nullable": false + }, + "stateFullClassName": { + "description": "The fully qualified name of the Corda state to monitor", + "example": "net.corda.samples.example.states.IOUState", + "type": "string", + "minLength": 1, + "maxLength": 1024, + "nullable": false + } + } + }, + "StopMonitorV1Response": { + "type": "object", + "required": [ + "success", + "msg" + ], + "properties": { + "success": { + "type": "boolean", + "description": "Flag set to true if operation completed correctly.", + "nullable": false + }, + "msg": { + "type": "string", + "description": "Message describing operation status or any errors that occurred.", + "nullable": false + } + } + }, "ListFlowsV1Request": { "type": "object", "additionalProperties": false, @@ -801,6 +1015,142 @@ } } }, + "/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/start-monitor": { + "post": { + "operationId": "startMonitorV1", + "x-hyperledger-cactus": { + "http": { + "path": "/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/start-monitor", + "verbLowerCase": "post" + } + }, + "summary": "Start monitoring corda changes (transactions) of given state class", + "parameters": [], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/StartMonitorV1Request" + } + } + } + }, + "responses": { + "200": { + "description": "OK", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/StartMonitorV1Response" + } + } + } + } + } + } + }, + "/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/get-monitor-transactions": { + "get": { + "operationId": "GetMonitorTransactionsV1", + "x-hyperledger-cactus": { + "http": { + "path": "/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/get-monitor-transactions", + "verbLowerCase": "get" + } + }, + "summary": "Get transactions for monitored state classes.", + "parameters": [], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/GetMonitorTransactionsV1Request" + } + } + } + }, + "responses": { + "200": { + "description": "OK", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/GetMonitorTransactionsV1Response" + } + } + } + } + } + } + }, + "/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/clear-monitor-transactions": { + "delete": { + "operationId": "ClearMonitorTransactionsV1", + "x-hyperledger-cactus": { + "http": { + "path": "/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/clear-monitor-transactions", + "verbLowerCase": "delete" + } + }, + "summary": "Clear transactions from internal store so they'll not be available by GetMonitorTransactionsV1 anymore.", + "parameters": [], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ClearMonitorTransactionsV1Request" + } + } + } + }, + "responses": { + "200": { + "description": "OK", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ClearMonitorTransactionsV1Response" + } + } + } + } + } + } + }, + "/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/stop-monitor": { + "delete": { + "operationId": "stopMonitorV1", + "x-hyperledger-cactus": { + "http": { + "path": "/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/stop-monitor", + "verbLowerCase": "delete" + } + }, + "summary": "Stop monitoring corda changes (transactions) of given state class", + "parameters": [], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/StopMonitorV1Request" + } + } + } + }, + "responses": { + "200": { + "description": "OK", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/StopMonitorV1Response" + } + } + } + } + } + } + }, "/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/network-map": { "post": { "operationId": "networkMapV1", diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main/typescript/api-client/corda-api-client.ts b/packages/cactus-plugin-ledger-connector-corda/src/main/typescript/api-client/corda-api-client.ts new file mode 100644 index 0000000000..aed280c7b0 --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-corda/src/main/typescript/api-client/corda-api-client.ts @@ -0,0 +1,237 @@ +import { Observable, ReplaySubject } from "rxjs"; +import { finalize, share } from "rxjs/operators"; +import { + Logger, + LogLevelDesc, + LoggerProvider, + Checks, +} from "@hyperledger/cactus-common"; +import { ISocketApiClient } from "@hyperledger/cactus-core-api"; +import { + DefaultApi, + GetMonitorTransactionsV1ResponseTx, +} from "../generated/openapi/typescript-axios"; +import { Configuration } from "../generated/openapi/typescript-axios/configuration"; + +const DEFAULT_POLL_RATE_MS = 5000; + +type CordaBlock = GetMonitorTransactionsV1ResponseTx; + +/** + * Options for CordaApiClient.watchBlocksV1 method. + */ +export type watchBlocksV1Options = { + readonly stateFullClassName: string; + readonly clientAppId: string; + readonly pollRate?: number; +}; + +export class CordaApiClientOptions extends Configuration { + readonly logLevel?: LogLevelDesc; +} + +/** + * ApiClient to call remote Corda connector. + */ +export class CordaApiClient + extends DefaultApi + implements ISocketApiClient { + public static readonly CLASS_NAME = "CordaApiClient"; + + private readonly log: Logger; + + public get className(): string { + return CordaApiClient.CLASS_NAME; + } + + constructor(public readonly options: CordaApiClientOptions) { + super(options); + const fnTag = `${this.className}#constructor()`; + Checks.truthy(options, `${fnTag} arg options`); + + const level = this.options.logLevel || "INFO"; + const label = this.className; + this.log = LoggerProvider.getOrCreate({ level, label }); + + this.log.debug(`Created ${this.className} OK.`); + this.log.debug(`basePath=${this.options.basePath}`); + } + + /** + * Send low-level HTTP API startMonitorV1 to the connector to start monitoring queue for specified client. + * Errors are pushed to RxJs subject. + * + * @param subject RxJs subject associated with this monitoring request. + * @param clientAppId Client application ID to identify monitoring queue on the connector. + * @param stateName Corda state to monitor. + */ + private async sendStartMonitorRequest( + subject: ReplaySubject, + clientAppId: string, + stateName: string, + ) { + const reportError = (err: any) => { + this.log.warn("Error in startMonitorV1:", err); + subject.error(`startMonitorV1 for '${stateName}' transactions failed`); + }; + + try { + const startMonRes = await this.startMonitorV1({ + clientAppId: clientAppId, + stateFullClassName: stateName, + }); + + if (startMonRes.status != 200 || !startMonRes.data.success) { + reportError( + `Wrong response: status ${startMonRes.status}, success ${startMonRes.data.success}, msg ${startMonRes.data.msg}`, + ); + } else { + this.log.info(`Monitoring for ${stateName} transactions started.`); + } + } catch (err) { + reportError(err); + } + } + + /** + * Function to perform single request to read and confirm retrieval of transactions from the connector. + * Should be executed periodically (i.e. connector should be polled for new transactions). + * New transactions are pushed into the subject. + * + * @param subject RxJs subject associated with this monitoring request. + * @param clientAppId Client application ID to identify monitoring queue on the connector. + * @param stateName Corda state to monitor. + */ + private async pollTransactionsLogin( + subject: ReplaySubject, + clientAppId: string, + stateName: string, + ) { + try { + const response = await this.getMonitorTransactionsV1({ + clientAppId: clientAppId, + stateFullClassName: stateName, + }); + + if (response.status != 200 || !response.data.success) { + throw new Error(`Poll error: ${response.data.msg}`); + } + + if (response.data.stateFullClassName != stateName) { + throw new Error( + `Received class name mismatch! ${stateName} != ${response.data.stateFullClassName}`, + ); + } + + if (!response.data.tx) { + this.log.debug("No new transactions, continue..."); + return; + } + + const readTxIdx = response.data.tx.map((tx) => tx.index); + await this.clearMonitorTransactionsV1({ + clientAppId: clientAppId, + stateFullClassName: stateName, + txIndexes: readTxIdx?.filter(Boolean) as string[], + }); + + response.data.tx.forEach((tx) => subject.next(tx)); + } catch (err) { + this.log.warn("Monitor poll error for state", stateName); + subject.error(err); + } + } + + /** + * Should be called to stop monitoring on the connector. + * Calling this will remove all pending transactions (that were not read yet)! + * + * @param monitor Monitoring interval set with `setTimeout`. + * @param clientAppId Client application ID to identify monitoring queue on the connector. + * @param stateName Corda state to monitor. + */ + private finalizeMonitoring( + monitor: ReturnType, + clientAppId: string, + stateName: string, + ) { + this.log.info("Unsubscribe from the monitoring..."); + + clearInterval(monitor); + + this.stopMonitorV1({ + clientAppId: clientAppId, + stateFullClassName: stateName, + }) + .then((stopMonRes) => { + if (stopMonRes.status != 200 || !stopMonRes.data.success) { + this.log.warn( + "Error response from stopMonitorV1:", + stopMonRes.data.msg, + ); + } else { + this.log.info(`Monitoring for ${stateName} transactions stopped.`); + } + }) + .catch((err) => { + this.log.warn("Error when calling stopMonitorV1:", err); + }); + } + + /** + * Watch new transactions (state changes) on the corda ledger. + * + * @param options.stateFullClassName Corda state to monitor + * @param options.clientAppId Calling app ID. Each monitoring queue on the connector is associated with a client ID. + * @param options.pollRate How often poll the connector for new transactions. Defaults to 5s + * + * @returns RxJS observable of transactions. + */ + public async watchBlocksAsyncV1( + options: watchBlocksV1Options, + ): Promise> { + Checks.truthy(options, "watchBlocksV1 missing options"); + Checks.nonBlankString( + options.stateFullClassName, + "watchBlocksV1 stateFullClassName empty", + ); + Checks.nonBlankString( + options.clientAppId, + "watchBlocksV1 clientAppId empty", + ); + const pollRate = options.pollRate ?? DEFAULT_POLL_RATE_MS; + this.log.debug("Using monitoring poll rate:", pollRate); + + const subject = new ReplaySubject(0); + + // Start monitoring + await this.sendStartMonitorRequest( + subject, + options.clientAppId, + options.stateFullClassName, + ); + + // Periodically poll + const monitoringInterval = setInterval( + () => + this.pollTransactionsLogin( + subject, + options.clientAppId, + options.stateFullClassName, + ), + pollRate, + ); + + // Share and finalize monitoring when not listened to anymore + return subject.pipe( + finalize(() => + this.finalizeMonitoring( + monitoringInterval, + options.clientAppId, + options.stateFullClassName, + ), + ), + share(), + ); + } +} diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main/typescript/generated/openapi/typescript-axios/api.ts b/packages/cactus-plugin-ledger-connector-corda/src/main/typescript/generated/openapi/typescript-axios/api.ts index d6e8e61590..59a9cd20b4 100644 --- a/packages/cactus-plugin-ledger-connector-corda/src/main/typescript/generated/openapi/typescript-axios/api.ts +++ b/packages/cactus-plugin-ledger-connector-corda/src/main/typescript/generated/openapi/typescript-axios/api.ts @@ -21,6 +21,50 @@ import { DUMMY_BASE_URL, assertParamExists, setApiKeyToObject, setBasicAuthToObj // @ts-ignore import { BASE_PATH, COLLECTION_FORMATS, RequestArgs, BaseAPI, RequiredError } from './base'; +/** + * + * @export + * @interface ClearMonitorTransactionsV1Request + */ +export interface ClearMonitorTransactionsV1Request { + /** + * ID of a client application that wants to monitor the state changes + * @type {string} + * @memberof ClearMonitorTransactionsV1Request + */ + clientAppId: string; + /** + * The fully qualified name of the Corda state to monitor + * @type {string} + * @memberof ClearMonitorTransactionsV1Request + */ + stateFullClassName: string; + /** + * + * @type {Array} + * @memberof ClearMonitorTransactionsV1Request + */ + txIndexes: Array; +} +/** + * + * @export + * @interface ClearMonitorTransactionsV1Response + */ +export interface ClearMonitorTransactionsV1Response { + /** + * Flag set to true if operation completed correctly. + * @type {boolean} + * @memberof ClearMonitorTransactionsV1Response + */ + success: boolean; + /** + * Message describing operation status or any errors that occurred. + * @type {string} + * @memberof ClearMonitorTransactionsV1Response + */ + msg: string; +} /** * * @export @@ -324,6 +368,75 @@ export enum FlowInvocationType { FlowDynamic = 'FLOW_DYNAMIC' } +/** + * + * @export + * @interface GetMonitorTransactionsV1Request + */ +export interface GetMonitorTransactionsV1Request { + /** + * ID of a client application that wants to monitor the state changes + * @type {string} + * @memberof GetMonitorTransactionsV1Request + */ + clientAppId: string; + /** + * The fully qualified name of the Corda state to monitor + * @type {string} + * @memberof GetMonitorTransactionsV1Request + */ + stateFullClassName: string; +} +/** + * + * @export + * @interface GetMonitorTransactionsV1Response + */ +export interface GetMonitorTransactionsV1Response { + /** + * Flag set to true if operation completed correctly. + * @type {boolean} + * @memberof GetMonitorTransactionsV1Response + */ + success: boolean; + /** + * Message describing operation status or any errors that occurred. + * @type {string} + * @memberof GetMonitorTransactionsV1Response + */ + msg: string; + /** + * The fully qualified name of the Corda state to monitor + * @type {string} + * @memberof GetMonitorTransactionsV1Response + */ + stateFullClassName?: string; + /** + * + * @type {Array} + * @memberof GetMonitorTransactionsV1Response + */ + tx?: Array; +} +/** + * + * @export + * @interface GetMonitorTransactionsV1ResponseTx + */ +export interface GetMonitorTransactionsV1ResponseTx { + /** + * + * @type {string} + * @memberof GetMonitorTransactionsV1ResponseTx + */ + index?: string; + /** + * + * @type {string} + * @memberof GetMonitorTransactionsV1ResponseTx + */ + data?: string; +} /** * * @export @@ -674,6 +787,82 @@ export interface SHA256 { */ size: number; } +/** + * + * @export + * @interface StartMonitorV1Request + */ +export interface StartMonitorV1Request { + /** + * ID of a client application that wants to monitor the state changes + * @type {string} + * @memberof StartMonitorV1Request + */ + clientAppId: string; + /** + * The fully qualified name of the Corda state to monitor + * @type {string} + * @memberof StartMonitorV1Request + */ + stateFullClassName: string; +} +/** + * + * @export + * @interface StartMonitorV1Response + */ +export interface StartMonitorV1Response { + /** + * Flag set to true if monitoring started correctly. + * @type {boolean} + * @memberof StartMonitorV1Response + */ + success: boolean; + /** + * Message describing operation status or any errors that occurred. + * @type {string} + * @memberof StartMonitorV1Response + */ + msg: string; +} +/** + * + * @export + * @interface StopMonitorV1Request + */ +export interface StopMonitorV1Request { + /** + * ID of a client application that wants to monitor the state changes + * @type {string} + * @memberof StopMonitorV1Request + */ + clientAppId: string; + /** + * The fully qualified name of the Corda state to monitor + * @type {string} + * @memberof StopMonitorV1Request + */ + stateFullClassName: string; +} +/** + * + * @export + * @interface StopMonitorV1Response + */ +export interface StopMonitorV1Response { + /** + * Flag set to true if operation completed correctly. + * @type {boolean} + * @memberof StopMonitorV1Response + */ + success: boolean; + /** + * Message describing operation status or any errors that occurred. + * @type {string} + * @memberof StopMonitorV1Response + */ + msg: string; +} /** * * @export @@ -700,6 +889,40 @@ export interface X500Principal { */ export const DefaultApiAxiosParamCreator = function (configuration?: Configuration) { return { + /** + * + * @summary Clear transactions from internal store so they\'ll not be available by GetMonitorTransactionsV1 anymore. + * @param {ClearMonitorTransactionsV1Request} [clearMonitorTransactionsV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + clearMonitorTransactionsV1: async (clearMonitorTransactionsV1Request?: ClearMonitorTransactionsV1Request, options: any = {}): Promise => { + const localVarPath = `/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/clear-monitor-transactions`; + // use dummy base URL string because the URL constructor only accepts absolute URLs. + const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL); + let baseOptions; + if (configuration) { + baseOptions = configuration.baseOptions; + } + + const localVarRequestOptions = { method: 'DELETE', ...baseOptions, ...options}; + const localVarHeaderParameter = {} as any; + const localVarQueryParameter = {} as any; + + + + localVarHeaderParameter['Content-Type'] = 'application/json'; + + setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); + let headersFromBaseOptions = baseOptions && baseOptions.headers ? baseOptions.headers : {}; + localVarRequestOptions.headers = {...localVarHeaderParameter, ...headersFromBaseOptions, ...options.headers}; + localVarRequestOptions.data = serializeDataIfNeeded(clearMonitorTransactionsV1Request, localVarRequestOptions, configuration) + + return { + url: toPathString(localVarUrlObj), + options: localVarRequestOptions, + }; + }, /** * * @summary Deploys a set of jar files (Cordapps, e.g. the contracts in Corda speak). @@ -767,6 +990,40 @@ export const DefaultApiAxiosParamCreator = function (configuration?: Configurati options: localVarRequestOptions, }; }, + /** + * + * @summary Get transactions for monitored state classes. + * @param {GetMonitorTransactionsV1Request} [getMonitorTransactionsV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + getMonitorTransactionsV1: async (getMonitorTransactionsV1Request?: GetMonitorTransactionsV1Request, options: any = {}): Promise => { + const localVarPath = `/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/get-monitor-transactions`; + // use dummy base URL string because the URL constructor only accepts absolute URLs. + const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL); + let baseOptions; + if (configuration) { + baseOptions = configuration.baseOptions; + } + + const localVarRequestOptions = { method: 'GET', ...baseOptions, ...options}; + const localVarHeaderParameter = {} as any; + const localVarQueryParameter = {} as any; + + + + localVarHeaderParameter['Content-Type'] = 'application/json'; + + setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); + let headersFromBaseOptions = baseOptions && baseOptions.headers ? baseOptions.headers : {}; + localVarRequestOptions.headers = {...localVarHeaderParameter, ...headersFromBaseOptions, ...options.headers}; + localVarRequestOptions.data = serializeDataIfNeeded(getMonitorTransactionsV1Request, localVarRequestOptions, configuration) + + return { + url: toPathString(localVarUrlObj), + options: localVarRequestOptions, + }; + }, /** * * @summary Get the Prometheus Metrics @@ -892,6 +1149,74 @@ export const DefaultApiAxiosParamCreator = function (configuration?: Configurati localVarRequestOptions.headers = {...localVarHeaderParameter, ...headersFromBaseOptions, ...options.headers}; localVarRequestOptions.data = serializeDataIfNeeded(body, localVarRequestOptions, configuration) + return { + url: toPathString(localVarUrlObj), + options: localVarRequestOptions, + }; + }, + /** + * + * @summary Start monitoring corda changes (transactions) of given state class + * @param {StartMonitorV1Request} [startMonitorV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + startMonitorV1: async (startMonitorV1Request?: StartMonitorV1Request, options: any = {}): Promise => { + const localVarPath = `/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/start-monitor`; + // use dummy base URL string because the URL constructor only accepts absolute URLs. + const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL); + let baseOptions; + if (configuration) { + baseOptions = configuration.baseOptions; + } + + const localVarRequestOptions = { method: 'POST', ...baseOptions, ...options}; + const localVarHeaderParameter = {} as any; + const localVarQueryParameter = {} as any; + + + + localVarHeaderParameter['Content-Type'] = 'application/json'; + + setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); + let headersFromBaseOptions = baseOptions && baseOptions.headers ? baseOptions.headers : {}; + localVarRequestOptions.headers = {...localVarHeaderParameter, ...headersFromBaseOptions, ...options.headers}; + localVarRequestOptions.data = serializeDataIfNeeded(startMonitorV1Request, localVarRequestOptions, configuration) + + return { + url: toPathString(localVarUrlObj), + options: localVarRequestOptions, + }; + }, + /** + * + * @summary Stop monitoring corda changes (transactions) of given state class + * @param {StopMonitorV1Request} [stopMonitorV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + stopMonitorV1: async (stopMonitorV1Request?: StopMonitorV1Request, options: any = {}): Promise => { + const localVarPath = `/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/stop-monitor`; + // use dummy base URL string because the URL constructor only accepts absolute URLs. + const localVarUrlObj = new URL(localVarPath, DUMMY_BASE_URL); + let baseOptions; + if (configuration) { + baseOptions = configuration.baseOptions; + } + + const localVarRequestOptions = { method: 'DELETE', ...baseOptions, ...options}; + const localVarHeaderParameter = {} as any; + const localVarQueryParameter = {} as any; + + + + localVarHeaderParameter['Content-Type'] = 'application/json'; + + setSearchParams(localVarUrlObj, localVarQueryParameter, options.query); + let headersFromBaseOptions = baseOptions && baseOptions.headers ? baseOptions.headers : {}; + localVarRequestOptions.headers = {...localVarHeaderParameter, ...headersFromBaseOptions, ...options.headers}; + localVarRequestOptions.data = serializeDataIfNeeded(stopMonitorV1Request, localVarRequestOptions, configuration) + return { url: toPathString(localVarUrlObj), options: localVarRequestOptions, @@ -907,6 +1232,17 @@ export const DefaultApiAxiosParamCreator = function (configuration?: Configurati export const DefaultApiFp = function(configuration?: Configuration) { const localVarAxiosParamCreator = DefaultApiAxiosParamCreator(configuration) return { + /** + * + * @summary Clear transactions from internal store so they\'ll not be available by GetMonitorTransactionsV1 anymore. + * @param {ClearMonitorTransactionsV1Request} [clearMonitorTransactionsV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + async clearMonitorTransactionsV1(clearMonitorTransactionsV1Request?: ClearMonitorTransactionsV1Request, options?: any): Promise<(axios?: AxiosInstance, basePath?: string) => AxiosPromise> { + const localVarAxiosArgs = await localVarAxiosParamCreator.clearMonitorTransactionsV1(clearMonitorTransactionsV1Request, options); + return createRequestFunction(localVarAxiosArgs, globalAxios, BASE_PATH, configuration); + }, /** * * @summary Deploys a set of jar files (Cordapps, e.g. the contracts in Corda speak). @@ -928,6 +1264,17 @@ export const DefaultApiFp = function(configuration?: Configuration) { const localVarAxiosArgs = await localVarAxiosParamCreator.diagnoseNodeV1(diagnoseNodeV1Request, options); return createRequestFunction(localVarAxiosArgs, globalAxios, BASE_PATH, configuration); }, + /** + * + * @summary Get transactions for monitored state classes. + * @param {GetMonitorTransactionsV1Request} [getMonitorTransactionsV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + async getMonitorTransactionsV1(getMonitorTransactionsV1Request?: GetMonitorTransactionsV1Request, options?: any): Promise<(axios?: AxiosInstance, basePath?: string) => AxiosPromise> { + const localVarAxiosArgs = await localVarAxiosParamCreator.getMonitorTransactionsV1(getMonitorTransactionsV1Request, options); + return createRequestFunction(localVarAxiosArgs, globalAxios, BASE_PATH, configuration); + }, /** * * @summary Get the Prometheus Metrics @@ -969,6 +1316,28 @@ export const DefaultApiFp = function(configuration?: Configuration) { const localVarAxiosArgs = await localVarAxiosParamCreator.networkMapV1(body, options); return createRequestFunction(localVarAxiosArgs, globalAxios, BASE_PATH, configuration); }, + /** + * + * @summary Start monitoring corda changes (transactions) of given state class + * @param {StartMonitorV1Request} [startMonitorV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + async startMonitorV1(startMonitorV1Request?: StartMonitorV1Request, options?: any): Promise<(axios?: AxiosInstance, basePath?: string) => AxiosPromise> { + const localVarAxiosArgs = await localVarAxiosParamCreator.startMonitorV1(startMonitorV1Request, options); + return createRequestFunction(localVarAxiosArgs, globalAxios, BASE_PATH, configuration); + }, + /** + * + * @summary Stop monitoring corda changes (transactions) of given state class + * @param {StopMonitorV1Request} [stopMonitorV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + async stopMonitorV1(stopMonitorV1Request?: StopMonitorV1Request, options?: any): Promise<(axios?: AxiosInstance, basePath?: string) => AxiosPromise> { + const localVarAxiosArgs = await localVarAxiosParamCreator.stopMonitorV1(stopMonitorV1Request, options); + return createRequestFunction(localVarAxiosArgs, globalAxios, BASE_PATH, configuration); + }, } }; @@ -979,6 +1348,16 @@ export const DefaultApiFp = function(configuration?: Configuration) { export const DefaultApiFactory = function (configuration?: Configuration, basePath?: string, axios?: AxiosInstance) { const localVarFp = DefaultApiFp(configuration) return { + /** + * + * @summary Clear transactions from internal store so they\'ll not be available by GetMonitorTransactionsV1 anymore. + * @param {ClearMonitorTransactionsV1Request} [clearMonitorTransactionsV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + clearMonitorTransactionsV1(clearMonitorTransactionsV1Request?: ClearMonitorTransactionsV1Request, options?: any): AxiosPromise { + return localVarFp.clearMonitorTransactionsV1(clearMonitorTransactionsV1Request, options).then((request) => request(axios, basePath)); + }, /** * * @summary Deploys a set of jar files (Cordapps, e.g. the contracts in Corda speak). @@ -998,6 +1377,16 @@ export const DefaultApiFactory = function (configuration?: Configuration, basePa diagnoseNodeV1(diagnoseNodeV1Request?: DiagnoseNodeV1Request, options?: any): AxiosPromise { return localVarFp.diagnoseNodeV1(diagnoseNodeV1Request, options).then((request) => request(axios, basePath)); }, + /** + * + * @summary Get transactions for monitored state classes. + * @param {GetMonitorTransactionsV1Request} [getMonitorTransactionsV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + getMonitorTransactionsV1(getMonitorTransactionsV1Request?: GetMonitorTransactionsV1Request, options?: any): AxiosPromise { + return localVarFp.getMonitorTransactionsV1(getMonitorTransactionsV1Request, options).then((request) => request(axios, basePath)); + }, /** * * @summary Get the Prometheus Metrics @@ -1035,6 +1424,26 @@ export const DefaultApiFactory = function (configuration?: Configuration, basePa networkMapV1(body?: object, options?: any): AxiosPromise> { return localVarFp.networkMapV1(body, options).then((request) => request(axios, basePath)); }, + /** + * + * @summary Start monitoring corda changes (transactions) of given state class + * @param {StartMonitorV1Request} [startMonitorV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + startMonitorV1(startMonitorV1Request?: StartMonitorV1Request, options?: any): AxiosPromise { + return localVarFp.startMonitorV1(startMonitorV1Request, options).then((request) => request(axios, basePath)); + }, + /** + * + * @summary Stop monitoring corda changes (transactions) of given state class + * @param {StopMonitorV1Request} [stopMonitorV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + */ + stopMonitorV1(stopMonitorV1Request?: StopMonitorV1Request, options?: any): AxiosPromise { + return localVarFp.stopMonitorV1(stopMonitorV1Request, options).then((request) => request(axios, basePath)); + }, }; }; @@ -1045,6 +1454,18 @@ export const DefaultApiFactory = function (configuration?: Configuration, basePa * @extends {BaseAPI} */ export class DefaultApi extends BaseAPI { + /** + * + * @summary Clear transactions from internal store so they\'ll not be available by GetMonitorTransactionsV1 anymore. + * @param {ClearMonitorTransactionsV1Request} [clearMonitorTransactionsV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + * @memberof DefaultApi + */ + public clearMonitorTransactionsV1(clearMonitorTransactionsV1Request?: ClearMonitorTransactionsV1Request, options?: any) { + return DefaultApiFp(this.configuration).clearMonitorTransactionsV1(clearMonitorTransactionsV1Request, options).then((request) => request(this.axios, this.basePath)); + } + /** * * @summary Deploys a set of jar files (Cordapps, e.g. the contracts in Corda speak). @@ -1068,6 +1489,18 @@ export class DefaultApi extends BaseAPI { return DefaultApiFp(this.configuration).diagnoseNodeV1(diagnoseNodeV1Request, options).then((request) => request(this.axios, this.basePath)); } + /** + * + * @summary Get transactions for monitored state classes. + * @param {GetMonitorTransactionsV1Request} [getMonitorTransactionsV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + * @memberof DefaultApi + */ + public getMonitorTransactionsV1(getMonitorTransactionsV1Request?: GetMonitorTransactionsV1Request, options?: any) { + return DefaultApiFp(this.configuration).getMonitorTransactionsV1(getMonitorTransactionsV1Request, options).then((request) => request(this.axios, this.basePath)); + } + /** * * @summary Get the Prometheus Metrics @@ -1112,6 +1545,30 @@ export class DefaultApi extends BaseAPI { public networkMapV1(body?: object, options?: any) { return DefaultApiFp(this.configuration).networkMapV1(body, options).then((request) => request(this.axios, this.basePath)); } + + /** + * + * @summary Start monitoring corda changes (transactions) of given state class + * @param {StartMonitorV1Request} [startMonitorV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + * @memberof DefaultApi + */ + public startMonitorV1(startMonitorV1Request?: StartMonitorV1Request, options?: any) { + return DefaultApiFp(this.configuration).startMonitorV1(startMonitorV1Request, options).then((request) => request(this.axios, this.basePath)); + } + + /** + * + * @summary Stop monitoring corda changes (transactions) of given state class + * @param {StopMonitorV1Request} [stopMonitorV1Request] + * @param {*} [options] Override http request option. + * @throws {RequiredError} + * @memberof DefaultApi + */ + public stopMonitorV1(stopMonitorV1Request?: StopMonitorV1Request, options?: any) { + return DefaultApiFp(this.configuration).stopMonitorV1(stopMonitorV1Request, options).then((request) => request(this.axios, this.basePath)); + } } diff --git a/packages/cactus-plugin-ledger-connector-corda/src/main/typescript/public-api.ts b/packages/cactus-plugin-ledger-connector-corda/src/main/typescript/public-api.ts index 801a8de76d..2403d0e76f 100644 --- a/packages/cactus-plugin-ledger-connector-corda/src/main/typescript/public-api.ts +++ b/packages/cactus-plugin-ledger-connector-corda/src/main/typescript/public-api.ts @@ -20,3 +20,9 @@ export async function createPluginFactory( ): Promise { return new PluginFactoryLedgerConnector(pluginFactoryOptions); } + +export { + CordaApiClient, + CordaApiClientOptions, + watchBlocksV1Options, +} from "./api-client/corda-api-client"; diff --git a/packages/cactus-plugin-ledger-connector-corda/src/test/typescript/integration/monitor-transactions-v4.8.test.ts b/packages/cactus-plugin-ledger-connector-corda/src/test/typescript/integration/monitor-transactions-v4.8.test.ts new file mode 100644 index 0000000000..b58a55499e --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-corda/src/test/typescript/integration/monitor-transactions-v4.8.test.ts @@ -0,0 +1,835 @@ +/** + * Test state change monitoring interface in Kotlin Corda v4 connector component. + */ + +// Contants: Log Levels +const testLogLevel: LogLevelDesc = "debug"; +const sutLogLevel: LogLevelDesc = "info"; + +// Contants: Test ledger +const ledgerImageName = + "ghcr.io/hyperledger/cactus-corda-4-8-all-in-one-obligation"; +const ledgerImageVersion = "2022-03-31-28f0cbf--1956"; +const partyARpcUsername = "user1"; +const partyARpcPassword = "password"; +const partyBRpcUsername = partyARpcUsername; +const partyBRpcPassword = partyARpcPassword; +const stateToMonitor = "net.corda.samples.example.states.IOUState"; +const flowToInvoke = "net.corda.samples.example.flows.ExampleFlow$Initiator"; +const testAppId = "monitor-transactions-test-app"; + +// Contants: Kotlin connector server +const kotlinServerImageName = + "ghcr.io/hyperledger/cactus-connector-corda-server"; +const kotlinServerImageVersion = "2022-04-18-a8a7ed1--1956"; + +import "jest-extended"; +import { v4 as internalIpV4 } from "internal-ip"; + +import { + CordaTestLedger, + SampleCordappEnum, + CordaConnectorContainer, + pruneDockerAllIfGithubAction, +} from "@hyperledger/cactus-test-tooling"; +import { + Logger, + LoggerProvider, + LogLevelDesc, +} from "@hyperledger/cactus-common"; +import { + CordappDeploymentConfig, + FlowInvocationType, + InvokeContractV1Request, + JvmTypeKind, + PublicKey, +} from "../../../main/typescript/generated/openapi/typescript-axios/index"; +import { CordaApiClient } from "../../../main/typescript/api-client/corda-api-client"; +import { Configuration } from "@hyperledger/cactus-core-api"; +import { Subscription } from "rxjs"; + +// Unit Test logger setup +const log: Logger = LoggerProvider.getOrCreate({ + label: "kotlin-server-monitor-transactions-v4.8.test", + level: testLogLevel, +}); + +////////////////////////////////// +// Helper Functions +////////////////////////////////// + +async function deployContract( + apiClient: CordaApiClient, + ledger: CordaTestLedger, + rpcPort: number, + internalIp: string, +) { + log.info("deployContract() called..."); + + const sshConfig = await ledger.getSshConfig(); + const corDappsDirPartyA = await ledger.getCorDappsDirPartyA(); + + const cdcA: CordappDeploymentConfig = { + cordappDir: corDappsDirPartyA, + cordaNodeStartCmd: "supervisorctl start corda-a", + cordaJarPath: + "/samples-kotlin/Advanced/obligation-cordapp/build/nodes/ParticipantA/corda.jar", + nodeBaseDirPath: + "/samples-kotlin/Advanced/obligation-cordapp/build/nodes/ParticipantA/", + rpcCredentials: { + hostname: internalIp, + port: rpcPort, + username: partyARpcUsername, + password: partyARpcPassword, + }, + sshCredentials: { + hostKeyEntry: "foo", + hostname: internalIp, + password: "root", + port: sshConfig.port as number, + username: sshConfig.username as string, + }, + }; + + const partyBRpcPort = await ledger.getRpcBPublicPort(); + const corDappsDirPartyB = await ledger.getCorDappsDirPartyB(); + + const cdcB: CordappDeploymentConfig = { + cordappDir: corDappsDirPartyB, + cordaNodeStartCmd: "supervisorctl start corda-b", + cordaJarPath: + "/samples-kotlin/Advanced/obligation-cordapp/build/nodes/ParticipantB/corda.jar", + nodeBaseDirPath: + "/samples-kotlin/Advanced/obligation-cordapp/build/nodes/ParticipantB/", + rpcCredentials: { + hostname: internalIp, + port: partyBRpcPort, + username: partyBRpcUsername, + password: partyBRpcPassword, + }, + sshCredentials: { + hostKeyEntry: "foo", + hostname: internalIp, + password: "root", + port: sshConfig.port as number, + username: sshConfig.username as string, + }, + }; + + const cordappDeploymentConfigs: CordappDeploymentConfig[] = [cdcA, cdcB]; + log.debug("cordappDeploymentConfigs:", cordappDeploymentConfigs); + + const jarFiles = await ledger.pullCordappJars( + SampleCordappEnum.BASIC_CORDAPP, + ); + expect(jarFiles).toBeTruthy(); + + const deployRes = await apiClient.deployContractJarsV1({ + jarFiles, + cordappDeploymentConfigs, + }); + expect(deployRes.data.deployedJarFiles.length).toBeGreaterThan(0); + + const flowsRes = await apiClient.listFlowsV1(); + expect(flowsRes.data.flowNames).toContain(flowToInvoke); +} + +async function invokeContract(apiClient: CordaApiClient, publicKey: PublicKey) { + const req: InvokeContractV1Request = ({ + timeoutMs: 60000, + flowFullClassName: flowToInvoke, + flowInvocationType: FlowInvocationType.FlowDynamic, + params: [ + { + jvmTypeKind: JvmTypeKind.Primitive, + jvmType: { + fqClassName: "java.lang.Integer", + }, + primitiveValue: 42, + }, + { + jvmTypeKind: JvmTypeKind.Reference, + jvmType: { + fqClassName: "net.corda.core.identity.Party", + }, + jvmCtorArgs: [ + { + jvmTypeKind: JvmTypeKind.Reference, + jvmType: { + fqClassName: "net.corda.core.identity.CordaX500Name", + }, + jvmCtorArgs: [ + { + jvmTypeKind: JvmTypeKind.Primitive, + jvmType: { + fqClassName: "java.lang.String", + }, + primitiveValue: "ParticipantB", + }, + { + jvmTypeKind: JvmTypeKind.Primitive, + jvmType: { + fqClassName: "java.lang.String", + }, + primitiveValue: "New York", + }, + { + jvmTypeKind: JvmTypeKind.Primitive, + jvmType: { + fqClassName: "java.lang.String", + }, + primitiveValue: "US", + }, + ], + }, + { + jvmTypeKind: JvmTypeKind.Reference, + jvmType: { + fqClassName: + "org.hyperledger.cactus.plugin.ledger.connector.corda.server.impl.PublicKeyImpl", + }, + jvmCtorArgs: [ + { + jvmTypeKind: JvmTypeKind.Primitive, + jvmType: { + fqClassName: "java.lang.String", + }, + primitiveValue: publicKey?.algorithm, + }, + { + jvmTypeKind: JvmTypeKind.Primitive, + jvmType: { + fqClassName: "java.lang.String", + }, + primitiveValue: publicKey?.format, + }, + { + jvmTypeKind: JvmTypeKind.Primitive, + jvmType: { + fqClassName: "java.lang.String", + }, + primitiveValue: publicKey?.encoded, + }, + ], + }, + ], + }, + ], + } as unknown) as InvokeContractV1Request; + + const res = await apiClient.invokeContractV1(req); + expect(res).toBeTruthy(); + expect(res.status).toBe(200); + expect(res.data.success).toBeTrue(); +} + +////////////////////////////////// +// Monitor Tests +////////////////////////////////// + +describe("Monitor Tests", () => { + let ledger: CordaTestLedger; + let connector: CordaConnectorContainer; + let apiClient: CordaApiClient; + let partyBPublicKey: PublicKey; + + beforeAll(async () => { + log.info("Prune Docker..."); + await pruneDockerAllIfGithubAction({ logLevel: testLogLevel }); + + ledger = new CordaTestLedger({ + imageName: ledgerImageName, + imageVersion: ledgerImageVersion, + logLevel: testLogLevel, + }); + + const ledgerContainer = await ledger.start(); + expect(ledgerContainer).toBeTruthy(); + log.debug("Corda ledger started..."); + + await ledger.logDebugPorts(); + const partyARpcPort = await ledger.getRpcAPublicPort(); + + const internalIp = (await internalIpV4()) as string; + expect(internalIp).toBeTruthy(); + log.info("Internal IP (based on default gateway):", internalIp); + + const springAppConfig = { + logging: { + level: { + root: "info", + "net.corda": "info", + "org.hyperledger.cactus": sutLogLevel, + }, + }, + cactus: { + sessionExpireMinutes: 10, + corda: { + node: { host: internalIp }, + rpc: { + port: partyARpcPort, + username: partyARpcUsername, + password: partyARpcPassword, + }, + }, + }, + }; + const springApplicationJson = JSON.stringify(springAppConfig); + const envVarSpringAppJson = `SPRING_APPLICATION_JSON=${springApplicationJson}`; + log.debug(envVarSpringAppJson); + + connector = new CordaConnectorContainer({ + logLevel: sutLogLevel, + imageName: kotlinServerImageName, + imageVersion: kotlinServerImageVersion, + envVars: [envVarSpringAppJson], + }); + expect(connector).toBeTruthy(); + + await connector.start(); + await connector.logDebugPorts(); + const apiUrl = await connector.getApiLocalhostUrl(); + + const config = new Configuration({ basePath: apiUrl }); + apiClient = new CordaApiClient(config); + expect(apiClient).toBeTruthy(); + + await deployContract(apiClient, ledger, partyARpcPort, internalIp); + + log.info("Fetching network map for Corda network..."); + const networkMapRes = await apiClient.networkMapV1(); + expect(networkMapRes.data).toBeTruthy(); + + const partyB = networkMapRes.data.find((it) => + it.legalIdentities.some((li) => li.name.organisation === "ParticipantB"), + ); + partyBPublicKey = partyB?.legalIdentities[0].owningKey as PublicKey; + expect(partyBPublicKey).toBeTruthy(); + }); + + afterAll(async () => { + if (ledger) { + await ledger.stop(); + await ledger.destroy(); + } + + if (connector) { + await connector.stop(); + await connector.destroy(); + } + }); + + describe("Low-level StartMonitor and StopMonitor tests", () => { + afterEach(async () => { + // Stop monitor + await apiClient.stopMonitorV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + }); + + test("Transactions can be read repeatedly until cleared or monitoring stop", async () => { + // Start monitor + const resMonitor = await apiClient.startMonitorV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resMonitor.status).toBe(200); + expect(resMonitor.data.success).toBeTrue(); + + // Get transactions before invoke - should be 0 + const resGetTxPre = await apiClient.getMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resGetTxPre.status).toBe(200); + expect(resGetTxPre.data.stateFullClassName).toEqual(stateToMonitor); + expect(resGetTxPre.data.tx?.length).toBe(0); + + // Invoke transactions + const transactionCount = 3; + for (let i = 0; i < transactionCount; i++) { + await invokeContract(apiClient, partyBPublicKey); + } + + // Get transactions after invoke + const resGetTxPost = await apiClient.getMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resGetTxPost.status).toBe(200); + expect(resGetTxPost.data.stateFullClassName).toEqual(stateToMonitor); + expect(resGetTxPost.data.tx?.length).toBe(transactionCount); + const seenIndexes = new Set(); + resGetTxPost.data.tx?.forEach((tx) => { + expect(tx.index).toBeTruthy(); + // Expect indexes to be unique + expect(seenIndexes).not.toContain(tx.index); + seenIndexes.add(tx.index as string); + expect(tx.data).toBeTruthy(); + }); + + // Get transactions after already reading all current ones - should be the same as before + const resGetTxPostRead = await apiClient.getMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resGetTxPostRead.status).toBe(200); + expect(resGetTxPostRead.data.stateFullClassName).toEqual(stateToMonitor); + expect(resGetTxPostRead.data.tx).toEqual(resGetTxPost.data.tx); + }); + + test("Received transactions can be cleared so they can't be read anymore", async () => { + // Start monitor + const resMonitor = await apiClient.startMonitorV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resMonitor.status).toBe(200); + expect(resMonitor.data.success).toBeTrue(); + + // Invoke transactions + const transactionCount = 3; + for (let i = 0; i < transactionCount; i++) { + await invokeContract(apiClient, partyBPublicKey); + } + + // Get transactions after invoke + const resGetTx = await apiClient.getMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resGetTx.status).toBe(200); + expect(resGetTx.data.stateFullClassName).toEqual(stateToMonitor); + expect(resGetTx.data.tx?.length).toBe(transactionCount); + + // Clear seen transactions + const readTxIdx = resGetTx.data.tx?.map((tx) => tx.index); + const resClearTx = await apiClient.clearMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + txIndexes: readTxIdx as string[], + }); + expect(resClearTx.status).toBe(200); + expect(resClearTx.data.success).toBeTrue(); + + // Get transactions after clear - should be 0 + const resGetTxPostClear = await apiClient.getMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resGetTxPostClear.status).toBe(200); + expect(resGetTxPostClear.data.stateFullClassName).toEqual(stateToMonitor); + expect(resGetTxPostClear.data.tx?.length).toBe(0); + }); + + test("Sending startMonitor repeatedly doesn't affect monitor results", async () => { + // Start monitor + const resMonitor = await apiClient.startMonitorV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resMonitor.status).toBe(200); + expect(resMonitor.data.success).toBeTrue(); + + // Invoke first transactions + const firstTransactionCount = 3; + for (let i = 0; i < firstTransactionCount; i++) { + await invokeContract(apiClient, partyBPublicKey); + } + + // Start monitor once again + const resMonitorAgain = await apiClient.startMonitorV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resMonitorAgain.status).toBe(200); + expect(resMonitorAgain.data.success).toBeTrue(); + + // Invoke second transactions + const secondTransactionCount = 3; + for (let i = 0; i < secondTransactionCount; i++) { + await invokeContract(apiClient, partyBPublicKey); + } + + // Get final transactions + const resGetTx = await apiClient.getMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resGetTx.status).toBe(200); + expect(resGetTx.data.stateFullClassName).toEqual(stateToMonitor); + expect(resGetTx.data.tx?.length).toEqual( + firstTransactionCount + secondTransactionCount, + ); + }); + + test("Monitoring restart after previous stop works", async () => { + // Start monitor + const resMonitor = await apiClient.startMonitorV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resMonitor.status).toBe(200); + expect(resMonitor.data.success).toBeTrue(); + + // Invoke transactions + const transactionCount = 3; + for (let i = 0; i < transactionCount; i++) { + await invokeContract(apiClient, partyBPublicKey); + } + + // Stop Monitor + const resStopMonitor = await apiClient.stopMonitorV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resStopMonitor.status).toBe(200); + expect(resStopMonitor.data.success).toBeTrue(); + + // Restart Monitor + const resMonitorRestart = await apiClient.startMonitorV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resMonitorRestart.status).toBe(200); + expect(resMonitorRestart.data.success).toBeTrue(); + + // Invoke transactions after restart + const transactionCountAfterRestart = 2; + for (let i = 0; i < transactionCountAfterRestart; i++) { + await invokeContract(apiClient, partyBPublicKey); + } + + // Get transactions should return only new transactions + const resGetTxPostRestart = await apiClient.getMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resGetTxPostRestart.status).toBe(200); + expect(resGetTxPostRestart.data.stateFullClassName).toEqual( + stateToMonitor, + ); + expect(resGetTxPostRestart.data.tx?.length).toBe( + transactionCountAfterRestart, + ); + }); + + test("Monitor returns only transactions after monitor was started, not previous ones", async () => { + // Invoke initial transactions + const transactionCountFirst = 5; + for (let i = 0; i < transactionCountFirst; i++) { + await invokeContract(apiClient, partyBPublicKey); + } + + // Start monitor + const resMonitor = await apiClient.startMonitorV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resMonitor.status).toBe(200); + expect(resMonitor.data.success).toBeTrue(); + + // Invoke transactions after start + const transactionCountAfterStart = 2; + for (let i = 0; i < transactionCountAfterStart; i++) { + await invokeContract(apiClient, partyBPublicKey); + } + + // Get transactions + const resGetTx = await apiClient.getMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resGetTx.status).toBe(200); + expect(resGetTx.data.stateFullClassName).toEqual(stateToMonitor); + expect(resGetTx.data.tx?.length).toBe(transactionCountAfterStart); + }); + + test("Start monitoring with unknown state returns error", async () => { + const unknownState = "foo.bar.non.existent"; + + // Start monitor + const resMonitor = await apiClient.startMonitorV1({ + clientAppId: testAppId, + stateFullClassName: unknownState, + }); + expect(resMonitor.status).toBe(200); + expect(resMonitor.data.success).toBeFalse(); + expect(resMonitor.data.msg).toContain(unknownState); + }); + + test("Stop monitoring with unknown state does nothing and returns success", async () => { + // Stop monitor + const resStopMon = await apiClient.stopMonitorV1({ + clientAppId: testAppId, + stateFullClassName: "foo.bar.non.existent", + }); + expect(resStopMon.status).toBe(200); + expect(resStopMon.data.success).toBeTrue(); + }); + + test("Reading / clearing transactions without monitor running returns an error", async () => { + // Get transactions before start monitor + const resGet = await apiClient.getMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resGet.status).toBe(200); + expect(resGet.data.success).toBeFalse(); + expect(resGet.data.msg).toBeTruthy(); + expect(resGet.data.stateFullClassName).toBeFalsy(); + expect(resGet.data.tx).toBeFalsy(); + + // Clear transactions before start monitor + const resClear = await apiClient.clearMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + txIndexes: ["1", "2"], + }); + expect(resClear.status).toBe(200); + expect(resClear.data.success).toBeFalse(); + expect(resClear.data.msg).toBeTruthy(); + }); + + test("Reading / clearing unknown state returns an error", async () => { + // Start monitor + const resMonitor = await apiClient.startMonitorV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resMonitor.status).toBe(200); + expect(resMonitor.data.success).toBeTrue(); + + // Get transactions of unknown state + const resGet = await apiClient.getMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: "foo.bar.non.existent", + }); + expect(resGet.status).toBe(200); + expect(resGet.data.success).toBeFalse(); + expect(resGet.data.msg).toBeTruthy(); + expect(resGet.data.stateFullClassName).toBeFalsy(); + expect(resGet.data.tx).toBeFalsy(); + + // Clear transactions of unknown state + const resClear = await apiClient.clearMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: "foo.bar.non.existent", + txIndexes: ["1", "2"], + }); + expect(resClear.status).toBe(200); + expect(resClear.data.msg).toBeTruthy(); + expect(resClear.data.success).toBeFalse(); + }); + }); + + describe("Multiple clients tests", () => { + const anotherAppId = "anotherTestApp"; + + afterEach(async () => { + // Stop Monitors + await apiClient.stopMonitorV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + + await apiClient.stopMonitorV1({ + clientAppId: anotherAppId, + stateFullClassName: stateToMonitor, + }); + }); + + test("State change can be read by all listening clients separately", async () => { + // Start monitor for first client + const resMonitorFirst = await apiClient.startMonitorV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resMonitorFirst.status).toBe(200); + expect(resMonitorFirst.data.success).toBeTrue(); + + // Start monitor for second client + const resMonitorAnother = await apiClient.startMonitorV1({ + clientAppId: anotherAppId, + stateFullClassName: stateToMonitor, + }); + expect(resMonitorAnother.status).toBe(200); + expect(resMonitorAnother.data.success).toBeTrue(); + + // Invoke transactions + const transactionCountAfterStart = 3; + for (let i = 0; i < transactionCountAfterStart; i++) { + await invokeContract(apiClient, partyBPublicKey); + } + + // Get transactions for first client + const resGetTxFirstClient = await apiClient.getMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resGetTxFirstClient.status).toBe(200); + expect(resGetTxFirstClient.data.stateFullClassName).toEqual( + stateToMonitor, + ); + expect(resGetTxFirstClient.data.tx?.length).toBe( + transactionCountAfterStart, + ); + + // Clear transactions seen by the first client + const readTxIdx = resGetTxFirstClient.data.tx?.map((tx) => tx.index); + const resClearTx = await apiClient.clearMonitorTransactionsV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + txIndexes: readTxIdx as string[], + }); + expect(resClearTx.status).toBe(200); + expect(resClearTx.data.success).toBeTrue(); + + // Get transactions for second client - should have all transactions available + const resGetTxSecondClient = await apiClient.getMonitorTransactionsV1({ + clientAppId: anotherAppId, + stateFullClassName: stateToMonitor, + }); + expect(resGetTxSecondClient.status).toBe(200); + expect(resGetTxSecondClient.data.stateFullClassName).toEqual( + stateToMonitor, + ); + expect(resGetTxSecondClient.data.tx?.length).toBe( + transactionCountAfterStart, + ); + }); + + test("State change unsubscribe doesn't affect other client monitors", async () => { + // Start monitor for first client + const resMonitorFirst = await apiClient.startMonitorV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + expect(resMonitorFirst.status).toBe(200); + expect(resMonitorFirst.data.success).toBeTrue(); + + // Start monitor for second client + const resMonitorAnother = await apiClient.startMonitorV1({ + clientAppId: anotherAppId, + stateFullClassName: stateToMonitor, + }); + expect(resMonitorAnother.status).toBe(200); + expect(resMonitorAnother.data.success).toBeTrue(); + + // Invoke transactions + const transactionCountAfterStart = 3; + for (let i = 0; i < transactionCountAfterStart; i++) { + await invokeContract(apiClient, partyBPublicKey); + } + + // Stop first client monitoring + await apiClient.stopMonitorV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + }); + + // Invoke transactions for second client only + const transactionCountOnlySecondClient = 4; + for (let i = 0; i < transactionCountOnlySecondClient; i++) { + await invokeContract(apiClient, partyBPublicKey); + } + + // Get transactions for second client + const resGetTxSecondClient = await apiClient.getMonitorTransactionsV1({ + clientAppId: anotherAppId, + stateFullClassName: stateToMonitor, + }); + expect(resGetTxSecondClient.status).toBe(200); + expect(resGetTxSecondClient.data.stateFullClassName).toEqual( + stateToMonitor, + ); + expect(resGetTxSecondClient.data.tx?.length).toBe( + transactionCountAfterStart + transactionCountOnlySecondClient, + ); + }); + }); + + describe("watchBlocks tests", () => { + // watchBlocks tests are async, don't wait so long if something goes wrong + const watchBlockTestTimeout = 5 * 60 * 1000; // 5 minutes + + test( + "watchBlocksAsyncV1 reports all transactions", + async () => { + const transactionCount = 10; + + const observable = await apiClient.watchBlocksAsyncV1({ + clientAppId: testAppId, + stateFullClassName: stateToMonitor, + pollRate: 1000, + }); + + let sub: Subscription | undefined; + const monitor = new Promise((resolve, reject) => { + let transactionsReceived = 0; + + sub = observable.subscribe({ + next(tx) { + let error: string | undefined; + + log.debug("Received transaction from monitor:", tx); + + if (tx.index === undefined || !tx.data) { + error = `Wrong transaction format - idx ${tx.index} data ${tx.data}`; + } + + transactionsReceived++; + + if (error) { + log.error(error); + reject(error); + } + + if (transactionsReceived === transactionCount) { + log.info(`Read all ${transactionCount} transactions - OK`); + resolve(); + } + }, + error(err) { + log.error("watchBlocksAsyncV1 failed:", err); + reject(err); + }, + }); + }).finally(() => sub?.unsubscribe()); + + // Invoke transactions + for (let i = 0; i < transactionCount; i++) { + invokeContract(apiClient, partyBPublicKey); + } + + await monitor; + }, + watchBlockTestTimeout, + ); + + test( + "Running watchBlocksAsyncV1 with unknown state report an error on rxjs subject", + async () => { + const observable = await apiClient.watchBlocksAsyncV1({ + clientAppId: testAppId, + stateFullClassName: "foo.bar.unknown", + }); + + let sub: Subscription | undefined; + await new Promise((resolve, reject) => { + sub = observable.subscribe({ + next() { + reject("Monitor reported new transaction when it should fail."); + }, + error(err) { + log.info("watchBlocksAsyncV1 error reported as expected:", err); + resolve(); + }, + }); + }).finally(() => sub?.unsubscribe()); + }, + watchBlockTestTimeout, + ); + }); +}); diff --git a/packages/cactus-verifier-client/package.json b/packages/cactus-verifier-client/package.json index b076aa1abc..dcc85f760a 100644 --- a/packages/cactus-verifier-client/package.json +++ b/packages/cactus-verifier-client/package.json @@ -54,6 +54,7 @@ "@hyperledger/cactus-core-api": "1.0.0", "@hyperledger/cactus-plugin-ledger-connector-besu": "1.0.0", "@hyperledger/cactus-plugin-ledger-connector-quorum": "1.0.0", + "@hyperledger/cactus-plugin-ledger-connector-corda": "1.0.0", "jest-extended": "0.11.5", "rxjs": "7.3.0" } diff --git a/packages/cactus-verifier-client/src/main/typescript/get-validator-api-client.ts b/packages/cactus-verifier-client/src/main/typescript/get-validator-api-client.ts index ac346b4c3d..e4ea053f22 100644 --- a/packages/cactus-verifier-client/src/main/typescript/get-validator-api-client.ts +++ b/packages/cactus-verifier-client/src/main/typescript/get-validator-api-client.ts @@ -20,6 +20,11 @@ import { QuorumApiClientOptions, } from "@hyperledger/cactus-plugin-ledger-connector-quorum"; +import { + CordaApiClient, + CordaApiClientOptions, +} from "@hyperledger/cactus-plugin-ledger-connector-corda"; + /** * Configuration of ApiClients currently supported by Verifier and VerifierFactory * Each entry key defines the name of the connection type that has to be specified in VerifierFactory config. @@ -43,6 +48,10 @@ export type ClientApiConfig = { in: QuorumApiClientOptions; out: QuorumApiClient; }; + CORDA_4X: { + in: CordaApiClientOptions; + out: CordaApiClient; + }; }; /** @@ -66,6 +75,8 @@ export function getValidatorApiClient( return new BesuApiClient(options as BesuApiClientOptions); case "QUORUM_2X": return new QuorumApiClient(options as QuorumApiClientOptions); + case "CORDA_4X": + return new CordaApiClient(options as CordaApiClientOptions); default: // Will not compile if any ClientApiConfig key was not handled by this switch const _: never = validatorType; diff --git a/packages/cactus-verifier-client/src/main/typescript/verifier.ts b/packages/cactus-verifier-client/src/main/typescript/verifier.ts index ed60323cbb..2cbf18304c 100644 --- a/packages/cactus-verifier-client/src/main/typescript/verifier.ts +++ b/packages/cactus-verifier-client/src/main/typescript/verifier.ts @@ -72,59 +72,63 @@ export class Verifier> * * @todo Change return type from Promise to void, this method is already async by design. */ - startMonitor( + async startMonitor( appId: string, options: Record, eventListener: IVerifierEventListener< BlockTypeFromSocketApi >, ): Promise { - return new Promise((resolve) => { - if (!this.ledgerApi.watchBlocksV1) { - throw new Error("startMonitor not supported on this ledger"); - } + if (!(this.ledgerApi.watchBlocksV1 || this.ledgerApi.watchBlocksAsyncV1)) { + throw new Error("startMonitor not supported on this ledger"); + } - if (this.runningMonitors.has(appId)) { - throw new Error(`Monitor with appId '${appId}' is already running!`); - } + if (this.runningMonitors.has(appId)) { + throw new Error(`Monitor with appId '${appId}' is already running!`); + } - this.log.debug("call : startMonitor appId =", appId); - - try { - const blocksObservable = this.ledgerApi.watchBlocksV1(options); - - const watchBlocksSub = blocksObservable.subscribe({ - next: (blockData: unknown) => { - const event = { - id: "", - verifierId: this.verifierID, - data: blockData as BlockTypeFromSocketApi, - }; - eventListener.onEvent(event); - }, - error: (err) => { - this.log.error("Error when watching for new blocks, err:", err); - if (eventListener.onError) { - eventListener.onError(err); - } - }, - complete: () => { - this.log.info("Watch completed"); - }, - }); - - this.runningMonitors.set(appId, watchBlocksSub); - this.log.debug( - "New monitor added, runningMonitors.size ==", - this.runningMonitors.size, + this.log.debug("call : startMonitor appId =", appId); + + try { + const blocksObservable = this.ledgerApi.watchBlocksV1 + ? this.ledgerApi.watchBlocksV1(options) + : await this.ledgerApi.watchBlocksAsyncV1?.(options); + + if (!blocksObservable) { + throw new Error( + "Could not get a valid blocks observable in startMonitor", ); - } catch (err) { - this.log.error(`##Error: startMonitor, ${err}`); - this.runningMonitors.delete(appId); } - resolve(); - }); + const watchBlocksSub = blocksObservable.subscribe({ + next: (blockData: unknown) => { + const event = { + id: "", + verifierId: this.verifierID, + data: blockData as BlockTypeFromSocketApi, + }; + eventListener.onEvent(event); + }, + error: (err) => { + this.log.error("Error when watching for new blocks, err:", err); + if (eventListener.onError) { + eventListener.onError(err); + } + }, + complete: () => { + this.log.info("Watch completed"); + }, + }); + + this.runningMonitors.set(appId, watchBlocksSub); + this.log.debug( + "New monitor added, runningMonitors.size ==", + this.runningMonitors.size, + ); + } catch (err) { + this.log.error(`##Error: startMonitor, ${err}`); + this.runningMonitors.delete(appId); + } } /** @@ -158,18 +162,16 @@ export class Verifier> * * @todo Change return type from Promise to void, this method is already async by design. */ - sendAsyncRequest( + async sendAsyncRequest( contract: Record, method: Record, args: any, ): Promise { - return new Promise((resolve) => { - if (!this.ledgerApi.sendAsyncRequest) { - throw new Error("sendAsyncRequest not supported on this ledger"); - } + if (!this.ledgerApi.sendAsyncRequest) { + throw new Error("sendAsyncRequest not supported on this ledger"); + } - resolve(this.ledgerApi.sendAsyncRequest(contract, method, args)); - }); + return this.ledgerApi.sendAsyncRequest(contract, method, args); } /** @@ -179,15 +181,13 @@ export class Verifier> * @param args - arguments. * @returns Promise that will resolve with response from the ledger, or reject when error occurred. */ - sendSyncRequest( + async sendSyncRequest( contract: Record, method: Record, args: any, ): Promise { if (!this.ledgerApi.sendSyncRequest) { - return new Promise(() => { - throw new Error("sendSyncRequest not supported on this ledger"); - }); + throw new Error("sendSyncRequest not supported on this ledger"); } return this.ledgerApi.sendSyncRequest(contract, method, args); diff --git a/tools/docker/corda-all-in-one/corda-v4_8/Dockerfile b/tools/docker/corda-all-in-one/corda-v4_8/Dockerfile index 4370c2f85a..549f1bb274 100644 --- a/tools/docker/corda-all-in-one/corda-v4_8/Dockerfile +++ b/tools/docker/corda-all-in-one/corda-v4_8/Dockerfile @@ -1,6 +1,8 @@ FROM docker:20.10.2-dind -ARG SAMPLES_KOTLIN_SHA=30fd841dd035934bae75ab8910da3b6e3d5d6ee7 +# cordaVersion=4.8.5 +# cordaCoreVersion=4.8.5 +ARG SAMPLES_KOTLIN_SHA=1504878ce446555bd861bbe4dd3d1154e905a07f ARG SAMPLES_KOTLIN_CORDAPP_SUB_DIR_PATH="./Advanced/obligation-cordapp/" ARG CORDA_TOOLS_SHELL_CLI_VERSION=4.8