Skip to content

Commit 0ba50b2

Browse files
committed
Improve release frequency for signing keys by checking tx result status
1 parent fc9e0e6 commit 0ba50b2

File tree

7 files changed

+355
-36
lines changed

7 files changed

+355
-36
lines changed

bootstrap/bootstrap.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
243243
}
244244
}
245245

246-
b.keystore = keystore.New(accountKeys)
246+
b.keystore = keystore.New(ctx, accountKeys, b.client, b.logger)
247247

248248
// create transaction pool
249249
var txPool requester.TxPool

services/ingestion/event_subscriber.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
176176
for _, evt := range blockEvents.Events {
177177
r.keyLock.NotifyTransaction(evt.TransactionID)
178178
}
179-
r.keyLock.NotifyBlock(blockEvents.Height)
179+
r.keyLock.NotifyBlock(blockEvents.BlockID)
180180

181181
eventsChan <- evmEvents
182182

services/ingestion/event_subscriber_test.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,13 @@ func Test_Subscribing(t *testing.T) {
4444
)
4545
require.NoError(t, err)
4646

47-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
47+
ctx := context.Background()
48+
logger := zerolog.Nop()
49+
ks := keystore.New(ctx, nil, client, logger)
4850

49-
events := subscriber.Subscribe(context.Background())
51+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
52+
53+
events := subscriber.Subscribe(ctx)
5054

5155
var prevHeight uint64
5256

@@ -84,9 +88,13 @@ func Test_MissingBlockEvent(t *testing.T) {
8488
)
8589
require.NoError(t, err)
8690

87-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
91+
ctx := context.Background()
92+
logger := zerolog.Nop()
93+
ks := keystore.New(ctx, nil, client, logger)
94+
95+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
8896

89-
events := subscriber.Subscribe(context.Background())
97+
events := subscriber.Subscribe(ctx)
9098

9199
missingHashes := make([]gethCommon.Hash, 0)
92100

@@ -186,9 +194,13 @@ func Test_SubscribingWithRetryOnError(t *testing.T) {
186194
)
187195
require.NoError(t, err)
188196

189-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
197+
ctx := context.Background()
198+
logger := zerolog.Nop()
199+
ks := keystore.New(ctx, nil, client, logger)
200+
201+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
190202

191-
events := subscriber.Subscribe(context.Background())
203+
events := subscriber.Subscribe(ctx)
192204

193205
var prevHeight uint64
194206

@@ -249,9 +261,13 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) {
249261
)
250262
require.NoError(t, err)
251263

252-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
264+
ctx := context.Background()
265+
logger := zerolog.Nop()
266+
ks := keystore.New(ctx, nil, client, logger)
253267

254-
events := subscriber.Subscribe(context.Background())
268+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
269+
270+
events := subscriber.Subscribe(ctx)
255271

256272
var prevHeight uint64
257273

@@ -311,9 +327,13 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) {
311327
)
312328
require.NoError(t, err)
313329

314-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
330+
ctx := context.Background()
331+
logger := zerolog.Nop()
332+
ks := keystore.New(ctx, nil, client, logger)
333+
334+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
315335

316-
events := subscriber.Subscribe(context.Background())
336+
events := subscriber.Subscribe(ctx)
317337

318338
var prevHeight uint64
319339

services/requester/keystore/key_store.go

Lines changed: 115 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package keystore
22

33
import (
4+
"context"
45
"fmt"
56
"sync"
67

78
flowsdk "github.com/onflow/flow-go-sdk"
9+
"github.com/onflow/flow-go-sdk/access"
810
"github.com/onflow/flow-go/model/flow"
11+
"github.com/rs/zerolog"
912
)
1013

1114
var ErrNoKeysAvailable = fmt.Errorf("no signing keys available")
@@ -14,30 +17,57 @@ const accountKeyBlockExpiration = flow.DefaultTransactionExpiry
1417

1518
type KeyLock interface {
1619
NotifyTransaction(txID flowsdk.Identifier)
17-
NotifyBlock(blockHeight uint64)
20+
NotifyBlock(blockID flowsdk.Identifier)
1821
}
1922

2023
type KeyStore struct {
24+
client access.Client
2125
availableKeys chan *AccountKey
2226
usedKeys map[flowsdk.Identifier]*AccountKey
2327
size int
2428
keyMu sync.Mutex
29+
blockChan chan flowsdk.Identifier
30+
logger zerolog.Logger
31+
32+
// Signal channel used to prevent blocking writes
33+
// on `blockChan` when the node is shutting down.
34+
done chan struct{}
2535
}
2636

2737
var _ KeyLock = (*KeyStore)(nil)
2838

