Skip to content

Commit 0ab835f

Browse files
committed
[FAB-5284] Move kafka orderer to new message flow
This patch accomplishs following tasks: - A new message type `ConfigMsg` is added to `msgprocessor`, so that `ClassifyMsg` now spits three possible results: `ConfigUpdateMsg`, `ConfigMsg` or `NormalMsg`. This is for backwards compatibility. - Remove second validation check in Kafka orderer unless config seq has advanced. This is to adapt to new message flow and benchmark suggests ~2x performance improvement. It's also backwards compatible with v1.0.x orderer. Re-submission of re-validated messages will be addressed by followup patches, see FAB-5720 for more information. - `chain_test.go` is restructured to be more logically organized. This patch looks sizeable mostly because of this. No actual logic has been changed other than several newly added tests. Change-Id: I5b231dcecdfefd17d4b0f0679a2cc7796db9242b Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
1 parent 368b92a commit 0ab835f

File tree

7 files changed

+1698
-970
lines changed

7 files changed

+1698
-970
lines changed

orderer/common/msgprocessor/msgprocessor.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,13 @@ const (
3939
// Messages of this type should be processed by ProcessNormalMsg.
4040
NormalMsg Classification = iota
4141

42-
// ConfigUpdateMsg is the class of configuration related messages.
42+
// ConfigUpdateMsg indicates messages of type CONFIG_UPDATE.
4343
// Messages of this type should be processed by ProcessConfigUpdateMsg.
4444
ConfigUpdateMsg
45+
46+
// ConfigMsg indicates message of type ORDERER_TRANSACTION or CONFIG.
47+
// Messages of this type should be processed by ProcessConfigMsg
48+
ConfigMsg
4549
)
4650

4751
// Processor provides the methods necessary to classify and process any message which

orderer/common/msgprocessor/standardchannel.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,11 @@ func (s *StandardChannel) ClassifyMsg(chdr *cb.ChannelHeader) Classification {
6363
case int32(cb.HeaderType_CONFIG_UPDATE):
6464
return ConfigUpdateMsg
6565
case int32(cb.HeaderType_ORDERER_TRANSACTION):
66-
return ConfigUpdateMsg
66+
// In order to maintain backwards compatibility, we must classify these messages
67+
return ConfigMsg
6768
case int32(cb.HeaderType_CONFIG):
68-
return ConfigUpdateMsg
69+
// In order to maintain backwards compatibility, we must classify these messages
70+
return ConfigMsg
6971
default:
7072
return NormalMsg
7173
}

