Skip to content

Commit

Permalink
[FAB-1243] Limit batch size to a preferred size
Browse files Browse the repository at this point in the history
 - Added BatchSize.PreferredMaxBytes config property
 - Changed Ordered to limit batch byte size to
   PreferredMaxBytes.

Change-Id: I2f631b15dcb598fc995cc50120e40ae053f76914
Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
  • Loading branch information
Luis Sanchez committed Jan 16, 2017
1 parent 67455b3 commit 4f90cd9
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 50 deletions.
6 changes: 3 additions & 3 deletions orderer/common/blockcutter/blockcutter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Receiver interface {
// * true (indicating ok).
// Otherwise, given a valid message, the pending batch, if not empty, will be cut and returned if:
// - The current message needs to be isolated (as determined during filtering).
// - The current message will cause the pending batch size in bytes to exceed BatchSize.AbsoluteMaxBytes.
// - The current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes.
// - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount.
Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool)

Expand Down Expand Up @@ -82,7 +82,7 @@ func NewReceiverImpl(sharedConfigManager sharedconfig.Manager, filters *filter.R
// * true (indicating ok).
// Otherwise, given a valid message, the pending batch, if not empty, will be cut and returned if:
// - The current message needs to be isolated (as determined during filtering).
// - The current message will cause the pending batch size in bytes to exceed BatchSize.AbsoluteMaxBytes.
// - The current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes.
// - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount.
func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Committer, bool) {
// The messages must be filtered a second time in case configuration has changed since the message was received
Expand Down Expand Up @@ -116,7 +116,7 @@ func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, [][]filter.Commi
committerBatches := [][]filter.Committer{}

messageSizeBytes := messageSizeBytes(msg)
messageWillOverflowBatchSizeBytes := r.pendingBatchSizeBytes+messageSizeBytes > r.sharedConfigManager.BatchSize().AbsoluteMaxBytes
messageWillOverflowBatchSizeBytes := r.pendingBatchSizeBytes+messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes

if messageWillOverflowBatchSizeBytes {
logger.Debugf("The current message, with %v bytes, will overflow the pending batch of %v bytes.", messageSizeBytes, r.pendingBatchSizeBytes)
Expand Down
33 changes: 19 additions & 14 deletions orderer/common/blockcutter/blockcutter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ var unmatchedTx = &cb.Envelope{Payload: []byte("UNMATCHED")}
func TestNormalBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)

batches, committers, ok := r.Ordered(goodTx)

Expand All @@ -108,8 +109,9 @@ func TestNormalBatch(t *testing.T) {
func TestBadMessageInBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)

batches, committers, ok := r.Ordered(badTx)

Expand Down Expand Up @@ -145,8 +147,9 @@ func TestBadMessageInBatch(t *testing.T) {
func TestUnmatchedMessageInBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)

batches, committers, ok := r.Ordered(unmatchedTx)

Expand Down Expand Up @@ -182,8 +185,9 @@ func TestUnmatchedMessageInBatch(t *testing.T) {
func TestIsolatedEmptyBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)

batches, committers, ok := r.Ordered(isolatedTx)

Expand All @@ -207,8 +211,9 @@ func TestIsolatedEmptyBatch(t *testing.T) {
func TestIsolatedPartialBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)

batches, committers, ok := r.Ordered(goodTx)

Expand Down Expand Up @@ -247,18 +252,18 @@ func TestIsolatedPartialBatch(t *testing.T) {
}
}

func TestBatchSizeAbsoluteMaxBytesOverflow(t *testing.T) {
func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) {
filters := getFilters()

goodTxBytes := messageSizeBytes(goodTx)

// set absolute max bytes such that 10 goodTx will not fit
absoluteMaxBytes := goodTxBytes*10 - 1
// set preferred max bytes such that 10 goodTx will not fit
preferredMaxBytes := goodTxBytes*10 - 1

// set message count > 9
maxMessageCount := uint32(20)

r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes}}, filters)
r := NewReceiverImpl(&mocksharedconfig.Manager{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 2, PreferredMaxBytes: preferredMaxBytes}}, filters)

// enqueue 9 messages
for i := 0; i < 9; i++ {
Expand Down
5 changes: 3 additions & 2 deletions orderer/common/bootstrap/provisional/provisional.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ func New(conf *config.TopLevel) Generator {
chainID: TestChainID,
consensusType: conf.Genesis.OrdererType,
batchSize: &ab.BatchSize{
MaxMessageCount: conf.Genesis.BatchSize.MaxMessageCount,
AbsoluteMaxBytes: conf.Genesis.BatchSize.AbsoluteMaxBytes,
MaxMessageCount: conf.Genesis.BatchSize.MaxMessageCount,
AbsoluteMaxBytes: conf.Genesis.BatchSize.AbsoluteMaxBytes,
PreferredMaxBytes: conf.Genesis.BatchSize.PreferredMaxBytes,
},
batchTimeout: conf.Genesis.BatchTimeout.String(),
}
Expand Down
6 changes: 6 additions & 0 deletions orderer/common/sharedconfig/sharedconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ func (pm *ManagerImpl) ProposeConfig(configItem *cb.ConfigurationItem) error {
if batchSize.AbsoluteMaxBytes == 0 {
return fmt.Errorf("Attempted to set the batch size absolute max bytes to an invalid value: 0")
}
if batchSize.PreferredMaxBytes == 0 {
return fmt.Errorf("Attempted to set the batch size absolute max bytes to an invalid value: 0")
}
if batchSize.PreferredMaxBytes > batchSize.AbsoluteMaxBytes {
return fmt.Errorf("Attempted to set the batch size preferred max bytes (%v) greater than the absolute max bytes (%v).", batchSize.PreferredMaxBytes, batchSize.AbsoluteMaxBytes)
}
pm.pendingConfig.batchSize = batchSize
case BatchTimeoutKey:
var timeoutValue time.Duration
Expand Down
22 changes: 19 additions & 3 deletions orderer/common/sharedconfig/sharedconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,15 @@ func TestBatchSize(t *testing.T) {

validMaxMessageCount := uint32(10)
validAbsoluteMaxBytes := uint32(1000)
validPreferredMaxBytes := uint32(500)

t.Run("ValidConfiguration", func(t *testing.T) {
m := NewManagerImpl()
m.BeginConfig()
err := m.ProposeConfig(&cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: utils.MarshalOrPanic(&ab.BatchSize{MaxMessageCount: validMaxMessageCount, AbsoluteMaxBytes: validAbsoluteMaxBytes}),
Value: utils.MarshalOrPanic(&ab.BatchSize{MaxMessageCount: validMaxMessageCount, AbsoluteMaxBytes: validAbsoluteMaxBytes, PreferredMaxBytes: validPreferredMaxBytes}),
})
assert.Nil(t, err, "Error applying valid config: %s", err)
m.CommitConfig()
Expand All @@ -154,6 +155,9 @@ func TestBatchSize(t *testing.T) {
if m.BatchSize().AbsoluteMaxBytes != validAbsoluteMaxBytes {
t.Fatalf("Got batch size absolute max bytes of %d. Expected: %d", m.BatchSize().AbsoluteMaxBytes, validAbsoluteMaxBytes)
}
if m.BatchSize().PreferredMaxBytes != validPreferredMaxBytes {
t.Fatalf("Got batch size preferred max bytes of %d. Expected: %d", m.BatchSize().PreferredMaxBytes, validPreferredMaxBytes)
}
})

t.Run("UnserializableConfiguration", func(t *testing.T) {
Expand All @@ -174,7 +178,7 @@ func TestBatchSize(t *testing.T) {
err := m.ProposeConfig(&cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: utils.MarshalOrPanic(&ab.BatchSize{MaxMessageCount: 0, AbsoluteMaxBytes: validAbsoluteMaxBytes}),
Value: utils.MarshalOrPanic(&ab.BatchSize{MaxMessageCount: 0, AbsoluteMaxBytes: validAbsoluteMaxBytes, PreferredMaxBytes: validPreferredMaxBytes}),
})
assert.NotNil(t, err, "Should have rejected batch size max message count of 0")
m.CommitConfig()
Expand All @@ -186,11 +190,23 @@ func TestBatchSize(t *testing.T) {
err := m.ProposeConfig(&cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: utils.MarshalOrPanic(&ab.BatchSize{MaxMessageCount: validMaxMessageCount, AbsoluteMaxBytes: 0}),
Value: utils.MarshalOrPanic(&ab.BatchSize{MaxMessageCount: validMaxMessageCount, AbsoluteMaxBytes: 0, PreferredMaxBytes: validPreferredMaxBytes}),
})
assert.NotNil(t, err, "Should have rejected batch size absolute max message bytes of 0")
m.CommitConfig()
})

t.Run("TooLargePreferredMaxBytes", func(t *testing.T) {
m := NewManagerImpl()
m.BeginConfig()
err := m.ProposeConfig(&cb.ConfigurationItem{
Type: cb.ConfigurationItem_Orderer,
Key: BatchSizeKey,
Value: utils.MarshalOrPanic(&ab.BatchSize{MaxMessageCount: validMaxMessageCount, AbsoluteMaxBytes: validAbsoluteMaxBytes, PreferredMaxBytes: validAbsoluteMaxBytes + 1}),
})
assert.NotNil(t, err, "Should have rejected batch size preferred max message bytes greater than absolute max message bytes")
m.CommitConfig()
})
}

func TestBatchTimeout(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions orderer/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ var testConf = &config.TopLevel{
OrdererType: "kafka",
BatchTimeout: 500 * time.Millisecond,
BatchSize: config.BatchSize{
MaxMessageCount: 100,
AbsoluteMaxBytes: 10 * 1024 * 1024,
MaxMessageCount: 100,
AbsoluteMaxBytes: 10 * 1024 * 1024,
PreferredMaxBytes: 512 * 1024,
},
},
}
Expand Down
13 changes: 9 additions & 4 deletions orderer/localconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ type Genesis struct {

// BatchSize contains configuration affecting the size of batches
type BatchSize struct {
MaxMessageCount uint32
AbsoluteMaxBytes uint32
MaxMessageCount uint32
AbsoluteMaxBytes uint32
PreferredMaxBytes uint32
}

// Profile contains configuration for Go pprof profiling
Expand Down Expand Up @@ -142,8 +143,9 @@ var defaults = TopLevel{
OrdererType: "solo",
BatchTimeout: 10 * time.Second,
BatchSize: BatchSize{
MaxMessageCount: 10,
AbsoluteMaxBytes: 100000000,
MaxMessageCount: 10,
AbsoluteMaxBytes: 100000000,
PreferredMaxBytes: 512 * 1024,
},
},
}
Expand Down Expand Up @@ -202,6 +204,9 @@ func (c *TopLevel) completeInitialization() {
case c.Genesis.BatchSize.AbsoluteMaxBytes == 0:
logger.Infof("Genesis.BatchSize.AbsoluteMaxBytes unset, setting to %s", defaults.Genesis.BatchSize.AbsoluteMaxBytes)
c.Genesis.BatchSize.AbsoluteMaxBytes = defaults.Genesis.BatchSize.AbsoluteMaxBytes
case c.Genesis.BatchSize.PreferredMaxBytes == 0:
logger.Infof("Genesis.BatchSize.PreferredMaxBytes unset, setting to %s", defaults.Genesis.BatchSize.PreferredMaxBytes)
c.Genesis.BatchSize.PreferredMaxBytes = defaults.Genesis.BatchSize.PreferredMaxBytes
default:
// A bit hacky, but its type makes it impossible to test for a nil value.
// This may be overwritten by the Kafka orderer upon instantiation.
Expand Down
5 changes: 5 additions & 0 deletions orderer/orderer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,8 @@ Genesis:
# Absolute Max Bytes: The absolute maximum number of bytes allowed for
# the serialized messages in a batch.
AbsoluteMaxBytes: 100000000

# Preferred Max Bytes: The preferred maximum number of bytes allowed for
# the serialized messages in a batch. A message larger than the preferred
# max bytes will result in a batch larger than preferred max bytes.
PreferredMaxBytes: 524288
48 changes: 26 additions & 22 deletions protos/orderer/configuration.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions protos/orderer/configuration.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ message BatchSize {
// The byte count of the serialized messages in a batch cannot
// exceed this value.
uint32 absoluteMaxBytes = 2;
// The byte count of the serialized messages in a batch should not
// exceed this value.
uint32 preferredMaxBytes = 3;
}

message BatchTimeout {
Expand Down

0 comments on commit 4f90cd9

Please sign in to comment.