Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resume with blockchain progress endpoints #17

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,17 +184,22 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
} else {
c.log.Info("Sent progress report",
"lastIngestedBlockNumer", request.LastIngestedBlockNumber,
"latestBlockNumber", request.LatestBlockNumber,
"duration", time.Since(start),
)
}
}()

request = BlockchainProgress{
LastIngestedBlockNumber: progress.LastIngestedBlockNumber,
LatestBlockNumber: progress.LatestBlockNumber,
}
url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
payload, err := json.Marshal(progress)
payload, err := json.Marshal(request)
if err != nil {
return err
}
c.log.Info("Sending request", "url", url, "payload", string(payload))
req, err := retryablehttp.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload))
if err != nil {
return err
Expand Down
22 changes: 19 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ func main() {
stdlog.Fatal(err)
}

ctx, cancelFn := context.WithCancel(context.Background())

// Get stored progress unless config indicates we should start from 0
var startBlockNumber int64
// Default to -1 to start where the ingester left off
if cfg.BlockHeight == -1 {
progress, err := duneClient.GetProgressReport(ctx)
if err != nil {
stdlog.Fatal(err)
}
startBlockNumber = progress.LastIngestedBlockNumber + 1
} else {
startBlockNumber = cfg.BlockHeight
}

maxCount := int64(0) // 0 means ingest until cancelled
ingester := ingester.New(
logger,
rpcClient,
Expand All @@ -73,15 +89,15 @@ func main() {
},
)

ctx, cancelFn := context.WithCancel(context.Background())

wg.Add(1)
go func() {
defer wg.Done()
err := ingester.Run(ctx, cfg.BlockHeight, 0 /* maxCount */)
err := ingester.Run(ctx, startBlockNumber, maxCount)
logger.Info("Ingester finished", "err", err)
}()

defer ingester.Close()

// TODO: add a metrics exporter or healthcheck http endpoint ?

quit := make(chan os.Signal, 1)
Expand Down
6 changes: 4 additions & 2 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (

type Ingester interface {
// Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested
Run(ctx context.Context, startBlockNumber, maxCount int64) error
Run(ctx context.Context, startBlockNumber int64, maxCount int64) error

// ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
// it will run continuously until the context is cancelled
ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64) error
ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber int64, endBlockNumber int64) error

// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// it will block until:
Expand All @@ -28,6 +28,8 @@ type Ingester interface {

// This is just a placeholder for now
Info() Info

Close() error
}

const (
Expand Down
112 changes: 74 additions & 38 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,43 @@ import (
"golang.org/x/sync/errgroup"
)

func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) error {
inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize)
defer close(inFlightChan)
func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
ctx, cancel := context.WithCancel(ctx)

var err error
inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize) // we close this after ConsumeBlocks has returned

if startBlockNumber < 0 {
startBlockNumber, err = i.node.LatestBlockNumber()
if err != nil {
return errors.Errorf("failed to get latest block number: %w", err)
}
}
// Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever
endBlockNumber := startBlockNumber - 1 + maxCount

i.log.Info("Starting ingester",
"maxBatchSize", i.cfg.MaxBatchSize,
"startBlockNumber", startBlockNumber,
"endBlockNumber", endBlockNumber,
"maxCount", maxCount,
)

errGroup, ctx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, startBlockNumber+maxCount)
})
errGroup.Go(func() error {
return i.SendBlocks(ctx, inFlightChan)
})
errGroup.Go(func() error {
return i.ReportProgress(ctx)
})

err := i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, endBlockNumber)
close(inFlightChan)
cancel()
if err != nil {
if err := errGroup.Wait(); err != nil {
i.log.Error("errgroup wait", "error", err)
}
return errors.Errorf("consume blocks: %w", err)
}

if err := errGroup.Wait(); err != nil && err != ErrFinishedConsumeBlocks {
return err
}

return nil
}

