Skip to content

Commit ed19e93

Browse files
authored
Merge pull request #842 from onflow/mpeter/improve-signing-keys-release-frequency
Improve release frequency for signing keys
2 parents 3fdd51c + 28a04f2 commit ed19e93

File tree

9 files changed

+439
-35
lines changed

9 files changed

+439
-35
lines changed

bootstrap/bootstrap.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
243243
}
244244
}
245245

246-
b.keystore = keystore.New(accountKeys)
246+
b.keystore = keystore.New(ctx, accountKeys, b.client, b.config, b.logger)
247247

248248
// create transaction pool
249249
var txPool requester.TxPool

cmd/run/cmd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ func init() {
282282
Cmd.Flags().StringVar(&cloudKMSLocationID, "coa-cloud-kms-location-id", "", "The location ID where the key ring is grouped into, e.g. 'global'")
283283
Cmd.Flags().StringVar(&cloudKMSKeyRingID, "coa-cloud-kms-key-ring-id", "", "The key ring ID where the KMS keys exist, e.g. 'tx-signing'")
284284
Cmd.Flags().StringVar(&cloudKMSKey, "coa-cloud-kms-key", "", `Name of the KMS key and its version, e.g. "gw-key-6@1"`)
285+
Cmd.Flags().BoolVar(&cfg.COATxLookupEnabled, "coa-tx-lookup-enabled", false, "Tracks cadence transactions to release COA signing keys more quickly. Use this on nodes with high tx volume that frequently run out of proposer keys.")
285286
Cmd.Flags().StringVar(&walletKey, "wallet-api-key", "", "ECDSA private key used for wallet APIs. WARNING: This should only be used locally or for testing, never in production.")
286287
Cmd.Flags().IntVar(&cfg.MetricsPort, "metrics-port", 9091, "Port for the metrics server")
287288
Cmd.Flags().BoolVar(&cfg.IndexOnly, "index-only", false, "Run the gateway in index-only mode which only allows querying the state and indexing, but disallows sending transactions.")

config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ type Config struct {
5656
COAKey crypto.PrivateKey
5757
// COACloudKMSKey is a Cloud KMS key that will be used for signing transactions.
5858
COACloudKMSKey *flowGoKMS.Key
59+
// COATxLookupEnabled enables tracking of Cadence transactions to release COA signing
60+
// keys much faster. Increases capacity of the available COA signing keys for nodes
61+
// with high tx volume.
62+
COATxLookupEnabled bool
5963
// GasPrice is a fixed gas price that will be used when submitting transactions.
6064
GasPrice *big.Int
6165
// EnforceGasPrice defines whether the minimum gas price should be enforced.

services/ingestion/event_subscriber.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,12 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
176176
for _, evt := range blockEvents.Events {
177177
r.keyLock.NotifyTransaction(evt.TransactionID)
178178
}
179-
r.keyLock.NotifyBlock(blockEvents.Height)
179+
r.keyLock.NotifyBlock(
180+
flow.BlockHeader{
181+
ID: blockEvents.BlockID,
182+
Height: blockEvents.Height,
183+
},
184+
)
180185

181186
eventsChan <- evmEvents
182187

services/ingestion/event_subscriber_test.go

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/onflow/flow-go-sdk/access"
1111
gethCommon "github.com/onflow/go-ethereum/common"
1212

13+
"github.com/onflow/flow-evm-gateway/config"
1314
"github.com/onflow/flow-evm-gateway/models"
1415
errs "github.com/onflow/flow-evm-gateway/models/errors"
1516
"github.com/onflow/flow-evm-gateway/services/requester"
@@ -44,9 +45,14 @@ func Test_Subscribing(t *testing.T) {
4445
)
4546
require.NoError(t, err)
4647

47-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
48+
ctx := context.Background()
49+
logger := zerolog.Nop()
50+
cfg := config.Config{COATxLookupEnabled: true}
51+
ks := keystore.New(ctx, nil, client, cfg, logger)
4852

49-
events := subscriber.Subscribe(context.Background())
53+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
54+
55+
events := subscriber.Subscribe(ctx)
5056

5157
var prevHeight uint64
5258

@@ -84,9 +90,14 @@ func Test_MissingBlockEvent(t *testing.T) {
8490
)
8591
require.NoError(t, err)
8692

87-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
93+
ctx := context.Background()
94+
logger := zerolog.Nop()
95+
cfg := config.Config{COATxLookupEnabled: true}
96+
ks := keystore.New(ctx, nil, client, cfg, logger)
97+
98+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
8899

89-
events := subscriber.Subscribe(context.Background())
100+
events := subscriber.Subscribe(ctx)
90101

91102
missingHashes := make([]gethCommon.Hash, 0)
92103

@@ -186,9 +197,14 @@ func Test_SubscribingWithRetryOnError(t *testing.T) {
186197
)
187198
require.NoError(t, err)
188199

189-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
200+
ctx := context.Background()
201+
logger := zerolog.Nop()
202+
cfg := config.Config{COATxLookupEnabled: true}
203+
ks := keystore.New(ctx, nil, client, cfg, logger)
204+
205+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
190206

191-
events := subscriber.Subscribe(context.Background())
207+
events := subscriber.Subscribe(ctx)
192208

193209
var prevHeight uint64
194210

@@ -249,9 +265,14 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) {
249265
)
250266
require.NoError(t, err)
251267

252-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
268+
ctx := context.Background()
269+
logger := zerolog.Nop()
270+
cfg := config.Config{COATxLookupEnabled: true}
271+
ks := keystore.New(ctx, nil, client, cfg, logger)
253272

254-
events := subscriber.Subscribe(context.Background())
273+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
274+
275+
events := subscriber.Subscribe(ctx)
255276

256277
var prevHeight uint64
257278

@@ -311,9 +332,14 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) {
311332
)
312333
require.NoError(t, err)
313334

