Skip to content

Commit

Permalink
[FAB-1443] Extend sharedconfig with BatchTimeout
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1443

As things stand right now, the solo consenter uses the local config file
to load the batchTimeout value. This however belongs to the shared
config.

This changeset:
1. Defines a `BatchTimeout` proto message (and a `BatchTimeout` key).
2. Extends the `sharedconfig.Manager` interface with a `BatchTimeout`
getter.
3. Modifies the solo consenter so that it fetches the value from the
`sharedconfig.Manager` (via the `ConsenterSupport` object).
4. Modifies the mock `sharedconfig` implementation accordingly.

Change-Id: Ibb5c73733febd0d27f5d7eebfd6b3208864e659e
Signed-off-by: Kostas Christidis <kostas@christidis.io>
  • Loading branch information
kchristidis committed Dec 24, 2016
1 parent e757dbf commit 8f1e830
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 58 deletions.
40 changes: 31 additions & 9 deletions orderer/common/sharedconfig/sharedconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"regexp"
"strconv"
"strings"
"time"

cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
Expand All @@ -36,6 +37,9 @@ const (
// BatchSizeKey is the cb.ConfigurationItem type key name for the BatchSize message
BatchSizeKey = "BatchSize"

// BatchTimeoutKey is the cb.ConfigurationItem type key name for the BatchTimeout message
BatchTimeoutKey = "BatchTimeout"

// ChainCreatorsKey is the cb.ConfigurationItem type key name for the ChainCreators message
ChainCreatorsKey = "ChainCreators"

Expand All @@ -60,6 +64,9 @@ type Manager interface {
// BatchSize returns the maximum number of messages to include in a block
BatchSize() *ab.BatchSize

// BatchTimeout returns the amount of time to wait before creating a batch
BatchTimeout() time.Duration

// ChainCreators returns the policy names which are allowed for chain creation
// This field is only set for the system ordering chain
ChainCreators() []string
Expand All @@ -73,6 +80,7 @@ type Manager interface {
type ordererConfig struct {
consensusType string
batchSize *ab.BatchSize
batchTimeout time.Duration
chainCreators []string
kafkaBrokers []string
}
Expand Down Expand Up @@ -101,6 +109,11 @@ func (pm *ManagerImpl) BatchSize() *ab.BatchSize {
return pm.config.batchSize
}

// BatchTimeout returns the amount of time to wait before creating a batch
func (pm *ManagerImpl) BatchTimeout() time.Duration {
return pm.config.batchTimeout
}

// ChainCreators returns the policy names which are allowed for chain creation
// This field is only set for the system ordering chain
func (pm *ManagerImpl) ChainCreators() []string {
Expand Down Expand Up @@ -145,8 +158,7 @@ func (pm *ManagerImpl) ProposeConfig(configItem *cb.ConfigurationItem) error {
switch configItem.Key {
case ConsensusTypeKey:
consensusType := &ab.ConsensusType{}
err := proto.Unmarshal(configItem.Value, consensusType)
if err != nil {
if err := proto.Unmarshal(configItem.Value, consensusType); err != nil {
return fmt.Errorf("Unmarshaling error for ConsensusType: %s", err)
}
if pm.config.consensusType == "" {
Expand All @@ -159,26 +171,36 @@ func (pm *ManagerImpl) ProposeConfig(configItem *cb.ConfigurationItem) error {
pm.pendingConfig.consensusType = consensusType.Type
case BatchSizeKey:
batchSize := &ab.BatchSize{}
err := proto.Unmarshal(configItem.Value, batchSize)
if err != nil {
if err := proto.Unmarshal(configItem.Value, batchSize); err != nil {
return fmt.Errorf("Unmarshaling error for BatchSize: %s", err)
}

if batchSize.MaxMessageCount <= 0 {
return fmt.Errorf("Attempted to set the batch size max message count to %d which is less than or equal to 0", batchSize.MaxMessageCount)
}
pm.pendingConfig.batchSize = batchSize
case BatchTimeoutKey:
var timeoutValue time.Duration
var err error
batchTimeout := &ab.BatchTimeout{}
if err = proto.Unmarshal(configItem.Value, batchTimeout); err != nil {
return fmt.Errorf("Unmarshaling error for BatchTimeout: %s", err)
}
if timeoutValue, err = time.ParseDuration(batchTimeout.Timeout); err != nil {
return fmt.Errorf("Attempted to set the batch timeout to a invalid value: %s", err)
}
if timeoutValue <= 0 {
return fmt.Errorf("Attempted to set the batch timeout to a non-positive value: %s", timeoutValue.String())
}
pm.pendingConfig.batchTimeout = timeoutValue
case ChainCreatorsKey:
chainCreators := &ab.ChainCreators{}
err := proto.Unmarshal(configItem.Value, chainCreators)
if err != nil {
if err := proto.Unmarshal(configItem.Value, chainCreators); err != nil {
return fmt.Errorf("Unmarshaling error for ChainCreator: %s", err)
}
pm.pendingConfig.chainCreators = chainCreators.Policies
case KafkaBrokersKey:
kafkaBrokers := &ab.KafkaBrokers{}
err := proto.Unmarshal(configItem.Value, kafkaBrokers)
if err != nil {
if err := proto.Unmarshal(configItem.Value, kafkaBrokers); err != nil {
return fmt.Errorf("Unmarshaling error for KafkaBrokers: %s", err)
}
if len(kafkaBrokers.Brokers) == 0 {
Expand Down
53 changes: 53 additions & 0 deletions orderer/common/sharedconfig/sharedconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"os/exec"
"testing"
"time"

cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
Expand Down Expand Up @@ -172,6 +173,58 @@ func TestBatchSize(t *testing.T) {
}
}

func TestBatchTimeout(t *testing.T) {
endBatchTimeout, _ := time.ParseDuration("1s")
invalidMessage := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchTimeoutKey,
Value: []byte("Garbage Data"),
}
negativeBatchTimeout := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchTimeoutKey,
Value: utils.MarshalOrPanic(&ab.BatchTimeout{Timeout: "-1s"}),
}
zeroBatchTimeout := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchTimeoutKey,
Value: utils.MarshalOrPanic(&ab.BatchTimeout{Timeout: "0s"}),
}
validMessage := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchTimeoutKey,
Value: utils.MarshalOrPanic(&ab.BatchTimeout{Timeout: endBatchTimeout.String()}),
}
m := NewManagerImpl()
m.BeginConfig()

err := m.ProposeConfig(validMessage)
if err != nil {
t.Fatalf("Error applying valid config: %s", err)
}

err = m.ProposeConfig(invalidMessage)
if err == nil {
t.Fatalf("Should have failed on invalid message")
}

err = m.ProposeConfig(negativeBatchTimeout)
if err == nil {
t.Fatalf("Should have rejected negative batch timeout: %s", err)
}

err = m.ProposeConfig(zeroBatchTimeout)
if err == nil {
t.Fatalf("Should have rejected batch timeout of 0")
}

m.CommitConfig()

if nowBatchTimeout := m.BatchTimeout(); nowBatchTimeout != endBatchTimeout {
t.Fatalf("Got batch timeout of %s when expecting batch size of %s", nowBatchTimeout.String(), endBatchTimeout.String())
}
}

func TestKafkaBrokers(t *testing.T) {
endList := []string{"127.0.0.1:9092", "foo.bar:9092"}

Expand Down
2 changes: 1 addition & 1 deletion orderer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func main() {
}

consenters := make(map[string]multichain.Consenter)
consenters["solo"] = solo.New(conf.General.BatchTimeout)
consenters["solo"] = solo.New()
consenters["kafka"] = kafka.New(conf.Kafka.Version, conf.Kafka.Retry)

manager := multichain.NewManagerImpl(lf, consenters)
Expand Down
8 changes: 8 additions & 0 deletions orderer/mocks/sharedconfig/sharedconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ limitations under the License.
package sharedconfig

import ab "github.com/hyperledger/fabric/protos/orderer"
import "time"

// Manager is a mock implementation of sharedconfig.Manager
type Manager struct {
// ConsensusTypeVal is returned as the result of ConsensusType()
ConsensusTypeVal string
// BatchSizeVal is returned as the result of BatchSize()
BatchSizeVal *ab.BatchSize
// BatchTimeoutVal is returned as the result of BatchTimeout()
BatchTimeoutVal time.Duration
// ChainCreatorsVal is returned as the result of ChainCreators()
ChainCreatorsVal []string
// KafkaBrokersVal is returned as the result of KafkaBrokers()
Expand All @@ -40,6 +43,11 @@ func (scm *Manager) BatchSize() *ab.BatchSize {
return scm.BatchSizeVal
}

// BatchTimeout returns the BatchTimeoutVal
func (scm *Manager) BatchTimeout() time.Duration {
return scm.BatchTimeoutVal
}

// ChainCreators returns the ChainCreatorsVal
func (scm *Manager) ChainCreators() []string {
return scm.ChainCreatorsVal
Expand Down
5 changes: 5 additions & 0 deletions orderer/multichain/systemchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package multichain
import (
"reflect"
"testing"
"time"

"github.com/hyperledger/fabric/common/policies"
coreutil "github.com/hyperledger/fabric/core/util"
Expand Down Expand Up @@ -58,6 +59,10 @@ func (msc *mockSharedConfig) BatchSize() *ab.BatchSize {
panic("Unimplemented")
}

func (msc *mockSharedConfig) BatchTimeout() time.Duration {
panic("Unimplemented")
}

func (msc *mockSharedConfig) ChainCreators() []string {
return msc.chainCreators
}
Expand Down
17 changes: 6 additions & 11 deletions orderer/solo/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ func init() {
logging.SetLevel(logging.DEBUG, "")
}

type consenter struct {
batchTimeout time.Duration
}
type consenter struct{}

type chain struct {
support multichain.ConsenterSupport
Expand All @@ -45,20 +43,17 @@ type chain struct {
// The solo consensus scheme is very simple, and allows only one consenter for a given chain (this process).
// It accepts messages being delivered via Enqueue, orders them, and then uses the blockcutter to form the messages
// into blocks before writing to the given ledger
func New(batchTimeout time.Duration) multichain.Consenter {
return &consenter{
// TODO, ultimately this should come from the configManager at HandleChain
batchTimeout: batchTimeout,
}
func New() multichain.Consenter {
return &consenter{}
}

func (solo *consenter) HandleChain(support multichain.ConsenterSupport) (multichain.Chain, error) {
return newChain(solo.batchTimeout, support), nil
return newChain(support), nil
}

func newChain(batchTimeout time.Duration, support multichain.ConsenterSupport) *chain {
func newChain(support multichain.ConsenterSupport) *chain {
return &chain{
batchTimeout: batchTimeout,
batchTimeout: support.SharedConfig().BatchTimeout(),
support: support,
sendChan: make(chan *cb.Envelope),
exitChan: make(chan struct{}),
Expand Down
33 changes: 21 additions & 12 deletions orderer/solo/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

mockblockcutter "github.com/hyperledger/fabric/orderer/mocks/blockcutter"
mockmultichain "github.com/hyperledger/fabric/orderer/mocks/multichain"
mocksharedconfig "github.com/hyperledger/fabric/orderer/mocks/sharedconfig"
cb "github.com/hyperledger/fabric/protos/common"
)

Expand All @@ -48,12 +49,14 @@ func goWithWait(target func()) *waitableGo {
}

func TestEmptyBatch(t *testing.T) {
batchTimeout, _ := time.ParseDuration("1ms")
support := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope),
BlockCutterVal: mockblockcutter.NewReceiver(),
Batches: make(chan []*cb.Envelope),
BlockCutterVal: mockblockcutter.NewReceiver(),
SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: batchTimeout},
}
defer close(support.BlockCutterVal.Block)
bs := newChain(time.Millisecond, support)
bs := newChain(support)
wg := goWithWait(bs.main)
defer bs.Halt()

Expand All @@ -67,12 +70,14 @@ func TestEmptyBatch(t *testing.T) {
}

func TestBatchTimer(t *testing.T) {
batchTimeout, _ := time.ParseDuration("1ms")
support := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope),
BlockCutterVal: mockblockcutter.NewReceiver(),
Batches: make(chan []*cb.Envelope),
BlockCutterVal: mockblockcutter.NewReceiver(),
SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: batchTimeout},
}
defer close(support.BlockCutterVal.Block)
bs := newChain(time.Millisecond, support)
bs := newChain(support)
wg := goWithWait(bs.main)
defer bs.Halt()

Expand Down Expand Up @@ -100,13 +105,15 @@ func TestBatchTimer(t *testing.T) {
}

func TestBatchTimerHaltOnFilledBatch(t *testing.T) {
batchTimeout, _ := time.ParseDuration("1h")
support := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope),
BlockCutterVal: mockblockcutter.NewReceiver(),
Batches: make(chan []*cb.Envelope),
BlockCutterVal: mockblockcutter.NewReceiver(),
SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: batchTimeout},
}
defer close(support.BlockCutterVal.Block)

bs := newChain(time.Hour, support)
bs := newChain(support)
wg := goWithWait(bs.main)
defer bs.Halt()

Expand Down Expand Up @@ -141,12 +148,14 @@ func TestBatchTimerHaltOnFilledBatch(t *testing.T) {
}

func TestConfigStyleMultiBatch(t *testing.T) {
batchTimeout, _ := time.ParseDuration("1h")
support := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope),
BlockCutterVal: mockblockcutter.NewReceiver(),
Batches: make(chan []*cb.Envelope),
BlockCutterVal: mockblockcutter.NewReceiver(),
SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: batchTimeout},
}
defer close(support.BlockCutterVal.Block)
bs := newChain(time.Hour, support)
bs := newChain(support)
wg := goWithWait(bs.main)
defer bs.Halt()

Expand Down
Loading

0 comments on commit 8f1e830

Please sign in to comment.