From f9fa8d6aba50623af5bb88d063c0cddb01ab94a1 Mon Sep 17 00:00:00 2001 From: YACOVM Date: Mon, 20 Mar 2017 14:08:41 +0200 Subject: [PATCH] [FAB-2828] Add resilient delivery client to peer This commit adds a resilient delivery client to the peer's deliveryService. This client: - Accepts the following arguments at creation: - Connection Producer (previous change set) - clientFactory (creates AtomicBroadcastClient from ClientConn) - broadcastSetup (function that is going to be used to send a Send() with SeekInfo to orderer) - backoffStrategy - retry logic descriptor, a function that can implement any kind of backoff policy, i.e exponential backoff, etc. etc. - Able to reconnect to the ordering service when the connection is broken - Hides all failure handling and reconnection logic from its user In a later commit I will: - Move the connection creation code that is invoked only once at creation instead of when needed, to the factory of the core/comm/producer.go that was created in the previous commit. - Change the blocksprovider accordingly to use this newly introduced broadcastClient Provided 12 test cases with code coverage of 100% Change-Id: I96a46b76e8fb227eb8bea4c8ded9b788e4fd0eef Signed-off-by: Yacov Manevich --- core/deliverservice/client.go | 170 +++++++++ core/deliverservice/client_test.go | 551 +++++++++++++++++++++++++++ core/deliverservice/mocks/orderer.go | 55 +++ 3 files changed, 776 insertions(+) create mode 100644 core/deliverservice/client.go create mode 100644 core/deliverservice/client_test.go create mode 100644 core/deliverservice/mocks/orderer.go diff --git a/core/deliverservice/client.go b/core/deliverservice/client.go new file mode 100644 index 00000000000..2d6f65577ba --- /dev/null +++ b/core/deliverservice/client.go @@ -0,0 +1,170 @@ +/* +Copyright IBM Corp. 2017 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deliverclient + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/hyperledger/fabric/core/comm" + "github.com/hyperledger/fabric/core/deliverservice/blocksprovider" + "github.com/hyperledger/fabric/protos/common" + "github.com/hyperledger/fabric/protos/orderer" + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +// broadcastSetup is a function that is called by the broadcastClient immediately after each +// successful connection to the ordering service +type broadcastSetup func(blocksprovider.BlocksDeliverer) error + +// retryPolicy receives as parameters the number of times the attempt has failed +// and a duration that specifies the total elapsed time passed since the first attempt. +// If further attempts should be made, it returns: +// - a time duration after which the next attempt would be made, true +// Else, a zero duration, false +type retryPolicy func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) + +// clientFactory creates a gRPC broadcast client out of a ClientConn +type clientFactory func(*grpc.ClientConn) orderer.AtomicBroadcastClient + +type broadcastClient struct { + stopFlag int32 + sync.RWMutex + stopChan chan struct{} + createClient clientFactory + shouldRetry retryPolicy + onConnect broadcastSetup + prod comm.ConnectionProducer + blocksprovider.BlocksDeliverer + conn *grpc.ClientConn +} + +// NewBroadcastClient returns a broadcastClient with the given params +func NewBroadcastClient(prod comm.ConnectionProducer, clFactory clientFactory, onConnect broadcastSetup, bos retryPolicy) *broadcastClient { + return &broadcastClient{prod: prod, onConnect: onConnect, shouldRetry: bos, createClient: clFactory, stopChan: make(chan struct{}, 1)} +} + +// Recv receives a message from the ordering service +func (bc *broadcastClient) Recv() (*orderer.DeliverResponse, error) { + o, err := bc.try(func() (interface{}, error) { + return bc.BlocksDeliverer.Recv() + }) + if err != nil { + return nil, err + } + return o.(*orderer.DeliverResponse), nil +} + +// Send sends a message to the ordering service +func (bc *broadcastClient) Send(msg *common.Envelope) error { + _, err := bc.try(func() (interface{}, error) { + return nil, bc.BlocksDeliverer.Send(msg) + }) + return err +} + +func (bc *broadcastClient) try(action func() (interface{}, error)) (interface{}, error) { + attempt := 0 + start := time.Now() + var backoffDuration time.Duration + retry := true + for retry && !bc.shouldStop() { + attempt++ + resp, err := bc.doAction(action) + if err != nil { + backoffDuration, retry = bc.shouldRetry(attempt, time.Since(start)) + if !retry { + break + } + bc.sleep(backoffDuration) + continue + } + return resp, nil + } + if bc.shouldStop() { + return nil, errors.New("Client is closing") + } + return nil, fmt.Errorf("Attempts (%d) or elapsed time (%v) exhausted", attempt, time.Since(start)) +} + +func (bc *broadcastClient) doAction(action func() (interface{}, error)) (interface{}, error) { + if bc.BlocksDeliverer == nil { + err := bc.connect() + if err != nil { + return nil, err + } + } + resp, err := action() + if err != nil { + bc.conn.Close() + bc.BlocksDeliverer = nil + bc.conn = nil + return nil, err + } + return resp, nil +} + +func (bc *broadcastClient) sleep(duration time.Duration) { + select { + case <-time.After(duration): + case <-bc.stopChan: + } +} + +func (bc *broadcastClient) connect() error { + conn, endpoint, err := bc.prod.NewConnection() + if err != nil { + logger.Error("Failed obtaining connection:", err) + return err + } + abc, err := bc.createClient(conn).Deliver(context.Background()) + if err != nil { + logger.Error("Connection to ", endpoint, "established but was unable to create gRPC stream:", err) + conn.Close() + return err + } + err = bc.onConnect(bc) + if err == nil { + bc.Lock() + bc.conn = conn + bc.Unlock() + bc.BlocksDeliverer = abc + return nil + } + logger.Error("Failed setting up broadcast:", err) + conn.Close() + return err +} + +func (bc *broadcastClient) shouldStop() bool { + return atomic.LoadInt32(&bc.stopFlag) == int32(1) +} + +func (bc *broadcastClient) Close() { + atomic.StoreInt32(&bc.stopFlag, int32(1)) + bc.stopChan <- struct{}{} + bc.RLock() + defer bc.RUnlock() + if bc.conn == nil { + return + } + bc.conn.Close() +} diff --git a/core/deliverservice/client_test.go b/core/deliverservice/client_test.go new file mode 100644 index 00000000000..af932161b98 --- /dev/null +++ b/core/deliverservice/client_test.go @@ -0,0 +1,551 @@ +/* +Copyright IBM Corp. 2017 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deliverclient + +import ( + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/hyperledger/fabric/core/deliverservice/blocksprovider" + "github.com/hyperledger/fabric/core/deliverservice/mocks" + "github.com/hyperledger/fabric/protos/common" + "github.com/hyperledger/fabric/protos/orderer" + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +var connNumber = 0 + +func newConnection() *grpc.ClientConn { + connNumber++ + // The balancer is in order to check connection leaks. + // When grpc.ClientConn.Close() is called, it calls the balancer's Close() + // method which decrements the connNumber + cc, _ := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(&balancer{})) + return cc +} + +type balancer struct { +} + +func (*balancer) Start(target string) error { + return nil +} + +func (*balancer) Up(addr grpc.Address) (down func(error)) { + return func(error) {} +} + +func (*balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr grpc.Address, put func(), err error) { + return grpc.Address{}, func() {}, errors.New("") +} + +func (*balancer) Notify() <-chan []grpc.Address { + return nil +} + +func (*balancer) Close() error { + connNumber-- + return nil +} + +type blocksDelivererConsumer func(blocksprovider.BlocksDeliverer) error + +var blockDelivererConsumerWithRecv = func(bd blocksprovider.BlocksDeliverer) error { + _, err := bd.Recv() + return err +} + +var blockDelivererConsumerWithSend = func(bd blocksprovider.BlocksDeliverer) error { + return bd.Send(&common.Envelope{}) +} + +type atomicBroadcastBroadcastClient struct { +} + +type abc struct { + shouldFail bool + grpc.ClientStream +} + +func (a *abc) Send(*common.Envelope) error { + if a.shouldFail { + return errors.New("Failed sending") + } + return nil +} + +func (a *abc) Recv() (*orderer.DeliverResponse, error) { + if a.shouldFail { + return nil, errors.New("Failed Recv") + } + return &orderer.DeliverResponse{}, nil +} + +type abclient struct { + shouldFail bool + stream *abc +} + +func (ac *abclient) Broadcast(ctx context.Context, opts ...grpc.CallOption) (orderer.AtomicBroadcast_BroadcastClient, error) { + panic("Not implemented") +} + +func (ac *abclient) Deliver(ctx context.Context, opts ...grpc.CallOption) (orderer.AtomicBroadcast_DeliverClient, error) { + if ac.stream != nil { + return ac.stream, nil + } + if ac.shouldFail { + return nil, errors.New("Failed creating ABC") + } + return &abc{}, nil +} + +type connProducer struct { + shouldFail bool + connAttempts int + connTime time.Duration + ordererEndpoint string +} + +func (cp *connProducer) realConnection() (*grpc.ClientConn, string, error) { + cc, err := grpc.Dial(cp.ordererEndpoint, grpc.WithInsecure()) + if err != nil { + return nil, "", err + } + return cc, cp.ordererEndpoint, nil +} + +func (cp *connProducer) NewConnection() (*grpc.ClientConn, string, error) { + time.Sleep(cp.connTime) + cp.connAttempts++ + if cp.ordererEndpoint != "" { + return cp.realConnection() + } + if cp.shouldFail { + return nil, "", errors.New("Failed connecting") + } + return newConnection(), "localhost:5611", nil +} + +// UpdateEndpoints updates the endpoints of the ConnectionProducer +// to be the given endpoints +func (cp *connProducer) UpdateEndpoints(endpoints []string) { + panic("Not implemented") +} + +func TestOrderingServiceConnFailure(t *testing.T) { + t.Parallel() + testOrderingServiceConnFailure(t, blockDelivererConsumerWithRecv) + testOrderingServiceConnFailure(t, blockDelivererConsumerWithSend) + assert.Equal(t, 0, connNumber) +} + +func testOrderingServiceConnFailure(t *testing.T, bdc blocksDelivererConsumer) { + // Scenario: Create a broadcast client and call Recv/Send. + // Connection to the ordering service should be possible only at second attempt + cp := &connProducer{shouldFail: true} + clFactory := func(*grpc.ClientConn) orderer.AtomicBroadcastClient { + return &abclient{} + } + setupInvoked := 0 + setup := func(blocksprovider.BlocksDeliverer) error { + setupInvoked++ + return nil + } + backoffStrategy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) { + // When called with the second attempt (iteration number 1), + // we should be able to connect to the ordering service. + // Set connection provider mock shouldFail flag to false + // to enable next connection attempt to succeed + if attemptNum == 1 { + cp.shouldFail = false + } + + return time.Duration(0), attemptNum < 2 + } + bc := NewBroadcastClient(cp, clFactory, setup, backoffStrategy) + defer bc.Close() + err := bdc(bc) + assert.NoError(t, err) + assert.Equal(t, 2, cp.connAttempts) + assert.Equal(t, 1, setupInvoked) +} + +func TestOrderingServiceStreamFailure(t *testing.T) { + testOrderingServiceStreamFailure(t, blockDelivererConsumerWithRecv) + testOrderingServiceStreamFailure(t, blockDelivererConsumerWithSend) + assert.Equal(t, 0, connNumber) +} + +func testOrderingServiceStreamFailure(t *testing.T, bdc blocksDelivererConsumer) { + // Scenario: Create a broadcast client and call Recv/Send. + // Connection to the ordering service should be possible at first attempt, + // but the atomic broadcast client creation fails at first attempt + abcClient := &abclient{shouldFail: true} + cp := &connProducer{} + clFactory := func(*grpc.ClientConn) orderer.AtomicBroadcastClient { + return abcClient + } + setupInvoked := 0 + setup := func(blocksprovider.BlocksDeliverer) error { + setupInvoked++ + return nil + } + backoffStrategy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) { + // When called with the second attempt (iteration number 1), + // we should be able to finally call Deliver() by the atomic broadcast client + if attemptNum == 1 { + abcClient.shouldFail = false + } + return time.Duration(0), attemptNum < 2 + } + bc := NewBroadcastClient(cp, clFactory, setup, backoffStrategy) + defer bc.Close() + err := bdc(bc) + assert.NoError(t, err) + assert.Equal(t, 2, cp.connAttempts) + assert.Equal(t, 1, setupInvoked) +} + +func TestOrderingServiceSetupFailure(t *testing.T) { + testOrderingServiceSetupFailure(t, blockDelivererConsumerWithRecv) + testOrderingServiceSetupFailure(t, blockDelivererConsumerWithSend) + assert.Equal(t, 0, connNumber) +} + +func testOrderingServiceSetupFailure(t *testing.T, bdc blocksDelivererConsumer) { + // Scenario: Create a broadcast client and call Recv/Send. + // Connection to the ordering service should be possible, + // the atomic broadcast client creation succeeds, but invoking the setup function + // fails at the first attempt and succeeds at the second one. + cp := &connProducer{} + clFactory := func(*grpc.ClientConn) orderer.AtomicBroadcastClient { + return &abclient{} + } + setupInvoked := 0 + setup := func(blocksprovider.BlocksDeliverer) error { + setupInvoked++ + if setupInvoked == 1 { + return errors.New("Setup failed") + } + return nil + } + backoffStrategy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) { + return time.Duration(0), true + } + bc := NewBroadcastClient(cp, clFactory, setup, backoffStrategy) + defer bc.Close() + err := bdc(bc) + assert.NoError(t, err) + assert.Equal(t, 2, cp.connAttempts) + assert.Equal(t, 2, setupInvoked) +} + +func TestOrderingServiceFirstOperationFailure(t *testing.T) { + testOrderingServiceFirstOperationFailure(t, blockDelivererConsumerWithRecv) + testOrderingServiceFirstOperationFailure(t, blockDelivererConsumerWithSend) + assert.Equal(t, 0, connNumber) +} + +func testOrderingServiceFirstOperationFailure(t *testing.T, bdc blocksDelivererConsumer) { + // Scenario: Creation and connection to the ordering service succeeded + // but the first Recv/Send failed. + // The client should reconnect to the ordering service + cp := &connProducer{} + abStream := &abc{shouldFail: true} + abcClient := &abclient{stream: abStream} + clFactory := func(*grpc.ClientConn) orderer.AtomicBroadcastClient { + return abcClient + } + + setupInvoked := 0 + setup := func(blocksprovider.BlocksDeliverer) error { + // Fix stream success logic at 2nd attempt + if setupInvoked == 1 { + abStream.shouldFail = false + } + setupInvoked++ + return nil + } + backoffStrategy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) { + return time.Duration(0), true + } + bc := NewBroadcastClient(cp, clFactory, setup, backoffStrategy) + defer bc.Close() + err := bdc(bc) + assert.NoError(t, err) + assert.Equal(t, 2, setupInvoked) + assert.Equal(t, cp.connAttempts, 2) +} + +func TestOrderingServiceCrashAndRecover(t *testing.T) { + testOrderingServiceCrashAndRecover(t, blockDelivererConsumerWithRecv) + testOrderingServiceCrashAndRecover(t, blockDelivererConsumerWithSend) + assert.Equal(t, 0, connNumber) +} + +func testOrderingServiceCrashAndRecover(t *testing.T, bdc blocksDelivererConsumer) { + // Scenario: The ordering service is OK at first usage of Recv/Send, + // but subsequent calls fails because of connection error. + // A reconnect is needed and only then Recv/Send should succeed + cp := &connProducer{} + abStream := &abc{} + abcClient := &abclient{stream: abStream} + clFactory := func(*grpc.ClientConn) orderer.AtomicBroadcastClient { + return abcClient + } + + setupInvoked := 0 + setup := func(blocksprovider.BlocksDeliverer) error { + // Fix stream success logic at 2nd attempt + if setupInvoked == 1 { + abStream.shouldFail = false + } + setupInvoked++ + return nil + } + backoffStrategy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) { + return time.Duration(0), true + } + bc := NewBroadcastClient(cp, clFactory, setup, backoffStrategy) + defer bc.Close() + err := bdc(bc) + assert.NoError(t, err) + // Now fail the subsequent Recv/Send + abStream.shouldFail = true + err = bdc(bc) + assert.NoError(t, err) + assert.Equal(t, 2, cp.connAttempts) + assert.Equal(t, 2, setupInvoked) +} + +func TestOrderingServicePermanentCrash(t *testing.T) { + testOrderingServicePermanentCrash(t, blockDelivererConsumerWithRecv) + testOrderingServicePermanentCrash(t, blockDelivererConsumerWithSend) + assert.Equal(t, 0, connNumber) +} + +func testOrderingServicePermanentCrash(t *testing.T, bdc blocksDelivererConsumer) { + // Scenario: The ordering service is OK at first usage of Recv/Send, + // but subsequent calls fail because it crashes. + // The client should give up after 10 attempts in the second reconnect + cp := &connProducer{} + abStream := &abc{} + abcClient := &abclient{stream: abStream} + clFactory := func(*grpc.ClientConn) orderer.AtomicBroadcastClient { + return abcClient + } + + setupInvoked := 0 + setup := func(blocksprovider.BlocksDeliverer) error { + setupInvoked++ + return nil + } + backoffStrategy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) { + return time.Duration(0), attemptNum < 10 + } + bc := NewBroadcastClient(cp, clFactory, setup, backoffStrategy) + defer bc.Close() + err := bdc(bc) + assert.NoError(t, err) + // Now fail the subsequent Recv/Send + abStream.shouldFail = true + cp.shouldFail = true + err = bdc(bc) + assert.Error(t, err) + assert.Equal(t, 10, cp.connAttempts) + assert.Equal(t, 1, setupInvoked) +} + +func TestLimitedConnAttempts(t *testing.T) { + testLimitedConnAttempts(t, blockDelivererConsumerWithRecv) + testLimitedConnAttempts(t, blockDelivererConsumerWithSend) + assert.Equal(t, 0, connNumber) +} + +func testLimitedConnAttempts(t *testing.T, bdc blocksDelivererConsumer) { + // Scenario: The ordering service isn't available, and the backoff strategy + // specifies an upper bound of 10 connection attempts + cp := &connProducer{shouldFail: true} + clFactory := func(*grpc.ClientConn) orderer.AtomicBroadcastClient { + return &abclient{} + } + setupInvoked := 0 + setup := func(blocksprovider.BlocksDeliverer) error { + setupInvoked++ + return nil + } + backoffStrategy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) { + return time.Duration(0), attemptNum < 10 + } + bc := NewBroadcastClient(cp, clFactory, setup, backoffStrategy) + defer bc.Close() + err := bdc(bc) + assert.Error(t, err) + assert.Equal(t, 10, cp.connAttempts) + assert.Equal(t, 0, setupInvoked) +} + +func TestLimitedTotalConnTimeRcv(t *testing.T) { + t.Parallel() + testLimitedTotalConnTime(t, blockDelivererConsumerWithRecv) + assert.Equal(t, 0, connNumber) +} + +func TestLimitedTotalConnTimeSnd(t *testing.T) { + t.Parallel() + testLimitedTotalConnTime(t, blockDelivererConsumerWithSend) + assert.Equal(t, 0, connNumber) +} + +func testLimitedTotalConnTime(t *testing.T, bdc blocksDelivererConsumer) { + // Scenario: The ordering service isn't available, and the backoff strategy + // specifies an upper bound of 1 second + // The first attempt fails, and the second attempt doesn't take place + // becuse the creation of connection takes 1.5 seconds. + cp := &connProducer{shouldFail: true, connTime: 1500 * time.Millisecond} + clFactory := func(*grpc.ClientConn) orderer.AtomicBroadcastClient { + return &abclient{} + } + setupInvoked := 0 + setup := func(blocksprovider.BlocksDeliverer) error { + setupInvoked++ + return nil + } + backoffStrategy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) { + return 0, elapsedTime.Nanoseconds() < time.Second.Nanoseconds() + } + bc := NewBroadcastClient(cp, clFactory, setup, backoffStrategy) + defer bc.Close() + err := bdc(bc) + assert.Error(t, err) + assert.Equal(t, 1, cp.connAttempts) + assert.Equal(t, 0, setupInvoked) +} + +func TestGreenPath(t *testing.T) { + testGreenPath(t, blockDelivererConsumerWithRecv) + testGreenPath(t, blockDelivererConsumerWithSend) + assert.Equal(t, 0, connNumber) +} + +func testGreenPath(t *testing.T, bdc blocksDelivererConsumer) { + // Scenario: Everything succeeds + cp := &connProducer{} + abcClient := &abclient{} + clFactory := func(*grpc.ClientConn) orderer.AtomicBroadcastClient { + return abcClient + } + + setupInvoked := 0 + setup := func(blocksprovider.BlocksDeliverer) error { + setupInvoked++ + return nil + } + backoffStrategy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) { + return time.Duration(0), attemptNum < 1 + } + bc := NewBroadcastClient(cp, clFactory, setup, backoffStrategy) + defer bc.Close() + err := bdc(bc) + assert.NoError(t, err) + assert.Equal(t, 1, cp.connAttempts) + assert.Equal(t, 1, setupInvoked) +} + +func TestCloseWhileRecv(t *testing.T) { + t.Parallel() + // Scenario: Recv is being called and after a while, + // the connection is closed. + // The Recv should return immediately in such a case + fakeOrderer := mocks.NewOrderer(5611) + time.Sleep(time.Second) + defer fakeOrderer.Shutdown() + cp := &connProducer{ordererEndpoint: "localhost:5611"} + clFactory := func(conn *grpc.ClientConn) orderer.AtomicBroadcastClient { + return orderer.NewAtomicBroadcastClient(conn) + } + + setup := func(blocksprovider.BlocksDeliverer) error { + return nil + } + backoffStrategy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) { + return 0, true + } + bc := NewBroadcastClient(cp, clFactory, setup, backoffStrategy) + var flag int32 + time.AfterFunc(time.Second, func() { + atomic.StoreInt32(&flag, int32(1)) + bc.Close() + }) + resp, err := bc.Recv() + // Ensure we returned because bc.Close() was called and not because some other reason + assert.Equal(t, int32(1), atomic.LoadInt32(&flag), "Recv returned before bc.Close() was called") + assert.Nil(t, resp) + assert.Error(t, err) + assert.Contains(t, "Client is closing", err.Error()) +} + +func TestCloseWhileSleep(t *testing.T) { + t.Parallel() + testCloseWhileSleep(t, blockDelivererConsumerWithRecv) + testCloseWhileSleep(t, blockDelivererConsumerWithSend) + assert.Equal(t, 0, connNumber) +} + +func testCloseWhileSleep(t *testing.T, bdc blocksDelivererConsumer) { + // Scenario: Recv/Send is being called while sleeping because + // of the backoff policy is programmed to sleep 1000000 seconds + // between retries. + // The Recv/Send should return pretty soon + cp := &connProducer{} + abcClient := &abclient{shouldFail: true} + clFactory := func(*grpc.ClientConn) orderer.AtomicBroadcastClient { + return abcClient + } + + setupInvoked := 0 + setup := func(blocksprovider.BlocksDeliverer) error { + setupInvoked++ + return nil + } + var wg sync.WaitGroup + wg.Add(1) + backoffStrategy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) { + if attemptNum == 1 { + go func() { + time.Sleep(time.Second) + wg.Done() + }() + } + return time.Second * 1000000, true + } + bc := NewBroadcastClient(cp, clFactory, setup, backoffStrategy) + go func() { + wg.Wait() + bc.Close() + }() + err := bdc(bc) + assert.Error(t, err) + assert.Equal(t, 1, cp.connAttempts) + assert.Equal(t, 0, setupInvoked) +} diff --git a/core/deliverservice/mocks/orderer.go b/core/deliverservice/mocks/orderer.go new file mode 100644 index 00000000000..77fbc8905f2 --- /dev/null +++ b/core/deliverservice/mocks/orderer.go @@ -0,0 +1,55 @@ +/* +Copyright IBM Corp. 2017 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mocks + +import ( + "fmt" + "net" + + "github.com/hyperledger/fabric/protos/orderer" + "google.golang.org/grpc" +) + +type Orderer struct { + net.Listener + *grpc.Server +} + +func NewOrderer(port int) *Orderer { + srv := grpc.NewServer() + lsnr, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) + if err != nil { + panic(err) + } + go srv.Serve(lsnr) + o := &Orderer{Server: srv, Listener: lsnr} + orderer.RegisterAtomicBroadcastServer(srv, o) + return o +} + +func (o *Orderer) Shutdown() { + o.Server.Stop() + o.Listener.Close() +} + +func (*Orderer) Broadcast(orderer.AtomicBroadcast_BroadcastServer) error { + panic("Should not have ben called") +} + +func (*Orderer) Deliver(orderer.AtomicBroadcast_DeliverServer) error { + return nil +}