Expand All @@ -53,10 +57,9 @@ var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")
func (i *ingester) ConsumeBlocks(
ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
) error {
dontStop := endBlockNumber <= startBlockNumber
latestBlockNumber := i.tryUpdateLatestBlockNumber()

waitForBlock := func(ctx context.Context, blockNumber, latestBlockNumber int64) int64 {
waitForBlock := func(ctx context.Context, blockNumber int64, latestBlockNumber int64) int64 {
for blockNumber > latestBlockNumber {
select {
case <-ctx.Done():
Expand All @@ -72,15 +75,19 @@ func (i *ingester) ConsumeBlocks(
return latestBlockNumber
}

// Consume blocks forever if end is before start. This happens if Run is called with a maxCount of <= 0
dontStop := endBlockNumber < startBlockNumber
vegarsti marked this conversation as resolved.
Show resolved Hide resolved

for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber)
startTime := time.Now()

i.log.Info("Getting block by number", "blockNumber", blockNumber, "latestBlockNumber", latestBlockNumber)
block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("Context canceled, stopping..")
return err
return ctx.Err()
}

i.log.Error("Failed to get block by number, continuing..",
Expand Down Expand Up @@ -119,33 +126,33 @@ func (i *ingester) ConsumeBlocks(
)
}
}
return ErrFinishedConsumeBlocks // FIXME: this is wrong
// Done consuming blocks, either because we reached the endBlockNumber or the context was canceled
i.log.Info("Finished consuming blocks", "latestBlockNumber", latestBlockNumber, "endBlockNumber", endBlockNumber)
vegarsti marked this conversation as resolved.
Show resolved Hide resolved
return ErrFinishedConsumeBlocks
}

func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
for {
select {
case <-ctx.Done():
return nil // context canceled
case payload, ok := <-blocksCh:
// TODO: we should batch RCP blocks here before sending to Dune.
if !ok {
return nil // channel closed
}
if err := i.dune.SendBlock(ctx, payload); err != nil {
// TODO: implement DeadLetterQueue
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: payload.BlockNumber,
Error: err,
})
} else {
atomic.StoreInt64(&i.info.IngestedBlockNumber, payload.BlockNumber)
for payload := range blocksCh {
// TODO: we should batch RCP blocks here before sending to Dune.
if err := i.dune.SendBlock(ctx, payload); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("Context canceled, stopping..")
return ctx.Err()
}
// TODO: implement DeadLetterQueue
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: payload.BlockNumber,
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", payload.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, payload.BlockNumber)
}
}
return ctx.Err() // channel closed
}

func (i *ingester) tryUpdateLatestBlockNumber() int64 {
Expand All @@ -169,7 +176,7 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
return ctx.Err()
case tNow := <-timer.C:
latest := atomic.LoadInt64(&i.info.LatestBlockNumber)
lastIngested := atomic.LoadInt64(&i.info.IngestedBlockNumber)
Expand Down Expand Up @@ -202,10 +209,39 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
previousIngested = lastIngested
previousDistance = newDistance
previousTime = tNow

// TODO: include errors in the report, reset the error list
err := i.dune.PostProgressReport(ctx, models.BlockchainIndexProgress{
vegarsti marked this conversation as resolved.
Show resolved Hide resolved
BlockchainName: i.cfg.BlockchainName,
EVMStack: i.cfg.Stack.String(),
LastIngestedBlockNumber: lastIngested,
LatestBlockNumber: latest,
})
if err != nil {
i.log.Error("Failed to post progress report", "error", err)
}
}
}
}

func (i *ingester) Close() error {
// Send a final progress report to flush progress
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
i.log.Info("Sending final progress report")
err := i.dune.PostProgressReport(
ctx,
models.BlockchainIndexProgress{
BlockchainName: i.cfg.BlockchainName,
EVMStack: i.cfg.Stack.String(),
LastIngestedBlockNumber: i.info.IngestedBlockNumber,
LatestBlockNumber: i.info.LatestBlockNumber,
})
i.log.Info("Closing node")
if err != nil {
_ = i.node.Close()
return err
}

return i.node.Close()
}
Loading
Loading