Skip to content

Commit f8e5ac8

Browse files
committed
add config-puller delivery client to router, and pull config blocks for consensus
add new config blocks to the config-store add test Signed-off-by: Dor.Katzelnick <Dor.Katzelnick@ibm.com>
1 parent 24d2fe4 commit f8e5ac8

File tree

5 files changed

+291
-10
lines changed

5 files changed

+291
-10
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package delivery
8+
9+
import (
10+
"context"
11+
12+
"github.com/hyperledger/fabric-protos-go-apiv2/common"
13+
"github.com/hyperledger/fabric-protos-go-apiv2/orderer"
14+
"github.com/hyperledger/fabric-x-orderer/common/types"
15+
"github.com/hyperledger/fabric-x-orderer/node/comm"
16+
node_config "github.com/hyperledger/fabric-x-orderer/node/config"
17+
"github.com/hyperledger/fabric/protoutil"
18+
)
19+
20+
const (
21+
configBlocksChanSize = 100
22+
)
23+
24+
// ConsensusConfigPuller pulls decisions from consensus and consumes config blocks.
25+
type ConsensusConfigPuller struct {
26+
tlsKey, tlsCert []byte
27+
endpoint string
28+
cc comm.ClientConfig
29+
logger types.Logger
30+
seekInfo *orderer.SeekInfo
31+
cancelCtx context.Context
32+
ctxCancelFunc context.CancelFunc
33+
}
34+
35+
func NewConsensusConfigPuller(config *node_config.RouterNodeConfig, logger types.Logger, seekInfo *orderer.SeekInfo) *ConsensusConfigPuller {
36+
ctx, cancelFunc := context.WithCancel(context.Background())
37+
configPuller := &ConsensusConfigPuller{
38+
cc: clientConfig(config.Consenter.TLSCACerts, config.TLSPrivateKeyFile, config.TLSCertificateFile),
39+
endpoint: config.Consenter.Endpoint,
40+
logger: logger,
41+
seekInfo: seekInfo,
42+
tlsKey: config.TLSPrivateKeyFile,
43+
tlsCert: config.TLSCertificateFile,
44+
cancelCtx: ctx,
45+
ctxCancelFunc: cancelFunc,
46+
}
47+
return configPuller
48+
}
49+
50+
func (ccp *ConsensusConfigPuller) PullConfigBlocks() <-chan *common.Block {
51+
endpoint := func() string {
52+
return ccp.endpoint
53+
}
54+
55+
requestEnvelopeFactoryFunc := func() *common.Envelope {
56+
requestEnvelope, err := protoutil.CreateSignedEnvelopeWithTLSBinding(
57+
common.HeaderType_DELIVER_SEEK_INFO,
58+
"consensus",
59+
nil,
60+
ccp.seekInfo,
61+
int32(0),
62+
uint64(0),
63+
nil,
64+
)
65+
if err != nil {
66+
ccp.logger.Panicf("Failed creating signed envelope: %v", err)
67+
}
68+
69+
return requestEnvelope
70+
}
71+
72+
res := make(chan *common.Block, configBlocksChanSize)
73+
74+
blockHandlerFunc := func(block *common.Block) {
75+
// check if the decision contains a config block. then extract the config block and send it on the res channel
76+
header, _, err := extractHeaderAndSigsFromBlock(block)
77+
if err != nil {
78+
ccp.logger.Panicf("Failed extracting header from decision: %s", err)
79+
}
80+
81+
if header.Num == header.DecisionNumOfLastConfigBlock {
82+
res <- header.AvailableCommonBlocks[len(header.AvailableCommonBlocks)-1]
83+
}
84+
}
85+
86+
onClose := func() {
87+
close(res)
88+
}
89+
90+
go Pull(ccp.cancelCtx, "consensus-configBlock-pull", ccp.logger, endpoint, requestEnvelopeFactoryFunc, ccp.cc, blockHandlerFunc, onClose)
91+
92+
return res
93+
}
94+
95+
func (ccp *ConsensusConfigPuller) Stop() {
96+
ccp.ctxCancelFunc()
97+
}

node/router/router.go

Lines changed: 85 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"sync/atomic"
1919
"time"
2020

21+
"github.com/hyperledger/fabric-protos-go-apiv2/common"
2122
"github.com/hyperledger/fabric-protos-go-apiv2/orderer"
2223
"github.com/hyperledger/fabric-x-common/common/policies"
2324
"github.com/hyperledger/fabric-x-orderer/common/configstore"
@@ -26,6 +27,8 @@ import (
2627
"github.com/hyperledger/fabric-x-orderer/config"
2728
"github.com/hyperledger/fabric-x-orderer/node"
2829
nodeconfig "github.com/hyperledger/fabric-x-orderer/node/config"
30+
"github.com/hyperledger/fabric-x-orderer/node/delivery"
31+
"github.com/hyperledger/fabric-x-orderer/node/ledger"
2932
protos "github.com/hyperledger/fabric-x-orderer/node/protos/comm"
3033
)
3134

@@ -34,6 +37,11 @@ type Net interface {
3437
Address() string
3538
}
3639

40+
type ConfigPuller interface {
41+
PullConfigBlocks() <-chan *common.Block
42+
Stop()
43+
}
44+
3745
type Router struct {
3846
mapper ShardMapper
3947
net Net
@@ -46,6 +54,7 @@ type Router struct {
4654
stopChan chan struct{}
4755
configStore *configstore.Store
4856
configSubmitter ConfigurationSubmitter
57+
configPuller ConfigPuller
4958
}
5059

5160
func NewRouter(config *nodeconfig.RouterNodeConfig, logger types.Logger) *Router {
@@ -81,13 +90,43 @@ func NewRouter(config *nodeconfig.RouterNodeConfig, logger types.Logger) *Router
8190
configSubmitter := NewConfigSubmitter(config.Consenter.Endpoint, tlsCAsOfConsenter,
8291
config.TLSCertificateFile, config.TLSPrivateKeyFile, logger)
8392

93+
configStore, err := configstore.NewStore(config.ConfigStorePath)
94+
if err != nil {
95+
logger.Panicf("Failed creating router config store: %s", err)
96+
}
97+
98+
seekInfo := NextSeekInfoFromConfigStore(configStore, logger)
99+
100+
// TODO - pull cofig blocks from all consenter nodes, not only the one in party
101+
configPuller := delivery.NewConsensusConfigPuller(config, logger, seekInfo)
102+
84103
verifier := createVerifier(config)
85104

86-
r := createRouter(shardIDs, batcherEndpoints, tlsCAsOfBatchers, config, logger, verifier, configSubmitter)
105+
r := createRouter(shardIDs, batcherEndpoints, tlsCAsOfBatchers, config, logger, configStore, verifier, configSubmitter, configPuller)
87106
r.init()
88107
return r
89108
}
90109

110+
// NextSeekInfoFromConfigStore creates a SeekInfo to start pulling config blocks from consensus, based on the last connfig block stored in the config store.
111+
func NextSeekInfoFromConfigStore(configStore *configstore.Store, logger types.Logger) *orderer.SeekInfo {
112+
lastBlock, err := configStore.Last()
113+
if err != nil {
114+
logger.Panicf("Failed getting last config block from config store: %s", err)
115+
}
116+
117+
// check if last block is genesis block
118+
if lastBlock.GetHeader().GetNumber() == 0 {
119+
return delivery.NextSeekInfo(1)
120+
}
121+
122+
ordererBlockMetadata := lastBlock.Metadata.Metadata[common.BlockMetadataIndex_ORDERER]
123+
_, _, _, lastDecisionNumber, _, _, _, err := ledger.AssemblerBlockMetadataFromBytes(ordererBlockMetadata)
124+
if err != nil {
125+
logger.Panicf("Failed extracting decision number from last config block: %s", err)
126+
}
127+
return delivery.NextSeekInfo(uint64(lastDecisionNumber) + 1)
128+
}
129+
91130
func (r *Router) StartRouterService() <-chan struct{} {
92131
srv := node.CreateGRPCRouter(r.routerNodeConfig)
93132

@@ -108,6 +147,8 @@ func (r *Router) StartRouterService() <-chan struct{} {
108147

109148
r.configSubmitter.Start()
110149

150+
go r.pullAndProcessConfigBlocks()
151+
111152
return stop
112153
}
113154

@@ -184,7 +225,7 @@ func (r *Router) Deliver(server orderer.AtomicBroadcast_DeliverServer) error {
184225
return fmt.Errorf("not implemented")
185226
}
186227

187-
func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]string, batcherRootCAs map[types.ShardID][][]byte, rconfig *nodeconfig.RouterNodeConfig, logger types.Logger, verifier *requestfilter.RulesVerifier, configSubmitter ConfigurationSubmitter) *Router {
228+
func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]string, batcherRootCAs map[types.ShardID][][]byte, rconfig *nodeconfig.RouterNodeConfig, logger types.Logger, configStore *configstore.Store, verifier *requestfilter.RulesVerifier, configSubmitter ConfigurationSubmitter, configPuller ConfigPuller) *Router {
188229
if rconfig.NumOfConnectionsForBatcher == 0 {
189230
rconfig.NumOfConnectionsForBatcher = config.DefaultRouterParams.NumberOfConnectionsPerBatcher
190231
}
@@ -193,11 +234,6 @@ func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]s
193234
rconfig.NumOfgRPCStreamsPerConnection = config.DefaultRouterParams.NumberOfStreamsPerConnection
194235
}
195236

196-
configStore, err := configstore.NewStore(rconfig.ConfigStorePath)
197-
if err != nil {
198-
logger.Panicf("Failed creating router config store: %s", err)
199-
}
200-
201237
r := &Router{
202238
mapper: MapperCRC64{
203239
Logger: logger,
@@ -211,6 +247,7 @@ func createRouter(shardIDs []types.ShardID, batcherEndpoints map[types.ShardID]s
211247
stopChan: make(chan struct{}),
212248
configStore: configStore,
213249
configSubmitter: configSubmitter,
250+
configPuller: configPuller,
214251
}
215252

216253
for _, shardId := range shardIDs {
@@ -363,6 +400,37 @@ func createVerifier(config *nodeconfig.RouterNodeConfig) *requestfilter.RulesVer
363400
return rv
364401
}
365402

403+
// pullAndProcessConfigBlocks pulls config blocks from consensus and processes them. this function should be run as a goroutine.
404+
func (r *Router) pullAndProcessConfigBlocks() {
405+
configBlocksChan := r.configPuller.PullConfigBlocks()
406+
defer func() {
407+
r.configPuller.Stop()
408+
r.logger.Infof("Stopped config puller")
409+
}()
410+
411+
for {
412+
select {
413+
case configBlock, ok := <-configBlocksChan:
414+
if !ok {
415+
r.logger.Infof("Config blocks channel closed, stopping config blocks processing")
416+
return
417+
}
418+
r.logger.Infof("Received new config block from consensus with block number %d", configBlock.GetHeader().GetNumber())
419+
420+
// TODO process the config block. store in config store and apply.
421+
if err := r.configStore.Add(configBlock); err != nil {
422+
r.logger.Errorf("Failed adding config block to config store: %s", err)
423+
continue
424+
}
425+
r.logger.Infof("Added config block %d to config store", configBlock.GetHeader().GetNumber())
426+
427+
case <-r.stopChan:
428+
r.logger.Infof("Stopping config blocks processing")
429+
return
430+
}
431+
}
432+
}
433+
366434
// IsAllStreamsOK checks that all the streams accross all shard-routers are non-faulty.
367435
// Use for testing only.
368436
func (r *Router) IsAllStreamsOK() bool {
@@ -384,3 +452,13 @@ func (r *Router) IsAllConnectionsDown() bool {
384452
}
385453
return true
386454
}
455+
456+
// GetConfigStoreSize returns the number of config blocks stored in the config store.
457+
// Use for testing only.
458+
func (r *Router) GetConfigStoreSize() int {
459+
list, err := r.configStore.ListBlockNumbers()
460+
if err != nil {
461+
r.logger.Panicf("Failed listing config store block numbers: %s", err)
462+
}
463+
return len(list)
464+
}

node/router/router_test.go

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ import (
1717

1818
"github.com/hyperledger/fabric-protos-go-apiv2/common"
1919
ab "github.com/hyperledger/fabric-protos-go-apiv2/orderer"
20+
"github.com/hyperledger/fabric-x-orderer/common/configstore"
2021
policyMocks "github.com/hyperledger/fabric-x-orderer/common/policy/mocks"
2122
"github.com/hyperledger/fabric-x-orderer/common/types"
2223
"github.com/hyperledger/fabric-x-orderer/node/comm"
2324
"github.com/hyperledger/fabric-x-orderer/node/comm/tlsgen"
2425
"github.com/hyperledger/fabric-x-orderer/node/config"
26+
"github.com/hyperledger/fabric-x-orderer/node/consensus/state"
2527
protos "github.com/hyperledger/fabric-x-orderer/node/protos/comm"
2628
"github.com/hyperledger/fabric-x-orderer/node/router"
2729
configMocks "github.com/hyperledger/fabric-x-orderer/test/mocks"
@@ -59,7 +61,9 @@ func (r *routerTestSetup) Close() {
5961
batcher.server.Stop()
6062
}
6163

62-
r.consenter.Stop()
64+
if r.consenter != nil {
65+
r.consenter.Stop()
66+
}
6367
}
6468

6569
func (r *routerTestSetup) isReconnectComplete() bool {
@@ -86,6 +90,7 @@ func createRouterTestSetup(t *testing.T, partyID types.PartyID, numOfShards int,
8690
for _, batcher := range batchers {
8791
batcher.Start()
8892
}
93+
8994
// create and start stub-consenter
9095
stubConsenter := router.NewStubConsenter(t, ca, partyID)
9196
stubConsenter.Start()
@@ -510,6 +515,39 @@ func submitConfigRequest(t *testing.T, conn *grpc.ClientConn) error {
510515
return nil
511516
}
512517

518+
func TestConfigPullFromConsensus(t *testing.T) {
519+
testSetup := createRouterTestSetup(t, types.PartyID(1), 1, true, false)
520+
err := createServerTLSClientConnection(testSetup, testSetup.ca)
521+
require.NoError(t, err)
522+
require.NotNil(t, testSetup.clientConn)
523+
sc := testSetup.consenter
524+
defer testSetup.Close()
525+
526+
initialConfigStoreSize := testSetup.router.GetConfigStoreSize()
527+
528+
// create a decision with a config block
529+
configBlock := common.Block{Header: &common.BlockHeader{Number: 999}, Data: &common.BlockData{}}
530+
acb := make([]*common.Block, 2)
531+
acb[len(acb)-1] = &configBlock
532+
err = sc.DeliverDecisionFromHeader(&state.Header{Num: 1, DecisionNumOfLastConfigBlock: 1, AvailableCommonBlocks: acb})
533+
require.NoError(t, err)
534+
535+
// check if config block is stored in router's config store
536+
require.Eventually(t, func() bool {
537+
return testSetup.router.GetConfigStoreSize() == initialConfigStoreSize+1
538+
}, 10*time.Second, 10*time.Millisecond)
539+
540+
// create a decision, with no config block
541+
configBlock = common.Block{Header: &common.BlockHeader{Number: 1000}, Data: &common.BlockData{}}
542+
acb = make([]*common.Block, 6)
543+
acb[len(acb)-1] = &configBlock
544+
err = sc.DeliverDecisionFromHeader(&state.Header{Num: 2, DecisionNumOfLastConfigBlock: 1, AvailableCommonBlocks: acb})
545+
require.NoError(t, err)
546+
547+
time.Sleep(100 * time.Millisecond)
548+
require.Equal(t, initialConfigStoreSize+1, testSetup.router.GetConfigStoreSize(), "no new config block should be stored")
549+
}
550+
513551
func createServerTLSClientConnection(testSetup *routerTestSetup, ca tlsgen.CA) error {
514552
cc := comm.ClientConfig{
515553
SecOpts: comm.SecureOptions{
@@ -686,13 +724,17 @@ func createAndStartRouter(t *testing.T, partyID types.PartyID, ca tlsgen.CA, bat
686724

687725
stubConsenterInfo := config.ConsenterInfo{PartyID: partyID, Endpoint: consenter.GetConsenterEndpoint(), TLSCACerts: []config.RawBytes{ca.CertBytes()}}
688726

727+
configStorePath := t.TempDir()
728+
cs, _ := configstore.NewStore(configStorePath)
729+
cs.Add(&common.Block{Header: &common.BlockHeader{Number: 0}, Data: &common.BlockData{}}) // add dummy genesis block
730+
689731
conf := &config.RouterNodeConfig{
690732
PartyID: partyID,
691733
TLSCertificateFile: ckp.Cert,
692734
UseTLS: useTLS,
693735
TLSPrivateKeyFile: ckp.Key,
694736
ListenAddress: "127.0.0.1:0",
695-
ConfigStorePath: t.TempDir(),
737+
ConfigStorePath: configStorePath,
696738
ClientAuthRequired: clientAuthRequired,
697739
Shards: shards,
698740
Consenter: stubConsenterInfo,

0 commit comments

Comments
 (0)