Skip to content

Commit

Permalink
feat(corda4): implement monitoring of state changes
Browse files Browse the repository at this point in the history
Add new endpoints to corda kotlin server used to start and stop
monitoring, and get/clean state changes from its internal buffer.
Monitoring session is started separately for each client, each client
can monitor many corda states. Clients that are not active for specified
amount of time are removed. Implementation uses transaction queues that
are polled periodically by the client, instead of socketio-based
solution because we want to minimize probability of losing a transaction
from corda. This can happen when WS connection is disconnected, for
instance. With transaction queue on the connector, and explicit
get/remove calls from the client, we have greater control over what
transactions are delivered to the client code. Also, setting up socketio
on spring boot seems overly complicated and would obscurificate the
implementation.

Add reactive watchBlocksV1 that polls kotlin server and
reports new transactions asynchronously. Add CordaApiClient support to
VerifierClient.

Add functional test for both monitoring interfaces.
Update corda setup in corda-all-in-one to newer version.

Closes: #1610

Signed-off-by: Michal Bajer <michal.bajer@fujitsu.com>
  • Loading branch information
outSH authored and petermetz committed Apr 26, 2022
1 parent dda3f00 commit 865ec2f
Show file tree
Hide file tree
Showing 28 changed files with 2,706 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ export interface ISocketApiClient<BlockType> {
watchBlocksV1?(
monitorOptions?: Record<string, unknown>,
): Observable<BlockType>;

watchBlocksAsyncV1?(
monitorOptions?: Record<string, unknown>,
): Promise<Observable<BlockType>>;
}
31 changes: 31 additions & 0 deletions packages/cactus-plugin-ledger-connector-corda/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,37 @@ const res = await apiClient.invokeContractV1({
});
```

### Transaction Monitoring
- There are two interfaces to monitor changes of vault states - reactive `watchBlocksV1` method, and low-level HTTP API calls.
- Note: The monitoring APIs are implemented only on kotlin-server connector (`main-server`), not typescript connector!
- For usage examples review the functional test file: `packages/cactus-plugin-ledger-connector-corda/src/test/typescript/integration/monitor-transactions-v4.8.test.ts`
- Because transactions read from corda are stored on the connector, they will be lost if connector is closed/killed before transaction were read by the clients.
- Each client has own set of state monitors that are managed independently. After starting the monitoring, each new transaction is queued on the connector until read and explicitly cleared by `watchBlocksV1` or direct HTTP API call.
- Client monitors can be periodically removed by the connector, if there was no action from the client for specified amount of time.
- Client expiration delay can be configured with `cactus.sessionExpireMinutes` option. It default to 30 minutes.
- Each transaction has own index assigned by the corda connector. Index is unique for each client monitoring session. For instance:
- Stopping monitoring for given state will reset the transaction index counter for given client. After restart, it will report first transaction with index 0.
- Each client can see tha same transaction with different index.
- Index can be used to determine the transaction order for given client session.

#### watchBlocksV1
- `watchBlocksV1(options: watchBlocksV1Options): Observable<CordaBlock>`
- Reactive (RxJS) interface to observe state changes.
- Internally, it uses polling of low-level HTTP APIs.
- Watching block should return each block at least once, no blocks should be missed after startMonitor has started. The only case when transaction is lost is when connector we were connected to died.
- Transactions can be duplicated in case internal `ClearMonitorTransactionsV1` call was not successful (for instance, because of connection problems).
- Options:
- `stateFullClassName: string`: state to monitor.
- `pollRate?: number`: how often poll the kotlin server for changes (default 5 seconds).

#### Low-level HTTP API
- These should not be used when watchBlocks API is sufficient.
- Consists of the following methods:
- `startMonitorV1`: Start monitoring for specified state changes. All changes after calling this function will be stored in internal kotlin-server buffer, ready to be read by calls to `GetMonitorTransactionsV1`. Transactions occuring before the call to startMonitorV1 will not be reported.
- `GetMonitorTransactionsV1`: Read all transactions for given state name still remaining in internal buffer.
- `ClearMonitorTransactionsV1`: Remove transaction for given state name with specified index number from internal buffer. Should be used to acknowledge receiving specified transactions in user code, so that transactions are not reported multiple times.
- `stopMonitorV1`: Don't watch for transactions changes anymore, remove any transactions that were not read until now.

### Custom Configuration via Env Variables

```json
Expand Down
1 change: 1 addition & 0 deletions packages/cactus-plugin-ledger-connector-corda/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
"joi": "17.4.2",
"node-ssh": "12.0.0",
"prom-client": "13.2.0",
"rxjs": "7.3.0",
"temp": "0.9.4",
"typescript-optional": "2.0.1"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ settings.gradle
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiPluginLedgerConnectorCorda.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiPluginLedgerConnectorCordaService.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/api/ApiUtil.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/ClearMonitorTransactionsV1Request.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/ClearMonitorTransactionsV1Response.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/CordaNodeSshCredentials.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/CordaRpcCredentials.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/CordaX500Name.kt
Expand All @@ -15,6 +17,9 @@ src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/mode
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/DiagnoseNodeV1Request.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/DiagnoseNodeV1Response.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/FlowInvocationType.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1Request.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1Response.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/GetMonitorTransactionsV1ResponseTx.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/InvokeContractV1Request.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/InvokeContractV1Response.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/JarFile.kt
Expand All @@ -29,5 +34,9 @@ src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/mode
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/Party.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/PublicKey.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/SHA256.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StartMonitorV1Request.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StartMonitorV1Response.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StopMonitorV1Request.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/StopMonitorV1Response.kt
src/main/kotlin/org/hyperledger/cactus/plugin/ledger/connector/corda/server/model/X500Principal.kt
src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import org.springframework.context.annotation.ComponentScan
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.context.annotation.Bean
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter
import org.springframework.scheduling.annotation.EnableScheduling


