Skip to content

Commit

Permalink
[FAB-1242] Add BatchSize.AbsoluteMaxBytes config
Browse files Browse the repository at this point in the history
This changeset just adds the new config property.
A subsequent changeset will exploit it.

Change-Id: I2c0f46b4036575a8d53d71546335c6480a6618c3
Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
  • Loading branch information
Luis Sanchez committed Jan 14, 2017
1 parent 9e8fb87 commit f68a97e
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 68 deletions.
3 changes: 2 additions & 1 deletion orderer/common/bootstrap/provisional/provisional.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func New(conf *config.TopLevel) Generator {
chainID: TestChainID,
consensusType: conf.Genesis.OrdererType,
batchSize: &ab.BatchSize{
MaxMessageCount: conf.Genesis.BatchSize.MaxMessageCount,
MaxMessageCount: conf.Genesis.BatchSize.MaxMessageCount,
AbsoluteMaxBytes: conf.Genesis.BatchSize.AbsoluteMaxBytes,
},
batchTimeout: conf.Genesis.BatchTimeout.String(),
}
Expand Down
7 changes: 5 additions & 2 deletions orderer/common/sharedconfig/sharedconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,11 @@ func (pm *ManagerImpl) ProposeConfig(configItem *cb.ConfigurationItem) error {
if err := proto.Unmarshal(configItem.Value, batchSize); err != nil {
return fmt.Errorf("Unmarshaling error for BatchSize: %s", err)
}
if batchSize.MaxMessageCount <= 0 {
return fmt.Errorf("Attempted to set the batch size max message count to %d which is less than or equal to 0", batchSize.MaxMessageCount)
if batchSize.MaxMessageCount == 0 {
return fmt.Errorf("Attempted to set the batch size max message count to an invalid value: 0")
}
if batchSize.AbsoluteMaxBytes == 0 {
return fmt.Errorf("Attempted to set the batch size absolute max bytes to an invalid value: 0")
}
pm.pendingConfig.batchSize = batchSize
case BatchTimeoutKey:
Expand Down
94 changes: 54 additions & 40 deletions orderer/common/sharedconfig/sharedconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
Expand Down Expand Up @@ -132,51 +134,63 @@ func TestConsensusType(t *testing.T) {
}

func TestBatchSize(t *testing.T) {
endBatchSize := struct{ MaxMessageCount uint32 }{
MaxMessageCount: uint32(10),
}
invalidMessage := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: []byte("Garbage Data"),
}
zeroBatchSize := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: utils.MarshalOrPanic(&ab.BatchSize{MaxMessageCount: 0}),
}
validMessage := &cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: utils.MarshalOrPanic(&ab.BatchSize{MaxMessageCount: endBatchSize.MaxMessageCount}),
}
m := NewManagerImpl()
m.BeginConfig()

err := m.ProposeConfig(validMessage)
if err != nil {
t.Fatalf("Error applying valid config: %s", err)
}
validMaxMessageCount := uint32(10)
validAbsoluteMaxBytes := uint32(1000)

err = m.ProposeConfig(invalidMessage)
if err == nil {
t.Fatalf("Should have failed on invalid message")
}

err = m.ProposeConfig(zeroBatchSize)
if err == nil {
t.Fatalf("Should have rejected batch size of 0")
}
t.Run("ValidConfiguration", func(t *testing.T) {
m := NewManagerImpl()
m.BeginConfig()
err := m.ProposeConfig(&cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: utils.MarshalOrPanic(&ab.BatchSize{MaxMessageCount: validMaxMessageCount, AbsoluteMaxBytes: validAbsoluteMaxBytes}),
})
assert.Nil(t, err, "Error applying valid config: %s", err)
m.CommitConfig()
if m.BatchSize().MaxMessageCount != validMaxMessageCount {
t.Fatalf("Got batch size max message count of %d. Expected: %d", m.BatchSize().MaxMessageCount, validMaxMessageCount)
}
if m.BatchSize().AbsoluteMaxBytes != validAbsoluteMaxBytes {
t.Fatalf("Got batch size absolute max bytes of %d. Expected: %d", m.BatchSize().AbsoluteMaxBytes, validAbsoluteMaxBytes)
}
})

m.CommitConfig()
t.Run("UnserializableConfiguration", func(t *testing.T) {
m := NewManagerImpl()
m.BeginConfig()
err := m.ProposeConfig(&cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: []byte("Garbage Data"),
})
assert.NotNil(t, err, "Should have failed on invalid message")
m.CommitConfig()
})

