diff --git a/eth/backend.go b/eth/backend.go index befc1d9d4b71..c71b4f67dcb1 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -18,6 +18,7 @@ package eth import ( + "context" "errors" "fmt" "math/big" @@ -54,6 +55,7 @@ import ( "github.com/scroll-tech/go-ethereum/p2p/enode" "github.com/scroll-tech/go-ethereum/params" "github.com/scroll-tech/go-ethereum/rlp" + "github.com/scroll-tech/go-ethereum/rollup/sync_service" "github.com/scroll-tech/go-ethereum/rpc" ) @@ -67,6 +69,7 @@ type Ethereum struct { // Handlers txPool *core.TxPool + syncService *sync_service.SyncService blockchain *core.BlockChain handler *handler ethDialCandidates enode.Iterator @@ -206,6 +209,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain) + eth.syncService, err = sync_service.NewSyncService(context.Background(), eth.blockchain, eth.chainDb) + if err != nil { + return nil, fmt.Errorf("cannot initialize sync service: %w", err) + } + // Permit the downloader to use the trie cache allowance during fast sync cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit checkpoint := config.Checkpoint @@ -555,6 +563,7 @@ func (s *Ethereum) Stop() error { s.bloomIndexer.Close() close(s.closeBloomHandler) s.txPool.Stop() + s.syncService.Stop() s.miner.Close() s.blockchain.Stop() s.engine.Close() diff --git a/rollup/sync_service/bridge_abi.go b/rollup/sync_service/bridge_abi.go new file mode 100644 index 000000000000..fcd088d41706 --- /dev/null +++ b/rollup/sync_service/bridge_abi.go @@ -0,0 +1,39 @@ +package sync_service + +import ( + "math/big" + + "github.com/scroll-tech/go-ethereum/accounts/abi" + "github.com/scroll-tech/go-ethereum/accounts/abi/bind" + "github.com/scroll-tech/go-ethereum/common" +) + +var ( + // L1MessageQueueABI holds information about L1MessageQueue contract's context and available invokable methods. + L1MessageQueueABI *abi.ABI + + // L1QueueTransactionEventSignature = keccak256("QueueTransaction(address,address,uint256,uint256,uint256,bytes)") + L1QueueTransactionEventSignature common.Hash +) + +func init() { + L1MessageQueueABI, _ = L1MessageQueueMetaData.GetAbi() + L1QueueTransactionEventSignature = L1MessageQueueABI.Events["QueueTransaction"].ID +} + +// Generated manually from abigen and only necessary events and mutable calls are kept. + +// L1MessageQueueMetaData contains all meta data concerning the L1MessageQueue contract. +var L1MessageQueueMetaData = &bind.MetaData{ + ABI: "[{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"internalType\":\"address\",\"name\":\"sender\",\"type\":\"address\"},{\"indexed\":true,\"internalType\":\"address\",\"name\":\"target\",\"type\":\"address\"},{\"indexed\":false,\"internalType\":\"uint256\",\"name\":\"value\",\"type\":\"uint256\"},{\"indexed\":false,\"internalType\":\"uint256\",\"name\":\"queueIndex\",\"type\":\"uint256\"},{\"indexed\":false,\"internalType\":\"uint256\",\"name\":\"gasLimit\",\"type\":\"uint256\"},{\"indexed\":false,\"internalType\":\"bytes\",\"name\":\"data\",\"type\":\"bytes\"}],\"name\":\"QueueTransaction\",\"type\":\"event\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"target\",\"type\":\"address\"},{\"internalType\":\"uint256\",\"name\":\"gasLimit\",\"type\":\"uint256\"},{\"internalType\":\"bytes\",\"name\":\"data\",\"type\":\"bytes\"}],\"name\":\"appendCrossDomainMessage\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"sender\",\"type\":\"address\"},{\"internalType\":\"address\",\"name\":\"target\",\"type\":\"address\"},{\"internalType\":\"uint256\",\"name\":\"value\",\"type\":\"uint256\"},{\"internalType\":\"uint256\",\"name\":\"gasLimit\",\"type\":\"uint256\"},{\"internalType\":\"bytes\",\"name\":\"data\",\"type\":\"bytes\"}],\"name\":\"appendEnforcedTransaction\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"sender\",\"type\":\"address\"},{\"internalType\":\"address\",\"name\":\"target\",\"type\":\"address\"},{\"internalType\":\"bytes\",\"name\":\"message\",\"type\":\"bytes\"},{\"internalType\":\"uint256\",\"name\":\"gasLimit\",\"type\":\"uint256\"}],\"name\":\"estimateCrossDomainMessageFee\",\"outputs\":[{\"internalType\":\"uint256\",\"name\":\"\",\"type\":\"uint256\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"uint256\",\"name\":\"queueIndex\",\"type\":\"uint256\"}],\"name\":\"getCrossDomainMessage\",\"outputs\":[{\"internalType\":\"bytes32\",\"name\":\"\",\"type\":\"bytes32\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[],\"name\":\"nextCrossDomainMessageIndex\",\"outputs\":[{\"internalType\":\"uint256\",\"name\":\"\",\"type\":\"uint256\"}],\"stateMutability\":\"view\",\"type\":\"function\"}]", +} + +// L1QueueTransactionEvent represents a QueueTransaction event raised by the L1MessageQueue contract. +type L1QueueTransactionEvent struct { + Sender common.Address + Target common.Address + Value *big.Int + QueueIndex *big.Int + GasLimit *big.Int + Data []byte +} diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go new file mode 100644 index 000000000000..667c1f4498be --- /dev/null +++ b/rollup/sync_service/sync_service.go @@ -0,0 +1,180 @@ +package sync_service + +import ( + "context" + "errors" + "math/big" + "time" + + "github.com/scroll-tech/go-ethereum" + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/core" + "github.com/scroll-tech/go-ethereum/core/rawdb" + "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/ethclient" + "github.com/scroll-tech/go-ethereum/ethdb" + "github.com/scroll-tech/go-ethereum/log" +) + +const FetchLimit = uint64(20) +const PollInterval = time.Second * 15 + +type SyncService struct { + bc *core.BlockChain + cancel context.CancelFunc + client *ethclient.Client + ctx context.Context + db ethdb.Database + latestProcessedBlock uint64 + pollInterval time.Duration +} + +func NewSyncService(ctx context.Context, bc *core.BlockChain, db ethdb.Database) (*SyncService, error) { + if bc == nil { + return nil, errors.New("must pass BlockChain to SyncService") + } + + client, err := ethclient.Dial("") // cfg.L1Config.Endpoint + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + latestProcessedBlock := rawdb.ReadSyncedL1BlockNumber(db).Uint64() // TODO + + service := SyncService{ + bc: bc, + cancel: cancel, + client: client, + ctx: ctx, + db: db, + latestProcessedBlock: latestProcessedBlock, + pollInterval: PollInterval, + } + + return &service, nil +} + +func (s *SyncService) Start() { + t := time.NewTicker(s.pollInterval) + defer t.Stop() + + for { + select { + case <-s.ctx.Done(): + return + case <-t.C: + s.fetchMessages() + } + } +} + +func (s *SyncService) Stop() error { + log.Info("Stopping sync service") + + if s.cancel != nil { + defer s.cancel() + } + return nil +} + +func (s *SyncService) fetchMessages() error { + // TODO + latestConfirmed, err := s.client.BlockNumber(s.ctx) + if err != nil { + log.Warn("eth_blockNumber failed", "err", err) + return nil + } + + // query in batches + for from := s.latestProcessedBlock + 1; from <= latestConfirmed; from += FetchLimit { + select { + case <-s.ctx.Done(): + return nil + default: + } + + to := from + FetchLimit - 1 + + if to > latestConfirmed { + to = latestConfirmed + } + + msgs, err := s.fetchMessagesInRange(from, to) + if err != nil { + return err + } + + s.StoreMessages(msgs) + + s.latestProcessedBlock = to + s.SetLatestSyncedL1BlockNumber(to) + } + + return nil +} + +func (s *SyncService) fetchMessagesInRange(from, to uint64) ([]types.L1MessageTx, error) { + query := ethereum.FilterQuery{ + FromBlock: big.NewInt(0).SetUint64(from), + ToBlock: big.NewInt(0).SetUint64(to), + // Addresses: , + Topics: [][]common.Hash{ + {L1QueueTransactionEventSignature}, + }, + } + + logs, err := s.client.FilterLogs(s.ctx, query) + if err != nil { + log.Warn("eth_getLogs failed", "err", err) + return nil, err + } + + if len(logs) == 0 { + return nil, nil + } + + log.Info("Received new L1 events", "fromBlock", from, "toBlock", to, "count", len(logs)) + + msgs, err := s.parseLogs(logs) + if err != nil { + log.Error("Failed to parse emitted events logs", "err", err) + return nil, err + } + + return msgs, nil +} + +func (s *SyncService) parseLogs(logs []types.Log) ([]types.L1MessageTx, error) { + var msgs []types.L1MessageTx + + for _, vLog := range logs { + event := L1QueueTransactionEvent{} + err := UnpackLog(L1MessageQueueABI, &event, "QueueTransaction", vLog) + if err != nil { + log.Warn("Failed to unpack L1 QueueTransaction event", "err", err) + return msgs, err + } + + msgs = append(msgs, types.L1MessageTx{ + Nonce: event.QueueIndex.Uint64(), + Gas: event.GasLimit.Uint64(), + To: &event.Target, + Value: event.Value, + Data: event.Data, + Sender: &event.Sender, + }) + } + + return msgs, nil +} + +func (s *SyncService) SetLatestSyncedL1BlockNumber(number uint64) { + rawdb.WriteSyncedL1BlockNumber(s.db, big.NewInt(0).SetUint64(number)) +} + +func (s *SyncService) StoreMessages(msgs []types.L1MessageTx) { + if len(msgs) > 0 { + rawdb.WriteL1Messages(s.db, msgs) + } +} diff --git a/rollup/sync_service/util.go b/rollup/sync_service/util.go new file mode 100644 index 000000000000..1d3857eccef9 --- /dev/null +++ b/rollup/sync_service/util.go @@ -0,0 +1,28 @@ +package sync_service + +import ( + "fmt" + + "github.com/scroll-tech/go-ethereum/accounts/abi" + "github.com/scroll-tech/go-ethereum/core/types" +) + +// UnpackLog unpacks a retrieved log into the provided output structure. +// @todo: add unit test. +func UnpackLog(c *abi.ABI, out interface{}, event string, log types.Log) error { + if log.Topics[0] != c.Events[event].ID { + return fmt.Errorf("event signature mismatch") + } + if len(log.Data) > 0 { + if err := c.UnpackIntoInterface(out, event, log.Data); err != nil { + return err + } + } + var indexed abi.Arguments + for _, arg := range c.Events[event].Inputs { + if arg.Indexed { + indexed = append(indexed, arg) + } + } + return abi.ParseTopics(out, indexed, log.Topics[1:]) +}