Server throws exception while producing with low-level api #1013

Closed
imjustfly opened this issue Dec 28, 2017 · 14 comments

Comments

@imjustfly
Contributor

imjustfly commented Dec 28, 2017

Versions

Sarama Version: 1.14.0
Kafka Version: 1.0
Go Version: 1.8

Configuration

default configuration

Logs
[2017-12-27 19:08:00,897] ERROR [KafkaApi-2] Error when handling request {replica_id=-1,max_wait_time=250,min_bytes=1,topics=[{topic=drc.codis.codis-demo,partitions=[{partition=0,fetch_offset=0,max_bytes=32768}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Illegal offset 1 following previous offset 1 (Offsets must increase monotonically).
	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:396)
	at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586)
	at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134)
	at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109)
	at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
	at scala.Option.map(Option.scala:146)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
	at scala.Option.flatMap(Option.scala:171)
	at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
	at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
	at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
	at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033)
	at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
	at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
	at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
	at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587)
	at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
	at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
	at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
	at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:596)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:100)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
	at java.lang.Thread.run(Thread.java:748)
Problem Description

When I send a produce request with a batch of records, the broker API returns no error, but the server logs the exception above (something wrong with offsets), and the consumer gets a KError -- ErrUnknown. But if there is only one record in the batch, everything seems OK.

This is my producing code:

func (w *produceWorker) writeKafka(msgs []*localMessage) error {
	batch := &sarama.RecordBatch{
		FirstTimestamp: time.Now(), // not precise
		Version:        2,          // kafka version > 0.11, batch version is 2
		ProducerID:     -1,         // No producer id
		Codec:          sarama.CompressionNone,
		Records:        make([]*sarama.Record, 0, len(msgs)),
	}
	for _, msg := range msgs {
		batch.Records = append(batch.Records, &sarama.Record{
			Value: msg.Value,
		})
	}
	request := &sarama.ProduceRequest{
		RequiredAcks: sarama.WaitForAll, // all
		Timeout:      10 * 1000,
		Version:      3, // kafka version > 0.11, request version is 3
	}
	request.AddBatch(w.topic, w.partition, batch)

	response, err := w.broker.Produce(request)
	if err != nil {
		return err
	}
	block := response.GetBlock(w.topic, w.partition)
	if block == nil {
		return fmt.Errorf("result for current partition not received")
	}
	if block.Err != sarama.ErrNoError {
		return block.Err
	}
	return nil
}

Did I miss something?

@imjustfly imjustfly changed the title Server throw exception while producing with low-level api Server throws exception while producing with low-level api Dec 30, 2017
@eapache
Contributor

eapache commented Dec 31, 2017

Hmm, I don't see anything wrong with your code. Perhaps Kafka requires OffsetDelta to be set on records in uncompressed batches now? It's worth trying. But I'd have expected to run into this with the regular Sarama producer in that case...
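
For concreteness, a minimal sketch of what that could look like, adapting the batch-building loop from the code above (buildBatch is a hypothetical helper name; the key change is setting OffsetDelta to each record's zero-based index in the batch):

// Sketch: set OffsetDelta on each record to its zero-based position in the
// batch, so offsets within the batch increase monotonically.
func buildBatch(msgs []*localMessage) *sarama.RecordBatch {
	batch := &sarama.RecordBatch{
		FirstTimestamp: time.Now(),
		Version:        2,
		ProducerID:     -1,
		Codec:          sarama.CompressionNone,
		Records:        make([]*sarama.Record, 0, len(msgs)),
	}
	for i, msg := range msgs {
		batch.Records = append(batch.Records, &sarama.Record{
			OffsetDelta: int64(i), // relative offset within the batch
			Value:       msg.Value,
		})
	}
	return batch
}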

@eapache
Contributor

eapache commented Dec 31, 2017

#1002 and #1015 are only optimizations, but they might be related.

@imjustfly
Contributor Author

imjustfly commented Jan 1, 2018

Thanks for your response!

More information: the Value of each Record is encoded with msgpack, and the batch codec is set to None.

Is this OK? I'll try it tomorrow.

Also, I've tried setting OffsetDelta on every record, but this leads to another problem. It seems that the consumer's offset is wrong:

sarama consumer's log:

consumer/drc.codis.codis-demo/0 shutting down because kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition.

The consumer keeps shutting itself down because its requested offset is always greater than the server's offset. Weird.

broker log:

...
[2017-12-28 20:32:14,511] INFO [GroupCoordinator 0]: Assignment received from leader for group drc.codis.codis-demo_applier for generation 52 (kafka.coordinator.group.GroupCoordinator)
[2017-12-28 20:32:14,618] INFO [GroupCoordinator 0]: Preparing to rebalance group drc.codis.codis-demo_applier with old generation 52 (__consumer_offsets-43) (kafka.coordinator.group.GroupCoordinator)
[2017-12-28 20:32:14,618] INFO [GroupCoordinator 0]: Stabilized group drc.codis.codis-demo_applier generation 53 (__consumer_offsets-43) (kafka.coordinator.group.GroupCoordinator)
[2017-12-28 20:32:14,619] INFO [GroupCoordinator 0]: Assignment received from leader for group drc.codis.codis-demo_applier for generation 53 (kafka.coordinator.group.GroupCoordinator)
[2017-12-28 20:32:14,726] INFO [GroupCoordinator 0]: Preparing to rebalance group drc.codis.codis-demo_applier with old generation 53 (__consumer_offsets-43) (kafka.coordinator.group.GroupCoordinator)
[2017-12-28 20:32:14,726] INFO [GroupCoordinator 0]: Stabilized group drc.codis.codis-demo_applier generation 54 (__consumer_offsets-43) (kafka.coordinator.group.GroupCoordinator)
[2017-12-28 20:32:14,727] INFO [GroupCoordinator 0]: Assignment received from leader for group drc.codis.codis-demo_applier for generation 54 (kafka.coordinator.group.GroupCoordinator)
[2017-12-28 20:32:14,836] INFO [GroupCoordinator 0]: Preparing to rebalance group drc.codis.codis-demo_applier with old generation 54 (__consumer_offsets-43) (kafka.coordinator.group.GroupCoordinator)
[2017-12-28 20:32:14,836] INFO [GroupCoordinator 0]: Stabilized group drc.codis.codis-demo_applier generation 55 (__consumer_offsets-43) (kafka.coordinator.group.GroupCoordinator)
[2017-12-28 20:32:14,836] INFO [GroupCoordinator 0]: Assignment received from leader for group drc.codis.codis-demo_applier for generation 55 (kafka.coordinator.group.GroupCoordinator)
[2017-12-28 20:32:14,899] INFO [GroupCoordinator 0]: Preparing to rebalance group drc.codis.codis-demo_applier with old generation 55 (__consumer_offsets-43) (kafka.coordinator.group.GroupCoordinator)
[2017-12-28 20:32:14,899] INFO [GroupCoordinator 0]: Stabilized group drc.codis.codis-demo_applier generation 56 (__consumer_offsets-43) (kafka.coordinator.group.GroupCoordinator)
[2017-12-28 20:32:14,900] INFO [GroupCoordinator 0]: Assignment received from leader for group drc.codis.codis-demo_applier for generation 56 (kafka.coordinator.group.GroupCoordinator)
[2017-12-28 20:32:14,904] INFO [GroupCoordinator 0]: Preparing to rebalance group drc.codis.codis-demo_applier with old generation 56 (__consumer_offsets-43) (kafka.coordinator.group.GroupCoordinator)
[2017-12-28 20:32:14,904] INFO [GroupCoordinator 0]: Group drc.codis.codis-demo_applier with generation 57 is now empty (__consumer_offsets-43) (kafka.coordinator.group.GroupCoordinator)
...

@eapache
Contributor

eapache commented Jan 1, 2018

That is weird. It looks like you are using consumer groups - are you using https://github.com/bsm/sarama-cluster? They would have a better idea how to look into the consumer offset issue.

@imjustfly
Contributor Author

Yes, I'm using sarama-cluster. I'll give it a try.

@imjustfly
Contributor Author

I've tested on Kafka 0.11 and 1.0:

  1. Uncompressed records without OffsetDelta lead to a server exception.
  2. Compressed records without OffsetDelta are OK.
  3. Whether the records are compressed or not, if OffsetDelta is set, the consumer receives an offset error.

So for now, I avoid this problem by compressing the records and not setting OffsetDelta (see the sketch below). I've also reported this issue to sarama-cluster and am waiting for their response.
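
For reference, a minimal sketch of that workaround as I understand it, reusing the batch construction from the first comment (the only changes are the Codec and leaving OffsetDelta at its zero value):

// Sketch of the workaround: compress the batch and leave OffsetDelta unset.
batch := &sarama.RecordBatch{
	FirstTimestamp: time.Now(),
	Version:        2,
	ProducerID:     -1,
	Codec:          sarama.CompressionSnappy, // compressed batches avoid the broker exception
	Records:        make([]*sarama.Record, 0, len(msgs)),
}
for _, msg := range msgs {
	// OffsetDelta deliberately left at 0; setting it shifted consumer offsets.
	batch.Records = append(batch.Records, &sarama.Record{Value: msg.Value})
}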

THX!

@eapache
Contributor

eapache commented Jan 2, 2018

cc @bobrik @emfree on the off-chance the above seems familiar to you, since you added the Offset/OffsetDelta stuff recently

@bobrik
Contributor

bobrik commented Jan 3, 2018

1.14.0 is from November; it doesn't have the patches for OffsetDelta. Have you tried the latest master?

In my tests I've been using Sarama from master as the producer with the 1.0.0 API, and a very old Sarama with 0.8.0 (the oldest supported version, which is the default) as the consumer.

@imjustfly
Contributor Author

imjustfly commented Jan 3, 2018

@bobrik I've tried the latest master, but I don't think this has anything to do with the version or the OffsetDelta patches. I produce messages with the low-level API, not the default Sarama producer.

I don't know whether this is a sarama issue or a sarama-cluster issue:

bsm/sarama-cluster#204

@imjustfly
Contributor Author

imjustfly commented Jan 3, 2018

And I found something:

If a Record's OffsetDelta is set, the topic offset starts from 1. But if it's not set, the topic offset starts from 0.

OffsetDelta set:

~|⇒ go run test.go
MSG: topic-x-0/0 "DATA-0001"
MSG: topic-x-0/1 "DATA-0002"

OffsetDelta not set:

~|⇒ go run test.go
MSG: topic-x-0/1 "DATA-0001"
MSG: topic-x-0/2 "DATA-0002"

test.go:

package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
	"github.com/eapache/go-xerial-snappy"
)

func run(addrs []string, topic string) error {
	config := sarama.NewConfig()
	config.Version = sarama.V0_11_0_0
	config.Consumer.Return.Errors = true

	client, err := sarama.NewClient(addrs, config)
	if err != nil {
		return err
	}
	defer client.Close()

	broker, err := client.Leader(topic, 0)
	if err != nil {
		return err
	}
	defer broker.Close()

	req := &sarama.ProduceRequest{
		RequiredAcks: sarama.WaitForAll,
		Timeout:      10 * 1000,
		Version:      3,
	}
	req.AddBatch(topic, 0, &sarama.RecordBatch{
		FirstTimestamp: time.Now(),
		Version:        2,
		ProducerID:     -1,
		Codec:          sarama.CompressionSnappy,
		Records: []*sarama.Record{
			{OffsetDelta: 0, Value: snappy.Encode([]byte("DATA-0001"))},
			{OffsetDelta: 1, Value: snappy.Encode([]byte("DATA-0002"))},
		},
	})

	if _, err := broker.Produce(req); err != nil {
		return err
	}

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		return err
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
	if err != nil {
		return err
	}
	defer pc.Close()

	go func() {
		for err := range pc.Errors() {
			fmt.Println("ERR", err)
		}
	}()

	for msg := range pc.Messages() {
		value, err := snappy.Decode(msg.Value)
		if err != nil {
			return err
		}
		fmt.Printf("MSG: %s-%d/%d %q\n", msg.Topic, msg.Partition, msg.Offset, value)
	}
	return nil
}

func main() {
	if err := run([]string{"192.168.6.151:9092"}, "topic-x"); err != nil {
		fmt.Println("FATAL", err)
	}
}

@imjustfly
Contributor Author

I think I found the reason.

If OffsetDelta is set and the offset is n, messages are returned from offset n+1 when consuming. But if OffsetDelta is not set, messages are returned from offset n.

@eapache
Contributor

eapache commented Jan 4, 2018

That makes sense, since the offset delta is added to the base offset. What I don’t understand is why the delta needs to be set at all for uncompressed messages.
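
To make the arithmetic concrete, a minimal self-contained sketch (assuming, per the above, that each record's absolute offset is the batch's base offset plus its OffsetDelta, and that the consumer resumes fetching at the last absolute offset plus one; absoluteOffsets is a hypothetical helper):

package main

import "fmt"

// absoluteOffsets computes absolute offset = batch base offset + per-record
// OffsetDelta for every record in a batch.
func absoluteOffsets(baseOffset int64, deltas []int64) []int64 {
	offsets := make([]int64, len(deltas))
	for i, d := range deltas {
		offsets[i] = baseOffset + d
	}
	return offsets
}

func main() {
	// Two-record batch appended at base offset 0:
	fmt.Println(absoluteOffsets(0, []int64{0, 1})) // deltas set:   [0 1] -> next fetch at 2
	fmt.Println(absoluteOffsets(0, []int64{0, 0})) // deltas unset: [0 0] -> next fetch at 1
}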

@eapache
Contributor

eapache commented Jan 22, 2018

@imjustfly Could you try with #1026? It seems like it might be related.

@eapache
Contributor

eapache commented Feb 9, 2018

I think on reflection this is basically the same issue as #1032, so I'll roll it up there.

@eapache eapache closed this as completed Feb 9, 2018