Skip to content

Commit

Permalink
feat(sync): save sync state on store
Browse files Browse the repository at this point in the history
  • Loading branch information
bdeneux committed Nov 23, 2022
1 parent 35753b6 commit 8506840
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 13 deletions.
55 changes: 43 additions & 12 deletions app/actor/synchronization/actor.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,54 @@
package synchronization

import (
"context"
"fmt"
"time"

"okp4/nemeton-leaderboard/app/event"
"okp4/nemeton-leaderboard/app/message"
"okp4/nemeton-leaderboard/app/offset"

"github.com/asynkron/protoactor-go/actor"
"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
"github.com/rs/zerolog/log"
)

const ownerOffset = "block-synchronization"

type Actor struct {
context context.Context
grpcClientProps *actor.Props
grpcClient *actor.PID
eventStore *actor.PID
offsetStore *offset.Store
currentBlock int64
}

func NewActor(grpcClientProps *actor.Props, eventStore *actor.PID, blockHeight int64) *Actor {
func NewActor(grpcClientProps *actor.Props, eventStore *actor.PID, mongoUri, dbName string) (*Actor, error) {
ctx := context.Background()
store, err := offset.NewStore(ctx, mongoUri, dbName, ownerOffset)
if err != nil {
return nil, err
}

storeValue, err := store.Get(ctx)
var currentBlock int64
switch resp := storeValue.(type) {
case int64:
currentBlock = resp
default:
currentBlock = 1
}

return &Actor{
context: ctx,
grpcClientProps: grpcClientProps,
grpcClient: nil,
eventStore: eventStore,
currentBlock: blockHeight,
}
offsetStore: store,
currentBlock: currentBlock,
}, nil
}

func (a *Actor) Receive(ctx actor.Context) {
Expand All @@ -47,8 +70,8 @@ func (a *Actor) startSynchronization(ctx actor.Context) {
}

go func() {
for range time.Tick(8 * time.Second) {
block, err := a.getBlock(ctx, a.currentBlock)
for range time.Tick(5 * time.Second) {
block, err := a.getBlock(ctx, a.currentBlock+1)
if err != nil {
log.Err(err).Msg("❌ Could not get block.")
continue
Expand All @@ -69,7 +92,13 @@ func (a *Actor) startSynchronization(ctx actor.Context) {
ctx.Send(a.eventStore, &message.PublishEventMessage{Event: event.NewEvent(NewBlockEventType, blockData)})

log.Info().Int64("blockHeight", block.Header.Height).Msg("Successful request block")
a.currentBlock++

if a.offsetStore.Save(a.context, block.Header.Height) != nil {
log.Err(err).Msg("❌ Failed saved current block height into database")
continue
}

a.currentBlock = block.Header.Height
}
}()
}
Expand All @@ -89,7 +118,7 @@ func (a *Actor) catchUpSyncBlocks(ctx actor.Context) error {
return fmt.Errorf("wrong response message")
}

if a.currentBlock >= latestBlock.Header.Height {
if a.currentBlock+1 >= latestBlock.Header.Height {
return nil
}

Expand All @@ -98,11 +127,10 @@ func (a *Actor) catchUpSyncBlocks(ctx actor.Context) error {
Int64("latestBlock", latestBlock.Header.Height).
Msg("Need to catch up to latest block.")

for i := a.currentBlock; i <= latestBlock.Header.Height; i++ {
for i := a.currentBlock + 1; i <= latestBlock.Header.Height; i++ {
block, err := a.getBlock(ctx, i)
if err != nil {
log.Panic().Err(err).Msg("❌ Could not get block for sync.")
continue
}

blockEvent := NewBlockEvent{
Expand All @@ -113,16 +141,19 @@ func (a *Actor) catchUpSyncBlocks(ctx actor.Context) error {

blockData, err := blockEvent.Marshall()
if err != nil {
log.Err(err).Msg("❌ Failed to marshall event to map interface")
continue
log.Panic().Err(err).Msg("❌ Failed to marshall event to map interface")
}

ctx.Send(a.eventStore, &message.PublishEventMessage{Event: event.NewEvent(NewBlockEventType, blockData)})

if a.offsetStore.Save(a.context, block.Header.Height) != nil {
log.Panic().Err(err).Msg("❌ Failed saved block height into database")
}

log.Info().Int64("blockHeight", block.Header.Height).Msg("Successful request block on sync")
}

a.currentBlock = latestBlock.Header.Height + 1
a.currentBlock = latestBlock.Header.Height
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion app/system/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ func boot(ctx actor.Context, listenAddr, mongoURI, dbName, grpcAddr string, tls
}

blockSync := actor.PropsFromProducer(func() actor.Actor {
return synchronization.NewActor(grpcClientProps, eventStorePID, 16757)
sync, err := synchronization.NewActor(grpcClientProps, eventStorePID, mongoURI, dbName)
if err != nil {
log.Panic().Err(err).Msg("❌ Could not start block synchronisation actor")
}
return sync
})
if _, err := ctx.SpawnNamed(blockSync, "blockSync"); err != nil {
log.Panic().Err(err).Msg("❌Could not create block sync actor")
Expand Down

0 comments on commit 8506840

Please sign in to comment.