Skip to content

Commit

Permalink
[FAB-8944] Refresh membership only on config update
Browse files Browse the repository at this point in the history
Change-Id: Ia919caf72c247ed0df25a658c4575befb119bf09
Signed-off-by: Divyank Katira <Divyank.Katira@securekey.com>
  • Loading branch information
d1vyank committed Mar 21, 2018
1 parent 4d29ac5 commit 39fafe9
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 45 deletions.
1 change: 1 addition & 0 deletions pkg/common/providers/fab/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ChannelConfig interface {
// ChannelCfg contains channel configuration
type ChannelCfg interface {
ID() string
BlockNumber() uint64
MSPs() []*mspCfg.MSPConfig
AnchorPeers() []*OrgAnchorPeer
Orderers() []string
Expand Down
33 changes: 2 additions & 31 deletions pkg/fab/channel/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ func createChaincodeQueryResponse(tpr *fab.TransactionProposalResponse) (*pb.Cha

// QueryConfigBlock returns the current configuration block for the specified channel. If the
// peer doesn't belong to the channel, return error
func (c *Ledger) QueryConfigBlock(reqCtx reqContext.Context, targets []fab.ProposalProcessor, verifier ResponseVerifier) (*common.ConfigEnvelope, error) {

func (c *Ledger) QueryConfigBlock(reqCtx reqContext.Context, targets []fab.ProposalProcessor, verifier ResponseVerifier) (*common.Block, error) {
if len(targets) == 0 {
return nil, errors.New("target(s) required")
}
Expand All @@ -226,10 +225,7 @@ func (c *Ledger) QueryConfigBlock(reqCtx reqContext.Context, targets []fab.Propo
return nil, matchErr
}

block, _ := createCommonBlock(tprs[0])

return createConfigEnvelope(block.Data.Data[0])

return createCommonBlock(tprs[0])
}

func collectProposalResponses(tprs []*fab.TransactionProposalResponse) [][]byte {
Expand Down Expand Up @@ -286,28 +282,3 @@ func createChaincodeInvokeRequest() fab.ChaincodeInvokeRequest {
}
return cir
}

func createConfigEnvelope(data []byte) (*common.ConfigEnvelope, error) {

envelope := &common.Envelope{}
if err := proto.Unmarshal(data, envelope); err != nil {
return nil, errors.Wrap(err, "unmarshal envelope from config block failed")
}
payload := &common.Payload{}
if err := proto.Unmarshal(envelope.Payload, payload); err != nil {
return nil, errors.Wrap(err, "unmarshal payload from envelope failed")
}
channelHeader := &common.ChannelHeader{}
if err := proto.Unmarshal(payload.Header.ChannelHeader, channelHeader); err != nil {
return nil, errors.Wrap(err, "unmarshal payload from envelope failed")
}
if common.HeaderType(channelHeader.Type) != common.HeaderType_CONFIG {
return nil, errors.New("block must be of type 'CONFIG'")
}
configEnvelope := &common.ConfigEnvelope{}
if err := proto.Unmarshal(payload.Data, configEnvelope); err != nil {
return nil, errors.Wrap(err, "unmarshal config envelope failed")
}

return configEnvelope, nil
}
20 changes: 14 additions & 6 deletions pkg/fab/channel/membership/reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Ref struct {
*lazyref.Reference
chConfigRef *lazyref.Reference
context Context
// Note: the following variables are only accessed from Ref.initializer which is synchronized
configBlockNumber uint64
mem fab.ChannelMembership
}

// NewRef returns a new membership reference
Expand Down Expand Up @@ -64,7 +67,7 @@ func (ref *Ref) get() (fab.ChannelMembership, error) {

func (ref *Ref) initializer() lazyref.Initializer {
return func() (interface{}, error) {
logger.Debugf("Creating membership...")
logger.Debugf("Initializing membership reference...")

channelCfg, err := ref.chConfigRef.Get()
if err != nil {
Expand All @@ -75,12 +78,17 @@ func (ref *Ref) initializer() lazyref.Initializer {
return nil, errors.New("chConfigRef.Get() returned unexpected value ")
}

//TODO: create new membership only if config block number has changed
membership, err := New(ref.context, cfg)
if err != nil {
return nil, err
logger.Debugf("Got config block with number %d have %d", cfg.BlockNumber(), ref.configBlockNumber)

// Membership is refreshed only if we have a newer config block
if ref.mem == nil || cfg.BlockNumber() > ref.configBlockNumber {
logger.Debugf("Creating membership...")
ref.mem, err = New(ref.context, cfg)
if err != nil {
return nil, err
}
}

return membership, nil
return ref.mem, nil
}
}
27 changes: 21 additions & 6 deletions pkg/fab/chconfig/chconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type ChannelConfig struct {
// ChannelCfg contains channel configuration
type ChannelCfg struct {
id string
blockNumber uint64
msps []*mb.MSPConfig
anchorPeers []*fab.OrgAnchorPeer
orderers []string
Expand All @@ -80,6 +81,11 @@ func (cfg *ChannelCfg) ID() string {
return cfg.id
}

// BlockNumber returns the channel config block number
func (cfg *ChannelCfg) BlockNumber() uint64 {
return cfg.blockNumber
}

// MSPs returns msps
func (cfg *ChannelCfg) MSPs() []*mb.MSPConfig {
return cfg.msps
Expand Down Expand Up @@ -157,22 +163,22 @@ func (c *ChannelConfig) queryPeers(reqCtx reqContext.Context) (*ChannelCfg, erro
targets = peersToTxnProcessors(c.opts.Targets)
}

configEnvelope, err := l.QueryConfigBlock(reqCtx, targets, &channel.TransactionProposalResponseVerifier{MinResponses: minResponses})
block, err := l.QueryConfigBlock(reqCtx, targets, &channel.TransactionProposalResponseVerifier{MinResponses: minResponses})
if err != nil {
return nil, errors.WithMessage(err, "QueryBlockConfig failed")
}

return extractConfig(c.channelID, configEnvelope)
return extractConfig(c.channelID, block)
}

func (c *ChannelConfig) queryOrderer(reqCtx reqContext.Context) (*ChannelCfg, error) {

configEnvelope, err := resource.LastConfigFromOrderer(reqCtx, c.channelID, c.opts.Orderer)
block, err := resource.LastConfigFromOrderer(reqCtx, c.channelID, c.opts.Orderer)
if err != nil {
return nil, errors.WithMessage(err, "LastConfigFromOrderer failed")
}

return extractConfig(c.channelID, configEnvelope)
return extractConfig(c.channelID, block)
}

func (c *ChannelConfig) getLimitOpts(ctx context.Client) (int, int) {
Expand Down Expand Up @@ -251,7 +257,15 @@ func prepareOpts(options ...Option) (Opts, error) {
return opts, nil
}

func extractConfig(channelID string, configEnvelope *common.ConfigEnvelope) (*ChannelCfg, error) {
func extractConfig(channelID string, block *common.Block) (*ChannelCfg, error) {
if block.Header == nil {
return nil, errors.New("expected header in block")
}

configEnvelope, err := resource.CreateConfigEnvelope(block.Data.Data[0])
if err != nil {
return nil, err
}

group := configEnvelope.Config.ChannelGroup

Expand All @@ -261,13 +275,14 @@ func extractConfig(channelID string, configEnvelope *common.ConfigEnvelope) (*Ch

config := &ChannelCfg{
id: channelID,
blockNumber: block.Header.Number,
msps: []*mb.MSPConfig{},
anchorPeers: []*fab.OrgAnchorPeer{},
orderers: []string{},
versions: versions,
}

err := loadConfig(config, config.versions.Channel, group, "base", "", true)
err = loadConfig(config, config.versions.Channel, group, "base", "", true)
if err != nil {
return nil, errors.WithMessage(err, "load config items from config group failed")
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/fab/mocks/mockchconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
// MockChannelCfg contains mock channel configuration
type MockChannelCfg struct {
MockID string
MockBlockNumber uint64
MockMSPs []*msp.MSPConfig
MockAnchorPeers []*fab.OrgAnchorPeer
MockOrderers []string
Expand All @@ -34,6 +35,11 @@ func (cfg *MockChannelCfg) ID() string {
return cfg.MockID
}

// BlockNumber returns block number
func (cfg *MockChannelCfg) BlockNumber() uint64 {
return cfg.MockBlockNumber
}

// MSPs returns msps
func (cfg *MockChannelCfg) MSPs() []*msp.MSPConfig {
return cfg.MockMSPs
Expand Down
4 changes: 2 additions & 2 deletions pkg/fab/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func GenesisBlockFromOrderer(reqCtx reqContext.Context, channelName string, orde

// LastConfigFromOrderer fetches the current configuration block for the specified channel
// from the given orderer
func LastConfigFromOrderer(reqCtx reqContext.Context, channelName string, orderer fab.Orderer) (*common.ConfigEnvelope, error) {
func LastConfigFromOrderer(reqCtx reqContext.Context, channelName string, orderer fab.Orderer) (*common.Block, error) {
logger.Debugf("channelConfig - start for channel %s", channelName)

// Get the newest block
Expand Down Expand Up @@ -144,7 +144,7 @@ func LastConfigFromOrderer(reqCtx reqContext.Context, channelName string, ordere
return nil, errors.New("apiconfig block must contain one transaction")
}

return CreateConfigEnvelope(block.Data.Data[0])
return block, nil
}

// JoinChannel sends a join channel proposal to the target peer.
Expand Down

0 comments on commit 39fafe9

Please sign in to comment.