Close function runs before Completion function when writing async #778

Closed
Sparkz0629 opened this issue Nov 3, 2021 · 4 comments · Fixed by #805

@Sparkz0629

Describe the bug
When a Completion function is set on an async writer, there is a race condition between the Completion function and the Close function: Close can run before the Completion function does.
As a result, with small batch counts we do not reliably receive the offset information on the response channel that the Completion function writes to.

Kafka Version
2.8.0

To Reproduce
Creation of the kafka writer:

func newKafkaWriter(config *util.Config) *kafka.Writer {
	batchTimeout := 5 * time.Second
	writeTimeout := 120 * time.Second
	readTimeout := 180 * time.Second
	return kafka.NewWriter(kafka.WriterConfig{
		Brokers:      []string{config.Kafka.Broker},
		Balancer:     &kafka.RoundRobin{},
		Dialer:       newKafkaDialer(config),
		BatchSize:    50,
		BatchBytes:   20971520,
		BatchTimeout: batchTimeout,
		RequiredAcks: 0,
		Async:        true,
		MaxAttempts:  2,
		ErrorLogger:  log.Default(),
		WriteTimeout: writeTimeout,
		ReadTimeout:  readTimeout,
	})
}

Creation of the completion function:

func (kw *KafkaWriter) kafkaCompletion() func(messages []kafka.Message, err error) {
	return func(messages []kafka.Message, err error) {
		if err != nil {
			kw.errCh <- err
		}
		for _, message := range messages {
			kw.responseCh <- KafkaTopicOffsetPartition{
				Topic:     message.Topic,
				Partition: message.Partition,
				Offset:    message.Offset,
			}
		}
	}
}

The WriteMessage function:

func (kw *KafkaWriter) WriteMessage(topic string, batchId string, value string, writeResponse bool) error {
	if kw.kwriter == nil {
		kw.kwriter = newKafkaWriter(kw.Config)
	}
	kafkaMsg := kafka.Message{
		Topic: topic,
		Value: []byte(value),
		Headers: []kafka.Header{{
			Key:   "batchId",
			Value: []byte(batchId),
		}},
	}
	kw.kwriter.Completion = kw.kafkaCompletion()
	kerr := kw.kwriter.WriteMessages(context.Background(), kafkaMsg)
	if kerr != nil {
		fmt.Println(kerr)
		fmt.Println("Establishing new writer kafka connection")
		kw.kwriter = newKafkaWriter(kw.Config)
		kw.kwriter.Completion = kw.kafkaCompletion()
		err := kw.kwriter.WriteMessages(context.Background(), kafkaMsg)
		if err != nil {
			fmt.Println("Failed to write Kafka message to topic [" + kafkaMsg.Topic + "] - " + err.Error())
			return err
		}
	}
	return nil
}

The loop that drains the response channel and records the earliest offset for each partition:

for response := range responseCh {
	if _, ok := offsetMap[response.Partition]; !ok || response.Offset < offsetMap[response.Partition] {
		offsetMap[response.Partition] = response.Offset
	}
}

The creation of the response channel:

responseCh := make(chan service.KafkaTopicOffsetPartition, 50)

The KafkaTopicOffsetPartition struct:

type KafkaTopicOffsetPartition struct {
	Topic     string
	Partition int
	Offset    int64
}
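
For anyone who wants to try the offset bookkeeping on its own, here is a minimal, self-contained sketch of the loop above. It does not use kafka-go at all: the helper name `earliestOffsets` and the sample values are assumptions made up for illustration, and the struct is copied locally so the snippet compiles by itself.

```go
package main

import "fmt"

// KafkaTopicOffsetPartition mirrors the struct shown in the report.
type KafkaTopicOffsetPartition struct {
	Topic     string
	Partition int
	Offset    int64
}

// earliestOffsets (hypothetical helper name) drains responseCh until it is
// closed and returns the smallest offset seen for each partition.
func earliestOffsets(responseCh <-chan KafkaTopicOffsetPartition) map[int]int64 {
	offsetMap := make(map[int]int64)
	for response := range responseCh {
		if current, ok := offsetMap[response.Partition]; !ok || response.Offset < current {
			offsetMap[response.Partition] = response.Offset
		}
	}
	return offsetMap
}

func main() {
	// Sample data, invented for this sketch.
	responseCh := make(chan KafkaTopicOffsetPartition, 50)
	responseCh <- KafkaTopicOffsetPartition{Topic: "events", Partition: 0, Offset: 12}
	responseCh <- KafkaTopicOffsetPartition{Topic: "events", Partition: 1, Offset: 3}
	responseCh <- KafkaTopicOffsetPartition{Topic: "events", Partition: 0, Offset: 7}
	// The range loop only terminates once the channel is closed, so the
	// channel must not be closed before every completion has been delivered.
	close(responseCh)

	fmt.Println(earliestOffsets(responseCh))
}
```

Because the loop ends only when the channel is closed, closing it before every completion has arrived would silently drop the offsets that were still pending.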

Expected behavior
I would expect the Close function to block until the Completion function has run, so that we can reliably read the offset information from the response channel.

Additional context
This only happens with small batches (fewer than 20 records), and it is intermittent: the failure rate is around 50%.
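
As a possible caller-side workaround, and not a description of how the library behaves or of any fix to it, the sketch below counts in-flight messages with a `sync.WaitGroup` and closes the response channel only after every completion callback has fired. Everything here is a stand-in: `fakeAsyncWriter`, `message`, and `collectOffsets` are hypothetical names that only imitate an async writer that invokes its completion callback from another goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a Kafka message.
type message struct {
	Offset int64
}

// fakeAsyncWriter is a hypothetical stand-in for an async writer:
// WriteMessages returns immediately and Completion runs later on
// another goroutine.
type fakeAsyncWriter struct {
	Completion func(messages []message, err error)
	nextOffset int64
}

func (w *fakeAsyncWriter) WriteMessages(msgs ...message) {
	batch := make([]message, len(msgs))
	for i := range msgs {
		batch[i] = message{Offset: w.nextOffset}
		w.nextOffset++
	}
	go w.Completion(batch, nil)
}

// collectOffsets writes n messages and returns their offsets. It closes the
// response channel only after every completion callback has been observed.
func collectOffsets(n int) []int64 {
	responseCh := make(chan int64, n) // buffered so callbacks never block
	var pending sync.WaitGroup

	w := &fakeAsyncWriter{}
	w.Completion = func(messages []message, err error) {
		for _, m := range messages {
			if err == nil {
				responseCh <- m.Offset
			}
			pending.Done() // one Done per message, success or failure
		}
	}

	for i := 0; i < n; i++ {
		pending.Add(1) // register the message before handing it off
		w.WriteMessages(message{})
	}

	pending.Wait()    // every callback has finished sending
	close(responseCh) // now it is safe to end the consumer's range loop

	offsets := make([]int64, 0, n)
	for offset := range responseCh {
		offsets = append(offsets, offset)
	}
	return offsets
}

func main() {
	fmt.Println(len(collectOffsets(10)), "offsets collected")
}
```

The order in which offsets arrive is not deterministic, since each callback runs on its own goroutine; only the count is guaranteed. With a real writer, the same idea would apply to whatever point the caller currently uses to close the response channel.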

@Sparkz0629 Sparkz0629 added the bug label Nov 3, 2021
@achille-roussel achille-roussel changed the title from "Close function runs before Completion function when writing asynch" to "Close function runs before Completion function when writing async" Nov 12, 2021
@achille-roussel
Contributor

Hello @Sparkz0629, thanks for opening the issue!

From the examples you shared, it is not clear how the completion function gets invoked. Would you be able to share a few more details?

@achille-roussel achille-roussel self-assigned this Nov 12, 2021
@rachitnovostack

I'm facing the same issue:

panic: sync: WaitGroup is reused before previous Wait has returned

goroutine 237 [running]:
sync.(*WaitGroup).Wait(0xc000160768)
/usr/local/go/src/sync/waitgroup.go:132 +0xae
github.com/segmentio/kafka-go.(*partitionWriter).close(0xc0001606e0)
/Users/developer/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.23/writer.go:1150 +0x91
github.com/segmentio/kafka-go.(*Writer).Close(0xc0003b8b00, 0xc0003b8b00, 0x0)
/Users/developer/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.23/writer.go:518 +0xc5

This is what gets printed to the terminal.

@achille-roussel
Contributor

I opened #805 to resolve the issue. Let us know if you are still experiencing the problem with the fix.

@Sparkz0629
Author

@achille-roussel, apologies for the delayed response.
I've upgraded to the latest version, and everything appears to be sorted. Thanks for the help!
