Skip to content

Commit

Permalink
status-im/status-mobile#9203 getBalance caching
Browse files Browse the repository at this point in the history
  • Loading branch information
rasom committed Jan 2, 2020
1 parent 0ed66b9 commit 9f60462
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 22 deletions.
15 changes: 12 additions & 3 deletions api/geth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,11 +907,20 @@ func (b *GethStatusBackend) startWallet() error {
for i, addr := range watchAddresses {
allAddresses[1+i] = common.Address(addr)
}

uniqAddressesMap := map[common.Address]struct{}{}
uniqAddresses := []common.Address{}
for _, address := range allAddresses {
if _, ok := uniqAddressesMap[address]; !ok {
uniqAddressesMap[address] = struct{}{}
uniqAddresses = append(uniqAddresses, address)
}
}

return wallet.StartReactor(
b.statusNode.RPCClient().Ethclient(),
allAddresses,
new(big.Int).SetUint64(b.statusNode.Config().NetworkID),
)
uniqAddresses,
new(big.Int).SetUint64(b.statusNode.Config().NetworkID))
}

// InjectChatAccount selects the current chat account using chatKeyHex and injects the key into whisper.
Expand Down
53 changes: 53 additions & 0 deletions services/wallet/balance_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package wallet

import (
"context"
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
)

type balanceCache struct {
// cache maps an address to a map of a block number and the balance of this particular address
cache map[common.Address]map[*big.Int]*big.Int
lock sync.RWMutex
}

func (b *balanceCache) readCachedBalance(account common.Address, blockNumber *big.Int) *big.Int {
b.lock.RLock()
defer b.lock.RUnlock()

return b.cache[account][blockNumber]
}

func (b *balanceCache) addBalanceToCache(account common.Address, blockNumber *big.Int, balance *big.Int) {
b.lock.Lock()
defer b.lock.Unlock()

_, exists := b.cache[account]
if !exists {
b.cache[account] = make(map[*big.Int]*big.Int)
}
b.cache[account][blockNumber] = balance
}

func (b *balanceCache) BalanceAt(client BalanceReader, ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
cachedBalance := b.readCachedBalance(account, blockNumber)
if cachedBalance != nil {
return cachedBalance, nil
}
balance, err := client.BalanceAt(ctx, account, blockNumber)
if err != nil {
return nil, err
}
b.addBalanceToCache(account, blockNumber, balance)

return balance, nil
}

func newBalanceCache() *balanceCache {
return &balanceCache{
cache: make(map[common.Address]map[*big.Int]*big.Int),
}
}
27 changes: 15 additions & 12 deletions services/wallet/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
)

