Skip to content

Commit

Permalink
[FAB-1362] Add KafkaBrokers to shared config
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1362

The list of Kafka brokers used for ordering needs to be shared across the
shims (ordering service nodes). This changeset introduces the key and
the getter of that key's value to the sharedconfig package. It also adds
the necessary definition in the orderer protos.

This value is not yet encoded as a configuration item in the genesis
block that the static bootstrapper produces. This will come at a
subsequent changeset, with the introduction of the provisional
bootstrapper.

Change-Id: I6268e79dc25ca2cee8da248ba44c9a572c04d909
Signed-off-by: Kostas Christidis <kostas@christidis.io>
  • Loading branch information
kchristidis committed Dec 13, 2016
1 parent be08bc5 commit f42b999
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 48 deletions.
83 changes: 73 additions & 10 deletions orderer/common/sharedconfig/sharedconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ package sharedconfig

import (
"fmt"
"regexp"
"strconv"
"strings"

cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
Expand All @@ -26,14 +29,19 @@ import (
"github.com/op/go-logging"
)

// ConsensusTypeKey is the cb.ConfigurationItem type key name for the ConsensusType message
const ConsensusTypeKey = "ConsensusType"
const (
// ConsensusTypeKey is the cb.ConfigurationItem type key name for the ConsensusType message
ConsensusTypeKey = "ConsensusType"

// BatchSizeKey is the cb.ConfigurationItem type key name for the BatchSize message
const BatchSizeKey = "BatchSize"
// BatchSizeKey is the cb.ConfigurationItem type key name for the BatchSize message
BatchSizeKey = "BatchSize"

// ChainCreatorsKey is the cb.ConfigurationItem type key name for the ChainCreators message
const ChainCreatorsKey = "ChainCreators"
// ChainCreatorsKey is the cb.ConfigurationItem type key name for the ChainCreators message
ChainCreatorsKey = "ChainCreators"

// KafkaBrokersKey is the cb.ConfigurationItem type key name for the KafkaBrokers message
KafkaBrokersKey = "KafkaBrokers"
)

var logger = logging.MustGetLogger("orderer/common/sharedconfig")

Expand All @@ -55,12 +63,18 @@ type Manager interface {
// ChainCreators returns the policy names which are allowed for chain creation
// This field is only set for the system ordering chain
ChainCreators() []string

// KafkaBrokers returns the addresses (IP:port notation) of a set of "bootstrap"
// Kafka brokers, i.e. this is not necessarily the entire set of Kafka brokers
// used for ordering
KafkaBrokers() []string
}

type ordererConfig struct {
consensusType string
batchSize uint32
chainCreators []string
kafkaBrokers []string
}

// ManagerImpl is an implementation of Manager and configtx.ConfigHandler
Expand Down Expand Up @@ -93,6 +107,13 @@ func (pm *ManagerImpl) ChainCreators() []string {
return pm.config.chainCreators
}

// KafkaBrokers returns the addresses (IP:port notation) of a set of "bootstrap"
// Kafka brokers, i.e. this is not necessarily the entire set of Kafka brokers
// used for ordering
func (pm *ManagerImpl) KafkaBrokers() []string {
return pm.config.kafkaBrokers
}

// BeginConfig is used to start a new configuration proposal
func (pm *ManagerImpl) BeginConfig() {
if pm.pendingConfig != nil {
Expand Down Expand Up @@ -135,7 +156,6 @@ func (pm *ManagerImpl) ProposeConfig(configItem *cb.ConfigurationItem) error {
if consensusType.Type != pm.config.consensusType {
return fmt.Errorf("Attempted to change the consensus type from %s to %s after init", pm.config.consensusType, consensusType.Type)
}

pm.pendingConfig.consensusType = consensusType.Type
case BatchSizeKey:
batchSize := &ab.BatchSize{}
Expand All @@ -147,17 +167,60 @@ func (pm *ManagerImpl) ProposeConfig(configItem *cb.ConfigurationItem) error {
if batchSize.Messages <= 0 {
return fmt.Errorf("Attempted to set the batch size to %d which is less than or equal to 0", batchSize.Messages)
}

pm.pendingConfig.batchSize = batchSize.Messages
case ChainCreatorsKey:
chainCreators := &ab.ChainCreators{}
err := proto.Unmarshal(configItem.Value, chainCreators)
if err != nil {
return fmt.Errorf("Unmarshaling error for ChainCreator: %s", err)
}

pm.pendingConfig.chainCreators = chainCreators.Policies
case KafkaBrokersKey:
kafkaBrokers := &ab.KafkaBrokers{}
err := proto.Unmarshal(configItem.Value, kafkaBrokers)
if err != nil {
return fmt.Errorf("Unmarshaling error for KafkaBrokers: %s", err)
}
if len(kafkaBrokers.Brokers) == 0 {
return fmt.Errorf("Kafka broker set cannot be nil")
}
for _, broker := range kafkaBrokers.Brokers {
if !brokerEntrySeemsValid(broker) {
return fmt.Errorf("Invalid broker entry: %s", broker)
}
}
pm.pendingConfig.kafkaBrokers = kafkaBrokers.Brokers
}

return nil
}

// This does just a barebones sanitfy check.
func brokerEntrySeemsValid(broker string) bool {
if !strings.Contains(broker, ":") {
return false
}

parts := strings.Split(broker, ":")
if len(parts) > 2 {
return false
}

host := parts[0]
port := parts[1]

if _, err := strconv.ParseUint(port, 10, 16); err != nil {
return false
}

// Valid hostnames may contain only the ASCII letters 'a' through 'z' (in a
// case-insensitive manner), the digits '0' through '9', and the hyphen. IP
// v4 addresses are represented in dot-decimal notation, which consists of
// four decimal numbers, each ranging from 0 to 255, separated by dots,
// e.g., 172.16.254.1
// The following regular expression:
// 1. allows just a-z (case-insensitive), 0-9, and the dot and hyphen characters
// 2. does not allow leading trailing dots or hyphens
re, _ := regexp.Compile("^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9.-]*[a-zA-Z0-9])$")
matched := re.FindString(host)
return len(matched) == len(host)
}
80 changes: 72 additions & 8 deletions orderer/common/sharedconfig/sharedconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func TestCommitWithoutBegin(t *testing.T) {
if !crashes {
t.Fatalf("Should have crashed on multiple begin configs")
}

}

func TestRollback(t *testing.T) {
Expand Down Expand Up @@ -123,17 +122,15 @@ func TestConsensusType(t *testing.T) {
if nowType := m.ConsensusType(); nowType != endType {
t.Fatalf("Consensus type should have ended as %s but was %s", endType, nowType)
}

}

func TestBatchSize(t *testing.T) {
endBatchSize := uint32(10)
invalidMessage :=
&cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: []byte("Garbage Data"),
}
invalidMessage := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: []byte("Garbage Data"),
}
zeroBatchSize := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Expand Down Expand Up @@ -167,5 +164,72 @@ func TestBatchSize(t *testing.T) {
if nowBatchSize := m.BatchSize(); nowBatchSize != endBatchSize {
t.Fatalf("Got batch size of %d when expecting batch size of %d", nowBatchSize, endBatchSize)
}
}

func TestKafkaBrokers(t *testing.T) {
endList := []string{"127.0.0.1:9092", "foo.bar:9092"}

invalidMessage := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: KafkaBrokersKey,
Value: []byte("Garbage Data"),
}

zeroBrokers := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: KafkaBrokersKey,
Value: utils.MarshalOrPanic(&ab.KafkaBrokers{}),
}

badList := []string{"127.0.0.1", "foo.bar", "127.0.0.1:-1", "localhost:65536", "foo.bar.:9092", ".127.0.0.1:9092", "-foo.bar:9092"}
badMessages := []*cb.ConfigurationItem{}
for _, badAddress := range badList {
msg := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: KafkaBrokersKey,
Value: utils.MarshalOrPanic(&ab.KafkaBrokers{Brokers: []string{badAddress}}),
}
badMessages = append(badMessages, msg)
}

validMessage := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: KafkaBrokersKey,
Value: utils.MarshalOrPanic(&ab.KafkaBrokers{Brokers: endList}),
}

