Skip to content

Commit

Permalink
Interpret x-death header from AMQP 0.9.1 client
Browse files Browse the repository at this point in the history
Fixes #10709

This commit fixes the following regression which worked in 3.12.x, but
stopped working in 3.13.0 and 3.13.1:

```
AMQP 0.9.1 client    --publish-->
Q                    --dead-letter-->
DLQ                  --consume-->
AMQP 0.9.1 client (death count is now 1) --republish-same-message-with-headers-as-just-received-->
Q                    --dead-letter-->
DLQ                  --consume -->
AMQP 0.9.1 (death count is now 1, but should be 2)
```

The reason this behaviour stopped to work in 3.13.0 is that the broker
won't specially interpret x-headers in general, and the x-death header
specifically in this case anymore.

In other words, the new desired 3.13 behaviour with message containers
is that "x-headers belong to the broker".

While this is correct, it does break client applications which depended
on the previous use case.
One simple fix is that the client application does not re-publish with
the x-death header, but instead sets its own custom count header to
determine the number of times it retries.

This commit will only be packported to v3.13.x branch.
In other words, 4.0 won't interpret x-headers as done in 3.13.0 and
3.13.1.

The reason we backport this commit to v3.13.x is that the Spring
documentation expliclity recommends re-publishing the message with
x-death header being set:
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-rabbit/3.0.6.RELEASE/reference/html/spring-cloud-stream-binder-rabbit.html#_retry_with_the_rabbitmq_binder
  • Loading branch information
