Skip to content

Commit

Permalink
status-im/status-mobile#9203 getBalance caching
Browse files Browse the repository at this point in the history
  • Loading branch information
rasom committed Dec 20, 2019
1 parent b99af53 commit ffa7903
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 18 deletions.
12 changes: 11 additions & 1 deletion api/geth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,9 +904,19 @@ func (b *GethStatusBackend) startWallet() error {
for i, addr := range watchAddresses {
allAddresses[1+i] = common.Address(addr)
}

uniqAddressesMap := map[common.Address]struct{}{}
uniqAddresses := []common.Address{}
for _, address := range allAddresses {
if _, ok := uniqAddressesMap[address]; !ok {
uniqAddressesMap[address] = struct{}{}
uniqAddresses = append(uniqAddresses, address)
}
}

return wallet.StartReactor(
b.statusNode.RPCClient().Ethclient(),
allAddresses,
uniqAddresses,
new(big.Int).SetUint64(b.statusNode.Config().NetworkID))
}

Expand Down
55 changes: 55 additions & 0 deletions services/wallet/balance_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package wallet

import (
"context"
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
)

type BalanceCache struct {
cache map[common.Address]map[*big.Int]*big.Int
lock sync.RWMutex
}

func (b *BalanceCache) readCachedBalance(account common.Address, blockNumber *big.Int) (*big.Int, bool) {
b.lock.RLock()
defer b.lock.RUnlock()

balance, exists := b.cache[account][blockNumber]
return balance, exists
}

func (b *BalanceCache) addBalanceToCache(account common.Address, blockNumber *big.Int, balance *big.Int) {
b.lock.Lock()
defer b.lock.Unlock()

_, exists := b.cache[account]
if !exists {
b.cache[account] = make(map[*big.Int]*big.Int)
}
b.cache[account][blockNumber] = balance
}

func (b *BalanceCache) BalanceAt(client BalanceReader, ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
cachedBalance, exists := b.readCachedBalance(account, blockNumber)
if exists {
return cachedBalance, nil
} else {
balance, err := client.BalanceAt(ctx, account, blockNumber)
if err != nil {
return nil, err
}
b.addBalanceToCache(account, blockNumber, balance)

return balance, nil
}
}

func NewBalanceCache() *BalanceCache {
return &BalanceCache{
cache: make(map[common.Address]map[*big.Int]*big.Int),
lock: sync.RWMutex{},
}
}
23 changes: 13 additions & 10 deletions services/wallet/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
)

type ethHistoricalCommand struct {
db *Database
eth TransferDownloader
address common.Address
client reactorClient
feed *event.Feed
db *Database
eth TransferDownloader
address common.Address
client reactorClient
balanceCache *BalanceCache
feed *event.Feed

from, to *big.Int
}
Expand Down Expand Up @@ -47,7 +48,7 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
defer cancel()
concurrent := NewConcurrentDownloader(ctx)
start := time.Now()
downloadEthConcurrently(concurrent, c.client, c.eth, c.address, c.from, c.to)
downloadEthConcurrently(concurrent, c.client, c.balanceCache, c.eth, c.address, c.from, c.to)
select {
case <-concurrent.WaitAsync():
case <-ctx.Done():
Expand All @@ -59,7 +60,7 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
return concurrent.Error()
}
transfers := concurrent.Get()
log.Info("eth historical downloader finished successfully", "total transfers", len(transfers), "time", time.Since(start))
log.Info("eth historical downloader finished successfully", "address", c.address, "total transfers", len(transfers), "time", time.Since(start))
err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headersFromTransfers(transfers), nil, ethSync)
if err != nil {
log.Error("failed to save downloaded erc20 transfers", "error", err)
Expand Down Expand Up @@ -310,6 +311,7 @@ type controlCommand struct {
func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error {
start := time.Now()
group := NewGroup(ctx)

for _, address := range c.accounts {
erc20 := &erc20HistoricalCommand{
db: c.db,
Expand All @@ -321,9 +323,10 @@ func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error {
}
group.Add(erc20.Command())
eth := &ethHistoricalCommand{
db: c.db,
client: c.client,
address: address,
db: c.db,
client: c.client,
balanceCache: NewBalanceCache(),
address: address,
eth: &ETHTransferDownloader{
client: c.client,
accounts: []common.Address{address},
Expand Down
33 changes: 27 additions & 6 deletions services/wallet/concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,43 @@ type TransferDownloader interface {
GetTransfersByNumber(context.Context, *big.Int) ([]Transfer, error)
}

func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, downloader TransferDownloader, account common.Address, low, high *big.Int) {
func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, balanceCache *BalanceCache, downloader TransferDownloader, account common.Address, low, high *big.Int) {
c.Add(func(ctx context.Context) error {
if low.Cmp(high) >= 0 {
return nil
}
log.Debug("eth transfers comparing blocks", "low", low, "high", high)
lb, err := client.BalanceAt(ctx, account, low)
lb, err := balanceCache.BalanceAt(client, ctx, account, low)
//lb, err := client.BalanceAt(ctx, account, low)
if err != nil {
return err
}
hb, err := client.BalanceAt(ctx, account, high)
hb, err := balanceCache.BalanceAt(client, ctx, account, high)
//hb, err := client.BalanceAt(ctx, account, high)
if err != nil {
return err
}
if lb.Cmp(hb) == 0 {
log.Debug("balances are equal", "low", low, "high", high)
return nil
// In case if balances are equal but non zero we want to check if
// eth_getTransactionCount return different values, because there
// still might be transactions
if lb.Cmp(zero) != 0 {
return nil
}

ln, err := client.NonceAt(ctx, account, low)
if err != nil {
return err
}
hn, err := client.NonceAt(ctx, account, high)
if err != nil {
return err
}
if ln == hn {
log.Debug("transaction count is also equal", "low", low, "high", high)
return nil
}
}
if new(big.Int).Sub(high, low).Cmp(one) == 0 {
transfers, err := downloader.GetTransfersByNumber(ctx, high)
Expand All @@ -73,9 +93,10 @@ func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, down
}
mid := new(big.Int).Add(low, high)
mid = mid.Div(mid, two)
balanceCache.BalanceAt(client, ctx, account, mid)
log.Debug("balances are not equal. spawn two concurrent downloaders", "low", low, "mid", mid, "high", high)
downloadEthConcurrently(c, client, downloader, account, low, mid)
downloadEthConcurrently(c, client, downloader, account, mid, high)
downloadEthConcurrently(c, client, balanceCache, downloader, account, low, mid)
downloadEthConcurrently(c, client, balanceCache, downloader, account, mid, high)
return nil
})
}
2 changes: 1 addition & 1 deletion services/wallet/concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestConcurrentEthDownloader(t *testing.T) {
defer cancel()
concurrent := NewConcurrentDownloader(ctx)
downloadEthConcurrently(
concurrent, tc.options.balances, tc.options.batches,
concurrent, tc.options.balances, NewBalanceCache(), tc.options.batches,
common.Address{}, zero, tc.options.last)
concurrent.Wait()
require.NoError(t, concurrent.Error())
Expand Down
1 change: 1 addition & 0 deletions services/wallet/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type HeaderReader interface {
// BalanceReader interface for reading balance at a specifeid address.
type BalanceReader interface {
BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
}

type reactorClient interface {
Expand Down

0 comments on commit ffa7903

Please sign in to comment.