Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for additional args, which has changed all service func… #71

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
17 changes: 10 additions & 7 deletions include/grpcbox.hrl
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
-record(method, {key :: {unicode:chardata() | '$1', unicode:chardata() | '_'} | '_',
proto :: module() | '$1' | '_',
module :: module() | '_',
function :: atom() | '_',
input :: {term(), boolean()} | '_',
output :: {term(), boolean()} | '_',
opts :: [term()] | '_'}).
-record(method, {key :: {unicode:chardata() | '$1', unicode:chardata() | '_'} | '_',
proto :: module() | '$1' | '_',
module :: module() | '_',
function :: atom() | '_',
input :: {term(), boolean()} | '_',
output :: {term(), boolean()} | '_',
opts :: [term()] | '_',
additional_args :: term()
}
).

%% service definition
-record(grpcbox_def, {service :: atom(),
Expand Down
2 changes: 1 addition & 1 deletion interop/src/grpc_testing_reconnect_service_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
-compile(export_all).
-compile(nowarn_export_all).

-include_lib("grpcbox/include/grpcbox.hrl").
-include("grpcbox.hrl").

-define(is_ctx(Ctx), is_tuple(Ctx) andalso element(1, Ctx) =:= ctx).

Expand Down
2 changes: 1 addition & 1 deletion interop/src/grpc_testing_test_service_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
-compile(export_all).
-compile(nowarn_export_all).

-include_lib("grpcbox/include/grpcbox.hrl").
-include("grpcbox.hrl").

-define(is_ctx(Ctx), is_tuple(Ctx) andalso element(1, Ctx) =:= ctx).

Expand Down
2 changes: 1 addition & 1 deletion interop/src/grpc_testing_unimplemented_service_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
-compile(export_all).
-compile(nowarn_export_all).

-include_lib("grpcbox/include/grpcbox.hrl").
-include("grpcbox.hrl").

-define(is_ctx(Ctx), is_tuple(Ctx) andalso element(1, Ctx) =:= ctx).

Expand Down
17 changes: 10 additions & 7 deletions src/grpcbox_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
-type name() :: t().
-type transport() :: http | https.
-type host() :: inet:ip_address() | inet:hostname().
-type endpoint() :: {transport(), host(), inet:port_number(), ssl:ssl_option()}.
-type endpoint() :: {transport(), host(), inet:port_number(), [gen_tcp:option()], [ssl:ssl_option()]}.

-type options() :: #{balancer => load_balancer(),
encoding => gprcbox:encoding(),
Expand Down Expand Up @@ -169,10 +169,13 @@ insert_stream_interceptor(Name, _Type, Interceptors) ->
end.

start_workers(Pool, StatsHandler, Encoding, Endpoints) ->
[begin
gproc_pool:add_worker(Pool, Endpoint),
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, {Transport, Host, Port, SSLOptions},
Encoding, StatsHandler),
Pid
end || Endpoint={Transport, Host, Port, SSLOptions} <- Endpoints].
[start_worker(Pool, StatsHandler, Encoding, Endpoint) || Endpoint <- Endpoints].

start_worker(Pool, StatsHandler, Encoding, {Transport, Host, Port, SSLOptions}) ->
start_worker(Pool, StatsHandler, Encoding, {Transport, Host, Port, [], SSLOptions});

start_worker(Pool, StatsHandler, Encoding, Endpoint = {Transport, Host, Port, SocketOptions, SSLOptions}) ->
gproc_pool:add_worker(Pool, Endpoint),
{ok, Pid} = grpcbox_subchannel:start_link(Endpoint, Pool, {Transport, Host, Port, SocketOptions, SSLOptions},
Encoding, StatsHandler),
Pid.
51 changes: 51 additions & 0 deletions src/grpcbox_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
stream/5,

send/2,
recv_any/1,
recv_any/2,
recv_headers/1,
recv_headers/2,
recv_data/1,
Expand Down Expand Up @@ -183,6 +185,55 @@ stream(Ctx, Path, Input, Def, Options) ->
Error
end.

recv_any(Stream) ->
recv_any(Stream, 500).

recv_any(Stream = #{ stream_id := Id
, stream_pid := Pid
, monitor_ref := Ref}, Timeout) ->
receive
{headers, Id, Headers} ->
case maps:get(<<":status">>, Headers, undefined) of
<<"200">> ->
{ok, {headers, Headers}};
ErrorStatus ->
{http_error, ErrorStatus, Headers}
end;

