Support multiple record batches, closes #1022 #1023

Merged · 5 commits · Jan 22, 2018

Conversation

@bobrik (Contributor) commented on Jan 14, 2018

cc @wladh

@bobrik (Contributor, Author) commented on Jan 14, 2018

With added logging:

diff --git a/consumer.go b/consumer.go
index 276bc94..bb850f3 100644
--- a/consumer.go
+++ b/consumer.go
@@ -3,6 +3,7 @@ package sarama
 import (
        "errors"
        "fmt"
+       "log"
        "sync"
        "sync/atomic"
        "time"
@@ -619,6 +620,8 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
                }
        }

+       log.Printf("Returning %d messages from %d batches", len(messages), len(block.Records.recordBatchSet.batches))
+
        return messages, nil
 }

I see multiple batches on the consumer as expected (1MB buffer):

2018/01/14 03:34:23 Returning 2144 messages from 2054 batches
2018/01/14 03:34:23 Returning 2136 messages from 2033 batches
2018/01/14 03:34:23 Returning 2136 messages from 2005 batches
2018/01/14 03:34:23 Returning 2136 messages from 1973 batches
2018/01/14 03:34:23 Returning 2134 messages from 2017 batches
2018/01/14 03:34:23 Returning 2134 messages from 2035 batches
2018/01/14 03:34:23 Returning 2137 messages from 2012 batches
2018/01/14 03:34:23 Returning 2134 messages from 2005 batches
2018/01/14 03:34:23 Returning 2159 messages from 1993 batches
2018/01/14 03:34:23 Returning 2139 messages from 1948 batches
2018/01/14 03:34:23 Returning 2144 messages from 2018 batches
2018/01/14 03:34:23 Returning 2141 messages from 2033 batches
2018/01/14 03:34:23 Returning 1911 messages from 1781 batches
2018/01/14 03:34:23 Returning 54 messages from 53 batches
2018/01/14 03:34:23 Returning 13 messages from 13 batches
2018/01/14 03:34:23 Returning 28 messages from 28 batches
2018/01/14 03:34:24 Returning 7 messages from 7 batches
2018/01/14 03:34:24 Returning 50 messages from 48 batches
2018/01/14 03:34:24 Returning 4 messages from 3 batches

This producer is not great at batching, so the message and batch counts are close here. You can see a large number of batches right after a restart, then smaller batches as the consumer catches up with the production rate.

On another consumer with higher throughput and better batching we see this:

2018/01/14 03:38:12 Returning 12673 messages from 165 batches
2018/01/14 03:38:12 Returning 13256 messages from 168 batches
2018/01/14 03:38:12 Returning 12839 messages from 169 batches
2018/01/14 03:38:12 Returning 12519 messages from 162 batches
2018/01/14 03:38:12 Returning 12783 messages from 166 batches
2018/01/14 03:38:12 Returning 12999 messages from 167 batches
2018/01/14 03:38:13 Returning 12856 messages from 163 batches
2018/01/14 03:38:13 Returning 12764 messages from 165 batches
2018/01/14 03:38:13 Returning 12873 messages from 165 batches
2018/01/14 03:38:13 Returning 13179 messages from 166 batches
2018/01/14 03:38:13 Returning 13196 messages from 168 batches
2018/01/14 03:38:13 Returning 10121 messages from 130 batches
2018/01/14 03:38:13 Returning 312 messages from 4 batches
2018/01/14 03:38:13 Returning 253 messages from 3 batches
2018/01/14 03:38:13 Returning 149 messages from 2 batches
2018/01/14 03:38:13 Returning 88 messages from 1 batches
2018/01/14 03:38:13 Returning 236 messages from 3 batches
2018/01/14 03:38:13 Returning 61 messages from 1 batches
2018/01/14 03:38:13 Returning 85 messages from 1 batches

@wladh (Contributor) commented on Jan 15, 2018

While this fix addresses the issue of multiple record batches being in one response, there can also be a mix of record batches and legacy message sets (see issue #1021). Fixing that issue requires reworking how records are implemented, because they aren't strictly either legacy or new anymore, and the ordering between record batches and legacy messages needs to be preserved.
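
For illustration, the shape such a union could take (a minimal sketch with hypothetical names, not the PR's final API):

// Sketch only: one decoded "set" is either a legacy message set (magic
// v0/v1) or a record batch (magic v2); a block keeps them in wire order.
type recordsUnion struct {
	msgSet      *MessageSet  // non-nil for the legacy encoding
	recordBatch *RecordBatch // non-nil for the new encoding
}

type fetchBlock struct {
	// A slice rather than a single value, so ordering between legacy
	// message sets and record batches within one response is preserved.
	sets []recordsUnion
}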

@bobrik (Contributor, Author) commented on Jan 16, 2018

That's a great point. I updated the PR to support mixed scenarios.

Tested with the 0.10.2.0 and 0.11.0.0 APIs against the following log:

# /usr/local/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /state/kafka/dev/sarama.test-0/00000000000000000000.log
Dumping /state/kafka/dev/sarama.test-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 NoTimestampType: -1 isvalid: true size: 40 magic: 0 compresscodec: NONE crc: 396773912
offset: 1 position: 40 NoTimestampType: -1 isvalid: true size: 40 magic: 0 compresscodec: NONE crc: 396773912
baseOffset: 2 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 80 CreateTime: 1516069549010 isvalid: true size: 82 magic: 2 compresscodec: NONE crc: 3770695923
baseOffset: 3 lastOffset: 3 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 162 CreateTime: 1516069549037 isvalid: true size: 82 magic: 2 compresscodec: NONE crc: 431266329
baseOffset: 4 lastOffset: 4 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 244 CreateTime: 1516074820185 isvalid: true size: 84 magic: 2 compresscodec: NONE crc: 2860755674
baseOffset: 5 lastOffset: 5 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 328 CreateTime: 1516074820185 isvalid: true size: 84 magic: 2 compresscodec: NONE crc: 2657354307
baseOffset: 7 lastOffset: 7 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 412 CreateTime: 1516074820198 isvalid: true size: 107 magic: 2 compresscodec: NONE crc: 2824390372
offset: 8 position: 519 NoTimestampType: -1 isvalid: true size: 42 magic: 0 compresscodec: NONE crc: 2108329865
offset: 9 position: 561 NoTimestampType: -1 isvalid: true size: 42 magic: 0 compresscodec: NONE crc: 3835944499
offset: 10 position: 603 NoTimestampType: -1 isvalid: true size: 42 magic: 0 compresscodec: NONE crc: 2108329865
offset: 11 position: 645 NoTimestampType: -1 isvalid: true size: 42 magic: 0 compresscodec: NONE crc: 3835944499
baseOffset: 12 lastOffset: 12 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 687 CreateTime: 1516074874413 isvalid: true size: 84 magic: 2 compresscodec: NONE crc: 795684345
baseOffset: 13 lastOffset: 13 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 771 CreateTime: 1516074874413 isvalid: true size: 84 magic: 2 compresscodec: NONE crc: 462062944
baseOffset: 14 lastOffset: 14 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 855 CreateTime: 1516074874420 isvalid: true size: 84 magic: 2 compresscodec: NONE crc: 1948247459
baseOffset: 15 lastOffset: 15 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 939 CreateTime: 1516074874420 isvalid: true size: 84 magic: 2 compresscodec: NONE crc: 1090014522
baseOffset: 16 lastOffset: 17 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 1023 CreateTime: 1516077773608 isvalid: true size: 107 magic: 2 compresscodec: NONE crc: 293091933
baseOffset: 18 lastOffset: 18 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 1130 CreateTime: 1516077773613 isvalid: true size: 84 magic: 2 compresscodec: NONE crc: 696319396
baseOffset: 19 lastOffset: 19 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false position: 1214 CreateTime: 1516077773613 isvalid: true size: 84 magic: 2 compresscodec: NONE crc: 493311293

I've tried both full reads (1KB covers everything in this log) and short (128B) reads; both work as expected.

b.RecordsSet = []*Records{}

for {
	if recordsDecoder.remaining() == 0 {

Contributor:

I think for recordsDecoder.remaining() > 0 { would be more idiomatic

@bobrik (Author):

That would require one more level of nesting; I think it's better to break early and keep it flat:
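
Roughly this shape (a sketch of the break-early form, not the exact PR code):

for {
	if recordsDecoder.remaining() == 0 {
		break
	}
	// decode one message set or record batch per iteration
}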

Contributor:

I'm not sure I follow; moving the condition into the for loop that already exists wouldn't add any layers of nesting?

@bobrik (Author):

My bad, I misread the first time. Code is changed now.
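
For reference, the loop now uses the suggested condition directly (sketch):

for recordsDecoder.remaining() > 0 {
	// decode one message set or record batch per iteration
}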

record_batch.go (outdated)
@@ -35,6 +35,47 @@ func (e recordsArray) decode(pd packetDecoder) error {
	return nil
}

type RecordBatchSet struct {

Contributor:

We don't seem to use this type anywhere.

@bobrik (Author):

Right, this is a leftover from the first attempt. Removed.

records.go (outdated)
	magicOffset = 16
	magicLength = 1
)

// Records implements a union type containing either a RecordBatch or a legacy MessageSet.
type Records struct {
	recordsType int

Contributor:

The reason why I put this field is to make it easier to debug possible inconsistencies between what the Records should contain and what it actually contains. If we want to refactor this, we could do it in a separate PR because I don't think it's related to the problem at hand (unless I'm missing something).

@bobrik (Author):

Fair enough. I built on top of my initial changes, which is why many things were missing. I added another commit to minimize the diff with master.
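
For context on the magicOffset constant above: both the legacy message header (offset + size + crc + magic) and the record batch header (baseOffset + batchLength + partitionLeaderEpoch + magic) put the magic byte 16 bytes in, so a single peek disambiguates the two encodings. A rough sketch, assuming the packetDecoder exposes a peek helper (the names here are illustrative):

// Sketch: read the magic byte without consuming input, then dispatch on it.
func peekMagic(pd packetDecoder) (int8, error) {
	sub, err := pd.peek(magicOffset, magicLength) // assumed peek API
	if err != nil {
		return 0, err
	}
	return sub.getInt8()
}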

@@ -31,7 +33,8 @@ type FetchResponseBlock struct {
HighWaterMarkOffset int64
LastStableOffset int64
AbortedTransactions []*AbortedTransaction
Records Records

Contributor:

Technically it's a breaking change to remove this; could you leave it with a comment that it's deprecated, and just fill it in with the first set or something?

@bobrik (Author):

Done, let me know if that's what you had in mind.
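
For reference, the compatible shape would be roughly (a sketch; the comment wording is illustrative):

type FetchResponseBlock struct {
	HighWaterMarkOffset int64
	LastStableOffset    int64
	AbortedTransactions []*AbortedTransaction
	// Records is deprecated; kept for compatibility and
	// filled in with the first set from RecordsSet.
	Records    Records
	RecordsSet []*Records
}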

@eapache (Contributor) commented on Jan 19, 2018

This looks pretty good to me, just a few comments. Thanks for all your work on this, both of you!

@eapache (Contributor) commented on Jan 22, 2018

Thanks!
