diff --git a/apisix/core/pubsub.lua b/apisix/core/pubsub.lua index 18bb887001c1..5b36b0cc9dff 100644 --- a/apisix/core/pubsub.lua +++ b/apisix/core/pubsub.lua @@ -66,6 +66,10 @@ end -- send generic response to client local function send_resp(ws, sequence, data) data.sequence = sequence + -- only restore state if it has changed + if pb_state ~= pb.state() then + pb.state(pb_state) + end local ok, encoded = pcall(pb.encode, "PubSubResp", data) if not ok or not encoded then log.error("failed to encode response message, err: ", encoded) @@ -184,11 +188,11 @@ function _M.wait(self) goto continue end - -- recovery of stored pb_store - local pb_old_state = pb.state(pb_state) - + -- only recover state if it has changed + if pb.state() ~= pb_state then + pb.state(pb_state) + end local data, err = pb.decode("PubSubReq", raw_data) - pb.state(pb_old_state) if not data then log.error("pubsub server receives undecodable data, err: ", err) send_error(ws, 0, "wrong command") diff --git a/t/pubsub/kafka.t b/t/pubsub/kafka.t index b61af6f0cb83..b779a4846a7d 100644 --- a/t/pubsub/kafka.t +++ b/t/pubsub/kafka.t @@ -159,6 +159,7 @@ failed to initialize pubsub module, err: bad "upgrade" request header: nil # script that prepares the CI environment location /t { content_by_lua_block { + local pb = require("pb") local lib_pubsub = require("lib.pubsub") local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka") local data = { @@ -235,6 +236,8 @@ failed to initialize pubsub module, err: bad "upgrade" request header: nil } for i = 1, #data do + -- force clear state + pb.state(nil) local data = test_pubsub:send_recv_ws_binary(data[i]) if data.error_resp then ngx.say(data.sequence..data.error_resp.message)