
Commit 230f3cc

Merge "[FAB-1623] Add restart support to Kafka orderer"

Authored by JonathanLevi, committed by Gerrit Code Review
2 parents: d40e10c + 2f0aa7d

6 files changed: +219 −72 lines


orderer/kafka/orderer.go

Lines changed: 35 additions & 21 deletions

@@ -79,7 +79,19 @@ type consenterImpl struct {
 // Implements the multichain.Consenter interface. Called by multichain.newChainSupport(), which
 // is itself called by multichain.NewManagerImpl() when ranging over the ledgerFactory's existingChains.
 func (co *consenterImpl) HandleChain(cs multichain.ConsenterSupport, metadata *cb.Metadata) (multichain.Chain, error) {
-	return newChain(co, cs), nil
+	return newChain(co, cs, getLastOffsetPersisted(metadata)), nil
+}
+
+func getLastOffsetPersisted(metadata *cb.Metadata) int64 {
+	if metadata.Value != nil {
+		// Extract orderer-related metadata from the tip of the ledger first
+		kafkaMetadata := &ab.KafkaMetadata{}
+		if err := proto.Unmarshal(metadata.Value, kafkaMetadata); err != nil {
+			panic("Ledger may be corrupted: cannot unmarshal orderer metadata in most recent block")
+		}
+		return kafkaMetadata.LastOffsetPersisted
+	}
+	return (sarama.OffsetOldest - 1) // default
 }

 // When testing we need to inject our own broker/producer/consumer.
@@ -90,18 +102,19 @@ func (co *consenterImpl) HandleChain(cs multichain.ConsenterSupport, metadata *c
 // definition of an interface (see testableConsenter below) that will
 // be satisfied by both the actual and the mock object and will allow
 // us to retrieve these constructors.
-func newChain(consenter testableConsenter, support multichain.ConsenterSupport) *chainImpl {
+func newChain(consenter testableConsenter, support multichain.ConsenterSupport, lastOffsetPersisted int64) *chainImpl {
+	logger.Debug("Starting chain with last persisted offset:", lastOffsetPersisted)
 	return &chainImpl{
-		consenter:     consenter,
-		support:       support,
-		partition:     newChainPartition(support.ChainID(), rawPartition),
-		batchTimeout:  support.SharedConfig().BatchTimeout(),
-		lastProcessed: sarama.OffsetOldest - 1, // TODO This should be retrieved by ConsenterSupport; also see note in loop() below
-		producer:      consenter.prodFunc()(support.SharedConfig().KafkaBrokers(), consenter.kafkaVersion(), consenter.retryOptions()),
-		halted:        false, // Redundant as the default value for booleans is false but added for readability
-		exitChan:      make(chan struct{}),
-		haltedChan:    make(chan struct{}),
-		setupChan:     make(chan struct{}),
+		consenter:           consenter,
+		support:             support,
+		partition:           newChainPartition(support.ChainID(), rawPartition),
+		batchTimeout:        support.SharedConfig().BatchTimeout(),
+		lastOffsetPersisted: lastOffsetPersisted,
+		producer:            consenter.prodFunc()(support.SharedConfig().KafkaBrokers(), consenter.kafkaVersion(), consenter.retryOptions()),
+		halted:              false, // Redundant as the default value for booleans is false but added for readability
+		exitChan:            make(chan struct{}),
+		haltedChan:          make(chan struct{}),
+		setupChan:           make(chan struct{}),
 	}
 }

@@ -125,10 +138,10 @@ type chainImpl struct {
 	consenter testableConsenter
 	support   multichain.ConsenterSupport

-	partition     ChainPartition
-	batchTimeout  time.Duration
-	lastProcessed int64
-	lastCutBlock  uint64
+	partition           ChainPartition
+	batchTimeout        time.Duration
+	lastOffsetPersisted int64
+	lastCutBlock        uint64

 	producer Producer
 	consumer Consumer
@@ -156,9 +169,7 @@ func (ch *chainImpl) Start() {
 	}

 	// 2. Set up the listener/consumer for this partition.
-	// TODO When restart support gets added to the common components level, start
-	// the consumer from lastProcessed. For now, hard-code to oldest available.
-	consumer, err := ch.consenter.consFunc()(ch.support.SharedConfig().KafkaBrokers(), ch.consenter.kafkaVersion(), ch.partition, ch.lastProcessed+1)
+	consumer, err := ch.consenter.consFunc()(ch.support.SharedConfig().KafkaBrokers(), ch.consenter.kafkaVersion(), ch.partition, ch.lastOffsetPersisted+1)
 	if err != nil {
 		logger.Criticalf("Cannot retrieve required offset from Kafka cluster for chain %s: %s", ch.partition, err)
 		close(ch.exitChan)
@@ -206,6 +217,7 @@ func (ch *chainImpl) loop() {
 	msg := new(ab.KafkaMessage)
 	var timer <-chan time.Time
 	var ttcNumber uint64
+	var encodedLastOffsetPersisted []byte

 	defer close(ch.haltedChan)
 	defer ch.producer.Close()
@@ -237,7 +249,8 @@ func (ch *chainImpl) loop() {
 					return
 				}
 				block := ch.support.CreateNextBlock(batch)
-				ch.support.WriteBlock(block, committers, nil)
+				encodedLastOffsetPersisted = utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: in.Offset})
+				ch.support.WriteBlock(block, committers, encodedLastOffsetPersisted)
 				ch.lastCutBlock++
 				logger.Debug("Proper time-to-cut received, just cut block", ch.lastCutBlock)
 				continue
@@ -264,7 +277,8 @@ func (ch *chainImpl) loop() {
 			// If !ok, batches == nil, so this will be skipped
 			for i, batch := range batches {
 				block := ch.support.CreateNextBlock(batch)
-				ch.support.WriteBlock(block, committers[i], nil)
+				encodedLastOffsetPersisted = utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: in.Offset})
+				ch.support.WriteBlock(block, committers[i], encodedLastOffsetPersisted)
 				ch.lastCutBlock++
 				logger.Debug("Batch filled, just cut block", ch.lastCutBlock)
 			}
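The restart mechanism in this file is a round trip through the block metadata: whenever loop() cuts a block, it marshals the offset of the Kafka message that triggered the cut into the block's orderer metadata, and on startup getLastOffsetPersisted() unmarshals that value from the tip of the ledger so the consumer can resume at the next offset. A minimal sketch of that round trip, assuming the Fabric import paths of this era (the standalone main wrapper is illustrative only, not code from this commit):

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
	"github.com/golang/protobuf/proto"

	cb "github.com/hyperledger/fabric/protos/common"
	ab "github.com/hyperledger/fabric/protos/orderer"
)

func main() {
	// Write side, as in loop(): record the offset of the message that
	// triggered the block cut.
	encoded, err := proto.Marshal(&ab.KafkaMetadata{LastOffsetPersisted: 42})
	if err != nil {
		panic(err)
	}
	tip := &cb.Metadata{Value: encoded}

	// Read side, as in getLastOffsetPersisted(): recover it on restart.
	kafkaMetadata := &ab.KafkaMetadata{}
	if err := proto.Unmarshal(tip.Value, kafkaMetadata); err != nil {
		panic(err)
	}

	// Start() then seeks the consumer to lastOffsetPersisted+1, i.e. 43 here.
	fmt.Println(kafkaMetadata.LastOffsetPersisted + 1)

	// A brand-new chain carries no metadata, so the default applies:
	// sarama.OffsetOldest-1, which makes the first seek sarama.OffsetOldest.
	fmt.Println(sarama.OffsetOldest - 1)
}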

orderer/kafka/orderer_test.go

Lines changed: 124 additions & 33 deletions

@@ -17,6 +17,7 @@ limitations under the License.
 package kafka

 import (
+	"fmt"
 	"sync"
 	"testing"
 	"time"
@@ -39,30 +40,29 @@ func newMockSharedConfigManager() *mocksharedconfig.Manager {
 	return &mocksharedconfig.Manager{KafkaBrokersVal: testConf.Kafka.Brokers}
 }

-func syncQueueMessage(msg *cb.Envelope, chain multichain.Chain, bc *mockblockcutter.Receiver) {
-	chain.Enqueue(msg)
-	bc.Block <- struct{}{}
-}
-
 type mockConsenterImpl struct {
 	consenterImpl
 	prodDisk, consDisk chan *ab.KafkaMessage
 	consumerSetUp      bool
 	t                  *testing.T
 }

-func mockNewConsenter(t *testing.T, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) *mockConsenterImpl {
+func mockNewConsenter(t *testing.T, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry, nextProducedOffset int64) *mockConsenterImpl {
 	prodDisk := make(chan *ab.KafkaMessage)
 	consDisk := make(chan *ab.KafkaMessage)

 	mockBfValue := func(brokers []string, cp ChainPartition) (Broker, error) {
 		return mockNewBroker(t, cp)
 	}
 	mockPfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, retryOptions config.Retry) Producer {
-		return mockNewProducer(t, cp, testOldestOffset, prodDisk)
+		// The first Send on this producer will return a blob with offset #nextProducedOffset
+		return mockNewProducer(t, cp, nextProducedOffset, prodDisk)
 	}
-	mockCfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, offset int64) (Consumer, error) {
-		return mockNewConsumer(t, cp, offset, consDisk)
+	mockCfValue := func(brokers []string, kafkaVersion sarama.KafkaVersion, cp ChainPartition, lastPersistedOffset int64) (Consumer, error) {
+		if lastPersistedOffset != nextProducedOffset {
+			panic(fmt.Errorf("Mock objects about to be set up incorrectly (consumer to seek to %d, producer to post %d)", lastPersistedOffset, nextProducedOffset))
+		}
+		return mockNewConsumer(t, cp, lastPersistedOffset, consDisk)
 	}

 	return &mockConsenterImpl{
@@ -96,6 +96,11 @@ func prepareMockObjectDisks(t *testing.T, co *mockConsenterImpl, ch *chainImpl)
 	}
 }

+func syncQueueMessage(msg *cb.Envelope, chain multichain.Chain, bc *mockblockcutter.Receiver) {
+	chain.Enqueue(msg)
+	bc.Block <- struct{}{}
+}
+
 func waitableSyncQueueMessage(env *cb.Envelope, messagesToPickUp int, wg *sync.WaitGroup,
 	co *mockConsenterImpl, cs *mockmultichain.ConsenterSupport, ch *chainImpl) {
 	wg.Add(1)
@@ -128,9 +133,10 @@ func TestKafkaConsenterEmptyBatch(t *testing.T) {
 	}
 	defer close(cs.BlockCutterVal.Block)

-	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry)
-	ch := newChain(co, cs)
-	ch.lastProcessed = testOldestOffset - 1
+	lastPersistedOffset := testOldestOffset - 1
+	nextProducedOffset := lastPersistedOffset + 1
+	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset)
+	ch := newChain(co, cs, lastPersistedOffset)

 	go ch.Start()
 	defer ch.Halt()
@@ -162,9 +168,10 @@ func TestKafkaConsenterBatchTimer(t *testing.T) {
 	}
 	defer close(cs.BlockCutterVal.Block)

-	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry)
-	ch := newChain(co, cs)
-	ch.lastProcessed = testOldestOffset - 1
+	lastPersistedOffset := testOldestOffset - 1
+	nextProducedOffset := lastPersistedOffset + 1
+	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset)
+	ch := newChain(co, cs, lastPersistedOffset)

 	go ch.Start()
 	defer ch.Halt()
@@ -213,9 +220,10 @@ func TestKafkaConsenterTimerHaltOnFilledBatch(t *testing.T) {
 	}
 	defer close(cs.BlockCutterVal.Block)

-	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry)
-	ch := newChain(co, cs)
-	ch.lastProcessed = testOldestOffset - 1
+	lastPersistedOffset := testOldestOffset - 1
+	nextProducedOffset := lastPersistedOffset + 1
+	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset)
+	ch := newChain(co, cs, lastPersistedOffset)

 	go ch.Start()
 	defer ch.Halt()
@@ -272,9 +280,10 @@ func TestKafkaConsenterConfigStyleMultiBatch(t *testing.T) {
 	}
 	defer close(cs.BlockCutterVal.Block)

-	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry)
-	ch := newChain(co, cs)
-	ch.lastProcessed = testOldestOffset - 1
+	lastPersistedOffset := testOldestOffset - 1
+	nextProducedOffset := lastPersistedOffset + 1
+	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset)
+	ch := newChain(co, cs, lastPersistedOffset)

 	go ch.Start()
 	defer ch.Halt()
@@ -321,9 +330,10 @@ func TestKafkaConsenterTimeToCutForced(t *testing.T) {
 	}
 	defer close(cs.BlockCutterVal.Block)

-	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry)
-	ch := newChain(co, cs)
-	ch.lastProcessed = testOldestOffset - 1
+	lastPersistedOffset := testOldestOffset - 1
+	nextProducedOffset := lastPersistedOffset + 1
+	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset)
+	ch := newChain(co, cs, lastPersistedOffset)

 	go ch.Start()
 	defer ch.Halt()
@@ -377,9 +387,10 @@ func TestKafkaConsenterTimeToCutDuplicate(t *testing.T) {
 	}
 	defer close(cs.BlockCutterVal.Block)

-	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry)
-	ch := newChain(co, cs)
-	ch.lastProcessed = testOldestOffset - 1
+	lastPersistedOffset := testOldestOffset - 1
+	nextProducedOffset := lastPersistedOffset + 1
+	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset)
+	ch := newChain(co, cs, lastPersistedOffset)

 	go ch.Start()
 	defer ch.Halt()
@@ -465,9 +476,10 @@ func TestKafkaConsenterTimeToCutStale(t *testing.T) {
 	}
 	defer close(cs.BlockCutterVal.Block)

-	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry)
-	ch := newChain(co, cs)
-	ch.lastProcessed = testOldestOffset - 1
+	lastPersistedOffset := testOldestOffset - 1
+	nextProducedOffset := lastPersistedOffset + 1
+	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset)
+	ch := newChain(co, cs, lastPersistedOffset)

 	go ch.Start()
 	defer ch.Halt()
@@ -523,9 +535,10 @@ func TestKafkaConsenterTimeToCutLarger(t *testing.T) {
 	}
 	defer close(cs.BlockCutterVal.Block)

-	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry)
-	ch := newChain(co, cs)
-	ch.lastProcessed = testOldestOffset - 1
+	lastPersistedOffset := testOldestOffset - 1
+	nextProducedOffset := lastPersistedOffset + 1
+	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset)
+	ch := newChain(co, cs, lastPersistedOffset)

 	go ch.Start()
 	defer ch.Halt()
@@ -574,3 +587,81 @@ func TestKafkaConsenterTimeToCutLarger(t *testing.T) {
 	case <-ch.haltedChan: // If we're here, we definitely had a chance to invoke Append but didn't (which is great)
 	}
 }
+
+func TestGetLastOffsetPersistedEmpty(t *testing.T) {
+	expected := sarama.OffsetOldest - 1
+	actual := getLastOffsetPersisted(&cb.Metadata{})
+	if actual != expected {
+		t.Fatalf("Expected last offset %d, got %d", expected, actual)
+	}
+}
+
+func TestGetLastOffsetPersistedRight(t *testing.T) {
+	expected := int64(100)
+	actual := getLastOffsetPersisted(&cb.Metadata{Value: utils.MarshalOrPanic(&ab.KafkaMetadata{LastOffsetPersisted: expected})})
+	if actual != expected {
+		t.Fatalf("Expected last offset %d, got %d", expected, actual)
+	}
+}
+
+func TestKafkaConsenterRestart(t *testing.T) {
+	var wg sync.WaitGroup
+	defer wg.Wait()
+
+	batchTimeout, _ := time.ParseDuration("1ms")
+	cs := &mockmultichain.ConsenterSupport{
+		Batches:         make(chan []*cb.Envelope),
+		BlockCutterVal:  mockblockcutter.NewReceiver(),
+		ChainIDVal:      provisional.TestChainID,
+		SharedConfigVal: &mocksharedconfig.Manager{BatchTimeoutVal: batchTimeout},
+	}
+	defer close(cs.BlockCutterVal.Block)
+
+	lastPersistedOffset := testOldestOffset - 1
+	nextProducedOffset := lastPersistedOffset + 1
+	co := mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset)
+	ch := newChain(co, cs, lastPersistedOffset)

+	go ch.Start()
+	defer ch.Halt()
+
+	prepareMockObjectDisks(t, co, ch)
+
+	// The second message that will be picked up is the time-to-cut message
+	// that will be posted when the short timer expires
+	waitableSyncQueueMessage(newTestEnvelope("one"), 2, &wg, co, cs, ch)
+
+	select {
+	case <-cs.Batches: // This is the success path
+	case <-time.After(testTimePadding):
+		t.Fatal("Expected block to be cut because batch timer expired")
+	}
+
+	// Stop the loop
+	ch.Halt()
+
+	select {
+	case <-cs.Batches:
+		t.Fatal("Expected no invocations of Append")
+	case <-ch.haltedChan: // If we're here, we definitely had a chance to invoke Append but didn't (which is great)
+	}
+
+	lastBlock := cs.WriteBlockVal
+	metadata, err := utils.GetMetadataFromBlock(lastBlock, cb.BlockMetadataIndex_ORDERER)
+	if err != nil {
+		logger.Fatalf("Error extracting orderer metadata for chain %x: %s", cs.ChainIDVal, err)
+	}
+
+	lastPersistedOffset = getLastOffsetPersisted(metadata)
+	nextProducedOffset = lastPersistedOffset + 1
+
+	co = mockNewConsenter(t, testConf.Kafka.Version, testConf.Kafka.Retry, nextProducedOffset)
+	ch = newChain(co, cs, lastPersistedOffset)
+	go ch.Start()
+	prepareMockObjectDisks(t, co, ch)
+
+	actual := ch.producer.(*mockProducerImpl).producedOffset
+	if actual != nextProducedOffset {
+		t.Fatalf("Restarted orderer post-connect should have been at offset %d, got %d instead", nextProducedOffset, actual)
+	}
+}
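TestKafkaConsenterRestart above exercises the full cycle: run a chain, cut a block, halt it, then rebuild the consenter and chain from the metadata of the last written block and verify that the producer and consumer positions line up again. The recovery step it performs on cs.WriteBlockVal can be read as one small function; a hypothetical helper (nextOffsetFromTip is not part of this commit) mirroring it:

// nextOffsetFromTip recovers, from the tip block of a chain, the offset
// a restarted consumer should seek to, as the test above does.
func nextOffsetFromTip(block *cb.Block) (int64, error) {
	metadata, err := utils.GetMetadataFromBlock(block, cb.BlockMetadataIndex_ORDERER)
	if err != nil {
		return 0, err
	}
	return getLastOffsetPersisted(metadata) + 1, nil
}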

orderer/multichain/chainsupport.go

Lines changed: 1 addition & 0 deletions

@@ -127,6 +127,7 @@ func newChainSupport(
 	if err != nil {
 		logger.Fatalf("Error extracting orderer metadata for chain %x: %s", cs.ChainID(), err)
 	}
+	logger.Debugf("Retrieved metadata for tip of chain (block #%d): %+v", cs.Reader().Height()-1, metadata)

 	cs.chain, err = consenter.HandleChain(cs, metadata)
 	if err != nil {

protos/orderer/ab.pb.go

Lines changed: 1 addition & 0 deletions

Some generated files are not rendered by default.
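The regenerated file presumably adds the Go type for the new KafkaMetadata protobuf message used throughout this change. A sketch of its likely generated shape (the field number and struct tags are assumptions, not copied from the generated output):

// KafkaMetadata is stored in the ORDERER slot of each block's metadata.
type KafkaMetadata struct {
	// LastOffsetPersisted is the offset of the Kafka message that
	// triggered the cut of this block.
	LastOffsetPersisted int64 `protobuf:"varint,1,opt,name=last_offset_persisted" json:"last_offset_persisted,omitempty"`
}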
