diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index 0602b6f94ba..dcb6895e362 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -173,6 +173,8 @@ func NewDeliverService(conf *Config, connConfig ConnectionCriteria) (*deliverSer } func (d *deliverServiceImpl) UpdateEndpoints(chainID string, connCriteria ConnectionCriteria) error { + // update the overrides + connCriteria.OrdererEndpointOverrides = d.connConfig.OrdererEndpointOverrides // Use chainID to obtain blocks provider and pass endpoints // for update if dc, ok := d.deliverClients[chainID]; ok { diff --git a/core/deliverservice/deliveryclient_test.go b/core/deliverservice/deliveryclient_test.go index 0ed57f17653..e8c2bbc6834 100644 --- a/core/deliverservice/deliveryclient_test.go +++ b/core/deliverservice/deliveryclient_test.go @@ -28,6 +28,12 @@ import ( "google.golang.org/grpc" ) +//go:generate counterfeiter -o mocks/connection_producer.go -fake-name ConnectionProducer . connectionProducer + +type connectionProducer interface { + comm.ConnectionProducer +} + func init() { msptesttools.LoadMSPSetupForTesting() } @@ -262,59 +268,46 @@ func TestDeliverServiceFailover(t *testing.T) { } func TestDeliverServiceUpdateEndpoints(t *testing.T) { - // TODO: Add test case to check the endpoints update - // Case: Start service with given ordering service endpoint - // send a block, switch to new endpoint and send a new block - // Expected: Delivery service should be able to switch to - // updated endpoint and receive second block within timely manner. - defer ensureNoGoroutineLeak(t)() - - os1 := mocks.NewOrderer(5612, t) - - time.Sleep(time.Second) - gossipServiceAdapter := &mocks.MockGossipServiceAdapter{GossipBlockDisseminations: make(chan uint64)} - - service, err := NewDeliverService(&Config{ - Gossip: gossipServiceAdapter, - CryptoSvc: &mockMCS{}, - ABCFactory: DefaultABCFactory, - ConnFactory: DefaultConnectionFactory, - }, ConnectionCriteria{ - Organizations: []string{"org"}, - OrdererEndpointsByOrg: map[string][]string{ - "org": {"localhost:5612"}, + connProd := &mocks.ConnectionProducer{} + testChannel := "test_channel" + + ds := &deliverServiceImpl{ + connConfig: ConnectionCriteria{ + Organizations: []string{"org1"}, + OrdererEndpointsByOrg: map[string][]string{ + "org1": {"localhost:5612"}, + }, + OrdererEndpointOverrides: map[string]string{ + "localhost:5612": "localhost:5614", + }, }, - }) - defer service.Stop() - - assert.NoError(t, err) - li := &mocks.MockLedgerInfo{Height: uint64(100)} - os1.SetNextExpectedSeek(uint64(100)) - - err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {}) - assert.NoError(t, err, "can't start delivery") - - go os1.SendBlock(uint64(100)) - assertBlockDissemination(100, gossipServiceAdapter.GossipBlockDisseminations, t) + deliverClients: map[string]*deliverClient{ + testChannel: { + bclient: &broadcastClient{ + prod: connProd, + }, + }, + }, + } - os2 := mocks.NewOrderer(5613, t) - defer os2.Shutdown() - os2.SetNextExpectedSeek(uint64(101)) + expectedEC := []comm.EndpointCriteria{ + {Organizations: []string{"org1"}, Endpoint: "localhost:5614"}, + {Organizations: []string{"org2"}, Endpoint: "localhost:5613"}, + } - newConnCriteria := ConnectionCriteria{ - Organizations: []string{"org"}, - OrdererEndpointsByOrg: map[string][]string{ - "org": {"localhost:5613"}, + ds.UpdateEndpoints( + testChannel, + ConnectionCriteria{ + Organizations: []string{"org1", "org2"}, + OrdererEndpointsByOrg: map[string][]string{ + "org1": {"localhost:5612"}, + "org2": {"localhost:5613"}, + }, }, - } - service.UpdateEndpoints("TEST_CHAINID", newConnCriteria) - // Shutdown old ordering service to make sure block will now come from - // updated ordering service - os1.Shutdown() + ) + assert.Equal(t, 1, connProd.UpdateEndpointsCallCount()) + assert.Equal(t, expectedEC, connProd.UpdateEndpointsArgsForCall(0)) - atomic.StoreUint64(&li.Height, uint64(101)) - go os2.SendBlock(uint64(101)) - assertBlockDissemination(101, gossipServiceAdapter.GossipBlockDisseminations, t) } func TestDeliverServiceServiceUnavailable(t *testing.T) { diff --git a/core/deliverservice/mocks/connection_producer.go b/core/deliverservice/mocks/connection_producer.go new file mode 100644 index 00000000000..b1832679ea3 --- /dev/null +++ b/core/deliverservice/mocks/connection_producer.go @@ -0,0 +1,217 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package mocks + +import ( + "sync" + + "github.com/hyperledger/fabric/core/comm" + "google.golang.org/grpc" +) + +type ConnectionProducer struct { + GetEndpointsStub func() []comm.EndpointCriteria + getEndpointsMutex sync.RWMutex + getEndpointsArgsForCall []struct { + } + getEndpointsReturns struct { + result1 []comm.EndpointCriteria + } + getEndpointsReturnsOnCall map[int]struct { + result1 []comm.EndpointCriteria + } + NewConnectionStub func() (*grpc.ClientConn, string, error) + newConnectionMutex sync.RWMutex + newConnectionArgsForCall []struct { + } + newConnectionReturns struct { + result1 *grpc.ClientConn + result2 string + result3 error + } + newConnectionReturnsOnCall map[int]struct { + result1 *grpc.ClientConn + result2 string + result3 error + } + UpdateEndpointsStub func([]comm.EndpointCriteria) + updateEndpointsMutex sync.RWMutex + updateEndpointsArgsForCall []struct { + arg1 []comm.EndpointCriteria + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *ConnectionProducer) GetEndpoints() []comm.EndpointCriteria { + fake.getEndpointsMutex.Lock() + ret, specificReturn := fake.getEndpointsReturnsOnCall[len(fake.getEndpointsArgsForCall)] + fake.getEndpointsArgsForCall = append(fake.getEndpointsArgsForCall, struct { + }{}) + fake.recordInvocation("GetEndpoints", []interface{}{}) + fake.getEndpointsMutex.Unlock() + if fake.GetEndpointsStub != nil { + return fake.GetEndpointsStub() + } + if specificReturn { + return ret.result1 + } + fakeReturns := fake.getEndpointsReturns + return fakeReturns.result1 +} + +func (fake *ConnectionProducer) GetEndpointsCallCount() int { + fake.getEndpointsMutex.RLock() + defer fake.getEndpointsMutex.RUnlock() + return len(fake.getEndpointsArgsForCall) +} + +func (fake *ConnectionProducer) GetEndpointsCalls(stub func() []comm.EndpointCriteria) { + fake.getEndpointsMutex.Lock() + defer fake.getEndpointsMutex.Unlock() + fake.GetEndpointsStub = stub +} + +func (fake *ConnectionProducer) GetEndpointsReturns(result1 []comm.EndpointCriteria) { + fake.getEndpointsMutex.Lock() + defer fake.getEndpointsMutex.Unlock() + fake.GetEndpointsStub = nil + fake.getEndpointsReturns = struct { + result1 []comm.EndpointCriteria + }{result1} +} + +func (fake *ConnectionProducer) GetEndpointsReturnsOnCall(i int, result1 []comm.EndpointCriteria) { + fake.getEndpointsMutex.Lock() + defer fake.getEndpointsMutex.Unlock() + fake.GetEndpointsStub = nil + if fake.getEndpointsReturnsOnCall == nil { + fake.getEndpointsReturnsOnCall = make(map[int]struct { + result1 []comm.EndpointCriteria + }) + } + fake.getEndpointsReturnsOnCall[i] = struct { + result1 []comm.EndpointCriteria + }{result1} +} + +func (fake *ConnectionProducer) NewConnection() (*grpc.ClientConn, string, error) { + fake.newConnectionMutex.Lock() + ret, specificReturn := fake.newConnectionReturnsOnCall[len(fake.newConnectionArgsForCall)] + fake.newConnectionArgsForCall = append(fake.newConnectionArgsForCall, struct { + }{}) + fake.recordInvocation("NewConnection", []interface{}{}) + fake.newConnectionMutex.Unlock() + if fake.NewConnectionStub != nil { + return fake.NewConnectionStub() + } + if specificReturn { + return ret.result1, ret.result2, ret.result3 + } + fakeReturns := fake.newConnectionReturns + return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3 +} + +func (fake *ConnectionProducer) NewConnectionCallCount() int { + fake.newConnectionMutex.RLock() + defer fake.newConnectionMutex.RUnlock() + return len(fake.newConnectionArgsForCall) +} + +func (fake *ConnectionProducer) NewConnectionCalls(stub func() (*grpc.ClientConn, string, error)) { + fake.newConnectionMutex.Lock() + defer fake.newConnectionMutex.Unlock() + fake.NewConnectionStub = stub +} + +func (fake *ConnectionProducer) NewConnectionReturns(result1 *grpc.ClientConn, result2 string, result3 error) { + fake.newConnectionMutex.Lock() + defer fake.newConnectionMutex.Unlock() + fake.NewConnectionStub = nil + fake.newConnectionReturns = struct { + result1 *grpc.ClientConn + result2 string + result3 error + }{result1, result2, result3} +} + +func (fake *ConnectionProducer) NewConnectionReturnsOnCall(i int, result1 *grpc.ClientConn, result2 string, result3 error) { + fake.newConnectionMutex.Lock() + defer fake.newConnectionMutex.Unlock() + fake.NewConnectionStub = nil + if fake.newConnectionReturnsOnCall == nil { + fake.newConnectionReturnsOnCall = make(map[int]struct { + result1 *grpc.ClientConn + result2 string + result3 error + }) + } + fake.newConnectionReturnsOnCall[i] = struct { + result1 *grpc.ClientConn + result2 string + result3 error + }{result1, result2, result3} +} + +func (fake *ConnectionProducer) UpdateEndpoints(arg1 []comm.EndpointCriteria) { + var arg1Copy []comm.EndpointCriteria + if arg1 != nil { + arg1Copy = make([]comm.EndpointCriteria, len(arg1)) + copy(arg1Copy, arg1) + } + fake.updateEndpointsMutex.Lock() + fake.updateEndpointsArgsForCall = append(fake.updateEndpointsArgsForCall, struct { + arg1 []comm.EndpointCriteria + }{arg1Copy}) + fake.recordInvocation("UpdateEndpoints", []interface{}{arg1Copy}) + fake.updateEndpointsMutex.Unlock() + if fake.UpdateEndpointsStub != nil { + fake.UpdateEndpointsStub(arg1) + } +} + +func (fake *ConnectionProducer) UpdateEndpointsCallCount() int { + fake.updateEndpointsMutex.RLock() + defer fake.updateEndpointsMutex.RUnlock() + return len(fake.updateEndpointsArgsForCall) +} + +func (fake *ConnectionProducer) UpdateEndpointsCalls(stub func([]comm.EndpointCriteria)) { + fake.updateEndpointsMutex.Lock() + defer fake.updateEndpointsMutex.Unlock() + fake.UpdateEndpointsStub = stub +} + +func (fake *ConnectionProducer) UpdateEndpointsArgsForCall(i int) []comm.EndpointCriteria { + fake.updateEndpointsMutex.RLock() + defer fake.updateEndpointsMutex.RUnlock() + argsForCall := fake.updateEndpointsArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *ConnectionProducer) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.getEndpointsMutex.RLock() + defer fake.getEndpointsMutex.RUnlock() + fake.newConnectionMutex.RLock() + defer fake.newConnectionMutex.RUnlock() + fake.updateEndpointsMutex.RLock() + defer fake.updateEndpointsMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *ConnectionProducer) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +}