Skip to content

Commit 6f911ad

Browse files
committed
Improve release frequency for signing keys by checking tx result status
1 parent f540b99 commit 6f911ad

File tree

6 files changed

+173
-32
lines changed

6 files changed

+173
-32
lines changed

bootstrap/bootstrap.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
243243
}
244244
}
245245

246-
b.keystore = keystore.New(accountKeys)
246+
b.keystore = keystore.New(ctx, accountKeys, b.client, b.logger)
247247

248248
// create transaction pool
249249
var txPool requester.TxPool

services/ingestion/event_subscriber.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
176176
for _, evt := range blockEvents.Events {
177177
r.keyLock.NotifyTransaction(evt.TransactionID)
178178
}
179-
r.keyLock.NotifyBlock(blockEvents.Height)
179+
r.keyLock.NotifyBlock(blockEvents.BlockID)
180180

181181
eventsChan <- evmEvents
182182

services/ingestion/event_subscriber_test.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,13 @@ func Test_Subscribing(t *testing.T) {
4444
)
4545
require.NoError(t, err)
4646

47-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
47+
ctx := context.Background()
48+
logger := zerolog.Nop()
49+
ks := keystore.New(ctx, nil, client, logger)
4850

49-
events := subscriber.Subscribe(context.Background())
51+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
52+
53+
events := subscriber.Subscribe(ctx)
5054

5155
var prevHeight uint64
5256

@@ -84,9 +88,13 @@ func Test_MissingBlockEvent(t *testing.T) {
8488
)
8589
require.NoError(t, err)
8690

87-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
91+
ctx := context.Background()
92+
logger := zerolog.Nop()
93+
ks := keystore.New(ctx, nil, client, logger)
94+
95+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
8896

89-
events := subscriber.Subscribe(context.Background())
97+
events := subscriber.Subscribe(ctx)
9098

9199
missingHashes := make([]gethCommon.Hash, 0)
92100

@@ -186,9 +194,13 @@ func Test_SubscribingWithRetryOnError(t *testing.T) {
186194
)
187195
require.NoError(t, err)
188196

189-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
197+
ctx := context.Background()
198+
logger := zerolog.Nop()
199+
ks := keystore.New(ctx, nil, client, logger)
200+
201+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
190202

191-
events := subscriber.Subscribe(context.Background())
203+
events := subscriber.Subscribe(ctx)
192204

193205
var prevHeight uint64
194206

@@ -249,9 +261,13 @@ func Test_SubscribingWithRetryOnErrorMultipleBlocks(t *testing.T) {
249261
)
250262
require.NoError(t, err)
251263

252-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
264+
ctx := context.Background()
265+
logger := zerolog.Nop()
266+
ks := keystore.New(ctx, nil, client, logger)
253267

254-
events := subscriber.Subscribe(context.Background())
268+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
269+
270+
events := subscriber.Subscribe(ctx)
255271

256272
var prevHeight uint64
257273

@@ -311,9 +327,13 @@ func Test_SubscribingWithRetryOnErrorEmptyBlocks(t *testing.T) {
311327
)
312328
require.NoError(t, err)
313329

314-
subscriber := NewRPCEventSubscriber(zerolog.Nop(), client, flowGo.Previewnet, keystore.New(nil), 1)
330+
ctx := context.Background()
331+
logger := zerolog.Nop()
332+
ks := keystore.New(ctx, nil, client, logger)
333+
334+
subscriber := NewRPCEventSubscriber(logger, client, flowGo.Previewnet, ks, 1)
315335

316-
events := subscriber.Subscribe(context.Background())
336+
events := subscriber.Subscribe(ctx)
317337

318338
var prevHeight uint64
319339

services/requester/keystore/key_store.go

Lines changed: 85 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package keystore
22

33
import (
4+
"context"
45
"fmt"
56
"sync"
67

78
flowsdk "github.com/onflow/flow-go-sdk"
9+
"github.com/onflow/flow-go-sdk/access"
810
"github.com/onflow/flow-go/model/flow"
11+
"github.com/rs/zerolog"
912
)
1013

1114
var ErrNoKeysAvailable = fmt.Errorf("no signing keys available")
@@ -14,21 +17,37 @@ const accountKeyBlockExpiration = flow.DefaultTransactionExpiry
1417

1518
type KeyLock interface {
1619
NotifyTransaction(txID flowsdk.Identifier)
17-
NotifyBlock(blockHeight uint64)
20+
NotifyBlock(blockID flowsdk.Identifier)
1821
}
1922

2023
type KeyStore struct {
24+
client access.Client
2125
availableKeys chan *AccountKey
2226
usedKeys map[flowsdk.Identifier]*AccountKey
2327
size int
2428
keyMu sync.Mutex
29+
blockChan chan flowsdk.Identifier
30+
logger zerolog.Logger
31+
32+
// Signal channel used to prevent blocking writes
33+
// on `blockChan` when the node is shutting down.
34+
done chan struct{}
2535
}
2636

2737
var _ KeyLock = (*KeyStore)(nil)
2838