{trailers, Id, Trailers} ->
dispatch_trailers(Trailers);

{data, Id, Data} ->
case maps:get(stream_interceptor, Stream, undefined) of
undefined ->
{ok, {data, Data}};
#{recv_msg := RecvMsg} ->
RecvMsg(Stream, fun(_, _) -> {ok, Data} end, Timeout)
end;

{'DOWN', Ref, process, Pid, _Reason} ->
receive
{trailers, Id, Trailers} ->
dispatch_trailers(Trailers);
{error, _} ->
stream_finished
after 0 ->
{error, connectionLost}
end
after Timeout ->
case erlang:is_process_alive(Pid) of
true ->
timeout;
false ->
stream_finished
end
end.

dispatch_trailers({<<"0">>, _Message, _Metadata}) ->
stream_finished;
dispatch_trailers({Status, Message, Metadata}) ->
{error, {Status, Message}, #{trailers => Metadata}}.

recv_data(Stream) ->
recv_data(Stream, 500).
recv_data(Stream=#{stream_interceptor := #{recv_msg := RecvMsg}}, Timeout) ->
Expand Down
2 changes: 1 addition & 1 deletion src/grpcbox_health_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
-compile(export_all).
-compile(nowarn_export_all).

-include_lib("grpcbox/include/grpcbox.hrl").
-include("grpcbox.hrl").

-define(is_ctx(Ctx), is_tuple(Ctx) andalso element(1, Ctx) =:= ctx).

Expand Down
10 changes: 5 additions & 5 deletions src/grpcbox_health_service.erl
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
-module(grpcbox_health_service).

-export([check/2,
watch/2]).
-export([check/3,
watch/3]).

check(Ctx, #{service := <<>>}) ->
check(Ctx, #{service := <<>>}, _) ->
{ok, #{status => 'SERVING'}, Ctx};
check(Ctx, #{service := _Service}) ->
check(Ctx, #{service := _Service}, _) ->
%% TODO: lookup if we are serving this service
{ok, #{status => 'UNKNOWN'}, Ctx}.

watch(_Request, _Stream) ->
watch(_Request, _Stream, _) ->
ok.
2 changes: 1 addition & 1 deletion src/grpcbox_reflection_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
-compile(export_all).
-compile(nowarn_export_all).

-include_lib("grpcbox/include/grpcbox.hrl").
-include("grpcbox.hrl").

-define(is_ctx(Ctx), is_tuple(Ctx) andalso element(1, Ctx) =:= ctx).

Expand Down
6 changes: 3 additions & 3 deletions src/grpcbox_reflection_service.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-module(grpcbox_reflection_service).

-export([server_reflection_info/2]).
-export([server_reflection_info/3]).

-include("grpcbox.hrl").

Expand All @@ -9,13 +9,13 @@
#{error_code => 12,
error_message => "unimplemented method since extensions removed in proto3"}}).

server_reflection_info(Ref, Stream) ->
server_reflection_info(Ref, Stream, OptionalArgs) ->
receive
{Ref, eos} ->
ok;
{Ref, Message} ->
handle_message(Message, Stream),
server_reflection_info(Ref, Stream)
server_reflection_info(Ref, Stream, OptionalArgs)
end.

handle_message(#{message_request := {list_services, _}}=OriginalRequest, Stream) ->
Expand Down
15 changes: 10 additions & 5 deletions src/grpcbox_services_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,15 @@ load_services([ServicePbModule | Rest], Services, ServicesTable) ->
[begin
{{service, _}, Methods} = ServicePbModule:get_service_def(ServiceName),
%% throws exception if ServiceName isn't in the map or doesn't exist
try ServiceModule = maps:get(ServiceName, Services),
{ServiceModule, ServiceModule:module_info(exports)} of
{ServiceModule1, Exports} ->
try {ServiceModule, AdditionalArgs} = case maps:get(ServiceName, Services) of
{A, B} -> {A, B};
A -> {A, undefined}
end,
{ServiceModule, AdditionalArgs, ServiceModule:module_info(exports)} of
{ServiceModule1, AdditionalArgs1, Exports} ->
[begin
SnakedMethodName = atom_snake_case(Name),
case lists:member({SnakedMethodName, 2}, Exports) of
case lists:member({SnakedMethodName, 3}, Exports) of
true ->
ets:insert(ServicesTable, #method{key={atom_to_binary(ServiceName, utf8),
atom_to_binary(Name, utf8)},
Expand All @@ -144,7 +147,9 @@ load_services([ServicePbModule | Rest], Services, ServicesTable) ->
proto=ServicePbModule,
input={Input, InputStream},
output={Output, OutputStream},
opts=Opts});
opts=Opts,
additional_args = AdditionalArgs1
});
false ->
%% TODO: error? log? insert into ets as unimplemented?
unimplemented_method
Expand Down
54 changes: 36 additions & 18 deletions src/grpcbox_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
error/2,
ctx/1,
ctx/2,
handler_pid/1,
handle_streams/2,
handle_call/2,
handle_info/2]).
Expand Down Expand Up @@ -59,7 +60,8 @@
stream_id :: stream_id(),
method :: #method{} | undefined,
stats_handler :: module() | undefined,
stats :: term() | undefined}).
stats :: term() | undefined
}).

