Skip to content

Commit

Permalink
fix(kafka-pubsub): restore pb.state appropriately (apache#11135)
Browse files Browse the repository at this point in the history
  • Loading branch information
shreemaan-abhishek authored Apr 10, 2024
1 parent 25da642 commit c7c70e4
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
12 changes: 8 additions & 4 deletions apisix/core/pubsub.lua
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ end
-- send generic response to client
local function send_resp(ws, sequence, data)
data.sequence = sequence
-- only restore state if it has changed
if pb_state ~= pb.state() then
pb.state(pb_state)
end
local ok, encoded = pcall(pb.encode, "PubSubResp", data)
if not ok or not encoded then
log.error("failed to encode response message, err: ", encoded)
Expand Down Expand Up @@ -184,11 +188,11 @@ function _M.wait(self)
goto continue
end

-- recovery of stored pb_store
local pb_old_state = pb.state(pb_state)

-- only recover state if it has changed
if pb.state() ~= pb_state then
pb.state(pb_state)
end
local data, err = pb.decode("PubSubReq", raw_data)
pb.state(pb_old_state)
if not data then
log.error("pubsub server receives undecodable data, err: ", err)
send_error(ws, 0, "wrong command")
Expand Down
3 changes: 3 additions & 0 deletions t/pubsub/kafka.t
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ failed to initialize pubsub module, err: bad "upgrade" request header: nil
# script that prepares the CI environment
location /t {
content_by_lua_block {
local pb = require("pb")
local lib_pubsub = require("lib.pubsub")
local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka")
local data = {
Expand Down Expand Up @@ -235,6 +236,8 @@ failed to initialize pubsub module, err: bad "upgrade" request header: nil
}
for i = 1, #data do
-- force clear state
pb.state(nil)
local data = test_pubsub:send_recv_ws_binary(data[i])
if data.error_resp then
ngx.say(data.sequence..data.error_resp.message)
Expand Down

0 comments on commit c7c70e4

Please sign in to comment.