Skip to content

Commit

Permalink
Convert array from AMQP 1.0 to AMQP 0.9.1 (backport #12571) (#12572)
Browse files Browse the repository at this point in the history
* Convert array from AMQP 1.0 to AMQP 0.9.1

Fix the following crash when an AMQP 0.9.1 client consumes an AMQP 1.0
encoded message that contains an array value in message annotations:
```
crasher:
  initial call: rabbit_channel:init/1
  pid: <0.685.0>
  registered_name: []
  exception exit: {function_clause,
                      [{mc_amqpl,to_091,
                           [<<"x-array">>,
                            {array,utf8,[{utf8,<<"e1">>},{utf8,<<"e2">>}]}],
                           [{file,"mc_amqpl.erl"},{line,737}]},
                       {mc_amqpl,'-convert_from/3-fun-3-',1,
                           [{file,"mc_amqpl.erl"},{line,168}]},
                       {lists,filtermap_1,2,
                           [{file,"lists.erl"},{line,2279}]},
                       {mc_amqpl,convert_from,3,
                           [{file,"mc_amqpl.erl"},{line,158}]},
                       {mc,convert,3,[{file,"mc.erl"},{line,332}]},
                       {rabbit_channel,handle_deliver0,4,
                           [{file,"rabbit_channel.erl"},{line,2619}]},
                       {lists,foldl_1,3,[{file,"lists.erl"},{line,2151}]},
                       {lists,foldl,3,[{file,"lists.erl"},{line,2146}]}]}
```

(cherry picked from commit 814d44d)

# Conflicts:
#	deps/amqp10_client/src/amqp10_msg.erl

* Fix conflict

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>
  • Loading branch information
mergify[bot] and ansd authored Oct 22, 2024
1 parent 1c271d7 commit 4911947
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 6 deletions.
11 changes: 8 additions & 3 deletions deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ set_delivery_annotations(
Anns1 = #'v1_0.delivery_annotations'{content = maps:to_list(Anns)},
Msg#amqp10_msg{delivery_annotations = Anns1}.

-spec set_message_annotations(#{binary() => binary() | integer() | string()},
amqp10_msg()) -> amqp10_msg().
-spec set_message_annotations(#{binary() => binary() | number() | string() | tuple()},
amqp10_msg()) -> amqp10_msg().
set_message_annotations(Props,
#amqp10_msg{message_annotations = undefined} =
Msg) ->
Expand Down Expand Up @@ -433,7 +433,12 @@ wrap_ap_value(V) when is_integer(V) ->
case V < 0 of
true -> {int, V};
false -> {uint, V}
end.
end;
wrap_ap_value(V) when is_number(V) ->
%% AMQP double and Erlang float are both 64-bit.
{double, V};
wrap_ap_value(TaggedValue) when is_tuple(TaggedValue) ->
TaggedValue.

%% LOCAL
header_value(durable, undefined) -> false;
Expand Down
9 changes: 7 additions & 2 deletions deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -754,9 +754,14 @@ to_091(Key, false) -> {Key, bool, false};
to_091(Key, undefined) -> {Key, void, undefined};
to_091(Key, null) -> {Key, void, undefined};
to_091(Key, {list, L}) ->
{Key, array, [to_091(V) || V <- L]};
to_091_array(Key, L);
to_091(Key, {map, M}) ->
{Key, table, [to_091(unwrap(K), V) || {K, V} <- M]}.
{Key, table, [to_091(unwrap(K), V) || {K, V} <- M]};
to_091(Key, {array, _T, L}) ->
to_091_array(Key, L).

to_091_array(Key, L) ->
{Key, array, [to_091(V) || V <- L]}.

to_091({utf8, V}) -> {longstr, V};
to_091({symbol, V}) -> {longstr, V};
Expand Down
14 changes: 14 additions & 0 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,13 @@ amqp_amqpl(QType, Config) ->
message_format = {uint, 0}},
Body1,
Footer])),
%% Send with an array value in message annotations.
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_message_annotations(
#{<<"x-array">> => {array, utf8, [{utf8, <<"e1">>},
{utf8, <<"e2">>}]}},
amqp10_msg:new(<<>>, Body1, true))),

ok = amqp10_client:detach_link(Sender),
flush(detached),
Expand Down Expand Up @@ -1407,6 +1414,13 @@ amqp_amqpl(QType, Config) ->
?assertEqual([Body1, Footer], amqp10_framing:decode_bin(Payload10))
after 5000 -> ct:fail({missing_deliver, ?LINE})
end,
receive {_, #amqp_msg{payload = Payload11,
props = #'P_basic'{headers = Headers11}}} ->
?assertEqual([Body1], amqp10_framing:decode_bin(Payload11)),
?assertEqual({array, [{longstr, <<"e1">>}, {longstr, <<"e2">>}]},
rabbit_misc:table_lookup(Headers11, <<"x-array">>))
after 5000 -> ct:fail({missing_deliver, ?LINE})
end,

ok = rabbit_ct_client_helpers:close_channel(Ch),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbit/test/mc_unit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,8 @@ amqp_amqpl(_Config) ->
MAC = [
{{symbol, <<"x-stream-filter">>}, {utf8, <<"apple">>}},
thead2('x-list', list, [utf8(<<"l">>)]),
thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}])
thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}]),
{{symbol, <<"x-array">>}, {array, utf8, [{utf8, <<"a">>}]}}
],
M = #'v1_0.message_annotations'{content = MAC},
P = #'v1_0.properties'{content_type = {symbol, <<"ctype">>},
Expand Down Expand Up @@ -598,6 +599,7 @@ amqp_amqpl(_Config) ->
?assertMatch({_, longstr, <<"apple">>}, header(<<"x-stream-filter">>, HL)),
?assertMatch({_ ,array, [{longstr,<<"l">>}]}, header(<<"x-list">>, HL)),
?assertMatch({_, table, [{<<"k">>,longstr,<<"v">>}]}, header(<<"x-map">>, HL)),
?assertMatch({_, array, [{longstr, <<"a">>}]}, header(<<"x-array">>, HL)),

?assertMatch({_, long, 5}, header(<<"long">>, HL)),
?assertMatch({_, long, 5}, header(<<"ulong">>, HL)),
Expand Down

0 comments on commit 4911947

Please sign in to comment.