Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pubsub framework #7028

Merged
merged 36 commits into from
May 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
27a0f2f
feat: add pubsub proto define
bzp2010 May 11, 2022
6ab23d6
feat: add pubsub core module
bzp2010 May 11, 2022
c728ca2
docs: add tparam comment
bzp2010 May 11, 2022
d8e95f0
chore: pubsub in-situ error handling
bzp2010 May 11, 2022
b623413
fix: review
bzp2010 May 11, 2022
06d417e
chore: install pubsub proto
bzp2010 May 11, 2022
285dcc2
docs: add
bzp2010 May 11, 2022
2881b97
test: add pubsub cases
bzp2010 May 11, 2022
bedb6f7
fix: lint
bzp2010 May 11, 2022
ee1c552
test: add pubsub
bzp2010 May 11, 2022
ddcbe5f
feat: ensure use instance level unique pb_state
bzp2010 May 11, 2022
c921ff1
feat: ensure use module level pb_state
bzp2010 May 11, 2022
3015c34
fix: pubsub test lib pb_state cache
bzp2010 May 11, 2022
3d734f2
fix: review
bzp2010 May 12, 2022
36c9039
fix: lint
bzp2010 May 12, 2022
d5ee31d
fix: zh doc
bzp2010 May 12, 2022
04c1be0
docs: remove chinese docs
bzp2010 May 12, 2022
5f06d70
fix: docs category
bzp2010 May 12, 2022
f64166f
test: add unregisted command case
bzp2010 May 12, 2022
fcd1fd0
fix: typo
bzp2010 May 12, 2022
f4fc509
feat: improve empty and wrong command log
bzp2010 May 12, 2022
bb09cc7
test: add text ws send helper
bzp2010 May 12, 2022
2fe4f76
test: add more cases
bzp2010 May 12, 2022
1f329ba
fix: typo
bzp2010 May 12, 2022
80dfd9c
docs: remove supported kafka
bzp2010 May 12, 2022
bd0dc52
test: add undecodable case
bzp2010 May 12, 2022
1091934
chore: delete kafka of pubsub proto
bzp2010 May 12, 2022
abe114d
docs: delete kafka example
bzp2010 May 12, 2022
132a664
feat: add ping/pong and empty command
bzp2010 May 12, 2022
c9a026b
test: change to ping and empty test cases
bzp2010 May 12, 2022
0d86e27
fix: code..body resp
bzp2010 May 12, 2022
2986572
chore: adjust control flow to avoid errors
bzp2010 May 12, 2022
096773d
fix: ldoc comment
bzp2010 May 12, 2022
1b914f9
fix: lint
bzp2010 May 12, 2022
874905f
fix: pb.encode and ws.send errors not handled
bzp2010 May 12, 2022
646b3b8
fix: typo error
bzp2010 May 12, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- linux_openresty_1_17
test_dir:
- t/plugin
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
- t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plugin < pubsub < node, so we should put it here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed #7043 , According to #6995 (comment), I have alphabetically aligned it between node and router


