diff --git a/fabric-client/events/eventhub.go b/fabric-client/events/eventhub.go index afdfad66cc..749bf73ece 100644 --- a/fabric-client/events/eventhub.go +++ b/fabric-client/events/eventhub.go @@ -46,13 +46,13 @@ type EventHub interface { SetPeerAddr(peerURL string, certificate string, serverHostOverride string) IsConnected() bool Connect() error + Disconnect() RegisterChaincodeEvent(ccid string, eventname string, callback func(*ChaincodeEvent)) *ChainCodeCBE UnregisterChaincodeEvent(cbe *ChainCodeCBE) RegisterTxEvent(txID string, callback func(string, error)) UnregisterTxEvent(txID string) RegisterBlockEvent(callback func(*common.Block)) UnregisterBlockEvent(callback func(*common.Block)) - Disconnect() } // The EventHubExt interface allows extensions of the SDK to add functionality to EventHub overloads. @@ -156,13 +156,13 @@ func (eventHub *eventHub) SetInterests(block bool) { // Disconnect disconnects from peer event source func (eventHub *eventHub) Disconnect() { + eventHub.mtx.Lock() + defer eventHub.mtx.Unlock() + if !eventHub.connected { return } - eventHub.mtx.Lock() - defer eventHub.mtx.Unlock() - // Unregister interests with server and stop the stream eventHub.client.UnregisterAsync(eventHub.interestedEvents) eventHub.client.Stop() @@ -251,6 +251,9 @@ func (eventHub *eventHub) IsConnected() bool { */ func (eventHub *eventHub) Connect() error { + eventHub.mtx.Lock() + defer eventHub.mtx.Unlock() + if eventHub.connected { logger.Debugf("Nothing to do - EventHub already connected") return nil @@ -260,9 +263,6 @@ func (eventHub *eventHub) Connect() error { return fmt.Errorf("eventHub.peerAddr is empty") } - eventHub.mtx.Lock() - defer eventHub.mtx.Unlock() - if eventHub.client == nil { eventsClient, _ := eventHub.eventsClientFactory.newEventsClient(eventHub.peerAddr, eventHub.peerTLSCertificate, eventHub.peerTLSServerHostOverride, 5, eventHub) eventHub.client = eventsClient @@ -316,13 +316,13 @@ func (eventHub *eventHub) Recv(msg *pb.Event) (bool, error) { // Disconnected implements consumer.EventAdapter interface for receiving events func (eventHub *eventHub) Disconnected(err error) { + eventHub.mtx.Lock() + defer eventHub.mtx.Unlock() + if !eventHub.connected { return } - eventHub.mtx.Lock() - defer eventHub.mtx.Unlock() - eventHub.client.Stop() eventHub.connected = false }