diff --git a/block/block_exchange.go b/block/block_sync.go similarity index 61% rename from block/block_exchange.go rename to block/block_sync.go index 60728be2731..8a3e2159efe 100644 --- a/block/block_exchange.go +++ b/block/block_sync.go @@ -24,10 +24,10 @@ import ( "github.com/rollkit/rollkit/types" ) -// P2P Exchange Service for block that implements the go-header interface. +// P2P Sync Service for block that implements the go-header interface. // Contains a block store where synced blocks are stored. // Uses the go-header library for handling all P2P logic. -type BlockExchangeService struct { +type BlockSyncService struct { conf config.NodeConfig genesis *cmtypes.GenesisDoc p2p *p2p.Client @@ -43,7 +43,7 @@ type BlockExchangeService struct { ctx context.Context } -func NewBlockExchangeService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*BlockExchangeService, error) { +func NewBlockSyncService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*BlockSyncService, error) { if genesis == nil { return nil, errors.New("genesis doc cannot be nil") } @@ -56,12 +56,12 @@ func NewBlockExchangeService(ctx context.Context, store ds.TxnDatastore, conf co if !ok { return nil, errors.New("failed to access the datastore") } - ss, err := goheaderstore.NewStore[*types.Block](storeBatch, goheaderstore.WithStorePrefix("blockEx")) + ss, err := goheaderstore.NewStore[*types.Block](storeBatch, goheaderstore.WithStorePrefix("blockSync")) if err != nil { return nil, fmt.Errorf("failed to initialize the block store: %w", err) } - return &BlockExchangeService{ + return &BlockSyncService{ conf: conf, genesis: genesis, p2p: p2p, @@ -72,19 +72,19 @@ func NewBlockExchangeService(ctx context.Context, store ds.TxnDatastore, conf co }, nil } -// BlockStore returns the blockstore of the BlockExchangeService -func (bExService *BlockExchangeService) BlockStore() *goheaderstore.Store[*types.Block] { - return bExService.blockStore +// BlockStore returns the blockstore of the BlockSyncService +func (bSyncService *BlockSyncService) BlockStore() *goheaderstore.Store[*types.Block] { + return bSyncService.blockStore } -func (bExService *BlockExchangeService) initBlockStoreAndStartSyncer(ctx context.Context, initial *types.Block) error { +func (bSyncService *BlockSyncService) initBlockStoreAndStartSyncer(ctx context.Context, initial *types.Block) error { if initial == nil { return fmt.Errorf("failed to initialize the blockstore and start syncer") } - if err := bExService.blockStore.Init(ctx, initial); err != nil { + if err := bSyncService.blockStore.Init(ctx, initial); err != nil { return err } - if err := bExService.StartSyncer(); err != nil { + if err := bSyncService.StartSyncer(); err != nil { return err } return nil @@ -92,74 +92,74 @@ func (bExService *BlockExchangeService) initBlockStoreAndStartSyncer(ctx context // Initialize block store if needed and broadcasts provided block. // Note: Only returns an error in case block store can't be initialized. Logs error if there's one while broadcasting. -func (bExService *BlockExchangeService) WriteToBlockStoreAndBroadcast(ctx context.Context, block *types.Block) error { +func (bSyncService *BlockSyncService) WriteToBlockStoreAndBroadcast(ctx context.Context, block *types.Block) error { // For genesis block initialize the store and start the syncer - if int64(block.Height()) == bExService.genesis.InitialHeight { - if err := bExService.blockStore.Init(ctx, block); err != nil { + if int64(block.Height()) == bSyncService.genesis.InitialHeight { + if err := bSyncService.blockStore.Init(ctx, block); err != nil { return fmt.Errorf("failed to initialize block store") } - if err := bExService.StartSyncer(); err != nil { + if err := bSyncService.StartSyncer(); err != nil { return fmt.Errorf("failed to start syncer after initializing block store") } } // Broadcast for subscribers - if err := bExService.sub.Broadcast(ctx, block); err != nil { - bExService.logger.Error("failed to broadcast block", "error", err) + if err := bSyncService.sub.Broadcast(ctx, block); err != nil { + bSyncService.logger.Error("failed to broadcast block", "error", err) } return nil } -func (bExService *BlockExchangeService) isInitialized() bool { - return bExService.blockStore.Height() > 0 +func (bSyncService *BlockSyncService) isInitialized() bool { + return bSyncService.blockStore.Height() > 0 } // OnStart is a part of Service interface. -func (bExService *BlockExchangeService) Start() error { +func (bSyncService *BlockSyncService) Start() error { // have to do the initializations here to utilize the p2p node which is created on start - ps := bExService.p2p.PubSub() - chainIDBlock := bExService.genesis.ChainID + "-block" - bExService.sub = goheaderp2p.NewSubscriber[*types.Block](ps, pubsub.DefaultMsgIdFn, chainIDBlock) - if err := bExService.sub.Start(bExService.ctx); err != nil { + ps := bSyncService.p2p.PubSub() + chainIDBlock := bSyncService.genesis.ChainID + "-block" + bSyncService.sub = goheaderp2p.NewSubscriber[*types.Block](ps, pubsub.DefaultMsgIdFn, chainIDBlock) + if err := bSyncService.sub.Start(bSyncService.ctx); err != nil { return fmt.Errorf("error while starting subscriber: %w", err) } - if _, err := bExService.sub.Subscribe(); err != nil { + if _, err := bSyncService.sub.Subscribe(); err != nil { return fmt.Errorf("error while subscribing: %w", err) } - if err := bExService.blockStore.Start(bExService.ctx); err != nil { + if err := bSyncService.blockStore.Start(bSyncService.ctx); err != nil { return fmt.Errorf("error while starting block store: %w", err) } var err error - _, _, network, err := bExService.p2p.Info() + _, _, network, err := bSyncService.p2p.Info() if err != nil { return fmt.Errorf("error while fetching the network: %w", err) } networkIDBlock := network + "-block" - if bExService.p2pServer, err = newBlockP2PServer(bExService.p2p.Host(), bExService.blockStore, networkIDBlock); err != nil { + if bSyncService.p2pServer, err = newBlockP2PServer(bSyncService.p2p.Host(), bSyncService.blockStore, networkIDBlock); err != nil { return fmt.Errorf("error while creating p2p server: %w", err) } - if err := bExService.p2pServer.Start(bExService.ctx); err != nil { + if err := bSyncService.p2pServer.Start(bSyncService.ctx); err != nil { return fmt.Errorf("error while starting p2p server: %w", err) } - peerIDs := bExService.p2p.PeerIDs() - if bExService.ex, err = newBlockP2PExchange(bExService.p2p.Host(), peerIDs, networkIDBlock, chainIDBlock, bExService.p2p.ConnectionGater()); err != nil { + peerIDs := bSyncService.p2p.PeerIDs() + if bSyncService.ex, err = newBlockP2PExchange(bSyncService.p2p.Host(), peerIDs, networkIDBlock, chainIDBlock, bSyncService.p2p.ConnectionGater()); err != nil { return fmt.Errorf("error while creating exchange: %w", err) } - if err := bExService.ex.Start(bExService.ctx); err != nil { + if err := bSyncService.ex.Start(bSyncService.ctx); err != nil { return fmt.Errorf("error while starting exchange: %w", err) } - if bExService.syncer, err = newBlockSyncer(bExService.ex, bExService.blockStore, bExService.sub, goheadersync.WithBlockTime(bExService.conf.BlockTime)); err != nil { + if bSyncService.syncer, err = newBlockSyncer(bSyncService.ex, bSyncService.blockStore, bSyncService.sub, goheadersync.WithBlockTime(bSyncService.conf.BlockTime)); err != nil { return fmt.Errorf("error while creating syncer: %w", err) } - if bExService.isInitialized() { - if err := bExService.StartSyncer(); err != nil { + if bSyncService.isInitialized() { + if err := bSyncService.StartSyncer(); err != nil { return fmt.Errorf("error while starting the syncer: %w", err) } return nil @@ -169,36 +169,36 @@ func (bExService *BlockExchangeService) Start() error { var trustedBlock *types.Block // Try fetching the trusted block from peers if exists if len(peerIDs) > 0 { - if bExService.conf.TrustedHash != "" { - trustedHashBytes, err := hex.DecodeString(bExService.conf.TrustedHash) + if bSyncService.conf.TrustedHash != "" { + trustedHashBytes, err := hex.DecodeString(bSyncService.conf.TrustedHash) if err != nil { return fmt.Errorf("failed to parse the trusted hash for initializing the blockstore: %w", err) } - if trustedBlock, err = bExService.ex.Get(bExService.ctx, header.Hash(trustedHashBytes)); err != nil { + if trustedBlock, err = bSyncService.ex.Get(bSyncService.ctx, header.Hash(trustedHashBytes)); err != nil { return fmt.Errorf("failed to fetch the trusted block for initializing the blockStore: %w", err) } } else { // Try fetching the genesis block if available, otherwise fallback to blocks - if trustedBlock, err = bExService.ex.GetByHeight(bExService.ctx, uint64(bExService.genesis.InitialHeight)); err != nil { + if trustedBlock, err = bSyncService.ex.GetByHeight(bSyncService.ctx, uint64(bSyncService.genesis.InitialHeight)); err != nil { // Full/light nodes have to wait for aggregator to publish the genesis block // proposing aggregator can init the store and start the syncer when the first block is published return fmt.Errorf("failed to fetch the genesis block: %w", err) } } - return bExService.initBlockStoreAndStartSyncer(bExService.ctx, trustedBlock) + return bSyncService.initBlockStoreAndStartSyncer(bSyncService.ctx, trustedBlock) } return nil } // OnStop is a part of Service interface. -func (bExService *BlockExchangeService) Stop() error { - err := bExService.blockStore.Stop(bExService.ctx) - err = multierr.Append(err, bExService.p2pServer.Stop(bExService.ctx)) - err = multierr.Append(err, bExService.ex.Stop(bExService.ctx)) - err = multierr.Append(err, bExService.sub.Stop(bExService.ctx)) - if bExService.syncerStatus.isStarted() { - err = multierr.Append(err, bExService.syncer.Stop(bExService.ctx)) +func (bSyncService *BlockSyncService) Stop() error { + err := bSyncService.blockStore.Stop(bSyncService.ctx) + err = multierr.Append(err, bSyncService.p2pServer.Stop(bSyncService.ctx)) + err = multierr.Append(err, bSyncService.ex.Stop(bSyncService.ctx)) + err = multierr.Append(err, bSyncService.sub.Stop(bSyncService.ctx)) + if bSyncService.syncerStatus.isStarted() { + err = multierr.Append(err, bSyncService.syncer.Stop(bSyncService.ctx)) } return err } @@ -240,16 +240,16 @@ func newBlockSyncer( return goheadersync.NewSyncer[*types.Block](ex, store, sub, opt) } -func (bExService *BlockExchangeService) StartSyncer() error { - bExService.syncerStatus.m.Lock() - defer bExService.syncerStatus.m.Unlock() - if bExService.syncerStatus.started { +func (bSyncService *BlockSyncService) StartSyncer() error { + bSyncService.syncerStatus.m.Lock() + defer bSyncService.syncerStatus.m.Unlock() + if bSyncService.syncerStatus.started { return nil } - err := bExService.syncer.Start(bExService.ctx) + err := bSyncService.syncer.Start(bSyncService.ctx) if err != nil { return err } - bExService.syncerStatus.started = true + bSyncService.syncerStatus.started = true return nil } diff --git a/block/header_exchange.go b/block/header_sync.go similarity index 60% rename from block/header_exchange.go rename to block/header_sync.go index f83c6a376f8..14b2676d8e4 100644 --- a/block/header_exchange.go +++ b/block/header_sync.go @@ -24,10 +24,10 @@ import ( "github.com/rollkit/rollkit/types" ) -// P2P Exchange Service for header that implements the go-header interface. +// P2P Sync Service for header that implements the go-header interface. // Contains a header store where synced headers are stored. // Uses the go-header library for handling all P2P logic. -type HeaderExchangeService struct { +type HeaderSynceService struct { conf config.NodeConfig genesis *cmtypes.GenesisDoc p2p *p2p.Client @@ -43,7 +43,7 @@ type HeaderExchangeService struct { ctx context.Context } -func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*HeaderExchangeService, error) { +func NewHeaderSynceService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*HeaderSynceService, error) { if genesis == nil { return nil, errors.New("genesis doc cannot be nil") } @@ -56,12 +56,12 @@ func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf c if !ok { return nil, errors.New("failed to access the datastore") } - ss, err := goheaderstore.NewStore[*types.SignedHeader](storeBatch, goheaderstore.WithStorePrefix("headerEx")) + ss, err := goheaderstore.NewStore[*types.SignedHeader](storeBatch, goheaderstore.WithStorePrefix("headerSync")) if err != nil { return nil, fmt.Errorf("failed to initialize the header store: %w", err) } - return &HeaderExchangeService{ + return &HeaderSynceService{ conf: conf, genesis: genesis, p2p: p2p, @@ -72,19 +72,19 @@ func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf c }, nil } -// HeaderStore returns the headerstore of the HeaderExchangeService -func (hExService *HeaderExchangeService) HeaderStore() *goheaderstore.Store[*types.SignedHeader] { - return hExService.headerStore +// HeaderStore returns the headerstore of the HeaderSynceService +func (hSyncService *HeaderSynceService) HeaderStore() *goheaderstore.Store[*types.SignedHeader] { + return hSyncService.headerStore } -func (hExService *HeaderExchangeService) initHeaderStoreAndStartSyncer(ctx context.Context, initial *types.SignedHeader) error { +func (hSyncService *HeaderSynceService) initHeaderStoreAndStartSyncer(ctx context.Context, initial *types.SignedHeader) error { if initial == nil { return fmt.Errorf("failed to initialize the headerstore and start syncer") } - if err := hExService.headerStore.Init(ctx, initial); err != nil { + if err := hSyncService.headerStore.Init(ctx, initial); err != nil { return err } - if err := hExService.StartSyncer(); err != nil { + if err := hSyncService.StartSyncer(); err != nil { return err } return nil @@ -92,71 +92,71 @@ func (hExService *HeaderExchangeService) initHeaderStoreAndStartSyncer(ctx conte // Initialize header store if needed and broadcasts provided header. // Note: Only returns an error in case header store can't be initialized. Logs error if there's one while broadcasting. -func (hExService *HeaderExchangeService) WriteToHeaderStoreAndBroadcast(ctx context.Context, signedHeader *types.SignedHeader) error { +func (hSyncService *HeaderSynceService) WriteToHeaderStoreAndBroadcast(ctx context.Context, signedHeader *types.SignedHeader) error { // For genesis header initialize the store and start the syncer - if int64(signedHeader.Height()) == hExService.genesis.InitialHeight { - if err := hExService.headerStore.Init(ctx, signedHeader); err != nil { + if int64(signedHeader.Height()) == hSyncService.genesis.InitialHeight { + if err := hSyncService.headerStore.Init(ctx, signedHeader); err != nil { return fmt.Errorf("failed to initialize header store") } - if err := hExService.StartSyncer(); err != nil { + if err := hSyncService.StartSyncer(); err != nil { return fmt.Errorf("failed to start syncer after initializing header store: %w", err) } } // Broadcast for subscribers - if err := hExService.sub.Broadcast(ctx, signedHeader); err != nil { - hExService.logger.Error("failed to broadcast block header", "error", err) + if err := hSyncService.sub.Broadcast(ctx, signedHeader); err != nil { + hSyncService.logger.Error("failed to broadcast block header", "error", err) } return nil } -func (hExService *HeaderExchangeService) isInitialized() bool { - return hExService.headerStore.Height() > 0 +func (hSyncService *HeaderSynceService) isInitialized() bool { + return hSyncService.headerStore.Height() > 0 } // OnStart is a part of Service interface. -func (hExService *HeaderExchangeService) Start() error { +func (hSyncService *HeaderSynceService) Start() error { // have to do the initializations here to utilize the p2p node which is created on start - ps := hExService.p2p.PubSub() - hExService.sub = goheaderp2p.NewSubscriber[*types.SignedHeader](ps, pubsub.DefaultMsgIdFn, hExService.genesis.ChainID) - if err := hExService.sub.Start(hExService.ctx); err != nil { + ps := hSyncService.p2p.PubSub() + hSyncService.sub = goheaderp2p.NewSubscriber[*types.SignedHeader](ps, pubsub.DefaultMsgIdFn, hSyncService.genesis.ChainID) + if err := hSyncService.sub.Start(hSyncService.ctx); err != nil { return fmt.Errorf("error while starting subscriber: %w", err) } - if _, err := hExService.sub.Subscribe(); err != nil { + if _, err := hSyncService.sub.Subscribe(); err != nil { return fmt.Errorf("error while subscribing: %w", err) } - if err := hExService.headerStore.Start(hExService.ctx); err != nil { + if err := hSyncService.headerStore.Start(hSyncService.ctx); err != nil { return fmt.Errorf("error while starting header store: %w", err) } var err error - _, _, network, err := hExService.p2p.Info() + _, _, network, err := hSyncService.p2p.Info() if err != nil { return fmt.Errorf("error while fetching the network: %w", err) } - if hExService.p2pServer, err = newP2PServer(hExService.p2p.Host(), hExService.headerStore, network); err != nil { + if hSyncService.p2pServer, err = newP2PServer(hSyncService.p2p.Host(), hSyncService.headerStore, network); err != nil { return fmt.Errorf("error while creating p2p server: %w", err) } - if err := hExService.p2pServer.Start(hExService.ctx); err != nil { + if err := hSyncService.p2pServer.Start(hSyncService.ctx); err != nil { return fmt.Errorf("error while starting p2p server: %w", err) } - peerIDs := hExService.p2p.PeerIDs() - if hExService.ex, err = newP2PExchange(hExService.p2p.Host(), peerIDs, network, hExService.genesis.ChainID, hExService.p2p.ConnectionGater()); err != nil { + peerIDs := hSyncService.p2p.PeerIDs() + if hSyncService.ex, err = newP2PExchange(hSyncService.p2p.Host(), peerIDs, network, hSyncService.genesis.ChainID, hSyncService.p2p.ConnectionGater()); err != nil { return fmt.Errorf("error while creating exchange: %w", err) } - if err := hExService.ex.Start(hExService.ctx); err != nil { + if err := hSyncService.ex.Start(hSyncService.ctx); err != nil { return fmt.Errorf("error while starting exchange: %w", err) } - if hExService.syncer, err = newSyncer(hExService.ex, hExService.headerStore, hExService.sub, goheadersync.WithBlockTime(hExService.conf.BlockTime)); err != nil { + if hSyncService.syncer, err = newSyncer(hSyncService.ex, hSyncService.headerStore, hSyncService.sub, goheadersync.WithBlockTime(hSyncService.conf.BlockTime)); err != nil { return fmt.Errorf("error while creating syncer: %w", err) } - if hExService.isInitialized() { - if err := hExService.StartSyncer(); err != nil { + if hSyncService.isInitialized() { + if err := hSyncService.StartSyncer(); err != nil { return fmt.Errorf("error while starting the syncer: %w", err) } return nil @@ -166,37 +166,37 @@ func (hExService *HeaderExchangeService) Start() error { var trustedHeader *types.SignedHeader // Try fetching the trusted header from peers if exists if len(peerIDs) > 0 { - if hExService.conf.TrustedHash != "" { - trustedHashBytes, err := hex.DecodeString(hExService.conf.TrustedHash) + if hSyncService.conf.TrustedHash != "" { + trustedHashBytes, err := hex.DecodeString(hSyncService.conf.TrustedHash) if err != nil { return fmt.Errorf("failed to parse the trusted hash for initializing the headerstore: %w", err) } - if trustedHeader, err = hExService.ex.Get(hExService.ctx, header.Hash(trustedHashBytes)); err != nil { + if trustedHeader, err = hSyncService.ex.Get(hSyncService.ctx, header.Hash(trustedHashBytes)); err != nil { return fmt.Errorf("failed to fetch the trusted header for initializing the headerstore: %w", err) } } else { // Try fetching the genesis header if available, otherwise fallback to signed headers - if trustedHeader, err = hExService.ex.GetByHeight(hExService.ctx, uint64(hExService.genesis.InitialHeight)); err != nil { + if trustedHeader, err = hSyncService.ex.GetByHeight(hSyncService.ctx, uint64(hSyncService.genesis.InitialHeight)); err != nil { // Full/light nodes have to wait for aggregator to publish the genesis header // proposing aggregator can init the store and start the syncer when the first header is published return fmt.Errorf("failed to fetch the genesis header: %w", err) } } - return hExService.initHeaderStoreAndStartSyncer(hExService.ctx, trustedHeader) + return hSyncService.initHeaderStoreAndStartSyncer(hSyncService.ctx, trustedHeader) } return nil } // OnStop is a part of Service interface. -func (hExService *HeaderExchangeService) Stop() error { - err := hExService.headerStore.Stop(hExService.ctx) - err = multierr.Append(err, hExService.p2pServer.Stop(hExService.ctx)) - err = multierr.Append(err, hExService.ex.Stop(hExService.ctx)) - err = multierr.Append(err, hExService.sub.Stop(hExService.ctx)) - if hExService.syncerStatus.isStarted() { - err = multierr.Append(err, hExService.syncer.Stop(hExService.ctx)) +func (hSyncService *HeaderSynceService) Stop() error { + err := hSyncService.headerStore.Stop(hSyncService.ctx) + err = multierr.Append(err, hSyncService.p2pServer.Stop(hSyncService.ctx)) + err = multierr.Append(err, hSyncService.ex.Stop(hSyncService.ctx)) + err = multierr.Append(err, hSyncService.sub.Stop(hSyncService.ctx)) + if hSyncService.syncerStatus.isStarted() { + err = multierr.Append(err, hSyncService.syncer.Stop(hSyncService.ctx)) } return err } @@ -239,16 +239,16 @@ func newSyncer( return goheadersync.NewSyncer[*types.SignedHeader](ex, store, sub, opt) } -func (hExService *HeaderExchangeService) StartSyncer() error { - hExService.syncerStatus.m.Lock() - defer hExService.syncerStatus.m.Unlock() - if hExService.syncerStatus.started { +func (hSyncService *HeaderSynceService) StartSyncer() error { + hSyncService.syncerStatus.m.Lock() + defer hSyncService.syncerStatus.m.Unlock() + if hSyncService.syncerStatus.started { return nil } - err := hExService.syncer.Start(hExService.ctx) + err := hSyncService.syncer.Start(hSyncService.ctx) if err != nil { return err } - hExService.syncerStatus.started = true + hSyncService.syncerStatus.started = true return nil } diff --git a/block/exchange.go b/block/syncer_status.go similarity index 100% rename from block/exchange.go rename to block/syncer_status.go diff --git a/node/full.go b/node/full.go index 2010c8f7ecf..d3963efb78b 100644 --- a/node/full.go +++ b/node/full.go @@ -60,12 +60,12 @@ type FullNode struct { nodeConfig config.NodeConfig - proxyApp proxy.AppConns - eventBus *cmtypes.EventBus - dalc da.DataAvailabilityLayerClient - p2pClient *p2p.Client - hExService *block.HeaderExchangeService - bExService *block.BlockExchangeService + proxyApp proxy.AppConns + eventBus *cmtypes.EventBus + dalc da.DataAvailabilityLayerClient + p2pClient *p2p.Client + hSyncService *block.HeaderSynceService + bSyncService *block.BlockSyncService // TODO(tzdybal): consider extracting "mempool reactor" Mempool mempool.Mempool mempoolIDs *mempoolIDs @@ -120,12 +120,12 @@ func newFullNode( } mainKV := newPrefixKV(baseKV, mainPrefix) - headerExchangeService, err := initHeaderExchangeService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger) + headerSyncService, err := initHeaderSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger) if err != nil { return nil, err } - blockExchangeService, err := initBlockExchangeService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger) + blockSyncService, err := initBlockSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger) if err != nil { return nil, err } @@ -133,7 +133,7 @@ func newFullNode( mempool := initMempool(logger, proxyApp) store := store.New(ctx, mainKV) - blockManager, err := initBlockManager(signingKey, nodeConfig, genesis, store, mempool, proxyApp, dalc, eventBus, logger, blockExchangeService) + blockManager, err := initBlockManager(signingKey, nodeConfig, genesis, store, mempool, proxyApp, dalc, eventBus, logger, blockSyncService) if err != nil { return nil, err } @@ -160,8 +160,8 @@ func newFullNode( TxIndexer: txIndexer, IndexerService: indexerService, BlockIndexer: blockIndexer, - hExService: headerExchangeService, - bExService: blockExchangeService, + hSyncService: headerSyncService, + bSyncService: blockSyncService, ctx: ctx, cancel: cancel, } @@ -217,24 +217,24 @@ func initMempool(logger log.Logger, proxyApp proxy.AppConns) *mempoolv1.TxMempoo return mempool } -func initHeaderExchangeService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.HeaderExchangeService, error) { - headerExchangeService, err := block.NewHeaderExchangeService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "HeaderExchangeService")) +func initHeaderSyncService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.HeaderSynceService, error) { + headerSyncService, err := block.NewHeaderSynceService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "HeaderSyncService")) if err != nil { - return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err) + return nil, fmt.Errorf("HeaderSyncService initialization error: %w", err) } - return headerExchangeService, nil + return headerSyncService, nil } -func initBlockExchangeService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.BlockExchangeService, error) { - blockExchangeService, err := block.NewBlockExchangeService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "BlockExchangeService")) +func initBlockSyncService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.BlockSyncService, error) { + blockSyncService, err := block.NewBlockSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "BlockSyncService")) if err != nil { - return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err) + return nil, fmt.Errorf("HeaderSyncService initialization error: %w", err) } - return blockExchangeService, nil + return blockSyncService, nil } -func initBlockManager(signingKey crypto.PrivKey, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, store store.Store, mempool mempool.Mempool, proxyApp proxy.AppConns, dalc da.DataAvailabilityLayerClient, eventBus *cmtypes.EventBus, logger log.Logger, blockExchangeService *block.BlockExchangeService) (*block.Manager, error) { - blockManager, err := block.NewManager(signingKey, nodeConfig.BlockManagerConfig, genesis, store, mempool, proxyApp.Consensus(), dalc, eventBus, logger.With("module", "BlockManager"), blockExchangeService.BlockStore()) +func initBlockManager(signingKey crypto.PrivKey, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, store store.Store, mempool mempool.Mempool, proxyApp proxy.AppConns, dalc da.DataAvailabilityLayerClient, eventBus *cmtypes.EventBus, logger log.Logger, blockSyncService *block.BlockSyncService) (*block.Manager, error) { + blockManager, err := block.NewManager(signingKey, nodeConfig.BlockManagerConfig, genesis, store, mempool, proxyApp.Consensus(), dalc, eventBus, logger.With("module", "BlockManager"), blockSyncService.BlockStore()) if err != nil { return nil, fmt.Errorf("BlockManager initialization error: %w", err) } @@ -274,7 +274,7 @@ func (n *FullNode) headerPublishLoop(ctx context.Context) { for { select { case signedHeader := <-n.blockManager.HeaderCh: - err := n.hExService.WriteToHeaderStoreAndBroadcast(ctx, signedHeader) + err := n.hSyncService.WriteToHeaderStoreAndBroadcast(ctx, signedHeader) if err != nil { // failed to init or start headerstore n.Logger.Error(err.Error()) @@ -290,7 +290,7 @@ func (n *FullNode) blockPublishLoop(ctx context.Context) { for { select { case block := <-n.blockManager.BlockCh: - err := n.bExService.WriteToBlockStoreAndBroadcast(ctx, block) + err := n.bSyncService.WriteToBlockStoreAndBroadcast(ctx, block) if err != nil { // failed to init or start blockstore n.Logger.Error(err.Error()) @@ -311,12 +311,12 @@ func (n *FullNode) OnStart() error { return fmt.Errorf("error while starting P2P client: %w", err) } - if err = n.hExService.Start(); err != nil { - return fmt.Errorf("error while starting header exchange service: %w", err) + if err = n.hSyncService.Start(); err != nil { + return fmt.Errorf("error while starting header sync service: %w", err) } - if err = n.bExService.Start(); err != nil { - return fmt.Errorf("error while starting block exchange service: %w", err) + if err = n.bSyncService.Start(); err != nil { + return fmt.Errorf("error while starting block sync service: %w", err) } if err = n.dalc.Start(); err != nil { @@ -356,8 +356,8 @@ func (n *FullNode) OnStop() { n.cancel() err := n.dalc.Stop() err = multierr.Append(err, n.p2pClient.Close()) - err = multierr.Append(err, n.hExService.Stop()) - err = multierr.Append(err, n.bExService.Stop()) + err = multierr.Append(err, n.hSyncService.Stop()) + err = multierr.Append(err, n.bSyncService.Stop()) n.Logger.Error("errors while stopping node:", "errors", err) } diff --git a/node/full_node.md b/node/full_node.md index 9e961544909..5a858e46757 100644 --- a/node/full_node.md +++ b/node/full_node.md @@ -48,11 +48,11 @@ The [Data Availability Layer Client][dalc] is used to interact with the data ava ### hExService -The [Header Exchange Service] is used for exchanging block headers between nodes over P2P. +The [Header Sync Service] is used for syncing block headers between nodes over P2P. -### bExService +### bSyncService -The [Block Exchange Service] is used for exchanging blocks between nodes over P2P. +The [Block Sync Service] is used for syncing blocks between nodes over P2P. ## Message Structure/Communication Format @@ -90,9 +90,9 @@ See [full node] [11] [DA registry][DA registry] -[12] [Header Exchange Service][Header Exchange Service] +[12] [Header Sync Service][Header Sync Service] -[13] [Block Exchange Service][Block Exchange Service] +[13] [Block Sync Service][Block Sync Service] [full node]: ../node/full.go [ABCI app connections]: https://github.com/cometbft/cometbft/blob/main/spec/abci/abci%2B%2B_basic_concepts.md @@ -105,5 +105,5 @@ See [full node] [Block Manager]: ../block/manager.go [dalc]: ../da/da.go [DA registry]: ../da/registry/registry.go -[Header Exchange Service]: ../block/header_exchange.go -[Block Exchange Service]: ../block/block_exchange.go +[Header Sync Service]: ../block/header_sync.go +[Block Sync Service]: ../block/block_sync.go diff --git a/node/full_node_integration_test.go b/node/full_node_integration_test.go index bcca472156f..dab1d4a7b06 100644 --- a/node/full_node_integration_test.go +++ b/node/full_node_integration_test.go @@ -353,7 +353,7 @@ func testSingleAggregatorSingleFullNodeTrustedHash(t *testing.T, source Source) require.NoError(waitForFirstBlock(node1, source)) // Get the trusted hash from node1 and pass it to node2 config - trustedHash, err := node1.hExService.HeaderStore().GetByHeight(aggCtx, 1) + trustedHash, err := node1.hSyncService.HeaderStore().GetByHeight(aggCtx, 1) require.NoError(err) node2.nodeConfig.TrustedHash = trustedHash.Hash().String() require.NoError(node2.Start()) diff --git a/node/light.go b/node/light.go index a88cd0049e5..62508335903 100644 --- a/node/light.go +++ b/node/light.go @@ -28,7 +28,7 @@ type LightNode struct { proxyApp proxy.AppConns - hExService *block.HeaderExchangeService + hSyncService *block.HeaderSynceService ctx context.Context cancel context.CancelFunc @@ -62,19 +62,19 @@ func newLightNode( return nil, err } - headerExchangeService, err := block.NewHeaderExchangeService(ctx, datastore, conf, genesis, client, logger.With("module", "HeaderExchangeService")) + headerSyncService, err := block.NewHeaderSynceService(ctx, datastore, conf, genesis, client, logger.With("module", "HeaderSyncService")) if err != nil { - return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err) + return nil, fmt.Errorf("HeaderSyncService initialization error: %w", err) } ctx, cancel := context.WithCancel(ctx) node := &LightNode{ - P2P: client, - proxyApp: proxyApp, - hExService: headerExchangeService, - cancel: cancel, - ctx: ctx, + P2P: client, + proxyApp: proxyApp, + hSyncService: headerSyncService, + cancel: cancel, + ctx: ctx, } node.P2P.SetTxValidator(node.falseValidator()) @@ -97,8 +97,8 @@ func (ln *LightNode) OnStart() error { return err } - if err := ln.hExService.Start(); err != nil { - return fmt.Errorf("error while starting header exchange service: %w", err) + if err := ln.hSyncService.Start(); err != nil { + return fmt.Errorf("error while starting header sync service: %w", err) } return nil @@ -108,7 +108,7 @@ func (ln *LightNode) OnStop() { ln.Logger.Info("halting light node...") ln.cancel() err := ln.P2P.Close() - err = multierr.Append(err, ln.hExService.Stop()) + err = multierr.Append(err, ln.hSyncService.Stop()) ln.Logger.Error("errors while stopping node:", "errors", err) } diff --git a/node/test_helpers.go b/node/test_helpers.go index 74803c51dde..3fea5c88687 100644 --- a/node/test_helpers.go +++ b/node/test_helpers.go @@ -65,17 +65,17 @@ func getNodeHeight(node Node, source Source) (uint64, error) { func getNodeHeightFromHeader(node Node) (uint64, error) { if fn, ok := node.(*FullNode); ok { - return fn.hExService.HeaderStore().Height(), nil + return fn.hSyncService.HeaderStore().Height(), nil } if ln, ok := node.(*LightNode); ok { - return ln.hExService.HeaderStore().Height(), nil + return ln.hSyncService.HeaderStore().Height(), nil } return 0, errors.New("not a full or light node") } func getNodeHeightFromBlock(node Node) (uint64, error) { if fn, ok := node.(*FullNode); ok { - return fn.bExService.BlockStore().Height(), nil + return fn.bSyncService.BlockStore().Height(), nil } return 0, errors.New("not a full node") }