Skip to content

Commit

Permalink
Move NumSpeculativeProcs from module variable to function parameter (#931)
Browse files Browse the repository at this point in the history

This will prevent data races when more than one parallel execution is running at the same time.
  • Loading branch information
cffls authored Jul 13, 2023
1 parent 7e0e4c8 commit eedeaed
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 43 deletions.
68 changes: 33 additions & 35 deletions core/blockstm/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ type ExecVersionView struct {
sender common.Address
}

var NumSpeculativeProcs int = 8

// SetProcs sets the number of speculative worker goroutines used by
// subsequently created parallel executors.
//
// Deprecated: this writes shared package-level state with no
// synchronization, so concurrent parallel executions race on the value.
// Pass the worker count to the executor as a parameter instead.
func SetProcs(specProcs int) {
	NumSpeculativeProcs = specProcs
}

func (ev *ExecVersionView) Execute() (er ExecResult) {
er.ver = ev.ver
if er.err = ev.et.Execute(ev.mvh, ev.ver.Incarnation); er.err != nil {
Expand Down Expand Up @@ -180,6 +174,9 @@ type ParallelExecutor struct {
// Stores the execution statistics for the last incarnation of each task
stats map[int]ExecutionStat

// Number of workers that execute transactions speculatively
numSpeculativeProcs int

statsMutex sync.Mutex

// Channel for tasks that should be prioritized
Expand Down Expand Up @@ -255,7 +252,7 @@ type ExecutionStat struct {
Worker int
}

func NewParallelExecutor(tasks []ExecTask, profile bool, metadata bool) *ParallelExecutor {
func NewParallelExecutor(tasks []ExecTask, profile bool, metadata bool, numProcs int) *ParallelExecutor {
numTasks := len(tasks)

var resultQueue SafeQueue
Expand All @@ -271,27 +268,28 @@ func NewParallelExecutor(tasks []ExecTask, profile bool, metadata bool) *Paralle
}

pe := &ParallelExecutor{
tasks: tasks,
stats: make(map[int]ExecutionStat, numTasks),
chTasks: make(chan ExecVersionView, numTasks),
chSpeculativeTasks: make(chan struct{}, numTasks),
chSettle: make(chan int, numTasks),
chResults: make(chan struct{}, numTasks),
specTaskQueue: specTaskQueue,
resultQueue: resultQueue,
lastSettled: -1,
skipCheck: make(map[int]bool),
execTasks: makeStatusManager(numTasks),
validateTasks: makeStatusManager(0),
diagExecSuccess: make([]int, numTasks),
diagExecAbort: make([]int, numTasks),
mvh: MakeMVHashMap(),
lastTxIO: MakeTxnInputOutput(numTasks),
txIncarnations: make([]int, numTasks),
estimateDeps: make(map[int][]int),
preValidated: make(map[int]bool),
begin: time.Now(),
profile: profile,
tasks: tasks,
numSpeculativeProcs: numProcs,
stats: make(map[int]ExecutionStat, numTasks),
chTasks: make(chan ExecVersionView, numTasks),
chSpeculativeTasks: make(chan struct{}, numTasks),
chSettle: make(chan int, numTasks),
chResults: make(chan struct{}, numTasks),
specTaskQueue: specTaskQueue,
resultQueue: resultQueue,
lastSettled: -1,
skipCheck: make(map[int]bool),
execTasks: makeStatusManager(numTasks),
validateTasks: makeStatusManager(0),
diagExecSuccess: make([]int, numTasks),
diagExecAbort: make([]int, numTasks),
mvh: MakeMVHashMap(),
lastTxIO: MakeTxnInputOutput(numTasks),
txIncarnations: make([]int, numTasks),
estimateDeps: make(map[int][]int),
preValidated: make(map[int]bool),
begin: time.Now(),
profile: profile,
}

return pe
Expand Down Expand Up @@ -329,10 +327,10 @@ func (pe *ParallelExecutor) Prepare() error {
}
}

pe.workerWg.Add(NumSpeculativeProcs + numGoProcs)
pe.workerWg.Add(pe.numSpeculativeProcs + numGoProcs)

// Launch workers that execute transactions
for i := 0; i < NumSpeculativeProcs+numGoProcs; i++ {
for i := 0; i < pe.numSpeculativeProcs+numGoProcs; i++ {
go func(procNum int) {
defer pe.workerWg.Done()

Expand Down Expand Up @@ -366,7 +364,7 @@ func (pe *ParallelExecutor) Prepare() error {
}
}

if procNum < NumSpeculativeProcs {
if procNum < pe.numSpeculativeProcs {
for range pe.chSpeculativeTasks {
doWork(pe.specTaskQueue.Pop().(ExecVersionView))
}
Expand Down Expand Up @@ -597,12 +595,12 @@ func (pe *ParallelExecutor) Step(res *ExecResult) (result ParallelExecutionResul

type PropertyCheck func(*ParallelExecutor) error

func executeParallelWithCheck(tasks []ExecTask, profile bool, check PropertyCheck, metadata bool, interruptCtx context.Context) (result ParallelExecutionResult, err error) {
func executeParallelWithCheck(tasks []ExecTask, profile bool, check PropertyCheck, metadata bool, numProcs int, interruptCtx context.Context) (result ParallelExecutionResult, err error) {
if len(tasks) == 0 {
return ParallelExecutionResult{MakeTxnInputOutput(len(tasks)), nil, nil, nil}, nil
}

pe := NewParallelExecutor(tasks, profile, metadata)
pe := NewParallelExecutor(tasks, profile, metadata, numProcs)
err = pe.Prepare()

if err != nil {
Expand Down Expand Up @@ -636,6 +634,6 @@ func executeParallelWithCheck(tasks []ExecTask, profile bool, check PropertyChec
return
}

func ExecuteParallel(tasks []ExecTask, profile bool, metadata bool, interruptCtx context.Context) (result ParallelExecutionResult, err error) {
return executeParallelWithCheck(tasks, profile, nil, metadata, interruptCtx)
func ExecuteParallel(tasks []ExecTask, profile bool, metadata bool, numProcs int, interruptCtx context.Context) (result ParallelExecutionResult, err error) {
return executeParallelWithCheck(tasks, profile, nil, metadata, numProcs, interruptCtx)
}
10 changes: 6 additions & 4 deletions core/blockstm/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (

type OpType int

var numProcs = 8

const readType = 0
const writeType = 1
const otherType = 2
Expand Down Expand Up @@ -425,7 +427,7 @@ func runParallel(t *testing.T, tasks []ExecTask, validation PropertyCheck, metad
profile := false

start := time.Now()
result, err := executeParallelWithCheck(tasks, false, validation, metadata, nil)
result, err := executeParallelWithCheck(tasks, false, validation, metadata, numProcs, nil)

if result.Deps != nil && profile {
result.Deps.Report(*result.Stats, func(str string) { fmt.Println(str) })
Expand Down Expand Up @@ -458,7 +460,7 @@ func runParallel(t *testing.T, tasks []ExecTask, validation PropertyCheck, metad
func runParallelGetMetadata(t *testing.T, tasks []ExecTask, validation PropertyCheck) map[int]map[int]bool {
t.Helper()

res, err := executeParallelWithCheck(tasks, true, validation, false, nil)
res, err := executeParallelWithCheck(tasks, true, validation, false, numProcs, nil)

assert.NoError(t, err, "error occur during parallel execution")

Expand Down Expand Up @@ -943,7 +945,7 @@ func TestBreakFromCircularDependency(t *testing.T) {
cancel()

// This should not hang
_, err := ExecuteParallel(tasks, false, true, ctx)
_, err := ExecuteParallel(tasks, false, true, numProcs, ctx)

if err == nil {
t.Error("Expected cancel error")
Expand Down Expand Up @@ -976,7 +978,7 @@ func TestBreakFromPartialCircularDependency(t *testing.T) {
cancel()

// This should not hang
_, err := ExecuteParallel(tasks, false, true, ctx)
_, err := ExecuteParallel(tasks, false, true, numProcs, ctx)

if err == nil {
t.Error("Expected cancel error")
Expand Down
6 changes: 2 additions & 4 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,6 @@ var parallelizabilityTimer = metrics.NewRegisteredTimer("block/parallelizability
// transactions failed to execute due to insufficient gas it will return an error.
// nolint:gocognit
func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) {
blockstm.SetProcs(cfg.ParallelSpeculativeProcesses)

var (
receipts types.Receipts
header = block.Header()
Expand Down Expand Up @@ -364,7 +362,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
backupStateDB := statedb.Copy()

profile := false
result, err := blockstm.ExecuteParallel(tasks, profile, metadata, interruptCtx)
result, err := blockstm.ExecuteParallel(tasks, profile, metadata, cfg.ParallelSpeculativeProcesses, interruptCtx)

if err == nil && profile && result.Deps != nil {
_, weight := result.Deps.LongestPath(*result.Stats)
Expand Down Expand Up @@ -398,7 +396,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
t.totalUsedGas = usedGas
}

_, err = blockstm.ExecuteParallel(tasks, false, metadata, interruptCtx)
_, err = blockstm.ExecuteParallel(tasks, false, metadata, cfg.ParallelSpeculativeProcesses, interruptCtx)

break
}
Expand Down

0 comments on commit eedeaed

Please sign in to comment.