Feat/wrpc #8357
Conversation
Force-pushed from dd6c16a to 5b048d3
Looks great so far; some nits and suggestions for now; feel free to punt some of them during another iteration.
First pass; need to look over kong/tools/wrpc.lua more thoroughly.
local function get_config_service(self)
  if not wrpc_config_service then
    wrpc_config_service = wrpc.new_service()
    wrpc_config_service:add("kong.services.config.v1.config")
should we make the version a configuration item, `conf.wrpc_version`? This would allow us to switch between versions in the future, either during development or for a forced downgrade by the user.
a new protocol version would require new code. i think it's best that each version of the code hardcodes the appropriate version of the .proto; in fact, .proto files are part of the code, even if loaded dynamically instead of generating static code as in Go
  end
end

return _M
There is a lot of duplicate code, similar to the other control plane protocol variants. We may want to clean those up outside of this PR for maintenance purposes.
Great work! Let's get this in and then iterate on the remaining items in your list, and then we can focus on other issues as they come in. With this code being completely isolated from the JSON payload we will be in good shape.
Force-pushed from 4e19089 to bf8fbeb
Force-pushed from 08dd8a4 to 7e9a9a2
kong/include/kong/model/target.proto (Outdated)
import "kong/model/upstream.proto";

message Target {
  double created_at = 1;
should the type be `int32` instead of `double`? it is defined as `int32` in all other places.
i've forwarded the fix to the original repo (these .proto files are copied from there; ideally they should be copied at build time rather than kept in the repo. we'll get there soon)
And maybe more refactoring (extracting common code) would also help when implementing the version negotiation.
Force-pushed from fe9a68b to bd48b61
rebased on top of master
The DP-CP config connection uses `wrpc_data_plane.DPCP_CHANNEL_NAME` so for any other use you can just `local p = wrpc.new_remote_client(service, wrpc_data_plane.DPCP_CHANNEL_NAME)`
DP defaults to wrpc
Force-pushed from 8b44567 to 88d0ce9
if type_annotations then
  local tag_key, tag_value = annotation:match("^%s*(%S-)=(%S+)%s*$")
  if tag_key and tag_value then
    tag_value = tag_value
It seems to have no effect?
right, it seems to be a leftover from debugging code
--- @param dict shdict shdict to use
--- @return any, string, string value, channel name, error
function Channel.wait_all(dict)
  local name, err = get_wait(dict, NAME_KEY, now() + DEFAULT_TIMEOUT)
Could we just use one dictionary key, and pack the channel name together with the value? Then we could fetch only once. After all, `post` stays blocked while `NAME_KEY` is not free.
this is one of the reasons to move to a socket implementation; it's hard to follow which concepts tie 1:1 into which, and which are 1:n.
in this case, i think the purpose was to reduce serialization of processes: each client pushes its value without blocking the others, and only blocks on making the "server" pull the notification. at a higher level i think there's still the same contention anyway... but it's important for each client to be able to fire its message and not wait for all the others to be served.
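The single-key packing the reviewer suggests could be sketched like this (a Python stand-in for the Lua shdict code: a plain dict replaces the atomic shdict operations, and `post`, `wait_all`, and the `pending` key are illustrative names, not the real API):

```python
import json

# A plain dict stands in for the ngx.shared shdict; in real code the
# add/get/delete calls below would be atomic shdict operations.
shdict = {}

def post(channel_name, value):
    """Client side: publish one packed entry. Returns False while the
    slot is still occupied, mirroring how post blocks on NAME_KEY."""
    if "pending" in shdict:
        return False  # caller would wait and retry
    # Pack the channel name and the value together so the server
    # side needs only a single fetch.
    shdict["pending"] = json.dumps({"name": channel_name, "value": value})
    return True

def wait_all():
    """Server side: one fetch yields both the value and the channel name."""
    raw = shdict.pop("pending", None)
    if raw is None:
        return None, None
    entry = json.loads(raw)
    return entry["value"], entry["name"]

post("config", {"version": 42})
value, name = wait_all()
```

The trade-off discussed above still applies: a single slot serializes the clients, which is exactly the contention the two-key handshake tries to soften.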
assert(mthd.name)
local rpc_name = srvc.name .. "." .. mthd.name

local service_id = assert(annotations.service[srvc.name] and annotations.service[srvc.name]["service-id"])
Could we just do this?
local id_assigner = {}
id_assigner.__index = id_assigner
setmetatable(id_assigner, id_assigner)
function id_assigner.new()
return setmetatable({
[0] = 0,
}, id_assigner)
end
function id_assigner:get_id(name)
if self[name] == nil then
self[name] = self[0]
self[0] = self[0] + 1
end
return self[name]
end
-- ....
function wrpc_service:add(service_name)
local service_ids = id_assigner.new()
local rpc_ids = {}
local service_fname = assert(proto_searchpath(service_name))
grpc.each_method(service_fname, function(_, srvc, mthd)
assert(srvc.name)
assert(mthd.name)
local rpc_name = srvc.name .. "." .. mthd.name
local service_id = service_ids:get_id(srvc.name)
if rpc_ids[service_id] == nil then
rpc_ids[service_id] = id_assigner.new()
end
local rpc_id = rpc_ids[service_id]:get_id(rpc_name)
local rpc = {
name = rpc_name,
service_id = tonumber(service_id),
rpc_id = tonumber(rpc_id),
input_type = mthd.input_type,
output_type = mthd.output_type,
}
self.methods[service_id .. ":" .. rpc_id] = rpc
self.methods[rpc_name] = rpc
end, true)
end
you mean dynamically assign ids to methods and rpcs? no, we must respect those in the .proto file
tag_value = tag_value
local tags = type_annotations[identifier] or {}
type_annotations[identifier] = tags
tags[tag_key] = tag_value
`tags` seems unused
they're part of the annotation structure; it's put into `type_annotations`, which is a subtable of `annotations`. after scanning the file, it's read to get the `service_id` and `rpc_id` in the `.each_method()` loop
--- If calling the same method with the same args several times,
--- (to the same or different peers), this method returns the
--- invariant part, so it can be cached to reduce encoding overhead
function wrpc_service:encode_args(name, ...)
It seems that all calls to `call` and `call_wait` use a single table to contain the arguments.
not sure what you mean. the specific case i had in mind is the CP sending the same config to several DP nodes. instead of repeating the call with the unencoded config, it calls `service:encode_args()` once, and then `peer:send_encoded_call()` for each DP node
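The encode-once, send-many pattern being described could be sketched as follows (a Python stand-in, not the real wrpc API: `encode_args`, `Peer`, and the JSON serialization are illustrative placeholders for `service:encode_args()`, the peer object, and proto3 encoding):

```python
import json

encode_calls = 0  # counts how many times we pay the encoding cost

def encode_args(rpc_name, args):
    """Encode the invariant part of a call once.
    JSON stands in here for the real proto3 encoding."""
    global encode_calls
    encode_calls += 1
    return rpc_name, json.dumps(args, sort_keys=True).encode()

class Peer:
    """Stand-in for a wrpc peer connected to one DP node."""
    def __init__(self):
        self.sent = []
    def send_encoded_call(self, rpc_name, payload):
        # reuses already-encoded bytes; no re-encoding per peer
        self.sent.append((rpc_name, payload))

# The CP encodes the config once...
rpc, payload = encode_args("ConfigService.SyncConfig", {"config": {"x": 1}})

# ...and reuses the encoded bytes for every DP node.
peers = [Peer(), Peer(), Peer()]
for p in peers:
    p.send_encoded_call(rpc, payload)
```

With N data planes, the expensive serialization happens once instead of N times; only the cheap send is repeated.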
I mean like this:
local resp, err = peer:call_wait("ConfigService.ReportMetadata", { plugins = self.plugins_list })
assert(peer:call("ConfigService.PingCP", { hash = hash }))
This comment is related to the comment below.
local num_args = select('#', ...)
local payloads = table.new(num_args, 0)
for i = 1, num_args do
  payloads[i] = assert(pb.encode(rpc.input_type, select(i, ...)))
Encoding every argument with the same `input_type` seems like a bug.
the RPC definitions in .proto files only specify one input type. still, the frame structure supports repeated payloads: a single call could specify several items of the same type instead of using an array.
Maybe I misinterpreted the comment in the wrpc.proto:
// payloads is an array representing the request or response data of an RPC.
// A request message MAY contain multiple elements where each element represents
// an argument to the RPC call. The order of the elements MUST correspond to
// the order of arguments in the function call.
// A response message MUST contain a single payload.
// Use a wrapper type for RPCs which contain multiple responses.
// Unless otherwise specified by the Service or RPC, the encoding method in
// use is PROTO3.
//
// Note: This results in double proto3 encoding. Once encoding of payloads,
// and then encoding of the entire message. We acknowledge that there are
// some overheads here and they will be addressed in a future iteration.
repeated bytes payloads = 9;
By "each element represents an argument" I thought it was for every member of the request message (like "hash" and "plugins" in the previous comment).
Also, if an RPC call receives repeated messages, it cannot use repeated like this:
rpc SomeCall(repeated SomeCallRequest) returns (SomeCallResponse);
But has to use something like:
message RepeatedSomeCallRequest {
  repeated SomeCallRequest arguments = 1;
}
So I don't think it makes sense to encode an array of payloads...
Also, if an RPC call receives repeated messages, it cannot use repeat like this:
rpc SomeCall(repeated SomeCallRequest) returns (SomeCallResponse);
right, the syntax doesn't support this. but in the current implementation, the "repeated" is implied, so this could be done:
rpc SomeCall(SomeCallRequest) returns (SomeCallResponse);
message SomeCallRequest {
string name = 1;
string value = 2;
}
and in Lua:
peer:call("serv.SomeCall", {name="one", value="1"}, {name="two", value="2"}, {name="three", value="3"})
and it would result in a single call with three payload elements of the same type.
But I agree that it's not an important usecase, and the structure you describe would be just as good.
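The frame layout under discussion, one payload element per argument plus a second encoding pass over the whole frame (the "double proto3 encoding" the wrpc.proto comment acknowledges), might be sketched like this (Python with JSON standing in for proto3; `encode` and `build_request` are illustrative names):

```python
import json

def encode(msg):
    """Stand-in for one proto3 encoding pass."""
    return json.dumps(msg, sort_keys=True).encode()

def build_request(args):
    """Build a request frame: each argument is encoded on its own
    into the repeated `payloads` field, then the whole frame is
    encoded again -- the second pass that causes the double encoding."""
    payloads = [encode(a).decode() for a in args]  # first pass, per argument
    frame = {"payloads": payloads}                 # repeated bytes payloads = 9;
    return encode(frame)                           # second pass, whole frame

frame = build_request([
    {"name": "one", "value": "1"},
    {"name": "two", "value": "2"},
])
```

Wrapping the arguments in a single `RepeatedSomeCallRequest` message, as suggested above, would make the first pass encode one message instead of several, with the same information on the wire.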
@hbagdi Could you clarify this? Do we really want to implement it this way?
wRPC protocol in CP/DP communications
Note: there's still some code duplication between the existing (JSON) protocol and this one. This is to ensure the feature flag (cluster_protocol=json|wrpc) doesn't activate any new code. Once the version negotiation feature (#8397) is merged in, this will be refactored on top of that and the repeated code will be factored away.