ansd committed Apr 19, 2024
1 parent 13161d5 commit a9151e3
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 65 deletions.
134 changes: 92 additions & 42 deletions deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -507,45 +507,6 @@ from_basic_message(#basic_message{content = Content,

%% Internal

deaths_to_headers(undefined, Headers) ->
Headers;
deaths_to_headers(#deaths{records = Records}, Headers0) ->
%% sort records by the last timestamp
List = lists:sort(
fun({_, #death{anns = #{last_time := L1}}},
{_, #death{anns = #{last_time := L2}}}) ->
L1 < L2
end, maps:to_list(Records)),
Infos = lists:foldl(
fun ({{QName, Reason}, #death{anns = #{first_time := Ts} = DA,
exchange = Ex,
count = Count,
routing_keys = RoutingKeys}},
Acc) ->
%% The first routing key is the one specified in the
%% basic.publish; all others are CC or BCC keys.
RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers0)],
RKeys = [{longstr, Key} || Key <- RKs],
ReasonBin = atom_to_binary(Reason, utf8),
PerMsgTTL = case maps:get(ttl, DA, undefined) of
undefined -> [];
Ttl when is_integer(Ttl) ->
Expiration = integer_to_binary(Ttl),
[{<<"original-expiration">>, longstr,
Expiration}]
end,
[{table, [{<<"count">>, long, Count},
{<<"reason">>, longstr, ReasonBin},
{<<"queue">>, longstr, QName},
{<<"time">>, timestamp, Ts div 1000},
{<<"exchange">>, longstr, Ex},
{<<"routing-keys">>, array, RKeys}] ++ PerMsgTTL}
| Acc]
end, [], List),
rabbit_misc:set_table_value(Headers0, <<"x-death">>, array, Infos).



strip_header(#content{properties = #'P_basic'{headers = undefined}}
= DecodedContent, _Key) ->
DecodedContent;
Expand Down Expand Up @@ -695,10 +656,11 @@ message_id({utf8, S}, HKey, H0) ->
message_id(undefined, _HKey, H) ->
{H, undefined}.

essential_properties(#content{} = C) ->
essential_properties(#content{properties = Props}) ->
#'P_basic'{delivery_mode = Mode,
priority = Priority,
timestamp = TimestampRaw} = Props = C#content.properties,
timestamp = TimestampRaw,
headers = Headers} = Props,
{ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
Timestamp = case TimestampRaw of
undefined ->
Expand All @@ -708,6 +670,8 @@ essential_properties(#content{} = C) ->
TimestampRaw * 1000
end,
Durable = Mode == 2,
Deaths = headers_to_deaths(Headers),

maps_put_truthy(
?ANN_PRIORITY, Priority,
maps_put_truthy(
Expand All @@ -716,7 +680,93 @@ essential_properties(#content{} = C) ->
?ANN_TIMESTAMP, Timestamp,
maps_put_falsy(
?ANN_DURABLE, Durable,
#{})))).
maps_put_truthy(
deaths, Deaths,
#{}))))).

headers_to_deaths(undefined) ->
undefined;
headers_to_deaths(Headers) ->
case lists:keymember(<<"x-death">>, 1, Headers) of
true ->
case rabbit_misc:amqp_table(Headers) of
#{<<"x-first-death-queue">> := FstQ,
<<"x-first-death-reason">> := FstR,
<<"x-last-death-queue">> := LastQ,
<<"x-last-death-reason">> := LastR,
<<"x-death">> := XDeathList} ->
#deaths{first = {FstQ, binary_to_existing_atom(FstR)},
last = {LastQ, binary_to_existing_atom(LastR)},
records = recover_deaths(XDeathList, #{})};
_ ->
undefined
end;
false ->
undefined
end.

recover_deaths([], Acc) ->
Acc;
recover_deaths([Map = #{<<"exchange">> := Exchange,
<<"queue">> := Queue,
<<"routing-keys">> := RKeys,
<<"reason">> := ReasonBin,
<<"count">> := Count,
<<"time">> := Ts} | Rem], Acc) ->
Reason = binary_to_existing_atom(ReasonBin),
DA = #{first_time => Ts,
last_time => Ts},
DeathAnns = case Map of
#{<<"original-expiration">> := Exp} ->
DA#{ttl => binary_to_integer(Exp)};
_ ->
DA
end,
recover_deaths(Rem,
Acc#{{Queue, Reason} =>
#death{anns = DeathAnns,
exchange = Exchange,
count = Count,
routing_keys = RKeys}});
recover_deaths([_IgnoreInvalid | Rem], Acc) ->
recover_deaths(Rem, Acc).

deaths_to_headers(undefined, Headers) ->
Headers;
deaths_to_headers(#deaths{records = Records}, Headers0) ->
%% sort records by the last timestamp
List = lists:sort(
fun({_, #death{anns = #{last_time := L1}}},
{_, #death{anns = #{last_time := L2}}}) ->
L1 < L2
end, maps:to_list(Records)),
Infos = lists:foldl(
fun ({{QName, Reason}, #death{anns = #{last_time := Ts} = DA,
exchange = Ex,
count = Count,
routing_keys = RoutingKeys}},
Acc) ->
%% The first routing key is the one specified in the
%% basic.publish; all others are CC or BCC keys.
RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers0)],
RKeys = [{longstr, Key} || Key <- RKs],
ReasonBin = atom_to_binary(Reason, utf8),
PerMsgTTL = case maps:get(ttl, DA, undefined) of
undefined -> [];
Ttl when is_integer(Ttl) ->
Expiration = integer_to_binary(Ttl),
[{<<"original-expiration">>, longstr,
Expiration}]
end,
[{table, [{<<"count">>, long, Count},
{<<"reason">>, longstr, ReasonBin},
{<<"queue">>, longstr, QName},
{<<"time">>, timestamp, Ts div 1000},
{<<"exchange">>, longstr, Ex},
{<<"routing-keys">>, array, RKeys}] ++ PerMsgTTL}
| Acc]
end, [], List),
rabbit_misc:set_table_value(Headers0, <<"x-death">>, array, Infos).

%% headers that are added as annotations during conversions
is_internal_header(<<"x-basic-", _/binary>>) ->
Expand Down
45 changes: 44 additions & 1 deletion deps/rabbit/test/dead_lettering_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ groups() ->
dead_letter_routing_key_cycle_ttl,
dead_letter_headers_reason_expired,
dead_letter_headers_reason_expired_per_message,
dead_letter_extra_bcc],
dead_letter_extra_bcc,
x_death_header_from_amqpl_client],
DisabledMetricTests = [metric_maxlen,
metric_rejected,
metric_expired_queue_msg_ttl,
Expand Down Expand Up @@ -1493,6 +1494,48 @@ dead_letter_extra_bcc(Config) ->
[_] = consume(Ch, ExtraBCCQ, [P]),
ok.

%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/10709
%% The Spring Cloud Stream RabbitMQ Binder Reference Guide recommends relying on the
%% count field of the x-death header to determine how often a message was dead lettered
%% in a loop involving RabbitMQ and an AMQP 0.9.1 client.
%% This test therefore asserts that RabbitMQ interprets the x-death header if an AMQP 0.9.1
%% clients (re)publishes a message with the x-death header set.
%% This test case should only pass up to 3.13.
%% Starting with 4.0, RabbitMQ won't interpret x-headers sent from clients anymore in any
%% special way as x-headers "belong to the broker".
x_death_header_from_amqpl_client(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = ?config(queue_name, Config),
DLXQName = ?config(queue_name_dlx, Config),
declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-message-ttl">>, long, 0}]),

Payload = <<"my payload">>,
ok = amqp_channel:call(Ch,
#'basic.publish'{routing_key = QName},
#amqp_msg{payload = Payload}),
wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
{
#'basic.get_ok'{},
#amqp_msg{props = #'P_basic'{headers = Headers1}} = Msg1
} = amqp_channel:call(Ch, #'basic.get'{queue = DLXQName,
no_ack = true}),
{array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>),
?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)),

ok = amqp_channel:call(Ch,
#'basic.publish'{routing_key = QName},
%% Re-publish the same message we received including the x-death header.
Msg1),
wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
{
#'basic.get_ok'{},
#amqp_msg{payload = Payload,
props = #'P_basic'{headers = Headers2}}
} = amqp_channel:call(Ch, #'basic.get'{queue = DLXQName,
no_ack = true}),
{array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>),
?assertEqual({long, 2}, rabbit_misc:table_lookup(Death2, <<"count">>)).

set_queue_options(QName, Options) ->
rabbit_amqqueue:update(rabbit_misc:r(<<"/">>, queue, QName),
fun(Q) ->
Expand Down
65 changes: 43 additions & 22 deletions deps/rabbit/test/mc_unit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -192,48 +192,69 @@ amqpl_table_x_header_array_of_tbls(_Config) ->
ok.

amqpl_death_records(_Config) ->
Content = #content{class_id = 60,
properties = #'P_basic'{headers = []},
payload_fragments_rev = [<<"data">>]},
Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content, annotations())),

Msg1 = mc:record_death(rejected, <<"q1">>, Msg0),
?assertEqual([<<"q1">>], mc:death_queue_names(Msg1)),
?assertMatch({{<<"q1">>, rejected},
Q = <<"my queue">>,
DLQ = <<"my dead letter queue">>,

Content0 = #content{class_id = 60,
properties = #'P_basic'{headers = [],
expiration = <<"9999">>},
payload_fragments_rev = [<<"data">>]},
Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content0, annotations())),
Msg1 = mc:record_death(rejected, Q, Msg0),

