Skip to content

Commit

Permalink
Merge "[FAB-2262] Split configtx manager"
Browse files Browse the repository at this point in the history
  • Loading branch information
Srinivasan Muralidharan authored and Gerrit Code Review committed Feb 17, 2017
2 parents ff5f657 + 3b9cc55 commit 23022d7
Show file tree
Hide file tree
Showing 3 changed files with 275 additions and 228 deletions.
121 changes: 121 additions & 0 deletions common/configtx/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
Copyright IBM Corp. 2016-2017 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package configtx

import (
"fmt"

"github.com/hyperledger/fabric/common/configtx/api"
cb "github.com/hyperledger/fabric/protos/common"
)

type configResult struct {
handler api.Transactional
subResults []*configResult
}

func (cr *configResult) commit() {
for _, subResult := range cr.subResults {
subResult.commit()
}
cr.handler.CommitConfig()
}

func (cr *configResult) rollback() {
for _, subResult := range cr.subResults {
subResult.rollback()
}
cr.handler.RollbackConfig()
}

// proposeGroup proposes a group configuration with a given handler
// it will in turn recursively call itself until all groups have been exhausted
// at each call, it returns the handler that was passed in, plus any handlers returned
// by recursive calls into proposeGroup
func (cm *configManager) proposeGroup(name string, group *cb.ConfigGroup, handler api.Handler) (*configResult, error) {
subGroups := make([]string, len(group.Groups))
i := 0
for subGroup := range group.Groups {
subGroups[i] = subGroup
i++
}

logger.Debugf("Beginning new config for channel %s and group %s", cm.chainID, name)
subHandlers, err := handler.BeginConfig(subGroups)
if err != nil {
return nil, err
}

if len(subHandlers) != len(subGroups) {
return nil, fmt.Errorf("Programming error, did not return as many handlers as groups %d vs %d", len(subHandlers), len(subGroups))
}

result := &configResult{
handler: handler,
subResults: make([]*configResult, 0, len(subGroups)),
}

for i, subGroup := range subGroups {
subResult, err := cm.proposeGroup(name+"/"+subGroup, group.Groups[subGroup], subHandlers[i])
if err != nil {
result.rollback()
return nil, err
}
result.subResults = append(result.subResults, subResult)
}

for key, value := range group.Values {
if err := handler.ProposeConfig(key, value); err != nil {
result.rollback()
return nil, err
}
}

return result, nil
}

func (cm *configManager) proposePolicies(rootGroup *cb.ConfigGroup) (*configResult, error) {
cm.initializer.PolicyHandler().BeginConfig(nil) // XXX temporary workaround until policy manager is adapted with sub-policies

for key, policy := range rootGroup.Policies {
logger.Debugf("Proposing policy: %s", key)
if err := cm.initializer.PolicyHandler().ProposePolicy(key, []string{RootGroupKey}, policy); err != nil {
cm.initializer.PolicyHandler().RollbackConfig()
return nil, err
}
}

return &configResult{handler: cm.initializer.PolicyHandler()}, nil
}