29-
func New(keys []*AccountKey) *KeyStore {
39+
func New(
40+
ctx context.Context,
41+
keys []*AccountKey,
42+
client access.Client,
43+
logger zerolog.Logger,
44+
) *KeyStore {
3045
ks := &KeyStore{
31-
usedKeys: map[flowsdk.Identifier]*AccountKey{},
46+
client: client,
47+
usedKeys: map[flowsdk.Identifier]*AccountKey{},
48+
blockChan: make(chan flowsdk.Identifier, 100),
49+
logger: logger,
50+
done: make(chan struct{}),
3251
}
3352

3453
availableKeys := make(chan *AccountKey, len(keys))
@@ -39,6 +58,13 @@ func New(keys []*AccountKey) *KeyStore {
3958
ks.size = len(keys)
4059
ks.availableKeys = availableKeys
4160

61+
// For cases where the EVM Gateway is run in an index-mode,
62+
// there is no need to release any keys, since transaction
63+
// submission is not allowed.
64+
if ks.size > 0 {
65+
go ks.processLockedKeys(ctx)
66+
}
67+
4268
return ks
4369
}
4470

@@ -69,16 +95,16 @@ func (k *KeyStore) NotifyTransaction(txID flowsdk.Identifier) {
6995
k.unsafeUnlockKey(txID)
7096
}
7197

72-
// NotifyBlock is called to notify the KeyStore of a new ingested block height.
98+
// NotifyBlock is called to notify the KeyStore of a newly ingested block.
7399
// Pending transactions older than a threshold number of blocks are removed.
74-
func (k *KeyStore) NotifyBlock(blockHeight uint64) {
75-
k.keyMu.Lock()
76-
defer k.keyMu.Unlock()
77-
78-
for txID, key := range k.usedKeys {
79-
if blockHeight-key.lastLockedBlock.Load() >= accountKeyBlockExpiration {
80-
k.unsafeUnlockKey(txID)
81-
}
100+
func (k *KeyStore) NotifyBlock(blockID flowsdk.Identifier) {
101+
select {
102+
case <-k.done:
103+
k.logger.Warn().Msg(
104+
"received `NotifyBlock` while the server is shutting down",
105+
)
106+
default:
107+
k.blockChan <- blockID
82108
}
83109
}
84110

@@ -106,3 +132,50 @@ func (k *KeyStore) setLockMetadata(
106132
defer k.keyMu.Unlock()
107133
k.usedKeys[txID] = key
108134
}
135+
136+
func (k *KeyStore) processLockedKeys(ctx context.Context) {
137+
for {
138+
select {
139+
case <-ctx.Done():
140+
close(k.done)
141+
return
142+
case blockID := <-k.blockChan:
143+
// TODO: If calling `k.client.GetTransactionResultsByBlockID` for each
144+
// new block, seems to be problematic for ANs, we can take an approach
145+
// like the one below:
146+
// If the available keys ratio is >= 60% of the total signing keys,
147+
// then we can skip checking the transaction result statuses.
148+
// The signing keys from invalid EVM transactions, will eventually
149+
// come again into availability, after `accountKeyBlockExpiration`
150+
// blocks have passed.
151+
// availablekeysRatio := float64(k.AvailableKeys()) / float64(k.size)
152+
// if availablekeysRatio >= 0.6 {
153+
// continue
154+
// }
155+
156+
txResults, err := k.client.GetTransactionResultsByBlockID(context.Background(), blockID)
157+
if err != nil || len(txResults) == 0 {
158+
continue
159+
}
160+
161+
k.releasekeys(txResults)
162+
fmt.Println("[processLockedKeys]: ", k.AvailableKeys())
163+
}
164+
}
165+
}
166+
167+
func (k *KeyStore) releasekeys(txResults []*flowsdk.TransactionResult) {
168+
k.keyMu.Lock()
169+
defer k.keyMu.Unlock()
170+
171+
for _, txResult := range txResults {
172+
k.unsafeUnlockKey(txResult.TransactionID)
173+
}
174+
175+
blockHeight := txResults[0].BlockHeight
176+
for txID, key := range k.usedKeys {
177+
if blockHeight-key.lastLockedBlock.Load() >= accountKeyBlockExpiration {
178+
k.unsafeUnlockKey(txID)
179+
}
180+
}
181+
}

services/requester/keystore/key_store_test.go

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
package keystore
22

33
import (
4+
"context"
45
"errors"
56
"sync"
67
"testing"
78
"time"
89

10+
"github.com/onflow/flow-evm-gateway/services/testutils"
911
sdk "github.com/onflow/flow-go-sdk"
12+
"github.com/onflow/flow-go-sdk/access/mocks"
1013
"github.com/onflow/flow-go-sdk/test"
1114
"github.com/onflow/flow-go/utils/unittest"
15+
"github.com/rs/zerolog"
1216
"golang.org/x/sync/errgroup"
1317

1418
"github.com/stretchr/testify/assert"
@@ -25,7 +29,7 @@ func TestTake(t *testing.T) {
2529
keys = append(keys, NewAccountKey(*accountKey, addrGenerator.New(), signer))
2630
}
2731

28-
ks := New(keys)
32+
ks := New(context.Background(), keys, testutils.SetupClientForRange(1, 100), zerolog.Nop())
2933

3034
t.Run("Take with no metadata updates", func(t *testing.T) {
3135
key, err := ks.Take()
@@ -67,10 +71,34 @@ func TestTake(t *testing.T) {
6771
})
6872

6973
t.Run("Take with expiration", func(t *testing.T) {
74+
blockHeight := uint64(10)
75+
blockID10 := identifierFixture()
76+
client := &testutils.MockClient{
77+
Client: &mocks.Client{},
78+
GetTransactionResultsByBlockIDFunc: func(ctx context.Context, blockID sdk.Identifier) ([]*sdk.TransactionResult, error) {
79+
if blockID == blockID10 {
80+
return []*sdk.TransactionResult{}, nil
81+
}
82+
83+
return []*sdk.TransactionResult{
84+
{
85+
Status: sdk.TransactionStatusFinalized,
86+
Error: nil,
87+
Events: []sdk.Event{},
88+
BlockID: blockID,
89+
BlockHeight: blockHeight + accountKeyBlockExpiration,
90+
TransactionID: identifierFixture(),
91+
CollectionID: identifierFixture(),
92+
ComputationUsage: 104_512,
93+
},
94+
}, nil
95+
},
96+
}
97+
ks := New(context.Background(), keys, client, zerolog.Nop())
98+
7099
key, err := ks.Take()
71100
require.NoError(t, err)
72101

73-
blockHeight := uint64(10)
74102
txID := identifierFixture()
75103
key.SetLockMetadata(txID, blockHeight)
76104

@@ -82,13 +110,17 @@ func TestTake(t *testing.T) {
82110
assert.Equal(t, key, ks.usedKeys[txID])
83111

84112
// notify for one block before the expiration block, key should still be reserved
85-
ks.NotifyBlock(blockHeight + accountKeyBlockExpiration - 1)
113+
ks.NotifyBlock(blockID10)
86114

87115
assert.True(t, key.inUse.Load())
88116
assert.Equal(t, key, ks.usedKeys[txID])
89117

90118
// notify for the expiration block
91-
ks.NotifyBlock(blockHeight + accountKeyBlockExpiration)
119+
ks.NotifyBlock(identifierFixture())
120+
121+
// Give some time to allow the KeyStore to check for the
122+
// transaction result statuses in the background.
123+
time.Sleep(time.Second * 2)
92124

93125
// keystore and key should be reset
94126
assert.Equal(t, 2, ks.AvailableKeys())
@@ -106,9 +138,14 @@ func TestKeySigning(t *testing.T) {
106138
accountKey.Index = 0 // the fixture starts from index 1
107139
accountKey.SequenceNumber = 42
108140

109-
ks := New([]*AccountKey{
110-
NewAccountKey(*accountKey, address, signer),
111-
})
141+
ks := New(
142+
context.Background(),
143+
[]*AccountKey{
144+
NewAccountKey(*accountKey, address, signer),
145+
},
146+
testutils.SetupClientForRange(1, 100),
147+
zerolog.Nop(),
148+
)
112149

113150
key, err := ks.Take()
114151
require.NoError(t, err)
@@ -154,7 +191,7 @@ func TestConcurrentUse(t *testing.T) {
154191
keys = append(keys, key)
155192
}
156193

157-
ks := New(keys)
194+
ks := New(context.Background(), keys, testutils.SetupClientForRange(1, 100), zerolog.Nop())
158195

159196
g := errgroup.Group{}
160197
g.SetLimit(concurrentTxCount)

services/testutils/mock_client.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type MockClient struct {
1818
GetEventsForHeightRangeFunc func(
1919
ctx context.Context, eventType string, startHeight uint64, endHeight uint64,
2020
) ([]flow.BlockEvents, error)
21+
GetTransactionResultsByBlockIDFunc func(ctx context.Context, blockID flow.Identifier) ([]*flow.TransactionResult, error)
2122
}
2223

2324
func (c *MockClient) GetBlockHeaderByHeight(ctx context.Context, height uint64) (*flow.BlockHeader, error) {
@@ -50,6 +51,16 @@ func (c *MockClient) GetEventsForHeightRange(
5051
return c.Client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight)
5152
}
5253

54+
func (c *MockClient) GetTransactionResultsByBlockID(
55+
ctx context.Context,
56+
blockID flow.Identifier,
57+
) ([]*flow.TransactionResult, error) {
58+
if c.GetTransactionResultsByBlockIDFunc != nil {
59+
return c.GetTransactionResultsByBlockIDFunc(ctx, blockID)
60+
}
61+
return c.Client.GetTransactionResultsByBlockID(ctx, blockID)
62+
}
63+
5364
func SetupClientForRange(startHeight uint64, endHeight uint64) *MockClient {
5465
client, events := SetupClient(startHeight, endHeight)
5566
go func() {

0 commit comments

Comments
 (0)