-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a simple mempool module prototype
The prototype expects that new transactions will be delivered to it by means of NewRequests events and stores them in an in-memory map. It uses hashes for transaction and batch IDs. To support crash-recovery, persistent storage should be added for the transactions.
- Loading branch information
Showing
6 changed files
with
243 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package common | ||
|
||
import ( | ||
"github.com/filecoin-project/mir/pkg/pb/requestpb" | ||
t "github.com/filecoin-project/mir/pkg/types" | ||
) | ||
|
||
// ModuleConfig sets the module ids. All replicas are expected to use identical module configurations. | ||
type ModuleConfig struct { | ||
Self t.ModuleID // id of this module | ||
Hasher t.ModuleID | ||
Crypto t.ModuleID | ||
} | ||
|
||
// ModuleParams sets the values for the parameters of an instance of the protocol. | ||
// All replicas are expected to use identical module parameters. | ||
type ModuleParams struct { | ||
MaxTransactionsInBatch int | ||
} | ||
|
||
// State represents the common state accessible to all parts of the module implementation. | ||
type State struct { | ||
TxByID map[t.TxID]*requestpb.Request | ||
} |
64 changes: 64 additions & 0 deletions
64
pkg/mempool/simplemempool/internal/parts/computeids/computeids.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package computeids | ||
|
||
import ( | ||
"github.com/filecoin-project/mir/pkg/dsl" | ||
mpdsl "github.com/filecoin-project/mir/pkg/mempool/dsl" | ||
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common" | ||
mppb "github.com/filecoin-project/mir/pkg/pb/mempoolpb" | ||
"github.com/filecoin-project/mir/pkg/pb/requestpb" | ||
"github.com/filecoin-project/mir/pkg/serializing" | ||
t "github.com/filecoin-project/mir/pkg/types" | ||
) | ||
|
||
// | ||
func IncludeComputationOfTransactionAndBatchIDs( | ||
m dsl.Module, | ||
mc *common.ModuleConfig, | ||
params *common.ModuleParams, | ||
commonState *common.State, | ||
) { | ||
mpdsl.UponRequestTransactionIDs(m, func(txs []*requestpb.Request, origin *mppb.RequestTransactionIDsOrigin) error { | ||
txMsgs := make([][][]byte, len(txs)) | ||
for i, tx := range txs { | ||
txMsgs[i] = serializing.RequestForHash(tx) | ||
} | ||
|
||
dsl.HashRequest(m, mc.Hasher, txMsgs, &computeHashForTransactionIDsContext{origin}) | ||
return nil | ||
}) | ||
|
||
dsl.UponHashResult(m, func(hashes [][]byte, context *computeHashForTransactionIDsContext) error { | ||
txIDs := make([]t.TxID, len(hashes)) | ||
for i, hash := range hashes { | ||
txIDs[i] = t.TxID(hash) | ||
} | ||
|
||
mpdsl.TransactionIDsResponse(m, t.ModuleID(context.origin.Module), txIDs, context.origin) | ||
return nil | ||
}) | ||
|
||
mpdsl.UponRequestBatchID(m, func(txIDs []t.TxID, origin *mppb.RequestBatchIDOrigin) error { | ||
data := make([][]byte, len(txIDs)) | ||
for i, txID := range txIDs { | ||
data[i] = txID.Bytes() | ||
} | ||
|
||
dsl.HashOneMessage(m, mc.Hasher, data, &computeHashForBatchIDContext{origin}) | ||
return nil | ||
}) | ||
|
||
dsl.UponOneHashResult(m, func(hash []byte, context *computeHashForBatchIDContext) error { | ||
mpdsl.BatchIDResponse(m, t.ModuleID(context.origin.Module), t.BatchID(hash), context.origin) | ||
return nil | ||
}) | ||
} | ||
|
||
// Context data structures | ||
|
||
type computeHashForTransactionIDsContext struct { | ||
origin *mppb.RequestTransactionIDsOrigin | ||
} | ||
|
||
type computeHashForBatchIDContext struct { | ||
origin *mppb.RequestBatchIDOrigin | ||
} |
75 changes: 75 additions & 0 deletions
75
pkg/mempool/simplemempool/internal/parts/formbatches/formbatches.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package formbatches | ||
|
||
import ( | ||
"github.com/filecoin-project/mir/pkg/dsl" | ||
mpdsl "github.com/filecoin-project/mir/pkg/mempool/dsl" | ||
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common" | ||
mppb "github.com/filecoin-project/mir/pkg/pb/mempoolpb" | ||
"github.com/filecoin-project/mir/pkg/pb/requestpb" | ||
t "github.com/filecoin-project/mir/pkg/types" | ||
) | ||
|
||
type State struct { | ||
*common.State | ||
NewTxIDs []t.TxID | ||
} | ||
|
||
// IncludeBatchCreation registers event handlers for processing new transactions and forming batches. | ||
func IncludeBatchCreation( | ||
m dsl.Module, | ||
mc *common.ModuleConfig, | ||
params *common.ModuleParams, | ||
commonState *common.State, | ||
) { | ||
state := &State{ | ||
State: commonState, | ||
NewTxIDs: nil, | ||
} | ||
|
||
dsl.UponNewRequests(m, func(txs []*requestpb.Request) error { | ||
mpdsl.RequestTransactionIDs(m, mc.Self, txs, &requestTxIDsContext{txs}) | ||
return nil | ||
}) | ||
|
||
mpdsl.UponTransactionIDsResponse(m, func(txIDs []t.TxID, context *requestTxIDsContext) error { | ||
for i := range txIDs { | ||
state.TxByID[txIDs[i]] = context.txs[i] | ||
} | ||
state.NewTxIDs = append(state.NewTxIDs, txIDs...) | ||
return nil | ||
}) | ||
|
||
mpdsl.UponRequestBatch(m, func(origin *mppb.RequestBatchOrigin) error { | ||
var txIDs []t.TxID | ||
var txs []*requestpb.Request | ||
batchSize := 0 | ||
|
||
var i int | ||
var txID t.TxID | ||
|
||
for i, txID = range state.NewTxIDs { | ||
tx := state.TxByID[txID] | ||
|
||
// TODO: add other limitations (if any) here. | ||
if i == params.MaxTransactionsInBatch { | ||
break | ||
} | ||
|
||
txIDs = append(txIDs, txID) | ||
txs = append(txs, tx) | ||
batchSize += len(tx.Data) | ||
} | ||
|
||
state.NewTxIDs = state.NewTxIDs[i:] | ||
|
||
// Note that a batch may be empty. | ||
mpdsl.NewBatch(m, t.ModuleID(origin.Module), txIDs, txs, origin) | ||
return nil | ||
}) | ||
} | ||
|
||
// Context data structures | ||
|
||
type requestTxIDsContext struct { | ||
txs []*requestpb.Request | ||
} |
30 changes: 30 additions & 0 deletions
30
pkg/mempool/simplemempool/internal/parts/lookuptxs/lockuptxs.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package lookuptxs | ||
|
||
import ( | ||
"github.com/filecoin-project/mir/pkg/dsl" | ||
mpdsl "github.com/filecoin-project/mir/pkg/mempool/dsl" | ||
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common" | ||
mppb "github.com/filecoin-project/mir/pkg/pb/mempoolpb" | ||
"github.com/filecoin-project/mir/pkg/pb/requestpb" | ||
t "github.com/filecoin-project/mir/pkg/types" | ||
) | ||
|
||
// IncludeTransactionLookupByID registers event handlers for transaction looking up transactions in the mempool by | ||
// their IDs. | ||
func IncludeTransactionLookupByID( | ||
m dsl.Module, | ||
mc *common.ModuleConfig, | ||
params *common.ModuleParams, | ||
commonState *common.State, | ||
) { | ||
mpdsl.UponRequestTransactions(m, func(txIDs []t.TxID, origin *mppb.RequestTransactionsOrigin) error { | ||
present := make([]bool, len(txIDs)) | ||
txs := make([]*requestpb.Request, len(txIDs)) | ||
for i, txID := range txIDs { | ||
txs[i], present[i] = commonState.TxByID[txID] | ||
} | ||
|
||
mpdsl.TransactionsResponse(m, t.ModuleID(origin.Module), present, txs, origin) | ||
return nil | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package simplemempool | ||
|
||
import ( | ||
"github.com/filecoin-project/mir/pkg/dsl" | ||
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/common" | ||
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/computeids" | ||
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/formbatches" | ||
"github.com/filecoin-project/mir/pkg/mempool/simplemempool/internal/parts/lookuptxs" | ||
"github.com/filecoin-project/mir/pkg/modules" | ||
"github.com/filecoin-project/mir/pkg/pb/requestpb" | ||
t "github.com/filecoin-project/mir/pkg/types" | ||
) | ||
|
||
// ModuleConfig sets the module ids. All replicas are expected to use identical module configurations. | ||
type ModuleConfig = common.ModuleConfig | ||
|
||
// ModuleParams sets the values for the parameters of an instance of the protocol. | ||
// All replicas are expected to use identical module parameters. | ||
type ModuleParams = common.ModuleParams | ||
|
||
// DefaultModuleConfig returns a valid module config with default names for all modules. | ||
func DefaultModuleConfig() *ModuleConfig { | ||
return &ModuleConfig{ | ||
Self: "availability", | ||
Hasher: "hasher", | ||
Crypto: "crypto", | ||
} | ||
} | ||
|
||
// NewModule creates a new instance of a simple mempool module implementation. It passively waits for | ||
// eventpb.NewRequests events and stores them in a local map. | ||
// | ||
// On a batch request, this implementation creates a batch that consists of as many requests received since the | ||
// previous batch request as possible with respect to params.MaxTransactionsInBatch. | ||
// | ||
// This implementation uses the hash function provided by the mc.Hasher module to compute transaction IDs and batch IDs. | ||
func NewModule(mc *ModuleConfig, params *ModuleParams) modules.Module { | ||
m := dsl.NewModule(mc.Self) | ||
|
||
commonState := &common.State{ | ||
TxByID: make(map[t.TxID]*requestpb.Request), | ||
} | ||
|
||
computeids.IncludeComputationOfTransactionAndBatchIDs(m, mc, params, commonState) | ||
formbatches.IncludeBatchCreation(m, mc, params, commonState) | ||
lookuptxs.IncludeTransactionLookupByID(m, mc, params, commonState) | ||
|
||
return m | ||
} |