-type t() :: #state{}.

Expand Down Expand Up @@ -172,41 +174,50 @@ authenticate({ok, Cert}, Fun) ->
authenticate(_, _) ->
false.

handle_streams(Ref, State=#state{full_method=FullMethod,
handle_streams(Ref, State=#state{handler=Handler,
full_method=FullMethod,
stream_interceptor=StreamInterceptor,
method=#method{module=Module,
function=Function,
additional_args=AdditionalArgs,
output={_, false}}}) ->
process_flag(trap_exit, true),
case (case StreamInterceptor of
undefined -> Module:Function(Ref, State);
undefined -> Module:Function(Ref, State, AdditionalArgs);
_ ->
ServerInfo = #{full_method => FullMethod,
service => Module,
input_stream => true,
output_stream => false},
StreamInterceptor(Ref, State, ServerInfo, fun Module:Function/2)
StreamInterceptor(Ref, State, ServerInfo, fun(R, S) -> Module:Function(R, S, AdditionalArgs) end)
end) of
{ok, Response, State2} ->
send(Response, State2);
E={grpc_error, _} ->
throw(E);
exit(E);
E={grpc_extended_error, _} ->
throw(E)
exit(E)
end;
handle_streams(Ref, State=#state{full_method=FullMethod,
handle_streams(Ref, State=#state{handler=Handler,
full_method=FullMethod,
stream_interceptor=StreamInterceptor,
method=#method{module=Module,
function=Function,
additional_args=AdditionalArgs,
output={_, true}}}) ->
process_flag(trap_exit, true),
case StreamInterceptor of
undefined ->
Module:Function(Ref, State);
case Module:Function(Ref, State, AdditionalArgs) of
ok -> ok;
Error -> exit(Error)
end;
_ ->
ServerInfo = #{full_method => FullMethod,
service => Module,
input_stream => true,
output_stream => true},
StreamInterceptor(Ref, State, ServerInfo, fun Module:Function/2)
StreamInterceptor(Ref, State, ServerInfo, fun(R, S) -> Module:Function(R, S, AdditionalArgs) end)
end.