type ethHistoricalCommand struct {
db *Database
eth TransferDownloader
address common.Address
client reactorClient
feed *event.Feed
db *Database
eth TransferDownloader
address common.Address
client reactorClient
balanceCache *balanceCache
feed *event.Feed

from, to *big.Int
}
Expand Down Expand Up @@ -47,22 +48,22 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
defer cancel()
concurrent := NewConcurrentDownloader(ctx)
start := time.Now()
downloadEthConcurrently(concurrent, c.client, c.eth, c.address, c.from, c.to)
downloadEthConcurrently(concurrent, c.client, c.balanceCache, c.eth, c.address, c.from, c.to)
select {
case <-concurrent.WaitAsync():
case <-ctx.Done():
log.Error("eth downloader is stuck")
return errors.New("eth downloader is stuck")
}
if concurrent.Error() != nil {
log.Error("failed to dowload transfers using concurrent downloader", "error", err)
log.Error("failed to dowload transfers using concurrent downloader", "error", concurrent.Error())
return concurrent.Error()
}
transfers := concurrent.Get()
log.Info("eth historical downloader finished successfully", "total transfers", len(transfers), "time", time.Since(start))
log.Info("eth historical downloader finished successfully", "address", c.address, "total transfers", len(transfers), "time", time.Since(start))
err = c.db.ProcessTranfers(transfers, []common.Address{c.address}, headersFromTransfers(transfers), nil, ethSync)
if err != nil {
log.Error("failed to save downloaded erc20 transfers", "error", err)
log.Error("failed to save downloaded eth transfers", "error", err)
return err
}
if len(transfers) > 0 {
Expand Down Expand Up @@ -310,6 +311,7 @@ type controlCommand struct {
func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error {
start := time.Now()
group := NewGroup(ctx)

for _, address := range c.accounts {
erc20 := &erc20HistoricalCommand{
db: c.db,
Expand All @@ -321,9 +323,10 @@ func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error {
}
group.Add(erc20.Command())
eth := &ethHistoricalCommand{
db: c.db,
client: c.client,
address: address,
db: c.db,
client: c.client,
balanceCache: newBalanceCache(),
address: address,
eth: &ETHTransferDownloader{
client: c.client,
accounts: []common.Address{address},
Expand Down
33 changes: 27 additions & 6 deletions services/wallet/concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,43 @@ type TransferDownloader interface {
GetTransfersByNumber(context.Context, *big.Int) ([]Transfer, error)
}

func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, downloader TransferDownloader, account common.Address, low, high *big.Int) {
func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, cache *balanceCache, downloader TransferDownloader, account common.Address, low, high *big.Int) {
c.Add(func(ctx context.Context) error {
if low.Cmp(high) >= 0 {
return nil
}
log.Debug("eth transfers comparing blocks", "low", low, "high", high)
lb, err := client.BalanceAt(ctx, account, low)
lb, err := cache.BalanceAt(client, ctx, account, low)
//lb, err := client.BalanceAt(ctx, account, low)
if err != nil {
return err
}
hb, err := client.BalanceAt(ctx, account, high)
hb, err := cache.BalanceAt(client, ctx, account, high)
//hb, err := client.BalanceAt(ctx, account, high)
if err != nil {
return err
}
if lb.Cmp(hb) == 0 {
log.Debug("balances are equal", "low", low, "high", high)
return nil
// In case if balances are equal but non zero we want to check if
// eth_getTransactionCount return different values, because there
// still might be transactions
if lb.Cmp(zero) != 0 {
return nil
}

ln, err := client.NonceAt(ctx, account, low)
if err != nil {
return err
}
hn, err := client.NonceAt(ctx, account, high)
if err != nil {
return err
}
if ln == hn {
log.Debug("transaction count is also equal", "low", low, "high", high)
return nil
}
}
if new(big.Int).Sub(high, low).Cmp(one) == 0 {
transfers, err := downloader.GetTransfersByNumber(ctx, high)
Expand All @@ -73,9 +93,10 @@ func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, down
}
mid := new(big.Int).Add(low, high)
mid = mid.Div(mid, two)
cache.BalanceAt(client, ctx, account, mid)
log.Debug("balances are not equal. spawn two concurrent downloaders", "low", low, "mid", mid, "high", high)
downloadEthConcurrently(c, client, downloader, account, low, mid)
downloadEthConcurrently(c, client, downloader, account, mid, high)
downloadEthConcurrently(c, client, cache, downloader, account, low, mid)
downloadEthConcurrently(c, client, cache, downloader, account, mid, high)
return nil
})
}
2 changes: 1 addition & 1 deletion services/wallet/concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestConcurrentEthDownloader(t *testing.T) {
defer cancel()
concurrent := NewConcurrentDownloader(ctx)
downloadEthConcurrently(
concurrent, tc.options.balances, tc.options.batches,
concurrent, tc.options.balances, newBalanceCache(), tc.options.batches,
common.Address{}, zero, tc.options.last)
concurrent.Wait()
require.NoError(t, concurrent.Error())
Expand Down
1 change: 1 addition & 0 deletions services/wallet/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type HeaderReader interface {
// BalanceReader interface for reading balance at a specifeid address.
type BalanceReader interface {
BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
}

type reactorClient interface {
Expand Down

0 comments on commit 9f60462

Please sign in to comment.