Getting confirmation of delivery takes 50 seconds. #469

Closed
4 of 7 tasks
VictorDenisov opened this issue May 20, 2020 · 5 comments

Comments

@VictorDenisov

Description

This issue is a follow-up to #466.

Every solution described in the previous issue still took 50 seconds from time to time to confirm delivery.

Rewriting this code to use the Kafka client from Segment (https://github.com/segmentio/kafka-go) solved the issue.

I understand that the Kafka client from Segment may simply be mishandling some issues that are present in the Kafka cluster, but I thought I would share this suspicious behaviour anyway.

How to reproduce

Code with confluent client:

func (t *groupOp) collectResults(n int, deliveryChan chan ck.Event, opId int) error {
	timer := p.NewTimer(t.collectResultsTimeHistogram)
	defer timer.ObserveDuration()

	var err error
	for i := 0; i < n; i++ {
		e := <-deliveryChan
		m := e.(*ck.Message)

		log.Tracef("%v: Collected event: %v", opId, m)

		if m.TopicPartition.Error != nil {
			t.ErrorCounter.Inc()
			err = grpc.Errorf(codes.Internal, "Failed to save some of the relations")
		}
	}
	log.Tracef("%v: Finished collecting results", opId)
	return err
}

func (t *groupOp) run(ops ...o.RelationOp) (err error) {
	opId := rand.Int() % 10000
	n := len(ops)
	deliveryChan := make(chan ck.Event, n)

	timer := p.NewTimer(t.sendBatchTimeHistogram)

	for _, op := range ops {
		if err := validateGroup(op.GetGroupId()); err != nil {
			t.ErrorCounter.Inc()
			return err
		}

		for {
			err := t.Producer.Produce(&ck.Message{
				TopicPartition: ck.TopicPartition{
					Topic:     &t.OpTopic,
					Partition: ck.PartitionAny,
				},
				Key:   []byte(op.Key()),
				Value: []byte(op.Value()),
			}, deliveryChan)
			if err == nil {
				break
			}
			if (err.(ck.Error)).Code() != ck.ErrQueueFull {
				log.Errorf("%v: Failed to produce message: %v", opId, err)
				return err
			}
			time.Sleep(100 * time.Millisecond)
		}
	}

	log.Tracef("%v: Submitted batch: %v. Waiting for results", opId, len(ops))
	timer.ObserveDuration()

	return t.collectResults(n, deliveryChan, opId)
}

Code that constructs producer:

	producer, err := ck.NewProducer(&ck.ConfigMap{
		"bootstrap.servers": strings.Join(kafkaBrokers, ","),
	})

Code that uses segmentio client:

func (t *groupOp) run(ops ...o.RelationOp) (err error) {
	timer := p.NewTimer(t.sendBatchTimeHistogram)

	var messages []k.Message
	for _, op := range ops {
		if err := validateGroup(op.GetGroupId()); err != nil {
			t.ErrorCounter.Inc()
			return err
		}
		messages = append(messages, k.Message{
			Key:   []byte(op.Key()),
			Value: []byte(op.Value()),
		})
	}

	err = t.OpWriter.WriteMessages(context.Background(), messages...)

	log.Tracef("Submitted batch: %v. Waiting for results", len(ops))
	timer.ObserveDuration()

	// Return the error from WriteMessages instead of discarding it.
	return err
}

Code that constructs segmentio writer:

	opWriter := kafka.NewWriter(kafka.WriterConfig{
		Brokers:      kafkaBrokers,
		Topic:        applicationJson.Kafka.OpTopic,
		Balancer:     &kafka.Hash{},
		MaxAttempts:  oneMonthInSecs,
		BatchSize:    1000,
		BatchTimeout: 5 * time.Millisecond,
	})

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()): 1.1.0
  • Apache Kafka broker version: 2.2.1
  • Client configuration: ConfigMap{...}: ck.NewProducer(&ck.ConfigMap{"bootstrap.servers": strings.Join(kafkaBrokers, ",")})
  • Operating system: Linux pop-os 5.3.0-7648-generic #41158679003618.04~600aeb5-Ubuntu SMP Mon Apr 13 17:47:15 UTC x86_64 x86_64 x86_64 GNU/Linux
  • Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts
  • Critical issue
@VictorDenisov
Author

Is there any update on this?

@edenhill
Contributor

edenhill commented Aug 17, 2020

Can you reproduce this with "debug": "broker,protocol,msg" enabled?
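
For reference, `debug` is an ordinary librdkafka configuration property, so it can be set in the same ConfigMap that constructs the producer. A minimal sketch of that fragment, assuming the `ck` import alias and the `kafkaBrokers` variable from the issue description:

```go
	producer, err := ck.NewProducer(&ck.ConfigMap{
		"bootstrap.servers": strings.Join(kafkaBrokers, ","),
		// Enables librdkafka debug output for the broker, protocol and
		// msg contexts; by default these logs are written to stderr.
		"debug": "broker,protocol,msg",
	})
```

The output is verbose, so it is probably easiest to enable it only for the run that reproduces the delay and attach the excerpt around the slow delivery.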

@VictorDenisov
Author

Sounds good. I'll try reproducing it.

@VictorDenisov
Author

Strangely enough I can't reproduce it any more. I'm running some experiments. I'll report the results once I have them.

@milindl
Contributor

milindl commented Mar 5, 2024

Closing this as it's been a while - please reopen if this can be reproduced with the debug logs in the latest version.

@milindl milindl closed this as completed Mar 5, 2024