orderer/common/msgprocessor/standardchannel_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ func TestClassifyMsg(t *testing.T) {
4747
})
4848
t.Run("OrdererTx", func(t *testing.T) {
4949
class := (&StandardChannel{}).ClassifyMsg(&cb.ChannelHeader{Type: int32(cb.HeaderType_ORDERER_TRANSACTION)})
50-
assert.Equal(t, class, ConfigUpdateMsg)
50+
assert.Equal(t, class, ConfigMsg)
5151
})
5252
t.Run("ConfigTx", func(t *testing.T) {
5353
class := (&StandardChannel{}).ClassifyMsg(&cb.ChannelHeader{Type: int32(cb.HeaderType_CONFIG)})
54-
assert.Equal(t, class, ConfigUpdateMsg)
54+
assert.Equal(t, class, ConfigMsg)
5555
})
5656
t.Run("EndorserTx", func(t *testing.T) {
5757
class := (&StandardChannel{}).ClassifyMsg(&cb.ChannelHeader{Type: int32(cb.HeaderType_ENDORSER_TRANSACTION)})

orderer/common/multichannel/util_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (mch *mockChain) Start() {
7070

7171
class := mch.support.ClassifyMsg(chdr)
7272
switch class {
73-
case msgprocessor.ConfigUpdateMsg:
73+
case msgprocessor.ConfigMsg:
7474
batch := mch.support.BlockCutter().Cut()
7575
if batch != nil {
7676
block := mch.support.CreateNextBlock(batch)
@@ -90,6 +90,8 @@ func (mch *mockChain) Start() {
9090
block := mch.support.CreateNextBlock(batch)
9191
mch.support.WriteBlock(block, nil)
9292
}
93+
case msgprocessor.ConfigUpdateMsg:
94+
logger.Panicf("Not expecting msg class ConfigUpdateMsg here")
9395
default:
9496
logger.Panicf("Unsupported msg classification: %v", class)
9597
}

orderer/consensus/kafka/chain.go

Lines changed: 132 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -122,37 +122,46 @@ func (chain *chainImpl) Halt() {
122122

123123
// Implements the consensus.Chain interface. Called by Broadcast().
124124
func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error {
125-
if !chain.enqueue(env) {
126-
return fmt.Errorf("Could not enqueue")
125+
marshaledEnv, err := utils.Marshal(env)
126+
if err != nil {
127+
return fmt.Errorf("cannot enqueue, unable to marshal envelope because = %s", err)
128+
}
129+
if !chain.enqueue(newNormalMessage(marshaledEnv, configSeq)) {
130+
return fmt.Errorf("cannot enqueue")
127131
}
128132
return nil
129133
}
130134

131135
// Implements the consensus.Chain interface. Called by Broadcast().
132136
func (chain *chainImpl) Configure(config *cb.Envelope, configSeq uint64) error {
133-
return chain.Order(config, configSeq)
137+
marshaledConfig, err := utils.Marshal(config)
138+
if err != nil {
139+
return fmt.Errorf("cannot enqueue, unable to marshal config because = %s", err)
140+
}
141+
if !chain.enqueue(newConfigMessage(marshaledConfig, configSeq)) {
142+
return fmt.Errorf("cannot enqueue")
143+
}
144+
return nil
134145
}
135146

136147
// enqueue accepts a message and returns true on acceptance, or false otheriwse.
137-
func (chain *chainImpl) enqueue(env *cb.Envelope) bool {
148+
func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool {
138149
logger.Debugf("[channel: %s] Enqueueing envelope...", chain.support.ChainID())
139150
select {
140151
case <-chain.startChan: // The Start phase has completed
141152
select {
142153
case <-chain.haltChan: // The chain has been halted, stop here
143-
logger.Warningf("[channel: %s] Will not enqueue, consenter for this channel has been halted", chain.support.ChainID())
154+
logger.Warningf("[channel: %s] consenter for this channel has been halted", chain.support.ChainID())
144155
return false
145156
default: // The post path
146-
marshaledEnv, err := utils.Marshal(env)
157+
payload, err := utils.Marshal(kafkaMsg)
147158
if err != nil {
148-
logger.Errorf("[channel: %s] cannot enqueue, unable to marshal envelope = %s", chain.support.ChainID(), err)
159+
logger.Errorf("[channel: %s] unable to marshal Kafka message because = %s", chain.support.ChainID(), err)
149160
return false
150161
}
151-
// We're good to go
152-
payload := utils.MarshalOrPanic(newRegularMessage(marshaledEnv))
153162
message := newProducerMessage(chain.channel, payload)
154-
if _, _, err := chain.producer.SendMessage(message); err != nil {
155-
logger.Errorf("[channel: %s] cannot enqueue envelope = %s", chain.support.ChainID(), err)
163+
if _, _, err = chain.producer.SendMessage(message); err != nil {
164+
logger.Errorf("[channel: %s] cannot enqueue envelope because = %s", chain.support.ChainID(), err)
156165
return false
157166
}
158167
logger.Debugf("[channel: %s] Envelope enqueued successfully", chain.support.ChainID())
@@ -353,11 +362,25 @@ func newConnectMessage() *ab.KafkaMessage {
353362
}
354363
}
355364

356-
func newRegularMessage(payload []byte) *ab.KafkaMessage {
365+
func newNormalMessage(payload []byte, configSeq uint64) *ab.KafkaMessage {
366+
return &ab.KafkaMessage{
367+
Type: &ab.KafkaMessage_Regular{
368+
Regular: &ab.KafkaMessageRegular{
369+
Payload: payload,
370+
ConfigSeq: configSeq,
371+
Class: ab.KafkaMessageRegular_NORMAL,
372+
},
373+
},
374+
}
375+
}
376+
377+
func newConfigMessage(config []byte, configSeq uint64) *ab.KafkaMessage {
357378
return &ab.KafkaMessage{
358379
Type: &ab.KafkaMessage_Regular{
359380
Regular: &ab.KafkaMessageRegular{
360-
Payload: payload,
381+
Payload: config,
382+
ConfigSeq: configSeq,
383+
Class: ab.KafkaMessageRegular_CONFIG,
361384
},
362385
},
363386
}
@@ -387,51 +410,13 @@ func processConnect(channelName string) error {
387410
}
388411

389412
func processRegular(regularMessage *ab.KafkaMessageRegular, support consensus.ConsenterSupport, timer *<-chan time.Time, receivedOffset int64, lastCutBlockNumber *uint64) error {
390-
env := new(cb.Envelope)
391-
if err := proto.Unmarshal(regularMessage.Payload, env); err != nil {
392-
// This shouldn't happen, it should be filtered at ingress
393-
return fmt.Errorf("unmarshal/%s", err)
394-
}
395-
396-
chdr, err := utils.ChannelHeader(env)
397-
if err != nil {
398-
logger.Panicf("If a message has arrived to this point, it should already have had its header inspected once")
399-
}
400-
401-
class := support.ClassifyMsg(chdr)
402-
switch class {
403-
case msgprocessor.ConfigUpdateMsg:
404-
_, err := support.ProcessNormalMsg(env)
405-
if err != nil {
406-
logger.Warningf("[channel: %s] Discarding bad config message: %s", support.ChainID(), err)
407-
break
408-
}
409-
410-
batch := support.BlockCutter().Cut()
411-
if batch != nil {
412-
block := support.CreateNextBlock(batch)
413-
encodedLastOffsetPersisted := utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: receivedOffset - 1})
414-
support.WriteBlock(block, encodedLastOffsetPersisted)
415-
*lastCutBlockNumber++
416-
}
417-
block := support.CreateNextBlock([]*cb.Envelope{env})
418-
encodedLastOffsetPersisted := utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: receivedOffset})
419-
support.WriteConfigBlock(block, encodedLastOffsetPersisted)
420-
*lastCutBlockNumber++
421-
*timer = nil
422-
case msgprocessor.NormalMsg:
423-
_, err := support.ProcessNormalMsg(env)
424-
if err != nil {
425-
logger.Warningf("Discarding bad normal message: %s", err)
426-
break
427-
}
428-
429-
batches, pending := support.BlockCutter().Ordered(env)
413+
commitNormalMsg := func(message *cb.Envelope) {
414+
batches, pending := support.BlockCutter().Ordered(message)
430415
logger.Debugf("[channel: %s] Ordering results: items in batch = %d, pending = %v", support.ChainID(), len(batches), pending)
431416
if len(batches) == 0 && *timer == nil {
432417
*timer = time.After(support.SharedConfig().BatchTimeout())
433418
logger.Debugf("[channel: %s] Just began %s batch timer", support.ChainID(), support.SharedConfig().BatchTimeout().String())
434-
return nil
419+
return
435420
}
436421

437422
offset := receivedOffset
@@ -453,9 +438,101 @@ func processRegular(regularMessage *ab.KafkaMessageRegular, support consensus.Co
453438
if len(batches) > 0 {
454439
*timer = nil
455440
}
441+
}
442+
443+
commitConfigMsg := func(message *cb.Envelope) {
444+
logger.Debugf("[channel: %s] Received config message", support.ChainID())
445+
batch := support.BlockCutter().Cut()
446+
447+
if batch != nil {
448+
logger.Debugf("[channel: %s] Cut pending messages into block", support.ChainID())
449+
block := support.CreateNextBlock(batch)
450+
encodedLastOffsetPersisted := utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: receivedOffset - 1})
451+
support.WriteBlock(block, encodedLastOffsetPersisted)
452+
*lastCutBlockNumber++
453+
}
454+
455+
logger.Debugf("[channel: %s] Creating isolated block for config message", support.ChainID())
456+
block := support.CreateNextBlock([]*cb.Envelope{message})
457+
encodedLastOffsetPersisted := utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: receivedOffset})
458+
support.WriteConfigBlock(block, encodedLastOffsetPersisted)
459+
*lastCutBlockNumber++
460+
*timer = nil
461+
}
462+
463+
seq := support.Sequence()
464+
465+
env := &cb.Envelope{}
466+
if err := proto.Unmarshal(regularMessage.Payload, env); err != nil {
467+
// This shouldn't happen, it should be filtered at ingress
468+
return fmt.Errorf("failed to unmarshal payload of regular message because = %s", err)
469+
}
470+
471+
logger.Debugf("[channel: %s] Processing regular Kafka message of type %s", support.ChainID(), regularMessage.Class.String())
472+
473+
switch regularMessage.Class {
474+
case ab.KafkaMessageRegular_UNKNOWN:
475+
// Received regular message of type UNKNOWN, indicating it's from v1.0.x orderer
476+
chdr, err := utils.ChannelHeader(env)
477+
if err != nil {
478+
return fmt.Errorf("discarding bad config message because of channel header unmarshalling error = %s", err)
479+
}
480+
481+
class := support.ClassifyMsg(chdr)
482+
switch class {
483+
case msgprocessor.ConfigMsg:
484+
if _, _, err := support.ProcessConfigMsg(env); err != nil {
485+
return fmt.Errorf("discarding bad config message because = %s", err)
486+
}
487+
488+
commitConfigMsg(env)
489+
490+
case msgprocessor.NormalMsg:
491+
if _, err := support.ProcessNormalMsg(env); err != nil {
492+
return fmt.Errorf("discarding bad normal message because = %s", err)
493+
}
494+
495+
commitNormalMsg(env)
496+
497+
case msgprocessor.ConfigUpdateMsg:
498+
return fmt.Errorf("not expecting message of type ConfigUpdate")
499+
500+
default:
501+
logger.Panicf("[channel: %s] Unsupported message classification: %v", support.ChainID(), class)
502+
}
503+
504+
case ab.KafkaMessageRegular_NORMAL:
505+
if regularMessage.ConfigSeq < seq {
506+
logger.Debugf("[channel: %s] Config sequence has advanced since this normal message being validated, re-validating", support.ChainID())
507+
if _, err := support.ProcessNormalMsg(env); err != nil {
508+
return fmt.Errorf("discarding bad normal message because = %s", err)
509+
}
510+
511+
// TODO re-submit stale normal message via `Order`, instead of discarding it immediately. Fix this as part of FAB-5720
512+
return fmt.Errorf("discarding stale normal message because config seq has advanced")
513+
}
514+
515+
commitNormalMsg(env)
516+
517+
case ab.KafkaMessageRegular_CONFIG:
518+
if regularMessage.ConfigSeq < seq {
519+
logger.Debugf("[channel: %s] Config sequence has advanced since this config message being validated, re-validating", support.ChainID())
520+
_, _, err := support.ProcessConfigMsg(env)
521+
if err != nil {
522+
return fmt.Errorf("rejecting config message because = %s", err)
523+
}
524+
525+
// TODO re-submit resulting config message via `Configure`, instead of discarding it. Fix this as part of FAB-5720
526+
// Configure(configUpdateEnv, newConfigEnv, seq)
527+
return fmt.Errorf("discarding stale config message because config seq has advanced")
528+
}
529+
530+
commitConfigMsg(env)
531+
456532
default:
457-
logger.Panicf("[channel: %s] Unsupported message classification: %v", support.ChainID(), class)
533+
return fmt.Errorf("unsupported regular kafka message type: %v", regularMessage.Class.String())
458534
}
535+
459536
return nil
460537
}
461538

0 commit comments

Comments
 (0)