Skip to content

Commit 88154a7

Browse files
committed
Handle closed osiris reader when dispatching chunk
1 parent bb2f7b6 commit 88154a7

File tree

1 file changed

+64
-40
lines changed

1 file changed

+64
-40
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 64 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1610,19 +1610,36 @@ handle_frame_post_auth(Transport,
16101610
case Consumers of
16111611
#{SubscriptionId := Consumer} ->
16121612
#consumer{credit = AvailableCredit} = Consumer,
1613-
1614-
{{segment, Segment1}, {credit, Credit1}} =
1615-
send_chunks(Transport,
1616-
Consumer,
1617-
AvailableCredit + Credit,
1618-
SendFileOct),
1619-
1620-
Consumer1 = Consumer#consumer{segment = Segment1, credit = Credit1},
1621-
{Connection,
1622-
State#stream_connection_state{consumers =
1623-
Consumers#{SubscriptionId =>
1624-
Consumer1}},
1625-
Rest};
1613+
case send_chunks(Transport,
1614+
Consumer,
1615+
AvailableCredit + Credit,
1616+
SendFileOct)
1617+
of
1618+
{error, closed} ->
1619+
rabbit_log:warning("Reader for subscription ~p has been closed, removing "
1620+
"subscription",
1621+
[SubscriptionId]),
1622+
{Connection1, State1} =
1623+
remove_subscription(SubscriptionId, Connection, State),
1624+
Frame =
1625+
<<?RESPONSE:1,
1626+
?COMMAND_CREDIT:15,
1627+
?VERSION_1:16,
1628+
?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST:16,
1629+
SubscriptionId:8>>,
1630+
FrameSize = byte_size(Frame),
1631+
Transport:send(S, [<<FrameSize:32>>, Frame]),
1632+
{Connection1, State1, Rest};
1633+
{{segment, Segment1}, {credit, Credit1}} ->
1634+
Consumer1 =
1635+
Consumer#consumer{segment = Segment1, credit = Credit1},
1636+
{Connection,
1637+
State#stream_connection_state{consumers =
1638+
Consumers#{SubscriptionId
1639+
=>
1640+
Consumer1}},
1641+
Rest}
1642+
end;
16261643
_ ->
16271644
rabbit_log:warning("Giving credit to unknown subscription: ~p",
16281645
[SubscriptionId]),
@@ -1744,36 +1761,13 @@ handle_frame_post_auth(Transport,
17441761
?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST),
17451762
{Connection, State, Rest};
17461763
true ->
1747-
#{SubscriptionId := Consumer} = Consumers,
1748-
Stream = Consumer#consumer.stream,
1749-
#{Stream := SubscriptionsForThisStream} = StreamSubscriptions,
1750-
SubscriptionsForThisStream1 =
1751-
lists:delete(SubscriptionId, SubscriptionsForThisStream),
1752-
StreamSubscriptions1 =
1753-
case length(SubscriptionsForThisStream1) of
1754-
0 ->
1755-
% no more subscription for this stream
1756-
maps:remove(Stream, StreamSubscriptions);
1757-
_ ->
1758-
StreamSubscriptions#{Stream =>
1759-
SubscriptionsForThisStream1}
1760-
end,
1761-
Connection1 =
1762-
Connection#stream_connection{stream_subscriptions =
1763-
StreamSubscriptions1},
1764-
Consumers1 = maps:remove(SubscriptionId, Consumers),
1765-
Connection2 =
1766-
maybe_clean_connection_from_stream(Stream, Connection1),
1767-
rabbit_stream_metrics:consumer_cancelled(self(),
1768-
stream_r(Stream,
1769-
Connection2),
1770-
SubscriptionId),
1764+
{Connection1, State1} =
1765+
remove_subscription(SubscriptionId, Connection, State),
17711766
response_ok(Transport,
17721767
Connection,
1773-
?COMMAND_SUBSCRIBE,
1768+
?COMMAND_UNSUBSCRIBE,
17741769
CorrelationId),
1775-
{Connection2, State#stream_connection_state{consumers = Consumers1},
1776-
Rest}
1770+
{Connection1, State1, Rest}
17771771
end;
17781772
handle_frame_post_auth(Transport,
17791773
#stream_connection{virtual_host = VirtualHost,
@@ -2344,6 +2338,34 @@ lookup_leader(Stream,
23442338
lookup_leader_from_manager(VirtualHost, Stream) ->
23452339
rabbit_stream_manager:lookup_leader(VirtualHost, Stream).
23462340

2341+
remove_subscription(SubscriptionId,
2342+
#stream_connection{stream_subscriptions =
2343+
StreamSubscriptions} =
2344+
Connection,
2345+
#stream_connection_state{consumers = Consumers} = State) ->
2346+
#{SubscriptionId := Consumer} = Consumers,
2347+
Stream = Consumer#consumer.stream,
2348+
#{Stream := SubscriptionsForThisStream} = StreamSubscriptions,
2349+
SubscriptionsForThisStream1 =
2350+
lists:delete(SubscriptionId, SubscriptionsForThisStream),
2351+
StreamSubscriptions1 =
2352+
case length(SubscriptionsForThisStream1) of
2353+
0 ->
2354+
% no more subscription for this stream
2355+
maps:remove(Stream, StreamSubscriptions);
2356+
_ ->
2357+
StreamSubscriptions#{Stream => SubscriptionsForThisStream1}
2358+
end,
2359+
Connection1 =
2360+
Connection#stream_connection{stream_subscriptions =
2361+
StreamSubscriptions1},
2362+
Consumers1 = maps:remove(SubscriptionId, Consumers),
2363+
Connection2 = maybe_clean_connection_from_stream(Stream, Connection1),
2364+
rabbit_stream_metrics:consumer_cancelled(self(),
2365+
stream_r(Stream, Connection2),
2366+
SubscriptionId),
2367+
{Connection2, State#stream_connection_state{consumers = Consumers1}}.
2368+
23472369
maybe_clean_connection_from_stream(Stream,
23482370
#stream_connection{stream_leaders =
23492371
Leaders} =
@@ -2491,6 +2513,8 @@ send_chunks(Transport,
24912513
of
24922514
{ok, Segment1} ->
24932515
send_chunks(Transport, State, Segment1, Credit - 1, true, Counter);
2516+
{error, closed} ->
2517+
{error, closed};
24942518
{end_of_stream, Segment1} ->
24952519
case Retry of
24962520
true ->

0 commit comments

Comments
 (0)