nowBatchSize := struct{ MaxMessageCount uint32 }{
MaxMessageCount: m.BatchSize().MaxMessageCount,
}
t.Run("ZeroMaxMessageCount", func(t *testing.T) {
m := NewManagerImpl()
m.BeginConfig()
err := m.ProposeConfig(&cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: utils.MarshalOrPanic(&ab.BatchSize{MaxMessageCount: 0, AbsoluteMaxBytes: validAbsoluteMaxBytes}),
})
assert.NotNil(t, err, "Should have rejected batch size max message count of 0")
m.CommitConfig()
})

if nowBatchSize.MaxMessageCount != endBatchSize.MaxMessageCount {
t.Fatalf("Got batch size max message count of %d. Expected: %d", nowBatchSize.MaxMessageCount, endBatchSize.MaxMessageCount)
}
t.Run("ZeroAbsoluteMaxBytes", func(t *testing.T) {
m := NewManagerImpl()
m.BeginConfig()
err := m.ProposeConfig(&cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: utils.MarshalOrPanic(&ab.BatchSize{MaxMessageCount: validMaxMessageCount, AbsoluteMaxBytes: 0}),
})
assert.NotNil(t, err, "Should have rejected batch size absolute max message bytes of 0")
m.CommitConfig()
})
}

func TestBatchTimeout(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion orderer/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ var testConf = &config.TopLevel{
OrdererType: "kafka",
BatchTimeout: 500 * time.Millisecond,
BatchSize: config.BatchSize{
MaxMessageCount: 100,
MaxMessageCount: 100,
AbsoluteMaxBytes: 10 * 1024 * 1024,
},
},
}
Expand Down
9 changes: 7 additions & 2 deletions orderer/localconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type Genesis struct {

// BatchSize contains configuration affecting the size of batches
type BatchSize struct {
MaxMessageCount uint32
MaxMessageCount uint32
AbsoluteMaxBytes uint32
}

// Profile contains configuration for Go pprof profiling
Expand Down Expand Up @@ -141,7 +142,8 @@ var defaults = TopLevel{
OrdererType: "solo",
BatchTimeout: 10 * time.Second,
BatchSize: BatchSize{
MaxMessageCount: 10,
MaxMessageCount: 10,
AbsoluteMaxBytes: 100000000,
},
},
}
Expand Down Expand Up @@ -197,6 +199,9 @@ func (c *TopLevel) completeInitialization() {
case c.Genesis.BatchSize.MaxMessageCount == 0:
logger.Infof("Genesis.BatchSize.MaxMessageCount unset, setting to %s", defaults.Genesis.BatchSize.MaxMessageCount)
c.Genesis.BatchSize.MaxMessageCount = defaults.Genesis.BatchSize.MaxMessageCount
case c.Genesis.BatchSize.AbsoluteMaxBytes == 0:
logger.Infof("Genesis.BatchSize.AbsoluteMaxBytes unset, setting to %s", defaults.Genesis.BatchSize.AbsoluteMaxBytes)
c.Genesis.BatchSize.AbsoluteMaxBytes = defaults.Genesis.BatchSize.AbsoluteMaxBytes
default:
// A bit hacky, but its type makes it impossible to test for a nil value.
// This may be overwritten by the Kafka orderer upon instantiation.
Expand Down
3 changes: 3 additions & 0 deletions orderer/orderer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,6 @@ Genesis:
# Max Message Count: The maximum number of messages to permit in a batch
MaxMessageCount: 10

# Absolute Max Bytes: The absolute maximum number of bytes allowed for
# the serialized messages in a batch.
AbsoluteMaxBytes: 100000000
46 changes: 25 additions & 21 deletions protos/orderer/configuration.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions protos/orderer/configuration.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ message BatchSize {
// Simply specified as number of messages for now, in the future
// we may want to allow this to be specified by size in bytes
uint32 maxMessageCount = 1;
// The byte count of the serialized messages in a batch cannot
// exceed this value.
uint32 absoluteMaxBytes = 2;
}

message BatchTimeout {
Expand Down
2 changes: 1 addition & 1 deletion protos/utils/blockutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func CopyBlockMetadata(src *cb.Block, dst *cb.Block) {
InitBlockMetadata(dst)
}

// CopyBlockMetadata copies metadata from one block into another
// InitBlockMetadata copies metadata from one block into another
func InitBlockMetadata(block *cb.Block) {
if block.Metadata == nil {
block.Metadata = &cb.BlockMetadata{Metadata: [][]byte{[]byte{}, []byte{}, []byte{}}}
Expand Down

0 comments on commit f68a97e

Please sign in to comment.