diff --git a/Makefile b/Makefile index ef57b007300f..23e7913c610d 100644 --- a/Makefile +++ b/Makefile @@ -215,7 +215,8 @@ ifeq ($(shell uname), Linux) -v /etc/group:/etc/group:ro \ -v ~/src:/home/$${USER}/src \ -v $(shell pwd):/data -u $$(id -u):$$(id -g) \ - -w /data metadium/bobthe:latest make; \ + -w /data metadium/bobthe:latest \ + make USE_ROCKSDB=$(USE_ROCKSDB); \ fi else @docker --version > /dev/null 2>&1; \ @@ -223,7 +224,8 @@ else echo "Docker not found."; \ else \ docker run -e HOME=/tmp -it --rm -v $(shell pwd):/data \ - -w /data metadium/bobthe:latest make; \ + -w /data metadium/bobthe:latest \ + make USE_ROCKSDB=$(USE_ROCKSDB); \ fi endif diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 15ce58e00b93..880073934dad 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -39,14 +39,14 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/internal/debug" "github.com/ethereum/go-ethereum/log" + metadium "github.com/ethereum/go-ethereum/metadium" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" - metadium "github.com/ethereum/go-ethereum/metadium" cli "gopkg.in/urfave/cli.v1" ) const ( - clientIdentifier = "geth" // Client identifier to advertise over the network + clientIdentifier = "gmet" // Client identifier to advertise over the network ) var ( @@ -182,6 +182,7 @@ var ( utils.UseRocksDb, utils.PrefetchCount, utils.LogFlag, + utils.MaxTxsPerBlock, } ) @@ -189,7 +190,7 @@ func init() { // Initialize the CLI app and start Geth app.Action = geth app.HideVersion = true // we have a command to print the version - app.Copyright = "Copyright 2013-2018 The go-ethereum Authors" + app.Copyright = "Copyright 2013-2019 The go-ethereum / go-metadium Authors" app.Commands = []cli.Command{ // See chaincmd.go: initCommand, @@ -412,7 +413,7 @@ func limitMaxRss(max int64) { if err != nil { log.Error("Getrusage() failed:", "reason", err) } else { - if (rusage.Maxrss > max) { + if rusage.Maxrss > max { log.Info("Calling FreeOSMemory()", "Max", max, "Rusage.Maxrss", rusage.Maxrss) godebug.FreeOSMemory() } diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 44694c75ffb4..5e313d4d7e9b 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -249,6 +249,7 @@ var AppHelpFlagGroups = []flagGroup{ utils.UseRocksDb, utils.PrefetchCount, utils.LogFlag, + utils.MaxTxsPerBlock, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 5c7baf7ea58a..391e52339326 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -685,6 +685,11 @@ var ( Usage: "Rotating log file: ,,", Value: "log,5,10M", } + MaxTxsPerBlock = cli.IntFlag{ + Name: "maxtxsperblock", + Usage: "Max # of transactions in a block", + Value: params.MaxTxsPerBlock, + } ) // MakeDataDir retrieves the currently requested data directory, terminating @@ -1377,6 +1382,9 @@ func SetMetadiumConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { if ctx.GlobalIsSet(UseRocksDb.Name) { params.UseRocksDb = ctx.GlobalInt(UseRocksDb.Name) } + if ctx.GlobalIsSet(MaxTxsPerBlock.Name) { + params.MaxTxsPerBlock = ctx.GlobalInt(MaxTxsPerBlock.Name) + } if params.ConsensusMethod == params.ConsensusInvalid { params.ConsensusMethod = params.ConsensusPoW diff --git a/eth/handler.go b/eth/handler.go index d9fec2f5f905..68d86ad00796 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -342,7 +342,7 @@ var remoteTxCh = make(chan []*types.Transaction, 100000) func (pm *ProtocolManager) remoteTxFeeder() { for { select { - case txs :=<- remoteTxCh: + case txs := <-remoteTxCh: pm.txpool.AddRemotes(txs) } } @@ -374,86 +374,91 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&query); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } - hashMode := query.Origin.Hash != (common.Hash{}) - first := true - maxNonCanonical := uint64(100) - // Gather headers until the fetch or network limits is reached - var ( - bytes common.StorageSize - headers []*types.Header - unknown bool - ) - for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch { - // Retrieve the next header satisfying the query - var origin *types.Header - if hashMode { - if first { - first = false - origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash) - if origin != nil { - query.Origin.Number = origin.Number.Uint64() + // Metadium: it's non-blocking now + go func() error { + hashMode := query.Origin.Hash != (common.Hash{}) + first := true + maxNonCanonical := uint64(100) + + // Gather headers until the fetch or network limits is reached + var ( + bytes common.StorageSize + headers []*types.Header + unknown bool + ) + for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch { + // Retrieve the next header satisfying the query + var origin *types.Header + if hashMode { + if first { + first = false + origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash) + if origin != nil { + query.Origin.Number = origin.Number.Uint64() + } + } else { + origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number) } } else { - origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number) + origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number) } - } else { - origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number) - } - if origin == nil { - break - } - headers = append(headers, origin) - bytes += estHeaderRlpSize - - // Advance to the next header of the query - switch { - case hashMode && query.Reverse: - // Hash based traversal towards the genesis block - ancestor := query.Skip + 1 - if ancestor == 0 { - unknown = true - } else { - query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical) - unknown = (query.Origin.Hash == common.Hash{}) + if origin == nil { + break } - case hashMode && !query.Reverse: - // Hash based traversal towards the leaf block - var ( - current = origin.Number.Uint64() - next = current + query.Skip + 1 - ) - if next <= current { - infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ") - p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos) - unknown = true - } else { - if header := pm.blockchain.GetHeaderByNumber(next); header != nil { - nextHash := header.Hash() - expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical) - if expOldHash == query.Origin.Hash { - query.Origin.Hash, query.Origin.Number = nextHash, next + headers = append(headers, origin) + bytes += estHeaderRlpSize + + // Advance to the next header of the query + switch { + case hashMode && query.Reverse: + // Hash based traversal towards the genesis block + ancestor := query.Skip + 1 + if ancestor == 0 { + unknown = true + } else { + query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical) + unknown = (query.Origin.Hash == common.Hash{}) + } + case hashMode && !query.Reverse: + // Hash based traversal towards the leaf block + var ( + current = origin.Number.Uint64() + next = current + query.Skip + 1 + ) + if next <= current { + infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ") + p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos) + unknown = true + } else { + if header := pm.blockchain.GetHeaderByNumber(next); header != nil { + nextHash := header.Hash() + expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical) + if expOldHash == query.Origin.Hash { + query.Origin.Hash, query.Origin.Number = nextHash, next + } else { + unknown = true + } } else { unknown = true } + } + case query.Reverse: + // Number based traversal towards the genesis block + if query.Origin.Number >= query.Skip+1 { + query.Origin.Number -= query.Skip + 1 } else { unknown = true } - } - case query.Reverse: - // Number based traversal towards the genesis block - if query.Origin.Number >= query.Skip+1 { - query.Origin.Number -= query.Skip + 1 - } else { - unknown = true - } - case !query.Reverse: - // Number based traversal towards the leaf block - query.Origin.Number += query.Skip + 1 + case !query.Reverse: + // Number based traversal towards the leaf block + query.Origin.Number += query.Skip + 1 + } } - } - return p.SendBlockHeaders(headers) + return p.SendBlockHeaders(headers) + }() + return nil case msg.Code == BlockHeadersMsg: if metaminer.AmPartner() && !metaminer.IsPartner(p.ID().String()) { @@ -466,60 +471,66 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&headers); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - // If no headers were received, but we're expending a DAO fork check, maybe it's that - if len(headers) == 0 && p.forkDrop != nil { - // Possibly an empty reply to the fork header checks, sanity check TDs - verifyDAO := true - // If we already have a DAO header, we can check the peer's TD against it. If - // the peer's ahead of this, it too must have a reply to the DAO check - if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil { - if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 { - verifyDAO = false + // Metadium: it's non-blocking now + go func() error { + // If no headers were received, but we're expending a DAO fork check, maybe it's that + if len(headers) == 0 && p.forkDrop != nil { + // Possibly an empty reply to the fork header checks, sanity check TDs + verifyDAO := true + + // If we already have a DAO header, we can check the peer's TD against it. If + // the peer's ahead of this, it too must have a reply to the DAO check + if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil { + if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 { + verifyDAO = false + } } - } - // If we're seemingly on the same chain, disable the drop timer - if verifyDAO { - p.Log().Debug("Seems to be on the same side of the DAO fork") - p.forkDrop.Stop() - p.forkDrop = nil - return nil - } - } - // Filter out any explicitly requested headers, deliver the rest to the downloader - filter := len(headers) == 1 - if filter { - // If it's a potential DAO fork check, validate against the rules - if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 { - // Disable the fork drop timer - p.forkDrop.Stop() - p.forkDrop = nil - - // Validate the header and either drop the peer or continue - if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil { - p.Log().Debug("Verified to be on the other side of the DAO fork, dropping") - return err + // If we're seemingly on the same chain, disable the drop timer + if verifyDAO { + p.Log().Debug("Seems to be on the same side of the DAO fork") + p.forkDrop.Stop() + p.forkDrop = nil + return nil } - p.Log().Debug("Verified to be on the same side of the DAO fork") - return nil } - // Otherwise if it's a whitelisted block, validate against the set - if want, ok := pm.whitelist[headers[0].Number.Uint64()]; ok { - if hash := headers[0].Hash(); want != hash { - p.Log().Info("Whitelist mismatch, dropping peer", "number", headers[0].Number.Uint64(), "hash", hash, "want", want) - return errors.New("whitelist block mismatch") + // Filter out any explicitly requested headers, deliver the rest to the downloader + filter := len(headers) == 1 + if filter { + // If it's a potential DAO fork check, validate against the rules + if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 { + // Disable the fork drop timer + p.forkDrop.Stop() + p.forkDrop = nil + + // Validate the header and either drop the peer or continue + if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil { + p.Log().Debug("Verified to be on the other side of the DAO fork, dropping") + return err + } + p.Log().Debug("Verified to be on the same side of the DAO fork") + return nil } - p.Log().Debug("Whitelist block verified", "number", headers[0].Number.Uint64(), "hash", want) + // Otherwise if it's a whitelisted block, validate against the set + if want, ok := pm.whitelist[headers[0].Number.Uint64()]; ok { + if hash := headers[0].Hash(); want != hash { + p.Log().Info("Whitelist mismatch, dropping peer", "number", headers[0].Number.Uint64(), "hash", hash, "want", want) + return errors.New("whitelist block mismatch") + } + p.Log().Debug("Whitelist block verified", "number", headers[0].Number.Uint64(), "hash", want) + } + // Irrelevant of the fork checks, send the header to the fetcher just in case + headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()) } - // Irrelevant of the fork checks, send the header to the fetcher just in case - headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()) - } - if len(headers) > 0 || !filter { - err := pm.downloader.DeliverHeaders(p.id, headers) - if err != nil { - log.Debug("Failed to deliver headers", "err", err) + if len(headers) > 0 || !filter { + err := pm.downloader.DeliverHeaders(p.id, headers) + if err != nil { + log.Debug("Failed to deliver headers", "err", err) + } } - } + return nil + }() + return nil case msg.Code == GetBlockBodiesMsg: // Decode the retrieval message @@ -529,24 +540,35 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } // Gather blocks until the fetch or network limits is reached var ( + hashes []common.Hash hash common.Hash bytes int bodies []rlp.RawValue ) - for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch { - // Retrieve the hash of the next block + for { if err := msgStream.Decode(&hash); err == rlp.EOL { break } else if err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - // Retrieve the requested block body, stopping if enough was found - if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 { - bodies = append(bodies, data) - bytes += len(data) - } + hashes = append(hashes, hash) } - return p.SendBlockBodiesRLP(bodies) + + // Metadium: it's non-blocking now + go func() error { + for _, hash = range hashes { + if bytes >= softResponseLimit || len(bodies) >= downloader.MaxBlockFetch { + break + } + // Retrieve the requested block body, stopping if enough was found + if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 { + bodies = append(bodies, data) + bytes += len(data) + } + } + return p.SendBlockBodiesRLP(bodies) + }() + return nil case msg.Code == BlockBodiesMsg: if metaminer.AmPartner() && !metaminer.IsPartner(p.ID().String()) { @@ -559,25 +581,31 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&request); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - // Deliver them all to the downloader for queuing - transactions := make([][]*types.Transaction, len(request)) - uncles := make([][]*types.Header, len(request)) - for i, body := range request { - transactions[i] = body.Transactions - uncles[i] = body.Uncles - } - // Filter out any explicitly requested bodies, deliver the rest to the downloader - filter := len(transactions) > 0 || len(uncles) > 0 - if filter { - transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now()) - } - if len(transactions) > 0 || len(uncles) > 0 || !filter { - err := pm.downloader.DeliverBodies(p.id, transactions, uncles) - if err != nil { - log.Debug("Failed to deliver bodies", "err", err) + // Metadium: it's non-blocking now + go func() error { + // Deliver them all to the downloader for queuing + transactions := make([][]*types.Transaction, len(request)) + uncles := make([][]*types.Header, len(request)) + + for i, body := range request { + transactions[i] = body.Transactions + uncles[i] = body.Uncles } - } + // Filter out any explicitly requested bodies, deliver the rest to the downloader + filter := len(transactions) > 0 || len(uncles) > 0 + if filter { + transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now()) + } + if len(transactions) > 0 || len(uncles) > 0 || !filter { + err := pm.downloader.DeliverBodies(p.id, transactions, uncles) + if err != nil { + log.Debug("Failed to deliver bodies", "err", err) + } + } + return nil + }() + return nil case p.version >= eth63 && msg.Code == GetNodeDataMsg: // Decode the retrieval message @@ -587,24 +615,35 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } // Gather state data until the fetch or network limits is reached var ( - hash common.Hash - bytes int - data [][]byte + hashes []common.Hash + hash common.Hash + bytes int + data [][]byte ) - for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch { - // Retrieve the hash of the next state entry + for { if err := msgStream.Decode(&hash); err == rlp.EOL { break } else if err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - // Retrieve the requested state entry, stopping if enough was found - if entry, err := pm.blockchain.TrieNode(hash); err == nil { - data = append(data, entry) - bytes += len(entry) - } + hashes = append(hashes, hash) } - return p.SendNodeData(data) + + // Metadium: it's non-blocking now + go func() error { + for _, hash = range hashes { + if bytes >= softResponseLimit || len(data) >= downloader.MaxStateFetch { + break + } + // Retrieve the requested state entry, stopping if enough was found + if entry, err := pm.blockchain.TrieNode(hash); err == nil { + data = append(data, entry) + bytes += len(entry) + } + } + return p.SendNodeData(data) + }() + return nil case p.version >= eth63 && msg.Code == NodeDataMsg: // A batch of node state data arrived to one of our previous requests @@ -612,10 +651,16 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&data); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - // Deliver all to the downloader - if err := pm.downloader.DeliverNodeData(p.id, data); err != nil { - log.Debug("Failed to deliver node state data", "err", err) - } + + // Metadium: it's non-blocking now + go func() error { + // Deliver all to the downloader + if err := pm.downloader.DeliverNodeData(p.id, data); err != nil { + log.Debug("Failed to deliver node state data", "err", err) + } + return nil + }() + return nil case p.version >= eth63 && msg.Code == GetReceiptsMsg: // Decode the retrieval message @@ -625,33 +670,44 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } // Gather state data until the fetch or network limits is reached var ( + hashes []common.Hash hash common.Hash bytes int receipts []rlp.RawValue ) - for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch { - // Retrieve the hash of the next block + for { if err := msgStream.Decode(&hash); err == rlp.EOL { break } else if err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - // Retrieve the requested block's receipts, skipping if unknown to us - results := pm.blockchain.GetReceiptsByHash(hash) - if results == nil { - if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { - continue + hashes = append(hashes, hash) + } + + // Metadium: it's non-blocking now + go func() error { + for _, hash = range hashes { + if bytes >= softResponseLimit || len(receipts) >= downloader.MaxReceiptFetch { + break + } + // Retrieve the requested block's receipts, skipping if unknown to us + results := pm.blockchain.GetReceiptsByHash(hash) + if results == nil { + if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { + continue + } + } + // If known, encode and queue for response packet + if encoded, err := rlp.EncodeToBytes(results); err != nil { + log.Error("Failed to encode receipt", "err", err) + } else { + receipts = append(receipts, encoded) + bytes += len(encoded) } } - // If known, encode and queue for response packet - if encoded, err := rlp.EncodeToBytes(results); err != nil { - log.Error("Failed to encode receipt", "err", err) - } else { - receipts = append(receipts, encoded) - bytes += len(encoded) - } - } - return p.SendReceiptsRLP(receipts) + return p.SendReceiptsRLP(receipts) + }() + return nil case p.version >= eth63 && msg.Code == ReceiptsMsg: // A batch of receipts arrived to one of our previous requests @@ -659,10 +715,16 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&receipts); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - // Deliver all to the downloader - if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil { - log.Debug("Failed to deliver receipts", "err", err) - } + + // Metadium: it's non-blocking now + go func() error { + // Deliver all to the downloader + if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil { + log.Debug("Failed to deliver receipts", "err", err) + } + return nil + }() + return nil case msg.Code == NewBlockHashesMsg: if metaminer.AmPartner() && !metaminer.IsPartner(p.ID().String()) { @@ -674,20 +736,26 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&announces); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } - // Mark the hashes as present at the remote node - for _, block := range announces { - p.MarkBlock(block.Hash) - } - // Schedule all the unknown hashes for retrieval - unknown := make(newBlockHashesData, 0, len(announces)) - for _, block := range announces { - if !pm.blockchain.HasBlock(block.Hash, block.Number) { - unknown = append(unknown, block) + + // Metadium: it's non-blocking now + go func() error { + // Mark the hashes as present at the remote node + for _, block := range announces { + p.MarkBlock(block.Hash) } - } - for _, block := range unknown { - pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies) - } + // Schedule all the unknown hashes for retrieval + unknown := make(newBlockHashesData, 0, len(announces)) + for _, block := range announces { + if !pm.blockchain.HasBlock(block.Hash, block.Number) { + unknown = append(unknown, block) + } + } + for _, block := range unknown { + pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies) + } + return nil + }() + return nil case msg.Code == NewBlockMsg: if metaminer.AmPartner() && !metaminer.IsPartner(p.ID().String()) { @@ -700,31 +768,37 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&request); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } - request.Block.ReceivedAt = msg.ReceivedAt - request.Block.ReceivedFrom = p - - // Mark the peer as owning the block and schedule it for import - p.MarkBlock(request.Block.Hash()) - pm.fetcher.Enqueue(p.id, request.Block) - // Assuming the block is importable by the peer, but possibly not yet done so, - // calculate the head hash and TD that the peer truly must have. - var ( - trueHead = request.Block.ParentHash() - trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty()) - ) - // Update the peer's total difficulty if better than the previous - if _, td := p.Head(); trueTD.Cmp(td) > 0 { - p.SetHead(trueHead, trueTD) - - // Schedule a sync if above ours. Note, this will not fire a sync for a gap of - // a single block (as the true TD is below the propagated block), however this - // scenario should easily be covered by the fetcher. - currentBlock := pm.blockchain.CurrentBlock() - if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 { - go pm.synchronise(p) + // Metadium: it's non-blocking now + go func() error { + request.Block.ReceivedAt = msg.ReceivedAt + request.Block.ReceivedFrom = p + + // Mark the peer as owning the block and schedule it for import + p.MarkBlock(request.Block.Hash()) + pm.fetcher.Enqueue(p.id, request.Block) + + // Assuming the block is importable by the peer, but possibly not yet done so, + // calculate the head hash and TD that the peer truly must have. + var ( + trueHead = request.Block.ParentHash() + trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty()) + ) + // Update the peer's total difficulty if better than the previous + if _, td := p.Head(); trueTD.Cmp(td) > 0 { + p.SetHead(trueHead, trueTD) + + // Schedule a sync if above ours. Note, this will not fire a sync for a gap of + // a single block (as the true TD is below the propagated block), however this + // scenario should easily be covered by the fetcher. + currentBlock := pm.blockchain.CurrentBlock() + if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 { + go pm.synchronise(p) + } } - } + return nil + }() + return nil case msg.Code == TxMsg: // Transactions arrived, make sure we have a valid and fresh chain to handle them @@ -736,15 +810,21 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&txs); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - for i, tx := range txs { - // Validate and mark the remote transaction - if tx == nil { - return errResp(ErrDecode, "transaction %d is nil", i) + + // Metadium: it's non-blocking now + go func() error { + for i, tx := range txs { + // Validate and mark the remote transaction + if tx == nil { + return errResp(ErrDecode, "transaction %d is nil", i) + } + p.MarkTransaction(tx.Hash()) } - p.MarkTransaction(tx.Hash()) - } - //pm.txpool.AddRemotes(txs) - remoteTxCh <- txs + //pm.txpool.AddRemotes(txs) + remoteTxCh <- txs + return nil + }() + return nil // Metadium: leader wants to get left-over transactions if any case p.version >= eth63 && msg.Code == GetPendingTxsMsg: @@ -754,14 +834,14 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } // don't do this /* - if !pm.txpool.PendingEmpty() { - txs, err := pm.txpool.Pending() - if err != nil { - return err - } else { - p.resendPendingTxs(txs) + if !pm.txpool.PendingEmpty() { + txs, err := pm.txpool.Pending() + if err != nil { + return err + } else { + p.resendPendingTxs(txs) + } } - } */ return nil @@ -770,12 +850,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // ignore it return nil } - statusEx := metaapi.GetMinerStatus() - statusEx.LatestBlockTd = pm.blockchain.GetTd(statusEx.LatestBlockHash, - statusEx.LatestBlockHeight.Uint64()) - if err := p.SendStatusEx(statusEx); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } + + // Metadium: it's non-blocking now + go func() error { + statusEx := metaapi.GetMinerStatus() + statusEx.LatestBlockTd = pm.blockchain.GetTd(statusEx.LatestBlockHash, + statusEx.LatestBlockHeight.Uint64()) + if err := p.SendStatusEx(statusEx); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + return nil + }() return nil case p.version >= eth63 && msg.Code == StatusExMsg: @@ -787,10 +872,15 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&status); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - if _, td := p.Head(); status.LatestBlockTd.Cmp(td) > 0 { - p.SetHead(status.LatestBlockHash, status.LatestBlockTd) - } - metaapi.GotStatusEx(&status) + + // Metadium: it's non-blocking now + go func() error { + if _, td := p.Head(); status.LatestBlockTd.Cmp(td) > 0 { + p.SetHead(status.LatestBlockHash, status.LatestBlockTd) + } + metaapi.GotStatusEx(&status) + return nil + }() return nil case p.version >= eth63 && msg.Code == EtcdAddMemberMsg: @@ -798,10 +888,15 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // ignore it return nil } - cluster, _ := metaapi.EtcdAddMember(p.ID().String()) - if err := p.SendEtcdCluster(cluster); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } + + // Metadium: it's non-blocking now + go func() error { + cluster, _ := metaapi.EtcdAddMember(p.ID().String()) + if err := p.SendEtcdCluster(cluster); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + return nil + }() return nil case p.version >= eth63 && msg.Code == EtcdClusterMsg: @@ -813,7 +908,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&cluster); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - metaapi.GotEtcdCluster(cluster) + go metaapi.GotEtcdCluster(cluster) return nil default: diff --git a/metadium/admin.go b/metadium/admin.go index 2e3ae54d07b9..25e8bc797ce3 100644 --- a/metadium/admin.go +++ b/metadium/admin.go @@ -87,6 +87,12 @@ type metaAdmin struct { blocksMined int } +// latest block generated +type metaWork struct { + Height int64 `json:"height"` + Hash common.Hash `json:"hash"` +} + var ( // "Metadium Registry" magic, _ = big.NewInt(0).SetString("0x4d6574616469756d205265676973747279", 0) @@ -1055,7 +1061,7 @@ func (ma *metaAdmin) pendingEmpty() bool { return status.Pending == 0 } -func LogBlock(height int64) { +func LogBlock(height int64, hash common.Hash) { if admin == nil || admin.self == nil { return } @@ -1063,16 +1069,29 @@ func LogBlock(height int64) { admin.lock.Lock() defer admin.lock.Unlock() + work, err := json.Marshal(&metaWork{ + Height: height, + Hash: hash, + }) + if err != nil { + log.Error("marshaling failure????") + } + + tstart := time.Now() + if err := admin.etcdPut("metadium-work", string(work)); err != nil { + log.Error("Metadium - failed to log the latest block", + "height", height, "hash", hash, "took", time.Since(tstart)) + } else { + log.Info("Metadium - logged the latest block", + "height", height, "hash", hash, "took", time.Since(tstart)) + } + admin.blocksMined++ height++ if admin.blocksMined >= admin.blocksPer && int(height)%admin.blocksPer == 0 { // time to yield leader role - if !admin.pendingEmpty() { - log.Info("Metadium - not yielding due to pending txs...") - return - } _, next, _ := admin.getMinerNodes(int(height), true) if next.Id == admin.self.Id { log.Info("Metadium - yield to self", "mined", admin.blocksMined, diff --git a/metadium/etcdutil.go b/metadium/etcdutil.go index 199b88340731..439989fef0dc 100644 --- a/metadium/etcdutil.go +++ b/metadium/etcdutil.go @@ -444,6 +444,51 @@ func (ma *metaAdmin) etcdLeader(locked bool) (uint64, *metaNode) { return 0, nil } +func (ma *metaAdmin) etcdPut(key, value string) error { + if !ma.etcdIsRunning() { + return ErrNotRunning + } + + ctx, cancel := context.WithTimeout(context.Background(), + ma.etcd.Server.Cfg.ReqTimeout()) + defer cancel() + _, err := ma.etcdCli.Put(ctx, key, value) + return err +} + +func (ma *metaAdmin) etcdGet(key string) (string, error) { + if !ma.etcdIsRunning() { + return "", ErrNotRunning + } + + ctx, cancel := context.WithTimeout(context.Background(), + time.Duration(1)*time.Second) + defer cancel() + rsp, err := ma.etcdCli.Get(ctx, key) + if err != nil { + return "", err + } else if rsp.Count == 0 { + return "", nil + } else { + var v string + for _, kv := range rsp.Kvs { + v = string(kv.Value) + } + return v, nil + } +} + +func (ma *metaAdmin) etcdDelete(key string) error { + if !ma.etcdIsRunning() { + return ErrNotRunning + } + ctx, cancel := context.WithTimeout(context.Background(), + ma.etcd.Server.Cfg.ReqTimeout()) + defer cancel() + _, err := ma.etcdCli.Delete(ctx, key) + return err +} + func (ma *metaAdmin) etcdInfo() interface{} { if ma.etcd == nil { return ErrNotRunning diff --git a/metadium/miner/miner.go b/metadium/miner/miner.go index 5313344bf404..17fc9bd0ff50 100644 --- a/metadium/miner/miner.go +++ b/metadium/miner/miner.go @@ -14,7 +14,7 @@ var ( IsMinerFunc func() bool AmPartnerFunc func() bool IsPartnerFunc func(string) bool - LogBlockFunc func(int64) + LogBlockFunc func(int64, common.Hash) CalculateRewardsFunc func(*big.Int, *big.Int, *big.Int, func(common.Address, *big.Int)) (*common.Address, []byte, error) VerifyRewardsFunc func(*big.Int, string) error SignBlockFunc func(hash common.Hash) (nodeid, sig []byte, err error) @@ -48,9 +48,9 @@ func AmPartner() bool { } } -func LogBlock(height int64) { +func LogBlock(height int64, hash common.Hash) { if LogBlockFunc != nil { - LogBlockFunc(height) + LogBlockFunc(height, hash) } } diff --git a/metadium/sync.go b/metadium/sync.go index b554fe1e4e09..4a9a9e118c4b 100644 --- a/metadium/sync.go +++ b/metadium/sync.go @@ -3,7 +3,9 @@ package metadium import ( + "bytes" "context" + "encoding/json" "math/big" "sync" "time" @@ -35,7 +37,7 @@ func (ma *metaAdmin) getLatestBlockInfo(node *metaNode) (height *big.Int, hash c timer := time.NewTimer(60 * time.Second) err = ma.rpcCli.CallContext(ctx, nil, "admin_requestMinerStatus", &node.Id) if err != nil { - log.Error("Metadium RequestMinerStatus Failed", "id", node.Id, "error", err) + log.Info("Metadium RequestMinerStatus Failed", "id", node.Id, "error", err) return } @@ -66,15 +68,16 @@ func (ma *metaAdmin) getLatestBlockInfo(node *metaNode) (height *big.Int, hash c } // syncLock should be held by the caller -func (ma *metaAdmin) syncWith(node *metaNode) { +func (ma *metaAdmin) syncWith(node *metaNode) error { + tsync := time.Now() height, hash, td, err := ma.getLatestBlockInfo(node) if err != nil { log.Error("Metadium", "failed to synchronize with", node.Name, - "error", err) - return + "error", err, "took", time.Since(tsync)) + return err } else { - log.Error("Metadium", "synchronizing with", node.Name, - "height", height, "hash", hash, "td", td) + log.Info("Metadium", "synchronizing with", node.Name, + "height", height, "hash", hash, "td", td, "took", time.Since(tsync)) } ctx, cancel := context.WithCancel(context.Background()) @@ -82,30 +85,168 @@ func (ma *metaAdmin) syncWith(node *metaNode) { err = ma.rpcCli.CallContext(ctx, nil, "admin_synchroniseWith", &node.Id) if err != nil { log.Error("Metadium", "failed to synchronize with", node.Name, - "error", err) + "error", err, "took", time.Since(tsync)) } else { - log.Error("Metadium", "synchronized with", node.Name) + log.Info("Metadium", "synchronized with", node.Name, "took", time.Since(tsync)) } + return err } -func (ma *metaAdmin) updateMiner(locked bool) { +// return true if this node still is the miner after update +func (ma *metaAdmin) updateMiner(locked bool) bool { if ma.etcd == nil { - return + return false } + syncLock.Lock() + defer syncLock.Unlock() + lid, lnode := ma.etcdLeader(locked) - if lid != leaderId && lid != 0 { - syncLock.Lock() - _, oldLeader := leaderId, leader - leaderId, leader = lid, lnode - if leader == ma.self && oldLeader != nil { - log.Error("Metadium: we are the new leader", - "syncing with", oldLeader.Name) + if lid == leaderId || lid == 0 { + return lnode == ma.self + } + + _, oldLeader := leaderId, leader + leaderId, leader = lid, lnode + if leader == ma.self && oldLeader != nil { + // We are the new leader. Make sure we have the latest block. + // Otherwise, punt the leadership to the next in line. + // If all fails, accept the potential fork and move on. + + log.Info("Metadium: we are the new leader") + tstart := time.Now() + + // get the latest work info from etcd + getLatestWork := func() (*metaWork, error) { + var ( + workInfo string + work *metaWork + retries = 60 + err error + ) + + for ; retries > 0; retries-- { + workInfo, err = ma.etcdGet("metadium-work") + if err != nil { + // TODO: ignore if error is not found + log.Info("Metadium - cannot get the latest work info", + "error", err, "took", time.Since(tstart)) + continue + } + + if workInfo == "" { + log.Info("Metadium - the latest work info not logged yet") + return nil, nil + } else { + if err = json.Unmarshal([]byte(workInfo), &work); err != nil { + log.Error("Metadium - cannot get the latest work info", + "error", err, "took", time.Since(tstart)) + return nil, err + } + log.Info("Metadium - got the latest work info", + "height", work.Height, "hash", work.Hash, + "took", time.Since(tstart)) + return work, nil + } + } + return nil, ethereum.NotFound + } + + // check if we are in sync with the latest work info recorded + inSync := func(work *metaWork) (synced bool, latestNum uint64, curNum uint64) { + synced, latestNum, curNum = false, 0, 0 + + if work == nil { + synced = true + return + } + latestNum = uint64(work.Height) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cur, err := ma.cli.HeaderByNumber(ctx, big.NewInt(work.Height)) + if err != nil { + return + } + curNum = uint64(cur.Number.Int64()) + synced = bytes.Equal(cur.Hash().Bytes(), work.Hash.Bytes()) + return + } + + // if we are not in sync, punt the leadership to the next in line + // if all fails, just move on + puntLeadership := func() error { + nodes := ma.getNodes() + if len(nodes) == 0 { + return ethereum.NotFound + } + + ix := 0 + for i, node := range nodes { + if node.Id == ma.self.Id { + ix = i + break + } + } + if ix >= len(nodes) { + return ethereum.NotFound + } + + var err error + for i, j := 0, (ix+1)%len(nodes); i < len(nodes)-1; i++ { + err = ma.etcdMoveLeader(nodes[j].Name) + if err == nil { + return nil + } + j = (j + 1) % len(nodes) + } + + return err + } + + work, err := getLatestWork() + if err != nil { + log.Error("Metadium - cannot get the latest work information. Yielding leadeship") + err = puntLeadership() + if err != nil { + log.Error("Metadium - leadership yielding failed", "error", err) + } else { + log.Info("Metadium - yielded leadership") + } + } else if work == nil { + // this must be the first block, juts move on + log.Info("Metadium - not initialized yet. Starting mining") + } else if synced, _, height := inSync(work); synced { + log.Info("Metadium - in sync. Starting mining", "height", height) + } else { + // sync with the previous leader ma.syncWith(oldLeader) - log.Error("Metadium", "sync done with", oldLeader.Name) + + // check sync again + work, err = getLatestWork() + if work == nil { + // this must be the first block, juts move on + } else if synced, _, height := inSync(work); !synced { + // if still not in sync, give up leadership + err = puntLeadership() + if err != nil { + log.Error("Metadium - not in sync. Leadership yielding failed", + "latest", work.Height, "current", height, "error", err) + } else { + log.Error("Metadium - not in sync. Yielded leadership", + "latest", work.Height, "current", height) + } + } + } + + // update leader info again + lid, lnode = ma.etcdLeader(locked) + if lid != leaderId && lid != 0 { + leaderId, leader = lid, lnode } - syncLock.Unlock() } + + return leader == ma.self } func IsMiner() bool { @@ -125,8 +266,7 @@ func IsMiner() bool { } if admin.etcdIsLeader() { - admin.updateMiner(false) - return true + return admin.updateMiner(false) } else { admin.blocksMined = 0 return false diff --git a/miner/unconfirmed.go b/miner/unconfirmed.go index f35aa29697ff..50104656c6ca 100644 --- a/miner/unconfirmed.go +++ b/miner/unconfirmed.go @@ -85,7 +85,7 @@ func (set *unconfirmedBlocks) Insert(index uint64, hash common.Hash) { } // Display a log for the user to notify of a new mined block unconfirmed header := set.chain.GetHeaderByNumber(index) - go metaminer.LogBlock(header.Number.Int64()) + go metaminer.LogBlock(header.Number.Int64(), hash) log.Info("🔨 mined potential block", "number", index, "hash", hash, "elapsed", common.PrettyDuration(time.Since(time.Unix(header.Time.Int64(), 0)))) } diff --git a/miner/worker.go b/miner/worker.go index c90f5c779a22..ab9b4df6dcb2 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -179,6 +179,9 @@ type worker struct { resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval. } +// compare and swap lock for mining thread +var busyMining int32 + func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(*types.Block) bool) *worker { worker := &worker{ config: config, @@ -196,7 +199,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), - newWorkCh: make(chan *newWorkReq, 2), + newWorkCh: make(chan *newWorkReq, 4), taskCh: make(chan *task), resultCh: make(chan *types.Block, resultQueueSize), exitCh: make(chan struct{}), @@ -405,11 +408,10 @@ func (w *worker) newWorkLoopEx(recommit time.Duration) { // commitSimple just starts a new commitNewWork commitSimple := func() { - if isBusyMining() { - return - } else { + if atomic.CompareAndSwapInt32(&busyMining, 0, 1) { w.newWorkCh <- &newWorkReq{interrupt: nil, noempty: false, timestamp: time.Now().Unix()} atomic.StoreInt32(&w.newTxs, 0) + atomic.StoreInt32(&busyMining, 0) } } // clearPending cleans the stale pending tasks. @@ -802,6 +804,10 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin if tx == nil { break } + // Break if it has to enough transactions + if params.MaxTxsPerBlock > 0 && w.current.tcount >= params.MaxTxsPerBlock { + break + } // Break if it took too long if tstart != nil && time.Since(*tstart).Seconds() >= 4 { break @@ -1131,8 +1137,6 @@ func (w *worker) commitTransactionsEx(num *big.Int, interrupt *int32, tstart tim return false } -var busyMining int32 - func isBusyMining() bool { return atomic.LoadInt32(&busyMining) != 0 } @@ -1445,6 +1449,10 @@ func (w *worker) commitEx(uncles []*types.Header, interval func(), update bool, } log.Info("Successfully sealed new block", "number", sealedBlock.Number(), "sealhash", sealhash, "hash", hash, "elapsed", common.PrettyDuration(time.Since(createdAt))) + if !metaminer.IsMiner() { + return errors.New("Not Miner") + } + // Broadcast the block and announce chain insertion event w.mux.Post(core.NewMinedBlockEvent{Block: sealedBlock}) diff --git a/node/config.go b/node/config.go index 7b32a590875f..db41b3074238 100644 --- a/node/config.go +++ b/node/config.go @@ -200,7 +200,7 @@ func DefaultIPCEndpoint(clientIdentifier string) string { panic("empty executable name") } } - config := &Config{DataDir: DefaultDataDir(), IPCPath: clientIdentifier + ".ipc"} + config := &Config{DataDir: DefaultDataDir(), IPCPath: "geth.ipc"} return config.IPCEndpoint() } @@ -238,8 +238,8 @@ func DefaultWSEndpoint() string { func (c *Config) NodeName() string { name := c.name() // Backwards compatibility: previous versions used title-cased "Geth", keep that. - if name == "geth" || name == "geth-testnet" { - name = "Geth" + if name == "geth" || name == "geth-testnet" || name == "gmet" || name == "gmet-testnet" { + name = "Gmet" } if c.UserIdent != "" { name += "/" + c.UserIdent @@ -258,7 +258,12 @@ func (c *Config) name() string { if progname == "" { panic("empty executable name, set Config.Name") } + if strings.EqualFold(c.Name, "gmet") { + progname = "geth" + } return progname + } else if strings.EqualFold(c.Name, "gmet") { + return "geth" } return c.Name } diff --git a/params/protocol_params.go b/params/protocol_params.go index a428e75b6576..8499b9c3044b 100644 --- a/params/protocol_params.go +++ b/params/protocol_params.go @@ -115,4 +115,5 @@ var ( NonceLimit uint64 = 0 // nonce limit for non-governing accounts UseRocksDb int = 1 // LevelDB (0) or RocksDB (1) PrefetchCount int = 0 // Transaction Prefetch count for faster db read + MaxTxsPerBlock int = 5000 // Max # of transactions in a block ) diff --git a/params/version.go b/params/version.go index e3815aaf0d12..f90b0931c239 100644 --- a/params/version.go +++ b/params/version.go @@ -21,9 +21,9 @@ import ( ) const ( - VersionMajor = 1 // Major version component of the current release - VersionMinor = 8 // Minor version component of the current release - VersionPatch = 23 // Patch version component of the current release + VersionMajor = 0 // Major version component of the current release + VersionMinor = 9 // Minor version component of the current release + VersionPatch = 3 // Patch version component of the current release VersionMeta = "stable" // Version metadata to append to the version string )