From 8a23ce303084d7873bc8f046efac4738fbbec156 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 24 Oct 2023 19:53:06 +0800 Subject: [PATCH] refactor: factor out validateMsg Signed-off-by: tison --- pulsar/producer_partition.go | 40 ++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 82123f988a..62c180ca92 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -495,14 +495,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) { return } - if p.options.DisableMultiSchema { - if msg.Schema != nil && p.options.Schema != nil && - msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() { - runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema")) - p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic) - return - } - } var schema Schema var schemaVersion []byte if msg.Schema != nil { @@ -1121,17 +1113,35 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, p.internalSendAsync(ctx, msg, callback, false) } -func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, - callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { +func (p *partitionProducer) validateMsg(msg *ProducerMessage) error { if msg == nil { - p.log.Error("Message is nil") - runCallback(callback, nil, msg, newError(InvalidMessage, "Message is nil")) - return + return newError(InvalidMessage, "Message is nil") } if msg.Value != nil && msg.Payload != nil { - p.log.Error("Can not set Value and Payload both") - runCallback(callback, nil, msg, newError(InvalidMessage, "Can not set Value and Payload both")) + return newError(InvalidMessage, "Can not set Value and Payload both") + } + + if p.options.DisableMultiSchema { + if msg.Schema != nil && p.options.Schema != nil && + msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() { + p.log.Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic) + return fmt.Errorf("msg schema can not match with producer schema") + } + } + + return nil +} + +func (p *partitionProducer) internalSendAsync( + ctx context.Context, + msg *ProducerMessage, + callback func(MessageID, *ProducerMessage, error), + flushImmediately bool, +) { + if err := p.validateMsg(msg); err != nil { + p.log.Error(err) + runCallback(callback, nil, msg, err) return }