From 13698cca327b66fd71999d82b31d36bc294885d1 Mon Sep 17 00:00:00 2001
From: zhongwencool <zhongwencool@gmail.com>
Date: Fri, 7 May 2021 09:05:08 +0800
Subject: [PATCH] [gun_trailers message] return grpc_error message when stream
 is breaked by etcd server

---
 rebar.lock                             |  7 +++++--
 src/eetcd.app.src                      |  2 +-
 src/eetcd_election.erl                 | 12 +++++++++++-
 src/eetcd_stream.erl                   |  1 +
 src/eetcd_watch.erl                    |  9 +++++++--
 test/eetcd_election_leader_example.erl | 20 ++++++++++++++------
 6 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/rebar.lock b/rebar.lock
index 03e6c25..6dde131 100644
--- a/rebar.lock
+++ b/rebar.lock
@@ -1,8 +1,11 @@
-{"1.1.0",
+{"1.2.0",
 [{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.7.3">>},1},
  {<<"gun">>,{pkg,<<"gun">>,<<"1.3.3">>},0}]}.
 [
 {pkg_hash,[
  {<<"cowlib">>, <<"A7FFCD0917E6D50B4D5FB28E9E2085A0CEB3C97DEA310505F7460FF5ED764CE9">>},
- {<<"gun">>, <<"CF8B51BEB36C22B9C8DF1921E3F2BC4D2B1F68B49AD4FBC64E91875AA14E16B4">>}]}
+ {<<"gun">>, <<"CF8B51BEB36C22B9C8DF1921E3F2BC4D2B1F68B49AD4FBC64E91875AA14E16B4">>}]},
+{pkg_hash_ext,[
+ {<<"cowlib">>, <<"1E1A3D176D52DAEBBECBBCDFD27C27726076567905C2A9D7398C54DA9D225761">>},
+ {<<"gun">>, <<"3106CE167F9C9723F849E4FB54EA4A4D814E3996AE243A1C828B256E749041E0">>}]}
 ].
diff --git a/src/eetcd.app.src b/src/eetcd.app.src
index 1646d50..d7d584b 100644
--- a/src/eetcd.app.src
+++ b/src/eetcd.app.src
@@ -1,7 +1,7 @@
 {application, eetcd,
     [
         {description, "ETCD V3 client"},
-        {vsn, "0.3.3"},
+        {vsn, "0.3.4"},
         {registered, [eetcd_sup, eetcd_conn_sup, eetcd_lease_sup]},
         {mod, {eetcd_app, []}},
         {applications, [kernel, stdlib, gun]},
diff --git a/src/eetcd_election.erl b/src/eetcd_election.erl
index b980e56..ba1cff3 100644
--- a/src/eetcd_election.erl
+++ b/src/eetcd_election.erl
@@ -285,12 +285,22 @@ observe_stream(OCtx, Msg) ->
 resp_stream(#{stream_ref := Ref, http2_pid := Pid},
     {gun_response, Pid, Ref, nofin, 200, _Headers}) ->
     receive {gun_data, Pid, Ref, nofin, Bin} ->
-        {ok, Bin}
+        receive {gun_trailers, Pid, Ref, [{<<"grpc-status">>, <<"0">>}, {<<"grpc-message">>, <<>>}]} ->
+            {ok, Bin};
+        {gun_trailers, Pid, Ref, [{<<"grpc-status">>, GrpcStatus}, {<<"grpc-message">>, GrpcMsg}]} ->
+            {error, ?GRPC_ERROR(GrpcStatus, GrpcMsg)}
+        after 2000 -> unknown
+        end
     after 2000 -> unknown
     end;
 resp_stream(#{stream_ref := Ref, http2_pid := Pid},
     {gun_data, Pid, Ref, nofin, Bin}) ->
     {ok, Bin};
+resp_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef},
+    {gun_trailers, Pid, SRef, [{<<"grpc-status">>, GrpcStatus}, {<<"grpc-message">>, GrpcMsg}]}) ->    %% grpc error
+    erlang:demonitor(MRef, [flush]),
+    gun:cancel(Pid, SRef),
+    {error, ?GRPC_ERROR(GrpcStatus, GrpcMsg)};
 resp_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef},
     {gun_error, Pid, SRef, Reason}) -> %% stream error
     erlang:demonitor(MRef, [flush]),
diff --git a/src/eetcd_stream.erl b/src/eetcd_stream.erl
index cdcb980..1aa5f08 100644
--- a/src/eetcd_stream.erl
+++ b/src/eetcd_stream.erl
@@ -139,6 +139,7 @@ await_body(ServerPid, StreamRef, Timeout, MRef, Acc) ->
         {gun_data, ServerPid, StreamRef, fin, Data} ->
             {ok, <<Acc/binary, Data/binary>>};
     %% It's OK to return trailers here because the client specifically requested them