314-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
335+
ctx := context.Background()
336+
logger := zerolog.Nop()
337+
cfg := config.Config{COATxLookupEnabled: true}
338+
ks := keystore.New(ctx, nil, client, cfg, logger)
339+
340+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
315341

316-
events := subscriber.Subscribe(context.Background())
342+
events := subscriber.Subscribe(ctx)
317343

318344
var prevHeight uint64
319345

services/requester/keystore/key_store.go

Lines changed: 152 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,92 @@
11
package keystore
22

33
import (
4+
"context"
45
"fmt"
56
"sync"
67

8+
"github.com/onflow/flow-evm-gateway/config"
79
flowsdk "github.com/onflow/flow-go-sdk"
10+
"github.com/onflow/flow-go-sdk/access"
811
"github.com/onflow/flow-go/model/flow"
12+
"github.com/rs/zerolog"
13+
"google.golang.org/grpc/codes"
14+
"google.golang.org/grpc/status"
915
)
1016

1117
var ErrNoKeysAvailable = fmt.Errorf("no signing keys available")
1218

1319
const accountKeyBlockExpiration = flow.DefaultTransactionExpiry
1420

1521
type KeyLock interface {
22+
// This method is intended for the happy path of valid EVM transactions.
23+
// The event subscriber module only subscribes to EVM-related events:
24+
// - `EVM.TransactionExecuted`
25+
// - `EVM.BlockExecuted`
26+
//
27+
// Valid EVM transactions do emit `EVM.TransactionExecuted` events, so we
28+
// release the account key that was used by the Flow tx which emitted
29+
// the above EVM event.
1630
NotifyTransaction(txID flowsdk.Identifier)
17-
NotifyBlock(blockHeight uint64)
31+
// This method is intended for the unhappy path of invalid EVM transactions.
32+
// For each new Flow block, we check the result status of all included Flow
33+
// transactions, and we release the account keys which they used. This also
34+
// handles the release of expired transactions, that weren't even included
35+
// in a Flow block.
36+
NotifyBlock(blockHeader flowsdk.BlockHeader)
1837
}
1938

