Skip to content

Commit

Permalink
Merge pull request #225 from yacovm/createPoolWithTimeoutHandler
Browse files Browse the repository at this point in the history
Create pool with timeout handler
  • Loading branch information
yacovm authored Aug 5, 2019
2 parents b94942b + 84d0f9b commit 2bb2eaf
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 39 deletions.
18 changes: 14 additions & 4 deletions internal/bft/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,21 @@ import (
"time"

"github.com/SmartBFT-Go/consensus/internal/bft"
"github.com/SmartBFT-Go/consensus/internal/bft/mocks"
"github.com/SmartBFT-Go/consensus/pkg/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
)

var (
noopTimeoutHandler = &mocks.RequestTimeoutHandler{}
)

func init() {
noopTimeoutHandler.On("OnRequestTimeout", mock.Anything, mock.Anything)
}

func TestBatcherBasic(t *testing.T) {
basicLog, err := zap.NewDevelopment()
assert.NoError(t, err)
Expand All @@ -25,7 +35,7 @@ func TestBatcherBasic(t *testing.T) {
byteReq1 := makeTestRequest("1", "1", "foo")
byteReq2 := makeTestRequest("2", "2", "foo")
byteReq3 := makeTestRequest("3", "3", "foo")
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 3})
pool := bft.NewPool(log, insp, noopTimeoutHandler, bft.PoolOptions{QueueSize: 3})
err = pool.Submit(byteReq1)
assert.NoError(t, err)

Expand Down Expand Up @@ -95,7 +105,7 @@ func TestBatcherWhileSubmitting(t *testing.T) {
assert.NoError(t, err)
log := basicLog.Sugar()
insp := &testRequestInspector{}
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 200})
pool := bft.NewPool(log, insp, noopTimeoutHandler, bft.PoolOptions{QueueSize: 200})

batcher := bft.NewBatchBuilder(pool, 100, 100*time.Second) // long time

Expand Down Expand Up @@ -136,7 +146,7 @@ func TestBatcherClose(t *testing.T) {
insp := &testRequestInspector{}

byteReq := makeTestRequest("1", "1", "foo")
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 3})
pool := bft.NewPool(log, insp, noopTimeoutHandler, bft.PoolOptions{QueueSize: 3})
err = pool.Submit(byteReq)
assert.NoError(t, err)

Expand Down Expand Up @@ -170,7 +180,7 @@ func TestBatcherReset(t *testing.T) {
insp := &testRequestInspector{}

byteReq1 := makeTestRequest("1", "1", "foo")
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 3})
pool := bft.NewPool(log, insp, noopTimeoutHandler, bft.PoolOptions{QueueSize: 3})
err = pool.Submit(byteReq1)
assert.NoError(t, err)

Expand Down
21 changes: 9 additions & 12 deletions internal/bft/requestpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

const (
DefaultRequestTimeout = 60000 * time.Millisecond
DefaultRequestTimeout = 10 * time.Second
)

//go:generate mockery -dir . -name RequestTimeoutHandler -case underscore -output ./mocks/
Expand Down Expand Up @@ -68,7 +68,7 @@ type PoolOptions struct {
}

