Skip to content

Commit

Permalink
fix server stream handling
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <spacewanderlzx@gmail.com>
  • Loading branch information
spacewander committed Apr 20, 2022
1 parent 449d933 commit 4a2ca9c
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 7 deletions.
2 changes: 1 addition & 1 deletion apisix/stream/xrpc/runner.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ end


local function put_req_ctx(session, ctx)
local id = ctx.id
local id = ctx._id
session._ctxs[id] = nil

core.tablepool.release("xrpc_ctxs", ctx)
Expand Down
14 changes: 8 additions & 6 deletions t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -205,19 +205,21 @@ function _M.from_upstream(session, downstream, upstream)
core.log.info("send heartbeat")

-- need to reset read buf as we won't forward it
downstream:reset_read_buf()
downstream:send(ffi_str(p, HDR_LEN))
upstream:reset_read_buf()
upstream:send(ffi_str(p, HDR_LEN))
return DONE
end

local stream_id = p[3] * 256 + p[4]
local ctx = sdk.get_req_ctx(session, stream_id)

local body_len = to_int32(p, 6)
if body_len ~= ctx.len - HDR_LEN then
core.log.error("upstream body len mismatch, expected: ", ctx.len - HDR_LEN,
", actual: ", body_len)
return DECLINED
if ctx.len then
if body_len ~= ctx.len - HDR_LEN then
core.log.error("upstream body len mismatch, expected: ", ctx.len - HDR_LEN,
", actual: ", body_len)
return DECLINED
end
end

local p = read_data(upstream, body_len, true)
Expand Down
52 changes: 52 additions & 0 deletions t/xrpc/pingpong.t
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,55 @@ RPC is not finished
RPC is not finished
--- no_error_log
[error]
=== TEST 12: server stream, heartbeat
--- request eval
"POST /t
" .
"pp\x03\x00\x01\x00\x00\x00\x00\x03ABC"
--- stream_conf_enable
--- stream_upstream_code
local sock = ngx.req.socket(true)
sock:settimeout(10)
local data1 = sock:receive(13)
if not data1 then
return
end
local hb = "pp\x01\x00\x00\x00\x00\x00\x00\x00"
assert(sock:send(hb))
local data2 = sock:receive(10)
if not data2 then
return
end
assert(data2 == hb)
assert(sock:send(data1))
--- response_body eval
"pp\x03\x00\x01\x00\x00\x00\x00\x03ABC"
--- no_error_log
RPC is not finished
[error]
=== TEST 13: server stream
--- request eval
"POST /t
" .
"pp\x03\x00\x01\x00\x00\x00\x00\x01A"
--- stream_conf_enable
--- stream_upstream_code
local sock = ngx.req.socket(true)
sock:settimeout(10)
local data1 = sock:receive(11)
if not data1 then
return
end
assert(sock:send("pp\x03\x00\x03\x00\x00\x00\x00\x03ABC"))
assert(sock:send("pp\x03\x00\x02\x00\x00\x00\x00\x02AB"))
assert(sock:send(data1))
--- response_body eval
"pp\x03\x00\x03\x00\x00\x00\x00\x03ABC" .
"pp\x03\x00\x02\x00\x00\x00\x00\x02AB" .
"pp\x03\x00\x01\x00\x00\x00\x00\x01A"

0 comments on commit 4a2ca9c

Please sign in to comment.