runs-on: ${{ matrix.platform }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/centos7-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
matrix:
test_dir:
- t/plugin
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
- t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library

steps:
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ install: runtime
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix
$(ENV_INSTALL) apisix/*.lua $(ENV_INST_LUADIR)/apisix/

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/include/apisix/model
$(ENV_INSTALL) apisix/include/apisix/model/*.proto $(ENV_INST_LUADIR)/apisix/include/apisix/model/

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/admin
$(ENV_INSTALL) apisix/admin/*.lua $(ENV_INST_LUADIR)/apisix/admin/

Expand Down
1 change: 1 addition & 0 deletions apisix/core.lua
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ return {
tablepool = require("tablepool"),
resolver = require("apisix.core.resolver"),
os = require("apisix.core.os"),
pubsub = require("apisix.core.pubsub"),
}
228 changes: 228 additions & 0 deletions apisix/core/pubsub.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

--- Extensible framework to support publish-and-subscribe scenarios
--
-- @module core.pubsub

local log = require("apisix.core.log")
local ws_server = require("resty.websocket.server")
local protoc = require("protoc")
local pb = require("pb")
local setmetatable = setmetatable
local pcall = pcall
local pairs = pairs


local _M = { version = 0.1 }
local mt = { __index = _M }

local pb_state
local function init_pb_state()
-- clear current pb state
pb.state(nil)
membphis marked this conversation as resolved.
Show resolved Hide resolved

-- set int64 rule for pubsub module
pb.option("int64_as_string")

-- initialize protoc compiler
protoc.reload()
local pubsub_protoc = protoc.new()

-- compile the protobuf file on initial load module
-- ensure that each worker is loaded once
if not pubsub_protoc.loaded["pubsub.proto"] then
pubsub_protoc:addpath("apisix/include/apisix/model")
local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
if not ok then
pubsub_protoc:reset()
return "failed to load pubsub protocol: " .. err
end
end

pb_state = pb.state(nil)
membphis marked this conversation as resolved.
Show resolved Hide resolved
end


-- send error response to client
local function send_error(ws, sequence, err_msg)
local ok, data = pcall(pb.encode, "PubSubResp", {
sequence = sequence,
error_resp = {
code = 0,
message = err_msg,
},
})
if not ok or not data then
log.error("failed to encode error response message, err: ", data)
end

local _, err = ws:send_binary(data)
if err then
log.error("failed to send response to client, err: ", err)
end
end


---
-- Create pubsub module instance
--
-- @function core.pubsub.new
-- @treturn pubsub module instance
-- @treturn string|nil error message if present
-- @usage
-- local pubsub, err = core.pubsub.new()
function _M.new()
if not pb_state then
local err = init_pb_state()
if err then
return nil, err
end
end

local ws, err = ws_server:new()
if not ws then
return nil, err
end

local obj = setmetatable({
ws_server = ws,
cmd_handler = {},
}, mt)

return obj
end


---
-- Add command callbacks to pubsub module instances
--
-- The callback function prototype: function (params)
-- The params in the parameters contain the data defined in the requested command.
-- Its first return value is the data, which needs to contain the data needed for
-- the particular resp, returns nil if an error exists.
-- Its second return value is a string type error message, no need to return when
-- no error exists.
--
-- @function core.pubsub.on
-- @tparam string command to add callback
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The syntax is @tparam type name desc, see

-- @tparam string|table data The data to be encoded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continue adjust #7043

-- @tparam function handler callback on receipt of command
-- @usage
-- pubsub:on(command, function (params)
-- return data, err
-- end)
function _M.on(self, command, handler)
self.cmd_handler[command] = handler
end


---
-- Put the pubsub instance into an event loop, waiting to process client commands
--
-- @function core.pubsub.wait
-- @usage
-- local err = pubsub:wait()
function _M.wait(self)
local fatal_err
local ws = self.ws_server
while true do
-- read raw data frames from websocket connection
local raw_data, raw_type, err = ws:recv_frame()
if err then
-- terminate the event loop when a fatal error occurs
if ws.fatal then
fatal_err = err
break
end

-- skip this loop for non-fatal errors
log.error("failed to receive websocket frame: ", err)
goto continue
end

-- handle client close connection
if raw_type == "close" then
break
end

-- the pubsub messages use binary, if the message is not
-- binary, skip this message
if raw_type ~= "binary" then
log.warn("pubsub server receive non-binary data, type: ",
raw_type, ", data: ", raw_data)
goto continue
end

-- recovery of stored pb_store
pb.state(pb_state)

local data, err = pb.decode("PubSubReq", raw_data)
if not data then
log.error("pubsub server receives undecodable data, err: ", err)
send_error(ws, 0, "wrong command")
goto continue
end

-- command sequence code
local sequence = data.sequence

-- call command handler to generate response data
for key, value in pairs(data) do
-- There are sequence and command properties in the data,
-- select the handler according to the command value.
if key ~= "sequence" then
local handler = self.cmd_handler[key]
if not handler then
log.error("pubsub callback handler not registered for the",
" command, command: ", key)
send_error(ws, sequence, "unknown command: " .. key)
break
end

local resp, err = handler(value)
if not resp then
send_error(ws, sequence, err)
break
end

-- write back the sequence
resp.sequence = sequence
local ok, data = pcall(pb.encode, "PubSubResp", resp)
if not ok or not data then
log.error("failed to encode response message, err: ", data)
break
end
local _, err = ws:send_binary(data)
if err then
log.error("failed to send response to client, err: ", err)
end
break
end
log.warn("pubsub server receives empty command")
end

::continue::
end

if fatal_err then
log.error("fatal error in pubsub websocket server, err: ", fatal_err)
end
ws:send_close()
end


return _M
96 changes: 96 additions & 0 deletions apisix/include/apisix/model/pubsub.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

syntax = "proto3";

option java_package = "org.apache.apisix.api.pubsub";
option java_outer_classname = "PubSubProto";
option java_multiple_files = true;
option go_package = "github.com/apache/apisix/api/pubsub;pubsub";

/**
* Ping command, used to keep the websocket connection alive
*
* The state field is used to pass some non-specific information,
* which will be returned in the pong response as is.
*/
message CmdPing {
bytes state = 1;
}

/**
* An empty command, a placeholder for testing purposes only
*/
message CmdEmpty {}

/**
* Client request definition for pubsub scenarios
*
* The sequence field is used to associate requests and responses.
* Apache APISIX will set a consistent sequence for the associated
* requests and responses, and the client can explicitly know the
* response corresponding to any of the requests.
*
* The req field is the command data sent by the client, and its
* type will be chosen from any of the lists in the definition.
*
* Field numbers 1 to 30 in the definition are used to define basic
* information and future extensions, and numbers after 30 are used
* to define commands.
*/
message PubSubReq {
int64 sequence = 1;
oneof req {
CmdEmpty cmd_empty = 31;
CmdPing cmd_ping = 32;
};
}

/**
* The response body of the service when an error occurs,
* containing the error code and the error message.
*/
message ErrorResp {
int32 code = 1;
string message = 2;
}

/**
* Pong response, the state field will pass through the
* value in the Ping command field.
*/
message PongResp {
bytes state = 1;
}

/**
* Server response definition for pubsub scenarios
*
* The sequence field will be the same as the value in the
* request, which is used to associate the associated request
* and response.
*
* The resp field is the response data sent by the server, and
* its type will be chosen from any of the lists in the definition.
*/
message PubSubResp {
int64 sequence = 1;
oneof resp {
ErrorResp error_resp = 31;
PongResp pong_resp = 32;
};
}
4 changes: 4 additions & 0 deletions docs/assets/images/pubsub-architecture.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 7 additions & 0 deletions docs/en/latest/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,13 @@
"discovery/kubernetes"
]
},
{
"type": "category",
"label": "PubSub",
"items": [
"pubsub"
]
},
{
"type": "category",
"label": "xRPC",
Expand Down
Loading