Skip to content

Commit

Permalink
Make pbft broadcast timeout configurable
Browse files Browse the repository at this point in the history
Add a new parameter general.broadcastTimeout for pbft broadcast progress.
Default set to 1s. Add test for pbft config setting and overriding.

Change-Id: I72acf90be5a5b27cf0c45e249609216e63a7e007
Signed-off-by: jiangyaoguo <jiangyaoguo@gmail.com>
  • Loading branch information
jiangyaoguo committed Aug 23, 2016
1 parent 8ea25a9 commit d74b1c5
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 21 deletions.
2 changes: 1 addition & 1 deletion consensus/pbft/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatc
op.pbft = newPbftCore(id, config, op, etf)
op.manager.Start()
op.externalEventReceiver.manager = op.manager
op.broadcaster = newBroadcaster(id, op.pbft.N, op.pbft.f, stack)
op.broadcaster = newBroadcaster(id, op.pbft.N, op.pbft.f, op.pbft.broadcastTimeout, stack)

op.batchSize = config.GetInt("general.batchsize")
op.batchStore = nil
Expand Down
22 changes: 12 additions & 10 deletions consensus/pbft/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,28 @@ type communicator interface {
type broadcaster struct {
comm communicator

f int
msgChans map[uint64]chan *sendRequest
closed sync.WaitGroup
closedCh chan struct{}
f int
broadcastTimeout time.Duration
msgChans map[uint64]chan *sendRequest
closed sync.WaitGroup
closedCh chan struct{}
}

type sendRequest struct {
msg *pb.Message
done chan bool
}

func newBroadcaster(self uint64, N int, f int, c communicator) *broadcaster {
func newBroadcaster(self uint64, N int, f int, broadcastTimeout time.Duration, c communicator) *broadcaster {
queueSize := 10 // XXX increase after testing

chans := make(map[uint64]chan *sendRequest)
b := &broadcaster{
comm: c,
f: f,
msgChans: chans,
closedCh: make(chan struct{}),
comm: c,
f: f,
broadcastTimeout: broadcastTimeout,
msgChans: chans,
closedCh: make(chan struct{}),
}
for i := 0; i < N; i++ {
if uint64(i) == self {
Expand Down Expand Up @@ -172,7 +174,7 @@ func (b *broadcaster) send(msg *pb.Message, dest *uint64) error {
}

succeeded := 0
timer := time.NewTimer(time.Second) // TODO, make this configurable
timer := time.NewTimer(b.broadcastTimeout)

// This loop will try to send, until one of:
// a) the required number of sends succeed
Expand Down
45 changes: 40 additions & 5 deletions consensus/pbft/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestBroadcast(t *testing.T) {
}
}()

b := newBroadcaster(1, 4, 1, m)
b := newBroadcaster(1, 4, 1, time.Second, m)

msg := &pb.Message{Payload: []byte("hi")}
b.Broadcast(msg)
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestBroadcastStuck(t *testing.T) {
}
}()

b := newBroadcaster(1, 4, 1, m)
b := newBroadcaster(1, 4, 1, time.Second, m)

maxc := 20
for c := 0; c < maxc; c++ {
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestBroadcastUnicast(t *testing.T) {
}
}()

b := newBroadcaster(1, 4, 1, m)
b := newBroadcaster(1, 4, 1, time.Second, m)

msg := &pb.Message{Payload: []byte("hi")}
b.Unicast(msg, 0)
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestBroadcastAllFail(t *testing.T) {
done: make(chan struct{}),
}

b := newBroadcaster(1, 4, 1, m)
b := newBroadcaster(1, 4, 1, time.Second, m)

maxc := 20
for c := 0; c < maxc; c++ {
Expand All @@ -228,6 +228,41 @@ func TestBroadcastAllFail(t *testing.T) {
}
}

func TestBroadcastTimeout(t *testing.T) {
expectTime := 10 * time.Second
deltaTime := 50 * time.Millisecond
m := &mockIndefinitelyStuckComm{
mockComm: mockComm{
self: 1,
n: 4,
msgCh: make(chan mockMsg),
},
done: make(chan struct{}),
}

b := newBroadcaster(1, 4, 1, expectTime, m)
broadcastDone := make(chan time.Time)

beginTime := time.Now()
go func() {
b.Broadcast(&pb.Message{Payload: []byte(fmt.Sprintf("%d", 1))})
broadcastDone <- time.Now()
}()

checkTime := expectTime + deltaTime
select {
case endTime := <-broadcastDone:
t.Log("Broadcast consume time: ", endTime.Sub(beginTime))
close(broadcastDone)
close(m.done)
return
case <-time.After(checkTime):
close(broadcastDone)
close(m.done)
t.Fatalf("Broadcast timeout after %v, expected %v", checkTime, expectTime)
}
}

type mockIndefinitelyStuckComm struct {
mockComm
done chan struct{}
Expand All @@ -250,7 +285,7 @@ func TestBroadcastIndefinitelyStuck(t *testing.T) {
done: make(chan struct{}),
}

b := newBroadcaster(1, 4, 1, m)
b := newBroadcaster(1, 4, 1, time.Second, m)

broadcastDone := make(chan struct{})

Expand Down
3 changes: 3 additions & 0 deletions consensus/pbft/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ general:
# Interval to send "keep-alive" null requests. Set to 0 to disable. If enabled, must be greater than request timeout
nullrequest: 0s

# How long may a message broadcast take.
broadcast: 1s

################################################################################
#
# SECTION: EXECUTOR
Expand Down
6 changes: 6 additions & 0 deletions consensus/pbft/pbft-core.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type pbftCore struct {
newViewTimeout time.Duration // progress timeout for new views
newViewTimerReason string // what triggered the timer
lastNewViewTimeout time.Duration // last timeout we used during this view change
broadcastTimeout time.Duration // progress timeout for broadcast
outstandingReqBatches map[string]*RequestBatch // track whether we are waiting for request batches to execute

nullRequestTimer events.Timer // timeout triggering a null request
Expand Down Expand Up @@ -255,6 +256,10 @@ func newPbftCore(id uint64, config *viper.Viper, consumer innerStack, etf events
if err != nil {
instance.nullRequestTimeout = 0
}
instance.broadcastTimeout, err = time.ParseDuration(config.GetString("general.timeout.broadcast"))
if err != nil {
panic(fmt.Errorf("Cannot parse new broadcast timeout: %s", err))
}

instance.activeView = true
instance.replicaCount = instance.N
Expand All @@ -266,6 +271,7 @@ func newPbftCore(id uint64, config *viper.Viper, consumer innerStack, etf events
logger.Infof("PBFT request timeout = %v", instance.requestTimeout)
logger.Infof("PBFT view change timeout = %v", instance.newViewTimeout)
logger.Infof("PBFT Checkpoint period (K) = %v", instance.K)
logger.Infof("PBFT broadcast timeout = %v", instance.broadcastTimeout)
logger.Infof("PBFT Log multiplier = %v", instance.logMultiplier)
logger.Infof("PBFT log size (L) = %v", instance.L)
if instance.nullRequestTimeout > 0 {
Expand Down
131 changes: 126 additions & 5 deletions consensus/pbft/pbft-core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,41 @@ func init() {
logging.SetLevel(logging.DEBUG, "")
}

func TestConfigSet(t *testing.T) {
config := loadConfig()

testKeys := []string{
"general.mode",
"general.N",
"general.f",
"general.K",
"general.logmultiplier",
"general.batchsize",
"general.byzantine",
"general.viewchangeperiod",
"general.timeout.batch",
"general.timeout.request",
"general.timeout.viewchange",
"general.timeout.resendviewchange",
"general.timeout.nullrequest",
"general.timeout.broadcast",
"executor.queuesize",
}

for _, key := range testKeys {
if ok := config.IsSet(key); !ok {
t.Errorf("Cannot test env override because \"%s\" does not seem to be set", key)
}
}
}

func TestEnvOverride(t *testing.T) {
config := loadConfig()

key := "general.mode" // for a key that exists
envName := "CORE_PBFT_GENERAL_MODE" // env override name
overrideValue := "overide_test" // value to override default value with

// test key
if ok := config.IsSet("general.mode"); !ok {
t.Fatalf("Cannot test env override because \"%s\" does not seem to be set", key)
}

os.Setenv(envName, overrideValue)
// The override config value will cause other calls to fail unless unset.
defer func() {
Expand All @@ -66,6 +89,104 @@ func TestEnvOverride(t *testing.T) {

}

func TestIntEnvOverride(t *testing.T) {
config := loadConfig()

tests := []struct {
key string
envName string
overrideValue string
expectValue int
}{
{"general.N", "CORE_PBFT_GENERAL_N", "8", 8},
{"general.f", "CORE_PBFT_GENERAL_F", "2", 2},
{"general.K", "CORE_PBFT_GENERAL_K", "20", 20},
{"general.logmultiplier", "CORE_PBFT_GENERAL_LOGMULTIPLIER", "6", 6},
{"general.batchsize", "CORE_PBFT_GENERAL_BATCHSIZE", "200", 200},
{"general.viewchangeperiod", "CORE_PBFT_GENERAL_VIEWCHANGEPERIOD", "5", 5},
{"executor.queuesize", "CORE_PBFT_EXECUTOR_QUEUESIZE", "50", 50},
}

for _, test := range tests {
os.Setenv(test.envName, test.overrideValue)

if ok := config.IsSet(test.key); !ok {
t.Errorf("Env override in place, and key \"%s\" is not set", test.key)
}

configVal := config.GetInt(test.key)
if configVal != test.expectValue {
t.Errorf("Env override in place, expected key \"%s\" to be \"%v\" but instead got \"%d\"", test.key, test.expectValue, configVal)
}

os.Unsetenv(test.envName)
}
}

func TestDurationEnvOverride(t *testing.T) {
config := loadConfig()

tests := []struct {
key string
envName string
overrideValue string
expectValue time.Duration
}{
{"general.timeout.batch", "CORE_PBFT_GENERAL_TIMEOUT_BATCH", "2s", 2 * time.Second},
{"general.timeout.request", "CORE_PBFT_GENERAL_TIMEOUT_REQUEST", "4s", 4 * time.Second},
{"general.timeout.viewchange", "CORE_PBFT_GENERAL_TIMEOUT_VIEWCHANGE", "5s", 5 * time.Second},
{"general.timeout.resendviewchange", "CORE_PBFT_GENERAL_TIMEOUT_RESENDVIEWCHANGE", "200ms", 200 * time.Millisecond},
{"general.timeout.nullrequest", "CORE_PBFT_GENERAL_TIMEOUT_NULLREQUEST", "1s", time.Second},
{"general.timeout.broadcast", "CORE_PBFT_GENERAL_TIMEOUT_BROADCAST", "1m", time.Minute},
}

for _, test := range tests {
os.Setenv(test.envName, test.overrideValue)

if ok := config.IsSet(test.key); !ok {
t.Errorf("Env override in place, and key \"%s\" is not set", test.key)
}

configVal := config.GetDuration(test.key)
if configVal != test.expectValue {
t.Errorf("Env override in place, expected key \"%s\" to be \"%v\" but instead got \"%v\"", test.key, test.expectValue, configVal)
}

os.Unsetenv(test.envName)
}
}

func TestBoolEnvOverride(t *testing.T) {
config := loadConfig()

tests := []struct {
key string
envName string
overrideValue string
expectValue bool
}{
{"general.byzantine", "CORE_PBFT_GENERAL_BYZANTINE", "false", false},
{"general.byzantine", "CORE_PBFT_GENERAL_BYZANTINE", "0", false},
{"general.byzantine", "CORE_PBFT_GENERAL_BYZANTINE", "true", true},
{"general.byzantine", "CORE_PBFT_GENERAL_BYZANTINE", "1", true},
}

for i, test := range tests {
os.Setenv(test.envName, test.overrideValue)

if ok := config.IsSet(test.key); !ok {
t.Errorf("Env override in place, and key \"%s\" is not set", test.key)
}

configVal := config.GetBool(test.key)
if configVal != test.expectValue {
t.Errorf("Test %d Env override in place, expected key \"%s\" to be \"%v\" but instead got \"%v\"", i, test.key, test.expectValue, configVal)
}

os.Unsetenv(test.envName)
}
}

func TestMaliciousPrePrepare(t *testing.T) {
mock := &omniProto{
broadcastImpl: func(msgPayload []byte) {
Expand Down

0 comments on commit d74b1c5

Please sign in to comment.