Skip to content

Commit

Permalink
[ISSUE apache#996] invoke user callback and return error info when as…
Browse files Browse the repository at this point in the history
…ync process send resp error
  • Loading branch information
ferrirW committed Feb 10, 2023
1 parent 757e639 commit 6c93d84
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
3 changes: 1 addition & 2 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
case ResSuccess:
status = primitive.SendOK
default:
status = primitive.SendUnknownError
return errors.New(cmd.Remark)
return errors.New(fmt.Sprintf("CODE: %d, DESC: %s", cmd.Code, cmd.Remark))
}

msgIDs := make([]string, 0)
Expand Down
11 changes: 8 additions & 3 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,18 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
err := p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
cancel()
if err != nil {
h(ctx, nil, err)
}

resp := primitive.NewSendResult()
err = p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg)
if err != nil {
h(ctx, nil, err)
} else {
p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg)
h(ctx, resp, nil)
return
}

h(ctx, resp, nil)
})

if err != nil {
Expand Down

0 comments on commit 6c93d84

Please sign in to comment.