func (cm *configManager) processConfig(channelGroup *cb.ConfigGroup) (*configResult, error) {
helperGroup := cb.NewConfigGroup()
helperGroup.Groups[RootGroupKey] = channelGroup
groupResult, err := cm.proposeGroup("", helperGroup, cm.initializer)
if err != nil {
return nil, err
}

policyResult, err := cm.proposePolicies(channelGroup)
if err != nil {
groupResult.rollback()
return nil, err
}
policyResult.subResults = []*configResult{groupResult}

return policyResult, nil
}
228 changes: 0 additions & 228 deletions common/configtx/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ import (
"regexp"

"github.com/hyperledger/fabric/common/configtx/api"
"github.com/hyperledger/fabric/common/policies"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"

logging "github.com/op/go-logging"
)
Expand All @@ -43,25 +41,6 @@ var (
// NewConfigItemPolicyKey is the ID of the policy used when no other policy can be resolved, for instance when attempting to create a new config item
const NewConfigItemPolicyKey = "NewConfigItemPolicy"

type configResult struct {
handler api.Transactional
subResults []*configResult
}

func (cr *configResult) commit() {
for _, subResult := range cr.subResults {
subResult.commit()
}
cr.handler.CommitConfig()
}

func (cr *configResult) rollback() {
for _, subResult := range cr.subResults {
subResult.rollback()
}
cr.handler.RollbackConfig()
}

type configManager struct {
api.Resources
sequence uint64
Expand Down Expand Up @@ -166,213 +145,6 @@ func (cm *configManager) commitCallbacks() {
}
}

// proposeGroup proposes a group configuration with a given handler
// it will in turn recursively call itself until all groups have been exhausted
// at each call, it returns the handler that was passed in, plus any handlers returned
// by recursive calls into proposeGroup
func (cm *configManager) proposeGroup(name string, group *cb.ConfigGroup, handler api.Handler) (*configResult, error) {
subGroups := make([]string, len(group.Groups))
i := 0
for subGroup := range group.Groups {
subGroups[i] = subGroup
i++
}

logger.Debugf("Beginning new config for channel %s and group %s", cm.chainID, name)
subHandlers, err := handler.BeginConfig(subGroups)
if err != nil {
return nil, err
}

if len(subHandlers) != len(subGroups) {
return nil, fmt.Errorf("Programming error, did not return as many handlers as groups %d vs %d", len(subHandlers), len(subGroups))
}

result := &configResult{
handler: handler,
subResults: make([]*configResult, 0, len(subGroups)),
}

for i, subGroup := range subGroups {
subResult, err := cm.proposeGroup(name+"/"+subGroup, group.Groups[subGroup], subHandlers[i])
if err != nil {
result.rollback()
return nil, err
}
result.subResults = append(result.subResults, subResult)
}

for key, value := range group.Values {
if err := handler.ProposeConfig(key, value); err != nil {
result.rollback()
return nil, err
}
}

return result, nil
}

func (cm *configManager) proposePolicies(rootGroup *cb.ConfigGroup) (*configResult, error) {
cm.initializer.PolicyHandler().BeginConfig(nil) // XXX temporary workaround until policy manager is adapted with sub-policies

for key, policy := range rootGroup.Policies {
logger.Debugf("Proposing policy: %s", key)
if err := cm.initializer.PolicyHandler().ProposePolicy(key, []string{RootGroupKey}, policy); err != nil {
cm.initializer.PolicyHandler().RollbackConfig()
return nil, err
}
}

return &configResult{handler: cm.initializer.PolicyHandler()}, nil
}

// authorizeUpdate validates that all modified config has the corresponding modification policies satisfied by the signature set
// it returns a map of the modified config
func (cm *configManager) authorizeUpdate(configUpdateEnv *cb.ConfigUpdateEnvelope) (map[string]comparable, error) {
if configUpdateEnv == nil {
return nil, fmt.Errorf("Cannot process nil ConfigUpdateEnvelope")
}

config, err := UnmarshalConfigUpdate(configUpdateEnv.ConfigUpdate)
if err != nil {
return nil, err
}

if config.Header == nil {
return nil, fmt.Errorf("Must have header set")
}

seq := computeSequence(config.WriteSet)
if err != nil {
return nil, err
}

signedData, err := configUpdateEnv.AsSignedData()
if err != nil {
return nil, err
}

// Verify config is a sequential update to prevent exhausting sequence numbers
if seq != cm.sequence+1 {
return nil, fmt.Errorf("Config sequence number jumped from %d to %d", cm.sequence, seq)
}

// Verify config is intended for this globally unique chain ID
if config.Header.ChannelId != cm.chainID {
return nil, fmt.Errorf("Config is for the wrong chain, expected %s, got %s", cm.chainID, config.Header.ChannelId)
}

configMap, err := mapConfig(config.WriteSet)
if err != nil {
return nil, err
}
for key, value := range configMap {
logger.Debugf("Processing key %s with value %v", key, value)
if key == "[Groups] /Channel" {
// XXX temporary hack to prevent group evaluation for modification
continue
}

// Ensure the config sequence numbers are correct to prevent replay attacks
var isModified bool

oldValue, ok := cm.config[key]
if ok {
isModified = !value.equals(oldValue)
} else {
if value.version() != seq {
return nil, fmt.Errorf("Key %v was new, but had an older Sequence %d set", key, value.version())
}
isModified = true
}

// If a config item was modified, its Version must be set correctly, and it must satisfy the modification policy
if isModified {
logger.Debugf("Proposed config item %s on channel %s has been modified", key, cm.chainID)

if value.version() != seq {
return nil, fmt.Errorf("Key %s was modified, but its Version %d does not equal current configtx Sequence %d", key, value.version(), seq)
}

// Get the modification policy for this config item if one was previously specified
// or accept it if it is new, as the group policy will be evaluated for its inclusion
var policy policies.Policy
if ok {
policy, _ = cm.PolicyManager().GetPolicy(oldValue.modPolicy())
// Ensure the policy is satisfied
if err = policy.Evaluate(signedData); err != nil {
return nil, err
}
}

}
}

// Ensure that any config items which used to exist still exist, to prevent implicit deletion
for key, _ := range cm.config {
_, ok := configMap[key]
if !ok {
return nil, fmt.Errorf("Missing key %v in new config", key)
}

}

return cm.computeUpdateResult(configMap), nil
}

// computeUpdateResult takes a configMap generated by an update and produces a new configMap overlaying it onto the old config
func (cm *configManager) computeUpdateResult(updatedConfig map[string]comparable) map[string]comparable {
newConfigMap := make(map[string]comparable)
for key, value := range cm.config {
newConfigMap[key] = value
}

for key, value := range updatedConfig {
newConfigMap[key] = value
}
return newConfigMap
}

func (cm *configManager) processConfig(channelGroup *cb.ConfigGroup) (*configResult, error) {
helperGroup := cb.NewConfigGroup()
helperGroup.Groups[RootGroupKey] = channelGroup
groupResult, err := cm.proposeGroup("", helperGroup, cm.initializer)
if err != nil {
return nil, err
}

policyResult, err := cm.proposePolicies(channelGroup)
if err != nil {
groupResult.rollback()
return nil, err
}
policyResult.subResults = []*configResult{groupResult}

return policyResult, nil
}

func envelopeToConfigUpdate(configtx *cb.Envelope) (*cb.ConfigUpdateEnvelope, error) {
payload, err := utils.UnmarshalPayload(configtx.Payload)
if err != nil {
return nil, err
}

if payload.Header == nil || payload.Header.ChannelHeader == nil {
return nil, fmt.Errorf("Envelope must have ChannelHeader")
}

if payload.Header == nil || payload.Header.ChannelHeader.Type != int32(cb.HeaderType_CONFIG_UPDATE) {
return nil, fmt.Errorf("Not a tx of type CONFIG_UPDATE")
}

configUpdateEnv, err := UnmarshalConfigUpdateEnvelope(payload.Data)
if err != nil {
return nil, fmt.Errorf("Error unmarshaling ConfigUpdateEnvelope: %s", err)
}

return configUpdateEnv, nil
}

// Validate attempts to validate a new configtx against the current config state
func (cm *configManager) Validate(configtx *cb.Envelope) error {
configUpdateEnv, err := envelopeToConfigUpdate(configtx)
Expand Down
Loading

0 comments on commit 23022d7

Please sign in to comment.