Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a way to share transactions between selected clients #2289

Merged
merged 3 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ var TXPoolFlags = []Flag{
TxPoolAccountQueueFlag,
TxPoolGlobalQueueFlag,
TxPoolLifetimeFlag,
TxPoolSharingClientsFlag,
}

var WorkShareFlags = []Flag{
Expand Down Expand Up @@ -404,6 +405,12 @@ var (
Usage: "Maximum amount of time non-executable transaction are queued" + generateEnvDoc(c_TXPoolPrefix+"lifetime"),
}

// TxPoolSharingClientsFlag lists the RPC endpoints of peers that this node
// directly forwards locally received transactions to.
TxPoolSharingClientsFlag = Flag{
	Name:  c_TXPoolPrefix + "sharing-clients",
	Value: "",
	Usage: "Comma separated list of client endpoints that the node would directly share the transactions with" + generateEnvDoc(c_TXPoolPrefix+"sharing-clients"),
}

CacheFlag = Flag{
Name: c_NodeFlagPrefix + "cache",
Value: 1024,
Expand Down Expand Up @@ -1099,6 +1106,8 @@ func setTxPool(cfg *core.TxPoolConfig, nodeLocation common.Location) {
if viper.IsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = viper.GetDuration(TxPoolLifetimeFlag.Name)
}

cfg.SharingClientsEndpoints = SplitAndTrim(viper.GetString(TxPoolSharingClientsFlag.Name))
}

func setConsensusEngineConfig(cfg *quaiconfig.Config) {
Expand Down
29 changes: 16 additions & 13 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@ import (
)

const (
c_maxAppendQueue = 100000 // Maximum number of future headers we can store in cache
c_maxFutureTime = 30 // Max time into the future (in seconds) we will accept a block
c_appendQueueRetryPeriod = 1 // Time (in seconds) before retrying to append from AppendQueue
c_appendQueueThreshold = 200 // Number of blocks to load from the disk to ram on every proc of append queue
c_processingCache = 10 // Number of block hashes held to prevent multi simultaneous appends on a single block hash
c_primeRetryThreshold = 1800 // Number of times a block is retry to be appended before eviction from append queue in Prime
c_regionRetryThreshold = 1200 // Number of times a block is retry to be appended before eviction from append queue in Region
c_zoneRetryThreshold = 600 // Number of times a block is retry to be appended before eviction from append queue in Zone
c_appendQueueRetryPriorityThreshold = 5 // If retry counter for a block is less than this number, then its put in the special list that is tried first to be appended
c_appendQueueRemoveThreshold = 10 // Number of blocks behind the block should be from the current header to be eligble for removal from the append queue
c_normalListProcCounter = 1 // Ratio of Number of times the PriorityList is serviced over the NormalList
c_statsPrintPeriod = 60 // Time between stats prints
c_maxAppendQueue = 100000 // Maximum number of future headers we can store in cache
c_maxFutureTime = 30 // Max time into the future (in seconds) we will accept a block
c_appendQueueRetryPeriod = 100 * time.Millisecond // Time before retrying to append from AppendQueue
c_appendQueueThreshold = 200 // Number of blocks to load from the disk to ram on every proc of append queue
c_processingCache = 10 // Number of block hashes held to prevent multi simultaneous appends on a single block hash
c_primeRetryThreshold = 1800 // Number of times a block is retry to be appended before eviction from append queue in Prime
c_regionRetryThreshold = 1200 // Number of times a block is retry to be appended before eviction from append queue in Region
c_zoneRetryThreshold = 600 // Number of times a block is retry to be appended before eviction from append queue in Zone
c_appendQueueRetryPriorityThreshold = 5 // If retry counter for a block is less than this number, then its put in the special list that is tried first to be appended
c_appendQueueRemoveThreshold = 10 // Number of blocks behind the block should be from the current header to be eligble for removal from the append queue
c_normalListProcCounter = 1 // Ratio of Number of times the PriorityList is serviced over the NormalList
c_statsPrintPeriod = 60 // Time between stats prints
c_appendQueuePrintSize = 10
c_normalListBackoffThreshold = 5 // Max multiple on the c_normalListProcCounter
c_maxRemoteTxQueue = 50000
Expand Down Expand Up @@ -442,7 +442,7 @@ func (c *Core) updateAppendQueue() {
}).Error("Go-Quai Panicked")
}
}()
futureTimer := time.NewTicker(c_appendQueueRetryPeriod * time.Second)
futureTimer := time.NewTicker(c_appendQueueRetryPeriod)
defer futureTimer.Stop()
for {
select {
Expand Down Expand Up @@ -1331,6 +1331,9 @@ func (c *Core) ContentFrom(addr common.Address) (types.Transactions, types.Trans
}
return c.sl.txPool.ContentFrom(internal)
}
// SendTxToSharingClients forwards tx to the slice's transaction pool so it
// can be relayed to the configured pool sharing clients.
func (c *Core) SendTxToSharingClients(tx *types.Transaction) {
c.sl.txPool.SendTxToSharingClients(tx)
}

func (c *Core) SuggestFinalityDepth(qiValue *big.Int, correlatedRisk *big.Int) *big.Int {
qiRewardPerBlock := misc.CalculateQiReward(c.CurrentHeader().WorkObjectHeader())
Expand Down
3 changes: 0 additions & 3 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,9 +979,6 @@ func (hc *HeaderChain) RecoverCurrentHeader() *types.WorkObject {
// In theory, if header is present in the database, all relative components
// like td and hash->number should be present too.
// HasHeader reports whether a header with the given hash and number exists.
func (hc *HeaderChain) HasHeader(hash common.Hash, number uint64) bool {
if hc.numberCache.Contains(hash) || hc.headerCache.Contains(hash) {
// Fast path: a hit in either in-memory cache proves the header exists.
return true
}
// Fall back to the on-disk database lookup.
return rawdb.HasHeader(hc.headerDb, hash, number)
}

Expand Down
130 changes: 106 additions & 24 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"context"
"errors"
"fmt"
"math"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/metrics_config"
"github.com/dominant-strategies/go-quai/params"
"github.com/dominant-strategies/go-quai/quaiclient"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -193,6 +195,8 @@ type TxPoolConfig struct {
QiTxLifetime time.Duration // Maximum amount of time Qi transactions are queued
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
ReorgFrequency time.Duration // Frequency of reorgs outside of new head events

SharingClientsEndpoints []string // List of end points of the nodes to share the incoming local transactions with
}

// DefaultTxPoolConfig contains the default configurations for the transaction
Expand Down Expand Up @@ -356,6 +360,9 @@ type TxPool struct {
wg sync.WaitGroup // tracks loop, scheduleReorgLoop

logger *log.Logger

poolSharingClients []*quaiclient.Client
poolSharingTxCh chan *types.Transaction
}

type txpoolResetRequest struct {
Expand Down Expand Up @@ -403,30 +410,32 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block

// Create the transaction pool with its initial settings
pool := &TxPool{
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.LatestSigner(chainconfig),
pending: make(map[common.InternalAddress]*txList),
queue: make(map[common.InternalAddress]*txList),
beats: make(map[common.InternalAddress]time.Time),
sendersCh: make(chan newSender, config.SendersChBuffer),
feesCh: make(chan newFee, config.SendersChBuffer),
invalidQiTxsCh: make(chan []*common.Hash, config.SendersChBuffer),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
reqResetCh: make(chan *txpoolResetRequest, chainHeadChanSize),
reqPromoteCh: make(chan *accountSet, chainHeadChanSize),
queueTxEventCh: make(chan *types.Transaction, chainHeadChanSize),
broadcastSet: make(types.Transactions, 0),
reorgDoneCh: make(chan chan struct{}, chainHeadChanSize),
reorgShutdownCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
localTxsCount: 0,
remoteTxsCount: 0,
reOrgCounter: 0,
logger: logger,
db: db,
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.LatestSigner(chainconfig),
pending: make(map[common.InternalAddress]*txList),
queue: make(map[common.InternalAddress]*txList),
beats: make(map[common.InternalAddress]time.Time),
sendersCh: make(chan newSender, config.SendersChBuffer),
feesCh: make(chan newFee, config.SendersChBuffer),
invalidQiTxsCh: make(chan []*common.Hash, config.SendersChBuffer),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
reqResetCh: make(chan *txpoolResetRequest, chainHeadChanSize),
reqPromoteCh: make(chan *accountSet, chainHeadChanSize),
queueTxEventCh: make(chan *types.Transaction, chainHeadChanSize),
broadcastSet: make(types.Transactions, 0),
reorgDoneCh: make(chan chan struct{}, chainHeadChanSize),
reorgShutdownCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
localTxsCount: 0,
remoteTxsCount: 0,
reOrgCounter: 0,
logger: logger,
db: db,
poolSharingClients: make([]*quaiclient.Client, len(config.SharingClientsEndpoints)),
poolSharingTxCh: make(chan *types.Transaction, 100),
}

qiPool, _ := lru.New[common.Hash, *types.TxWithMinerFee](int(config.QiPoolSize))
Expand Down Expand Up @@ -455,6 +464,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
pool.wg.Add(1)
go pool.scheduleReorgLoop()

pool.wg.Add(1)
go pool.txListenerLoop()

// If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" {
pool.journal = newTxJournal(config.Journal, logger)
Expand All @@ -467,6 +479,21 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
}
}

// connect to the pool sharing clients
for i := range config.SharingClientsEndpoints {
go func() {
defer func() {
if r := recover(); r != nil {
pool.logger.WithFields(log.Fields{
"error": r,
"stacktrace": string(debug.Stack()),
}).Error("Go-Quai Panicked")
}
}()
pool.createSharingClient(i)
}()
}

// Subscribe events from blockchain and start the main event loop.
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
pool.wg.Add(1)
Expand Down Expand Up @@ -1091,6 +1118,7 @@ func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
if len(txs) == 0 {
return []error{}
}

return pool.addTxs(txs, !pool.config.NoLocals, true)
}

Expand Down Expand Up @@ -1542,6 +1570,60 @@ func (pool *TxPool) queueTxEvent(tx *types.Transaction) {
}
}

