From 3f3549160a8135e6ebddec3d4bcca16a6ebd1df2 Mon Sep 17 00:00:00 2001 From: Will Lahti Date: Wed, 3 May 2017 17:27:30 -0400 Subject: [PATCH] [FAB-3663] Switch event producer timeout to duration This CR updates the event producer's timeout from an integer to a time duration. This has been done for consistency with other timeouts defined for the peer and orderer. Change-Id: I803a755b20943d192bdb8af92b995dcccdafe531 Signed-off-by: Will Lahti --- events/producer/events.go | 8 ++++---- events/producer/producer.go | 3 ++- events/producer/producer_test.go | 13 +++++++++++++ peer/node/start.go | 2 +- sampleconfig/core.yaml | 4 ++-- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/events/producer/events.go b/events/producer/events.go index 3a0a712e8db..8fb01717a69 100644 --- a/events/producer/events.go +++ b/events/producer/events.go @@ -198,11 +198,11 @@ type eventProcessor struct { //we could generalize this with mutiple channels each with its own size eventChannel chan *pb.Event - //milliseconds timeout for producer to send an event. + //timeout duration for producer to send an event. //if < 0, if buffer full, unblocks immediately and not send //if 0, if buffer full, will block and guarantee the event will be sent out //if > 0, if buffer full, blocks till timeout - timeout int + timeout time.Duration } //global eventProcessor singleton created by initializeEvents. Openchain producers @@ -236,7 +236,7 @@ func (ep *eventProcessor) start() { } //initialize and start -func initializeEvents(bufferSize uint, tout int) { +func initializeEvents(bufferSize uint, tout time.Duration) { if gEventProcessor != nil { panic("should not be called twice") } @@ -329,7 +329,7 @@ func Send(e *pb.Event) error { logger.Debugf("Event processor timeout > 0") select { case gEventProcessor.eventChannel <- e: - case <-time.After(time.Duration(gEventProcessor.timeout) * time.Millisecond): + case <-time.After(gEventProcessor.timeout): return fmt.Errorf("could not send the blocking event") } } diff --git a/events/producer/producer.go b/events/producer/producer.go index 77aab332682..c3fb96321ee 100644 --- a/events/producer/producer.go +++ b/events/producer/producer.go @@ -19,6 +19,7 @@ package producer import ( "fmt" "io" + "time" "github.com/hyperledger/fabric/common/flogging" pb "github.com/hyperledger/fabric/protos/peer" @@ -34,7 +35,7 @@ type EventsServer struct { var globalEventsServer *EventsServer // NewEventsServer returns a EventsServer -func NewEventsServer(bufferSize uint, timeout int) *EventsServer { +func NewEventsServer(bufferSize uint, timeout time.Duration) *EventsServer { if globalEventsServer != nil { panic("Cannot create multiple event hub servers") } diff --git a/events/producer/producer_test.go b/events/producer/producer_test.go index b68f7bd8a97..b84daaabb23 100644 --- a/events/producer/producer_test.go +++ b/events/producer/producer_test.go @@ -30,6 +30,8 @@ import ( "github.com/hyperledger/fabric/msp/mgmt/testtools" "github.com/hyperledger/fabric/protos/peer" "github.com/hyperledger/fabric/protos/utils" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" ) func createEvent() (*peer.Event, error) { @@ -118,6 +120,17 @@ func TestSignedEvent(t *testing.T) { } } +func TestNewEventsServer(t *testing.T) { + viper.Set("peer.events.buffersize", 100) + viper.Set("peer.events.timeout", "1ms") + + ehServer := NewEventsServer( + uint(viper.GetInt("peer.events.buffersize")), + viper.GetDuration("peer.events.timeout")) + + assert.NotNil(t, ehServer, "nil EventServer found") +} + var signer msp.SigningIdentity var signerSerialized []byte diff --git a/peer/node/start.go b/peer/node/start.go index b46894159c3..9c0c57c1210 100644 --- a/peer/node/start.go +++ b/peer/node/start.go @@ -326,7 +326,7 @@ func createEventHubServer(secureConfig comm.SecureServerConfig) (comm.GRPCServer } ehServer := producer.NewEventsServer( uint(viper.GetInt("peer.events.buffersize")), - viper.GetInt("peer.events.timeout")) + viper.GetDuration("peer.events.timeout")) pb.RegisterEventsServer(grpcServer.Server(), ehServer) return grpcServer, nil diff --git a/sampleconfig/core.yaml b/sampleconfig/core.yaml index d6e9ad84dbd..e4292ed154a 100644 --- a/sampleconfig/core.yaml +++ b/sampleconfig/core.yaml @@ -158,11 +158,11 @@ peer: # validator sends buffersize: 100 - # milliseconds timeout for producer to send an event. + # timeout duration for producer to send an event. # if < 0, if buffer full, unblocks immediately and not send # if 0, if buffer full, will block and guarantee the event will be sent out # if > 0, if buffer full, blocks till timeout - timeout: 10 + timeout: 10ms # TLS Settings for p2p communications tls: