From 449d93360bd191c905423c63644a9619c8385004 Mon Sep 17 00:00:00 2001 From: spacewander Date: Tue, 19 Apr 2022 19:51:02 +0800 Subject: [PATCH 1/2] feat(xRPC): basic stream support Signed-off-by: spacewander --- apisix/stream/xrpc/protocols/redis/init.lua | 5 ++ apisix/stream/xrpc/runner.lua | 86 ++++++++++++++----- apisix/stream/xrpc/sdk.lua | 37 ++++++-- .../stream/xrpc/protocols/pingpong/init.lua | 73 ++++++++++++++-- t/xrpc/pingpong.t | 58 +++++++++++++ 5 files changed, 225 insertions(+), 34 deletions(-) diff --git a/apisix/stream/xrpc/protocols/redis/init.lua b/apisix/stream/xrpc/protocols/redis/init.lua index 4edcb5922973..8f5dcbae2976 100644 --- a/apisix/stream/xrpc/protocols/redis/init.lua +++ b/apisix/stream/xrpc/protocols/redis/init.lua @@ -208,6 +208,11 @@ function _M.connect_upstream(session, ctx) end +function _M.disconnect_upstream(session, upstream, upstream_broken) + sdk.disconnect_upstream(upstream, session.upstream_conf, upstream_broken) +end + + function _M.to_upstream(session, ctx, downstream, upstream) local ok, err = upstream:move(downstream) if not ok then diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua index b364b0fb4c5e..ee266942e29f 100644 --- a/apisix/stream/xrpc/runner.lua +++ b/apisix/stream/xrpc/runner.lua @@ -15,6 +15,8 @@ -- limitations under the License. -- local core = require("apisix.core") +local pairs = pairs +local ngx = ngx local ngx_now = ngx.now local OK = ngx.OK local DECLINED = ngx.DECLINED @@ -27,34 +29,37 @@ local _M = {} local function open_session(conn_ctx) conn_ctx.xrpc_session = { upstream_conf = conn_ctx.matched_upstream, - id_seq = 0, + _ctxs = {}, } return conn_ctx.xrpc_session end -local function close_session(session, upstream_broken) - local upstream = session.upstream - if upstream then - if upstream_broken then - upstream:close() - else - upstream:setkeepalive() - end +local function close_session(session, protocol) + local upstream_ctx = session._upstream_ctx + if upstream_ctx then + upstream_ctx.closed = true + + local up = upstream_ctx.upstream + protocol.disconnect_upstream(session, up, upstream_ctx.broken) + end + + for id in pairs(session._ctxs) do + core.log.info("RPC is not finished, id: ", id) end end local function put_req_ctx(session, ctx) local id = ctx.id - session.ctxs[id] = nil + session._ctxs[id] = nil core.tablepool.release("xrpc_ctxs", ctx) end local function finish_req(protocol, session, ctx) - ctx.rpc_end_time = ngx_now() + ctx._rpc_end_time = ngx_now() protocol.log(session, ctx) put_req_ctx(session, ctx) @@ -62,8 +67,8 @@ end local function open_upstream(protocol, session, ctx) - if session.upstream then - return OK, session.upstream + if session._upstream_ctx then + return OK, session._upstream_ctx end local state, upstream = protocol.connect_upstream(session, session) @@ -71,21 +76,49 @@ local function open_upstream(protocol, session, ctx) return state, nil end - session.upstream = upstream - return OK, upstream + session._upstream_ctx = { + upstream = upstream, + broken = false, + closed = false, + } + return OK, session._upstream_ctx +end + + +local function start_upstream_coroutine(session, protocol, downstream, up_ctx) + local upstream = up_ctx.upstream + while not up_ctx.closed do + local status, ctx = protocol.from_upstream(session, downstream, upstream) + if status ~= OK then + if ctx ~= nil then + finish_req(protocol, session, ctx) + end + + if status == DECLINED then + -- fail to read + break + end + + if status == DONE then + -- a rpc is finished + goto continue + end + end + + ::continue:: + end end function _M.run(protocol, conn_ctx) local session = open_session(conn_ctx) local downstream = protocol.init_downstream(session) - local upstream_broken = false while true do local status, ctx = protocol.from_downstream(session, downstream) if status ~= OK then if ctx ~= nil then - finish_req(session, ctx) + finish_req(protocol, session, ctx) end if status == DECLINED then @@ -100,14 +133,14 @@ function _M.run(protocol, conn_ctx) end -- need to do some auth/routing jobs before reaching upstream - local status, upstream = open_upstream(protocol, session, ctx) + local status, up_ctx = open_upstream(protocol, session, ctx) if status ~= OK then break end - status = protocol.to_upstream(session, ctx, downstream, upstream) + status = protocol.to_upstream(session, ctx, downstream, up_ctx.upstream) if status == DECLINED then - upstream_broken = true + up_ctx.broken = true break end @@ -116,10 +149,21 @@ function _M.run(protocol, conn_ctx) goto continue end + if not up_ctx.coroutine then + local co, err = ngx.thread.spawn( + start_upstream_coroutine, session, protocol, downstream, up_ctx) + if not co then + core.log.error("failed to start upstream coroutine: ", err) + break + end + + up_ctx.coroutine = co + end + ::continue:: end - close_session(session, upstream_broken) + close_session(session, protocol) -- return non-zero code to terminal the session return 200 diff --git a/apisix/stream/xrpc/sdk.lua b/apisix/stream/xrpc/sdk.lua index 63a179dab60c..f480d10e276e 100644 --- a/apisix/stream/xrpc/sdk.lua +++ b/apisix/stream/xrpc/sdk.lua @@ -21,6 +21,7 @@ local core = require("apisix.core") local xrpc_socket = require("resty.apisix.stream.xrpc.socket") local ngx_now = ngx.now +local error = error local _M = {} @@ -41,8 +42,10 @@ function _M.connect_upstream(node, up_conf) core.log.error("failed to connect: ", err) return nil end + -- TODO: support timeout if up_conf.scheme == "tls" then + -- TODO: support mTLS local ok, err = sk:sslhandshake(nil, node.host) if not ok then core.log.error("failed to handshake: ", err) @@ -55,7 +58,24 @@ end --- --- Returns the request level ctx with an optional id +-- Returns disconnected xRPC upstream socket according to the configuration +-- +-- @function xrpc.sdk.disconnect_upstream +-- @tparam table xRPC upstream socket +-- @tparam table upstream configuration +-- @tparam boolean is the upstream already broken +function _M.disconnect_upstream(upstream, up_conf, upstream_broken) + if upstream_broken then + upstream:close() + else + -- TODO: support keepalive according to the up_conf + upstream:setkeepalive() + end +end + + +--- +-- Returns the request level ctx with an id -- -- @function xrpc.sdk.get_req_ctx -- @tparam table xrpc session @@ -63,14 +83,19 @@ end -- @treturn table the request level ctx function _M.get_req_ctx(session, id) if not id then - id = session.id_seq - session.id_seq = session.id_seq + 1 + error("id is required") + end + + local ctx = session._ctxs[id] + if ctx then + return ctx end - local ctx = core.tablepool.fetch("xrpc_ctxs") - session.ctxs[id] = ctx + local ctx = core.tablepool.fetch("xrpc_ctxs", 4, 4) + ctx._id = id + session._ctxs[id] = ctx - ctx.rpc_start_time = ngx_now() + ctx._rpc_start_time = ngx_now() return ctx end diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua index 1b16606b0dd2..d25ecff91038 100644 --- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua +++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua @@ -15,6 +15,7 @@ -- limitations under the License. -- local core = require("apisix.core") +local sdk = require("apisix.stream.xrpc.sdk") local xrpc_socket = require("resty.apisix.stream.xrpc.socket") local bit = require("bit") local lshift = bit.lshift @@ -36,6 +37,7 @@ local _M = {} local HDR_LEN = 10 local TYPE_HEARTBEAT = 1 local TYPE_UNARY = 2 +local TYPE_STREAM = 3 function _M.init_worker() @@ -55,9 +57,7 @@ local function read_data(sk, len, body) local f = body and sk.drain or sk.read local p, err = f(sk, len) if not p then - if err == "closed" then - core.log.info("failed to read: ", err) - else + if err ~= "closed" then core.log.error("failed to read: ", err) end return nil @@ -98,6 +98,9 @@ function _M.from_downstream(session, downstream) return DONE end + local stream_id = p[3] * 256 + p[4] + local ctx = sdk.get_req_ctx(session, stream_id) + local body_len = to_int32(p, 6) core.log.info("read body len: ", body_len) @@ -106,10 +109,11 @@ function _M.from_downstream(session, downstream) return DECLINED end - return OK, { - is_unary = typ == TYPE_UNARY, - len = HDR_LEN + body_len - } + ctx.is_unary = typ == TYPE_UNARY + ctx.is_stream = typ == TYPE_STREAM + ctx.id = stream_id + ctx.len = HDR_LEN + body_len + return OK, ctx end @@ -146,6 +150,14 @@ function _M.connect_upstream(session, ctx) end +function _M.disconnect_upstream(session, upstream, upstream_broken) + -- disconnect upstream created by connect_upstream + -- the upstream_broken flag is used to indicate whether the upstream is + -- already broken + sdk.disconnect_upstream(upstream, session.upstream_conf, upstream_broken) +end + + function _M.to_upstream(session, ctx, downstream, upstream) -- send the request read from downstream to the upstream -- return whether the request is sent @@ -176,6 +188,53 @@ function _M.to_upstream(session, ctx, downstream, upstream) end +function _M.from_upstream(session, downstream, upstream) + local p = read_data(upstream, HDR_LEN, false) + if p == nil then + return DECLINED + end + + local p_b = str_byte("p") + if p[0] ~= p_b or p[1] ~= p_b then + core.log.error("invalid magic number: ", ffi_str(p, 2)) + return DECLINED + end + + local typ = p[2] + if typ == TYPE_HEARTBEAT then + core.log.info("send heartbeat") + + -- need to reset read buf as we won't forward it + downstream:reset_read_buf() + downstream:send(ffi_str(p, HDR_LEN)) + return DONE + end + + local stream_id = p[3] * 256 + p[4] + local ctx = sdk.get_req_ctx(session, stream_id) + + local body_len = to_int32(p, 6) + if body_len ~= ctx.len - HDR_LEN then + core.log.error("upstream body len mismatch, expected: ", ctx.len - HDR_LEN, + ", actual: ", body_len) + return DECLINED + end + + local p = read_data(upstream, body_len, true) + if p == nil then + return DECLINED + end + + local ok, err = downstream:move(upstream) + if not ok then + core.log.error("failed to handle upstream: ", err) + return DECLINED + end + + return DONE, ctx +end + + function _M.log(session, ctx) core.log.info("call pingpong's log") end diff --git a/t/xrpc/pingpong.t b/t/xrpc/pingpong.t index f0248422196a..57e0c376832d 100644 --- a/t/xrpc/pingpong.t +++ b/t/xrpc/pingpong.t @@ -264,3 +264,61 @@ passed end --- error_log failed to read: timeout + + + +=== TEST 10: client stream, N:N +--- request eval +"POST /t +" . +"pp\x03\x00\x01\x00\x00\x00\x00\x03ABC" . +"pp\x03\x00\x02\x00\x00\x00\x00\x04ABCD" +--- stream_conf_enable +--- stream_upstream_code + local sock = ngx.req.socket(true) + sock:settimeout(10) + local data1 = sock:receive(13) + if not data1 then + return + end + local data2 = sock:receive(14) + if not data2 then + return + end + assert(sock:send(data2)) + assert(sock:send(data1)) +--- response_body eval +"pp\x03\x00\x02\x00\x00\x00\x00\x04ABCD" . +"pp\x03\x00\x01\x00\x00\x00\x00\x03ABC" +--- no_error_log +RPC is not finished +[error] + + + +=== TEST 11: client stream, bad response +--- request eval +"POST /t +" . +"pp\x03\x00\x01\x00\x00\x00\x00\x03ABC" . +"pp\x03\x00\x02\x00\x00\x00\x00\x04ABCD" +--- stream_conf_enable +--- stream_upstream_code + local sock = ngx.req.socket(true) + sock:settimeout(10) + local data1 = sock:receive(13) + if not data1 then + return + end + local data2 = sock:receive(14) + if not data2 then + return + end + assert(sock:send(data2)) + assert(sock:send(data1:sub(11))) +--- response_body eval +"pp\x03\x00\x02\x00\x00\x00\x00\x04ABCD" +--- error_log +RPC is not finished +--- no_error_log +[error] From 4a2ca9c4a8543e62e7b313cda9ce803fa9d48cd9 Mon Sep 17 00:00:00 2001 From: spacewander Date: Wed, 20 Apr 2022 18:12:44 +0800 Subject: [PATCH 2/2] fix server stream handling Signed-off-by: spacewander --- apisix/stream/xrpc/runner.lua | 2 +- .../stream/xrpc/protocols/pingpong/init.lua | 14 ++--- t/xrpc/pingpong.t | 52 +++++++++++++++++++ 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua index ee266942e29f..2898f48523a7 100644 --- a/apisix/stream/xrpc/runner.lua +++ b/apisix/stream/xrpc/runner.lua @@ -51,7 +51,7 @@ end local function put_req_ctx(session, ctx) - local id = ctx.id + local id = ctx._id session._ctxs[id] = nil core.tablepool.release("xrpc_ctxs", ctx) diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua index d25ecff91038..f78211c14af9 100644 --- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua +++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua @@ -205,8 +205,8 @@ function _M.from_upstream(session, downstream, upstream) core.log.info("send heartbeat") -- need to reset read buf as we won't forward it - downstream:reset_read_buf() - downstream:send(ffi_str(p, HDR_LEN)) + upstream:reset_read_buf() + upstream:send(ffi_str(p, HDR_LEN)) return DONE end @@ -214,10 +214,12 @@ function _M.from_upstream(session, downstream, upstream) local ctx = sdk.get_req_ctx(session, stream_id) local body_len = to_int32(p, 6) - if body_len ~= ctx.len - HDR_LEN then - core.log.error("upstream body len mismatch, expected: ", ctx.len - HDR_LEN, - ", actual: ", body_len) - return DECLINED + if ctx.len then + if body_len ~= ctx.len - HDR_LEN then + core.log.error("upstream body len mismatch, expected: ", ctx.len - HDR_LEN, + ", actual: ", body_len) + return DECLINED + end end local p = read_data(upstream, body_len, true) diff --git a/t/xrpc/pingpong.t b/t/xrpc/pingpong.t index 57e0c376832d..78cfcdb7805d 100644 --- a/t/xrpc/pingpong.t +++ b/t/xrpc/pingpong.t @@ -322,3 +322,55 @@ RPC is not finished RPC is not finished --- no_error_log [error] + + + +=== TEST 12: server stream, heartbeat +--- request eval +"POST /t +" . +"pp\x03\x00\x01\x00\x00\x00\x00\x03ABC" +--- stream_conf_enable +--- stream_upstream_code + local sock = ngx.req.socket(true) + sock:settimeout(10) + local data1 = sock:receive(13) + if not data1 then + return + end + local hb = "pp\x01\x00\x00\x00\x00\x00\x00\x00" + assert(sock:send(hb)) + local data2 = sock:receive(10) + if not data2 then + return + end + assert(data2 == hb) + assert(sock:send(data1)) +--- response_body eval +"pp\x03\x00\x01\x00\x00\x00\x00\x03ABC" +--- no_error_log +RPC is not finished +[error] + + + +=== TEST 13: server stream +--- request eval +"POST /t +" . +"pp\x03\x00\x01\x00\x00\x00\x00\x01A" +--- stream_conf_enable +--- stream_upstream_code + local sock = ngx.req.socket(true) + sock:settimeout(10) + local data1 = sock:receive(11) + if not data1 then + return + end + assert(sock:send("pp\x03\x00\x03\x00\x00\x00\x00\x03ABC")) + assert(sock:send("pp\x03\x00\x02\x00\x00\x00\x00\x02AB")) + assert(sock:send(data1)) +--- response_body eval +"pp\x03\x00\x03\x00\x00\x00\x00\x03ABC" . +"pp\x03\x00\x02\x00\x00\x00\x00\x02AB" . +"pp\x03\x00\x01\x00\x00\x00\x00\x01A"