// SendTxToSharingClients enqueues tx on the pool sharing channel without
// blocking; if the channel buffer is full the tx is dropped and a warning
// is logged instead of stalling the caller.
func (pool *TxPool) SendTxToSharingClients(tx *types.Transaction) {
select {
case pool.poolSharingTxCh <- tx:
default:
pool.logger.Warn("pool sharing tx ch is full")
}
}

// txListenerLoop drains the pool sharing tx channel and forwards each
// transaction to every connected pool sharing client. It exits when
// reorgShutdownCh is closed; without an exit path this goroutine would
// never return and the wg.Add(1) done before it was started would make
// the pool's wg.Wait hang forever on shutdown.
func (pool *TxPool) txListenerLoop() {
	defer func() {
		if r := recover(); r != nil {
			pool.logger.WithFields(log.Fields{
				"error":      r,
				"stacktrace": string(debug.Stack()),
			}).Error("Go-Quai Panicked")
		}
	}()
	defer pool.wg.Done()

	for {
		select {
		case tx := <-pool.poolSharingTxCh:
			// Best effort fan-out: a failure to one client must not stop
			// delivery to the remaining clients.
			for _, client := range pool.poolSharingClients {
				if client != nil {
					if err := client.SendTransactionToPoolSharingClient(context.Background(), tx); err != nil {
						pool.logger.WithField("err", err).Error("Error sending transaction to pool sharing client")
					}
				}
			}
		case <-pool.reorgShutdownCh:
			// Pool is shutting down; stop forwarding and release the WaitGroup.
			return
		}
	}
}