%% Roundtrip simulates message being sent to and received from AMQP 0.9.1 client.
Content1 = mc:protocol_state(Msg1),
Msg2 = mc:init(mc_amqpl, Content1, annotations()),

?assertEqual([Q], mc:death_queue_names(Msg2)),
?assertMatch({{Q, rejected},
#death{exchange = <<"exch">>,
routing_keys = [<<"apple">>],
count = 1}}, mc:last_death(Msg1)),
?assertEqual(false, mc:is_death_cycle(<<"q1">>, Msg1)),
count = 1}}, mc:last_death(Msg2)),
?assertEqual(false, mc:is_death_cycle(Q, Msg2)),

#content{properties = #'P_basic'{headers = H1}} = mc:protocol_state(Msg1),
#content{properties = #'P_basic'{headers = H1}} = mc:protocol_state(Msg2),
?assertMatch({_, array, [_]}, header(<<"x-death">>, H1)),
?assertMatch({_, longstr, <<"q1">>}, header(<<"x-first-death-queue">>, H1)),
?assertMatch({_, longstr, <<"q1">>}, header(<<"x-last-death-queue">>, H1)),
?assertMatch({_, longstr, Q}, header(<<"x-first-death-queue">>, H1)),
?assertMatch({_, longstr, Q}, header(<<"x-last-death-queue">>, H1)),
?assertMatch({_, longstr, <<"exch">>}, header(<<"x-first-death-exchange">>, H1)),
?assertMatch({_, longstr, <<"exch">>}, header(<<"x-last-death-exchange">>, H1)),
?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-first-death-reason">>, H1)),
?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-last-death-reason">>, H1)),
{_, array, [{table, T1}]} = header(<<"x-death">>, H1),
?assertMatch({_, long, 1}, header(<<"count">>, T1)),
?assertMatch({_, longstr, <<"rejected">>}, header(<<"reason">>, T1)),
?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T1)),
?assertMatch({_, longstr, Q}, header(<<"queue">>, T1)),
?assertMatch({_, longstr, <<"exch">>}, header(<<"exchange">>, T1)),
?assertMatch({_, timestamp, _}, header(<<"time">>, T1)),
?assertMatch({_, array, [{longstr, <<"apple">>}]}, header(<<"routing-keys">>, T1)),
?assertMatch({_, longstr, <<"9999">>}, header(<<"original-expiration">>, T1)),


%% second dead letter, e.g. a ttl reason returning to source queue

%% 2nd dead letter, e.g. an expired reason returning to source queue
%% record_death uses a timestamp for death record ordering, ensure
%% it is definitely higher than the last timestamp taken
timer:sleep(2),
Msg2 = mc:record_death(ttl, <<"dl">>, Msg1),
Msg3 = mc:record_death(expired, DLQ, Msg2),

#content{properties = #'P_basic'{headers = H2}} = mc:protocol_state(Msg2),
#content{properties = #'P_basic'{headers = H2}} = mc:protocol_state(Msg3),
{_, array, [{table, T2a}, {table, T2b}]} = header(<<"x-death">>, H2),
?assertMatch({_, longstr, <<"dl">>}, header(<<"queue">>, T2a)),
?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2b)),
ok.
?assertMatch({_, longstr, DLQ}, header(<<"queue">>, T2a)),
?assertMatch({_, longstr, Q}, header(<<"queue">>, T2b)),

%% 3rd dead letter
timer:sleep(2),
Msg4 = mc:record_death(rejected, Q, Msg3),

%% Roundtrip simulates message being sent to and received from AMQP 0.9.1 client.
Content2 = mc:protocol_state(Msg4),
Msg5 = mc:init(mc_amqpl, Content2, annotations()),

?assertEqual([DLQ, Q],
lists:sort(mc:death_queue_names(Msg5))),
?assertMatch({{Q, rejected},
#death{exchange = <<"exch">>,
routing_keys = [<<"apple">>],
count = 2}}, mc:last_death(Msg5)).

header(K, H) ->
rabbit_basic:header(K, H).
Expand Down

0 comments on commit a9151e3

Please sign in to comment.