From 1a4ff3f9c16b1b6e9dd478f8268119b20938453f Mon Sep 17 00:00:00 2001 From: Gari Singh Date: Fri, 27 Sep 2019 10:26:45 -0400 Subject: [PATCH] FAB-16715 Wire endpoint override in UpdateEndpoints Set OrderEndpointOverrides when UpdateEdnpoints is called using the OrderEndpointOverrides initially set for deliverServiceImpl. Also modifed the test to include the override as well as strictly test the UpdateEndpoints function. Change-Id: Ic7bbc4a21761b797110648ad91ac73afd43f52d0 Signed-off-by: Gari Singh --- core/deliverservice/deliveryclient.go | 2 + core/deliverservice/deliveryclient_test.go | 89 ++++--- .../mocks/connection_producer.go | 217 ++++++++++++++++++ 3 files changed, 260 insertions(+), 48 deletions(-) create mode 100644 core/deliverservice/mocks/connection_producer.go diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index 0602b6f94ba..dcb6895e362 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -173,6 +173,8 @@ func NewDeliverService(conf *Config, connConfig ConnectionCriteria) (*deliverSer } func (d *deliverServiceImpl) UpdateEndpoints(chainID string, connCriteria ConnectionCriteria) error { + // update the overrides + connCriteria.OrdererEndpointOverrides = d.connConfig.OrdererEndpointOverrides // Use chainID to obtain blocks provider and pass endpoints // for update if dc, ok := d.deliverClients[chainID]; ok { diff --git a/core/deliverservice/deliveryclient_test.go b/core/deliverservice/deliveryclient_test.go index 0ed57f17653..e8c2bbc6834 100644 --- a/core/deliverservice/deliveryclient_test.go +++ b/core/deliverservice/deliveryclient_test.go @@ -28,6 +28,12 @@ import ( "google.golang.org/grpc" ) +//go:generate counterfeiter -o mocks/connection_producer.go -fake-name ConnectionProducer . connectionProducer + +type connectionProducer interface { + comm.ConnectionProducer +} + func init() { msptesttools.LoadMSPSetupForTesting() } @@ -262,59 +268,46 @@ func TestDeliverServiceFailover(t *testing.T) { } func TestDeliverServiceUpdateEndpoints(t *testing.T) { - // TODO: Add test case to check the endpoints update - // Case: Start service with given ordering service endpoint - // send a block, switch to new endpoint and send a new block - // Expected: Delivery service should be able to switch to - // updated endpoint and receive second block within timely manner. - defer ensureNoGoroutineLeak(t)() - - os1 := mocks.NewOrderer(5612, t) - - time.Sleep(time.Second) - gossipServiceAdapter := &mocks.MockGossipServiceAdapter{GossipBlockDisseminations: make(chan uint64)} - - service, err := NewDeliverService(&Config{ - Gossip: gossipServiceAdapter, - CryptoSvc: &mockMCS{}, - ABCFactory: DefaultABCFactory, - ConnFactory: DefaultConnectionFactory, - }, ConnectionCriteria{ - Organizations: []string{"org"}, - OrdererEndpointsByOrg: map[string][]string{ - "org": {"localhost:5612"}, + connProd := &mocks.ConnectionProducer{} + testChannel := "test_channel" + + ds := &deliverServiceImpl{ + connConfig: ConnectionCriteria{ + Organizations: []string{"org1"}, + OrdererEndpointsByOrg: map[string][]string{ + "org1": {"localhost:5612"}, + }, + OrdererEndpointOverrides: map[string]string{ + "localhost:5612": "localhost:5614", + }, }, - }) - defer service.Stop() - - assert.NoError(t, err) - li := &mocks.MockLedgerInfo{Height: uint64(100)} - os1.SetNextExpectedSeek(uint64(100)) - - err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {}) - assert.NoError(t, err, "can't start delivery") - - go os1.SendBlock(uint64(100)) - assertBlockDissemination(100, gossipServiceAdapter.GossipBlockDisseminations, t) + deliverClients: map[string]*deliverClient{ + testChannel: { + bclient: &broadcastClient{ + prod: connProd, + }, + }, + }, + } - os2 := mocks.NewOrderer(5613, t) - defer os2.Shutdown() - os2.SetNextExpectedSeek(uint64(101)) + expectedEC := []comm.EndpointCriteria{ + {Organizations: []string{"org1"}, Endpoint: "localhost:5614"}, + {Organizations: []string{"org2"}, Endpoint: "localhost:5613"}, + } - newConnCriteria := ConnectionCriteria{ - Organizations: []string{"org"}, - OrdererEndpointsByOrg: map[string][]string{ - "org": {"localhost:5613"}, + ds.UpdateEndpoints( + testChannel, + ConnectionCriteria{ + Organizations: []string{"org1", "org2"}, + OrdererEndpointsByOrg: map[string][]string{ + "org1": {"localhost:5612"}, + "org2": {"localhost:5613"}, + }, }, - } - service.UpdateEndpoints("TEST_CHAINID", newConnCriteria) - // Shutdown old ordering service to make sure block will now come from - // updated ordering service - os1.Shutdown() + ) + assert.Equal(t, 1, connProd.UpdateEndpointsCallCount()) + assert.Equal(t, expectedEC, connProd.UpdateEndpointsArgsForCall(0)) - atomic.StoreUint64(&li.Height, uint64(101)) - go os2.SendBlock(uint64(101)) - assertBlockDissemination(101, gossipServiceAdapter.GossipBlockDisseminations, t) } func TestDeliverServiceServiceUnavailable(t *testing.T) { diff --git a/core/deliverservice/mocks/connection_producer.go b/core/deliverservice/mocks/connection_producer.go new file mode 100644 index 00000000000..b1832679ea3 --- /dev/null +++ b/core/deliverservice/mocks/connection_producer.go @@ -0,0 +1,217 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + + "github.com/hyperledger/fabric/core/comm" + "google.golang.org/grpc" +) + +type ConnectionProducer struct { + GetEndpointsStub func() []comm.EndpointCriteria + getEndpointsMutex sync.RWMutex + getEndpointsArgsForCall []struct { + } + getEndpointsReturns struct { + result1 []comm.EndpointCriteria + } + getEndpointsReturnsOnCall map[int]struct { + result1 []comm.EndpointCriteria + } + NewConnectionStub func() (*grpc.ClientConn, string, error) + newConnectionMutex sync.RWMutex + newConnectionArgsForCall []struct { + } + newConnectionReturns struct { + result1 *grpc.ClientConn + result2 string + result3 error + } + newConnectionReturnsOnCall map[int]struct { + result1 *grpc.ClientConn + result2 string + result3 error + } + UpdateEndpointsStub func([]comm.EndpointCriteria) + updateEndpointsMutex sync.RWMutex + updateEndpointsArgsForCall []struct { + arg1 []comm.EndpointCriteria + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *ConnectionProducer) GetEndpoints() []comm.EndpointCriteria { + fake.getEndpointsMutex.Lock() + ret, specificReturn := fake.getEndpointsReturnsOnCall[len(fake.getEndpointsArgsForCall)] + fake.getEndpointsArgsForCall = append(fake.getEndpointsArgsForCall, struct { + }{}) + fake.recordInvocation("GetEndpoints", []interface{}{}) + fake.getEndpointsMutex.Unlock() + if fake.GetEndpointsStub != nil { + return fake.GetEndpointsStub() + } + if specificReturn { + return ret.result1 + } + fakeReturns := fake.getEndpointsReturns + return fakeReturns.result1 +} + +func (fake *ConnectionProducer) GetEndpointsCallCount() int { + fake.getEndpointsMutex.RLock() + defer fake.getEndpointsMutex.RUnlock() + return len(fake.getEndpointsArgsForCall) +} + +func (fake *ConnectionProducer) GetEndpointsCalls(stub func() []comm.EndpointCriteria) { + fake.getEndpointsMutex.Lock() + defer fake.getEndpointsMutex.Unlock() + fake.GetEndpointsStub = stub +} + +func (fake *ConnectionProducer) GetEndpointsReturns(result1 []comm.EndpointCriteria) { + fake.getEndpointsMutex.Lock() + defer fake.getEndpointsMutex.Unlock() + fake.GetEndpointsStub = nil + fake.getEndpointsReturns = struct { + result1 []comm.EndpointCriteria + }{result1} +} + +func (fake *ConnectionProducer) GetEndpointsReturnsOnCall(i int, result1 []comm.EndpointCriteria) { + fake.getEndpointsMutex.Lock() + defer fake.getEndpointsMutex.Unlock() + fake.GetEndpointsStub = nil + if fake.getEndpointsReturnsOnCall == nil { + fake.getEndpointsReturnsOnCall = make(map[int]struct { + result1 []comm.EndpointCriteria + }) + } + fake.getEndpointsReturnsOnCall[i] = struct { + result1 []comm.EndpointCriteria + }{result1} +} + +func (fake *ConnectionProducer) NewConnection() (*grpc.ClientConn, string, error) { + fake.newConnectionMutex.Lock() + ret, specificReturn := fake.newConnectionReturnsOnCall[len(fake.newConnectionArgsForCall)] + fake.newConnectionArgsForCall = append(fake.newConnectionArgsForCall, struct { + }{}) + fake.recordInvocation("NewConnection", []interface{}{}) + fake.newConnectionMutex.Unlock() + if fake.NewConnectionStub != nil { + return fake.NewConnectionStub() + } + if specificReturn { + return ret.result1, ret.result2, ret.result3 + } + fakeReturns := fake.newConnectionReturns + return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3 +} + +func (fake *ConnectionProducer) NewConnectionCallCount() int { + fake.newConnectionMutex.RLock() + defer fake.newConnectionMutex.RUnlock() + return len(fake.newConnectionArgsForCall) +} + +func (fake *ConnectionProducer) NewConnectionCalls(stub func() (*grpc.ClientConn, string, error)) { + fake.newConnectionMutex.Lock() + defer fake.newConnectionMutex.Unlock() + fake.NewConnectionStub = stub +} + +func (fake *ConnectionProducer) NewConnectionReturns(result1 *grpc.ClientConn, result2 string, result3 error) { + fake.newConnectionMutex.Lock() + defer fake.newConnectionMutex.Unlock() + fake.NewConnectionStub = nil + fake.newConnectionReturns = struct { + result1 *grpc.ClientConn + result2 string + result3 error + }{result1, result2, result3} +} + +func (fake *ConnectionProducer) NewConnectionReturnsOnCall(i int, result1 *grpc.ClientConn, result2 string, result3 error) { + fake.newConnectionMutex.Lock() + defer fake.newConnectionMutex.Unlock() + fake.NewConnectionStub = nil + if fake.newConnectionReturnsOnCall == nil { + fake.newConnectionReturnsOnCall = make(map[int]struct { + result1 *grpc.ClientConn + result2 string + result3 error + }) + } + fake.newConnectionReturnsOnCall[i] = struct { + result1 *grpc.ClientConn + result2 string + result3 error + }{result1, result2, result3} +} + +func (fake *ConnectionProducer) UpdateEndpoints(arg1 []comm.EndpointCriteria) { + var arg1Copy []comm.EndpointCriteria + if arg1 != nil { + arg1Copy = make([]comm.EndpointCriteria, len(arg1)) + copy(arg1Copy, arg1) + } + fake.updateEndpointsMutex.Lock() + fake.updateEndpointsArgsForCall = append(fake.updateEndpointsArgsForCall, struct { + arg1 []comm.EndpointCriteria + }{arg1Copy}) + fake.recordInvocation("UpdateEndpoints", []interface{}{arg1Copy}) + fake.updateEndpointsMutex.Unlock() + if fake.UpdateEndpointsStub != nil { + fake.UpdateEndpointsStub(arg1) + } +} + +func (fake *ConnectionProducer) UpdateEndpointsCallCount() int { + fake.updateEndpointsMutex.RLock() + defer fake.updateEndpointsMutex.RUnlock() + return len(fake.updateEndpointsArgsForCall) +} + +func (fake *ConnectionProducer) UpdateEndpointsCalls(stub func([]comm.EndpointCriteria)) { + fake.updateEndpointsMutex.Lock() + defer fake.updateEndpointsMutex.Unlock() + fake.UpdateEndpointsStub = stub +} + +func (fake *ConnectionProducer) UpdateEndpointsArgsForCall(i int) []comm.EndpointCriteria { + fake.updateEndpointsMutex.RLock() + defer fake.updateEndpointsMutex.RUnlock() + argsForCall := fake.updateEndpointsArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *ConnectionProducer) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.getEndpointsMutex.RLock() + defer fake.getEndpointsMutex.RUnlock() + fake.newConnectionMutex.RLock() + defer fake.newConnectionMutex.RUnlock() + fake.updateEndpointsMutex.RLock() + defer fake.updateEndpointsMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *ConnectionProducer) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +}