Skip to content

Commit

Permalink
add setTimeout for producer and admin
Browse files Browse the repository at this point in the history
  • Loading branch information
yuz10 committed Nov 30, 2021
1 parent 49a750b commit 7c0e598
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 6 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Table of Contents
* [new](#new)
* [addRPCHook](#addRPCHook)
* [setUseTLS](#setUseTLS)
* [setTimeout](#setTimeout)
* [registerSendMessageHook](#registerSendMessageHook)
* [registerEndTransactionHook](#registerEndTransactionHook)
* [send](#send)
Expand All @@ -27,6 +28,7 @@ Table of Contents
* [new](#new-1)
* [addRPCHook](#addRPCHook-1)
* [setUseTLS](#setUseTLS-1)
* [setTimeout](#setTimeout-1)
* [createTopic](#createTopic)
* [createTopicForBroker](#createTopicForBroker)
* [searchOffset](#searchOffset)
Expand Down Expand Up @@ -139,6 +141,12 @@ there is an acl hook provided, usage is:

`useTLS` is a boolean

#### setTimeout

`syntax: p:setTimeout(timeout)`

`timeout` is in milliseconds, default 3000

#### registerSendMessageHook

`syntax: p:registerSendMessageHook(hook)`
Expand Down Expand Up @@ -249,6 +257,12 @@ there is an acl hook provided, usage is:

`useTLS` is a boolean

#### setTimeout

`syntax: p:setTimeout(timeout)`

`timeout` is in milliseconds, default 3000

#### createTopic
`syntax: res, err = adm:createTopic(defaultTopic, newTopic, queueNum, topicSysFlag)`

Expand Down
4 changes: 4 additions & 0 deletions lib/resty/rocketmq/admin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ function _M.setUseTLS(self, useTLS)
self.client:setUseTLS(useTLS)
end

function _M.setTimeout(self, timeout)
self.client:setTimeout(timeout)
end

function _M.createTopic(self, defaultTopic, newTopic, queueNum, topicSysFlag)
if not core.checkTopic(newTopic) then
return nil, ('topic %s invalid format'):format(newTopic)
Expand Down
7 changes: 6 additions & 1 deletion lib/resty/rocketmq/client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ function _M.new(nameservers)
current_nameserver = 1,
RPCHook = {},
useTLS = false,
timeout = 3000,

topicPublishInfoTable = {},
brokerAddrTable = {},
Expand All @@ -52,8 +53,12 @@ function _M.setUseTLS(self, useTLS)
self.useTLS = useTLS
end

function _M.setTimeout(self, timeout)
self.timeout = timeout
end

function _M:request(code, addr, header, body, oneway)
return core.request(code, addr, header, body, oneway, self.RPCHook, self.useTLS)
return core.request(code, addr, header, body, oneway, self.RPCHook, self.useTLS, self.timeout)
end

function _M:chooseNameserver()
Expand Down
7 changes: 4 additions & 3 deletions lib/resty/rocketmq/core.lua
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,9 @@ local function getLong(buffer, offset)
return lshift(res1, 32) + res2, offset
end

local function doReqeust(ip, port, send, oneway, useTLS)
local function doReqeust(ip, port, send, oneway, useTLS, timeout)
local sock = ngx_socket_tcp()
sock:settimeout(timeout)
local res, err = sock:connect(ip, port)
if not res then
return nil, nil, ('connect %s:%s fail:%s'):format(ip, port, err)
Expand Down Expand Up @@ -328,7 +329,7 @@ local function doReqeust(ip, port, send, oneway, useTLS)
end
_M.doReqeust = doReqeust

local function request(code, addr, header, body, oneway, RPCHook, useTLS)
local function request(code, addr, header, body, oneway, RPCHook, useTLS, timeout)
if RPCHook then
for _, hook in ipairs(RPCHook) do
hook:doBeforeRequest(addr, header, body)
Expand All @@ -337,7 +338,7 @@ local function request(code, addr, header, body, oneway, RPCHook, useTLS)
ngx.log(ngx.DEBUG, ('\27[33msend: %s %s\27[0m %s %s'):format(addr, REQUEST_CODE_NAME[code] or code, cjson_safe.encode(header), body))
local send = encode(code, header, body, oneway)
local ip, port = unpack(split(addr, ':'))
local respHeader, respBody, err = doReqeust(ip, port, send, oneway, useTLS)
local respHeader, respBody, err = doReqeust(ip, port, send, oneway, useTLS, timeout)
if err then
return nil, nil, err
end
Expand Down
7 changes: 7 additions & 0 deletions lib/resty/rocketmq/producer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ function _M.setUseTLS(self, useTLS)
end
end

function _M.setTimeout(self, timeout)
self.client:setTimeout(timeout)
if self.traceDispatcher then
self.traceDispatcher.producer:setTimeout(timeout)
end
end

function _M.registerSendMessageHook(self, hook)
if type(hook) == 'table' and type(hook.sendMessageBefore) == 'function' and type(hook.sendMessageAfter) == 'function' then
table.insert(self.sendMessageHookList, hook)
Expand Down
4 changes: 2 additions & 2 deletions lua-resty-rocketmq-0.2.0-0.rockspec
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package = "lua-resty-rocketmq"
version = "0.2.0-0"
version = "0.2.1-0"
source = {
url = "git://github.com/yuz10/lua-resty-rocketmq.git",
tag = "0.2.0"
tag = "0.2.1"
}
description = {
summary = "Lua RocketMQ client driver for the ngx_lua based on the cosocket API",
Expand Down

0 comments on commit 7c0e598

Please sign in to comment.