Skip to content

Commit

Permalink
fix: use lastBlockNum for updating lastSync of queryAdapters
Browse files Browse the repository at this point in the history
  • Loading branch information
harsh-98 committed Feb 13, 2024
1 parent 4d84bde commit 599ebae
Showing 1 changed file with 28 additions and 22 deletions.
50 changes: 28 additions & 22 deletions models/aggregated_block_feed/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,49 @@ import (
// "fmt"
)

func (mdl *AQFWrapper) Query(queryTill int64) {
// update all queryAdapter only if for the lastBlockNumber as we have fetched prices for that block.
// not update for toSinceTill. if the interval is more than the syncCycle in engin/index.go, the queryAdpater lastsync will be updated but the prices will not be fetched
func (mdl *AQFWrapper) Query(toSinceTill int64) {
if len(mdl.QueryFeeds) == 0 {
return
}
concurrentThreads := 6
ch := make(chan int, concurrentThreads)
// msg
queryFrom := mdl.GetLastSync() + mdl.Interval
log.Infof("Sync %s from %d to %d", mdl.GetName(), queryFrom, queryTill)
log.Infof("Sync %s from %d to %d", mdl.GetName(), queryFrom, toSinceTill)
// timer with query of block
rounds := 0
loopStartTime := time.Now()
roundStartTime := time.Now()
wg := &sync.WaitGroup{}
for blockNum := queryFrom; blockNum <= queryTill; blockNum += mdl.Interval {
mdl.queryPFdeps.aggregatedFetchedBlocks =
append(mdl.queryPFdeps.aggregatedFetchedBlocks, blockNum)
ch <- 1
wg.Add(1)
go mdl.queryAsync(blockNum, ch, wg)
if rounds%100 == 0 {
timeLeft := (time.Since(loopStartTime).Seconds() * float64(queryTill-blockNum)) /
float64(blockNum-mdl.GetLastSync())
timeLeft /= 60
log.Infof("Synced %d in %d rounds(%fs): TimeLeft %f mins", blockNum, rounds, time.Since(roundStartTime).Seconds(), timeLeft)
roundStartTime = time.Now()
lastblockNum := queryFrom
{
rounds := 0
loopStartTime := time.Now()
roundStartTime := time.Now()
wg := &sync.WaitGroup{}
for blockNum := lastblockNum; blockNum <= toSinceTill; blockNum += mdl.Interval {
mdl.queryPFdeps.aggregatedFetchedBlocks =
append(mdl.queryPFdeps.aggregatedFetchedBlocks, blockNum)
ch <- 1
wg.Add(1)
go mdl.queryAsync(blockNum, ch, wg)
if rounds%100 == 0 {
timeLeft := (time.Since(loopStartTime).Seconds() * float64(toSinceTill-blockNum)) /
float64(blockNum-mdl.GetLastSync())
timeLeft /= 60
log.Infof("Synced %d in %d rounds(%fs): TimeLeft %f mins", blockNum, rounds, time.Since(roundStartTime).Seconds(), timeLeft)
roundStartTime = time.Now()
}
rounds++
lastblockNum = blockNum
}
rounds++
wg.Wait()
}
wg.Wait()
// set last_sync on querypricefeed
for _, adapter := range mdl.QueryFeeds {
// yearn price feed can't be disabled from v2
if queryTill <= adapter.GetLastSync() || adapter.IsDisabled() {
if lastblockNum <= adapter.GetLastSync() || adapter.IsDisabled() {
continue
}
adapter.AfterSyncHook(queryTill)
adapter.AfterSyncHook(lastblockNum)
}
// db has saved prices till mdl.GetLastSync()
// queryFrom starts
Expand Down

0 comments on commit 599ebae

Please sign in to comment.