// createSharingClient dials the configured sharing endpoint at the given
// index and, on success, stores the resulting client in
// pool.poolSharingClients[index]. On failure the slot is left nil and a
// warning is logged with the dial error.
//
// NOTE(review): the slot is written here from a spawned goroutine while
// txListenerLoop reads the same slice concurrently without synchronization —
// confirm this is acceptable or guard the slice.
func (pool *TxPool) createSharingClient(index int) {
	endpoint := pool.config.SharingClientsEndpoints[index]
	client, err := quaiclient.Dial(endpoint, pool.logger)
	if err != nil {
		// The previous message ("Client was nil trying to send transactions")
		// was misleading and dropped the dial error.
		pool.logger.WithFields(log.Fields{
			"endpoint": endpoint,
			"err":      err,
		}).Warn("Failed to connect to pool sharing client")
		return
	}

	pool.logger.WithField("endpoint", endpoint).Info("Pool sharing client connected")

	// set the created client into the pool sharing client slot
	pool.poolSharingClients[index] = client
}

// scheduleReorgLoop schedules runs of reset and promoteExecutables. Code above should not
// call those methods directly, but request them being run using requestReset and
// requestPromoteExecutables instead.
Expand Down
24 changes: 24 additions & 0 deletions internal/quaiapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1564,9 +1564,33 @@ func (s *PublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, input
if err != nil {
return common.Hash{}, err
}
// Send the tx to tx pool sharing clients
s.b.SendTxToSharingClients(tx)

return SubmitTransaction(ctx, s.b, tx)
}

