
Asynchronous read from deliveryChan times out #466

Closed
4 of 7 tasks
VictorDenisov opened this issue May 18, 2020 · 7 comments

Comments

@VictorDenisov commented May 18, 2020

Description

Consuming from the delivery channel occasionally gets stuck when we consume from the channel in a separate goroutine. If I read from deliveryChan every time after I send a message, the problem disappears.

================

func (t *groupOp) collectResults(n int, res chan error, deliveryChan chan ck.Event) {
	var err error
	for i := 0; i < n; i++ {
		e := <-deliveryChan
		m := e.(*ck.Message)

		if m.TopicPartition.Error != nil {
			err = grpc.Errorf(codes.Internal, "Failed to save some of the relations")
		}
	}
	res <- err
	close(res)
}

func (t *groupOp) run(ops ...o.RelationOp) (err error) {
	deliveryChan := make(chan ck.Event)

	n := len(ops)

	resCh := make(chan error)

	go t.collectResults(n, resCh, deliveryChan)

	for i := 0; i < n; i++ {
		op := ops[i]
		if err := validateGroup(op.GetGroupId()); err != nil {
			t.ErrorCounter.Inc()
			return err
		}

		for {
			err = t.Producer.Produce(&ck.Message{
				TopicPartition: ck.TopicPartition{
					Topic:     &t.OpTopic,
					Partition: ck.PartitionAny,
				},
				Key:   []byte(op.Key()),
				Value: []byte(op.Value()),
			}, deliveryChan)
			if err == nil {
				break
			}
			if (err.(ck.Error)).Code() != ck.ErrQueueFull {
				return err
			}

			time.Sleep(100 * time.Millisecond)
		}

	}

	return <-resCh
}

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()): 1.1.0
  • Apache Kafka broker version: 2.2.1
  • Client configuration: ConfigMap{...}: ck.NewProducer(&ck.ConfigMap{"bootstrap.servers": strings.Join(kafkaBrokers, ",")})
  • Operating system: Linux pop-os 5.3.0-7648-generic #41~1586790036~18.04~600aeb5-Ubuntu SMP Mon Apr 13 17:47:15 UTC x86_64 x86_64 x86_64 GNU/Linux
  • Provide client logs (with "debug": ".." as necessary; see the sketch after this list)
  • Provide broker log excerpts
  • Critical issue
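
For the client-configuration and log items above, here is a minimal sketch (not from the issue) of how the reported producer config could be extended with librdkafka debug contexts. The helper name newDebugProducer and the chosen contexts are illustrative only; the ck and strings aliases are assumed to match the snippets below.

func newDebugProducer(kafkaBrokers []string) (*ck.Producer, error) {
	// Same settings as reported above, plus "debug" so client logs can be
	// captured for the checklist; the context list is just an example.
	return ck.NewProducer(&ck.ConfigMap{
		"bootstrap.servers": strings.Join(kafkaBrokers, ","),
		"debug":             "broker,topic,msg",
	})
}
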
@edenhill (Contributor)

Try to make the deliveryChan buffered and see if it helps
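
For illustration, a minimal sketch of that suggestion (not from the thread), using the same ck alias as the snippets above. produceBatch is a hypothetical helper; the delivery channel is buffered to the batch size so delivery reports can queue up before the reader gets to them.

func produceBatch(p *ck.Producer, topic string, values [][]byte) error {
	// One buffer slot per message, so delivery reports never block.
	deliveryChan := make(chan ck.Event, len(values))

	for _, v := range values {
		if err := p.Produce(&ck.Message{
			TopicPartition: ck.TopicPartition{Topic: &topic, Partition: ck.PartitionAny},
			Value:          v,
		}, deliveryChan); err != nil {
			return err
		}
	}

	// Drain exactly one report per produced message.
	for range values {
		e := <-deliveryChan
		if m, ok := e.(*ck.Message); ok && m.TopicPartition.Error != nil {
			return m.TopicPartition.Error
		}
	}
	return nil
}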

@VictorDenisov (Author) commented May 19, 2020

It didn't help. It looks like the problem is caused by the channel being shared between concurrent calls.

@VictorDenisov (Author)

I tried this code:

func (t *groupOp) collectResults(n int, res chan error, deliveryChan chan ck.Event) {
	timer := p.NewTimer(t.collectResultsTimeHistogram)
	defer timer.ObserveDuration()

	var err error
	for i := 0; i < n; i++ {
		e := <-deliveryChan
		m := e.(*ck.Message)

		log.Tracef("Collected event: %v", m)

		if m.TopicPartition.Error != nil {
			t.ErrorCounter.Inc()
			err = grpc.Errorf(codes.Internal, "Failed to save some of the relations")
		}
	}
	res <- err
	log.Tracef("Finished collecting results")
	close(res)
}

func (t *groupOp) run(ops ...o.RelationOp) (err error) {
	n := len(ops)
	deliveryChan := make(chan ck.Event)

	resCh := make(chan error)

	go t.collectResults(n, resCh, deliveryChan)

	timer := p.NewTimer(t.sendBatchTimeHistogram)

	for _, op := range ops {
		if err := validateGroup(op.GetGroupId()); err != nil {
			t.ErrorCounter.Inc()
			return err
		}

		sync := make(chan error)
		go func() {
			localDeliveryChan := make(chan ck.Event)
			for {
				err := t.Producer.Produce(&ck.Message{
					TopicPartition: ck.TopicPartition{
						Topic:     &t.OpTopic,
						Partition: ck.PartitionAny,
					},
					Key:   []byte(op.Key()),
					Value: []byte(op.Value()),
				}, localDeliveryChan)
				if err == nil {
					break
				}
				if (err.(ck.Error)).Code() != ck.ErrQueueFull {
					log.Errorf("Failed to produce message: %v", err)
					sync <- err
				}
				time.Sleep(100 * time.Millisecond)
			}
			close(sync)
			e := <-localDeliveryChan
			deliveryChan <- e
		}()
		err := <-sync
		if err != nil {
			return err
		}
	}

	log.Tracef("Submitted batch: %v. Waiting for results", len(ops))
	timer.ObserveDuration()

	return <-resCh
}

It still gets stuck occasionally. There is something I don't understand about the Kafka driver, but I can't quite make out what.

@VictorDenisov (Author)

It looks like people report various issues with cgo performance: golang/go#19574

@edenhill (Contributor)

Looking at your code, it seems that if Produce fails (with anything but QueueFull), that goroutine will either get stuck or panic where it sends to the sync channel, since the caller only reads one error from sync and then returns.
Maybe there should be a break where you send to sync.
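
A sketch of what that could look like (not from the thread), with the retry loop pulled out into a hypothetical produceWithRetry function; the ck and log aliases and the channel shapes are assumed to match the snippet above.

func produceWithRetry(p *ck.Producer, msg *ck.Message, localDeliveryChan chan ck.Event, sync chan error) {
	for {
		err := p.Produce(msg, localDeliveryChan)
		if err == nil {
			break
		}
		if kerr, ok := err.(ck.Error); !ok || kerr.Code() != ck.ErrQueueFull {
			log.Errorf("Failed to produce message: %v", err)
			sync <- err
			return // stop retrying: the caller has already received the error
		}
		time.Sleep(100 * time.Millisecond)
	}
	close(sync) // success: unblocks the caller waiting on sync
}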

@VictorDenisov (Author)

@edenhill I rewrote the code without any goroutines.

This code takes 200 microseconds for batches of 1-2 records:

func (t *groupOp) run(ops ...o.RelationOp) (err error) {
	deliveryChan := make(chan ck.Event)

	timer := p.NewTimer(t.sendBatchTimeHistogram)

	for _, op := range ops {
		if err := validateGroup(op.GetGroupId()); err != nil {
			t.ErrorCounter.Inc()
			return err
		}

		for {
			err = t.Producer.Produce(&ck.Message{
				TopicPartition: ck.TopicPartition{
					Topic:     &t.OpTopic,
					Partition: ck.PartitionAny,
				},
				Key:   []byte(op.Key()),
				Value: []byte(op.Value()),
			}, deliveryChan)
			if err == nil {
				break
			}
			if (err.(ck.Error)).Code() != ck.ErrQueueFull {
				log.Errorf("Failed to produce message: %v", err)
				return err
			}

			e := <-deliveryChan
			m := e.(*ck.Message)

			if m.TopicPartition.Error != nil {
				t.ErrorCounter.Inc()
				err = grpc.Errorf(codes.Internal, "Failed to save some of the relations")
				return err
			}

			time.Sleep(100 * time.Millisecond)
		}

	}

	log.Tracef("Submitted batch: %v. Waiting for results", len(ops))
	timer.ObserveDuration()

	return nil
}

This code takes 50 milliseconds and occasionally jumps to 50 seconds. As you can see, this code is synchronous except that it waits for the results at the end:

func (t *groupOp) collectResults(n int, deliveryChan chan ck.Event, opId int) error {
	timer := p.NewTimer(t.collectResultsTimeHistogram)
	defer timer.ObserveDuration()

	var err error
	for i := 0; i < n; i++ {
		e := <-deliveryChan
		m := e.(*ck.Message)

		log.Tracef("%v: Collected event: %v", opId, m)

		if m.TopicPartition.Error != nil {
			t.ErrorCounter.Inc()
			err = grpc.Errorf(codes.Internal, "Failed to save some of the relations")
		}
	}
	log.Tracef("%v: Finished collecting results", opId)
	return err
}

func (t *groupOp) run(ops ...o.RelationOp) (err error) {
	opId := rand.Int() % 10000
	n := len(ops)
	deliveryChan := make(chan ck.Event, n)

	timer := p.NewTimer(t.sendBatchTimeHistogram)

	for _, op := range ops {
		if err := validateGroup(op.GetGroupId()); err != nil {
			t.ErrorCounter.Inc()
			return err
		}

		for {
			err := t.Producer.Produce(&ck.Message{
				TopicPartition: ck.TopicPartition{
					Topic:     &t.OpTopic,
					Partition: ck.PartitionAny,
				},
				Key:   []byte(op.Key()),
				Value: []byte(op.Value()),
			}, deliveryChan)
			if err == nil {
				break
			}
			if (err.(ck.Error)).Code() != ck.ErrQueueFull {
				log.Errorf("%v: Failed to produce message: %v", opId, err)
				return err
			}
			time.Sleep(100 * time.Millisecond)
		}
	}

	log.Tracef("%v: Submitted batch: %v. Waiting for results", opId, len(ops))
	timer.ObserveDuration()

	return t.collectResults(n, deliveryChan, opId)
}

@VictorDenisov (Author)

Sorry, I misplaced the call that retrieves the results:

e := <-deliveryChan
m := e.(*ck.Message)

if m.TopicPartition.Error != nil {
	t.ErrorCounter.Inc()
	err = grpc.Errorf(codes.Internal, "Failed to save some of the relations")
	return err
}

It should have been outside the retry for loop. Closing the ticket.
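
For completeness, a sketch of the corrected placement (same identifiers as the 200-microsecond snippet above): the delivery report is read once per message, after Produce has accepted it and outside the ErrQueueFull retry loop.

for _, op := range ops {
	if err := validateGroup(op.GetGroupId()); err != nil {
		t.ErrorCounter.Inc()
		return err
	}

	// Retry only while the local producer queue is full.
	for {
		err = t.Producer.Produce(&ck.Message{
			TopicPartition: ck.TopicPartition{
				Topic:     &t.OpTopic,
				Partition: ck.PartitionAny,
			},
			Key:   []byte(op.Key()),
			Value: []byte(op.Value()),
		}, deliveryChan)
		if err == nil {
			break
		}
		if (err.(ck.Error)).Code() != ck.ErrQueueFull {
			log.Errorf("Failed to produce message: %v", err)
			return err
		}
		time.Sleep(100 * time.Millisecond)
	}

	// Moved out of the retry loop: wait for this message's delivery report.
	e := <-deliveryChan
	m := e.(*ck.Message)
	if m.TopicPartition.Error != nil {
		t.ErrorCounter.Inc()
		return grpc.Errorf(codes.Internal, "Failed to save some of the relations")
	}
}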
