Skip to content

Commit d0816f1

Browse files
authored
add test for assembler-consenter reconnection (#126)
* add test for assembler-consenter reconnection Signed-off-by: Natalie Morad <natalie.morad@ibm.com> * fix review Signed-off-by: Natalie Morad <natalie.morad@ibm.com> --------- Signed-off-by: Natalie Morad <natalie.morad@ibm.com>
1 parent 52b0516 commit d0816f1

File tree

3 files changed

+168
-63
lines changed

3 files changed

+168
-63
lines changed

node/assembler/assembler_batcher_consenter_test.go

Lines changed: 63 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,65 +16,84 @@ import (
1616
"github.com/hyperledger/fabric-x-orderer/node/assembler"
1717
"github.com/hyperledger/fabric-x-orderer/node/comm/tlsgen"
1818
"github.com/hyperledger/fabric-x-orderer/node/config"
19+
"github.com/hyperledger/fabric-x-orderer/node/consensus/state"
1920
"github.com/hyperledger/fabric-x-orderer/testutil"
2021

2122
"github.com/hyperledger/fabric-protos-go-apiv2/common"
2223
"github.com/hyperledger/fabric-protos-go-apiv2/orderer"
2324
"github.com/stretchr/testify/require"
2425
)
2526

26-
func TestAssemblerAppendBlockAndIgnoreDuplicate(t *testing.T) {
27+
func TestAssemblerHandlesConsenterReconnect(t *testing.T) {
2728
ca, err := tlsgen.NewCA()
2829
require.NoError(t, err)
2930

30-
batcherStub := NewStubBatcher(t, types.ShardID(1), types.PartyID(1), ca)
31-
defer batcherStub.Stop()
31+
numParties := 4
32+
partyID := types.PartyID(1)
33+
shardID := types.ShardID(1)
3234

33-
consenterStub := NewStubConsenter(t, types.PartyID(1), ca)
34-
defer consenterStub.Stop()
35+
batchersStub, batcherInfos, cleanup := createStubBatchersAndInfos(t, numParties, shardID, ca)
36+
defer cleanup()
3537

36-
assembler, clean := newAssemblerTest(t, 1, 1, ca, batcherStub.batcherInfo, consenterStub.consenterInfo)
37-
defer clean()
38+
consenterStub := NewStubConsenter(t, partyID, ca)
39+
defer consenterStub.Shutdown()
3840

39-
// genesis block added
41+
assembler := newAssemblerTest(t, partyID, shardID, ca, batcherInfos, consenterStub.consenterInfo)
42+
defer assembler.Stop()
43+
44+
// wait for genesis block
4045
require.Eventually(t, func() bool {
4146
return assembler.GetTxCount() == 1
4247
}, 3*time.Second, 100*time.Millisecond)
4348

4449
obaCreator, _ := NewOrderedBatchAttestationCreator()
4550

46-
// create batch with 1 req, send from batcher and consenter
51+
// send batch and matching decision
4752
batch1 := testutil.CreateMockBatch(1, 1, 1, []int{1})
48-
batcherStub.SetNextBatch(batch1)
49-
53+
batchersStub[0].SetNextBatch(batch1)
5054
oba1 := obaCreator.Append(batch1, 1, 1, 1)
51-
consenterStub.SetNextDecision(oba1)
55+
consenterStub.SetNextDecision(oba1.(*state.AvailableBatchOrdered))
5256

5357
require.Eventually(t, func() bool {
5458
return assembler.GetTxCount() == 2
5559
}, 3*time.Second, 100*time.Millisecond)
5660

57-
// create batch with 2 reqs, send from batcher and consenter
61+
// stop consenter and send next batch
62+
consenterStub.Stop()
63+
5864
batch2 := testutil.CreateMockBatch(1, 1, 2, []int{2, 3})
59-
batcherStub.SetNextBatch(batch2)
65+
batchersStub[0].SetNextBatch(batch2)
66+
67+
// restart consenter and send matching decision
68+
consenterStub.Restart()
6069

6170
oba2 := obaCreator.Append(batch2, 2, 1, 1)
62-
consenterStub.SetNextDecision(oba2)
71+
consenterStub.SetNextDecision(oba2.(*state.AvailableBatchOrdered))
6372

6473
require.Eventually(t, func() bool {
6574
return assembler.GetTxCount() == 4
6675
}, 3*time.Second, 100*time.Millisecond)
6776

68-
// send duplicate batch+oba, should be ignored
69-
batcherStub.SetNextBatch(batch2)
70-
consenterStub.SetNextDecision(oba2)
77+
// send next decision and restart consenter
78+
batch3 := testutil.CreateMockBatch(1, 1, 3, []int{4})
79+
oba3 := obaCreator.Append(batch3, 3, 1, 1)
80+
consenterStub.SetNextDecision(oba3.(*state.AvailableBatchOrdered))
81+
82+
// wait for decistion will be sent
83+
time.Sleep(3 * time.Second)
84+
85+
consenterStub.Stop()
86+
consenterStub.Restart()
87+
88+
// send matching batch
89+
batchersStub[0].SetNextBatch(batch3)
7190

72-
require.Never(t, func() bool {
73-
return assembler.GetTxCount() > 4
74-
}, 3*time.Millisecond, 100*time.Millisecond)
91+
require.Eventually(t, func() bool {
92+
return assembler.GetTxCount() == 5
93+
}, 3*time.Second, 100*time.Millisecond)
7594
}
7695

77-
func newAssemblerTest(t *testing.T, partyID int, shardID int, ca tlsgen.CA, batcherInfo config.BatcherInfo, consenterInfo config.ConsenterInfo) (*assembler.Assembler, func()) {
96+
func newAssemblerTest(t *testing.T, partyID types.PartyID, shardID types.ShardID, ca tlsgen.CA, batchersInfo []config.BatcherInfo, consenterInfo config.ConsenterInfo) *assembler.Assembler {
7897
genesisBlock := utils.EmptyGenesisBlock("arma")
7998
genesisBlock.Metadata = &common.BlockMetadata{
8099
Metadata: [][]byte{nil, nil, []byte("dummy"), []byte("dummy")},
@@ -85,19 +104,17 @@ func newAssemblerTest(t *testing.T, partyID int, shardID int, ca tlsgen.CA, batc
85104

86105
shards := []config.ShardInfo{
87106
{
88-
ShardId: types.ShardID(shardID),
89-
Batchers: []config.BatcherInfo{
90-
batcherInfo,
91-
},
107+
ShardId: shardID,
108+
Batchers: batchersInfo,
92109
},
93110
}
94111

95112
nodeConfig := &config.AssemblerNodeConfig{
96113
TLSPrivateKeyFile: ckp.Key,
97114
TLSCertificateFile: ckp.Cert,
98-
PartyId: types.PartyID(partyID),
115+
PartyId: partyID,
99116
Directory: t.TempDir(),
100-
ListenAddress: "0.0.0.0:0",
117+
ListenAddress: "127.0.0.1:0",
101118
PrefetchBufferMemoryBytes: 1 * 1024 * 1024 * 1024,
102119
RestartLedgerScanTimeout: 5 * time.Second,
103120
PrefetchEvictionTtl: time.Hour,
@@ -111,15 +128,30 @@ func newAssemblerTest(t *testing.T, partyID int, shardID int, ca tlsgen.CA, batc
111128

112129
assemblerGRPC := node.CreateGRPCAssembler(nodeConfig)
113130

114-
assembler := assembler.NewAssembler(nodeConfig, assemblerGRPC, genesisBlock, testutil.CreateLogger(t, partyID))
131+
assembler := assembler.NewAssembler(nodeConfig, assemblerGRPC, genesisBlock, testutil.CreateLogger(t, int(partyID)))
115132

116133
orderer.RegisterAtomicBroadcastServer(assemblerGRPC.Server(), assembler)
117134
go func() {
118135
err := assemblerGRPC.Start()
119136
require.NoError(t, err)
120137
}()
121138

122-
return assembler, func() {
123-
assembler.Stop()
139+
return assembler
140+
}
141+
142+
func createStubBatchersAndInfos(t *testing.T, numParties int, shardID types.ShardID, ca tlsgen.CA) ([]*stubBatcher, []config.BatcherInfo, func()) {
143+
var batchers []*stubBatcher
144+
var batcherInfos []config.BatcherInfo
145+
146+
for i := 1; i <= numParties; i++ {
147+
b := NewStubBatcher(t, shardID, types.PartyID(i), ca)
148+
batchers = append(batchers, b)
149+
batcherInfos = append(batcherInfos, b.batcherInfo)
150+
}
151+
152+
return batchers, batcherInfos, func() {
153+
for _, b := range batchers {
154+
b.Shutdown()
155+
}
124156
}
125157
}

node/assembler/stub_batcher_test.go

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package assembler_test
88

99
import (
1010
"fmt"
11-
"sync"
1211
"testing"
1312

1413
"github.com/hyperledger/fabric-x-orderer/common/types"
@@ -31,8 +30,7 @@ type stubBatcher struct {
3130
cert []byte
3231
key []byte
3332
batcherInfo config.BatcherInfo
34-
storedBatch *common.Block
35-
blockLock sync.Mutex
33+
batches chan *common.Block
3634
}
3735

3836
func NewStubBatcher(t *testing.T, shardID types.ShardID, partyID types.PartyID, ca tlsgen.CA) *stubBatcher {
@@ -64,6 +62,7 @@ func NewStubBatcher(t *testing.T, shardID types.ShardID, partyID types.PartyID,
6462
cert: certKeyPair.Cert,
6563
key: certKeyPair.Key,
6664
batcherInfo: batcherInfo,
65+
batches: make(chan *common.Block, 100),
6766
}
6867

6968
orderer.RegisterAtomicBroadcastServer(server.Server(), stubBatcher)
@@ -79,21 +78,58 @@ func (sb *stubBatcher) Stop() {
7978
sb.server.Stop()
8079
}
8180

82-
func (sb *stubBatcher) Broadcast(stream orderer.AtomicBroadcast_BroadcastServer) error {
83-
return fmt.Errorf("not implemented")
81+
func (sb *stubBatcher) Shutdown() {
82+
close(sb.batches)
83+
sb.server.Stop()
8484
}
8585

86-
func (sb *stubBatcher) Deliver(stream orderer.AtomicBroadcast_DeliverServer) error {
87-
sb.blockLock.Lock()
88-
defer sb.blockLock.Unlock()
89-
return stream.Send(&orderer.DeliverResponse{
90-
Type: &orderer.DeliverResponse_Block{Block: sb.storedBatch},
86+
func (sb *stubBatcher) Restart() {
87+
server, err := comm.NewGRPCServer(sb.endpoint, comm.ServerConfig{
88+
SecOpts: comm.SecureOptions{
89+
UseTLS: true,
90+
Certificate: sb.cert,
91+
Key: sb.key,
92+
},
9193
})
94+
if err != nil {
95+
panic(fmt.Sprintf("failed to restart gRPC server: %v", err))
96+
}
97+
98+
sb.server = server
99+
100+
orderer.RegisterAtomicBroadcastServer(server.Server(), sb)
101+
102+
go func() {
103+
if err := sb.server.Start(); err != nil {
104+
panic(err)
105+
}
106+
}()
107+
}
108+
109+
func (sb *stubBatcher) Deliver(stream orderer.AtomicBroadcast_DeliverServer) error {
110+
for {
111+
select {
112+
case b := <-sb.batches:
113+
if b == nil {
114+
return nil
115+
}
116+
err := stream.Send(&orderer.DeliverResponse{
117+
Type: &orderer.DeliverResponse_Block{Block: b},
118+
})
119+
if err != nil {
120+
return err
121+
}
122+
case <-stream.Context().Done():
123+
return stream.Context().Err()
124+
}
125+
}
126+
}
127+
128+
func (sb *stubBatcher) Broadcast(stream orderer.AtomicBroadcast_BroadcastServer) error {
129+
return fmt.Errorf("not implemented")
92130
}
93131

94132
func (sb *stubBatcher) SetNextBatch(batch core.Batch) {
95133
block, _ := ledger.NewFabricBatchFromRequests(sb.partyID, sb.shardID, batch.Seq(), batch.Requests(), []byte(""))
96-
sb.blockLock.Lock()
97-
defer sb.blockLock.Unlock()
98-
sb.storedBatch = (*common.Block)(block)
134+
sb.batches <- (*common.Block)(block)
99135
}

node/assembler/stub_consenter_test.go

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@ package assembler_test
99
import (
1010
"encoding/asn1"
1111
"fmt"
12-
"sync"
1312
"testing"
1413

1514
"github.com/hyperledger/fabric-x-orderer/common/types"
16-
"github.com/hyperledger/fabric-x-orderer/core"
1715
"github.com/hyperledger/fabric-x-orderer/node/comm"
1816
"github.com/hyperledger/fabric-x-orderer/node/comm/tlsgen"
1917
"github.com/hyperledger/fabric-x-orderer/node/config"
@@ -32,8 +30,7 @@ type stubConsenter struct {
3230
cert []byte
3331
key []byte
3432
consenterInfo config.ConsenterInfo
35-
storedBlock *common.Block
36-
blockLock sync.Mutex
33+
decisions chan *common.Block
3734
}
3835

3936
func NewStubConsenter(t *testing.T, partyID types.PartyID, ca tlsgen.CA) *stubConsenter {
@@ -63,6 +60,7 @@ func NewStubConsenter(t *testing.T, partyID types.PartyID, ca tlsgen.CA) *stubCo
6360
key: certKeyPair.Key,
6461
endpoint: server.Address(),
6562
consenterInfo: consenterInfo,
63+
decisions: make(chan *common.Block, 100),
6664
}
6765

6866
orderer.RegisterAtomicBroadcastServer(server.Server(), stubConsenter)
@@ -75,25 +73,62 @@ func NewStubConsenter(t *testing.T, partyID types.PartyID, ca tlsgen.CA) *stubCo
7573
return stubConsenter
7674
}
7775

78-
func (sc *stubConsenter) Deliver(stream orderer.AtomicBroadcast_DeliverServer) error {
79-
sc.blockLock.Lock()
80-
defer sc.blockLock.Unlock()
81-
return stream.Send(&orderer.DeliverResponse{
82-
Type: &orderer.DeliverResponse_Block{Block: sc.storedBlock},
83-
})
76+
func (sc *stubConsenter) Stop() {
77+
sc.server.Stop()
8478
}
8579

86-
func (sc *stubConsenter) Broadcast(stream orderer.AtomicBroadcast_BroadcastServer) error {
87-
return fmt.Errorf("not implemented")
80+
func (sc *stubConsenter) Shutdown() {
81+
close(sc.decisions)
82+
sc.server.Stop()
8883
}
8984

90-
func (sc *stubConsenter) Stop() {
91-
sc.server.Stop()
85+
func (sc *stubConsenter) Restart() {
86+
server, err := comm.NewGRPCServer(sc.endpoint, comm.ServerConfig{
87+
SecOpts: comm.SecureOptions{
88+
UseTLS: true,
89+
Certificate: sc.cert,
90+
Key: sc.key,
91+
},
92+
})
93+
if err != nil {
94+
panic(fmt.Sprintf("failed to restart gRPC server: %v", err))
95+
}
96+
97+
sc.server = server
98+
99+
orderer.RegisterAtomicBroadcastServer(server.Server(), sc)
100+
101+
go func() {
102+
if err := sc.server.Start(); err != nil {
103+
panic(err)
104+
}
105+
}()
106+
}
107+
108+
func (sc *stubConsenter) Deliver(stream orderer.AtomicBroadcast_DeliverServer) error {
109+
for {
110+
select {
111+
case b := <-sc.decisions:
112+
if b == nil {
113+
return nil
114+
}
115+
err := stream.Send(&orderer.DeliverResponse{
116+
Type: &orderer.DeliverResponse_Block{Block: b},
117+
})
118+
if err != nil {
119+
return err
120+
}
121+
case <-stream.Context().Done():
122+
return stream.Context().Err()
123+
}
124+
}
92125
}
93126

94-
func (sc *stubConsenter) SetNextDecision(oba core.OrderedBatchAttestation) {
95-
ba := oba.(*state.AvailableBatchOrdered)
127+
func (sc *stubConsenter) Broadcast(stream orderer.AtomicBroadcast_BroadcastServer) error {
128+
return fmt.Errorf("not implemented")
129+
}
96130

131+
func (sc *stubConsenter) SetNextDecision(ba *state.AvailableBatchOrdered) {
97132
proposal := smartbft_types.Proposal{
98133
Header: (&state.Header{
99134
Num: ba.OrderingInformation.DecisionNum,
@@ -113,9 +148,11 @@ func (sc *stubConsenter) SetNextDecision(oba core.OrderedBatchAttestation) {
113148
signatures := []smartbft_types.Signature{{Value: sigBytes}}
114149
bytes := state.DecisionToBytes(proposal, signatures)
115150

116-
sc.blockLock.Lock()
117-
defer sc.blockLock.Unlock()
118-
sc.storedBlock = &common.Block{
151+
sc.decisions <- &common.Block{
152+
Header: &common.BlockHeader{
153+
Number: uint64(ba.OrderingInformation.DecisionNum),
154+
PreviousHash: ba.OrderingInformation.PrevHash,
155+
},
119156
Data: &common.BlockData{Data: [][]byte{bytes}},
120157
}
121158
}

0 commit comments

Comments
 (0)