Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kafka-pubsub): restore pb.state appropriately #11135

Merged
merged 4 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions apisix/core/pubsub.lua
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ end
-- send generic response to client
local function send_resp(ws, sequence, data)
data.sequence = sequence
-- only restore state if it has changed
if pb_state ~= pb.state() then
pb.state(pb_state)
end
local ok, encoded = pcall(pb.encode, "PubSubResp", data)
if not ok or not encoded then
log.error("failed to encode response message, err: ", encoded)
Expand Down Expand Up @@ -184,11 +188,11 @@ function _M.wait(self)
goto continue
end

-- recovery of stored pb_store
local pb_old_state = pb.state(pb_state)

-- only recover state if it has changed
if pb.state() ~= pb_state then
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a good idea to add a check to make sure that the same pb.state is indeed equal. i.e. verifying that == can indeed be used to check for equality.
I'm concerned that this behavior may be broken by updates to openresty and luajit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can deal with that later.

pb.state(pb_state)
end
local data, err = pb.decode("PubSubReq", raw_data)
pb.state(pb_old_state)
if not data then
log.error("pubsub server receives undecodable data, err: ", err)
send_error(ws, 0, "wrong command")
Expand Down
3 changes: 3 additions & 0 deletions t/pubsub/kafka.t
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ failed to initialize pubsub module, err: bad "upgrade" request header: nil
# script that prepares the CI environment
location /t {
content_by_lua_block {
local pb = require("pb")
local lib_pubsub = require("lib.pubsub")
local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka")
local data = {
Expand Down Expand Up @@ -235,6 +236,8 @@ failed to initialize pubsub module, err: bad "upgrade" request header: nil
}

for i = 1, #data do
-- force clear state
pb.state(nil)
local data = test_pubsub:send_recv_ws_binary(data[i])
if data.error_resp then
ngx.say(data.sequence..data.error_resp.message)
Expand Down
Loading