Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pubsub framework #7028

Merged
merged 36 commits into from
May 12, 2022
Merged

feat: add pubsub framework #7028

merged 36 commits into from
May 12, 2022

Conversation

bzp2010
Copy link
Contributor

@bzp2010 bzp2010 commented May 11, 2022

Description

Support for publish-subscribe scenarios implemented in the form of websocket + protobuf, this PR contains its basic framework implementation.

Split from #6995

Checklist

  • I have explained the need for this PR and the problem it solves
  • I have explained the changes or the new features added to this PR
  • I have added tests corresponding to this change
  • I have updated the documentation to reflect this change
  • I have verified that this change is backward compatible (If not, please discuss on the APISIX mailing list first)

@bzp2010 bzp2010 added enhancement New feature or request doc Documentation things labels May 11, 2022
@bzp2010 bzp2010 self-assigned this May 11, 2022
@bzp2010
Copy link
Contributor Author

bzp2010 commented May 11, 2022

Update

The pubsub module core and documentation was split to the current PR, and the review comments in #6995 have been modified.

@bzp2010
Copy link
Contributor Author

bzp2010 commented May 11, 2022

Update

The current pubsub module switches to use the pubsub module-level independent pb_state database, just like the pb_state cached in lrucache in grpc-transcode, which will switch to the cached pb_state before each decoding command.

But what I'm not sure is if in the extreme case grpc-transcode and pubsub switch pb_state concurrently at the same time will cause confusion. 🤔

@spacewander
Copy link
Member

Update

The current pubsub module switches to use the pubsub module-level independent pb_state database, just like the pb_state cached in lrucache in grpc-transcode, which will switch to the cached pb_state before each decoding command.

But what I'm not sure is if in the extreme case grpc-transcode and pubsub switch pb_state concurrently at the same time will cause confusion. 🤔

There won't be concurrent issue as pubsub only modify the global pb_state when the APISIX starts.

apisix/core/pubsub.lua Outdated Show resolved Hide resolved
apisix/core/pubsub.lua Outdated Show resolved Hide resolved
apisix/core/pubsub.lua Outdated Show resolved Hide resolved
docs/en/latest/pubsub.md Outdated Show resolved Hide resolved
docs/en/latest/pubsub.md Outdated Show resolved Hide resolved

## How to support other messaging systems

An extensible pubsub module is implemented in Apache APISIX, which is responsible for starting the WebSocket server, coding and decoding communication protocols, handling client commands, and through which new messaging system support can be simply added.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
An extensible pubsub module is implemented in Apache APISIX, which is responsible for starting the WebSocket server, coding and decoding communication protocols, handling client commands, and through which new messaging system support can be simply added.
Apache APISIX implemented an extensible pubsub module responsible for starting the WebSocket server, coding and decoding communication protocols, handling client commands, and adding support for the new messaging system.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

docs/en/latest/pubsub.md Outdated Show resolved Hide resolved
@bzp2010 bzp2010 requested a review from tzssangglass May 12, 2022 03:32
docs/zh/latest/pubsub.md Outdated Show resolved Hide resolved
docs/zh/latest/pubsub.md Outdated Show resolved Hide resolved
t/pubsub/pubsub.t Outdated Show resolved Hide resolved
@bzp2010 bzp2010 requested a review from membphis May 12, 2022 07:08
@@ -31,7 +31,7 @@ jobs:
- linux_openresty_1_17
test_dir:
- t/plugin
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
- t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plugin < pubsub < node, so we should put it here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed #7043 , According to #6995 (comment), I have alphabetically aligned it between node and router

-- no error exists.
--
-- @function core.pubsub.on
-- @tparam string command to add callback
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The syntax is @tparam type name desc, see

-- @tparam string|table data The data to be encoded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continue adjust #7043

-- handle client close connection
if raw_type == "close" then
ws:send_close()
return
Copy link
Member

@spacewander spacewander May 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to use break in the while loop and handle the common logic in the same place

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not an error, there is no return value (in fact the return value of wait has also been removed, which does not cause confusion in the location of the error handling code), and if the client initiates a close connection, the server will also close the connection and exit the loop directly, without logging and subsequent processing. The other cases have been logged and changed to post-processing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use break here and check fatal_err? It is smelly to use both break and return in the loop,
This will cause error-prone control flow,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if err then
-- terminate the event loop when a fatal error occurs
if ws.fatal then
ws:send_close()
Copy link
Member

@spacewander spacewander May 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we always close connection outside the loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


local resp, err = handler(value)
if not resp then
send_error_resp(ws, sequence, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be better to handle err in the various send_xxx operation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed #7043


for _, data in ipairs(data) do
local code, body = t(data.url, ngx.HTTP_PUT, data.data)
ngx.say(code..body)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better not directly print the 201 status code, because when we rerun the test, code 200 will return instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@tzssangglass
Copy link
Member

Other is good for me, CI is broken.



local function send_error_resp(ws, sequence, err_msg)
ws:send_binary(pb.encode("PubSubResp", {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to capture the return value, all right?

pb.encode and ws:send_binary, they may fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, according to the practice in grpc-transcode, pb.encode does error handling via pcall, and logs are printed for errors in both cases

@spacewander spacewander merged commit 4690feb into apache:master May 12, 2022
Liu-Junlin pushed a commit to Liu-Junlin/apisix that referenced this pull request May 20, 2022
spacewander pushed a commit that referenced this pull request Jun 30, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc Documentation things enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants