From 39fafe97605a467e5541e885fd2bffdab2223ae3 Mon Sep 17 00:00:00 2001 From: Divyank Katira Date: Wed, 21 Mar 2018 16:45:30 -0400 Subject: [PATCH] [FAB-8944] Refresh membership only on config update Change-Id: Ia919caf72c247ed0df25a658c4575befb119bf09 Signed-off-by: Divyank Katira --- pkg/common/providers/fab/channel.go | 1 + pkg/fab/channel/ledger.go | 33 ++----------------------- pkg/fab/channel/membership/reference.go | 20 ++++++++++----- pkg/fab/chconfig/chconfig.go | 27 +++++++++++++++----- pkg/fab/mocks/mockchconfig.go | 6 +++++ pkg/fab/resource/resource.go | 4 +-- 6 files changed, 46 insertions(+), 45 deletions(-) diff --git a/pkg/common/providers/fab/channel.go b/pkg/common/providers/fab/channel.go index 0c8aa56365..64ede51812 100644 --- a/pkg/common/providers/fab/channel.go +++ b/pkg/common/providers/fab/channel.go @@ -30,6 +30,7 @@ type ChannelConfig interface { // ChannelCfg contains channel configuration type ChannelCfg interface { ID() string + BlockNumber() uint64 MSPs() []*mspCfg.MSPConfig AnchorPeers() []*OrgAnchorPeer Orderers() []string diff --git a/pkg/fab/channel/ledger.go b/pkg/fab/channel/ledger.go index 0deae4e357..f8e174cf62 100644 --- a/pkg/fab/channel/ledger.go +++ b/pkg/fab/channel/ledger.go @@ -209,8 +209,7 @@ func createChaincodeQueryResponse(tpr *fab.TransactionProposalResponse) (*pb.Cha // QueryConfigBlock returns the current configuration block for the specified channel. If the // peer doesn't belong to the channel, return error -func (c *Ledger) QueryConfigBlock(reqCtx reqContext.Context, targets []fab.ProposalProcessor, verifier ResponseVerifier) (*common.ConfigEnvelope, error) { - +func (c *Ledger) QueryConfigBlock(reqCtx reqContext.Context, targets []fab.ProposalProcessor, verifier ResponseVerifier) (*common.Block, error) { if len(targets) == 0 { return nil, errors.New("target(s) required") } @@ -226,10 +225,7 @@ func (c *Ledger) QueryConfigBlock(reqCtx reqContext.Context, targets []fab.Propo return nil, matchErr } - block, _ := createCommonBlock(tprs[0]) - - return createConfigEnvelope(block.Data.Data[0]) - + return createCommonBlock(tprs[0]) } func collectProposalResponses(tprs []*fab.TransactionProposalResponse) [][]byte { @@ -286,28 +282,3 @@ func createChaincodeInvokeRequest() fab.ChaincodeInvokeRequest { } return cir } - -func createConfigEnvelope(data []byte) (*common.ConfigEnvelope, error) { - - envelope := &common.Envelope{} - if err := proto.Unmarshal(data, envelope); err != nil { - return nil, errors.Wrap(err, "unmarshal envelope from config block failed") - } - payload := &common.Payload{} - if err := proto.Unmarshal(envelope.Payload, payload); err != nil { - return nil, errors.Wrap(err, "unmarshal payload from envelope failed") - } - channelHeader := &common.ChannelHeader{} - if err := proto.Unmarshal(payload.Header.ChannelHeader, channelHeader); err != nil { - return nil, errors.Wrap(err, "unmarshal payload from envelope failed") - } - if common.HeaderType(channelHeader.Type) != common.HeaderType_CONFIG { - return nil, errors.New("block must be of type 'CONFIG'") - } - configEnvelope := &common.ConfigEnvelope{} - if err := proto.Unmarshal(payload.Data, configEnvelope); err != nil { - return nil, errors.Wrap(err, "unmarshal config envelope failed") - } - - return configEnvelope, nil -} diff --git a/pkg/fab/channel/membership/reference.go b/pkg/fab/channel/membership/reference.go index 8a7600c5b4..72a00e6900 100644 --- a/pkg/fab/channel/membership/reference.go +++ b/pkg/fab/channel/membership/reference.go @@ -19,6 +19,9 @@ type Ref struct { *lazyref.Reference chConfigRef *lazyref.Reference context Context + // Note: the following variables are only accessed from Ref.initializer which is synchronized + configBlockNumber uint64 + mem fab.ChannelMembership } // NewRef returns a new membership reference @@ -64,7 +67,7 @@ func (ref *Ref) get() (fab.ChannelMembership, error) { func (ref *Ref) initializer() lazyref.Initializer { return func() (interface{}, error) { - logger.Debugf("Creating membership...") + logger.Debugf("Initializing membership reference...") channelCfg, err := ref.chConfigRef.Get() if err != nil { @@ -75,12 +78,17 @@ func (ref *Ref) initializer() lazyref.Initializer { return nil, errors.New("chConfigRef.Get() returned unexpected value ") } - //TODO: create new membership only if config block number has changed - membership, err := New(ref.context, cfg) - if err != nil { - return nil, err + logger.Debugf("Got config block with number %d have %d", cfg.BlockNumber(), ref.configBlockNumber) + + // Membership is refreshed only if we have a newer config block + if ref.mem == nil || cfg.BlockNumber() > ref.configBlockNumber { + logger.Debugf("Creating membership...") + ref.mem, err = New(ref.context, cfg) + if err != nil { + return nil, err + } } - return membership, nil + return ref.mem, nil } } diff --git a/pkg/fab/chconfig/chconfig.go b/pkg/fab/chconfig/chconfig.go index f1ef750c89..0744e15ad2 100644 --- a/pkg/fab/chconfig/chconfig.go +++ b/pkg/fab/chconfig/chconfig.go @@ -63,6 +63,7 @@ type ChannelConfig struct { // ChannelCfg contains channel configuration type ChannelCfg struct { id string + blockNumber uint64 msps []*mb.MSPConfig anchorPeers []*fab.OrgAnchorPeer orderers []string @@ -80,6 +81,11 @@ func (cfg *ChannelCfg) ID() string { return cfg.id } +// BlockNumber returns the channel config block number +func (cfg *ChannelCfg) BlockNumber() uint64 { + return cfg.blockNumber +} + // MSPs returns msps func (cfg *ChannelCfg) MSPs() []*mb.MSPConfig { return cfg.msps @@ -157,22 +163,22 @@ func (c *ChannelConfig) queryPeers(reqCtx reqContext.Context) (*ChannelCfg, erro targets = peersToTxnProcessors(c.opts.Targets) } - configEnvelope, err := l.QueryConfigBlock(reqCtx, targets, &channel.TransactionProposalResponseVerifier{MinResponses: minResponses}) + block, err := l.QueryConfigBlock(reqCtx, targets, &channel.TransactionProposalResponseVerifier{MinResponses: minResponses}) if err != nil { return nil, errors.WithMessage(err, "QueryBlockConfig failed") } - return extractConfig(c.channelID, configEnvelope) + return extractConfig(c.channelID, block) } func (c *ChannelConfig) queryOrderer(reqCtx reqContext.Context) (*ChannelCfg, error) { - configEnvelope, err := resource.LastConfigFromOrderer(reqCtx, c.channelID, c.opts.Orderer) + block, err := resource.LastConfigFromOrderer(reqCtx, c.channelID, c.opts.Orderer) if err != nil { return nil, errors.WithMessage(err, "LastConfigFromOrderer failed") } - return extractConfig(c.channelID, configEnvelope) + return extractConfig(c.channelID, block) } func (c *ChannelConfig) getLimitOpts(ctx context.Client) (int, int) { @@ -251,7 +257,15 @@ func prepareOpts(options ...Option) (Opts, error) { return opts, nil } -func extractConfig(channelID string, configEnvelope *common.ConfigEnvelope) (*ChannelCfg, error) { +func extractConfig(channelID string, block *common.Block) (*ChannelCfg, error) { + if block.Header == nil { + return nil, errors.New("expected header in block") + } + + configEnvelope, err := resource.CreateConfigEnvelope(block.Data.Data[0]) + if err != nil { + return nil, err + } group := configEnvelope.Config.ChannelGroup @@ -261,13 +275,14 @@ func extractConfig(channelID string, configEnvelope *common.ConfigEnvelope) (*Ch config := &ChannelCfg{ id: channelID, + blockNumber: block.Header.Number, msps: []*mb.MSPConfig{}, anchorPeers: []*fab.OrgAnchorPeer{}, orderers: []string{}, versions: versions, } - err := loadConfig(config, config.versions.Channel, group, "base", "", true) + err = loadConfig(config, config.versions.Channel, group, "base", "", true) if err != nil { return nil, errors.WithMessage(err, "load config items from config group failed") } diff --git a/pkg/fab/mocks/mockchconfig.go b/pkg/fab/mocks/mockchconfig.go index 09ab09db68..0bf8a5f3ec 100644 --- a/pkg/fab/mocks/mockchconfig.go +++ b/pkg/fab/mocks/mockchconfig.go @@ -17,6 +17,7 @@ import ( // MockChannelCfg contains mock channel configuration type MockChannelCfg struct { MockID string + MockBlockNumber uint64 MockMSPs []*msp.MSPConfig MockAnchorPeers []*fab.OrgAnchorPeer MockOrderers []string @@ -34,6 +35,11 @@ func (cfg *MockChannelCfg) ID() string { return cfg.MockID } +// BlockNumber returns block number +func (cfg *MockChannelCfg) BlockNumber() uint64 { + return cfg.MockBlockNumber +} + // MSPs returns msps func (cfg *MockChannelCfg) MSPs() []*msp.MSPConfig { return cfg.MockMSPs diff --git a/pkg/fab/resource/resource.go b/pkg/fab/resource/resource.go index f3e9426b9f..d34cd24df7 100644 --- a/pkg/fab/resource/resource.go +++ b/pkg/fab/resource/resource.go @@ -116,7 +116,7 @@ func GenesisBlockFromOrderer(reqCtx reqContext.Context, channelName string, orde // LastConfigFromOrderer fetches the current configuration block for the specified channel // from the given orderer -func LastConfigFromOrderer(reqCtx reqContext.Context, channelName string, orderer fab.Orderer) (*common.ConfigEnvelope, error) { +func LastConfigFromOrderer(reqCtx reqContext.Context, channelName string, orderer fab.Orderer) (*common.Block, error) { logger.Debugf("channelConfig - start for channel %s", channelName) // Get the newest block @@ -144,7 +144,7 @@ func LastConfigFromOrderer(reqCtx reqContext.Context, channelName string, ordere return nil, errors.New("apiconfig block must contain one transaction") } - return CreateConfigEnvelope(block.Data.Data[0]) + return block, nil } // JoinChannel sends a join channel proposal to the target peer.