Skip to content

Commit

Permalink
feat: add pubsub framework (apache#7028)
Browse files Browse the repository at this point in the history
  • Loading branch information
bzp2010 authored and Liu-Junlin committed May 20, 2022
1 parent d74a66f commit 93184fa
Show file tree
Hide file tree
Showing 11 changed files with 756 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- linux_openresty_1_17
test_dir:
- t/plugin
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
- t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc

runs-on: ${{ matrix.platform }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/centos7-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
matrix:
test_dir:
- t/plugin
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
- t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
- t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library

steps:
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ install: runtime
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix
$(ENV_INSTALL) apisix/*.lua $(ENV_INST_LUADIR)/apisix/

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/include/apisix/model
$(ENV_INSTALL) apisix/include/apisix/model/*.proto $(ENV_INST_LUADIR)/apisix/include/apisix/model/

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/admin
$(ENV_INSTALL) apisix/admin/*.lua $(ENV_INST_LUADIR)/apisix/admin/

Expand Down
1 change: 1 addition & 0 deletions apisix/core.lua
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ return {
tablepool = require("tablepool"),
resolver = require("apisix.core.resolver"),
os = require("apisix.core.os"),
pubsub = require("apisix.core.pubsub"),
}
228 changes: 228 additions & 0 deletions apisix/core/pubsub.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

--- Extensible framework to support publish-and-subscribe scenarios
--
-- @module core.pubsub

local log = require("apisix.core.log")
local ws_server = require("resty.websocket.server")
local protoc = require("protoc")
local pb = require("pb")
local setmetatable = setmetatable
local pcall = pcall
local pairs = pairs


local _M = { version = 0.1 }
local mt = { __index = _M }

local pb_state
local function init_pb_state()
-- clear current pb state
pb.state(nil)

-- set int64 rule for pubsub module
pb.option("int64_as_string")

-- initialize protoc compiler
protoc.reload()
local pubsub_protoc = protoc.new()

-- compile the protobuf file on initial load module
-- ensure that each worker is loaded once
if not pubsub_protoc.loaded["pubsub.proto"] then
pubsub_protoc:addpath("apisix/include/apisix/model")
local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
if not ok then
pubsub_protoc:reset()
return "failed to load pubsub protocol: " .. err
end
end

pb_state = pb.state(nil)
end


-- send error response to client
local function send_error(ws, sequence, err_msg)
local ok, data = pcall(pb.encode, "PubSubResp", {
sequence = sequence,
error_resp = {
code = 0,
message = err_msg,
},
})
if not ok or not data then
log.error("failed to encode error response message, err: ", data)
end

local _, err = ws:send_binary(data)
if err then
log.error("failed to send response to client, err: ", err)
end
end


---
-- Create pubsub module instance
--
-- @function core.pubsub.new
-- @treturn pubsub module instance
-- @treturn string|nil error message if present
-- @usage
-- local pubsub, err = core.pubsub.new()
function _M.new()
if not pb_state then
local err = init_pb_state()
if err then
return nil, err
end
end

local ws, err = ws_server:new()
if not ws then
return nil, err
end

local obj = setmetatable({
ws_server = ws,
cmd_handler = {},
}, mt)

return obj
end


---
-- Add command callbacks to pubsub module instances
--
-- The callback function prototype: function (params)
-- The params in the parameters contain the data defined in the requested command.
-- Its first return value is the data, which needs to contain the data needed for
-- the particular resp, returns nil if an error exists.
-- Its second return value is a string type error message, no need to return when
-- no error exists.
--
-- @function core.pubsub.on
-- @tparam string command to add callback
-- @tparam function handler callback on receipt of command
-- @usage
-- pubsub:on(command, function (params)
-- return data, err
-- end)
function _M.on(self, command, handler)
self.cmd_handler[command] = handler
end


---
-- Put the pubsub instance into an event loop, waiting to process client commands
--
-- @function core.pubsub.wait
-- @usage
-- local err = pubsub:wait()
function _M.wait(self)
local fatal_err
local ws = self.ws_server
while true do
-- read raw data frames from websocket connection
local raw_data, raw_type, err = ws:recv_frame()
if err then
-- terminate the event loop when a fatal error occurs
if ws.fatal then
fatal_err = err
break
end

-- skip this loop for non-fatal errors
log.error("failed to receive websocket frame: ", err)
goto continue
end

-- handle client close connection
if raw_type == "close" then
break
end

-- the pubsub messages use binary, if the message is not
-- binary, skip this message
if raw_type ~= "binary" then
log.warn("pubsub server receive non-binary data, type: ",
raw_type, ", data: ", raw_data)
goto continue
end

-- recovery of stored pb_store
pb.state(pb_state)

local data, err = pb.decode("PubSubReq", raw_data)
if not data then
log.error("pubsub server receives undecodable data, err: ", err)
send_error(ws, 0, "wrong command")
goto continue
end

-- command sequence code
local sequence = data.sequence

-- call command handler to generate response data
for key, value in pairs(data) do
-- There are sequence and command properties in the data,
-- select the handler according to the command value.
if key ~= "sequence" then
local handler = self.cmd_handler[key]
if not handler then
log.error("pubsub callback handler not registered for the",
" command, command: ", key)
send_error(ws, sequence, "unknown command: " .. key)
break
end

local resp, err = handler(value)
if not resp then
send_error(ws, sequence, err)
break
end

-- write back the sequence
resp.sequence = sequence
local ok, data = pcall(pb.encode, "PubSubResp", resp)
if not ok or not data then
log.error("failed to encode response message, err: ", data)
break
end
local _, err = ws:send_binary(data)
if err then
log.error("failed to send response to client, err: ", err)
end
break
end
log.warn("pubsub server receives empty command")
end

::continue::
end

if fatal_err then
log.error("fatal error in pubsub websocket server, err: ", fatal_err)
end
ws:send_close()
end


return _M
96 changes: 96 additions & 0 deletions apisix/include/apisix/model/pubsub.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

syntax = "proto3";

option java_package = "org.apache.apisix.api.pubsub";
option java_outer_classname = "PubSubProto";
option java_multiple_files = true;
option go_package = "github.com/apache/apisix/api/pubsub;pubsub";

/**
* Ping command, used to keep the websocket connection alive
*
* The state field is used to pass some non-specific information,
* which will be returned in the pong response as is.
*/
message CmdPing {
bytes state = 1;
}

/**
* An empty command, a placeholder for testing purposes only
*/
message CmdEmpty {}

/**
* Client request definition for pubsub scenarios
*
* The sequence field is used to associate requests and responses.
* Apache APISIX will set a consistent sequence for the associated
* requests and responses, and the client can explicitly know the
* response corresponding to any of the requests.
*
* The req field is the command data sent by the client, and its
* type will be chosen from any of the lists in the definition.
*
* Field numbers 1 to 30 in the definition are used to define basic
* information and future extensions, and numbers after 30 are used
* to define commands.
*/
message PubSubReq {
int64 sequence = 1;
oneof req {
CmdEmpty cmd_empty = 31;
CmdPing cmd_ping = 32;
};
}

/**
* The response body of the service when an error occurs,
* containing the error code and the error message.
*/
message ErrorResp {
int32 code = 1;
string message = 2;
}

/**
* Pong response, the state field will pass through the
* value in the Ping command field.
*/
message PongResp {
bytes state = 1;
}

/**
* Server response definition for pubsub scenarios
*
* The sequence field will be the same as the value in the
* request, which is used to associate the associated request
* and response.
*
* The resp field is the response data sent by the server, and
* its type will be chosen from any of the lists in the definition.
*/
message PubSubResp {
int64 sequence = 1;
oneof resp {
ErrorResp error_resp = 31;
PongResp pong_resp = 32;
};
}
4 changes: 4 additions & 0 deletions docs/assets/images/pubsub-architecture.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 7 additions & 0 deletions docs/en/latest/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,13 @@
"discovery/kubernetes"
]
},
{
"type": "category",
"label": "PubSub",
"items": [
"pubsub"
]
},
{
"type": "category",
"label": "xRPC",
Expand Down
Loading

0 comments on commit 93184fa

Please sign in to comment.