// ReceiveTxFromPoolSharingClient decodes a proto-encoded transaction sent by
// a pool sharing peer and submits it to the local transaction pool. It is the
// server side of quaiclient.SendTransactionToPoolSharingClient.
func (s *PublicTransactionPoolAPI) ReceiveTxFromPoolSharingClient(ctx context.Context, input hexutil.Bytes) error {
	protoTransaction := new(types.ProtoTransaction)
	if err := proto.Unmarshal(input, protoTransaction); err != nil {
		return err
	}
	tx := new(types.Transaction)
	if err := tx.ProtoDecode(protoTransaction, s.b.NodeLocation()); err != nil {
		return err
	}

	s.b.Logger().Info("Received a tx from pool sharing client")

	// Only the error matters to the caller; the hash is discarded.
	_, err := SubmitTransaction(ctx, s.b, tx)
	return err
}

// PublicDebugAPI is the collection of Quai APIs exposed over the public
// debugging endpoint.
type PublicDebugAPI struct {
Expand Down
1 change: 1 addition & 0 deletions internal/quaiapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type Backend interface {
TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions)
GetMinGasPrice() *big.Int
GetPoolGasPrice() *big.Int
SendTxToSharingClients(tx *types.Transaction)

// Filter API
BloomStatus() (uint64, uint64)
Expand Down
1 change: 1 addition & 0 deletions internal/quaiapi/quai_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,7 @@ func (s *PublicBlockChainQuaiAPI) CalcOrder(ctx context.Context, raw hexutil.Byt
}
return hexutil.Uint(order), nil
}

jdowning100 marked this conversation as resolved.
Show resolved Hide resolved
func (s *PublicBlockChainQuaiAPI) SuggestFinalityDepth(ctx context.Context, qiValue hexutil.Uint64, correlatedRisk hexutil.Uint64) (hexutil.Uint64, error) {

depth, err := s.b.SuggestFinalityDepth(ctx, big.NewInt(int64(qiValue)), big.NewInt(int64(correlatedRisk)))
Expand Down
4 changes: 4 additions & 0 deletions quai/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,10 @@ func (b *QuaiAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address)
return b.quai.core.Nonce(addr), nil
}

// SendTxToSharingClients forwards tx to the core so it can be relayed to the
// configured pool sharing clients.
func (b *QuaiAPIBackend) SendTxToSharingClients(tx *types.Transaction) {
b.quai.core.SendTxToSharingClients(tx)
}

func (b *QuaiAPIBackend) GetMinGasPrice() *big.Int {
return b.quai.core.GetMinGasPrice()
}
Expand Down
1 change: 1 addition & 0 deletions quai/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (h *handler) checkNextPrimeBlock() {
// protocol.C_NumPrimeBlocksToDownload
if i < 2*protocol.C_NumPrimeBlocksToDownload {
h.GetNextPrimeBlock(syncHeight.Add(syncHeight, big.NewInt(protocol.C_NumPrimeBlocksToDownload)))
h.GetNextPrimeBlock(syncHeight.Add(new(big.Int).Add(syncHeight, big.NewInt(protocol.C_NumPrimeBlocksToDownload)), big.NewInt(protocol.C_NumPrimeBlocksToDownload)))
}
break
}
Expand Down
18 changes: 17 additions & 1 deletion quaiclient/quaiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func DialContext(ctx context.Context, rawurl string, logger *log.Logger) (*Clien
attempts += 1
// exponential back-off implemented
// delaySecs := int64(math.Floor((math.Pow(2, float64(attempts)) - 1) * 0.5))
delaySecs := int64(1)
delaySecs := int64(attempts) * 5
if delaySecs > exponentialBackoffCeilingSecs {
return nil, err
}
Expand Down Expand Up @@ -332,3 +332,19 @@ func (ec *Client) CalcOrder(ctx context.Context, header *types.WorkObject) (int,
}
return int(result), nil
}

// SendTransactionToPoolSharingClient proto-encodes the given transaction and
// forwards it to the remote node's quai_receiveTxFromPoolSharingClient RPC,
// which injects it into that node's transaction pool.
func (ec *Client) SendTransactionToPoolSharingClient(ctx context.Context, tx *types.Transaction) error {
protoTx, err := tx.ProtoEncode()
if err != nil {
return err
}
data, err := proto.Marshal(protoTx)
if err != nil {
return err
}
// nil result: the RPC returns only an error.
return ec.c.CallContext(ctx, nil, "quai_receiveTxFromPoolSharingClient", hexutil.Encode(data))
}
Loading