diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index c4325425d434..01f99cf30d1b 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -507,45 +507,6 @@ from_basic_message(#basic_message{content = Content, %% Internal -deaths_to_headers(undefined, Headers) -> - Headers; -deaths_to_headers(#deaths{records = Records}, Headers0) -> - %% sort records by the last timestamp - List = lists:sort( - fun({_, #death{anns = #{last_time := L1}}}, - {_, #death{anns = #{last_time := L2}}}) -> - L1 < L2 - end, maps:to_list(Records)), - Infos = lists:foldl( - fun ({{QName, Reason}, #death{anns = #{first_time := Ts} = DA, - exchange = Ex, - count = Count, - routing_keys = RoutingKeys}}, - Acc) -> - %% The first routing key is the one specified in the - %% basic.publish; all others are CC or BCC keys. - RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers0)], - RKeys = [{longstr, Key} || Key <- RKs], - ReasonBin = atom_to_binary(Reason, utf8), - PerMsgTTL = case maps:get(ttl, DA, undefined) of - undefined -> []; - Ttl when is_integer(Ttl) -> - Expiration = integer_to_binary(Ttl), - [{<<"original-expiration">>, longstr, - Expiration}] - end, - [{table, [{<<"count">>, long, Count}, - {<<"reason">>, longstr, ReasonBin}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, Ts div 1000}, - {<<"exchange">>, longstr, Ex}, - {<<"routing-keys">>, array, RKeys}] ++ PerMsgTTL} - | Acc] - end, [], List), - rabbit_misc:set_table_value(Headers0, <<"x-death">>, array, Infos). - - - strip_header(#content{properties = #'P_basic'{headers = undefined}} = DecodedContent, _Key) -> DecodedContent; @@ -695,10 +656,11 @@ message_id({utf8, S}, HKey, H0) -> message_id(undefined, _HKey, H) -> {H, undefined}. -essential_properties(#content{} = C) -> +essential_properties(#content{properties = Props}) -> #'P_basic'{delivery_mode = Mode, priority = Priority, - timestamp = TimestampRaw} = Props = C#content.properties, + timestamp = TimestampRaw, + headers = Headers} = Props, {ok, MsgTTL} = rabbit_basic:parse_expiration(Props), Timestamp = case TimestampRaw of undefined -> @@ -708,6 +670,8 @@ essential_properties(#content{} = C) -> TimestampRaw * 1000 end, Durable = Mode == 2, + Deaths = headers_to_deaths(Headers), + maps_put_truthy( ?ANN_PRIORITY, Priority, maps_put_truthy( @@ -716,7 +680,93 @@ essential_properties(#content{} = C) -> ?ANN_TIMESTAMP, Timestamp, maps_put_falsy( ?ANN_DURABLE, Durable, - #{})))). + maps_put_truthy( + deaths, Deaths, + #{}))))). + +headers_to_deaths(undefined) -> + undefined; +headers_to_deaths(Headers) -> + case lists:keymember(<<"x-death">>, 1, Headers) of + true -> + case rabbit_misc:amqp_table(Headers) of + #{<<"x-first-death-queue">> := FstQ, + <<"x-first-death-reason">> := FstR, + <<"x-last-death-queue">> := LastQ, + <<"x-last-death-reason">> := LastR, + <<"x-death">> := XDeathList} -> + #deaths{first = {FstQ, binary_to_existing_atom(FstR)}, + last = {LastQ, binary_to_existing_atom(LastR)}, + records = recover_deaths(XDeathList, #{})}; + _ -> + undefined + end; + false -> + undefined + end. + +recover_deaths([], Acc) -> + Acc; +recover_deaths([Map = #{<<"exchange">> := Exchange, + <<"queue">> := Queue, + <<"routing-keys">> := RKeys, + <<"reason">> := ReasonBin, + <<"count">> := Count, + <<"time">> := Ts} | Rem], Acc) -> + Reason = binary_to_existing_atom(ReasonBin), + DA = #{first_time => Ts, + last_time => Ts}, + DeathAnns = case Map of + #{<<"original-expiration">> := Exp} -> + DA#{ttl => binary_to_integer(Exp)}; + _ -> + DA + end, + recover_deaths(Rem, + Acc#{{Queue, Reason} => + #death{anns = DeathAnns, + exchange = Exchange, + count = Count, + routing_keys = RKeys}}); +recover_deaths([_IgnoreInvalid | Rem], Acc) -> + recover_deaths(Rem, Acc). + +deaths_to_headers(undefined, Headers) -> + Headers; +deaths_to_headers(#deaths{records = Records}, Headers0) -> + %% sort records by the last timestamp + List = lists:sort( + fun({_, #death{anns = #{last_time := L1}}}, + {_, #death{anns = #{last_time := L2}}}) -> + L1 < L2 + end, maps:to_list(Records)), + Infos = lists:foldl( + fun ({{QName, Reason}, #death{anns = #{last_time := Ts} = DA, + exchange = Ex, + count = Count, + routing_keys = RoutingKeys}}, + Acc) -> + %% The first routing key is the one specified in the + %% basic.publish; all others are CC or BCC keys. + RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers0)], + RKeys = [{longstr, Key} || Key <- RKs], + ReasonBin = atom_to_binary(Reason, utf8), + PerMsgTTL = case maps:get(ttl, DA, undefined) of + undefined -> []; + Ttl when is_integer(Ttl) -> + Expiration = integer_to_binary(Ttl), + [{<<"original-expiration">>, longstr, + Expiration}] + end, + [{table, [{<<"count">>, long, Count}, + {<<"reason">>, longstr, ReasonBin}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, Ts div 1000}, + {<<"exchange">>, longstr, Ex}, + {<<"routing-keys">>, array, RKeys}] ++ PerMsgTTL} + | Acc] + end, [], List), + rabbit_misc:set_table_value(Headers0, <<"x-death">>, array, Infos). %% headers that are added as annotations during conversions is_internal_header(<<"x-basic-", _/binary>>) -> diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index 5749a21cacd1..114903d6658d 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -55,7 +55,8 @@ groups() -> dead_letter_routing_key_cycle_ttl, dead_letter_headers_reason_expired, dead_letter_headers_reason_expired_per_message, - dead_letter_extra_bcc], + dead_letter_extra_bcc, + x_death_header_from_amqpl_client], DisabledMetricTests = [metric_maxlen, metric_rejected, metric_expired_queue_msg_ttl, @@ -1493,6 +1494,48 @@ dead_letter_extra_bcc(Config) -> [_] = consume(Ch, ExtraBCCQ, [P]), ok. +%% Test case for https://github.com/rabbitmq/rabbitmq-server/issues/10709 +%% The Spring Cloud Stream RabbitMQ Binder Reference Guide recommends relying on the +%% count field of the x-death header to determine how often a message was dead lettered +%% in a loop involving RabbitMQ and an AMQP 0.9.1 client. +%% This test therefore asserts that RabbitMQ interprets the x-death header if an AMQP 0.9.1 +%% clients (re)publishes a message with the x-death header set. +%% This test case should only pass up to 3.13. +%% Starting with 4.0, RabbitMQ won't interpret x-headers sent from clients anymore in any +%% special way as x-headers "belong to the broker". +x_death_header_from_amqpl_client(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-message-ttl">>, long, 0}]), + + Payload = <<"my payload">>, + ok = amqp_channel:call(Ch, + #'basic.publish'{routing_key = QName}, + #amqp_msg{payload = Payload}), + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + { + #'basic.get_ok'{}, + #amqp_msg{props = #'P_basic'{headers = Headers1}} = Msg1 + } = amqp_channel:call(Ch, #'basic.get'{queue = DLXQName, + no_ack = true}), + {array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>), + ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)), + + ok = amqp_channel:call(Ch, + #'basic.publish'{routing_key = QName}, + %% Re-publish the same message we received including the x-death header. + Msg1), + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + { + #'basic.get_ok'{}, + #amqp_msg{payload = Payload, + props = #'P_basic'{headers = Headers2}} + } = amqp_channel:call(Ch, #'basic.get'{queue = DLXQName, + no_ack = true}), + {array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>), + ?assertEqual({long, 2}, rabbit_misc:table_lookup(Death2, <<"count">>)). + set_queue_options(QName, Options) -> rabbit_amqqueue:update(rabbit_misc:r(<<"/">>, queue, QName), fun(Q) -> diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 1655394deb0c..b00a30b744ff 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -192,23 +192,31 @@ amqpl_table_x_header_array_of_tbls(_Config) -> ok. amqpl_death_records(_Config) -> - Content = #content{class_id = 60, - properties = #'P_basic'{headers = []}, - payload_fragments_rev = [<<"data">>]}, - Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content, annotations())), - - Msg1 = mc:record_death(rejected, <<"q1">>, Msg0), - ?assertEqual([<<"q1">>], mc:death_queue_names(Msg1)), - ?assertMatch({{<<"q1">>, rejected}, + Q = <<"my queue">>, + DLQ = <<"my dead letter queue">>, + + Content0 = #content{class_id = 60, + properties = #'P_basic'{headers = [], + expiration = <<"9999">>}, + payload_fragments_rev = [<<"data">>]}, + Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content0, annotations())), + Msg1 = mc:record_death(rejected, Q, Msg0), + + %% Roundtrip simulates message being sent to and received from AMQP 0.9.1 client. + Content1 = mc:protocol_state(Msg1), + Msg2 = mc:init(mc_amqpl, Content1, annotations()), + + ?assertEqual([Q], mc:death_queue_names(Msg2)), + ?assertMatch({{Q, rejected}, #death{exchange = <<"exch">>, routing_keys = [<<"apple">>], - count = 1}}, mc:last_death(Msg1)), - ?assertEqual(false, mc:is_death_cycle(<<"q1">>, Msg1)), + count = 1}}, mc:last_death(Msg2)), + ?assertEqual(false, mc:is_death_cycle(Q, Msg2)), - #content{properties = #'P_basic'{headers = H1}} = mc:protocol_state(Msg1), + #content{properties = #'P_basic'{headers = H1}} = mc:protocol_state(Msg2), ?assertMatch({_, array, [_]}, header(<<"x-death">>, H1)), - ?assertMatch({_, longstr, <<"q1">>}, header(<<"x-first-death-queue">>, H1)), - ?assertMatch({_, longstr, <<"q1">>}, header(<<"x-last-death-queue">>, H1)), + ?assertMatch({_, longstr, Q}, header(<<"x-first-death-queue">>, H1)), + ?assertMatch({_, longstr, Q}, header(<<"x-last-death-queue">>, H1)), ?assertMatch({_, longstr, <<"exch">>}, header(<<"x-first-death-exchange">>, H1)), ?assertMatch({_, longstr, <<"exch">>}, header(<<"x-last-death-exchange">>, H1)), ?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-first-death-reason">>, H1)), @@ -216,24 +224,37 @@ amqpl_death_records(_Config) -> {_, array, [{table, T1}]} = header(<<"x-death">>, H1), ?assertMatch({_, long, 1}, header(<<"count">>, T1)), ?assertMatch({_, longstr, <<"rejected">>}, header(<<"reason">>, T1)), - ?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T1)), + ?assertMatch({_, longstr, Q}, header(<<"queue">>, T1)), ?assertMatch({_, longstr, <<"exch">>}, header(<<"exchange">>, T1)), ?assertMatch({_, timestamp, _}, header(<<"time">>, T1)), ?assertMatch({_, array, [{longstr, <<"apple">>}]}, header(<<"routing-keys">>, T1)), + ?assertMatch({_, longstr, <<"9999">>}, header(<<"original-expiration">>, T1)), - - %% second dead letter, e.g. a ttl reason returning to source queue - + %% 2nd dead letter, e.g. an expired reason returning to source queue %% record_death uses a timestamp for death record ordering, ensure %% it is definitely higher than the last timestamp taken timer:sleep(2), - Msg2 = mc:record_death(ttl, <<"dl">>, Msg1), + Msg3 = mc:record_death(expired, DLQ, Msg2), - #content{properties = #'P_basic'{headers = H2}} = mc:protocol_state(Msg2), + #content{properties = #'P_basic'{headers = H2}} = mc:protocol_state(Msg3), {_, array, [{table, T2a}, {table, T2b}]} = header(<<"x-death">>, H2), - ?assertMatch({_, longstr, <<"dl">>}, header(<<"queue">>, T2a)), - ?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2b)), - ok. + ?assertMatch({_, longstr, DLQ}, header(<<"queue">>, T2a)), + ?assertMatch({_, longstr, Q}, header(<<"queue">>, T2b)), + + %% 3rd dead letter + timer:sleep(2), + Msg4 = mc:record_death(rejected, Q, Msg3), + + %% Roundtrip simulates message being sent to and received from AMQP 0.9.1 client. + Content2 = mc:protocol_state(Msg4), + Msg5 = mc:init(mc_amqpl, Content2, annotations()), + + ?assertEqual([DLQ, Q], + lists:sort(mc:death_queue_names(Msg5))), + ?assertMatch({{Q, rejected}, + #death{exchange = <<"exch">>, + routing_keys = [<<"apple">>], + count = 2}}, mc:last_death(Msg5)). header(K, H) -> rabbit_basic:header(K, H).