Skip to content

Commit

Permalink
_
Browse files Browse the repository at this point in the history
  • Loading branch information
rasom committed Dec 20, 2019
1 parent ffa7903 commit dea56e0
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 28 deletions.
36 changes: 17 additions & 19 deletions services/wallet/balance_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ import (
"github.com/ethereum/go-ethereum/common"
)

type BalanceCache struct {
type balanceCache struct {
// cache maps an address to a map of a block number and the balance of this particular address
cache map[common.Address]map[*big.Int]*big.Int
lock sync.RWMutex
}

func (b *BalanceCache) readCachedBalance(account common.Address, blockNumber *big.Int) (*big.Int, bool) {
func (b *balanceCache) readCachedBalance(account common.Address, blockNumber *big.Int) *big.Int {
b.lock.RLock()
defer b.lock.RUnlock()

balance, exists := b.cache[account][blockNumber]
return balance, exists
return b.cache[account][blockNumber]
}

func (b *BalanceCache) addBalanceToCache(account common.Address, blockNumber *big.Int, balance *big.Int) {
func (b *balanceCache) addBalanceToCache(account common.Address, blockNumber *big.Int, balance *big.Int) {
b.lock.Lock()
defer b.lock.Unlock()

Expand All @@ -32,24 +32,22 @@ func (b *BalanceCache) addBalanceToCache(account common.Address, blockNumber *bi
b.cache[account][blockNumber] = balance
}

func (b *BalanceCache) BalanceAt(client BalanceReader, ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
cachedBalance, exists := b.readCachedBalance(account, blockNumber)
if exists {
func (b *balanceCache) BalanceAt(client BalanceReader, ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
cachedBalance := b.readCachedBalance(account, blockNumber)
if cachedBalance != nil {
return cachedBalance, nil
} else {
balance, err := client.BalanceAt(ctx, account, blockNumber)
if err != nil {
return nil, err
}
b.addBalanceToCache(account, blockNumber, balance)

return balance, nil
}
balance, err := client.BalanceAt(ctx, account, blockNumber)
if err != nil {
return nil, err
}
b.addBalanceToCache(account, blockNumber, balance)

return balance, nil
}

func NewBalanceCache() *BalanceCache {
return &BalanceCache{
func newBalanceCache() *balanceCache {
return &balanceCache{
cache: make(map[common.Address]map[*big.Int]*big.Int),
lock: sync.RWMutex{},
}
}
4 changes: 2 additions & 2 deletions services/wallet/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ethHistoricalCommand struct {
eth TransferDownloader
address common.Address
client reactorClient
balanceCache *BalanceCache
balanceCache *balanceCache
feed *event.Feed

from, to *big.Int
Expand Down Expand Up @@ -325,7 +325,7 @@ func (c *controlCommand) fastIndex(ctx context.Context, to *DBHeader) error {
eth := &ethHistoricalCommand{
db: c.db,
client: c.client,
balanceCache: NewBalanceCache(),
balanceCache: newBalanceCache(),
address: address,
eth: &ETHTransferDownloader{
client: c.client,
Expand Down
12 changes: 6 additions & 6 deletions services/wallet/concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,18 @@ type TransferDownloader interface {
GetTransfersByNumber(context.Context, *big.Int) ([]Transfer, error)
}

func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, balanceCache *BalanceCache, downloader TransferDownloader, account common.Address, low, high *big.Int) {
func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, cache *balanceCache, downloader TransferDownloader, account common.Address, low, high *big.Int) {
c.Add(func(ctx context.Context) error {
if low.Cmp(high) >= 0 {
return nil
}
log.Debug("eth transfers comparing blocks", "low", low, "high", high)
lb, err := balanceCache.BalanceAt(client, ctx, account, low)
lb, err := cache.BalanceAt(client, ctx, account, low)
//lb, err := client.BalanceAt(ctx, account, low)
if err != nil {
return err
}
hb, err := balanceCache.BalanceAt(client, ctx, account, high)
hb, err := cache.BalanceAt(client, ctx, account, high)
//hb, err := client.BalanceAt(ctx, account, high)
if err != nil {
return err
Expand Down Expand Up @@ -93,10 +93,10 @@ func downloadEthConcurrently(c *ConcurrentDownloader, client BalanceReader, bala
}
mid := new(big.Int).Add(low, high)
mid = mid.Div(mid, two)
balanceCache.BalanceAt(client, ctx, account, mid)
cache.BalanceAt(client, ctx, account, mid)
log.Debug("balances are not equal. spawn two concurrent downloaders", "low", low, "mid", mid, "high", high)
downloadEthConcurrently(c, client, balanceCache, downloader, account, low, mid)
downloadEthConcurrently(c, client, balanceCache, downloader, account, mid, high)
downloadEthConcurrently(c, client, cache, downloader, account, low, mid)
downloadEthConcurrently(c, client, cache, downloader, account, mid, high)
return nil
})
}
2 changes: 1 addition & 1 deletion services/wallet/concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestConcurrentEthDownloader(t *testing.T) {
defer cancel()
concurrent := NewConcurrentDownloader(ctx)
downloadEthConcurrently(
concurrent, tc.options.balances, NewBalanceCache(), tc.options.batches,
concurrent, tc.options.balances, newBalanceCache(), tc.options.batches,
common.Address{}, zero, tc.options.last)
concurrent.Wait()
require.NoError(t, concurrent.Error())
Expand Down

0 comments on commit dea56e0

Please sign in to comment.