Skip to content

Commit c0f8d75

Browse files
committed
[FAB-3034] Fix bug in delivery client - recursive usage
The delivery client object encapsulates a delivery stream from the peer to the ordering service. It accepts a function in construction time that is invoked upon each successful (re)connection to the ordering service. The purpose of this design, is to make the peer send a SeekInfo message to the orderer, if it disconnects (or, if the orderer crashes and the peer connects to a new orderer). This function may also use the client *itself*. This commit fixes a bug that prevented the usage of the client in the function, because the connection reference and the stream reference were updated only after a successful invocation of the function, and thus- caused recursive reconnection attempts. It also contains a test case for this scenario, which spawns a gRPC server that emulates the ordering service. The ordering service emulation will be used in the next change set to test complex test cases of the delivery client itself Signed-off-by: Yacov Manevich <yacovm@il.ibm.com> Change-Id: I64e2985c227f0f052399e25a047a5984fdb1ba0b
1 parent 4cf2b8f commit c0f8d75

File tree

3 files changed

+189
-23
lines changed

3 files changed

+189
-23
lines changed

core/deliverservice/client.go

Lines changed: 73 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ type clientFactory func(*grpc.ClientConn) orderer.AtomicBroadcastClient
4747

4848
type broadcastClient struct {
4949
stopFlag int32
50-
sync.RWMutex
50+
sync.Mutex
5151
stopChan chan struct{}
5252
createClient clientFactory
5353
shouldRetry retryPolicy
5454
onConnect broadcastSetup
5555
prod comm.ConnectionProducer
5656
blocksprovider.BlocksDeliverer
57-
conn *grpc.ClientConn
57+
conn *connection
5858
}
5959