on_send_push_promise(_, State) ->
Expand Down Expand Up @@ -271,17 +282,18 @@ handle_unary(Ctx, Message, State=#state{unary_interceptor=UnaryInterceptor,
full_method=FullMethod,
method=#method{module=Module,
function=Function,
additional_args=AdditionalArgs,
proto=_Proto,
input={_Input, _InputStream},
output={_Output, _OutputStream}}}) ->
Ctx1 = ctx_with_stream(Ctx, State),
case (case UnaryInterceptor of
undefined -> Module:Function(Ctx1, Message);
undefined -> Module:Function(Ctx1, Message, AdditionalArgs);
_ ->
ServerInfo = #{full_method => FullMethod,
service => Module},
UnaryInterceptor(Ctx1, Message, ServerInfo,
fun Module:Function/2)
fun (C, S) -> Module:Function(C, S, AdditionalArgs) end)
end) of
{ok, Response, Ctx2} ->
State1 = from_ctx(Ctx2),
Expand Down Expand Up @@ -342,7 +354,7 @@ end_stream(Status, Message, State=#state{connection_pid=ConnPid,
ctx=Ctx,
resp_trailers=Trailers}) ->
EncodedTrailers = grpcbox_utils:encode_headers(Trailers),
h2_connection:send_trailers(ConnPid, StreamId, [{<<"grpc-status">>, Status},
h2_connection:send_trailers(ConnPid, StreamId, [{<<"grpc-status">>, if is_integer(Status) -> integer_to_binary(Status); true -> Status end},
{<<"grpc-message">>, Message} | EncodedTrailers],
[{send_end_stream, true}]),
Ctx1 = ctx:with_value(Ctx, grpc_server_status, grpcbox_utils:status_to_string(Status)),
Expand All @@ -358,7 +370,7 @@ send_headers(State) ->

send_headers(Ctx, Headers) when is_map(Headers) ->
State = from_ctx(Ctx),
send_headers(maps:to_list(maybe_encode_headers(Headers)), State);
ctx_with_stream(Ctx, send_headers(maps:to_list(maybe_encode_headers(Headers)), State));

send_headers(_Metadata, State=#state{headers_sent=true}) ->
State;
Expand Down Expand Up @@ -397,6 +409,12 @@ ctx(#state{handler=Pid}) ->
ctx(#state{handler=Pid}, Ctx) ->
h2_stream:call(Pid, {ctx, Ctx}).

handler_pid(#state{handler=Pid}) ->
Pid;
handler_pid(Ctx) ->
#state{handler=Pid} = from_ctx(Ctx),
Pid.

handle_call(ctx, State=#state{ctx=Ctx}) ->
{ok, Ctx, State};
handle_call({ctx, Ctx}, State) ->
Expand All @@ -408,23 +426,23 @@ handle_info({add_trailers, Trailers}, State) ->
update_trailers(Trailers, State);
handle_info({send_proto, Message}, State) ->
send(false, Message, State);
handle_info({'EXIT', _, normal}, State) ->
handle_info({'EXIT', _Pid, normal}, State) ->
end_stream(State),
State;
handle_info({'EXIT', _, {grpc_error, {Status, Message}}}, State) ->
handle_info({'EXIT', _Pid, {grpc_error, {Status, Message}}}, State) ->
end_stream(Status, Message, State),
State;
handle_info({'EXIT', _, {grpc_extended_error, #{status := Status, message := Message} = ErrorData}}, State) ->
handle_info({'EXIT', _Pid, {grpc_extended_error, #{status := Status, message := Message} = ErrorData}}, State) ->
State1 = add_trailers_from_error_data(ErrorData, State),
end_stream(Status, Message, State1),
State1;
handle_info({'EXIT', _, _Other}, State) ->
handle_info({'EXIT', _Pid, _Other}, State) ->
end_stream(?GRPC_STATUS_UNKNOWN, <<"process exited without reason">>, State),
State;
handle_info({timeout,_Ref,<<"grpc-timeout">>}, State) ->
end_stream(?GRPC_STATUS_DEADLINE_EXCEEDED, <<"Deadline expired">>, State),
State;
handle_info(_, State) ->
handle_info(_Other, State) ->
State.

add_headers(Headers, #state{handler=Pid}) ->
Expand Down
Loading