+    %% Trailers are grpc_status and grpc_message headers
         {gun_trailers, ServerPid, StreamRef, Trailers} ->
             {ok, Acc, Trailers};
         {gun_error, ServerPid, StreamRef, Reason} ->
diff --git a/src/eetcd_watch.erl b/src/eetcd_watch.erl
index e12c4ec..a754e5a 100644
--- a/src/eetcd_watch.erl
+++ b/src/eetcd_watch.erl
@@ -168,13 +168,13 @@ watch(Name, CreateReq, Timeout) ->
 %%that is, a gun_* message received on the gun connection.
 %%If it is, then this function will parse the message, turn it into  watch responses, and possibly take action given the responses.
 %%If there's no error, this function returns {ok, WatchConn, 'Etcd.WatchResponse'()}|{more, WatchConn}
-%%If there's an error, {error, {stream_error | conn_error | http2_down, term()} | timeout} is returned.
+%%If there's an error, {error, {grpc_error, stream_error | conn_error | http2_down, term()} | timeout} is returned.
 %%If the given message is not from the gun connection, this function returns unknown.
 -spec watch_stream(watch_conn(), Message) ->
     {ok, watch_conn(), router_pb:'Etcd.WatchResponse'()}
     | {more, watch_conn()}
     | unknown
-    | {error, {stream_error | conn_error | http2_down, term()}} when
+    | {error, {grpc_error, stream_error | conn_error | http2_down, term()}} when
     Message :: term().
 
 watch_stream(#{stream_ref := Ref, http2_pid := Pid, unprocessed := Unprocessed} = Conn,
@@ -194,6 +194,11 @@ watch_stream(#{stream_ref := Ref, http2_pid := Pid, unprocessed := Unprocessed}
                 Resp};
         more -> {more, Conn#{unprocessed => Bin}}
     end;
+watch_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef},
+    {gun_trailers, Pid, SRef, [{<<"grpc-status">>, Status}, {<<"grpc-message">>, Msg}]}) ->
+    erlang:demonitor(MRef, [flush]),
+    gun:cancel(Pid, SRef),
+    {error, {grpc_error, ?GRPC_ERROR(Status, Msg)}};
 watch_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef},
     {gun_error, Pid, SRef, Reason}) -> %% stream error
     erlang:demonitor(MRef, [flush]),
diff --git a/test/eetcd_election_leader_example.erl b/test/eetcd_election_leader_example.erl
index c0951e1..b2b8b96 100644
--- a/test/eetcd_election_leader_example.erl
+++ b/test/eetcd_election_leader_example.erl
@@ -32,6 +32,7 @@ resign(Pid) ->
 %%%===================================================================
 
 init([Etcd, LeaderKey, Value]) ->
+    logger:set_primary_config(#{level => info}),
     erlang:process_flag(trap_exit, true),
     {ok, #{'ID' := LeaseID}} = eetcd_lease:grant(Etcd, 8),
     {ok, _} = eetcd_lease:keep_alive(Etcd, LeaseID),
@@ -66,9 +67,11 @@ handle_info(Msg, State) ->
     #{campaign := Campaign, observe := Observe} = State,
     case eetcd_election:campaign_response(Campaign, Msg) of
         {ok, NewCampaign = #{campaign := Leader}} ->
+            %% Only get this response when you win campaign by yourself.
+            %% You are leader!
             win_campaign_event(Leader),
             {noreply, State#{campaign => NewCampaign}};
-        {error, Reason} -> %% you can just let it crash and restart process
+        {error, Reason} -> %% you can just let it crash and restart process or recampaign !!!
             campaign_unexpected_error(Reason),
             {noreply, State};
         unknown ->
@@ -95,17 +98,22 @@ code_change(_OldVsn, State = #{}, _Extra) ->
 %%%===================================================================
 %%% Internal functions
 %%%===================================================================
-win_campaign_event(_Leader) ->
+win_campaign_event(Leader) ->
+    logger:info("win campaign event:~p", [Leader]),
     "Todo".
 
-campaign_unexpected_error(_Reason) ->
+campaign_unexpected_error(Reason) ->
+    logger:info("campaign unexpected error:~p", [Reason]),
     "Todo: try to recampaign".
 
-leader_change_event(_Leader) ->
+leader_change_event(Leader) ->
+    logger:info("leader change event:~p", [Leader]),
     "Todo".
 
-observe_unexpected_error(_Reason) ->
+observe_unexpected_error(Reason) ->
+    logger:info("observe unexpect error:~p", [Reason]),
     "Todo: try to reobserve after some sleep.".
 
-handle_info_your_own_msg(_Msg, _State) ->
+handle_info_your_own_msg(Msg, State) ->
+    logger:info("hanle info your own msg:~p ~p", [Msg, State]),
     "Todo".
\ No newline at end of file