From 5042ff14d345aa648a4e4a3198ad0a90c7469ffa Mon Sep 17 00:00:00 2001 From: John Saigle <4022790+johnsaigle@users.noreply.github.com> Date: Tue, 30 Jul 2024 10:27:05 -0400 Subject: [PATCH] node: Flow cancel enhancements and bug fixes (#4016) * node: Fix issue where transfers that were loaded from the DB did not add a flow-cancel transfer on the TargetChain Flow-canceling is done in the `ProcessMsgForTime` loop when a new message occurs. However, this was not done when a node restarted and reloaded transfers from the past 24 hours. As a result it was possible for the node to calculate a result that showed that the outgoing transfers for an emitter chain exceeded the daily limit. In effect this is true but only with the condition that there was incoming flow to allow this to happen. This appeared to violate an invariant and so the node did not start properly. node: Add unit tests when reloading flow cancel transactions from the database node: fix lint errors in governor_test.go * node: Add a command-line flag to enable or disable flow-canceling on restart Added a command-line flag to enable or disable flow-canceling when starting the node. This should allow Guardians to disable flow canceling in the case of future bugs or during a security incident. This should prevent the need to rollback to earlier Guardian versions. (@mdulin2 ) * node: Use deterministic iteration order over chains when changing Governor state - Adds a field that stores a sorted slice of chain IDs to the governor. - Use this field to iterate in a deterministic order when performing actions that change the state of the Governor - This should help Guardians reach a more similar view of the Governor in scenarios where iteration order might impact whether a transfer is queued. (This is relevant especially in the case of Flow Canceling) - Cases where only a single VAA is being modified were not changed. 
Iteration order should not matter here and deterministic order may be worse for performance when searching for a particular element. * node: Fix tokenEntry when checking flow cancel for pending transfers (Squash and merge bug fix from PR #4001) Similar to a previous issue in the function `ProcessMsgForTime`, the tokenEntry was not being generated properly. This should result in queued "small transfers" being able to flow cancel when they are released from the queue. Also adds a comment on the CheckedInt64 function to indicate what its error states mean and when they occur. Add comments and change variable names for governor_monitoring - Add function comments to explain what they do and what their error states mean - Adds governor logging to error cases - Change variable names in publishStatus function. `value` was used first to indicate the "governor usage" and then reused to indicate the remaining available notional value for a chain. This refactor tries to make it clear that these are different concepts Add unit test for flow cancelling when a pending transfer is released - Add a unit test to ensure that, when a pending transfer is released, it also does flow-cancelling on the TargetChain (previously we had a bug here) - Add documentation for CheckPendingForTime to clarify that it has side-effects * node: Modify error handling for CheckPending method in the Governor Previous rollouts of the Flow Cancel feature contained issues when calculating the Governor usage when usage was near the daily limit. This caused an invariant to be violated. However, this was propagated to the processor code and resulted in the processor restarting the entire process. Instead, the Governor should simply fail-closed and report that there is no remaining capacity, causing further VAAs to be queued until the usage diminishes over time. The circumstances leading to the invariant violations are not addressed in this commit. 
Instead this commit reworks the way errors are handled by CheckPending, making careful choices about when the process should or should not be killed. - Change "invariant" error handling: instead of causing the process to die, log an error and skip further processing for a single chain while allowing processing for other chains to continue - Remove 'invariant error' in TrimAndSumValueForChain as it can occur somewhat regularly with the addition of the flow cancel feature - Return dailyLimit in error condition rather than 0 so that future transfers will be queued - Do not cap the sum returned from TrimAndSumValueForChain: instead allow it to exceed the daily limit. - Modify unit tests to reflect this - Add unit tests for overflow/underflow scenarios in the TrimAndSumValue functions - Change other less severe error cases to log warnings instead of returning errors. - Generally prevent flow-cancel related issues from affecting normal Governor operations. Instead the flow cancel transfers should simply not be populated and thus result in "GovernorV1" behavior. - Add documentation to CheckPendingForTime to explain the dangers of returning an error - Reword error messages to be more precise and include more relevant fields. Add documentation explaining when the process should and should not die * node: Add additional metrics for Governor status Modify the monitoring code and protobuf files to make the status of the Governor more legible when flow-canceling is enabled. This can be consumed by Wormhole Dashboard to better reflect the effects of flow cancelling. On the level of the Governor: - whether the Guardian has enabled flow cancel or not On the level of the Governor's emitters, reports 24h metrics for: - net value that has moved across the chain - total outgoing amount - total incoming flow cancel amount Currently big transfers are not accounted for as they do not affect the Governor's capacity. (They are always queued.) 
* node: Add new flow cancel parameter to Governor in tests * node: goimports formatting * node: Bug fix in changes to governor monitoring - Fix issue where stats weren't being populated unless flow cancel was enabled - Fix wrong return value used in unit test - Fix typo in proto variable name - Move sorting outside of a for loop for efficiency - Restore unit test that was deleted in the process of rebasing * node: address prealloc lint error in governor code * node: Fix "generated proto differs from committed proto" * node: Fix bug in chainIds allocation - This resolves a mistake with allocating the chainIds in the governor initialization that causes nil entries in the slice. - Add unit tests to ensure that the chainIds slice matches the chains map - Add unit test to ensure that TrimAndSumValueForChain checks for a nil pointer to avoid panics * node: Fix returning nil on err in governor_test.go * node: Cleanup comments in governor code * node: fix governor comment * node: enable flow cancel in governor_monitoring tests * node: Add flow cancel information to p2p heartbeat features * node: Remove outdated comment from governor * node: Upgrade logs to Error from Warn when reloading transfers from database * node: Enable flow cancel in check_query test function * node: Cleanup comments and redundant code in governor * node: Refactor how the flow cancel token list gets populated - Only populate the flow cancel tokens list once - Change default behavior to use an empty flow cancel assets list, rather than first populating the list and then clearing it - Refactor the logic around enabling the flow cancel token field for governed assets. 
Now it only executes if flow cancel is enabled, rather than operating over an empty slice when flow cancel is disabled - Modify devnet/testnet configs so that they are responsible for returning the correct list of flow cancelling assets * node: Add unit test for flow cancel feature flag * node: Move new Governor status proto fields from Emitter to Chain * node: lint governor_monitoring --------- Co-authored-by: Maxwell Dulin --- node/cmd/guardiand/node.go | 11 +- node/pkg/adminrpc/adminserver_test.go | 2 +- node/pkg/governor/devnet_config.go | 7 +- node/pkg/governor/governor.go | 200 +++++--- node/pkg/governor/governor_db.go | 12 +- node/pkg/governor/governor_monitoring.go | 172 +++++-- node/pkg/governor/governor_monitoring_test.go | 2 +- node/pkg/governor/governor_prices.go | 2 +- node/pkg/governor/governor_test.go | 442 +++++++++++++++++- node/pkg/governor/testnet_config.go | 8 +- node/pkg/node/node_test.go | 2 +- node/pkg/node/options.go | 10 +- node/pkg/p2p/p2p.go | 6 +- node/pkg/proto/gossip/v1/gossip.pb.go | 107 +++-- node/pkg/publicrpc/publicrpcserver_test.go | 2 +- proto/gossip/v1/gossip.proto | 4 + 16 files changed, 817 insertions(+), 172 deletions(-) diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 460757e45c..b714c6ac47 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -231,7 +231,8 @@ var ( // Prometheus remote write URL promRemoteURL *string - chainGovernorEnabled *bool + chainGovernorEnabled *bool + governorFlowCancelEnabled *bool ccqEnabled *bool ccqAllowedRequesters *string @@ -435,6 +436,7 @@ func init() { promRemoteURL = NodeCmd.Flags().String("promRemoteURL", "", "Prometheus remote write URL (Grafana)") chainGovernorEnabled = NodeCmd.Flags().Bool("chainGovernorEnabled", false, "Run the chain governor") + governorFlowCancelEnabled = NodeCmd.Flags().Bool("governorFlowCancelEnabled", false, "Enable flow cancel on the governor") ccqEnabled = NodeCmd.Flags().Bool("ccqEnabled", false, "Enable cross 
chain query support") ccqAllowedRequesters = NodeCmd.Flags().String("ccqAllowedRequesters", "", "Comma separated list of signers allowed to submit cross chain queries") @@ -541,6 +543,11 @@ func runNode(cmd *cobra.Command, args []string) { os.Exit(1) } + if !(*chainGovernorEnabled) && *governorFlowCancelEnabled { + fmt.Println("Flow cancel can only be enabled when the governor is enabled") + os.Exit(1) + } + logger := zap.New(zapcore.NewCore( consoleEncoder{zapcore.NewConsoleEncoder( zap.NewDevelopmentEncoderConfig())}, @@ -1575,7 +1582,7 @@ func runNode(cmd *cobra.Command, args []string) { node.GuardianOptionDatabase(db), node.GuardianOptionWatchers(watcherConfigs, ibcWatcherConfig), node.GuardianOptionAccountant(*accountantWS, *accountantContract, *accountantCheckEnabled, accountantWormchainConn, *accountantNttContract, accountantNttWormchainConn), - node.GuardianOptionGovernor(*chainGovernorEnabled), + node.GuardianOptionGovernor(*chainGovernorEnabled, *governorFlowCancelEnabled), node.GuardianOptionGatewayRelayer(*gatewayRelayerContract, gatewayRelayerWormchainConn), node.GuardianOptionQueryHandler(*ccqEnabled, *ccqAllowedRequesters), node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap), diff --git a/node/pkg/adminrpc/adminserver_test.go b/node/pkg/adminrpc/adminserver_test.go index 08bcf64745..f1167f71c1 100644 --- a/node/pkg/adminrpc/adminserver_test.go +++ b/node/pkg/adminrpc/adminserver_test.go @@ -322,7 +322,7 @@ func Test_adminCommands(t *testing.T) { } func newNodePrivilegedServiceForGovernorTests() *nodePrivilegedService { - gov := governor.NewChainGovernor(zap.NewNop(), &db.MockGovernorDB{}, wh_common.GoTest) + gov := governor.NewChainGovernor(zap.NewNop(), &db.MockGovernorDB{}, wh_common.GoTest, false) return &nodePrivilegedService{ db: nil, diff --git a/node/pkg/governor/devnet_config.go b/node/pkg/governor/devnet_config.go index 55a6e2f68b..828073b067 100644 --- a/node/pkg/governor/devnet_config.go +++ 
b/node/pkg/governor/devnet_config.go @@ -17,8 +17,11 @@ func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []tokenConfigE {chain: 2, addr: "000000000000000000000000DDb64fE46a91D46ee29420539FC25FD07c5FEa3E", symbol: "WETH", coinGeckoId: "weth", decimals: 8, price: 1174}, } - flowCancelTokens := []tokenConfigEntry{ - {chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usdc", decimals: 6, price: 1}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 1 + flowCancelTokens := []tokenConfigEntry{} + if gov.flowCancelEnabled { + flowCancelTokens = []tokenConfigEntry{ + {chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usdc", decimals: 6, price: 1}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 1 + } } chains := []chainConfigEntry{ diff --git a/node/pkg/governor/governor.go b/node/pkg/governor/governor.go index 13830fd006..fbea430def 100644 --- a/node/pkg/governor/governor.go +++ b/node/pkg/governor/governor.go @@ -28,9 +28,12 @@ package governor import ( "context" "encoding/hex" + "errors" "fmt" "math" "math/big" + "sort" + "strconv" "sync" "time" @@ -179,12 +182,15 @@ func (ce *chainEntry) isBigTransfer(value uint64) bool { } type ChainGovernor struct { - db db.GovernorDB // protected by `mutex` - logger *zap.Logger - mutex sync.Mutex - tokens map[tokenKey]*tokenEntry // protected by `mutex` - tokensByCoinGeckoId map[string][]*tokenEntry // protected by `mutex` - chains map[vaa.ChainID]*chainEntry // protected by `mutex` + db db.GovernorDB // protected by `mutex` + logger *zap.Logger + mutex sync.Mutex + tokens map[tokenKey]*tokenEntry // protected by `mutex` + tokensByCoinGeckoId map[string][]*tokenEntry // protected by `mutex` + chains map[vaa.ChainID]*chainEntry // protected by `mutex` + // We maintain a sorted slice of governed chainIds so we can iterate over maps in a deterministic way + // 
This slice should be sorted in ascending order by (Wormhole) Chain ID. + chainIds []vaa.ChainID msgsSeen map[string]bool // protected by `mutex` // Key is hash, payload is consts transferComplete and transferEnqueued. msgsToPublish []*common.MessagePublication // protected by `mutex` dayLengthInMinutes int @@ -194,12 +200,14 @@ type ChainGovernor struct { nextConfigPublishTime time.Time statusPublishCounter int64 configPublishCounter int64 + flowCancelEnabled bool } func NewChainGovernor( logger *zap.Logger, db db.GovernorDB, env common.Environment, + flowCancelEnabled bool, ) *ChainGovernor { return &ChainGovernor{ db: db, @@ -209,6 +217,7 @@ func NewChainGovernor( chains: make(map[vaa.ChainID]*chainEntry), msgsSeen: make(map[string]bool), env: env, + flowCancelEnabled: flowCancelEnabled, } } @@ -232,19 +241,28 @@ func (gov *ChainGovernor) Run(ctx context.Context) error { return nil } +func (gov *ChainGovernor) IsFlowCancelEnabled() bool { + return gov.flowCancelEnabled +} + func (gov *ChainGovernor) initConfig() error { gov.mutex.Lock() defer gov.mutex.Unlock() gov.dayLengthInMinutes = 24 * 60 - configTokens := tokenList() - flowCancelTokens := FlowCancelTokenList() configChains := chainList() + configTokens := tokenList() + flowCancelTokens := []tokenConfigEntry{} if gov.env == common.UnsafeDevNet { configTokens, flowCancelTokens, configChains = gov.initDevnetConfig() } else if gov.env == common.TestNet { configTokens, flowCancelTokens, configChains = gov.initTestnetConfig() + } else { + // mainnet, unit tests, or accountant-mock + if gov.flowCancelEnabled { + flowCancelTokens = FlowCancelTokenList() + } } for _, ct := range configTokens { @@ -306,23 +324,27 @@ func (gov *ChainGovernor) initConfig() error { } } - for _, flowCancelConfigEntry := range flowCancelTokens { - addr, err := vaa.StringToAddress(flowCancelConfigEntry.addr) - if err != nil { - return err - } - key := tokenKey{chain: vaa.ChainID(flowCancelConfigEntry.chain), addr: addr} + // If flow 
cancelling is enabled, enable the `flowCancels` field for the Governed assets that + // correspond to the entries in the Flow Cancel Tokens List + if gov.flowCancelEnabled { + for _, flowCancelConfigEntry := range flowCancelTokens { + addr, err := vaa.StringToAddress(flowCancelConfigEntry.addr) + if err != nil { + return err + } + key := tokenKey{chain: vaa.ChainID(flowCancelConfigEntry.chain), addr: addr} - // Only add flow cancelling for tokens that are already configured for rate-limiting. - if _, ok := gov.tokens[key]; ok { - gov.tokens[key].flowCancels = true - } else { - gov.logger.Debug("token present in flow cancel list but absent from main token list:", - zap.Stringer("chain", key.chain), - zap.Stringer("addr", key.addr), - zap.String("symbol", flowCancelConfigEntry.symbol), - zap.String("coinGeckoId", flowCancelConfigEntry.coinGeckoId), - ) + // Only add flow cancelling for tokens that are already configured for rate-limiting. + if _, ok := gov.tokens[key]; ok { + gov.tokens[key].flowCancels = true + } else { + gov.logger.Debug("token present in flow cancel list but absent from main token list:", + zap.Stringer("chain", key.chain), + zap.Stringer("addr", key.addr), + zap.String("symbol", flowCancelConfigEntry.symbol), + zap.String("coinGeckoId", flowCancelConfigEntry.coinGeckoId), + ) + } } } @@ -375,6 +397,22 @@ func (gov *ChainGovernor) initConfig() error { return fmt.Errorf("no chains are configured") } + // Populate a sorted list of chain IDs so that we can iterate over maps in a determinstic way. + // https://go.dev/blog/maps, "Iteration order" section + governedChainIds := make([]vaa.ChainID, len(gov.chains)) + i := 0 + for id := range gov.chains { + // updating the slice in place here to satisfy prealloc lint. 
In theory this should be more performant + governedChainIds[i] = id + i++ + } + // Custom sorting for the vaa.ChainID type + sort.Slice(governedChainIds, func(i, j int) bool { + return governedChainIds[i] < governedChainIds[j] + }) + + gov.chainIds = governedChainIds + return nil } @@ -651,10 +689,23 @@ func (gov *ChainGovernor) parseMsgAlreadyLocked( return true, ce, token, payload, nil } +// CheckPending is a wrapper method for CheckPendingForTime. It is called by the processor with the purpose of releasing +// queued transfers. func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) { return gov.CheckPendingForTime(time.Now()) } +// CheckPendingForTime checks whether a pending message is ready to be released, and if so, modifies the chain entry's `pending` and `transfers` slices by +// moving a `dbTransfer` element from `pending` to `transfers`. Returns a slice of Messages that will be published. +// A transfer is ready to be released when one of the following conditions holds: +// - The 'release time' duration has passed since `now` (i.e. the transfer has been queued for 24 hours, regardless of +// the Governor's current capacity) +// - Within the release time duration, other transfers have been processed and have freed up outbound Governor capacity. +// This happens either because other transfers get released after 24 hours or because incoming transfers of +// flow-cancelling assets have freed up outbound capacity. +// +// WARNING: When this function returns an error, it propagates to the `processor` which in turn interprets this as a +// signal to RESTART THE PROCESSOR. Therefore, errors returned by this function effectively act as panics. 
func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessagePublication, error) { gov.mutex.Lock() defer gov.mutex.Unlock() @@ -669,15 +720,26 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP gov.msgsToPublish = nil } - for _, ce := range gov.chains { + // Iterate deterministically by accessing keys from this slice instead of the chainEntry map directly + for _, chainId := range gov.chainIds { + ce, ok := gov.chains[chainId] + if !ok { + gov.logger.Error("chainId not found in gov.chains", zap.Stringer("chainId", chainId)) + + } // Keep going as long as we find something that will fit. for { foundOne := false prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime) if err != nil { gov.logger.Error("error when attempting to trim and sum transfers", zap.Error(err)) + gov.logger.Error("refusing to release transfers for this chain until the sum can be correctly calculated", + zap.Stringer("chainId", chainId), + zap.Uint64("prevTotalValue", prevTotalValue), + zap.Error(err)) gov.msgsToPublish = msgsToPublish - return nil, err + // Skip further processing for this chain entry + break } // Keep going until we find something that fits or hit the end. @@ -734,7 +796,8 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP zap.Uint64("value", value), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue), - zap.String("msgID", pe.dbData.Msg.MessageIDString())) + zap.String("msgID", pe.dbData.Msg.MessageIDString()), + zap.String("flowCancels", strconv.FormatBool(pe.token.flowCancels))) } payload, err := vaa.DecodeTransferPayloadHdr(pe.dbData.Msg.Payload) @@ -746,7 +809,9 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP ) delete(gov.msgsSeen, pe.hash) // Rest of the clean up happens below. } else { - // If we get here, publish it and remove it from the pending list. 
+ // If we get here, publish it and move it from the pending list to the + // transfers list. Also add a flow-cancel transfer to the destination chain + // if the transfer is sending a flow-canceling asset. msgsToPublish = append(msgsToPublish, &pe.dbData.Msg) if countsTowardsTransfers { @@ -762,26 +827,47 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP Hash: pe.hash, } - if err := gov.db.StoreTransfer(&dbTransfer); err != nil { - gov.msgsToPublish = msgsToPublish + transfer, err := newTransferFromDbTransfer(&dbTransfer) + if err != nil { + // Should never occur unless dbTransfer.Value overflows MaxInt64 + gov.logger.Error("could not convert dbTransfer to transfer", + zap.String("msgID", dbTransfer.MsgID), + zap.String("hash", pe.hash), + zap.Error(err), + ) + // This causes the processor to die. We don't want to process transfers that + // have USD value in excess of MaxInt64 under any circumstances. + // This check should occur before the call to the database so + // that we don't store a problematic transfer. return nil, err } - transfer, err := newTransferFromDbTransfer(&dbTransfer) - if err != nil { + if err := gov.db.StoreTransfer(&dbTransfer); err != nil { + // This causes the processor to die. We can't tolerate DB connection + // errors. return nil, err } + ce.transfers = append(ce.transfers, transfer) + gov.msgsSeen[pe.hash] = transferComplete + // Add inverse transfer to destination chain entry if this asset can cancel flows. - key := tokenKey{chain: dbTransfer.EmitterChain, addr: dbTransfer.EmitterAddress} + key := tokenKey{chain: pe.token.token.chain, addr: pe.token.token.addr} tokenEntry := gov.tokens[key] if tokenEntry != nil { // Mandatory check to ensure that the token should be able to reduce the Governor limit. 
if tokenEntry.flowCancels { if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok { + if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil { - return nil, err + gov.logger.Warn("could not add flow canceling transfer to destination chain", + zap.String("msgID", dbTransfer.MsgID), + zap.String("hash", pe.hash), + zap.Error(err), + ) + // Process the next pending transfer + continue } } else { gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", @@ -792,7 +878,6 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP } } } - gov.msgsSeen[pe.hash] = transferComplete } else { delete(gov.msgsSeen, pe.hash) } @@ -836,46 +921,46 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) { return value, nil } -// TrimAndSumValueForChain calculates the `sum` of `Transfer`s for a given chain `emitter`. In effect, it represents a +// TrimAndSumValueForChain calculates the `sum` of `Transfer`s for a given chain `chainEntry`. In effect, it represents a // chain's "Governor Usage" for a given 24 hour period. // This sum may be reduced by the sum of 'flow cancelling' transfers: that is, transfers of an allow-listed token // that have the `emitter` as their destination chain. // The resulting `sum` return value therefore represents the net flow across a chain when taking flow-cancelling tokens // into account. Therefore, this value should never be less than 0 and should never exceed the "Governor limit" for the chain. -// As a side-effect, this function modifies the parameter `emitter`, updating its `transfers` field so that it only includes +// As a side-effect, this function modifies the parameter `chainEntry`, updating its `transfers` field so that it only includes // filtered `Transfer`s (i.e. outgoing `Transfer`s newer than `startTime`). +// Returns an error if the sum cannot be calculated. The transfers field will still be updated in this case. 
When +// an error condition occurs, this function returns the chain's `dailyLimit` as the sum. This should result in the +// chain appearing at maximum capacity from the perspective of the Governor, and therefore cause new transfers to be +// queued until space opens up. // SECURITY Invariant: The `sum` return value should never be less than 0 -// SECURITY Invariant: The `sum` return value should never exceed the "Governor limit" for the chain -func (gov *ChainGovernor) TrimAndSumValueForChain(emitter *chainEntry, startTime time.Time) (sum uint64, err error) { - // Sum the value of all outgoing transfers - var sumOutgoing int64 - sumOutgoing, emitter.transfers, err = gov.TrimAndSumValue(emitter.transfers, startTime) +func (gov *ChainGovernor) TrimAndSumValueForChain(chainEntry *chainEntry, startTime time.Time) (sum uint64, err error) { + if chainEntry == nil { + // We don't expect this to happen but this prevents a nil pointer deference + return 0, errors.New("TrimAndSumValeForChain parameter chainEntry must not be nil") + } + // Sum the value of all transfers for this chain. This sum can be negative if flow-cancelling is enabled + // and the incoming value of flow-cancelling assets exceeds the summed value of all outgoing assets. + var sumValue int64 + sumValue, chainEntry.transfers, err = gov.TrimAndSumValue(chainEntry.transfers, startTime) if err != nil { - return 0, err + // Return the daily limit as the sum so that any further transfers will be queued. + return chainEntry.dailyLimit, err } - // Return early if the sum is not positive as it cannot exceed the daily limit. - // In this case, return 0 even if the sum is negative. - if sumOutgoing <= 0 { + // Return 0 even if the sum is negative. 
+ if sumValue <= 0 { return 0, nil } - sum = uint64(sumOutgoing) - if sum > emitter.dailyLimit { - return 0, fmt.Errorf( - "invariant violation: calculated sum %d exceeds Governor limit %d", - sum, - emitter.dailyLimit, - ) - } - - return sum, nil + return uint64(sumValue), nil } // TrimAndSumValue iterates over a slice of transfer structs. It filters out transfers that have Timestamp values that // are earlier than the parameter `startTime`. The function then iterates over the remaining transfers, sums their Value, // and returns the sum and the filtered transfers. +// As a side-effect, this function deletes transfers from the database if their Timestamp is before `startTime`. // The `transfers` slice must be sorted by Timestamp. We expect this to be the case as transfers are added to the // Governor in chronological order as they arrive. Note that `Timestamp` is created by the Governor; it is not read // from the actual on-chain transaction. @@ -948,7 +1033,8 @@ func CheckedAddUint64(x uint64, y uint64) (uint64, error) { return sum, nil } -// CheckedAddInt64 adds two uint64 values with overflow checks +// CheckedAddInt64 adds two uint64 values with overflow checks. Returns an error if the calculation would +// overflow or underflow. In this case, the returned value is 0. func CheckedAddInt64(x int64, y int64) (int64, error) { if x == 0 { return y, nil diff --git a/node/pkg/governor/governor_db.go b/node/pkg/governor/governor_db.go index a71fb194fa..db928ce868 100644 --- a/node/pkg/governor/governor_db.go +++ b/node/pkg/governor/governor_db.go @@ -243,19 +243,23 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error { } ce.transfers = append(ce.transfers, transfer) - // Reload flow-cancel transfers for the TargetChain. This is important when node restarts so that a corresponding, - // inverse transfer is added to the TargetChain. 
This is already done during the `ProcessMsgForTime` loop but - // that function does not capture flow-cancelling when the node is restarted. + // Reload flow-cancel transfers for the TargetChain. This is important when the node restarts so that a corresponding, + // inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` and + // `CheckPending` loops but those functions do not capture flow-cancelling when the node is restarted. tokenEntry := gov.tokens[tk] if tokenEntry != nil { // Mandatory check to ensure that the token should be able to reduce the Governor limit. if tokenEntry.flowCancels { if destinationChainEntry, ok := gov.chains[xfer.TargetChain]; ok { if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(xfer); err != nil { + gov.logger.Error("could not add flow canceling transfer to destination chain", + zap.String("msgID", xfer.MsgID), + zap.String("hash", xfer.Hash), zap.Error(err), + ) return err } } else { - gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", + gov.logger.Error("tried to cancel flow but chain entry for target chain does not exist", zap.String("msgID", xfer.MsgID), zap.Stringer("token chain", xfer.OriginChain), zap.Stringer("token address", xfer.OriginAddress), diff --git a/node/pkg/governor/governor_monitoring.go b/node/pkg/governor/governor_monitoring.go index 9929acb98b..142a90dd8e 100644 --- a/node/pkg/governor/governor_monitoring.go +++ b/node/pkg/governor/governor_monitoring.go @@ -100,15 +100,16 @@ func (gov *ChainGovernor) Status() (resp string) { defer gov.mutex.Unlock() startTime := time.Now().Add(-time.Minute * time.Duration(gov.dayLengthInMinutes)) + for _, ce := range gov.chains { - valueTrans, err := sumValue(ce.transfers, startTime) + netValue, _, _, err := sumValue(ce.transfers, startTime) if err != nil { // We don't want to actually return an error or otherwise stop // execution in this case. 
Instead of propagating the error here, print the contents of the // error message. return fmt.Sprintf("chain: %v, dailyLimit: OVERFLOW. error: %s", ce.emitterChainId, err) } - s1 := fmt.Sprintf("chain: %v, dailyLimit: %v, total: %v, numPending: %v", ce.emitterChainId, ce.dailyLimit, valueTrans, len(ce.pending)) + s1 := fmt.Sprintf("chain: %v, dailyLimit: %v, total: %v, numPending: %v", ce.emitterChainId, ce.dailyLimit, netValue, len(ce.pending)) resp += s1 + "\n" gov.logger.Info(s1) if len(ce.pending) != 0 { @@ -244,62 +245,111 @@ func (gov *ChainGovernor) resetReleaseTimerForTime(vaaId string, now time.Time, } return "", fmt.Errorf("vaa not found in the pending list") + } -// sumValue sums the value of all `transfers`. See also `TrimAndSumValue`. -func sumValue(transfers []transfer, startTime time.Time) (uint64, error) { +// sumValue sums the value of all `transfers`, returning separate fields for: +// - the net sum of all outgoing small tranasfers minus flow cancel sum +// - the sum of all outgoing small tranasfers +// - the sum of all incoming flow-cancelling transfers +// NOTE these sums exclude "big transfers" as they are always queued for 24h and are never added to the chain entry's 'transfers' field. +// Returns an error if the sum of all transfers would overflow the bounds of Int64. In this case, the function +// returns a value of 0. +func sumValue(transfers []transfer, startTime time.Time) (netNotional int64, smallTxOutgoingNotional uint64, flowCancelNotional uint64, err error) { if len(transfers) == 0 { - return 0, nil + return 0, 0, 0, nil } - var sum int64 + // Sum of all outgoing small tranasfers minus incoming flow cancel transfers. 
Big transfers are excluded + netNotional = int64(0) + smallTxOutgoingNotional = uint64(0) + flowCancelNotional = uint64(0) for _, t := range transfers { if t.dbTransfer.Timestamp.Before(startTime) { continue } - checkedSum, err := CheckedAddInt64(sum, t.value) + netNotional, err = CheckedAddInt64(netNotional, t.value) if err != nil { // We have to stop and return an error here (rather than saturate, for example). The // transfers are not sorted by value so we can't make any guarantee on the final value // if we hit the upper or lower bound. We don't expect this to happen in any case. - return 0, err + return 0, 0, 0, err + } + if t.value < 0 { + // If a transfer is negative then it is an incoming, flow-cancelling transfer. + // We can use the dbTransfer.Value for calculating the sum because it is the unsigned version + // of t.Value + flowCancelNotional += t.dbTransfer.Value + } else { + smallTxOutgoingNotional += t.dbTransfer.Value } - sum = checkedSum - } - - // Do not return negative values. Instead, saturate to zero. - if sum <= 0 { - return 0, nil } - return uint64(sum), nil + return netNotional, smallTxOutgoingNotional, flowCancelNotional, nil } -// REST query to get the current available notional value per chain. +// REST query to get the current available notional value per chain. This is defined as the sum of all transfers +// subtracted from the chains's dailyLimit. +// The available notional limit by chain represents the remaining capacity of a chain. As a result, it should not be +// a negative number: we don't want to represent that there is "negative value" available. 
func (gov *ChainGovernor) GetAvailableNotionalByChain() (resp []*publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry) { gov.mutex.Lock() defer gov.mutex.Unlock() startTime := time.Now().Add(-time.Minute * time.Duration(gov.dayLengthInMinutes)) - for _, ce := range gov.chains { - value, err := sumValue(ce.transfers, startTime) + + // Iterate deterministically by accessing keys from this slice instead of the chainEntry map directly + for _, chainId := range gov.chainIds { + ce := gov.chains[chainId] + netUsage, _, incoming, err := sumValue(ce.transfers, startTime) if err != nil { - // Don't return an error here, just return 0 - return make([]*publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry, 0) + // Report 0 available notional if we can't calculate the current usage + gov.logger.Error("GetAvailableNotionalByChain: failed to compute sum of transfers for chain entry", + zap.String("chainID", chainId.String()), + zap.Error(err)) + resp = append(resp, &publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry{ + ChainId: uint32(ce.emitterChainId), + RemainingAvailableNotional: 0, + NotionalLimit: ce.dailyLimit, + BigTransactionSize: ce.bigTransactionSize, + }) + continue } - if value >= ce.dailyLimit { - value = 0 - } else { - value = ce.dailyLimit - value + + remaining := gov.availableNotionalValue(chainId, netUsage) + + if !gov.flowCancelEnabled { + // When flow cancel is disabled, we expect that both the netUsage and remaining notional should be + // within the range of [0, dailyLimit]. Flow cancel allows flexibility here. netUsage may be + // negative if there is a lot of incoming flow; conversely, it may exceed dailyLimit if incoming + // flow added space, allowed additional transfers through, and then expired after 24h. 
+ // Note that if flow cancel is enabled and then later disabled, netUsage can exceed dailyLimit + // for 24h as old transfers will be loaded from the database into the Governor, but the flow + // cancel transfers will not. The value should return to the normal range after 24h has elapsed + // since the old transfers were sent. + if netUsage < 0 || incoming != 0 { + gov.logger.Warn("GetAvailableNotionalByChain: net value for chain is negative even though flow cancel is disabled", + zap.String("chainID", chainId.String()), + zap.Uint64("dailyLimit", ce.dailyLimit), + zap.Int64("netUsage", netUsage), + zap.Error(err)) + } else if uint64(netUsage) > ce.dailyLimit { + gov.logger.Warn("GetAvailableNotionalByChain: net value for chain exceeds daily limit even though flow cancel is disabled", + zap.String("chainID", chainId.String()), + zap.Uint64("dailyLimit", ce.dailyLimit), + zap.Error(err)) + } + } resp = append(resp, &publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry{ ChainId: uint32(ce.emitterChainId), - RemainingAvailableNotional: value, + RemainingAvailableNotional: remaining, NotionalLimit: ce.dailyLimit, BigTransactionSize: ce.bigTransactionSize, }) + } sort.SliceStable(resp, func(i, j int) bool { @@ -365,6 +415,25 @@ func (gov *ChainGovernor) IsVAAEnqueued(msgId *publicrpcv1.MessageID) (bool, err return false, nil } +// availableNotionalValue calculates the available notional USD value for a chain entry based on the net value +// of the chain. +func (gov *ChainGovernor) availableNotionalValue(id vaa.ChainID, netUsage int64) uint64 { + remaining := uint64(0) + ce := gov.chains[id] + + // Handle negative case here so we can safely cast to uint64 below + if netUsage < 0 { + // The full capacity is available for the chain. 
+ remaining = ce.dailyLimit + } else if uint64(netUsage) > ce.dailyLimit { + remaining = 0 + } else { + remaining = ce.dailyLimit - uint64(netUsage) + } + + return remaining +} + // REST query to get the list of tokens being monitored by the governor. func (gov *ChainGovernor) GetTokenList() []*publicrpcv1.GovernorGetTokenListResponse_Entry { gov.mutex.Lock() @@ -441,21 +510,21 @@ func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat, sendC chan<- [] if exists { enabled = "1" - value, err := sumValue(ce.transfers, startTime) + netUsage, _, _, err := sumValue(ce.transfers, startTime) + + remaining := uint64(0) if err != nil { // Error can occur if the sum overflows. Return 0 in this case rather than returning an // error. - value = 0 - } - if value >= ce.dailyLimit { - value = 0 + gov.logger.Error("CollectMetrics: failed to compute sum of transfers for chain entry", zap.String("chain", chain.String()), zap.Error(err)) + remaining = 0 } else { - value = ce.dailyLimit - value + remaining = gov.availableNotionalValue(chain, netUsage) } pending := len(ce.pending) totalNotional = fmt.Sprint(ce.dailyLimit) - available = float64(value) + available = float64(remaining) numPending = float64(pending) totalPending += pending } @@ -493,7 +562,9 @@ var governorMessagePrefixStatus = []byte("governor_status_000000000000000000|") func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan<- []byte, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) { chains := make([]*gossipv1.ChainGovernorConfig_Chain, 0) - for _, ce := range gov.chains { + // Iterate deterministically by accessing keys from this slice instead of the chainEntry map directly + for _, cid := range gov.chainIds { + ce := gov.chains[cid] chains = append(chains, &gossipv1.ChainGovernorConfig_Chain{ ChainId: uint32(ce.emitterChainId), NotionalLimit: ce.dailyLimit, @@ -513,11 +584,12 @@ func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan<- []b gov.configPublishCounter += 1 
payload := &gossipv1.ChainGovernorConfig{ - NodeName: hb.NodeName, - Counter: gov.configPublishCounter, - Timestamp: hb.Timestamp, - Chains: chains, - Tokens: tokens, + NodeName: hb.NodeName, + Counter: gov.configPublishCounter, + Timestamp: hb.Timestamp, + Chains: chains, + Tokens: tokens, + FlowCancelEnabled: gov.flowCancelEnabled, } b, err := proto.Marshal(payload) @@ -551,16 +623,15 @@ func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan<- []b func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []byte, startTime time.Time, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) { chains := make([]*gossipv1.ChainGovernorStatus_Chain, 0) numEnqueued := 0 - for _, ce := range gov.chains { - value, err := sumValue(ce.transfers, startTime) + for chainId, ce := range gov.chains { + // The capacity for the chain to emit further messages, denoted as USD value. + remaining := uint64(0) + netUsage, smallTxNotional, flowCancelNotional, err := sumValue(ce.transfers, startTime) - if err != nil || value >= ce.dailyLimit { - // In case of error, set value to 0 rather than returning an error to the caller. An error - // here means sumValue has encountered an overflow and this should never happen. Even if it did - // we don't want to stop execution here. 
- value = 0 + if err != nil { + gov.logger.Error("publishStatus: failed to compute sum of transfers for chain entry", zap.String("chain", chainId.String()), zap.Error(err)) } else { - value = ce.dailyLimit - value + remaining = gov.availableNotionalValue(chainId, netUsage) } enqueuedVaas := make([]*gossipv1.ChainGovernorStatus_EnqueuedVAA, 0) @@ -589,9 +660,12 @@ func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []b } chains = append(chains, &gossipv1.ChainGovernorStatus_Chain{ - ChainId: uint32(ce.emitterChainId), - RemainingAvailableNotional: value, - Emitters: []*gossipv1.ChainGovernorStatus_Emitter{&emitter}, + ChainId: uint32(ce.emitterChainId), + RemainingAvailableNotional: remaining, + Emitters: []*gossipv1.ChainGovernorStatus_Emitter{&emitter}, + SmallTxNetNotionalValue: netUsage, + SmallTxOutgoingNotionalValue: smallTxNotional, + FlowCancelNotionalValue: flowCancelNotional, }) } diff --git a/node/pkg/governor/governor_monitoring_test.go b/node/pkg/governor/governor_monitoring_test.go index aac6449556..5683e17429 100644 --- a/node/pkg/governor/governor_monitoring_test.go +++ b/node/pkg/governor/governor_monitoring_test.go @@ -11,7 +11,7 @@ import ( func TestIsVAAEnqueuedNilMessageID(t *testing.T) { logger, _ := zap.NewProduction() - gov := NewChainGovernor(logger, nil, common.GoTest) + gov := NewChainGovernor(logger, nil, common.GoTest, true) enqueued, err := gov.IsVAAEnqueued(nil) require.EqualError(t, err, "no message ID specified") assert.Equal(t, false, enqueued) diff --git a/node/pkg/governor/governor_prices.go b/node/pkg/governor/governor_prices.go index 510e7a5f00..f3ebe8fded 100644 --- a/node/pkg/governor/governor_prices.go +++ b/node/pkg/governor/governor_prices.go @@ -309,7 +309,7 @@ func CheckQuery(logger *zap.Logger) error { logger.Info("Instantiating governor.") ctx := context.Background() var db db.MockGovernorDB - gov := NewChainGovernor(logger, &db, common.MainNet) + gov := NewChainGovernor(logger, &db, 
common.MainNet, true) if err := gov.initConfig(); err != nil { return err diff --git a/node/pkg/governor/governor_test.go b/node/pkg/governor/governor_test.go index 964ec53345..388af53d37 100644 --- a/node/pkg/governor/governor_test.go +++ b/node/pkg/governor/governor_test.go @@ -185,6 +185,21 @@ func TestTrimEmptyTransfers(t *testing.T) { assert.Equal(t, 0, len(updatedTransfers)) } +// Make sure that the code doesn't panic if called with a nil chainEntry +func TestTrimAndSumValueForChainReturnsErrorForNilChainEntry(t *testing.T) { + ctx := context.Background() + gov, err := newChainGovernorForTest(ctx) + require.NoError(t, err) + assert.NotNil(t, gov) + + now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)") + require.NoError(t, err) + + sum, err := gov.TrimAndSumValueForChain(nil, now) + require.Error(t, err) + assert.Equal(t, uint64(0), sum) +} + func TestSumAllFromToday(t *testing.T) { ctx := context.Background() gov, err := newChainGovernorForTest(ctx) @@ -283,6 +298,51 @@ func TestSumWithFlowCancelling(t *testing.T) { assert.Equal(t, difference, usage) } +func TestFlowCancelFeatureFlag(t *testing.T) { + + ctx := context.Background() + var db db.MockGovernorDB + gov := NewChainGovernor(zap.NewNop(), &db, common.GoTest, true) + + // Trigger the evaluation of the flow cancelling config + err := gov.Run(ctx) + require.NoError(t, err) + assert.NotNil(t, gov) + + // Test private bool + assert.True(t, gov.flowCancelEnabled) + // Test public getter + assert.True(t, gov.IsFlowCancelEnabled()) + numFlowCancelling := 0 + for _, tokenEntry := range gov.tokens { + if tokenEntry.flowCancels == true { + numFlowCancelling++ + } + } + assert.NotZero(t, numFlowCancelling) + + // Disable flow cancelling + gov = NewChainGovernor(zap.NewNop(), &db, common.GoTest, false) + + // Trigger the evaluation of the flow cancelling config + err = gov.Run(ctx) + require.NoError(t, err) + assert.NotNil(t, gov) + + // Test private bool + assert.False(t, 
gov.flowCancelEnabled) + // Test public getter + assert.False(t, gov.IsFlowCancelEnabled()) + numFlowCancelling = 0 + for _, tokenEntry := range gov.tokens { + if tokenEntry.flowCancels == true { + numFlowCancelling++ + } + } + assert.Zero(t, numFlowCancelling) + +} + // Flow cancelling transfers are subtracted from the overall sum of all transfers from a given // emitter chain. Since we are working with uint64 values, ensure that there is no underflow. // When the sum of all flow cancelling transfers is greater than emitted transfers for a chain, @@ -360,10 +420,12 @@ func TestFlowCancelCannotUnderflow(t *testing.T) { assert.Zero(t, usage) } -// Simulate a case where the total sum of transfers for a chain in a 24 hour period exceeds -// the configured Governor limit. This should never happen, so we make sure that an error -// is returned if the system is in this state -func TestInvariantGovernorLimit(t *testing.T) { +// We never expect this to occur when flow-cancelling is disabled. If flow-cancelling is enabled, there +// are some cases where the outgoing value exceeds the daily limit. Example: a large, incoming transfer +// of a flow-cancelling asset increases the Governor capacity beyond the daily limit. After 24h, that +// transfer is trimmed. This reduces the daily limit back to normal, but by this time more outgoing +// transfers have been emitted, causing the sum to exceed the daily limit. 
+func TestChainEntrySumExceedsDailyLimit(t *testing.T) { ctx := context.Background() gov, err := newChainGovernorForTest(ctx) require.NoError(t, err) @@ -406,10 +468,69 @@ func TestInvariantGovernorLimit(t *testing.T) { assert.Equal(t, expectedNumTransfers, len(transfers)) assert.NotZero(t, sum) - // Make sure we trigger the Invariant usage, err := gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24)) - require.ErrorContains(t, err, "invariant violation: calculated sum") - assert.Zero(t, usage) + require.NoError(t, err) + assert.Equal(t, emitterTransferValue*uint64(expectedNumTransfers), usage) +} + +func TestTrimAndSumValueOverflowErrors(t *testing.T) { + ctx := context.Background() + gov, err := newChainGovernorForTest(ctx) + require.NoError(t, err) + assert.NotNil(t, gov) + + now, err := time.Parse("2006-Jan-02", "2024-Feb-19") + require.NoError(t, err) + + var transfers_from_emitter []transfer + transferTime, err := time.Parse("2006-Jan-02", "2024-Feb-19") + require.NoError(t, err) + + emitterChainId := vaa.ChainIDSolana + + transfer, err := newTransferFromDbTransfer(&db.Transfer{Value: math.MaxInt64, Timestamp: transferTime}) + require.NoError(t, err) + transfer2, err := newTransferFromDbTransfer(&db.Transfer{Value: 1, Timestamp: transferTime}) + require.NoError(t, err) + transfers_from_emitter = append(transfers_from_emitter, transfer, transfer2) + + // Populate chainEntry and ChainGovernor + emitter := &chainEntry{ + transfers: transfers_from_emitter, + emitterChainId: vaa.ChainID(emitterChainId), + dailyLimit: 10000, + } + gov.chains[emitter.emitterChainId] = emitter + + sum, _, err := gov.TrimAndSumValue(emitter.transfers, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer overflow") + assert.Zero(t, sum) + usage, err := gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer overflow") + assert.Equal(t, uint64(10000), usage) + + // overwrite emitter (discard transfer added above) + emitter 
= &chainEntry{ + emitterChainId: vaa.ChainID(emitterChainId), + dailyLimit: 10000, + } + gov.chains[emitter.emitterChainId] = emitter + + // Now test underflow + transfer3 := &db.Transfer{Value: math.MaxInt64, Timestamp: transferTime, TargetChain: vaa.ChainIDSolana} + + ce := gov.chains[emitter.emitterChainId] + err = ce.addFlowCancelTransferFromDbTransfer(transfer3) + require.NoError(t, err) + err = ce.addFlowCancelTransferFromDbTransfer(transfer3) + require.NoError(t, err) + + sum, _, err = gov.TrimAndSumValue(emitter.transfers, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer underflow") + assert.Zero(t, sum) + usage, err = gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer underflow") + assert.Equal(t, uint64(10000), usage) } func TestTrimOneOfTwoTransfers(t *testing.T) { @@ -545,21 +666,21 @@ func newChainGovernorForTestWithLogger(ctx context.Context, logger *zap.Logger) } var db db.MockGovernorDB - gov := NewChainGovernor(logger, &db, common.GoTest) + gov := NewChainGovernor(logger, &db, common.GoTest, true) err := gov.Run(ctx) if err != nil { - return gov, nil + return gov, err } emitterAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") if err != nil { - return gov, nil + return gov, err } tokenAddr, err := vaa.StringToAddress("0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E") if err != nil { - return gov, nil + return gov, err } gov.initConfigForTest( @@ -1371,7 +1492,6 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) { func TestPendingTransferBeingReleased(t *testing.T) { ctx := context.Background() gov, err := newChainGovernorForTest(ctx) - require.NoError(t, err) assert.NotNil(t, gov) @@ -1516,7 +1636,9 @@ func TestPendingTransferBeingReleased(t *testing.T) { assert.Equal(t, 4, len(gov.msgsSeen)) // If we check pending before noon, nothing should happen. 
- now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)") + now, err = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)") + require.NoError(t, err) + assert.NotNil(t, now) toBePublished, err := gov.CheckPendingForTime(now) require.NoError(t, err) assert.Equal(t, 0, len(toBePublished)) @@ -1544,6 +1666,292 @@ func TestPendingTransferBeingReleased(t *testing.T) { assert.Equal(t, 3, len(gov.msgsSeen)) } +func TestPopulateChainIds(t *testing.T) { + ctx := context.Background() + gov, err := newChainGovernorForTest(ctx) + require.NoError(t, err) + assert.NotNil(t, gov) + // Sanity check + assert.NotZero(t, len(gov.chainIds)) + + // Ensure that the chainIds slice match the entries in the chains map + assert.Equal(t, len(gov.chains), len(gov.chainIds)) + lowest := 0 + for _, chainId := range gov.chainIds { + chainEntry, ok := gov.chains[chainId] + assert.NotNil(t, chainEntry) + assert.True(t, ok) + assert.Equal(t, chainEntry.emitterChainId, chainId) + // Check that the chainIds are in ascending order. The point of this slice is that it provides + // deterministic ordering over chainIds. + assert.Greater(t, int(chainId), lowest) + lowest = int(chainId) + } +} + +// Test that, when a small transfer (under the 'big tx limit') of a flow-cancelling asset is queued and +// later released, it causes a reduction in the Governor usage for the destination chain. +func TestPendingTransferFlowCancelsWhenReleased(t *testing.T) { + + ctx := context.Background() + gov, err := newChainGovernorForTest(ctx) + + require.NoError(t, err) + assert.NotNil(t, gov) + + // Set-up time + gov.setDayLengthInMinutes(24 * 60) + transferTime := time.Unix(int64(1654543099), 0) + + // Solana USDC used as the flow cancelling asset. 
This ensures that the flow cancel mechanism works + // when the Origin chain of the asset does not match the emitter chain + // NOTE: Replace this Chain:Address pair if the Flow Cancel Token List is modified + var flowCancelTokenOriginAddress vaa.Address + flowCancelTokenOriginAddress, err = vaa.StringToAddress("c6fa7af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f5d61") + require.NoError(t, err) + + require.NoError(t, err) + + // Data for Ethereum + tokenBridgeAddrStrEthereum := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec + tokenBridgeAddrEthereum, err := vaa.StringToAddress(tokenBridgeAddrStrEthereum) + require.NoError(t, err) + recipientEthereum := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8" //nolint:gosec + + // Data for Sui + tokenBridgeAddrStrSui := "0xc57508ee0d4595e5a8728974a4a93a787d38f339757230d441e895422c07aba9" //nolint:gosec + tokenBridgeAddrSui, err := vaa.StringToAddress(tokenBridgeAddrStrSui) + require.NoError(t, err) + recipientSui := "0x84a5f374d29fc77e370014dce4fd6a55b58ad608de8074b0be5571701724da31" + + // Data for Solana. Only used to represent the flow cancel asset. 
+ // "wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb" + tokenBridgeAddrStrSolana := "0x0e0a589e6488147a94dcfa592b90fdd41152bb2ca77bf6016758a6f4df9d21b4" //nolint:gosec + + // Add chain entries to `gov` + dailyLimit := uint64(10000) + err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStrEthereum, dailyLimit, 0) + require.NoError(t, err) + err = gov.setChainForTesting(vaa.ChainIDSui, tokenBridgeAddrStrSui, dailyLimit, 0) + require.NoError(t, err) + err = gov.setChainForTesting(vaa.ChainIDSolana, tokenBridgeAddrStrSolana, dailyLimit, 0) + require.NoError(t, err) + + // Add flow cancel asset and non-flow cancelable asset to the token entry for `gov` + err = gov.setTokenForTesting(vaa.ChainIDSolana, flowCancelTokenOriginAddress.String(), "USDC", 1.0, true) + require.NoError(t, err) + assert.NotNil(t, gov.tokens[tokenKey{chain: vaa.ChainIDSolana, addr: flowCancelTokenOriginAddress}]) + + // First message: consume most of the dailyLimit for the emitter chain + msg1 := common.MessagePublication{ + TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), + Timestamp: time.Unix(int64(transferTime.Unix()+1), 0), + Nonce: uint32(1), + Sequence: uint64(1), + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddrEthereum, + ConsistencyLevel: uint8(32), + Payload: buildMockTransferPayloadBytes(1, + vaa.ChainIDSolana, // The origin asset for the token being transferred + flowCancelTokenOriginAddress.String(), + vaa.ChainIDSui, + recipientSui, + 10000, + ), + } + + // Second message: This transfer gets queued because the limit is exhausted + msg2 := common.MessagePublication{ + TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), + Timestamp: time.Unix(int64(transferTime.Unix()+2), 0), + Nonce: uint32(2), + Sequence: uint64(2), + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddrEthereum, + ConsistencyLevel: uint8(32), + Payload: buildMockTransferPayloadBytes(1, 
+ vaa.ChainIDSolana, + flowCancelTokenOriginAddress.String(), + vaa.ChainIDSui, + recipientSui, + 500, + ), + } + + // Third message: Incoming flow cancelling transfer to the emitter chain for the previous messages. This + // reduces the Governor usage for that chain. + msg3 := common.MessagePublication{ + TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), + Timestamp: time.Unix(int64(transferTime.Unix()+3), 0), + Nonce: uint32(3), + Sequence: uint64(3), + EmitterChain: vaa.ChainIDSui, + EmitterAddress: tokenBridgeAddrSui, + ConsistencyLevel: uint8(0), // Sui has a consistency level of 0 (instant) + Payload: buildMockTransferPayloadBytes(1, + vaa.ChainIDSolana, + flowCancelTokenOriginAddress.String(), + vaa.ChainIDEthereum, + recipientEthereum, + 1000, + ), + } + + // Stage 0: No transfers sent + chainEntryEthereum, exists := gov.chains[vaa.ChainIDEthereum] + assert.True(t, exists) + assert.NotNil(t, chainEntryEthereum) + chainEntrySui, exists := gov.chains[vaa.ChainIDSui] + assert.True(t, exists) + assert.NotNil(t, chainEntrySui) + sumEth, ethTransfers, err := gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Zero(t, len(ethTransfers)) + assert.Zero(t, len(chainEntryEthereum.pending)) + assert.Zero(t, sumEth) + require.NoError(t, err) + sumSui, suiTransfers, err := gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(1654543099), 0)) + assert.Zero(t, len(suiTransfers)) + assert.Zero(t, sumSui) + require.NoError(t, err) + + // Perform a FIRST transfer (Ethereum --> Sui) + result, err := gov.ProcessMsgForTime(&msg1, time.Now()) + assert.True(t, result) + require.NoError(t, err) + + numTrans, netValueTrans, numPending, valuePending := gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 2, numTrans) // One for the positive and one for the negative + assert.Equal(t, int64(0), netValueTrans) // Zero, because the asset flow cancels + assert.Equal(t, 0, 
numPending) + assert.Equal(t, uint64(0), valuePending) + assert.Equal(t, 1, len(gov.msgsSeen)) + + // Check the state of the governor + chainEntryEthereum = gov.chains[vaa.ChainIDEthereum] + chainEntrySui = gov.chains[vaa.ChainIDSui] + assert.Equal(t, int(1), len(chainEntryEthereum.transfers)) + assert.Equal(t, int(0), len(chainEntryEthereum.pending)) // One for inbound refund and another for outbound + assert.Equal(t, int(1), len(chainEntrySui.transfers)) + sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(10000), sumEth) // Equal to total dailyLimit + assert.Equal(t, int(1), len(ethTransfers)) + require.NoError(t, err) + + // Outbound check: + // - ensure that the sum of the transfers is equal to the value of the inverse transfer + // - ensure the actual governor usage is Zero (any negative value is converted to zero by TrimAndSumValueForChain) + sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, 1, len(suiTransfers)) // A single NEGATIVE transfer + assert.Equal(t, int64(-10000), sumSui) // Ensure the inverse (negative) transfer is in the Sui chain Entry + require.NoError(t, err) + suiGovernorUsage, err := gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Zero(t, suiGovernorUsage) // Actual governor usage must not be negative. + require.NoError(t, err) + + // Perform a SECOND transfer (Ethereum --> Sui again) + // When a transfer is queued, ProcessMsgForTime should return false. 
+ result, err = gov.ProcessMsgForTime(&msg2, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.False(t, result) + require.NoError(t, err) + + // Stage 2: Transfer sent from Ethereum to Sui gets queued + numTrans, netValueTrans, numPending, valuePending = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 2, len(gov.msgsSeen)) // Two messages observed + assert.Equal(t, 2, numTrans) // Two transfers (same as previous step) + assert.Equal(t, int64(0), netValueTrans) // The two transfers and their inverses cancel each other out. + assert.Equal(t, 1, numPending) // Second transfer is queued because the limit is exhausted + assert.Equal(t, uint64(500), valuePending) + + // Check the state of the governor. + chainEntryEthereum = gov.chains[vaa.ChainIDEthereum] + chainEntrySui = gov.chains[vaa.ChainIDSui] + assert.Equal(t, int(1), len(chainEntryEthereum.transfers)) // One from previous step + assert.Equal(t, int(1), len(chainEntryEthereum.pending)) // One for inbound refund and another for outbound + assert.Equal(t, int(1), len(chainEntrySui.transfers)) // One inverse transfer. Inverse from pending not added yet + sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(10000), sumEth) // Same as before: full dailyLimit + assert.Equal(t, int(1), len(ethTransfers)) // Same as before + require.NoError(t, err) + sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int(1), len(suiTransfers)) // just the inverse from before + assert.Equal(t, int64(-10000), sumSui) // Unchanged. + require.NoError(t, err) + suiGovernorUsage, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Zero(t, suiGovernorUsage) // Actual governor usage must not be negative. 
+ require.NoError(t, err) + + // Stage 3: Message that reduces Governor usage for Ethereum (Sui --> Ethereum) + result, err = gov.ProcessMsgForTime(&msg3, time.Now()) + assert.True(t, result) + require.NoError(t, err) + + // Stage 3: Governor usage reduced on Ethereum due to incoming from Sui + numTrans, netValueTrans, numPending, valuePending = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 3, len(gov.msgsSeen)) + assert.Equal(t, 4, numTrans) // Two transfers and their inverses + assert.Equal(t, int64(0), netValueTrans) // Still zero because everything flow cancels + assert.Equal(t, 1, numPending) // Not released yet + assert.Equal(t, uint64(500), valuePending) + + // Check the state of the governor + chainEntryEthereum = gov.chains[vaa.ChainIDEthereum] + chainEntrySui = gov.chains[vaa.ChainIDSui] + assert.Equal(t, int(2), len(chainEntryEthereum.transfers)) + assert.Equal(t, int(1), len(chainEntryEthereum.pending)) // We have not yet released the pending transfer + assert.Equal(t, int(2), len(chainEntrySui.transfers)) + sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(9000), sumEth) // We freed up room because of Sui incoming + assert.Equal(t, int(2), len(ethTransfers)) // Two transfers cancel each other out + require.NoError(t, err) + sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int(2), len(suiTransfers)) + assert.Equal(t, int64(-9000), sumSui) // We consumed some outbound capacity + require.NoError(t, err) + suiGovernorUsage, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, uint64(0), suiGovernorUsage) // Still zero because it's still negative + require.NoError(t, err) + + // Stage 4: Release the pending transfer. 
We deliberately do not advance the time here because we are relying + // on the pending transfer being released as a result of flow-cancelling and not because 24 hours have passed. + // NOTE that even though the function says "Checked..." it modifies `gov` as a side-effect when a pending + // transfer is ready to be released + toBePublished, err := gov.CheckPendingForTime(time.Unix(int64(transferTime.Unix()-1000), 0)) + require.NoError(t, err) + assert.Equal(t, 1, len(toBePublished)) + + // Stage 4: Pending transfer released. This increases the Ethereum Governor usage again and reduces Sui. + numTrans, netValueTrans, numPending, valuePending = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 3, len(gov.msgsSeen)) + assert.Equal(t, 6, numTrans) // Two new transfers created from previous pending transfer + assert.Equal(t, int64(0), netValueTrans) // Still zero because everything flow cancels + assert.Equal(t, 0, numPending) // Pending transfer has been released + assert.Equal(t, uint64(0), valuePending) + + // Verify the stats that are non flow-cancelling. + // In practice this is the sum of the absolute value of all the transfers, including the inverses. 
+ // 2 * (10000 + 1000 + 500) = 23000 + _, absValueTrans, _, _ := gov.getStatsForAllChains() + assert.Equal(t, uint64(23000), absValueTrans) + + // Check the state of the governor + chainEntryEthereum = gov.chains[vaa.ChainIDEthereum] + chainEntrySui = gov.chains[vaa.ChainIDSui] + assert.Equal(t, int(3), len(chainEntryEthereum.transfers)) // Two outbound, one inverse from Sui + assert.Equal(t, int(0), len(chainEntryEthereum.pending)) // Released + assert.Equal(t, int(3), len(chainEntrySui.transfers)) // One outbound, two inverses from Ethereum + sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(9500), sumEth) + assert.Equal(t, int(3), len(ethTransfers)) + require.NoError(t, err) + sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int(3), len(suiTransfers)) // New inverse transfer added after pending transfer was released + assert.Equal(t, int64(-9500), sumSui) // Flow-cancelling inverse transfer added to Sui after released + require.NoError(t, err) + suiGovernorUsage, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, uint64(0), suiGovernorUsage) // Still zero + require.NoError(t, err) +} + func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { ctx := context.Background() gov, err := newChainGovernorForTest(ctx) @@ -1775,7 +2183,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { func TestMainnetConfigIsValid(t *testing.T) { logger := zap.NewNop() var db db.MockGovernorDB - gov := NewChainGovernor(logger, &db, common.GoTest) + gov := NewChainGovernor(logger, &db, common.GoTest, true) gov.env = common.TestNet err := gov.initConfig() @@ -1785,7 +2193,7 @@ func TestMainnetConfigIsValid(t *testing.T) { func TestTestnetConfigIsValid(t *testing.T) { logger := zap.NewNop() 
var db db.MockGovernorDB - gov := NewChainGovernor(logger, &db, common.GoTest) + gov := NewChainGovernor(logger, &db, common.GoTest, true) gov.env = common.TestNet err := gov.initConfig() @@ -2061,9 +2469,9 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T // But the big transaction should not affect the daily notional. ce, exists := gov.chains[vaa.ChainIDEthereum] require.Equal(t, true, exists) - valueTrans, err = sumValue(ce.transfers, now) + _, outgoing, _, err := sumValue(ce.transfers, now) require.NoError(t, err) - assert.Equal(t, uint64(0), valueTrans) + assert.Equal(t, uint64(0), outgoing) } func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) { diff --git a/node/pkg/governor/testnet_config.go b/node/pkg/governor/testnet_config.go index c2650bcc17..6c7788f096 100644 --- a/node/pkg/governor/testnet_config.go +++ b/node/pkg/governor/testnet_config.go @@ -14,8 +14,12 @@ func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []tokenConfig {chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usdc", decimals: 6, price: 1}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 1 } - flowCancelTokens := []tokenConfigEntry{ - {chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usdc", decimals: 6, price: 1}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 1 + flowCancelTokens := []tokenConfigEntry{} + + if gov.flowCancelEnabled { + flowCancelTokens = []tokenConfigEntry{ + {chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usdc", decimals: 6, price: 1}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 1 + } } chains := []chainConfigEntry{ diff --git a/node/pkg/node/node_test.go b/node/pkg/node/node_test.go index 8c37e3dbf4..ecedf76573 100644 --- a/node/pkg/node/node_test.go +++ 
b/node/pkg/node/node_test.go @@ -188,7 +188,7 @@ func mockGuardianRunnable(t testing.TB, gs []*mockGuardian, mockGuardianIndex ui GuardianOptionDatabase(db), GuardianOptionWatchers(watcherConfigs, nil), GuardianOptionNoAccountant(), // disable accountant - GuardianOptionGovernor(true), + GuardianOptionGovernor(true, false), GuardianOptionGatewayRelayer("", nil), // disable gateway relayer GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, cfg.p2pPort, "", 0, "", "", func() string { return "" }), GuardianOptionPublicRpcSocket(cfg.publicSocket, publicRpcLogDetail), diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index 8ccd9e3073..7bd0d88be1 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -195,14 +195,18 @@ func GuardianOptionAccountant( // GuardianOptionGovernor enables or disables the governor. // Dependencies: db -func GuardianOptionGovernor(governorEnabled bool) *GuardianOption { +func GuardianOptionGovernor(governorEnabled bool, flowCancelEnabled bool) *GuardianOption { return &GuardianOption{ name: "governor", dependencies: []string{"db"}, f: func(ctx context.Context, logger *zap.Logger, g *G) error { if governorEnabled { - logger.Info("chain governor is enabled") - g.gov = governor.NewChainGovernor(logger, g.db, g.env) + if flowCancelEnabled { + logger.Info("chain governor is enabled with flow cancel enabled") + } else { + logger.Info("chain governor is enabled without flow cancel") + } + g.gov = governor.NewChainGovernor(logger, g.db, g.env, flowCancelEnabled) } else { logger.Info("chain governor is disabled") } diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index b4ffded1d7..ed353938b3 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -459,7 +459,11 @@ func Run(params *RunParams) func(ctx context.Context) error { features := make([]string, 0) if params.gov != nil { - features = append(features, "governor") + if params.gov.IsFlowCancelEnabled() { + features = 
append(features, "governor:fc") + } else { + features = append(features, "governor") + } } if params.acct != nil { features = append(features, params.acct.FeatureString()) diff --git a/node/pkg/proto/gossip/v1/gossip.pb.go b/node/pkg/proto/gossip/v1/gossip.pb.go index 96ab3498eb..0fb9ea3d49 100644 --- a/node/pkg/proto/gossip/v1/gossip.pb.go +++ b/node/pkg/proto/gossip/v1/gossip.pb.go @@ -720,11 +720,12 @@ type ChainGovernorConfig struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - NodeName string `protobuf:"bytes,1,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"` - Counter int64 `protobuf:"varint,2,opt,name=counter,proto3" json:"counter,omitempty"` - Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Chains []*ChainGovernorConfig_Chain `protobuf:"bytes,4,rep,name=chains,proto3" json:"chains,omitempty"` - Tokens []*ChainGovernorConfig_Token `protobuf:"bytes,5,rep,name=tokens,proto3" json:"tokens,omitempty"` + NodeName string `protobuf:"bytes,1,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"` + Counter int64 `protobuf:"varint,2,opt,name=counter,proto3" json:"counter,omitempty"` + Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Chains []*ChainGovernorConfig_Chain `protobuf:"bytes,4,rep,name=chains,proto3" json:"chains,omitempty"` + Tokens []*ChainGovernorConfig_Token `protobuf:"bytes,5,rep,name=tokens,proto3" json:"tokens,omitempty"` + FlowCancelEnabled bool `protobuf:"varint,6,opt,name=flow_cancel_enabled,json=flowCancelEnabled,proto3" json:"flow_cancel_enabled,omitempty"` } func (x *ChainGovernorConfig) Reset() { @@ -794,6 +795,13 @@ func (x *ChainGovernorConfig) GetTokens() []*ChainGovernorConfig_Token { return nil } +func (x *ChainGovernorConfig) GetFlowCancelEnabled() bool { + if x != nil { + return x.FlowCancelEnabled + } + return false +} + // This message is published every minute. 
type SignedChainGovernorStatus struct { state protoimpl.MessageState @@ -1404,9 +1412,12 @@ type ChainGovernorStatus_Chain struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ChainId uint32 `protobuf:"varint,1,opt,name=chain_id,json=chainId,proto3" json:"chain_id,omitempty"` - RemainingAvailableNotional uint64 `protobuf:"varint,2,opt,name=remaining_available_notional,json=remainingAvailableNotional,proto3" json:"remaining_available_notional,omitempty"` - Emitters []*ChainGovernorStatus_Emitter `protobuf:"bytes,3,rep,name=emitters,proto3" json:"emitters,omitempty"` + ChainId uint32 `protobuf:"varint,1,opt,name=chain_id,json=chainId,proto3" json:"chain_id,omitempty"` + RemainingAvailableNotional uint64 `protobuf:"varint,2,opt,name=remaining_available_notional,json=remainingAvailableNotional,proto3" json:"remaining_available_notional,omitempty"` + Emitters []*ChainGovernorStatus_Emitter `protobuf:"bytes,3,rep,name=emitters,proto3" json:"emitters,omitempty"` + SmallTxNetNotionalValue int64 `protobuf:"varint,4,opt,name=small_tx_net_notional_value,json=smallTxNetNotionalValue,proto3" json:"small_tx_net_notional_value,omitempty"` + SmallTxOutgoingNotionalValue uint64 `protobuf:"varint,5,opt,name=small_tx_outgoing_notional_value,json=smallTxOutgoingNotionalValue,proto3" json:"small_tx_outgoing_notional_value,omitempty"` + FlowCancelNotionalValue uint64 `protobuf:"varint,6,opt,name=flow_cancel_notional_value,json=flowCancelNotionalValue,proto3" json:"flow_cancel_notional_value,omitempty"` } func (x *ChainGovernorStatus_Chain) Reset() { @@ -1462,6 +1473,27 @@ func (x *ChainGovernorStatus_Chain) GetEmitters() []*ChainGovernorStatus_Emitter return nil } +func (x *ChainGovernorStatus_Chain) GetSmallTxNetNotionalValue() int64 { + if x != nil { + return x.SmallTxNetNotionalValue + } + return 0 +} + +func (x *ChainGovernorStatus_Chain) GetSmallTxOutgoingNotionalValue() uint64 { + if x != nil { + return x.SmallTxOutgoingNotionalValue + } + return 0 +} + 
+func (x *ChainGovernorStatus_Chain) GetFlowCancelNotionalValue() uint64 { + if x != nil { + return x.FlowCancelNotionalValue + } + return 0 +} + var File_gossip_v1_gossip_proto protoreflect.FileDescriptor var file_gossip_v1_gossip_proto_rawDesc = []byte{ @@ -1586,8 +1618,8 @@ var file_gossip_v1_gossip_proto_rawDesc = []byte{ 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x0c, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x22, 0xd1, - 0x03, 0x0a, 0x13, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, + 0x52, 0x0c, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x22, 0x81, + 0x04, 0x0a, 0x13, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x18, 0x02, @@ -1601,7 +1633,10 @@ var file_gossip_v1_gossip_proto_rawDesc = []byte{ 0x65, 0x6e, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, - 0x06, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x73, 0x1a, 0x7b, 0x0a, 0x05, 0x43, 0x68, 0x61, 0x69, 0x6e, + 0x06, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x73, 0x12, 0x2e, 0x0a, 0x13, 0x66, 0x6c, 0x6f, 0x77, 0x5f, + 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x5f, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x06, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x11, 0x66, 
0x6c, 0x6f, 0x77, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, + 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x1a, 0x7b, 0x0a, 0x05, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x6e, 0x6f, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x02, 0x20, @@ -1623,7 +1658,7 @@ var file_gossip_v1_gossip_proto_rawDesc = []byte{ 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x67, 0x75, - 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x22, 0x98, 0x05, 0x0a, 0x13, 0x43, + 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x22, 0xdb, 0x06, 0x0a, 0x13, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, @@ -1654,7 +1689,7 @@ var file_gossip_v1_gossip_proto_rawDesc = []byte{ 0x32, 0x2a, 0x2e, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x56, 0x41, 0x41, 0x52, 0x0c, 0x65, 0x6e, - 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x56, 0x61, 0x61, 0x73, 0x1a, 0xa8, 0x01, 0x0a, 0x05, 0x43, + 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x56, 0x61, 0x61, 0x73, 0x1a, 0xeb, 0x02, 0x0a, 0x05, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, 0x68, 
0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x40, 0x0a, 0x1c, 0x72, 0x65, 0x6d, 0x61, 0x69, 0x6e, 0x69, 0x6e, 0x67, 0x5f, 0x61, 0x76, 0x61, @@ -1665,23 +1700,35 @@ var file_gossip_v1_gossip_proto_rawDesc = []byte{ 0x03, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x45, 0x6d, 0x69, 0x74, 0x74, 0x65, 0x72, 0x52, 0x08, 0x65, 0x6d, 0x69, - 0x74, 0x74, 0x65, 0x72, 0x73, 0x22, 0x57, 0x0a, 0x12, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x51, - 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x23, 0x0a, 0x0d, 0x71, - 0x75, 0x65, 0x72, 0x79, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x0c, 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x22, 0x5a, - 0x0a, 0x13, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x72, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x71, - 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x09, - 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x42, 0x41, 0x5a, 0x3f, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x65, 0x72, 0x74, 0x75, 0x73, 0x6f, - 0x6e, 0x65, 0x2f, 0x77, 0x6f, 0x72, 0x6d, 0x68, 0x6f, 0x6c, 0x65, 0x2f, 0x6e, 0x6f, 0x64, 0x65, - 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x73, 0x73, 0x69, - 0x70, 0x2f, 0x76, 0x31, 0x3b, 
0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x76, 0x31, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x74, 0x65, 0x72, 0x73, 0x12, 0x3c, 0x0a, 0x1b, 0x73, 0x6d, 0x61, 0x6c, 0x6c, 0x5f, 0x74, + 0x78, 0x5f, 0x6e, 0x65, 0x74, 0x5f, 0x6e, 0x6f, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x5f, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x17, 0x73, 0x6d, 0x61, 0x6c, + 0x6c, 0x54, 0x78, 0x4e, 0x65, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x56, 0x61, + 0x6c, 0x75, 0x65, 0x12, 0x46, 0x0a, 0x20, 0x73, 0x6d, 0x61, 0x6c, 0x6c, 0x5f, 0x74, 0x78, 0x5f, + 0x6f, 0x75, 0x74, 0x67, 0x6f, 0x69, 0x6e, 0x67, 0x5f, 0x6e, 0x6f, 0x74, 0x69, 0x6f, 0x6e, 0x61, + 0x6c, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x1c, 0x73, + 0x6d, 0x61, 0x6c, 0x6c, 0x54, 0x78, 0x4f, 0x75, 0x74, 0x67, 0x6f, 0x69, 0x6e, 0x67, 0x4e, 0x6f, + 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x3b, 0x0a, 0x1a, 0x66, + 0x6c, 0x6f, 0x77, 0x5f, 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x5f, 0x6e, 0x6f, 0x74, 0x69, 0x6f, + 0x6e, 0x61, 0x6c, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x17, 0x66, 0x6c, 0x6f, 0x77, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x4e, 0x6f, 0x74, 0x69, 0x6f, + 0x6e, 0x61, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x57, 0x0a, 0x12, 0x53, 0x69, 0x67, 0x6e, + 0x65, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x23, + 0x0a, 0x0d, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, + 0x65, 0x22, 0x5a, 0x0a, 0x13, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 
0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x71, 0x75, 0x65, 0x72, + 0x79, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x0d, 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x42, 0x41, 0x5a, + 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x65, 0x72, 0x74, + 0x75, 0x73, 0x6f, 0x6e, 0x65, 0x2f, 0x77, 0x6f, 0x72, 0x6d, 0x68, 0x6f, 0x6c, 0x65, 0x2f, 0x6e, + 0x6f, 0x64, 0x65, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, + 0x73, 0x73, 0x69, 0x70, 0x2f, 0x76, 0x31, 0x3b, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x76, 0x31, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/node/pkg/publicrpc/publicrpcserver_test.go b/node/pkg/publicrpc/publicrpcserver_test.go index e9b614d262..a6b9dad416 100644 --- a/node/pkg/publicrpc/publicrpcserver_test.go +++ b/node/pkg/publicrpc/publicrpcserver_test.go @@ -69,7 +69,7 @@ func TestGetSignedVAABadAddress(t *testing.T) { func TestGovernorIsVAAEnqueuedNoMessage(t *testing.T) { ctx := context.Background() logger, _ := zap.NewProduction() - gov := governor.NewChainGovernor(logger, nil, common.GoTest) + gov := governor.NewChainGovernor(logger, nil, common.GoTest, false) server := &PublicrpcServer{logger: logger, gov: gov} // A message without the messageId set should not panic but return an error instead. diff --git a/proto/gossip/v1/gossip.proto b/proto/gossip/v1/gossip.proto index 914d707f01..05564fedfb 100644 --- a/proto/gossip/v1/gossip.proto +++ b/proto/gossip/v1/gossip.proto @@ -156,6 +156,7 @@ message ChainGovernorConfig { int64 timestamp = 3; repeated Chain chains = 4; repeated Token tokens = 5; + bool flow_cancel_enabled = 6; } // This message is published every minute. 
@@ -188,6 +189,9 @@ message ChainGovernorStatus { uint32 chain_id = 1; uint64 remaining_available_notional = 2; repeated Emitter emitters = 3; + int64 small_tx_net_notional_value = 4; + uint64 small_tx_outgoing_notional_value = 5; + uint64 flow_cancel_notional_value = 6; } string node_name = 1;