Skip to content

Commit fb95c78

Browse files
committed
Improve release frequency for signing keys by checking tx result status
1 parent f540b99 commit fb95c78

File tree

7 files changed

+350
-31
lines changed

7 files changed

+350
-31
lines changed

bootstrap/bootstrap.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
243243
}
244244
}
245245

246-
b.keystore = keystore.New(accountKeys)
246+
b.keystore = keystore.New(ctx, accountKeys, b.client, b.logger)
247247

248248
// create transaction pool
249249
var txPool requester.TxPool

services/ingestion/event_subscriber.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
176176
for _, evt := range blockEvents.Events {
177177
r.keyLock.NotifyTransaction(evt.TransactionID)
178178
}
179-
r.keyLock.NotifyBlock(blockEvents.Height)
179+
r.keyLock.NotifyBlock(blockEvents.BlockID)
180180

181181
eventsChan <- evmEvents
182182

services/ingestion/event_subscriber_test.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,13 @@ func Test_Subscribing(t *testing.T) {
4444
)
4545
require.NoError(t, err)
4646

47-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
47+
ctx := context.Background()
48+
logger := zerolog.Nop()
49+
ks := keystore.New(ctx, nil, client, logger)
4850

49-
events := subscriber.Subscribe(context.Background())
51+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
52+
53+
events := subscriber.Subscribe(ctx)
5054

5155
var prevHeight uint64
5256

@@ -84,9 +88,13 @@ func Test_MissingBlockEvent(t *testing.T) {
8488
)
8589
require.NoError(t, err)
8690

87-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
91+
ctx := context.Background()
92+
logger := zerolog.Nop()
93+
ks := keystore.New(ctx, nil, client, logger)
94+
95+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
8896

89-
events := subscriber.Subscribe(context.Background())
97+
events := subscriber.Subscribe(ctx)
9098

9199
missingHashes := make([]gethCommon.Hash, 0)
92100

@@ -186,9 +194,13 @@ func Test_SubscribingWithRetryOnError(t *testing.T) {
186194
)
187195
require.NoError(t, err)
188196

189-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
197+
ctx := context.Background()
198+
logger := zerolog.Nop()
199+
ks := keystore.New(ctx, nil, client, logger)
200+
201+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
190202

191-
events := subscriber.Subscribe(context.Background())
203+
events := subscriber.Subscribe(ctx)
192204

193205
var prevHeight uint64
194206

@@ -249,9 +261,13 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) {
249261
)
250262
require.NoError(t, err)
251263

252-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
264+
ctx := context.Background()
265+
logger := zerolog.Nop()
266+
ks := keystore.New(ctx, nil, client, logger)
253267

254-
events := subscriber.Subscribe(context.Background())
268+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
269+
270+
events := subscriber.Subscribe(ctx)
255271

256272
var prevHeight uint64
257273

@@ -311,9 +327,13 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) {
311327
)
312328
require.NoError(t, err)
313329

314-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
330+
ctx := context.Background()
331+
logger := zerolog.Nop()
332+
ks := keystore.New(ctx, nil, client, logger)
333+
334+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
315335

316-
events := subscriber.Subscribe(context.Background())
336+
events := subscriber.Subscribe(ctx)
317337

318338
var prevHeight uint64
319339

services/requester/keystore/key_store.go

Lines changed: 109 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package keystore
22

33
import (
4+
"context"
45
"fmt"
56
"sync"
67

78
flowsdk "github.com/onflow/flow-go-sdk"
9+
"github.com/onflow/flow-go-sdk/access"
810
"github.com/onflow/flow-go/model/flow"
11+
"github.com/rs/zerolog"
912
)
1013

1114
var ErrNoKeysAvailable = fmt.Errorf("no signing keys available")
@@ -14,21 +17,40 @@ const accountKeyBlockExpiration = flow.DefaultTransactionExpiry
1417

1518
type KeyLock interface {
1619
NotifyTransaction(txID flowsdk.Identifier)
17-
NotifyBlock(blockHeight uint64)
20+
NotifyBlock(blockID flowsdk.Identifier)
1821
}
1922

2023
type KeyStore struct {
24+
client access.Client
2125
availableKeys chan *AccountKey
2226
usedKeys map[flowsdk.Identifier]*AccountKey
2327
size int
2428
keyMu sync.Mutex
29+
blockChan chan flowsdk.Identifier
30+
logger zerolog.Logger
31+
32+
// Signal channel used to prevent blocking writes
33+
// on `blockChan` when the node is shutting down.
34+
done chan struct{}
2535
}
2636

2737
var _ KeyLock = (*KeyStore)(nil)
2838