m := NewManagerImpl()
m.BeginConfig()

err := m.ProposeConfig(validMessage)
if err != nil {
t.Fatalf("Error applying valid config: %s", err)
}

err = m.ProposeConfig(invalidMessage)
if err == nil {
t.Fatalf("Should have failed on invalid message")
}

err = m.ProposeConfig(zeroBrokers)
if err == nil {
t.Fatalf("Should have rejected empty brokers list")
}

for i := range badMessages {
err = m.ProposeConfig(badMessages[i])
if err == nil {
t.Fatalf("Should have rejected broker address which is obviously malformed")
}
}

m.CommitConfig()

nowList := m.KafkaBrokers()
switch {
case len(nowList) != len(endList), nowList[0] != endList[0]:
t.Fatalf("Got brokers list %s when expecting brokers list %s", nowList, endList)
default:
return
}
}
7 changes: 7 additions & 0 deletions orderer/mocks/sharedconfig/sharedconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type Manager struct {
BatchSizeVal uint32
// ChainCreatorsVal is returned as the result of ChainCreators()
ChainCreatorsVal []string
// KafkaBrokersVal is returned as the result of KafkaBrokers()
KafkaBrokersVal []string
}

// ConsensusType returns the ConsensusTypeVal
Expand All @@ -40,3 +42,8 @@ func (scm *Manager) BatchSize() uint32 {
func (scm *Manager) ChainCreators() []string {
return scm.ChainCreatorsVal
}

// KafkaBrokers returns the KafkaBrokersVal
func (scm *Manager) KafkaBrokers() []string {
return scm.KafkaBrokersVal
}
4 changes: 4 additions & 0 deletions orderer/multichain/systemchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func (msc *mockSharedConfig) ChainCreators() []string {
return msc.chainCreators
}

func (msc *mockSharedConfig) KafkaBrokers() []string {
panic("Unimplemented")
}

type mockSupport struct {
mpm *mockPolicyManager
msc *mockSharedConfig
Expand Down
10 changes: 5 additions & 5 deletions protos/common/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ enum Status {
}

enum HeaderType {
MESSAGE = 0; // Used for messages which are signed but opaque
CONFIGURATION_TRANSACTION = 1; // Used for messages which reconfigure the chain
CONFIGURATION_ITEM = 2; // Used inside of the the reconfiguration message for signing over ConfigurationItems
ENDORSER_TRANSACTION = 3; // Used by the SDK to submit endorser based transactions
ORDERER_TRANSACTION = 4; // Used internally by the orderer for management
MESSAGE = 0; // Used for messages which are signed but opaque
CONFIGURATION_TRANSACTION = 1; // Used for messages which reconfigure the chain
CONFIGURATION_ITEM = 2; // Used inside of the the reconfiguration message for signing over ConfigurationItems
ENDORSER_TRANSACTION = 3; // Used by the SDK to submit endorser based transactions
ORDERER_TRANSACTION = 4; // Used internally by the orderer for management
}

// This enum enlist indexes of the block metadata array
Expand Down
1 change: 1 addition & 0 deletions protos/orderer/ab.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 27 additions & 16 deletions protos/orderer/configuration.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f42b999

Please sign in to comment.