Skip to content

Commit

Permalink
_
Browse files Browse the repository at this point in the history
  • Loading branch information
rasom committed Dec 19, 2019
1 parent f3171fd commit 1b066d5
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 55 deletions.
55 changes: 55 additions & 0 deletions services/wallet/balance_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package wallet

import (
"context"
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
)

type BalanceCache struct {
cache map[common.Address]map[*big.Int]*big.Int
lock sync.RWMutex
}

func (b *BalanceCache) readCachedBalance(account common.Address, blockNumber *big.Int) (*big.Int, bool) {
b.lock.RLock()
defer b.lock.RUnlock()

balance, exists := b.cache[account][blockNumber]
return balance, exists
}

func (b *BalanceCache) addBalanceToCache(account common.Address, blockNumber *big.Int, balance *big.Int) {
b.lock.Lock()
defer b.lock.Unlock()

_, exists := b.cache[account]
if !exists {
b.cache[account] = make(map[*big.Int]*big.Int)
}
b.cache[account][blockNumber] = balance
}

func (b *BalanceCache) BalanceAt(client BalanceReader, ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
cachedBalance, exists := b.readCachedBalance(account, blockNumber)
if exists {
return cachedBalance, nil
} else {
balance, err := client.BalanceAt(ctx, account, blockNumber)
if err != nil {
return nil, err
}
b.addBalanceToCache(account, blockNumber, balance)

return balance, nil
}
}

func NewBalanceCache() *BalanceCache {
return &BalanceCache{
cache: make(map[common.Address]map[*big.Int]*big.Int),
lock: sync.RWMutex{},
}
}
20 changes: 11 additions & 9 deletions services/wallet/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
)

type ethHistoricalCommand struct {
db *Database
eth TransferDownloader
address common.Address
client reactorClient
feed *event.Feed
db *Database
eth TransferDownloader
address common.Address
client reactorClient
balanceCache *BalanceCache
feed *event.Feed

from, to *big.Int
}
Expand Down Expand Up @@ -47,7 +48,7 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
defer cancel()
concurrent := NewConcurrentDownloader(ctx)
start := time.Now()
downloadEthConcurrently(concurrent, c.client, c.eth, c.address, c.from, c.to)
downloadEthConcurrently(concurrent, c.client, c.balanceCache, c.eth, c.address, c.from, c.to)
select {
case <-concurrent.WaitAsync():
case <-ctx.Done():
Expand Down Expand Up @@ -322,9 +323,10 @@ func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error {
}
group.Add(erc20.Command())
eth := &ethHistoricalCommand{
db: c.db,
client: c.client,
address: address,
db: c.db,
client: c.client,
balanceCache: NewBalanceCache(),
address: address,
eth: &ETHTransferDownloader{
client: c.client,
accounts: []common.Address{address},
Expand Down
51 changes: 6 additions & 45 deletions services/wallet/concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,57 +45,18 @@ type TransferDownloader interface {
GetTransfersByNumber(context.Context, *big.Int) ([]Transfer, error)
}

var (
balancesCache = map[common.Address]map[*big.Int]*big.Int{}
balanaceMapLock = sync.RWMutex{}
)

func readCachedBalance(account common.Address, blockNumber *big.Int) (*big.Int, bool) {
balanaceMapLock.RLock()
defer balanaceMapLock.RUnlock()

balance, exists := balancesCache[account][blockNumber]
return balance, exists
}

func addBalanceToCache(account common.Address, blockNumber *big.Int, balance *big.Int) {
balanaceMapLock.Lock()
defer balanaceMapLock.Unlock()

_, exists := balancesCache[account]
if !exists {
balancesCache[account] = make(map[*big.Int]*big.Int)
}
balancesCache[account][blockNumber] = balance
}

func getBalanceAt(client BalanceReader, ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
cachedBalance, exists := readCachedBalance(account, blockNumber)
if exists {
return cachedBalance, nil
} else {
balance, err := client.BalanceAt(ctx, account, blockNumber)
if err != nil {
return nil, err
}
addBalanceToCache(account, blockNumber, balance)

return balance, nil
}
}

func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, downloader TransferDownloader, account common.Address, low, high *big.Int) {
func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, balanceCache *BalanceCache, downloader TransferDownloader, account common.Address, low, high *big.Int) {
c.Add(func(ctx context.Context) error {
if low.Cmp(high) >= 0 {
return nil
}
log.Debug("eth transfers comparing blocks", "low", low, "high", high)
lb, err := getBalanceAt(client, ctx, account, low)
lb, err := balanceCache.BalanceAt(client, ctx, account, low)
//lb, err := client.BalanceAt(ctx, account, low)
if err != nil {
return err
}
hb, err := getBalanceAt(client, ctx, account, high)
hb, err := balanceCache.BalanceAt(client, ctx, account, high)
//hb, err := client.BalanceAt(ctx, account, high)
if err != nil {
return err
Expand Down Expand Up @@ -132,10 +93,10 @@ func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, down
}
mid := new(big.Int).Add(low, high)
mid = mid.Div(mid, two)
getBalanceAt(client, ctx, account, mid)
balanceCache.BalanceAt(client, ctx, account, mid)
log.Debug("balances are not equal. spawn two concurrent downloaders", "low", low, "mid", mid, "high", high)
downloadEthConcurrently(c, client, downloader, account, low, mid)
downloadEthConcurrently(c, client, downloader, account, mid, high)
downloadEthConcurrently(c, client, balanceCache, downloader, account, low, mid)
downloadEthConcurrently(c, client, balanceCache, downloader, account, mid, high)
return nil
})
}
2 changes: 1 addition & 1 deletion services/wallet/concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestConcurrentEthDownloader(t *testing.T) {
defer cancel()
concurrent := NewConcurrentDownloader(ctx)
downloadEthConcurrently(
concurrent, tc.options.balances, tc.options.batches,
concurrent, tc.options.balances, NewBalanceCache(), tc.options.batches,
common.Address{}, zero, tc.options.last)
concurrent.Wait()
require.NoError(t, concurrent.Error())
Expand Down

0 comments on commit 1b066d5

Please sign in to comment.