Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use watched addresses from direct indexing params by default while serving statediff APIs #262

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 66 additions & 21 deletions statediff/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
var db sql.Database
var err error
quitCh := make(chan bool)
if params.IndexerConfig != nil {
indexerConfigAvailable := params.IndexerConfig != nil
if indexerConfigAvailable {
info := nodeinfo.Info{
GenesisBlock: blockChain.Genesis().Hash().Hex(),
NetworkID: strconv.FormatUint(cfg.NetworkId, 10),
Expand Down Expand Up @@ -201,11 +202,13 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
statediffMetrics: statediffMetrics,
sqlFileWaitingForWrite: false,
}
if params.IndexerConfig.Type() == shared.POSTGRES {
knownGaps.checkForGaps = true
} else {
log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
knownGaps.checkForGaps = false
if indexerConfigAvailable {
if params.IndexerConfig.Type() == shared.POSTGRES {
knownGaps.checkForGaps = true
} else {
log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
knownGaps.checkForGaps = false
}
}
sds := &Service{
Mutex: sync.Mutex{},
Expand All @@ -226,9 +229,11 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
stack.RegisterLifecycle(sds)
stack.RegisterAPIs(sds.APIs())

err = loadWatchedAddresses(indexer)
if err != nil {
return err
if indexerConfigAvailable {
err = loadWatchedAddresses(indexer)
if err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -477,6 +482,13 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, er
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info("sending state diff", "block height", blockNumber)

// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths()

Expand All @@ -493,6 +505,13 @@ func (sds *Service) StateDiffFor(blockHash common.Hash, params Params) (*Payload
currentBlock := sds.BlockChain.GetBlockByHash(blockHash)
log.Info("sending state diff", "block hash", blockHash)

// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths()

Expand Down Expand Up @@ -777,6 +796,15 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ
// This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
log.Info("writing state diff at", "block height", blockNumber)

// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths()

Expand All @@ -793,6 +821,15 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
// This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data
func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error {
log.Info("writing state diff for", "block hash", blockHash)

// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths()

Expand Down Expand Up @@ -895,9 +932,11 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
}

// update the db
err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber)
if err != nil {
return err
if sds.indexer != nil {
err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber)
if err != nil {
return err
}
}

// update in-memory params
Expand All @@ -917,9 +956,11 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
}

// update the db
err = sds.indexer.RemoveWatchedAddresses(args)
if err != nil {
return err
if sds.indexer != nil {
err = sds.indexer.RemoveWatchedAddresses(args)
if err != nil {
return err
}
}

// update in-memory params
Expand All @@ -933,19 +974,23 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
}

// update the db
err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber)
if err != nil {
return err
if sds.indexer != nil {
err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber)
if err != nil {
return err
}
}

// update in-memory params
writeLoopParams.WatchedAddresses = argAddresses
writeLoopParams.ComputeWatchedAddressesLeafPaths()
case types2.Clear:
// update the db
err := sds.indexer.ClearWatchedAddresses()
if err != nil {
return err
if sds.indexer != nil {
err := sds.indexer.ClearWatchedAddresses()
if err != nil {
return err
}
}

// update in-memory params
Expand Down
7 changes: 4 additions & 3 deletions statediff/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ var (
event3 = core.ChainEvent{Block: testBlock3}

defaultParams = statediff.Params{
IncludeBlock: true,
IncludeReceipts: true,
IncludeTD: true,
IncludeBlock: true,
IncludeReceipts: true,
IncludeTD: true,
WatchedAddresses: []common.Address{},
}
)

Expand Down