Skip to content

Commit

Permalink
[FAB-9036] Source URL and block num in events
Browse files Browse the repository at this point in the history
Block number is added to the Tx Status and
CC event. Also added the URL of the peer
that produced the event to all events.

Change-Id: I9170e0a85279b87d7ba1b3e8d15fe13e6532d72b
Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Mar 21, 2018
1 parent 4d29ac5 commit 1b6dbd4
Show file tree
Hide file tree
Showing 28 changed files with 436 additions and 207 deletions.
14 changes: 14 additions & 0 deletions pkg/common/providers/fab/eventservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ import (
type BlockEvent struct {
// Block is the block that was committed
Block *cb.Block
// SourceURL specifies the URL of the peer that produced the event
SourceURL string
}

// FilteredBlockEvent contains the data for a filtered block event
type FilteredBlockEvent struct {
// FilteredBlock contains a filtered version of the block that was committed
FilteredBlock *pb.FilteredBlock
// SourceURL specifies the URL of the peer that produced the event
SourceURL string
}

// TxStatusEvent contains the data for a transaction status event
Expand All @@ -29,6 +33,11 @@ type TxStatusEvent struct {
TxID string
// TxValidationCode is the status code of the commit
TxValidationCode pb.TxValidationCode
// BlockNumber contains the block number in which the
// transaction was committed
BlockNumber uint64
// SourceURL specifies the URL of the peer that produced the event
SourceURL string
}

// CCEvent contains the data for a chaincode event
Expand All @@ -42,6 +51,11 @@ type CCEvent struct {
// Payload contains the payload of the chaincode event
// NOTE: Payload will be nil for filtered events
Payload []byte
// BlockNumber contains the block number in which the
// chaincode event was committed
BlockNumber uint64
// SourceURL specifies the URL of the peer that produced the event
SourceURL string
}

// Registration is a handle that is returned from a successful RegisterXXXEvent.
Expand Down
49 changes: 30 additions & 19 deletions pkg/fab/events/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ const (
var (
peer1 = fabmocks.NewMockPeer("peer1", "grpcs://peer1.example.com:7051")
peer2 = fabmocks.NewMockPeer("peer2", "grpcs://peer2.example.com:7051")

sourceURL = "localhost:9051"
)

func TestConnect(t *testing.T) {
connectionProvider := clientmocks.NewProviderFactory().Provider(
clientmocks.NewMockConnection(
clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
),
)

Expand Down Expand Up @@ -90,7 +92,7 @@ func TestFailConnect(t *testing.T) {
fabmocks.NewMockChannelCfg("mychannel"),
mockconn.NewProviderFactory().Provider(
mockconn.NewMockConnection(
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
),
),
failAfterConnectClientProvider, []options.Opt{},
Expand All @@ -111,7 +113,7 @@ func TestCallsOnClosedClient(t *testing.T) {
),
fabmocks.NewMockChannelCfg("mychannel"),
filteredClientProvider,
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand Down Expand Up @@ -153,7 +155,7 @@ func TestCloseIfIdle(t *testing.T) {
),
fabmocks.NewMockChannelCfg(channelID),
clientProvider,
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL)),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand Down Expand Up @@ -187,7 +189,7 @@ func TestInvalidUnregister(t *testing.T) {
),
fabmocks.NewMockChannelCfg(channelID),
filteredClientProvider,
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand All @@ -210,7 +212,7 @@ func TestUnauthorizedBlockEvents(t *testing.T) {
),
fabmocks.NewMockChannelCfg(channelID),
filteredClientProvider,
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand All @@ -234,7 +236,7 @@ func TestBlockEvents(t *testing.T) {
),
fabmocks.NewMockChannelCfg(channelID),
clientProvider,
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL)),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand Down Expand Up @@ -292,7 +294,7 @@ func TestFilteredBlockEvents(t *testing.T) {
),
fabmocks.NewMockChannelCfg(channelID),
filteredClientProvider,
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand Down Expand Up @@ -369,7 +371,7 @@ func TestBlockAndFilteredBlockEvents(t *testing.T) {
),
fabmocks.NewMockChannelCfg(channelID),
clientProvider,
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL)),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand Down Expand Up @@ -447,7 +449,7 @@ func TestTxStatusEvents(t *testing.T) {
),
fabmocks.NewMockChannelCfg(channelID),
filteredClientProvider,
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand Down Expand Up @@ -531,7 +533,7 @@ func TestCCEvents(t *testing.T) {
),
fabmocks.NewMockChannelCfg(channelID),
filteredClientProvider,
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory)),
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL)),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand Down Expand Up @@ -731,7 +733,7 @@ func TestConcurrentEvents(t *testing.T) {
[]options.Opt{
esdispatcher.WithEventConsumerBufferSize(uint(numEvents) * 4),
},
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)),
mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL)),
)
if err != nil {
t.Fatalf("error creating channel event client: %s", err)
Expand Down Expand Up @@ -885,14 +887,19 @@ func listenFilteredBlockEvents(channelID string, eventch <-chan *fab.FilteredBlo

func listenChaincodeEvents(channelID string, eventch <-chan *fab.CCEvent, expected int, errch chan<- error) {
numReceived := 0
lastBlockNum := uint64(0)

for {
select {
case _, ok := <-eventch:
case event, ok := <-eventch:
if !ok {
fmt.Printf("CC events channel was closed \n")
return
}
if event.BlockNumber > 0 && event.BlockNumber <= lastBlockNum {
errch <- errors.Errorf("Expected block greater than [%d] but received [%d]", lastBlockNum, event.BlockNumber)
return
}
numReceived++
case <-time.After(5 * time.Second):
if numReceived != expected {
Expand All @@ -919,7 +926,7 @@ func txStatusTest(eventClient *Client, ledger servicemocks.Ledger, channelID str
var receivedEvents int

for i := 0; i < expected; i++ {
txID := fmt.Sprintf("txid_tx_%d", i)
txID := fmt.Sprintf("TxID_%d", i)
go func() {
defer wg.Done()

Expand All @@ -932,18 +939,22 @@ func txStatusTest(eventClient *Client, ledger servicemocks.Ledger, channelID str
}
defer eventClient.Unregister(reg)

ledger.NewBlock(channelID,
block := ledger.NewBlock(channelID,
servicemocks.NewTransactionWithCCEvent(txID, pb.TxValidationCode_VALID, ccID, event1, payload1),
)

select {
case _, ok := <-txeventch:
case event, ok := <-txeventch:
mutex.Lock()
if !ok {
errs = append(errs, errors.New("unexpected closed channel"))
} else {
receivedEvents++
}
if event.BlockNumber != block.Number() {
errch <- errors.Errorf("Expected block number [%d] but received [%d]", block.Number(), event.BlockNumber)
return
}
mutex.Unlock()
case <-time.After(5 * time.Second):
mutex.Lock()
Expand Down Expand Up @@ -971,7 +982,7 @@ func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome mockconn
clientmocks.NewDiscoveryProvider(peer1, peer2),
),
fabmocks.NewMockChannelCfg("mychannel"),
cp.FlakeyProvider(connAttemptResult, mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory))),
cp.FlakeyProvider(connAttemptResult, mockconn.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL))),
clientProvider,
[]options.Opt{
esdispatcher.WithEventConsumerTimeout(time.Second),
Expand Down Expand Up @@ -1000,7 +1011,7 @@ func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expe

connectch := make(chan *dispatcher.ConnectionEvent)

ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory)
ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL)

eventClient, _, err := newClientWithMockConnAndOpts(
fabmocks.NewMockContextWithCustomDiscovery(
Expand Down Expand Up @@ -1055,7 +1066,7 @@ func testReconnectRegistration(t *testing.T, expectedBlockEvents mockconn.NumBlo
channelID := "mychannel"
ccID := "mycc"

ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory)
ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL)

cp := mockconn.NewProviderFactory()

Expand Down
8 changes: 5 additions & 3 deletions pkg/fab/events/client/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
var (
peer1 = fabmocks.NewMockPeer("peer1", "grpcs://peer1.example.com:7051")
peer2 = fabmocks.NewMockPeer("peer2", "grpcs://peer2.example.com:7051")

sourceURL = "localhost:9051"
)

func TestConnect(t *testing.T) {
Expand All @@ -38,7 +40,7 @@ func TestConnect(t *testing.T) {
clientmocks.NewProviderFactory().Provider(
clientmocks.NewMockConnection(
clientmocks.WithLedger(
servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory),
servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL),
),
),
),
Expand Down Expand Up @@ -117,7 +119,7 @@ func TestConnectNoPeers(t *testing.T) {
clientmocks.NewProviderFactory().Provider(
clientmocks.NewMockConnection(
clientmocks.WithLedger(
servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory),
servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory, sourceURL),
),
),
),
Expand Down Expand Up @@ -160,7 +162,7 @@ func TestConnectionEvent(t *testing.T) {
clientmocks.NewProviderFactory().Provider(
clientmocks.NewMockConnection(
clientmocks.WithLedger(
servicemocks.NewMockLedger(servicemocks.BlockEventFactory),
servicemocks.NewMockLedger(servicemocks.BlockEventFactory, sourceURL),
),
),
),
Expand Down
29 changes: 29 additions & 0 deletions pkg/fab/events/client/mocks/mockconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,15 @@ type MockConnection struct {
producerch <-chan interface{}
rcvch chan interface{}
closed int32
sourceURL string
}

// Opts contains mock connection options
type Opts struct {
Ledger servicemocks.Ledger
Operations OperationMap
Factory ConnectionFactory
SourceURL string
}

// NewMockConnection returns a new MockConnection using the given options
Expand All @@ -115,13 +117,19 @@ func NewMockConnection(opts ...Opt) *MockConnection {
panic("ledger is nil")
}

sourceURL := copts.SourceURL
if sourceURL == "" {
sourceURL = "localhost:9051"
}

producer := servicemocks.NewMockProducer(copts.Ledger)

c := &MockConnection{
producer: producer,
producerch: producer.Register(),
rcvch: make(chan interface{}),
operations: operations,
sourceURL: sourceURL,
}
return c
}
Expand Down Expand Up @@ -178,6 +186,11 @@ func (c *MockConnection) Ledger() servicemocks.Ledger {
return c.producer.Ledger()
}

// SourceURL returns the event source
func (c *MockConnection) SourceURL() string {
return c.sourceURL
}

// ProviderFactory creates various mock MockConnection Providers
type ProviderFactory struct {
connection Connection
Expand Down Expand Up @@ -289,6 +302,13 @@ func NewResult(operation Operation, result Result, errMsg ...string) *OperationR
}
}

// WithSourceURL provides the mock connection with an event source
func WithSourceURL(sourceURL string) Opt {
return func(opts *Opts) {
opts.SourceURL = sourceURL
}
}

// WithLedger provides the mock connection with a ledger
func WithLedger(ledger servicemocks.Ledger) Opt {
return func(opts *Opts) {
Expand Down Expand Up @@ -318,3 +338,12 @@ func newDeliverStatusResponse(status cb.Status) *pb.DeliverResponse_Status {
Status: status,
}
}

type eventSource struct {
url string
}

// URL returns the URL of the peer that published the event
func (es *eventSource) URL() string {
return es.url
}
18 changes: 17 additions & 1 deletion pkg/fab/events/deliverclient/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type deliverStream interface {
// DeliverConnection manages the connection to the deliver server
type DeliverConnection struct {
comm.GRPCConnection
url string
}

// StreamProvider creates a deliver stream
Expand Down Expand Up @@ -73,6 +74,7 @@ func New(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvider StreamPr

return &DeliverConnection{
GRPCConnection: *connect,
url: url,
}, nil
}

Expand Down Expand Up @@ -133,7 +135,7 @@ func (c *DeliverConnection) Receive(eventch chan<- interface{}) {
break
}

eventch <- in
eventch <- NewEvent(in, c.url)
}
logger.Debugf("Exiting stream listener")
}
Expand Down Expand Up @@ -178,3 +180,17 @@ func (c *DeliverConnection) createSignedEnvelope(msg proto.Message) (*cb.Envelop

return &cb.Envelope{Payload: paylBytes, Signature: signature}, nil
}

// Event contains the deliver event as well as the event source
type Event struct {
SourceURL string
Event interface{}
}

// NewEvent returns a deliver event
func NewEvent(event interface{}, sourceURL string) *Event {
return &Event{
SourceURL: sourceURL,
Event: event,
}
}
Loading

0 comments on commit 1b6dbd4

Please sign in to comment.