-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mempool concurrency: transaction validation #103
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ type TxInclusionResponse struct { | |
|
||
type Mempool struct { | ||
txReqs chan *txRequest | ||
validatedTxReqs chan *txRequest | ||
quit chan bool | ||
flushReq chan flushSpendReq | ||
txPool []MempoolTx | ||
|
@@ -50,36 +51,58 @@ type flushSpendReq struct { | |
|
||
func NewMempool(storage db.Storage, client eth.Client) *Mempool { | ||
return &Mempool{ | ||
txReqs: make(chan *txRequest), | ||
quit: make(chan bool), | ||
flushReq: make(chan flushSpendReq), | ||
txPool: make([]MempoolTx, 0), | ||
poolSpends: make(map[string]bool), | ||
storage: storage, | ||
client: client, | ||
txReqs: make(chan *txRequest), | ||
validatedTxReqs: make(chan *txRequest), | ||
quit: make(chan bool), | ||
flushReq: make(chan flushSpendReq), | ||
txPool: make([]MempoolTx, 0), | ||
poolSpends: make(map[string]bool), | ||
storage: storage, | ||
client: client, | ||
} | ||
} | ||
|
||
func (m *Mempool) validateTransactionRequest(req *txRequest) { | ||
tx := req.tx | ||
var err error | ||
if tx.Body.IsDeposit() { | ||
err = m.VerifyDepositTransaction(&tx) | ||
} else { | ||
// 300-500us | ||
err = m.VerifySpendTransaction(&tx) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Once you move out the I know you said the In summary: Thoughts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Second thoughts on this: realized that if a block is about to be written with transaction X (via flush), but a double-spend transaction X' is in a go-routine just past this line, it's possible for a double-spend to sneak in. Basically mempool validation can't occur while a block is about to be written. So you're right - I will need to re-think a little bit more There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, mempool validation cannot occur while a block is about to be/is being written. That's handled by the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When I said the double spend check can't be parallelized I was referring to how it requires that blocks be written to disk before new transactions can be processed by the mempool. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah but I think there is a race-condition introduced here. As described in my last comment: https://github.com/kyokan/plasma/pull/103/files#r259601343. The transaction validation can occur simultaneously as a package block write. Basically we should wrap the go-routine in a lock. |
||
} | ||
if err != nil { | ||
mPoolLogger.WithFields(logrus.Fields{ | ||
"hash": tx.Body.SignatureHash().Hex(), | ||
"reason": err, | ||
}).Warn("transaction rejected from mempool") | ||
|
||
req.res <- TxInclusionResponse{ | ||
Error: err, | ||
} | ||
return | ||
} | ||
|
||
m.validatedTxReqs <- req | ||
} | ||
|
||
func (m *Mempool) Start() error { | ||
go func() { | ||
for { | ||
select { | ||
case req := <-m.txReqs: | ||
if len(m.txPool) == MaxMempoolSize { | ||
req.res <- TxInclusionResponse{ | ||
Error: errors.New("mempool is full"), | ||
} | ||
continue | ||
} | ||
|
||
tx := req.tx | ||
var err error | ||
if tx.Body.IsDeposit() { | ||
err = m.VerifyDepositTransaction(&tx) | ||
} else { | ||
err = m.VerifySpendTransaction(&tx) | ||
} | ||
if err != nil { | ||
go m.validateTransactionRequest(req) | ||
case req := <-m.validatedTxReqs: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We validate the incoming transaction request first against previous block history for double-spends and once that's cleared we do our mem-pool integrity checks synchronously below. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like this approach. |
||
if len(m.txPool) == MaxMempoolSize { | ||
req.res <- TxInclusionResponse{ | ||
Error: errors.New("mempool is full"), | ||
} | ||
continue | ||
} | ||
|
||
tx := req.tx | ||
|
||
if err := m.ensureNoPoolSpend(&tx); err != nil { | ||
mPoolLogger.WithFields(logrus.Fields{ | ||
"hash": tx.Body.SignatureHash().Hex(), | ||
"reason": err, | ||
|
@@ -90,10 +113,13 @@ func (m *Mempool) Start() error { | |
} | ||
continue | ||
} | ||
|
||
m.txPool = append(m.txPool, MempoolTx{ | ||
Tx: tx, | ||
Response: req.res, | ||
}) | ||
|
||
// 3us | ||
m.updatePoolSpends(&tx) | ||
case req := <-m.flushReq: | ||
res := m.txPool | ||
|
@@ -134,18 +160,10 @@ func (m *Mempool) Append(tx chain.Transaction) TxInclusionResponse { | |
} | ||
|
||
func (m *Mempool) VerifySpendTransaction(tx *chain.Transaction) (error) { | ||
if err := m.ensureNoPoolSpend(tx); err != nil { | ||
return err | ||
} | ||
|
||
return validation.ValidateSpendTransaction(m.storage, tx) | ||
} | ||
|
||
func (m *Mempool) VerifyDepositTransaction(tx *chain.Transaction) error { | ||
if err := m.ensureNoPoolSpend(tx); err != nil { | ||
return err | ||
} | ||
|
||
return validation.ValidateDepositTransaction(m.storage, m.client, tx) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,31 +14,38 @@ import ( | |
) | ||
|
||
func ValidateSpendTransaction(storage db.Storage, tx *chain.Transaction) (error) { | ||
// 300 ns | ||
if tx.Body.Output0.Amount.Cmp(big.NewInt(0)) == -1 { | ||
return NewErrNegativeOutput(0) | ||
} | ||
if tx.Body.Output1.Amount.Cmp(big.NewInt(0)) == -1 { | ||
return NewErrNegativeOutput(1) | ||
} | ||
|
||
// 20 Us | ||
prevTx0Conf, err := storage.FindTransactionByBlockNumTxIdx(tx.Body.Input0.BlockNumber, tx.Body.Input0.TransactionIndex) | ||
if err == leveldb.ErrNotFound { | ||
return NewErrTxNotFound(0, tx.Body.Input0.BlockNumber, tx.Body.Input0.TransactionIndex) | ||
} | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// 10 Us | ||
prevTx0 := prevTx0Conf.Transaction | ||
if prevTx0Conf.ConfirmSigs[0] != tx.Body.Input0ConfirmSig { | ||
return NewErrConfirmSigMismatch(0) | ||
} | ||
sigHash0 := tx.Body.SignatureHash() | ||
|
||
// 200 Us | ||
prevTx0Output := prevTx0.Body.OutputAt(tx.Body.Input0.OutputIndex) | ||
err = eth.ValidateSignature(sigHash0, tx.Sigs[0][:], prevTx0Output.Owner) | ||
if err != nil { | ||
return NewErrInvalidSignature(0) | ||
} | ||
|
||
// 450 ns | ||
totalInput := big.NewInt(0) | ||
totalInput = totalInput.Add(totalInput, prevTx0Output.Amount) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another thing: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you mean never gets tripped? You mean if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah if |
||
|
@@ -60,14 +67,17 @@ func ValidateSpendTransaction(storage db.Storage, tx *chain.Transaction) (error) | |
} | ||
prevTx1Output := prevTx1.Body.OutputAt(tx.Body.Input1.OutputIndex) | ||
sigHash1 := tx.Body.SignatureHash() | ||
|
||
err = eth.ValidateSignature(sigHash1, tx.Sigs[1][:], prevTx1Output.Owner) | ||
|
||
if err != nil { | ||
return NewErrInvalidSignature(1) | ||
} | ||
|
||
totalInput = totalInput.Add(totalInput, prevTx1Output.Amount) | ||
} | ||
|
||
// 600-900 ns | ||
totalOutput := big.NewInt(0) | ||
totalOutput = totalOutput.Add(totalOutput, tx.Body.Output0.Amount) | ||
totalOutput = totalOutput.Add(totalOutput, tx.Body.Fee) | ||
|
@@ -79,6 +89,7 @@ func ValidateSpendTransaction(storage db.Storage, tx *chain.Transaction) (error) | |
return NewErrInputOutputValueMismatch(totalInput, totalOutput) | ||
} | ||
|
||
// 20 Us | ||
isDoubleSpent, err := storage.IsDoubleSpent(tx) | ||
if err != nil { | ||
return err | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basically 70% of the work happens right here on this line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you do the rough math here: If this takes say on average 200 us. That means about 5 transactions processed a millisecond. And in a 100ms block with 16-cores (my setup) parallelizing the processing you get: 5 transactions/ms x 100 ms/block x 16cores = 8000 TPS. I'm basically this figure plus/minus overhead.EDIT: Sorry my math was horrible. And this line doesn't average near 200us. It's much lower but it's still the main bottleneck during transaction validation and eating 70% of the time. This is pre-block-packaging.