2039
type KeyStore struct {
40+
client access.Client
41+
config config.Config
2142
availableKeys chan *AccountKey
2243
usedKeys map[flowsdk.Identifier]*AccountKey
2344
size int
2445
keyMu sync.Mutex
46+
blockChan chan flowsdk.BlockHeader
47+
logger zerolog.Logger
48+
49+
// Signal channel used to prevent blocking writes
50+
// on `blockChan` when the node is shutting down.
51+
done chan struct{}
2552
}
2653

2754
var _ KeyLock = (*KeyStore)(nil)
2855

29-
func New(keys []*AccountKey) *KeyStore {
56+
func New(
57+
ctx context.Context,
58+
keys []*AccountKey,
59+
client access.Client,
60+
config config.Config,
61+
logger zerolog.Logger,
62+
) *KeyStore {
63+
totalKeys := len(keys)
64+
3065
ks := &KeyStore{
31-
usedKeys: map[flowsdk.Identifier]*AccountKey{},
66+
client: client,
67+
config: config,
68+
availableKeys: make(chan *AccountKey, totalKeys),
69+
usedKeys: map[flowsdk.Identifier]*AccountKey{},
70+
size: totalKeys,
71+
// `KeyStore.NotifyBlock` is called for each new Flow block,
72+
// so we use a buffered channel to write the new block headers
73+
// to the `blockChan`, and read them through `processLockedKeys`.
74+
blockChan: make(chan flowsdk.BlockHeader, 200),
75+
logger: logger,
76+
done: make(chan struct{}),
3277
}
3378

34-
availableKeys := make(chan *AccountKey, len(keys))
3579
for _, key := range keys {
3680
key.ks = ks
37-
availableKeys <- key
81+
ks.availableKeys <- key
82+
}
83+
84+
// For cases where the EVM Gateway is run in index-only mode,
85+
// there is no need to release any keys, since transaction
86+
// submission is not allowed.
87+
if !ks.config.IndexOnly {
88+
go ks.processLockedKeys(ctx)
3889
}
39-
ks.size = len(keys)
40-
ks.availableKeys = availableKeys
4190

4291
return ks
4392
}
@@ -47,6 +96,11 @@ func (k *KeyStore) AvailableKeys() int {
4796
return len(k.availableKeys)
4897
}
4998

99+
// HasKeysInUse returns whether any of the keys are currently being used.
100+
func (k *KeyStore) HasKeysInUse() bool {
101+
return k.AvailableKeys() != k.size
102+
}
103+
50104
// Take reserves a key for use in a transaction.
51105
func (k *KeyStore) Take() (*AccountKey, error) {
52106
select {
@@ -63,22 +117,50 @@ func (k *KeyStore) Take() (*AccountKey, error) {
63117

64118
// NotifyTransaction unlocks a key after use and puts it back into the pool.
65119
func (k *KeyStore) NotifyTransaction(txID flowsdk.Identifier) {
120+
// For cases where the EVM Gateway is run in an index-mode,
121+
// there is no need to release any keys, since transaction
122+
// submission is not allowed. We return early here, to avoid
123+
// any unnecessary steps such as lock acquisition and unlocking
124+
// keys.
125+
if k.config.IndexOnly {
126+
return
127+
}
128+
66129
k.keyMu.Lock()
67130
defer k.keyMu.Unlock()
68131

69132
k.unsafeUnlockKey(txID)
70133
}
71134

72-
// NotifyBlock is called to notify the KeyStore of a new ingested block height.
135+
// NotifyBlock is called to notify the KeyStore of a newly ingested block.
73136
// Pending transactions older than a threshold number of blocks are removed.
74-
func (k *KeyStore) NotifyBlock(blockHeight uint64) {
75-
k.keyMu.Lock()
76-
defer k.keyMu.Unlock()
137+
func (k *KeyStore) NotifyBlock(blockHeader flowsdk.BlockHeader) {
138+
// For cases where the EVM Gateway is run in index-only mode,
139+
// there is no need to release any keys, since transaction
140+
// submission is not allowed. We return early here, to avoid
141+
// blocking forever on writes to `k.blockChan`, because the
142+
// `k.processLockedKeys()` function won't perform any reads
143+
// from `k.blockChan`.
144+
if k.config.IndexOnly {
145+
return
146+
}
77147

78-
for txID, key := range k.usedKeys {
79-
if blockHeight-key.lastLockedBlock.Load() >= accountKeyBlockExpiration {
80-
k.unsafeUnlockKey(txID)
81-
}
148+
select {
149+
case <-k.done:
150+
k.logger.Warn().Msg(
151+
"received `NotifyBlock` while the server is shutting down",
152+
)
153+
case k.blockChan <- blockHeader:
154+
k.logger.Info().Msgf(
155+
"received `NotifyBlock` for block with ID: %s",
156+
blockHeader.ID,
157+
)
158+
default:
159+
// In this case, we only release the account keys which were last
160+
// locked more than or equal to `accountKeyBlockExpiration` blocks
161+
// in the past, in order to avoid slowing down the EVM event
162+
// ingestion engine.
163+
k.releasekeys(blockHeader.Height, nil)
82164
}
83165
}
84166

@@ -106,3 +188,58 @@ func (k *KeyStore) setLockMetadata(
106188
defer k.keyMu.Unlock()
107189
k.usedKeys[txID] = key
108190
}
191+
192+
// processLockedKeys reads from the `blockChan` channel, and for each new
193+
// Flow block, it fetches the transaction results of the given block and
194+
// releases the account keys associated with those transactions.
195+
func (k *KeyStore) processLockedKeys(ctx context.Context) {
196+
for {
197+
select {
198+
case <-ctx.Done():
199+
close(k.done)
200+
return
201+
case blockHeader := <-k.blockChan:
202+
// Optimization to avoid AN calls when no signing keys have
203+
// been used. For example, when back-filling the EVM GW state,
204+
// we don't care about releasing signing keys.
205+
if !k.HasKeysInUse() {
206+
continue
207+
}
208+
209+
var txResults []*flowsdk.TransactionResult
210+
var err error
211+
if k.config.COATxLookupEnabled {
212+
txResults, err = k.client.GetTransactionResultsByBlockID(ctx, blockHeader.ID)
213+
if err != nil && status.Code(err) != codes.Canceled {
214+
k.logger.Error().Err(err).Msgf(
215+
"failed to get transaction results for block ID: %s",
216+
blockHeader.ID.Hex(),
217+
)
218+
continue
219+
}
220+
}
221+
222+
k.releasekeys(blockHeader.Height, txResults)
223+
}
224+
}
225+
}
226+
227+
// releasekeys accepts a block height and a slice of `TransactionResult`
228+
// objects and releases the account keys used for signing the given
229+
// transactions.
230+
// It also releases the account keys which were last locked more than
231+
// or equal to `accountKeyBlockExpiration` blocks in the past.
232+
func (k *KeyStore) releasekeys(blockHeight uint64, txResults []*flowsdk.TransactionResult) {
233+
k.keyMu.Lock()
234+
defer k.keyMu.Unlock()
235+
236+
for _, txResult := range txResults {
237+
k.unsafeUnlockKey(txResult.TransactionID)
238+
}
239+
240+
for txID, key := range k.usedKeys {
241+
if blockHeight-key.lastLockedBlock.Load() >= accountKeyBlockExpiration {
242+
k.unsafeUnlockKey(txID)
243+
}
244+
}
245+
}

0 commit comments

Comments
 (0)