6060
// NewBroadcastClient returns a broadcastClient with the given params
@@ -65,6 +65,9 @@ func NewBroadcastClient(prod comm.ConnectionProducer, clFactory clientFactory, o
6565
// Recv receives a message from the ordering service
6666
func (bc *broadcastClient) Recv() (*orderer.DeliverResponse, error) {
6767
o, err := bc.try(func() (interface{}, error) {
68+
if bc.shouldStop() {
69+
return nil, errors.New("closing")
70+
}
6871
return bc.BlocksDeliverer.Recv()
6972
})
7073
if err != nil {
@@ -76,6 +79,9 @@ func (bc *broadcastClient) Recv() (*orderer.DeliverResponse, error) {
7679
// Send sends a message to the ordering service
7780
func (bc *broadcastClient) Send(msg *common.Envelope) error {
7881
_, err := bc.try(func() (interface{}, error) {
82+
if bc.shouldStop() {
83+
return nil, errors.New("closing")
84+
}
7985
return nil, bc.BlocksDeliverer.Send(msg)
8086
})
8187
return err
@@ -106,17 +112,15 @@ func (bc *broadcastClient) try(action func() (interface{}, error)) (interface{},
106112
}
107113

108114
func (bc *broadcastClient) doAction(action func() (interface{}, error)) (interface{}, error) {
109-
if bc.BlocksDeliverer == nil {
115+
if bc.conn == nil {
110116
err := bc.connect()
111117
if err != nil {
112118
return nil, err
113119
}
114120
}
115121
resp, err := action()
116122
if err != nil {
117-
bc.conn.Close()
118-
bc.BlocksDeliverer = nil
119-
bc.conn = nil
123+
bc.disconnect()
120124
return nil, err
121125
}
122126
return resp, nil
@@ -141,16 +145,45 @@ func (bc *broadcastClient) connect() error {
141145
conn.Close()
142146
return err
143147
}
144-
err = bc.onConnect(bc)
148+
err = bc.afterConnect(conn, abc)
145149
if err == nil {
146-
bc.Lock()
147-
bc.conn = conn
150+
return nil
151+
}
152+
// If we reached here, lets make sure connection is closed
153+
// and nullified before we return
154+
bc.disconnect()
155+
return err
156+
}
157+
158+
func (bc *broadcastClient) afterConnect(conn *grpc.ClientConn, abc orderer.AtomicBroadcast_DeliverClient) error {
159+
bc.Lock()
160+
bc.conn = &connection{ClientConn: conn}
161+
bc.BlocksDeliverer = abc
162+
if bc.shouldStop() {
148163
bc.Unlock()
149-
bc.BlocksDeliverer = abc
164+
return errors.New("closing")
165+
}
166+
bc.Unlock()
167+
// If the client is closed at this point- before onConnect,
168+
// any use of this object by onConnect would return an error.
169+
err := bc.onConnect(bc)
170+
// If the client is closed right after onConnect, but before
171+
// the following lock- this method would return an error because
172+
// the client has been closed.
173+
bc.Lock()
174+
defer bc.Unlock()
175+
if bc.shouldStop() {
176+
return errors.New("closing")
177+
}
178+
// If the client is closed right after this method exits,
179+
// it's because this method returned nil and not an error.
180+
// So- connect() would return nil also, and the flow of the goroutine
181+
// is returned to doAction(), where action() is invoked - and is configured
182+
// to check whether the client has closed or not.
183+
if err == nil {
150184
return nil
151185
}
152186
logger.Error("Failed setting up broadcast:", err)
153-
conn.Close()
154187
return err
155188
}
156189

@@ -159,12 +192,39 @@ func (bc *broadcastClient) shouldStop() bool {
159192
}
160193

161194
func (bc *broadcastClient) Close() {
195+
bc.Lock()
196+
defer bc.Unlock()
197+
if bc.shouldStop() {
198+
return
199+
}
162200
atomic.StoreInt32(&bc.stopFlag, int32(1))
163201
bc.stopChan <- struct{}{}
164-
bc.RLock()
165-
defer bc.RUnlock()
166202
if bc.conn == nil {
167203
return
168204
}
169205
bc.conn.Close()
170206
}
207+
208+
func (bc *broadcastClient) disconnect() {
209+
bc.Lock()
210+
defer bc.Unlock()
211+
if bc.conn == nil {
212+
return
213+
}
214+
bc.conn.Close()
215+
bc.conn = nil
216+
bc.BlocksDeliverer = nil
217+
}
218+
219+
type connection struct {
220+
*grpc.ClientConn
221+
sync.Once
222+
}
223+
224+
func (c *connection) Close() error {
225+
var err error
226+
c.Once.Do(func() {
227+
err = c.ClientConn.Close()
228+
})
229+
return err
230+
}

core/deliverservice/client_test.go

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,20 @@ limitations under the License.
1717
package deliverclient
1818

1919
import (
20+
"crypto/sha256"
2021
"errors"
22+
"math"
2123
"sync"
2224
"sync/atomic"
2325
"testing"
2426
"time"
2527

28+
"github.com/hyperledger/fabric/core/comm"
2629
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
2730
"github.com/hyperledger/fabric/core/deliverservice/mocks"
2831
"github.com/hyperledger/fabric/protos/common"
2932
"github.com/hyperledger/fabric/protos/orderer"
33+
"github.com/hyperledger/fabric/protos/utils"
3034
"github.com/stretchr/testify/assert"
3135
"golang.org/x/net/context"
3236
"google.golang.org/grpc"
@@ -153,7 +157,6 @@ func (cp *connProducer) UpdateEndpoints(endpoints []string) {
153157
}
154158

155159
func TestOrderingServiceConnFailure(t *testing.T) {
156-
t.Parallel()
157160
testOrderingServiceConnFailure(t, blockDelivererConsumerWithRecv)
158161
testOrderingServiceConnFailure(t, blockDelivererConsumerWithSend)
159162
assert.Equal(t, 0, connNumber)
@@ -406,13 +409,11 @@ func testLimitedConnAttempts(t *testing.T, bdc blocksDelivererConsumer) {
406409
}
407410

408411
func TestLimitedTotalConnTimeRcv(t *testing.T) {
409-
t.Parallel()
410412
testLimitedTotalConnTime(t, blockDelivererConsumerWithRecv)
411413
assert.Equal(t, 0, connNumber)
412414
}
413415

414416
func TestLimitedTotalConnTimeSnd(t *testing.T) {
415-
t.Parallel()
416417
testLimitedTotalConnTime(t, blockDelivererConsumerWithSend)
417418
assert.Equal(t, 0, connNumber)
418419
}
@@ -473,11 +474,10 @@ func testGreenPath(t *testing.T, bdc blocksDelivererConsumer) {
473474
}
474475

475476
func TestCloseWhileRecv(t *testing.T) {
476-
t.Parallel()
477477
// Scenario: Recv is being called and after a while,
478478
// the connection is closed.
479479
// The Recv should return immediately in such a case
480-
fakeOrderer := mocks.NewOrderer(5611)
480+
fakeOrderer := mocks.NewOrderer(5611, t)
481481
time.Sleep(time.Second)
482482
defer fakeOrderer.Shutdown()
483483
cp := &connProducer{ordererEndpoint: "localhost:5611"}
@@ -496,6 +496,7 @@ func TestCloseWhileRecv(t *testing.T) {
496496
time.AfterFunc(time.Second, func() {
497497
atomic.StoreInt32(&flag, int32(1))
498498
bc.Close()
499+
bc.Close() // Try to close a second time
499500
})
500501
resp, err := bc.Recv()
501502
// Ensure we returned because bc.Close() was called and not because some other reason
@@ -506,7 +507,6 @@ func TestCloseWhileRecv(t *testing.T) {
506507
}
507508

508509
func TestCloseWhileSleep(t *testing.T) {
509-
t.Parallel()
510510
testCloseWhileSleep(t, blockDelivererConsumerWithRecv)
511511
testCloseWhileSleep(t, blockDelivererConsumerWithSend)
512512
assert.Equal(t, 0, connNumber)
@@ -543,9 +543,61 @@ func testCloseWhileSleep(t *testing.T, bdc blocksDelivererConsumer) {
543543
go func() {
544544
wg.Wait()
545545
bc.Close()
546+
bc.Close() // Try to close a second time
546547
}()
547548
err := bdc(bc)
548549
assert.Error(t, err)
549550
assert.Equal(t, 1, cp.connAttempts)
550551
assert.Equal(t, 0, setupInvoked)
551552
}
553+
554+
type signerMock struct {
555+
}
556+
557+
func (s *signerMock) NewSignatureHeader() (*common.SignatureHeader, error) {
558+
return &common.SignatureHeader{}, nil
559+
}
560+
561+
func (s *signerMock) Sign(message []byte) ([]byte, error) {
562+
hasher := sha256.New()
563+
hasher.Write(message)
564+
return hasher.Sum(nil), nil
565+
}
566+
567+
func TestProductionUsage(t *testing.T) {
568+
// This test configures the client in a similar fashion as will be
569+
// in production, and tests against a live gRPC server.
570+
os := mocks.NewOrderer(5612, t)
571+
os.SetNextExpectedSeek(5)
572+
defer os.Shutdown()
573+
connFact := func(endpoint string) (*grpc.ClientConn, error) {
574+
return grpc.Dial(endpoint, grpc.WithInsecure(), grpc.WithBlock())
575+
}
576+
prod := comm.NewConnectionProducer(connFact, []string{"localhost:5612"})
577+
clFact := func(cc *grpc.ClientConn) orderer.AtomicBroadcastClient {
578+
return orderer.NewAtomicBroadcastClient(cc)
579+
}
580+
onConnect := func(bd blocksprovider.BlocksDeliverer) error {
581+
env, err := utils.CreateSignedEnvelope(common.HeaderType_CONFIG_UPDATE,
582+
"TEST",
583+
&signerMock{}, newTestSeekInfo(), 0, 0)
584+
assert.NoError(t, err)
585+
return bd.Send(env)
586+
}
587+
retryPol := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) {
588+
return time.Second * 3, attemptNum < 2
589+
}
590+
cl := NewBroadcastClient(prod, clFact, onConnect, retryPol)
591+
go os.SendBlock(5)
592+
resp, err := cl.Recv()
593+
assert.NoError(t, err)
594+
assert.NotNil(t, resp)
595+
assert.Equal(t, uint64(5), resp.GetBlock().Header.Number)
596+
}
597+
598+
func newTestSeekInfo() *orderer.SeekInfo {
599+
return &orderer.SeekInfo{Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: 5}}},
600+
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
601+
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
602+
}
603+
}

core/deliverservice/mocks/orderer.go

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,45 @@ package mocks
1919
import (
2020
"fmt"
2121
"net"
22+
"sync/atomic"
23+
"testing"
2224

25+
"github.com/golang/protobuf/proto"
26+
"github.com/hyperledger/fabric/protos/common"
2327
"github.com/hyperledger/fabric/protos/orderer"
28+
"github.com/stretchr/testify/assert"
2429
"google.golang.org/grpc"
2530
)
2631

2732
type Orderer struct {
2833
net.Listener
2934
*grpc.Server
35+
nextExpectedSeek uint64
36+
t *testing.T
37+
blockChannel chan uint64
38+
stopChan chan struct{}
3039
}
3140

32-
func NewOrderer(port int) *Orderer {
41+
func NewOrderer(port int, t *testing.T) *Orderer {
3342
srv := grpc.NewServer()
3443
lsnr, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
3544
if err != nil {
3645
panic(err)
3746
}
3847
go srv.Serve(lsnr)
39-
o := &Orderer{Server: srv, Listener: lsnr}
48+
o := &Orderer{Server: srv,
49+
Listener: lsnr,
50+
t: t,
51+
nextExpectedSeek: uint64(1),
52+
blockChannel: make(chan uint64, 1),
53+
stopChan: make(chan struct{}, 1),
54+
}
4055
orderer.RegisterAtomicBroadcastServer(srv, o)
4156
return o
4257
}
4358

4459
func (o *Orderer) Shutdown() {
60+
o.stopChan <- struct{}{}
4561
o.Server.Stop()
4662
o.Listener.Close()
4763
}
@@ -50,6 +66,44 @@ func (*Orderer) Broadcast(orderer.AtomicBroadcast_BroadcastServer) error {
5066
panic("Should not have ben called")
5167
}
5268

53-
func (*Orderer) Deliver(orderer.AtomicBroadcast_DeliverServer) error {
54-
return nil
69+
func (o *Orderer) SetNextExpectedSeek(seq uint64) {
70+
atomic.StoreUint64(&o.nextExpectedSeek, uint64(seq))
71+
}
72+
73+
func (o *Orderer) SendBlock(seq uint64) {
74+
o.blockChannel <- seq
75+
}
76+
77+
func (o *Orderer) Deliver(stream orderer.AtomicBroadcast_DeliverServer) error {
78+
envlp, err := stream.Recv()
79+
if err != nil {
80+
fmt.Println(err)
81+
return nil
82+
}
83+
payload := &common.Payload{}
84+
proto.Unmarshal(envlp.Payload, payload)
85+
seekInfo := &orderer.SeekInfo{}
86+
proto.Unmarshal(payload.Data, seekInfo)
87+
assert.True(o.t, seekInfo.Behavior == orderer.SeekInfo_BLOCK_UNTIL_READY)
88+
assert.Equal(o.t, atomic.LoadUint64(&o.nextExpectedSeek), seekInfo.Start.GetSpecified().Number)
89+
90+
for {
91+
select {
92+
case <-o.stopChan:
93+
return nil
94+
case seq := <-o.blockChannel:
95+
o.sendBlock(stream, seq)
96+
}
97+
}
98+
}
99+
100+
func (o *Orderer) sendBlock(stream orderer.AtomicBroadcast_DeliverServer, seq uint64) {
101+
block := &common.Block{
102+
Header: &common.BlockHeader{
103+
Number: seq,
104+
},
105+
}
106+
stream.Send(&orderer.DeliverResponse{
107+
Type: &orderer.DeliverResponse_Block{Block: block},
108+
})
55109
}

0 commit comments

Comments
 (0)