@SpringBootApplication
@ComponentScan(basePackages = ["org.hyperledger.cactus.plugin.ledger.connector.corda.server", "org.hyperledger.cactus.plugin.ledger.connector.corda.server.api", "org.hyperledger.cactus.plugin.ledger.connector.corda.server.model"])
@EnableScheduling
class Application {
/**
* Spring Bean that binds a Corda Jackson object-mapper to HTTP message types used in Spring.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
package org.hyperledger.cactus.plugin.ledger.connector.corda.server.api

import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ClearMonitorTransactionsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ClearMonitorTransactionsV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsBadRequestV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsSuccessV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DiagnoseNodeV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DiagnoseNodeV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.GetMonitorTransactionsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.GetMonitorTransactionsV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.InvokeContractV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.InvokeContractV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ListFlowsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ListFlowsV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.NodeInfo
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StartMonitorV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StartMonitorV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StopMonitorV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StopMonitorV1Response
import org.springframework.http.HttpStatus
import org.springframework.http.MediaType
import org.springframework.http.ResponseEntity
Expand Down Expand Up @@ -37,6 +45,17 @@ import kotlin.collections.Map
class ApiPluginLedgerConnectorCordaController(@Autowired(required = true) val service: ApiPluginLedgerConnectorCordaService) {


@DeleteMapping(
value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/clear-monitor-transactions"],
produces = ["application/json"],
consumes = ["application/json"]
)
fun clearMonitorTransactionsV1( @Valid @RequestBody(required = false) clearMonitorTransactionsV1Request: ClearMonitorTransactionsV1Request?
): ResponseEntity<ClearMonitorTransactionsV1Response> {
return ResponseEntity(service.clearMonitorTransactionsV1(clearMonitorTransactionsV1Request), HttpStatus.valueOf(200))
}


@PostMapping(
value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/deploy-contract-jars"],
produces = ["application/json"],
Expand All @@ -59,6 +78,17 @@ class ApiPluginLedgerConnectorCordaController(@Autowired(required = true) val se
}


@GetMapping(
value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/get-monitor-transactions"],
produces = ["application/json"],
consumes = ["application/json"]
)
fun getMonitorTransactionsV1( @Valid @RequestBody(required = false) getMonitorTransactionsV1Request: GetMonitorTransactionsV1Request?
): ResponseEntity<GetMonitorTransactionsV1Response> {
return ResponseEntity(service.getMonitorTransactionsV1(getMonitorTransactionsV1Request), HttpStatus.valueOf(200))
}


@GetMapping(
value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/get-prometheus-exporter-metrics"],
produces = ["text/plain"]
Expand Down Expand Up @@ -99,4 +129,26 @@ class ApiPluginLedgerConnectorCordaController(@Autowired(required = true) val se
): ResponseEntity<List<NodeInfo>> {
return ResponseEntity(service.networkMapV1(body), HttpStatus.valueOf(200))
}


@PostMapping(
value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/start-monitor"],
produces = ["application/json"],
consumes = ["application/json"]
)
fun startMonitorV1( @Valid @RequestBody(required = false) startMonitorV1Request: StartMonitorV1Request?
): ResponseEntity<StartMonitorV1Response> {
return ResponseEntity(service.startMonitorV1(startMonitorV1Request), HttpStatus.valueOf(200))
}


@DeleteMapping(
value = ["/api/v1/plugins/@hyperledger/cactus-plugin-ledger-connector-corda/stop-monitor"],
produces = ["application/json"],
consumes = ["application/json"]
)
fun stopMonitorV1( @Valid @RequestBody(required = false) stopMonitorV1Request: StopMonitorV1Request?
): ResponseEntity<StopMonitorV1Response> {
return ResponseEntity(service.stopMonitorV1(stopMonitorV1Request), HttpStatus.valueOf(200))
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,43 @@
package org.hyperledger.cactus.plugin.ledger.connector.corda.server.api

import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ClearMonitorTransactionsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ClearMonitorTransactionsV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsBadRequestV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsSuccessV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DeployContractJarsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DiagnoseNodeV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.DiagnoseNodeV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.GetMonitorTransactionsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.GetMonitorTransactionsV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.InvokeContractV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.InvokeContractV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ListFlowsV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.ListFlowsV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.NodeInfo
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StartMonitorV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StartMonitorV1Response
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StopMonitorV1Request
import org.hyperledger.cactus.plugin.ledger.connector.corda.server.model.StopMonitorV1Response

interface ApiPluginLedgerConnectorCordaService {

fun clearMonitorTransactionsV1(clearMonitorTransactionsV1Request: ClearMonitorTransactionsV1Request?): ClearMonitorTransactionsV1Response

fun deployContractJarsV1(deployContractJarsV1Request: DeployContractJarsV1Request?): DeployContractJarsSuccessV1Response

fun diagnoseNodeV1(diagnoseNodeV1Request: DiagnoseNodeV1Request?): DiagnoseNodeV1Response

fun getMonitorTransactionsV1(getMonitorTransactionsV1Request: GetMonitorTransactionsV1Request?): GetMonitorTransactionsV1Response

fun getPrometheusMetricsV1(): kotlin.String

fun invokeContractV1(invokeContractV1Request: InvokeContractV1Request?): InvokeContractV1Response

fun listFlowsV1(listFlowsV1Request: ListFlowsV1Request?): ListFlowsV1Response

fun networkMapV1(body: kotlin.Any?): List<NodeInfo>

fun startMonitorV1(startMonitorV1Request: StartMonitorV1Request?): StartMonitorV1Response

fun stopMonitorV1(stopMonitorV1Request: StopMonitorV1Request?): StopMonitorV1Response
}
Loading

0 comments on commit 865ec2f

Please sign in to comment.