Skip to content

Commit

Permalink
[FAB-2552] Allow concurrent config proposals
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-2552

The original config proposal framework was designed under the idea that
only one committable proposal was ever given at a time.  However,
because of the need to support partial updates, there are additional
situations where a commit-like path needs to be performed for the config
concurrently with other commit-like paths.

This CR adds an internal tx identifier to configuration proposals to
allow for the safe concurrent processing of configuration.

Change-Id: I9f8cb5eb8a6c426fb930591792781e1159f8e601
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Mar 7, 2017
1 parent bcb9259 commit a552e22
Show file tree
Hide file tree
Showing 19 changed files with 265 additions and 172 deletions.
6 changes: 3 additions & 3 deletions common/cauthdsl/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ func makePolicySource(policyResult bool) *cb.Policy {
}

func addPolicy(manager policies.Proposer, id string, policy *cb.Policy) {
manager.BeginPolicyProposals(nil)
err := manager.ProposePolicy(id, &cb.ConfigPolicy{
manager.BeginPolicyProposals(id, nil)
err := manager.ProposePolicy(id, id, &cb.ConfigPolicy{
Policy: policy,
})
if err != nil {
panic(err)
}
manager.CommitProposals()
manager.CommitProposals(id)
}

func providerMap() map[int32]policies.Provider {
Expand Down
10 changes: 5 additions & 5 deletions common/config/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,19 @@ type Orderer interface {

type ValueProposer interface {
// BeginValueProposals called when a config proposal is begun
BeginValueProposals(groups []string) ([]ValueProposer, error)
BeginValueProposals(tx interface{}, groups []string) ([]ValueProposer, error)

// ProposeValue called when config is added to a proposal
ProposeValue(key string, configValue *cb.ConfigValue) error
ProposeValue(tx interface{}, key string, configValue *cb.ConfigValue) error

// RollbackProposals called when a config proposal is abandoned
RollbackProposals()
RollbackProposals(tx interface{})

// PreCommit is invoked before committing the config to catch
// any errors which cannot be caught on a per proposal basis
// TODO, rename other methods to remove Value/Proposal references
PreCommit() error
PreCommit(tx interface{}) error

// CommitProposals called when a config proposal is committed
CommitProposals()
CommitProposals(tx interface{})
}
2 changes: 1 addition & 1 deletion common/config/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewApplicationConfig(ag *ApplicationGroup) *ApplicationConfig {
}
}

func (ac *ApplicationConfig) Validate(groups map[string]ValueProposer) error {
func (ac *ApplicationConfig) Validate(tx interface{}, groups map[string]ValueProposer) error {
ac.applicationOrgs = make(map[string]ApplicationOrg)
var ok bool
for key, value := range groups {
Expand Down
4 changes: 2 additions & 2 deletions common/config/applicationorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ func NewApplicationOrgConfig(aog *ApplicationOrgGroup) *ApplicationOrgConfig {
return aoc
}

func (aoc *ApplicationOrgConfig) Validate(groups map[string]ValueProposer) error {
func (aoc *ApplicationOrgConfig) Validate(tx interface{}, groups map[string]ValueProposer) error {
if logger.IsEnabledFor(logging.DEBUG) {
logger.Debugf("Anchor peers for org %s are %v", aoc.applicationOrgGroup.name, aoc.protos.AnchorPeers)
}
return aoc.OrganizationConfig.Validate(groups)
return aoc.OrganizationConfig.Validate(tx, groups)
}
2 changes: 1 addition & 1 deletion common/config/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (cc *ChannelConfig) OrdererAddresses() []string {

// Validate inspects the generated configuration protos, ensures that the values are correct, and
// sets the ChannelConfig fields that may be referenced after Commit
func (cc *ChannelConfig) Validate(groups map[string]ValueProposer) error {
func (cc *ChannelConfig) Validate(tx interface{}, groups map[string]ValueProposer) error {
for _, validator := range []func() error{
cc.validateHashingAlgorithm,
cc.validateBlockDataHashingStructure,
Expand Down
72 changes: 51 additions & 21 deletions common/config/msp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package msp
import (
"fmt"
"reflect"
"sync"

"github.com/hyperledger/fabric/msp"
mspprotos "github.com/hyperledger/fabric/protos/msp"
Expand All @@ -36,37 +37,59 @@ type mspConfigStore struct {

// MSPConfigHandler
type MSPConfigHandler struct {
pendingConfig *mspConfigStore
pendingConfig map[interface{}]*mspConfigStore
pendingLock sync.RWMutex
msp.MSPManager
}

func NewMSPConfigHandler() *MSPConfigHandler {
return &MSPConfigHandler{
pendingConfig: make(map[interface{}]*mspConfigStore),
}
}

// BeginConfig called when a config proposal is begun
func (bh *MSPConfigHandler) BeginConfig() {
if bh.pendingConfig != nil {
panic("Programming error, called BeginValueProposals while a proposal was in process")
func (bh *MSPConfigHandler) BeginConfig(tx interface{}) {
bh.pendingLock.Lock()
defer bh.pendingLock.Unlock()
_, ok := bh.pendingConfig[tx]
if ok {
panic("Programming error, called BeginConfig mulitply for the same tx")
}
bh.pendingConfig = &mspConfigStore{
bh.pendingConfig[tx] = &mspConfigStore{
idMap: make(map[string]*pendingMSPConfig),
}
}

// RollbackProposals called when a config proposal is abandoned
func (bh *MSPConfigHandler) RollbackProposals() {
bh.pendingConfig = nil
func (bh *MSPConfigHandler) RollbackProposals(tx interface{}) {
bh.pendingLock.Lock()
defer bh.pendingLock.Unlock()
delete(bh.pendingConfig, tx)
}

// CommitProposals called when a config proposal is committed
func (bh *MSPConfigHandler) CommitProposals() {
if bh.pendingConfig == nil {
panic("Programming error, called CommitProposals with no proposal in process")
func (bh *MSPConfigHandler) CommitProposals(tx interface{}) {
bh.pendingLock.Lock()
defer bh.pendingLock.Unlock()
pendingConfig, ok := bh.pendingConfig[tx]
if !ok {
panic("Programming error, called BeginConfig mulitply for the same tx")
}

bh.MSPManager = bh.pendingConfig.proposedMgr
bh.pendingConfig = nil
bh.MSPManager = pendingConfig.proposedMgr
delete(bh.pendingConfig, tx)
}

// ProposeValue called when config is added to a proposal
func (bh *MSPConfigHandler) ProposeMSP(mspConfig *mspprotos.MSPConfig) (msp.MSP, error) {
func (bh *MSPConfigHandler) ProposeMSP(tx interface{}, mspConfig *mspprotos.MSPConfig) (msp.MSP, error) {
bh.pendingLock.RLock()
pendingConfig, ok := bh.pendingConfig[tx]
bh.pendingLock.RUnlock()
if !ok {
panic("Programming error, called BeginConfig mulitply for the same tx")
}

// check that the type for that MSP is supported
if mspConfig.Type != int32(msp.FABRIC) {
return nil, fmt.Errorf("Setup error: unsupported msp type %d", mspConfig.Type)
Expand All @@ -90,12 +113,12 @@ func (bh *MSPConfigHandler) ProposeMSP(mspConfig *mspprotos.MSPConfig) (msp.MSP,
return nil, fmt.Errorf("Could not extract msp identifier, err %s", err)
}

existingPendingMSPConfig, ok := bh.pendingConfig.idMap[mspID]
existingPendingMSPConfig, ok := pendingConfig.idMap[mspID]
if ok && !reflect.DeepEqual(existingPendingMSPConfig.mspConfig, mspConfig) {
return nil, fmt.Errorf("Attempted to define two different versions of MSP: %s", mspID)
}

bh.pendingConfig.idMap[mspID] = &pendingMSPConfig{
pendingConfig.idMap[mspID] = &pendingMSPConfig{
mspConfig: mspConfig,
msp: mspInst,
}
Expand All @@ -104,20 +127,27 @@ func (bh *MSPConfigHandler) ProposeMSP(mspConfig *mspprotos.MSPConfig) (msp.MSP,
}

// PreCommit instantiates the MSP manager
func (bh *MSPConfigHandler) PreCommit() error {
if len(bh.pendingConfig.idMap) == 0 {
func (bh *MSPConfigHandler) PreCommit(tx interface{}) error {
bh.pendingLock.RLock()
pendingConfig, ok := bh.pendingConfig[tx]
bh.pendingLock.RUnlock()
if !ok {
panic("Programming error, called PreCommit for tx which was not started")
}

if len(pendingConfig.idMap) == 0 {
// Cannot instantiate an MSP manager with no MSPs
return nil
}

mspList := make([]msp.MSP, len(bh.pendingConfig.idMap))
mspList := make([]msp.MSP, len(pendingConfig.idMap))
i := 0
for _, pendingMSP := range bh.pendingConfig.idMap {
for _, pendingMSP := range pendingConfig.idMap {
mspList[i] = pendingMSP.msp
i++
}

bh.pendingConfig.proposedMgr = msp.NewMSPManager()
err := bh.pendingConfig.proposedMgr.Setup(mspList)
pendingConfig.proposedMgr = msp.NewMSPManager()
err := pendingConfig.proposedMgr.Setup(mspList)
return err
}
16 changes: 8 additions & 8 deletions common/config/msp/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func TestMSPConfigManager(t *testing.T) {
// test success:

// begin/propose/commit
mspCH := &MSPConfigHandler{}
mspCH.BeginConfig()
_, err = mspCH.ProposeMSP(conf)
mspCH := NewMSPConfigHandler()
mspCH.BeginConfig(t)
_, err = mspCH.ProposeMSP(t, conf)
assert.NoError(t, err)
mspCH.PreCommit()
mspCH.CommitProposals()
mspCH.PreCommit(t)
mspCH.CommitProposals(t)

msps, err := mspCH.GetMSPs()
assert.NoError(t, err)
Expand All @@ -47,9 +47,9 @@ func TestMSPConfigManager(t *testing.T) {

// test failure
// begin/propose/commit
mspCH.BeginConfig()
_, err = mspCH.ProposeMSP(conf)
mspCH.BeginConfig(t)
_, err = mspCH.ProposeMSP(t, conf)
assert.NoError(t, err)
_, err = mspCH.ProposeMSP(&mspprotos.MSPConfig{Config: []byte("BARF!")})
_, err = mspCH.ProposeMSP(t, &mspprotos.MSPConfig{Config: []byte("BARF!")})
assert.Error(t, err)
}
2 changes: 1 addition & 1 deletion common/config/orderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (oc *OrdererConfig) KafkaBrokers() []string {
return oc.protos.KafkaBrokers.Brokers
}

func (oc *OrdererConfig) Validate(groups map[string]ValueProposer) error {
func (oc *OrdererConfig) Validate(tx interface{}, groups map[string]ValueProposer) error {
for _, validator := range []func() error{
oc.validateConsensusType,
oc.validateBatchSize,
Expand Down
8 changes: 4 additions & 4 deletions common/config/organization.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,19 @@ func NewOrganizationConfig(og *OrganizationGroup) *OrganizationConfig {
}

// Validate returns whether the configuration is valid
func (oc *OrganizationConfig) Validate(groups map[string]ValueProposer) error {
return oc.validateMSP()
func (oc *OrganizationConfig) Validate(tx interface{}, groups map[string]ValueProposer) error {
return oc.validateMSP(tx)
}

func (oc *OrganizationConfig) Commit() {
oc.organizationGroup.OrganizationConfig = oc
}

func (oc *OrganizationConfig) validateMSP() error {
func (oc *OrganizationConfig) validateMSP(tx interface{}) error {
var err error

logger.Debugf("Setting up MSP for org %s", oc.organizationGroup.name)
oc.msp, err = oc.organizationGroup.mspConfigHandler.ProposeMSP(oc.protos.MSP)
oc.msp, err = oc.organizationGroup.mspConfigHandler.ProposeMSP(tx, oc.protos.MSP)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit a552e22

Please sign in to comment.