Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: A few improvements. #246

Merged
merged 7 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .github/workflows/code-format.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name: clang-format Check
on: [push, pull_request]
jobs:
formatting-check:
name: Formatting Check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run clang-format style check for C/C++/Protobuf programs.
uses: jidicula/clang-format-action@v4.11.0
with:
clang-format-version: '13'
check-path: 'c_src'
9 changes: 9 additions & 0 deletions c_src/quicer_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,15 @@ handle_connection_event_streams_available(QuicerConnCTX *c_ctx,
assert(c_ctx->Connection);
ErlNifEnv *env = c_ctx->env;

if (c_ctx->event_mask & QUICER_CONNECTION_EVENT_MASK_NO_STREAMS_AVAILABLE)
{
TP_CB_3(streams_available, (uintptr_t)c_ctx->Connection, 0);
return QUIC_STATUS_SUCCESS;
}
else
{
TP_CB_3(streams_available, (uintptr_t)c_ctx->Connection, 1);
}
ERL_NIF_TERM props_name[] = { ATOM_BIDI_STREAMS, ATOM_UNIDI_STREAMS };
ERL_NIF_TERM props_value[]
= { enif_make_uint64(env, Event->STREAMS_AVAILABLE.BidirectionalCount),
Expand Down
3 changes: 2 additions & 1 deletion c_src/quicer_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ limitations under the License.

typedef enum QUICER_CONNECTION_EVENT_MASKS
{
QUICER_CONNECTION_EVENT_MASK_NST = 0x00000001
QUICER_CONNECTION_EVENT_MASK_NST = 0x00000001,
QUICER_CONNECTION_EVENT_MASK_NO_STREAMS_AVAILABLE = 0x00000002
} QUICER_CONNECTION_EVENT_MASK;

ERL_NIF_TERM
Expand Down
1 change: 1 addition & 0 deletions include/quicer.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@

%% QUICER_CONNECTION_EVENT_MASKS
-define(QUICER_CONNECTION_EVENT_MASK_NST , 16#00000001).
-define(QUICER_CONNECTION_EVENT_MASK_NO_STREAMS_AVAILABLE , 16#00000002).

%% QUICER_STREAM_EVENT_MASKS
-define(QUICER_STREAM_EVENT_MASK_START_COMPLETE , 16#00000001).
Expand Down
14 changes: 0 additions & 14 deletions src/quicer_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -414,20 +414,6 @@ handle_info({quic, dgram_state_changed, C, Flags},
?tp_ignore_side_effects_in_prod(debug, #{module => ?MODULE, conn => C, event => dgram_state_changed, flags => Flags}),
default_cb_ret(M:datagram_state_changed(C, Flags, CBState), State);

%%% ==============================================================
%%% Handle messages for link/monitor
%%% ==============================================================
handle_info({'EXIT', _Pid, {shutdown, normal}}, State) ->
%% exit signal from stream
{noreply, State};

handle_info({'EXIT', _Pid, {shutdown, _Other}}, State) ->
%% @todo
{noreply, State};

handle_info({'EXIT', _Pid, normal}, State) ->
%% @todo
{noreply, State};
handle_info(OtherInfo, #{callback := M,
callback_state := CBState} = State) ->
default_cb_ret(M:handle_info(OtherInfo, CBState), State).
Expand Down
21 changes: 21 additions & 0 deletions src/quicer_local_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
%% @doc Stream initiated from local
-module(quicer_local_stream).

-export([start/4,
start_link/3,
start_link/4
]).

-include("quicer_types.hrl").

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
Expand Down Expand Up @@ -72,5 +77,21 @@
, handle_continue/2
]).

-type local_stream_opts() :: stream_opts() | proplists:proplist().
-type cb_ret() :: quicer_stream:cb_ret().
-type cb_state() :: quicer_stream:cb_state().

-spec start_link(module(), connection_handle(), local_stream_opts()) -> gen_server:start_ret().
start_link(CallbackModule, Connection, Opts) ->
start_link(CallbackModule, Connection, Opts, []).
-spec start_link(module(), connection_handle(), local_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret().
start_link(CallbackModule, Connection, Opts, StartOpts) when is_list(Opts)->
start_link(CallbackModule, Connection, maps:from_list(Opts), StartOpts);
start_link(CallbackModule, Connection, Opts, StartOpts) ->
quicer_stream:start_link(CallbackModule, Connection, Opts#{is_local => true}, StartOpts).

-spec start(module(), connection_handle(), local_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret().
start(CallbackModule, Connection, Opts, StartOpts) when is_list(Opts) ->
start(CallbackModule, Connection, maps:from_list(Opts), StartOpts);
start(CallbackModule, Connection, Opts, StartOpts) ->
quicer_stream:start(CallbackModule, Connection, Opts#{is_local => true}, StartOpts).
33 changes: 33 additions & 0 deletions src/quicer_remote_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@

-include("quicer_types.hrl").

-export([start/4,
start_link/3,
start_link/4,
start/6,
start_link/5,
start_link/6
]).

-callback init_handoff(stream_handle(), stream_opts(), connection_handle(), new_stream_props()) -> cb_ret().
%% Prepare callback state before ownership handoff

Expand Down Expand Up @@ -71,5 +79,30 @@
, handle_continue/2
]).

-type remote_stream_opts() :: stream_opts() | proplists:proplist().
-type cb_ret() :: quicer_stream:cb_ret().
-type cb_state() :: quicer_stream:cb_state().

-spec start_link(module(), connection_handle(), remote_stream_opts()) -> gen_server:start_ret().
start_link(CallbackModule, Connection, Opts) ->
start_link(CallbackModule, Connection, Opts#{is_local => false}, []).

-spec start_link(module(), connection_handle(), remote_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret().
start_link(CallbackModule, Connection, Opts, StartOpts) ->
quicer_stream:start_link(CallbackModule, Connection, Opts#{is_local => false}, StartOpts).

-spec start(module(), connection_handle(), remote_stream_opts(), [gen_server:start_opt()]) -> gen_server:start_ret().
start(CallbackModule, Connection, Opts, StartOpts) ->
quicer_stream:start(CallbackModule, Connection, Opts#{is_local => false}, StartOpts).

-spec start_link(module(), connection_handle(), stream_handle(), remote_stream_opts(), quicer:new_stream_props()) -> gen_server:start_ret().
start_link(CallbackModule, Connection, Stream, Opts, Props) ->
start_link(CallbackModule, Connection, Stream, Opts, Props, []).

-spec start_link(module(), connection_handle(), stream_handle(), remote_stream_opts(), quicer:new_stream_props(), [gen_server:start_opt()]) -> gen_server:start_ret().
start_link(CallbackModule, Connection, Stream, Opts, Props, StartOpts) ->
quicer_stream:start_link(CallbackModule, Connection, Stream, Opts, Props#{is_local => false}, StartOpts).

-spec start(module(), connection_handle(), stream_handle(), remote_stream_opts(), quicer:new_stream_props(), [gen_server:start_opt()]) -> gen_server:start_ret().
start(CallbackModule, Connection, Stream, Opts, Props, StartOpts) ->
quicer_stream:start(CallbackModule, Connection, Stream, Opts, Props, StartOpts).
12 changes: 7 additions & 5 deletions src/quicer_server_conn_callback.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
, new_stream/3
]).

init(ConnOpts) when is_list(ConnOpts) ->
init(maps:from_list(ConnOpts));
-export([handle_info/2]).

init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
init(S#{stream_opts := maps:from_list(SOpts)});
init(ConnOpts) when is_map(ConnOpts) ->
Expand All @@ -50,7 +50,7 @@ closed(_Conn, #{} = _Flags, S)->

new_conn(Conn, #{version := _Vsn}, #{stream_opts := SOpts} = S) ->
%% @TODO configurable behavior of spawning stream acceptor
case quicer_stream:start_link(maps:get(stream_callback, SOpts), Conn, SOpts) of
case quicer_remote_stream:start_link(maps:get(stream_callback, SOpts), Conn, SOpts) of
{ok, Pid} ->
ok = quicer:async_handshake(Conn),
{ok, S#{ conn => Conn
Expand All @@ -73,8 +73,8 @@ nst_received(_Conn, _Data, S) ->
new_stream(Stream, #{is_orphan := true} = StreamProps,
#{conn := Conn, streams := Streams, stream_opts := SOpts} = CBState) ->
%% Spawn new stream
case quicer_stream:start_link(maps:get(stream_callback, SOpts), Stream, Conn,
SOpts, StreamProps)
case quicer_remote_stream:start_link(maps:get(stream_callback, SOpts), Stream, Conn,
SOpts, StreamProps)
of
{ok, StreamOwner} ->
case quicer:handoff_stream(Stream, StreamOwner) of
Expand Down Expand Up @@ -116,6 +116,8 @@ connected(Conn, _Flags, #{ slow_start := false, stream_opts := SOpts
connected(_Connecion, _Flags, S) ->
{ok, S}.

handle_info({'EXIT', _Pid, _Reason}, State) ->
{ok, State}.

%% Internals

Expand Down
29 changes: 24 additions & 5 deletions src/quicer_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@
%% API
-export([ %% Start before conn handshake, with only Conn handle
start_link/3
, start_link/4
, start/4
%% Start after conn handshake with new Stream Handle
, start_link/5
, start_link/6
, start/6
, send/2
, send/3
]).
Expand Down Expand Up @@ -128,8 +132,12 @@
{error, Error :: {already_started, pid()}} |
{error, Error :: term()} |
ignore.
start_link(Callback, Conn, StreamOpts) when is_atom(Callback) ->
gen_server:start_link(?MODULE, [Callback, Conn, StreamOpts], []).
start_link(Callback, Conn, StreamOpts) ->
start_link(Callback, Conn, StreamOpts, []).
start_link(Callback, Conn, StreamOpts, GenStartOpts) when is_atom(Callback) ->
gen_server:start_link(?MODULE, [Callback, Conn, StreamOpts], GenStartOpts).
start(Callback, Conn, StreamOpts, GenStartOpts) when is_atom(Callback) ->
gen_server:start(?MODULE, [Callback, Conn, StreamOpts], GenStartOpts).

%%--------------------------------------------------------------------
%% @doc Start a new stream owner process and
Expand All @@ -144,11 +152,19 @@ start_link(Callback, Conn, StreamOpts) when is_atom(Callback) ->
{error, Error :: {already_started, pid()}} |
{error, Error :: term()} |
ignore.
start_link(Callback, Stream, Conn, StreamOpts, Props)
start_link(Callback, Stream, Conn, StreamOpts, Props) ->
start_link(Callback, Stream, Conn, StreamOpts, Props, []).
start_link(Callback, Stream, Conn, StreamOpts, Props, GenStartOpts)
when Callback =/= undefined
andalso is_atom(Callback)
andalso is_map(Props) ->
gen_server:start_link(?MODULE, [Callback, Stream, Conn, StreamOpts, Props, self()], []).
gen_server:start_link(?MODULE, [Callback, Stream, Conn, StreamOpts, Props, self()], GenStartOpts).

start(Callback, Stream, Conn, StreamOpts, Props, GenStartOpts)
when Callback =/= undefined
andalso is_atom(Callback)
andalso is_map(Props) ->
gen_server:start(?MODULE, [Callback, Stream, Conn, StreamOpts, Props, self()], GenStartOpts).

-spec send(pid(), binary()) -> {ok, Length::non_neg_integer()} | {error, any()}.
send(StreamProc, Data) ->
Expand Down Expand Up @@ -213,6 +229,7 @@ init([Callback, Conn, StreamOpts]) ->
{ok, InitState#{ stream => undefined
, is_owner => false
, is_local => false
, stream_opts => StreamOpts
}};
{error, Reason} ->
{stop, Reason}
Expand All @@ -231,6 +248,7 @@ init([Callback, Conn, StreamOpts]) ->
, is_owner => true
, is_local => true
, is_unidir => IsUni
, stream_opts => StreamOpts
}
}};
{error, Reason, SecReason} ->
Expand All @@ -249,7 +267,8 @@ init([Callback, Stream, Conn, StreamOpts, Props, PrevOwner]) ->
process_flag(trap_exit, true),
case Callback:init_handoff(Stream, StreamOpts, Conn, Props) of
{ok, CBState} ->
State = #{ is_owner => false
State = #{ is_owner => false %% not yet takeover the ownership
, is_local => false
, stream_opts => StreamOpts
, conn => Conn
, stream => Stream
Expand Down
21 changes: 11 additions & 10 deletions test/example_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
, datagram_state_changed/3
]).

-export([handle_info/2]).

start_link(Host, Port, {_COpts, _SOpts} = Opts)->
quicer_connection:start_link(?MODULE, {Host, Port}, Opts).

Expand All @@ -52,13 +54,12 @@ init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
init(S#{stream_opts := maps:from_list(SOpts)});
init(#{conn := Conn, stream_opts := SOpts} = ConnOpts) when is_map(ConnOpts) ->
%% for accepting
{ok, Stream2} = quicer_stream:start_link(example_client_stream, Conn, SOpts#{is_local => false}),
{ok, Stream2} = quicer_remote_stream:start(example_client_stream, Conn, SOpts, [{spawn_opt, [link]}]),
%% for sending unidi_streams
{ok, Stream1} = quicer_stream:start_link(example_client_stream, Conn,
SOpts#{ is_local => true
, open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL
}),
{ok,_} = quicer_stream:send(Stream1, <<"ping_from_example">>, ?QUICER_SEND_FLAG_SYNC bor ?QUIC_SEND_FLAG_FIN),
{ok, Stream1} = quicer_local_stream:start(example_client_stream, Conn,
SOpts#{open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}, [{spawn_opt, [link]}]),

{ok, _} = quicer_stream:send(Stream1, <<"ping_from_example">>, ?QUICER_SEND_FLAG_SYNC bor ?QUIC_SEND_FLAG_FIN),
{ok, ConnOpts#{master_stream_pair => {Stream1, Stream2}}}.

closed(_Conn, #{is_peer_acked := true}, S)->
Expand Down Expand Up @@ -87,7 +88,7 @@ nst_received(_Conn, Data, S) ->
new_stream(Stream, Flags, #{ conn := Conn, streams := Streams
, stream_opts := SOpts} = CBState) ->
%% Spawn new stream
case quicer_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of
case quicer_remote_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of
{ok, StreamOwner} ->
quicer_connection:handoff_stream(Stream, StreamOwner),
{ok, CBState#{ streams := [ {StreamOwner, Stream} | Streams] }};
Expand Down Expand Up @@ -120,7 +121,7 @@ peer_needs_streams(C, #{unidi_streams := Current}, S) ->
{ok, S};
peer_needs_streams(C, #{bidi_streams := Current}, S) ->
ok = quicer:setopt(C, param_conn_settings, #{peer_bidi_stream_count => Current + 1}),
{ok, S};
%% for https://github.com/microsoft/msquic/issues/3120
peer_needs_streams(_C, undefined, S) ->
{ok, S}.

handle_info({'EXIT', _Pid, _Reason}, State) ->
{ok, State}.
12 changes: 7 additions & 5 deletions test/example_server_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
, datagram_state_changed/3
]).

-export([handle_info/2]).

init(ConnOpts) when is_list(ConnOpts) ->
init(maps:from_list(ConnOpts));
init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
Expand All @@ -61,7 +63,7 @@ closed(_Conn, _CloseProp, S) ->
{stop, normal, S}.

new_conn(Conn, #{version := _Vsn}, #{stream_opts := SOpts} = S) ->
case quicer_stream:start_link(example_server_stream, Conn, SOpts) of
case quicer_remote_stream:start_link(example_server_stream, Conn, SOpts) of
{ok, Pid} ->
ok = quicer:async_handshake(Conn),
{ok, S#{ conn => Conn
Expand All @@ -86,7 +88,7 @@ nst_received(_Conn, _Data, S) ->
new_stream(Stream, Flags, #{ conn := Conn, streams := Streams
, stream_opts := SOpts} = CBState) ->
%% Spawn new stream
case quicer_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of
case quicer_remote_stream:start(example_server_stream, Stream, Conn, SOpts, Flags, [{spawn_opt, [link]}]) of
{ok, StreamOwner} ->
case quicer:handoff_stream(Stream, StreamOwner) of
ok ->
Expand Down Expand Up @@ -119,10 +121,10 @@ peer_needs_streams(C, unidi_streams, S) ->
{ok, S};
peer_needs_streams(_C, bidi_streams, S) ->
%% leave it for test case to unblock it, see tc_multi_streams_example_server_3
{ok, S};
%% for https://github.com/microsoft/msquic/issues/3120
peer_needs_streams(_C, undefined, S) ->
{ok, S}.

datagram_state_changed(_C, _Flags, S) ->
{ok, S}.

handle_info({'EXIT', _Pid, _Reason}, State) ->
{ok, State}.
6 changes: 2 additions & 4 deletions test/example_server_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,8 @@ handle_stream_data(Stream, Bin, _Flags, #{is_unidir := true, peer_stream := Peer

case PeerStream of
undefined ->
{ok, StreamProc} = quicer_stream:start_link(?MODULE, Conn,
[ {open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}
, {is_local, true}
]),
{ok, StreamProc} = quicer_local_stream:start_link(?MODULE, Conn,
[ {open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL} ]),
{ok, _} = quicer_stream:send(StreamProc, Bin),
{ok, State#{peer_stream := StreamProc}};
StreamProc when is_pid(StreamProc) ->
Expand Down