29-
func New(keys []*AccountKey) *KeyStore {
39+
func New(
40+
ctx context.Context,
41+
keys []*AccountKey,
42+
client access.Client,
43+
logger zerolog.Logger,
44+
) *KeyStore {
3045
ks := &KeyStore{
46+
client: client,
3147
usedKeys: map[flowsdk.Identifier]*AccountKey{},
48+
// `KeyStore.NotifyBlock` is called for each new Flow block,
49+
// so we use a buffered channel to write the new block IDs
50+
// to the `blockChan`, and read them through `processLockedKeys`.
51+
blockChan: make(chan flowsdk.Identifier, 100),
52+
logger: logger,
53+
done: make(chan struct{}),
3254
}
3355

3456
availableKeys := make(chan *AccountKey, len(keys))
@@ -39,6 +61,13 @@ func New(keys []*AccountKey) *KeyStore {
3961
ks.size = len(keys)
4062
ks.availableKeys = availableKeys
4163

64+
// For cases where the EVM Gateway is run in an index-mode,
65+
// there is no need to release any keys, since transaction
66+
// submission is not allowed.
67+
if ks.size > 0 {
68+
go ks.processLockedKeys(ctx)
69+
}
70+
4271
return ks
4372
}
4473

@@ -69,16 +98,17 @@ func (k *KeyStore) NotifyTransaction(txID flowsdk.Identifier) {
6998
k.unsafeUnlockKey(txID)
7099
}
71100

72-
// NotifyBlock is called to notify the KeyStore of a new ingested block height.
101+
// NotifyBlock is called to notify the KeyStore of a newly ingested block.
73102
// Pending transactions older than a threshold number of blocks are removed.
74-
func (k *KeyStore) NotifyBlock(blockHeight uint64) {
75-
k.keyMu.Lock()
76-
defer k.keyMu.Unlock()
77-
78-
for txID, key := range k.usedKeys {
79-
if blockHeight-key.lastLockedBlock.Load() >= accountKeyBlockExpiration {
80-
k.unsafeUnlockKey(txID)
81-
}
103+
func (k *KeyStore) NotifyBlock(blockID flowsdk.Identifier) {
104+
select {
105+
case <-k.done:
106+
k.logger.Warn().Msg(
107+
"received `NotifyBlock` while the server is shutting down",
108+
)
109+
return
110+
default:
111+
k.blockChan <- blockID
82112
}
83113
}
84114

@@ -106,3 +136,71 @@ func (k *KeyStore) setLockMetadata(
106136
defer k.keyMu.Unlock()
107137
k.usedKeys[txID] = key
108138
}
139+
140+
// processLockedKeys reads from the `blockChan` channel, and for each new
141+
// Flow block, it fetches the transaction results of the given block and
142+
// releases the account keys associated with those transactions.
143+
func (k *KeyStore) processLockedKeys(ctx context.Context) {
144+
for {
145+
select {
146+
case <-ctx.Done():
147+
close(k.done)
148+
return
149+
case blockID := <-k.blockChan:
150+
// TODO: If calling `k.client.GetTransactionResultsByBlockID` for each
151+
// new block, seems to be problematic for ANs, we can take an approach
152+
// like the one below:
153+
// If the available keys ratio is >= 60% of the total signing keys,
154+
// then we can skip checking the transaction result statuses.
155+
// The signing keys from invalid EVM transactions, will eventually
156+
// come again into availability, after `accountKeyBlockExpiration`
157+
// blocks have passed.
158+
// availablekeysRatio := float64(k.AvailableKeys()) / float64(k.size)
159+
// if availablekeysRatio >= 0.6 {
160+
// continue
161+
// }
162+
163+
// Optimization to avoid AN calls when no signing keys have
164+
// been used. For example, when back-filling the EVM GW state,
165+
// we don't care about releasing signing keys.
166+
if k.AvailableKeys() == k.size {
167+
continue
168+
}
169+
170+
txResults, err := k.client.GetTransactionResultsByBlockID(ctx, blockID)
171+
if err != nil {
172+
k.logger.Error().Err(err).Msgf(
173+
"failed to get transaction results for block ID: %s",
174+
blockID.Hex(),
175+
)
176+
continue
177+
}
178+
179+
k.releasekeys(txResults)
180+
}
181+
}
182+
}
183+
184+
// releasekeys accepts a slice of `TransactionResult` objects and
185+
// releases the account keys used for signing the given transactions.
186+
// It also releases the account keys which were last locked more than
187+
// or equal to `accountKeyBlockExpiration` blocks in the past.
188+
func (k *KeyStore) releasekeys(txResults []*flowsdk.TransactionResult) {
189+
if len(txResults) == 0 {
190+
return
191+
}
192+
193+
k.keyMu.Lock()
194+
defer k.keyMu.Unlock()
195+
196+
for _, txResult := range txResults {
197+
k.unsafeUnlockKey(txResult.TransactionID)
198+
}
199+
200+
blockHeight := txResults[0].BlockHeight
201+
for txID, key := range k.usedKeys {
202+
if blockHeight-key.lastLockedBlock.Load() >= accountKeyBlockExpiration {
203+
k.unsafeUnlockKey(txID)
204+
}
205+
}
206+
}

services/requester/keystore/key_store_test.go

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
package keystore
22

33
import (
4+
"context"
45
"errors"
56
"sync"
67
"testing"
78
"time"
89

10+
"github.com/onflow/flow-evm-gateway/services/testutils"
911
sdk "github.com/onflow/flow-go-sdk"
12+
"github.com/onflow/flow-go-sdk/access/mocks"
1013
"github.com/onflow/flow-go-sdk/test"
1114
"github.com/onflow/flow-go/utils/unittest"
15+
"github.com/rs/zerolog"
1216
"golang.org/x/sync/errgroup"
1317

1418
"github.com/stretchr/testify/assert"
@@ -25,7 +29,7 @@ func TestTake(t *testing.T) {
2529
keys = append(keys, NewAccountKey(*accountKey, addrGenerator.New(), signer))
2630
}
2731

28-
ks := New(keys)
32+
ks := New(context.Background(), keys, testutils.SetupClientForRange(1, 100), zerolog.Nop())
2933

3034
t.Run("Take with no metadata updates", func(t *testing.T) {
3135
key, err := ks.Take()
@@ -67,10 +71,50 @@ func TestTake(t *testing.T) {
6771
})
6872

6973
t.Run("Take with expiration", func(t *testing.T) {
74+
blockHeight := uint64(10)
75+
blockID10 := identifierFixture()
76+
blockIDNonExpired := identifierFixture()
77+
client := &testutils.MockClient{
78+
Client: &mocks.Client{},
79+
GetTransactionResultsByBlockIDFunc: func(ctx context.Context, blockID sdk.Identifier) ([]*sdk.TransactionResult, error) {
80+
if blockID == blockID10 {
81+
return []*sdk.TransactionResult{}, nil
82+
}
83+
84+
if blockID == blockIDNonExpired {
85+
return []*sdk.TransactionResult{
86+
{
87+
Status: sdk.TransactionStatusFinalized,
88+
Error: nil,
89+
Events: []sdk.Event{},
90+
BlockID: blockID,
91+
BlockHeight: blockHeight + accountKeyBlockExpiration - 1,
92+
TransactionID: identifierFixture(),
93+
CollectionID: identifierFixture(),
94+
ComputationUsage: 104_512,
95+
},
96+
}, nil
97+
}
98+
99+
return []*sdk.TransactionResult{
100+
{
101+
Status: sdk.TransactionStatusFinalized,
102+
Error: nil,
103+
Events: []sdk.Event{},
104+
BlockID: blockID,
105+
BlockHeight: blockHeight + accountKeyBlockExpiration,
106+
TransactionID: identifierFixture(),
107+
CollectionID: identifierFixture(),
108+
ComputationUsage: 104_512,
109+
},
110+
}, nil
111+
},
112+
}
113+
ks := New(context.Background(), keys, client, zerolog.Nop())
114+
70115
key, err := ks.Take()
71116
require.NoError(t, err)
72117

73-
blockHeight := uint64(10)
74118
txID := identifierFixture()
75119
key.SetLockMetadata(txID, blockHeight)
76120

@@ -82,13 +126,21 @@ func TestTake(t *testing.T) {
82126
assert.Equal(t, key, ks.usedKeys[txID])
83127

84128
// notify for one block before the expiration block, key should still be reserved
85-
ks.NotifyBlock(blockHeight + accountKeyBlockExpiration - 1)
129+
ks.NotifyBlock(blockIDNonExpired)
130+
131+
// Give some time to allow the KeyStore to check for the
132+
// transaction result statuses in the background.
133+
time.Sleep(time.Second * 2)
86134

87135
assert.True(t, key.inUse.Load())
88136
assert.Equal(t, key, ks.usedKeys[txID])
89137

90138
// notify for the expiration block
91-
ks.NotifyBlock(blockHeight + accountKeyBlockExpiration)
139+
ks.NotifyBlock(identifierFixture())
140+
141+
// Give some time to allow the KeyStore to check for the
142+
// transaction result statuses in the background.
143+
time.Sleep(time.Second * 2)
92144

93145
// keystore and key should be reset
94146
assert.Equal(t, 2, ks.AvailableKeys())
@@ -106,9 +158,14 @@ func TestKeySigning(t *testing.T) {
106158
accountKey.Index = 0 // the fixture starts from index 1
107159
accountKey.SequenceNumber = 42
108160

109-
ks := New([]*AccountKey{
110-
NewAccountKey(*accountKey, address, signer),
111-
})
161+
ks := New(
162+
context.Background(),
163+
[]*AccountKey{
164+
NewAccountKey(*accountKey, address, signer),
165+
},
166+
testutils.SetupClientForRange(1, 100),
167+
zerolog.Nop(),
168+
)
112169

113170
key, err := ks.Take()
114171
require.NoError(t, err)
@@ -154,7 +211,7 @@ func TestConcurrentUse(t *testing.T) {
154211
keys = append(keys, key)
155212
}
156213

157-
ks := New(keys)
214+
ks := New(context.Background(), keys, testutils.SetupClientForRange(1, 100), zerolog.Nop())
158215

159216
g := errgroup.Group{}
160217
g.SetLimit(concurrentTxCount)

0 commit comments

Comments
 (0)