29-
func New(keys []*AccountKey) *KeyStore {
39+
func New(
40+
ctx context.Context,
41+
keys []*AccountKey,
42+
client access.Client,
43+
logger zerolog.Logger,
44+
) *KeyStore {
45+
totalKeys := len(keys)
46+
3047
ks := &KeyStore{
31-
usedKeys: map[flowsdk.Identifier]*AccountKey{},
48+
client: client,
49+
availableKeys: make(chan *AccountKey, totalKeys),
50+
usedKeys: map[flowsdk.Identifier]*AccountKey{},
51+
size: totalKeys,
52+
// `KeyStore.NotifyBlock` is called for each new Flow block,
53+
// so we use a buffered channel to write the new block IDs
54+
// to the `blockChan`, and read them through `processLockedKeys`.
55+
blockChan: make(chan flowsdk.Identifier, 100),
56+
logger: logger,
57+
done: make(chan struct{}),
3258
}
3359

34-
availableKeys := make(chan *AccountKey, len(keys))
3560
for _, key := range keys {
3661
key.ks = ks
37-
availableKeys <- key
62+
ks.availableKeys <- key
63+
}
64+
65+
// For cases where the EVM Gateway is run in an index-mode,
66+
// there is no need to release any keys, since transaction
67+
// submission is not allowed.
68+
if ks.size > 0 {
69+
go ks.processLockedKeys(ctx)
3870
}
39-
ks.size = len(keys)
40-
ks.availableKeys = availableKeys
4171

4272
return ks
4373
}
@@ -69,16 +99,17 @@ func (k *KeyStore) NotifyTransaction(txID flowsdk.Identifier) {
6999
k.unsafeUnlockKey(txID)
70100
}
71101

72-
// NotifyBlock is called to notify the KeyStore of a new ingested block height.
102+
// NotifyBlock is called to notify the KeyStore of a newly ingested block.
73103
// Pending transactions older than a threshold number of blocks are removed.
74-
func (k *KeyStore) NotifyBlock(blockHeight uint64) {
75-
k.keyMu.Lock()
76-
defer k.keyMu.Unlock()
77-
78-
for txID, key := range k.usedKeys {
79-
if blockHeight-key.lastLockedBlock.Load() >= accountKeyBlockExpiration {
80-
k.unsafeUnlockKey(txID)
81-
}
104+
func (k *KeyStore) NotifyBlock(blockID flowsdk.Identifier) {
105+
select {
106+
case <-k.done:
107+
k.logger.Warn().Msg(
108+
"received `NotifyBlock` while the server is shutting down",
109+
)
110+
return
111+
default:
112+
k.blockChan <- blockID
82113
}
83114
}
84115

@@ -106,3 +137,71 @@ func (k *KeyStore) setLockMetadata(
106137
defer k.keyMu.Unlock()
107138
k.usedKeys[txID] = key
108139
}
140+
141+
// processLockedKeys reads from the `blockChan` channel, and for each new
142+
// Flow block, it fetches the transaction results of the given block and
143+
// releases the account keys associated with those transactions.
144+
func (k *KeyStore) processLockedKeys(ctx context.Context) {
145+
for {
146+
select {
147+
case <-ctx.Done():
148+
close(k.done)
149+
return
150+
case blockID := <-k.blockChan:
151+
// TODO: If calling `k.client.GetTransactionResultsByBlockID` for each
152+
// new block, seems to be problematic for ANs, we can take an approach
153+
// like the one below:
154+
// If the available keys ratio is >= 60% of the total signing keys,
155+
// then we can skip checking the transaction result statuses.
156+
// The signing keys from invalid EVM transactions, will eventually
157+
// come again into availability, after `accountKeyBlockExpiration`
158+
// blocks have passed.
159+
// availablekeysRatio := float64(k.AvailableKeys()) / float64(k.size)
160+
// if availablekeysRatio >= 0.6 {
161+
// continue
162+
// }
163+
164+
// Optimization to avoid AN calls when no signing keys have
165+
// been used. For example, when back-filling the EVM GW state,
166+
// we don't care about releasing signing keys.
167+
if k.AvailableKeys() == k.size {
168+
continue
169+
}
170+
171+
txResults, err := k.client.GetTransactionResultsByBlockID(ctx, blockID)
172+
if err != nil {
173+
k.logger.Error().Err(err).Msgf(
174+
"failed to get transaction results for block ID: %s",
175+
blockID.Hex(),
176+
)
177+
continue
178+
}
179+
180+
k.releasekeys(txResults)
181+
}
182+
}
183+
}
184+
185+
// releasekeys accepts a slice of `TransactionResult` objects and
186+
// releases the account keys used for signing the given transactions.
187+
// It also releases the account keys which were last locked more than
188+
// or equal to `accountKeyBlockExpiration` blocks in the past.
189+
func (k *KeyStore) releasekeys(txResults []*flowsdk.TransactionResult) {
190+
if len(txResults) == 0 {
191+
return
192+
}
193+
194+
k.keyMu.Lock()
195+
defer k.keyMu.Unlock()
196+
197+
for _, txResult := range txResults {
198+
k.unsafeUnlockKey(txResult.TransactionID)
199+
}
200+
201+
blockHeight := txResults[0].BlockHeight
202+
for txID, key := range k.usedKeys {
203+
if blockHeight-key.lastLockedBlock.Load() >= accountKeyBlockExpiration {
204+
k.unsafeUnlockKey(txID)
205+
}
206+
}
207+
}

0 commit comments

Comments
 (0)