Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] status-im/status-react#9203 BalanceAt caching #1744

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions api/geth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,11 +907,20 @@ func (b *GethStatusBackend) startWallet() error {
for i, addr := range watchAddresses {
allAddresses[1+i] = common.Address(addr)
}

uniqAddressesMap := map[common.Address]struct{}{}
uniqAddresses := []common.Address{}
for _, address := range allAddresses {
if _, ok := uniqAddressesMap[address]; !ok {
uniqAddressesMap[address] = struct{}{}
uniqAddresses = append(uniqAddresses, address)
}
}

return wallet.StartReactor(
b.statusNode.RPCClient().Ethclient(),
allAddresses,
new(big.Int).SetUint64(b.statusNode.Config().NetworkID),
)
uniqAddresses,
new(big.Int).SetUint64(b.statusNode.Config().NetworkID))
}

// InjectChatAccount selects the current chat account using chatKeyHex and injects the key into whisper.
Expand Down
53 changes: 53 additions & 0 deletions services/wallet/balance_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package wallet

import (
"context"
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
)

type balanceCache struct {
// cache maps an address to a map of a block number and the balance of this particular address
cache map[common.Address]map[*big.Int]*big.Int
rasom marked this conversation as resolved.
Show resolved Hide resolved
lock sync.RWMutex
}

func (b *balanceCache) readCachedBalance(account common.Address, blockNumber *big.Int) *big.Int {
b.lock.RLock()
defer b.lock.RUnlock()

return b.cache[account][blockNumber]
}

func (b *balanceCache) addBalanceToCache(account common.Address, blockNumber *big.Int, balance *big.Int) {
b.lock.Lock()
defer b.lock.Unlock()

_, exists := b.cache[account]
if !exists {
b.cache[account] = make(map[*big.Int]*big.Int)
}
b.cache[account][blockNumber] = balance
}

func (b *balanceCache) BalanceAt(client BalanceReader, ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
cachedBalance := b.readCachedBalance(account, blockNumber)
if cachedBalance != nil {
return cachedBalance, nil
}
balance, err := client.BalanceAt(ctx, account, blockNumber)
if err != nil {
return nil, err
}
b.addBalanceToCache(account, blockNumber, balance)

return balance, nil
}

func newBalanceCache() *balanceCache {
return &balanceCache{
cache: make(map[common.Address]map[*big.Int]*big.Int),
}
}
27 changes: 15 additions & 12 deletions services/wallet/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
)

type ethHistoricalCommand struct {
db *Database
eth TransferDownloader
address common.Address
client reactorClient
feed *event.Feed
db *Database
eth TransferDownloader
address common.Address
client reactorClient
balanceCache *balanceCache
feed *event.Feed

from, to *big.Int
}
Expand Down Expand Up @@ -47,22 +48,22 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
defer cancel()
concurrent := NewConcurrentDownloader(ctx)
start := time.Now()
downloadEthConcurrently(concurrent, c.client, c.eth, c.address, c.from, c.to)
downloadEthConcurrently(concurrent, c.client, c.balanceCache, c.eth, c.address, c.from, c.to)
select {
case <-concurrent.WaitAsync():
case <-ctx.Done():
log.Error("eth downloader is stuck")
return errors.New("eth downloader is stuck")
}
if concurrent.Error() != nil {
log.Error("failed to dowload transfers using concurrent downloader", "error", err)
log.Error("failed to dowload transfers using concurrent downloader", "error", concurrent.Error())
return concurrent.Error()
}
transfers := concurrent.Get()
log.Info("eth historical downloader finished successfully", "total transfers", len(transfers), "time", time.Since(start))
log.Info("eth historical downloader finished successfully", "address", c.address, "total transfers", len(transfers), "time", time.Since(start))
err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headersFromTransfers(transfers), nil, ethSync)
if err != nil {
log.Error("failed to save downloaded erc20 transfers", "error", err)
log.Error("failed to save downloaded eth transfers", "error", err)
return err
}
if len(transfers) > 0 {
Expand Down Expand Up @@ -310,6 +311,7 @@ type controlCommand struct {
func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error {
start := time.Now()
group := NewGroup(ctx)

for _, address := range c.accounts {
erc20 := &erc20HistoricalCommand{
db: c.db,
Expand All @@ -321,9 +323,10 @@ func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error {
}
group.Add(erc20.Command())
eth := &ethHistoricalCommand{
db: c.db,
client: c.client,
address: address,
db: c.db,
client: c.client,
balanceCache: newBalanceCache(),
address: address,
eth: &ETHTransferDownloader{
client: c.client,
accounts: []common.Address{address},
Expand Down
33 changes: 27 additions & 6 deletions services/wallet/concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,43 @@ type TransferDownloader interface {
GetTransfersByNumber(context.Context, *big.Int) ([]Transfer, error)
}

func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, downloader TransferDownloader, account common.Address, low, high *big.Int) {
func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, cache *balanceCache, downloader TransferDownloader, account common.Address, low, high *big.Int) {
c.Add(func(ctx context.Context) error {
if low.Cmp(high) >= 0 {
return nil
}
log.Debug("eth transfers comparing blocks", "low", low, "high", high)
lb, err := client.BalanceAt(ctx, account, low)
lb, err := cache.BalanceAt(client, ctx, account, low)
//lb, err := client.BalanceAt(ctx, account, low)
if err != nil {
return err
}
hb, err := client.BalanceAt(ctx, account, high)
hb, err := cache.BalanceAt(client, ctx, account, high)
//hb, err := client.BalanceAt(ctx, account, high)
if err != nil {
return err
}
if lb.Cmp(hb) == 0 {
log.Debug("balances are equal", "low", low, "high", high)
return nil
// In case if balances are equal but non zero we want to check if
// eth_getTransactionCount return different values, because there
// still might be transactions
if lb.Cmp(zero) != 0 {
return nil
}

ln, err := client.NonceAt(ctx, account, low)
if err != nil {
return err
}
hn, err := client.NonceAt(ctx, account, high)
if err != nil {
return err
}
if ln == hn {
log.Debug("transaction count is also equal", "low", low, "high", high)
return nil
}
}
if new(big.Int).Sub(high, low).Cmp(one) == 0 {
transfers, err := downloader.GetTransfersByNumber(ctx, high)
Expand All @@ -73,9 +93,10 @@ func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, down
}
mid := new(big.Int).Add(low, high)
mid = mid.Div(mid, two)
cache.BalanceAt(client, ctx, account, mid)
log.Debug("balances are not equal. spawn two concurrent downloaders", "low", low, "mid", mid, "high", high)
downloadEthConcurrently(c, client, downloader, account, low, mid)
downloadEthConcurrently(c, client, downloader, account, mid, high)
downloadEthConcurrently(c, client, cache, downloader, account, low, mid)
downloadEthConcurrently(c, client, cache, downloader, account, mid, high)
return nil
})
}
2 changes: 1 addition & 1 deletion services/wallet/concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestConcurrentEthDownloader(t *testing.T) {
defer cancel()
concurrent := NewConcurrentDownloader(ctx)
downloadEthConcurrently(
concurrent, tc.options.balances, tc.options.batches,
concurrent, tc.options.balances, newBalanceCache(), tc.options.batches,
common.Address{}, zero, tc.options.last)
concurrent.Wait()
require.NoError(t, concurrent.Error())
Expand Down
1 change: 1 addition & 0 deletions services/wallet/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type HeaderReader interface {
// BalanceReader interface for reading balance at a specifeid address.
type BalanceReader interface {
BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
}

type reactorClient interface {
Expand Down