Skip to content

Commit

Permalink
fix ack
Browse files Browse the repository at this point in the history
  • Loading branch information
yuz10 authored Jul 6, 2024
1 parent 12935dd commit ff934a9
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions lib/resty/rocketmq/client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function _M.new(nameservers, processor)
RPCHook = {},
useTLS = false,
timeout = 3000,

topicPublishInfoTable = {},
topicSubscribeInfoTable = {},
topicRouteTable = {},
Expand Down Expand Up @@ -205,12 +205,12 @@ local function updateTopicRouteInfoFromNameserver(self, topic)
end
local publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData)
self.topicPublishInfoTable[topic] = publishInfo

local subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData)
self.topicSubscribeInfoTable[topic] = subscribeInfo

self.topicRouteTable[topic] = topicRouteData

for _, bd in ipairs(topicRouteData.brokerDatas) do
self.brokerAddrTable[bd.brokerName] = bd.brokerAddrs
end
Expand Down Expand Up @@ -302,7 +302,7 @@ end

function _M:updateConsumeOffsetToBroker(mq, offset)
local brokerAddr = findBrokerAddressInSubscribe(self, mq.brokerName)

return self:request(REQUEST_CODE.UPDATE_CONSUMER_OFFSET, brokerAddr, {
topic = mq.topic,
consumerGroup = mq.consumerGroup,
Expand All @@ -314,7 +314,7 @@ end

function _M:fetchConsumeOffsetFromBroker(consumerGroup, mq)
local brokerAddr = findBrokerAddressInSubscribe(self, mq.brokerName)

local h, b, err = self:request(REQUEST_CODE.QUERY_CONSUMER_OFFSET, brokerAddr, {
topic = mq.topic,
consumerGroup = consumerGroup,
Expand Down Expand Up @@ -538,7 +538,7 @@ local function processPopResponse(mq, status, extFields, messages)
msg.properties["1ST_POP_TIME"] = msg.properties["1ST_POP_TIME"] or tostring(extFields.popTime)
msg.brokerName = mq.brokerName
end

return {
restNum = tonumber(extFields.restNum),
invisibleTime = tonumber(extFields.invisibleTime),
Expand Down Expand Up @@ -643,9 +643,17 @@ end

function _M:doAck(topic, consumerGroup, extraInfo)
local extraInfoStrs = split(extraInfo, ' ')
local topicVersion = tonumber(extraInfoStrs[5])
local brokerName = extraInfoStrs[6]
local queueId = tonumber(extraInfoStrs[7])
local offset = extraInfoStrs[8]
-- finalOffset popTime invisibleTime reviveQid topicVersion brokerName queueId queueOffset
local realTopic = topic
if topicVersion == 1 then
realTopic = '%RETRY%' .. consumerGroup .. '_' .. topic
elseif topicVersion == 2 then
realTopic = '%RETRY%' .. consumerGroup .. '+' .. topic
end
local brokerAddr = findBrokerAddressInSubscribe(self, brokerName)
if brokerAddr == nil then
updateTopicRouteInfoFromNameserver(self, topic)
Expand All @@ -655,7 +663,7 @@ function _M:doAck(topic, consumerGroup, extraInfo)
return nil
end
local h, b, err = self:request(REQUEST_CODE.ACK_MESSAGE, brokerAddr, {
topic = topic,
topic = realTopic,
queueId = queueId,
offset = offset,
consumerGroup = consumerGroup,
Expand Down

0 comments on commit ff934a9

Please sign in to comment.