Skip to content

Commit

Permalink
feat(redis): support pipeline (#6959)
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <spacewanderlzx@gmail.com>
  • Loading branch information
spacewander authored May 5, 2022
1 parent f78d045 commit ea3828a
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 35 deletions.
100 changes: 66 additions & 34 deletions apisix/stream/xrpc/protocols/redis/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ local PREFIX_ERR = str_byte("-")


function _M.init_downstream(session)
session.req_id_seq = 0
session.resp_id_seq = 0
return xrpc_socket.downstream.socket()
end

Expand Down Expand Up @@ -73,13 +75,13 @@ local function read_len(sk)
end


local function read_req(sk, ctx)
local function read_req(session, sk)
local narg, err = read_len(sk)
if not narg then
return nil, err
end

ctx.cmd_line = core.table.new(narg, 0)
local cmd_line = core.tablepool.fetch("xrpc_redis_cmd_line", narg, 0)

for i = 1, narg do
local n, err = read_len(sk)
Expand Down Expand Up @@ -110,11 +112,16 @@ local function read_req(sk, ctx)
s = ffi_str(p, n)
end

ctx.cmd_line[i] = s
cmd_line[i] = s
end

session.req_id_seq = session.req_id_seq + 1
local ctx = sdk.get_req_ctx(session, session.req_id_seq)
ctx.cmd_line = cmd_line
ctx.cmd = ctx.cmd_line[1]
return true

local pipelined = sk:has_pending_data()
return true, nil, pipelined
end


Expand All @@ -131,7 +138,6 @@ local function read_reply(sk)

local size = tonumber(ffi_str(line + 1, n - 1))
if size < 0 then
-- return null
return true
end

Expand All @@ -144,46 +150,30 @@ local function read_reply(sk)

elseif prefix == PREFIX_STA then -- char '+'
-- print("status reply")
-- return sub(line, 2)
return true

elseif prefix == PREFIX_ARR then -- char '*'
local narr = tonumber(ffi_str(line + 1, n - 1))

-- print("multi-bulk reply: ", narr)
if narr < 0 then
-- return null
return true
end

local vals = core.table.new(n, 0)
local nvals = 0
for i = 1, narr do
local res, err = read_reply(sk)
if res then
nvals = nvals + 1
vals[nvals] = res

elseif res == nil then
if res == nil then
return nil, err

else
-- be a valid redis error value
nvals = nvals + 1
vals[nvals] = {false, err}
end
end

return vals
return true

elseif prefix == PREFIX_INT then -- char ':'
-- print("integer reply")
-- return tonumber(str_sub(line, 2))
return true

elseif prefix == PREFIX_ERR then -- char '-'
-- print("error reply: ", n)
-- return false, str_sub(line, 2)
return true

else
Expand All @@ -192,17 +182,53 @@ local function read_reply(sk)
end


function _M.from_downstream(session, downstream)
local ctx = sdk.get_req_ctx(session, 0)
local ok, err = read_req(downstream, ctx)
local function handle_reply(session, sk)
local ok, err = read_reply(sk)
if not ok then
if err ~= "timeout" and err ~= "closed" then
core.log.error("failed to read request: ", err)
return nil, err
end

-- TODO: don't update resp_id_seq if the reply is subscribed msg
session.resp_id_seq = session.resp_id_seq + 1
local ctx = sdk.get_req_ctx(session, session.resp_id_seq)

return ctx
end


function _M.from_downstream(session, downstream)
local read_pipeline = false
while true do
local ok, err, pipelined = read_req(session, downstream)
if not ok then
if err ~= "timeout" and err ~= "closed" then
core.log.error("failed to read request: ", err)
end

if read_pipeline and err == "timeout" then
break
end

return DECLINED
end
return DECLINED

if not pipelined then
break
end

if not read_pipeline then
read_pipeline = true
-- set minimal read timeout to read pipelined data
downstream:settimeouts(0, 0, 1)
end
end

if read_pipeline then
-- set timeout back
downstream:settimeouts(0, 0, 0)
end

return OK, ctx
return OK
end


Expand Down Expand Up @@ -236,8 +262,13 @@ function _M.to_upstream(session, ctx, downstream, upstream)
return DECLINED
end

local p, err = read_reply(upstream)
if p == nil then
return OK
end


function _M.from_upstream(session, downstream, upstream)
local ctx, err = handle_reply(session, upstream)
if ctx == nil then
core.log.error("failed to handle upstream: ", err)
return DECLINED
end
Expand All @@ -248,12 +279,13 @@ function _M.to_upstream(session, ctx, downstream, upstream)
return DECLINED
end

return DONE
return DONE, ctx
end


function _M.log(session, ctx)
-- TODO
core.tablepool.release("xrpc_redis_cmd_line", ctx.cmd_line)
ctx.cmd_line = nil
end


Expand Down
55 changes: 54 additions & 1 deletion t/xrpc/redis.t
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ _EOC_
}

if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
$block->set_value("no_error_log", "[error]");
$block->set_value("no_error_log", "[error]\nRPC is not finished");
}

if (!defined $block->request) {
Expand All @@ -50,6 +50,7 @@ _EOC_
$block;
});

worker_connections(1024);
run_tests;

__DATA__
Expand Down Expand Up @@ -204,3 +205,55 @@ hget animals: bark
--- response_body eval
"\r\n" x 16777216
--- stream_conf_enable
=== TEST 5: pipeline
--- config
location /t {
content_by_lua_block {
local cjson = require("cjson")
local redis = require "resty.redis"
local t = {}
for i = 1, 180 do
local th = assert(ngx.thread.spawn(function(i)
local red = redis:new()
local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
if not ok then
ngx.say("failed to connect: ", err)
return
end
red:init_pipeline()
red:set("mark_" .. i, i)
red:get("mark_" .. i)
red:get("counter")
for j = 1, 4 do
red:incr("counter")
end
local results, err = red:commit_pipeline()
if not results then
ngx.say("failed to commit: ", err)
return
end
local begin = tonumber(results[3])
for j = 1, 4 do
local incred = results[3 + j]
if incred ~= results[2 + j] + 1 then
ngx.log(ngx.ERR, cjson.encode(results))
end
end
end, i))
table.insert(t, th)
end
for i, th in ipairs(t) do
ngx.thread.wait(th)
end
}
}
--- response_body
--- stream_conf_enable

0 comments on commit ea3828a

Please sign in to comment.