Skip to content

Commit

Permalink
[FAB-3663] Switch event producer timeout to duration
Browse files Browse the repository at this point in the history
This CR updates the event producer's timeout from an integer to a time
duration. This has been done for consistency with other timeouts defined
for the peer and orderer.

Change-Id: I803a755b20943d192bdb8af92b995dcccdafe531
Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
  • Loading branch information
wlahti committed May 4, 2017
1 parent f2e94b3 commit 3f35491
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 8 deletions.
8 changes: 4 additions & 4 deletions events/producer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,11 @@ type eventProcessor struct {
//we could generalize this with mutiple channels each with its own size
eventChannel chan *pb.Event

//milliseconds timeout for producer to send an event.
//timeout duration for producer to send an event.
//if < 0, if buffer full, unblocks immediately and not send
//if 0, if buffer full, will block and guarantee the event will be sent out
//if > 0, if buffer full, blocks till timeout
timeout int
timeout time.Duration
}

//global eventProcessor singleton created by initializeEvents. Openchain producers
Expand Down Expand Up @@ -236,7 +236,7 @@ func (ep *eventProcessor) start() {
}

//initialize and start
func initializeEvents(bufferSize uint, tout int) {
func initializeEvents(bufferSize uint, tout time.Duration) {
if gEventProcessor != nil {
panic("should not be called twice")
}
Expand Down Expand Up @@ -329,7 +329,7 @@ func Send(e *pb.Event) error {
logger.Debugf("Event processor timeout > 0")
select {
case gEventProcessor.eventChannel <- e:
case <-time.After(time.Duration(gEventProcessor.timeout) * time.Millisecond):
case <-time.After(gEventProcessor.timeout):
return fmt.Errorf("could not send the blocking event")
}
}
Expand Down
3 changes: 2 additions & 1 deletion events/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package producer
import (
"fmt"
"io"
"time"

"github.com/hyperledger/fabric/common/flogging"
pb "github.com/hyperledger/fabric/protos/peer"
Expand All @@ -34,7 +35,7 @@ type EventsServer struct {
var globalEventsServer *EventsServer

// NewEventsServer returns a EventsServer
func NewEventsServer(bufferSize uint, timeout int) *EventsServer {
func NewEventsServer(bufferSize uint, timeout time.Duration) *EventsServer {
if globalEventsServer != nil {
panic("Cannot create multiple event hub servers")
}
Expand Down
13 changes: 13 additions & 0 deletions events/producer/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/hyperledger/fabric/msp/mgmt/testtools"
"github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
)

func createEvent() (*peer.Event, error) {
Expand Down Expand Up @@ -118,6 +120,17 @@ func TestSignedEvent(t *testing.T) {
}
}

func TestNewEventsServer(t *testing.T) {
viper.Set("peer.events.buffersize", 100)
viper.Set("peer.events.timeout", "1ms")

ehServer := NewEventsServer(
uint(viper.GetInt("peer.events.buffersize")),
viper.GetDuration("peer.events.timeout"))

assert.NotNil(t, ehServer, "nil EventServer found")
}

var signer msp.SigningIdentity
var signerSerialized []byte

Expand Down
2 changes: 1 addition & 1 deletion peer/node/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func createEventHubServer(secureConfig comm.SecureServerConfig) (comm.GRPCServer
}
ehServer := producer.NewEventsServer(
uint(viper.GetInt("peer.events.buffersize")),
viper.GetInt("peer.events.timeout"))
viper.GetDuration("peer.events.timeout"))

pb.RegisterEventsServer(grpcServer.Server(), ehServer)
return grpcServer, nil
Expand Down
4 changes: 2 additions & 2 deletions sampleconfig/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,11 @@ peer:
# validator sends
buffersize: 100

# milliseconds timeout for producer to send an event.
# timeout duration for producer to send an event.
# if < 0, if buffer full, unblocks immediately and not send
# if 0, if buffer full, will block and guarantee the event will be sent out
# if > 0, if buffer full, blocks till timeout
timeout: 10
timeout: 10ms

# TLS Settings for p2p communications
tls:
Expand Down

0 comments on commit 3f35491

Please sign in to comment.