-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Provide report channel Events()
for confluent kafka producer
#1031
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was just starting the review until I noticed we are already passing an error channel to Produce which according to Confluent docs is an option to not have to poll Events() channel and should not lead to memory leak. Do you see memory leaks in our implementation?
It is also possible to direct delivery reports to alternate channels by providing a non-nil
chan Event
channel to.Produce()
https://docs.confluent.io/platform/current/clients/confluent-kafka-go/index.html
After running a service with our current implementation for a few days, The memory usage is from 75 MB to about 180 MB Note: All the confluent producer samples initialize a goroutine to retrieve the event from the |
Sorry for being a bit pedantic here, but since we follow Confluent's producer logic and pass an event channel, either something is fundamentally wrong with the Confluent library or we are not using it correctly. I don't see adding another goroutine to handle this solving the problem IF the Confluent library behaves as expected, allowing a return channel on produce which we use accordingly:
Can you please debug further whether that channel is not drained correctly and whether you see events also in |
Btw (unrelated to this, but still important): we aren't calling
|
@embano1 You get the point. It does seem wired to use |
Yes, We don't need to call the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, We don't need to call the
Flush()
. Cause each time we send a message, then wait directly for the result from thedeliveryChan
.
Makes sense.
However, I wonder what happens if a user constructs the producer with the following option:
If no delivery reports are wanted they can be completely disabled by setting configuration property
"go.delivery.reports": false
.
Would the producer block indefinitely or panic on nil
event? Can you please test this and make sure your Send
still works?
Sure! I verified the following cases: 1. What happens when set both
2. What happens with the configuration
3. What happens with the configuration
I also found that this issue, #1030 is not necessarily related to our current implementation, so I am planning to close the issue and the current PR. @embano1 What do you think? |
That result is what we are expecting, thanks.
Interesting, seems custom event delivery channel overwrites this setting then. Weird configuration behavior, but in-line with the underlying Kafka callback implementation so perhaps reasonable.
OK, so that setting works then. Thx for your investigations, proposed next steps SGTM! |
@embano1 It does report the connection event from the {"level":"info","ts":1712455974.5608,"logger":"fallback","caller":"v2/protocol.go:291","msg":"get an error event from producer.Events() chan: 127.0.0.1:9093/bootstrap: Connect to ipv4#127.0.0.1:9093 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)"} |
@yanmxa thx for verifying! So I suggest implementing a |
9720fb1
to
ea2f222
Compare
@embano1 I'm worried that switching
Yes, We release the Is the current implementation consistent with your ideas? |
Select blocks, ie, is not making the code async. Let me see if I can propose something to simplify the current PR. |
Quick question: what was the reason we decided to use a custom event delivery channel instead of reading everything from producer.Events()? |
@embano1 use the "delieryChan" to get the result after each send so that we can change the asynchronous method I choose Now we know that they do this because |
This is really non-intuitive API design in the Confluent go SDK if you ask me :/ I'm proposing a slightly different implementation to
diff --git a/protocol/kafka_confluent/v2/protocol.go b/protocol/kafka_confluent/v2/protocol.go
index 8aa9068..e98ab51 100644
--- a/protocol/kafka_confluent/v2/protocol.go
+++ b/protocol/kafka_confluent/v2/protocol.go
@@ -12,9 +12,10 @@
"io"
"sync"
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol"
- "github.com/confluentinc/confluent-kafka-go/v2/kafka"
cecontext "github.com/cloudevents/sdk-go/v2/context"
)
@@ -33,15 +34,14 @@ type Protocol struct {
consumerTopics []string
consumerRebalanceCb kafka.RebalanceCb // optional
consumerPollTimeout int // optional
- consumerErrorHandler func(ctx context.Context, err kafka.Error) //optional
+ consumerErrorHandler func(ctx context.Context, err kafka.Error) // optional
consumerMux sync.Mutex
consumerIncoming chan *kafka.Message
consumerCtx context.Context
consumerCancel context.CancelFunc
producer *kafka.Producer
- producerDeliveryChan chan kafka.Event // optional
- producerDefaultTopic string // optional
+ producerDefaultTopic string // optional
closerMux sync.Mutex
}
@@ -85,9 +85,6 @@ func New(opts ...Option) (*Protocol, error) {
if p.kafkaConfigMap == nil && p.producer == nil && p.consumer == nil {
return nil, errors.New("at least one of the following to initialize the protocol must be set: config, producer, or consumer")
}
- if p.producer != nil {
- p.producerDeliveryChan = make(chan kafka.Event)
- }
return p, nil
}
@@ -128,23 +125,25 @@ func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ..
kafkaMsg.Key = []byte(messageKey)
}
- err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...)
- if err != nil {
- return err
+ if err = WriteProducerMessage(ctx, in, kafkaMsg, transformers...); err != nil {
+ return fmt.Errorf("create producer message: %w", err)
}
- err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan)
- if err != nil {
- return err
- }
- e := <-p.producerDeliveryChan
- m := e.(*kafka.Message)
- if m.TopicPartition.Error != nil {
- return m.TopicPartition.Error
+ if err = p.producer.Produce(kafkaMsg, nil); err != nil {
+ return fmt.Errorf("send message: %w", err)
}
+
return nil
}
+func (p *Protocol) Events() (chan kafka.Event, error) {
+ if p.producer == nil {
+ return nil, errors.New("producer not set")
+ }
+
+ return p.producer.Events(), nil
+}
+
func (p *Protocol) OpenInbound(ctx context.Context) error {
if p.consumer == nil {
return errors.New("the consumer client must be set")
@@ -238,7 +237,6 @@ func (p *Protocol) Close(ctx context.Context) error {
if p.producer != nil && !p.producer.IsClosed() {
p.producer.Close()
- close(p.producerDeliveryChan)
}
return nil |
If we agree on the above, needs code comments for public methods and usage example |
Hello, @embano1! I'd like to consult your opinion on something related to kubeCon China. Since I couldn't find your contact information, I'm reaching out to you here. Over the past year, we've brought some updates about the message queue to this repos. If you're interested, I can draft a proposal for the enhancements of cloudevents message queue to the KubeCon China. The main people involved would be the two of us. The main topics covered include bringing MQTT binding to support IoT cases, and some improvements related to Kafka such as supporting message confirm, consumption from a specific point, batch subscribing to topics, and supporting ordered partition offsets, etc. How do you feel about it? |
It's a good choice to give the If we deprecate the
|
Do we need formal deprecation of |
Can you please reach out on the CloudEvents Slack channel and ping me there? |
I think we can just remove the code cause the user cannot sense it. Yes. Calling It looks like we have reached a consensus. Then I will proceed with these changes and add some examples. |
Events()
for confluent kafka producer
} | ||
|
||
err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan) | ||
if err != nil { | ||
if err = p.producer.Produce(kafkaMsg, nil); err != nil { | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no formatted error like above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to return a kafka.Error
here for fear that users might also want to check its type using an assert statement like this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's why I'm wrapping the error with %w
to allow Go idiomatic use of https://pkg.go.dev/errors#As
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yanmxa WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@embano1
While the type of kafka.Error
is not the type of error we typically refer to.
It's an object that only implements the error
interface. So once the kafka.Error
is formatted with %w
, then it can't be unwrapped back to kafka.Error
.
err = p.producer.Produce(kafkaMsg, nil)
if err != nil {
err = fmt.Errorf("send message: %w", err)
}
...
if err != nil {
if err.(kafka.Error).Code() == kafka.ErrQueueFull {
// Producer queue is full, wait 1s for messages
// to be delivered then try again.
time.Sleep(time.Second)
continue
}
fmt.Printf("Failed to produce message: %v\n", err)
}
panic: interface conversion: error is *fmt.wrapError, not kafka.Error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we're talking past each other :)
This doesn't work?
var kafkaError kafka.Error
if errors.As(err, &kafkaError) {
// use kafkaError to handle and access fields
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yanmxa ICYMI ^
} | ||
return p.producer.Events(), nil | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you please add a comment on Send to reference/explain to also use the Events channel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
c, err := cloudevents.NewClient(receiver, client.WithPollGoroutines(1)) | ||
// The 'WithBlockingCallback()' is to make event processing serialized (no concurrency), use this option along with WithPollGoroutines(1). | ||
// These two options make sure the events from kafka partition are processed in order | ||
c, err := cloudevents.NewClient(receiver, client.WithBlockingCallback(), client.WithPollGoroutines(1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh wow, man...this is really not intuitive to use
// Producer queue is full, wait 1s for messages | ||
// to be delivered then try again. | ||
time.Sleep(time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a best practice? Wondering if it complicates the example, unless you think it's always important to implement this logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've seen them use assert like the above when sending messages asynchronously.
But I tried to set the producer buffer channel size with 1 "go.produce.channel.size": 1
, and sending 10,000 messages within 1 second several times. It never catches the kafka.ErrQueueFull
error. So let's remove it.
continue | ||
} | ||
} | ||
if cloudevents.IsUndelivered(result) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is probably sufficient for error handling instead of the above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
emm... remove it now.
defer sender.Close(ctx) | ||
|
||
// Listen to all the events on the default events channel | ||
// It's important to read these events otherwise the events channel will eventually fill up | ||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: you're not waiting for this goroutine to finish before returning from main
- which could prevent your goroutine from printing anything. typically use https://pkg.go.dev/golang.org/x/sync/errgroup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Thx @yanmxa ! Definitely easier code on our side - however, that Confluent SDK is really hard to use given all the asynchrony and error handling one has to perform to catch all possible scenarios - guess not much further we can do here :/ |
Absolutely! It seems like they traded ease of use for improved performance with their asynchronous approach. That's made us put in extra work to handle it. I'm thinking we might consider adding an option in the future to switch the producer to a synchronous method, avoiding users having to watch err = producer.Produce(msg, nil)
if err != nil {
return fmt.Errorf("failed to produce message: %w", err)
}
event := <-producer.Events() |
Initially I tried that, but IMHO it's not that easy. Because |
} | ||
|
||
err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan) | ||
if err != nil { | ||
if err = p.producer.Produce(kafkaMsg, nil); err != nil { | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's why I'm wrapping the error with %w
to allow Go idiomatic use of https://pkg.go.dev/errors#As
|
||
if p.consumerCancel != nil { | ||
p.consumerCancel() | ||
} | ||
|
||
if p.producer != nil && !p.producer.IsClosed() { | ||
// Flush and close the producer and the events channel | ||
for p.producer.Flush(10000) > 0 { | ||
logger.Info("Still waiting to flush outstanding messages") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger.Info("Still waiting to flush outstanding messages") | |
logger.Info("Flushing outstanding messages") |
Question: 10000
is 10 seconds timeout or delay? How often is this message printed? Risk of floding logs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
10000
is 10 seconds timeout.
Actually, I haven't seen the message in a normal case. It only echoes when Invoke Close()
after sending to a disconnected broker. And with the current 10 second timeout, it prints the message every 10 seconds.
Exactly! Making the
However, should we convert it to synchronous? Which option should we use? If it is necessary, we can discuss it later. |
Yeah, let's keep this out of this PR for now |
} | ||
|
||
err = p.producer.Produce(kafkaMsg, p.producerDeliveryChan) | ||
if err != nil { | ||
if err = p.producer.Produce(kafkaMsg, nil); err != nil { | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yanmxa ICYMI ^
Signed-off-by: Meng Yan <myan@redhat.com> reply review Signed-off-by: Meng Yan <myan@redhat.com> Update protocol/kafka_confluent/v2/protocol.go Co-authored-by: Michael Gasch <15986659+embano1@users.noreply.github.com> Signed-off-by: Meng Yan <myan@redhat.com>
105e713
to
7fef294
Compare
Signed-off-by: myan myan@redhat.com
Resolved: #1030