// NewPool constructs new requests pool
func NewPool(log api.Logger, inspector api.RequestInspector, options PoolOptions) *Pool {
func NewPool(log api.Logger, inspector api.RequestInspector, th RequestTimeoutHandler, options PoolOptions) *Pool {
if options.RequestTimeout == 0 {
options.RequestTimeout = DefaultRequestTimeout
}
Expand All @@ -80,19 +80,16 @@ func NewPool(log api.Logger, inspector api.RequestInspector, options PoolOptions
}

return &Pool{
logger: log,
inspector: inspector,
fifo: list.New(),
semaphore: semaphore.NewWeighted(options.QueueSize),
existMap: make(map[types.RequestInfo]*list.Element),
options: options,
timeoutHandler: th,
logger: log,
inspector: inspector,
fifo: list.New(),
semaphore: semaphore.NewWeighted(options.QueueSize),
existMap: make(map[types.RequestInfo]*list.Element),
options: options,
}
}

func (rp *Pool) SetTimeoutHandler(handler RequestTimeoutHandler) {
rp.timeoutHandler = handler
}

func (rp *Pool) isStopped() bool {
rp.lock.Lock()
defer rp.lock.Unlock()
Expand Down
27 changes: 9 additions & 18 deletions internal/bft/requestpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ func TestReqPoolBasic(t *testing.T) {
t.Run("create close", func(t *testing.T) {
timeoutHandler := &mocks.RequestTimeoutHandler{}

pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 3, RequestTimeout: time.Hour})
pool.SetTimeoutHandler(timeoutHandler)
pool := bft.NewPool(log, insp, timeoutHandler, bft.PoolOptions{QueueSize: 3, RequestTimeout: time.Hour})

assert.Equal(t, 0, pool.Size())
err = pool.Submit(byteReq1)
Expand All @@ -53,9 +52,8 @@ func TestReqPoolBasic(t *testing.T) {
t.Run("submit remove next", func(t *testing.T) {
timeoutHandler := &mocks.RequestTimeoutHandler{}

pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 3, RequestTimeout: time.Hour})
pool := bft.NewPool(log, insp, timeoutHandler, bft.PoolOptions{QueueSize: 3, RequestTimeout: time.Hour})
defer pool.Close()
pool.SetTimeoutHandler(timeoutHandler)

assert.Equal(t, 0, pool.Size())
err = pool.Submit(byteReq1)
Expand Down Expand Up @@ -152,9 +150,8 @@ func TestReqPoolCapacity(t *testing.T) {

t.Run("eventually submit", func(t *testing.T) {
timeoutHandler := &mocks.RequestTimeoutHandler{}
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 50, RequestTimeout: time.Hour})
pool := bft.NewPool(log, insp, timeoutHandler, bft.PoolOptions{QueueSize: 50, RequestTimeout: time.Hour})
defer pool.Close()
pool.SetTimeoutHandler(timeoutHandler)

wg := sync.WaitGroup{}
wg.Add(2 * numReq)
Expand Down Expand Up @@ -188,9 +185,8 @@ func TestReqPoolCapacity(t *testing.T) {

t.Run("submit storm", func(t *testing.T) {
timeoutHandler := &mocks.RequestTimeoutHandler{}
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 50, RequestTimeout: time.Hour})
pool := bft.NewPool(log, insp, timeoutHandler, bft.PoolOptions{QueueSize: 50, RequestTimeout: time.Hour})
defer pool.Close()
pool.SetTimeoutHandler(timeoutHandler)

wg := sync.WaitGroup{}
wg.Add(2 * numReq)
Expand Down Expand Up @@ -237,9 +233,8 @@ func TestReqPoolPrune(t *testing.T) {

byteReq1 := makeTestRequest("1", "1", "foo")
byteReq2 := makeTestRequest("2", "2", "bar")
pool := bft.NewPool(log, insp, bft.PoolOptions{QueueSize: 3, RequestTimeout: time.Hour})
pool := bft.NewPool(log, insp, timeoutHandler, bft.PoolOptions{QueueSize: 3, RequestTimeout: time.Hour})
defer pool.Close()
pool.SetTimeoutHandler(timeoutHandler)

assert.Equal(t, 0, pool.Size())

Expand Down Expand Up @@ -291,7 +286,7 @@ func TestReqPoolTimeout(t *testing.T) {
assert.Fail(t, "called OnAutoRemoveTimeout")
}).Return()

pool := bft.NewPool(log, insp,
pool := bft.NewPool(log, insp, timeoutHandler,
bft.PoolOptions{
QueueSize: 3,
RequestTimeout: 10 * time.Millisecond,
Expand All @@ -300,7 +295,6 @@ func TestReqPoolTimeout(t *testing.T) {
},
)
defer pool.Close()
pool.SetTimeoutHandler(timeoutHandler)

assert.Equal(t, 0, pool.Size())
err = pool.Submit(byteReq1)
Expand Down Expand Up @@ -336,7 +330,7 @@ func TestReqPoolTimeout(t *testing.T) {
assert.Fail(t, "called OnAutoRemoveTimeout")
}).Return()

pool := bft.NewPool(log, insp,
pool := bft.NewPool(log, insp, timeoutHandler,
bft.PoolOptions{
QueueSize: 3,
RequestTimeout: 10 * time.Millisecond,
Expand All @@ -345,7 +339,6 @@ func TestReqPoolTimeout(t *testing.T) {
},
)
defer pool.Close()
pool.SetTimeoutHandler(timeoutHandler)

assert.Equal(t, 0, pool.Size())
err = pool.Submit(byteReq1)
Expand Down Expand Up @@ -383,7 +376,7 @@ func TestReqPoolTimeout(t *testing.T) {
to3WG.Done()
}).Return()

pool := bft.NewPool(log, insp,
pool := bft.NewPool(log, insp, timeoutHandler,
bft.PoolOptions{
QueueSize: 3,
RequestTimeout: 10 * time.Millisecond,
Expand All @@ -392,7 +385,6 @@ func TestReqPoolTimeout(t *testing.T) {
},
)
defer pool.Close()
pool.SetTimeoutHandler(timeoutHandler)

assert.Equal(t, 0, pool.Size())
err = pool.Submit(byteReq1)
Expand Down Expand Up @@ -429,7 +421,7 @@ func TestReqPoolTimeout(t *testing.T) {
assert.Fail(t, "called OnAutoRemoveTimeout")
}).Return()

pool := bft.NewPool(log, insp,
pool := bft.NewPool(log, insp, timeoutHandler,
bft.PoolOptions{
QueueSize: 3,
RequestTimeout: 100 * time.Millisecond,
Expand All @@ -438,7 +430,6 @@ func TestReqPoolTimeout(t *testing.T) {
},
)
defer pool.Close()
pool.SetTimeoutHandler(timeoutHandler)
assert.Equal(t, 0, pool.Size())
err = pool.Submit(byteReq1)
assert.NoError(t, err)
Expand Down
9 changes: 4 additions & 5 deletions pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,12 @@ func (c *Consensus) Start() {
LeaderFwdTimeout: requestTimeout,
AutoRemoveTimeout: requestTimeout,
}
pool := algorithm.NewPool(c.Logger, c.RequestInspector, opts)
batchBuilder := algorithm.NewBatchBuilder(pool, c.BatchSize, c.BatchTimeout)

c.controller = &algorithm.Controller{
ProposerBuilder: c,
WAL: c.WAL,
ID: c.SelfID,
N: c.N,
Batcher: batchBuilder,
RequestPool: pool,
RequestTimeout: requestTimeout,
Verifier: c.Verifier,
Logger: c.Logger,
Expand All @@ -86,7 +82,10 @@ func (c *Consensus) Start() {
RequestInspector: c.RequestInspector,
}

pool.SetTimeoutHandler(c.controller)
pool := algorithm.NewPool(c.Logger, c.RequestInspector, c.controller, opts)
batchBuilder := algorithm.NewBatchBuilder(pool, c.BatchSize, c.BatchTimeout)
c.controller.RequestPool = pool
c.controller.Batcher = batchBuilder

// If we delivered to the application proposal with sequence i,
// then we are expecting to be proposed a proposal with sequence i+1.
Expand Down

0 comments on commit 2bb2eaf

Please sign in to comment.