Skip to content

Commit

Permalink
feat(corda4): implement monitoring of state changes
Browse files Browse the repository at this point in the history
Add new endpoints to corda kotlin server used to start and stop
monitoring, and get/clean state changes from its internal buffer. Add
reactive watchBlocksV1 that polls kotlin server and reports new
transactions asynchronously. Add CordaApiClient support to
VerifierClient. Add functional test for both monitoring interfaces.
Update corda setup in corda-all-in-one to newer version.

Closes: hyperledger-cacti#1610
Signed-off-by: Michal Bajer <michal.bajer@fujitsu.com>
  • Loading branch information
outSH committed Apr 1, 2022
1 parent b8be5bd commit 691726e
Show file tree
Hide file tree
Showing 24 changed files with 2,066 additions and 4 deletions.
3 changes: 2 additions & 1 deletion .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@
"uuidv",
"vscc",
"wasm",
"Xdai"
"Xdai",
"outsh"
],
"dictionaries": [
"typescript,node,npm,go,rust"
Expand Down
21 changes: 21 additions & 0 deletions packages/cactus-plugin-ledger-connector-corda/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,27 @@ const res = await apiClient.invokeContractV1({
});
```

### Transaction Monitoring
- There are two interfaces to monitor changes of vault states - reactive `watchBlocksV1` method, and low-level HTTP API calls.
- Note: The monitoring APIs are implemented only on kotlin-server connector (`main-server`), not typescript connector!
- For usage examples review the functional test file: `packages/cactus-plugin-ledger-connector-corda/src/test/typescript/integration/monitor-transactions-v4.8.test.ts`

#### watchBlocksV1
- `watchBlocksV1(options: watchBlocksV1Options): Observable<CordaBlock>`
- Reactive (RxJS) interface to observe state changes.
- Internally, it uses polling of low-level HTTP APIs.
- Options:
- `stateFullClassName: string`: state to monitor.
- `pollRate?: number`: how often poll the kotlin server for changes (default 5 seconds).

#### Low-level HTTP API
- These should not be used when watchBlocks API is sufficient.
- Consists of the following methods:
- `startMonitorV1`: Start monitoring for specified state changes. All changes after calling this function will be stored in internal kotlin-server buffer, ready to be read by calls to `GetMonitorTransactionsV1`. Transactions occuring before the call to startMonitorV1 will not be reported.
- `GetMonitorTransactionsV1`: Read all transactions for given state name still remaining in internal buffer.
- `ClearMonitorTransactionsV1`: Remove transaction for given state name with specified index number from internal buffer. Should be used to acknowledge receiving specified transactions in user code, so that transactions are not reported multiple times.
- `stopMonitorV1`: Don't watch for transactions changes anymore, remove any transactions that were not read until now.

### Custom Configuration via Env Variables

```json
Expand Down
1 change: 1 addition & 0 deletions packages/cactus-plugin-ledger-connector-corda/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
"joi": "17.4.2",
"node-ssh": "12.0.0",
"prom-client": "13.2.0",
"rxjs": "7.3.0",
"temp": "0.9.4",
"typescript-optional": "2.0.1"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ settings.gradle
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiPluginLedgerConnectorCorda.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiPluginLedgerConnectorCordaService.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiUtil.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/ClearMonitorTransactionsV1Request.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/ClearMonitorTransactionsV1Response.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/CordaNodeSshCredentials.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/CordaRpcCredentials.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/CordaX500Name.kt
Expand All @@ -15,6 +17,9 @@ src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/mode
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/DiagnoseNodeV1Request.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/DiagnoseNodeV1Response.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/FlowInvocationType.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1Request.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1Response.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1ResponseTx.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/InvokeContractV1Request.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/InvokeContractV1Response.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/JarFile.kt
Expand All @@ -29,5 +34,9 @@ src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/mode
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/Party.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/PublicKey.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/SHA256.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StartMonitorV1Request.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StartMonitorV1Response.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StopMonitorV1Request.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StopMonitorV1Response.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/X500Principal.kt
src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
package org.hyperledger.cactus.plugin.ledger.connector.corda.server.api

import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ClearMonitorTransactionsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ClearMonitorTransactionsV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsBadRequestV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsSuccessV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DiagnoseNodeV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DiagnoseNodeV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.GetMonitorTransactionsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.GetMonitorTransactionsV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.InvokeContractV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.InvokeContractV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ListFlowsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ListFlowsV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.NodeInfo
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StartMonitorV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StartMonitorV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StopMonitorV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StopMonitorV1Response
import org.springframework.http.HttpStatus
import org.springframework.http.MediaType
import org.springframework.http.ResponseEntity
Expand Down Expand Up @@ -37,6 +45,17 @@ import kotlin.collections.Map
class ApiPluginLedgerConnectorCordaController(@Autowired(required = true) val service: ApiPluginLedgerConnectorCordaService) {


@DeleteMapping(
value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/clear-monitor-transactions"],
produces = ["application/json"],
consumes = ["application/json"]
)
fun clearMonitorTransactionsV1( @Valid @RequestBody(required = false) clearMonitorTransactionsV1Request: ClearMonitorTransactionsV1Request?
): ResponseEntity<ClearMonitorTransactionsV1Response> {
return ResponseEntity(service.clearMonitorTransactionsV1(clearMonitorTransactionsV1Request), HttpStatus.valueOf(200))
}


@PostMapping(
value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/deploy-contract-jars"],
produces = ["application/json"],
Expand All @@ -59,6 +78,17 @@ class ApiPluginLedgerConnectorCordaController(@Autowired(required = true) val se
}


@GetMapping(
value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/get-monitor-transactions"],
produces = ["application/json"],
consumes = ["application/json"]
)
fun getMonitorTransactionsV1( @Valid @RequestBody(required = false) getMonitorTransactionsV1Request: GetMonitorTransactionsV1Request?
): ResponseEntity<GetMonitorTransactionsV1Response> {
return ResponseEntity(service.getMonitorTransactionsV1(getMonitorTransactionsV1Request), HttpStatus.valueOf(200))
}


@GetMapping(
value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/get-prometheus-exporter-metrics"],
produces = ["text/plain"]
Expand Down Expand Up @@ -99,4 +129,26 @@ class ApiPluginLedgerConnectorCordaController(@Autowired(required = true) val se
): ResponseEntity<List<NodeInfo>> {
return ResponseEntity(service.networkMapV1(body), HttpStatus.valueOf(200))
}


@PostMapping(
value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/start-monitor"],
produces = ["application/json"],
consumes = ["application/json"]
)
fun startMonitorV1( @Valid @RequestBody(required = false) startMonitorV1Request: StartMonitorV1Request?
): ResponseEntity<StartMonitorV1Response> {
return ResponseEntity(service.startMonitorV1(startMonitorV1Request), HttpStatus.valueOf(200))
}


@DeleteMapping(
value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/stop-monitor"],
produces = ["application/json"],
consumes = ["application/json"]
)
fun stopMonitorV1( @Valid @RequestBody(required = false) stopMonitorV1Request: StopMonitorV1Request?
): ResponseEntity<StopMonitorV1Response> {
return ResponseEntity(service.stopMonitorV1(stopMonitorV1Request), HttpStatus.valueOf(200))
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,43 @@
package org.hyperledger.cactus.plugin.ledger.connector.corda.server.api

import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ClearMonitorTransactionsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ClearMonitorTransactionsV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsBadRequestV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsSuccessV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DiagnoseNodeV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DiagnoseNodeV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.GetMonitorTransactionsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.GetMonitorTransactionsV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.InvokeContractV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.InvokeContractV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ListFlowsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ListFlowsV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.NodeInfo
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StartMonitorV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StartMonitorV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StopMonitorV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StopMonitorV1Response

interface ApiPluginLedgerConnectorCordaService {

fun clearMonitorTransactionsV1(clearMonitorTransactionsV1Request: ClearMonitorTransactionsV1Request?): ClearMonitorTransactionsV1Response

fun deployContractJarsV1(deployContractJarsV1Request: DeployContractJarsV1Request?): DeployContractJarsSuccessV1Response

fun diagnoseNodeV1(diagnoseNodeV1Request: DiagnoseNodeV1Request?): DiagnoseNodeV1Response

fun getMonitorTransactionsV1(getMonitorTransactionsV1Request: GetMonitorTransactionsV1Request?): GetMonitorTransactionsV1Response

fun getPrometheusMetricsV1(): kotlin.String

fun invokeContractV1(invokeContractV1Request: InvokeContractV1Request?): InvokeContractV1Response

fun listFlowsV1(listFlowsV1Request: ListFlowsV1Request?): ListFlowsV1Response

fun networkMapV1(body: kotlin.Any?): List<NodeInfo>

fun startMonitorV1(startMonitorV1Request: StartMonitorV1Request?): StartMonitorV1Response

fun stopMonitorV1(stopMonitorV1Request: StopMonitorV1Request?): StopMonitorV1Response
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import java.lang.Exception
import java.lang.RuntimeException
import java.util.*
import java.util.concurrent.TimeUnit
import java.math.BigInteger
import kotlin.IllegalArgumentException
import net.corda.core.contracts.ContractState
import rx.Subscription

// TODO Look into this project for powering the connector of ours:
// https://github.com/180Protocol/codaptor
Expand All @@ -40,6 +43,9 @@ class ApiPluginLedgerConnectorCordaServiceImpl(
val rpc: NodeRPCConnection
) : ApiPluginLedgerConnectorCordaService {

val watchedTransactionsBuffer = mutableMapOf<String, MutableList<GetMonitorTransactionsV1ResponseTx>>()
val watchedTransactionsSubs = mutableMapOf<String, Subscription>()

companion object {
val logger = loggerFor<ApiPluginLedgerConnectorCordaServiceImpl>()

Expand Down Expand Up @@ -109,7 +115,7 @@ class ApiPluginLedgerConnectorCordaServiceImpl(
"requiredSigningKeys" to returnValue.requiredSigningKeys,
"sigs" to returnValue.sigs
);

} else if (returnValue != null) {
callOutput = try {
val returnValueJson = writer.writeValueAsString(returnValue);
Expand Down Expand Up @@ -346,4 +352,108 @@ class ApiPluginLedgerConnectorCordaServiceImpl(
logger.info("Returning {} NodeInfo elements in response.", nodeInfoList.size)
return nodeInfoList
}

/**
* Start monitoring state changes for stateClass specified in request body.
*/
override fun startMonitorV1(req: StartMonitorV1Request?): StartMonitorV1Response {
val stateName = req?.stateFullClassName

if (stateName.isNullOrEmpty()) {
// Missing input class name
logger.info("Request rejected because missing state class name")
return StartMonitorV1Response(false)
}

if (this.watchedTransactionsSubs.containsKey(stateName)) {
// Monitor already started
logger.info("Monitoring already started - returning true")
return StartMonitorV1Response(true)
}

@Suppress("UNCHECKED_CAST")
val contractState = jsonJvmObjectDeserializer.getOrInferType(stateName) as Class<out ContractState>

// FIXME: "valutTrack(xxx).updates" occurs an error if Corda ledger has already over 200 transactions using the stateName that you set above.
// Please refer to "https://r3-cev.atlassian.net/browse/CORDA-2956" to get more infomation.
val stateObservable = this.rpc.proxy.vaultTrack(contractState).updates
var txCounter = BigInteger.valueOf(0)
this.watchedTransactionsBuffer[stateName] = mutableListOf<GetMonitorTransactionsV1ResponseTx>()
this.watchedTransactionsSubs[stateName] = stateObservable.subscribe { update ->
update.produced.forEach { newTx ->
val txResponse = GetMonitorTransactionsV1ResponseTx(txCounter.toString(), newTx.toString())
txCounter = txCounter.add(BigInteger.valueOf(1))
logger.debug("Pushing new transaction for state '{}', index {}", stateName, txCounter)
this.watchedTransactionsBuffer[stateName]?.add(txResponse)
}
}

logger.info("Monitoring for state '{}' started.", stateName)
return StartMonitorV1Response(true)
}

/**
* Read all transactions that were not read by any consumer yet.
* Must be called after startMonitorV1 and before stopMonitorV1.
* Transactions buffer must be explicitly cleared with clearMonitorTransactionsV1
*/
override fun getMonitorTransactionsV1(req: GetMonitorTransactionsV1Request?): GetMonitorTransactionsV1Response {
val stateName = req?.stateFullClassName

if (stateName.isNullOrEmpty()) {
// Missing input class name
logger.info("Request rejected because missing state class name")
return GetMonitorTransactionsV1Response("", listOf())
}

val transactions = this.watchedTransactionsBuffer.getOrElse(stateName) { listOf<GetMonitorTransactionsV1ResponseTx>() }
logger.debug("Returning {} transaction to the caller", transactions.size)
return GetMonitorTransactionsV1Response(stateName, transactions)
}

/**
* Clear monitored transactions based on index from internal buffer.
* Any future call to getMonitorTransactionsV1 will not return transactions removed by this call.
*/
override fun clearMonitorTransactionsV1(req: ClearMonitorTransactionsV1Request?): ClearMonitorTransactionsV1Response {
val stateName = req?.stateFullClassName
if (stateName.isNullOrEmpty()) {
// Missing input class name
logger.info("Request rejected because missing state class name")
return ClearMonitorTransactionsV1Response(false)
}

val indexesToRemove = req?.txIndexes
if (indexesToRemove.isNullOrEmpty()) {
logger.info("No indexes to remove")
return ClearMonitorTransactionsV1Response(true)
}

val transactions = this.watchedTransactionsBuffer.getOrElse(stateName) { mutableListOf<GetMonitorTransactionsV1ResponseTx>() }
logger.debug("Transactions before remove", transactions.size)
transactions.removeAll { it.index in indexesToRemove }
logger.debug("Transactions after remove", transactions.size)
return ClearMonitorTransactionsV1Response(true)
}

/**
* Stop monitoring state changes for stateClass specified in request body.
* Removes all transactions that were not read yet, unsubscribes from the monitor.
*/
override fun stopMonitorV1(req: StopMonitorV1Request?): StopMonitorV1Response {
val stateName = req?.stateFullClassName

if (stateName.isNullOrEmpty()) {
// Missing input class name
logger.info("Request rejected because missing state class name")
return StopMonitorV1Response(false)
}

this.watchedTransactionsSubs[stateName]?.unsubscribe()
this.watchedTransactionsSubs.remove(stateName)
this.watchedTransactionsBuffer.remove(stateName)
logger.info("Monitoring for state '{}' stopped.", stateName)

return StopMonitorV1Response(true)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.hyperledger.cactus.plugin.ledger.connector.corda.server.model

import java.util.Objects
import com.fasterxml.jackson.annotation.JsonProperty
import javax.validation.constraints.DecimalMax
import javax.validation.constraints.DecimalMin
import javax.validation.constraints.Max
import javax.validation.constraints.Min
import javax.validation.constraints.NotNull
import javax.validation.constraints.Pattern
import javax.validation.constraints.Size
import javax.validation.Valid

/**
*
* @param stateFullClassName The fully qualified name of the Corda state to monitor
* @param txIndexes
*/
data class ClearMonitorTransactionsV1Request(

@get:Size(min=1,max=1024)
@field:JsonProperty("stateFullClassName", required = true) val stateFullClassName: kotlin.String,

@field:JsonProperty("txIndexes") val txIndexes: kotlin.collections.List<kotlin.String>? = null
) {

}

Loading

0 comments on commit 691726e

Please sign in to comment.