diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index a1c899bcf741..b77a7f5b0846 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -1505,7 +1505,6 @@ handle_frame_pre_auth(Transport, send(Transport, S, F), Connection#stream_connection{connection_step = failure} end, - {Connection1, State}; handle_frame_pre_auth(_Transport, Connection, State, heartbeat) -> rabbit_log:debug("Received heartbeat frame pre auth"), @@ -1614,21 +1613,8 @@ handle_frame_post_auth(Transport, Challenge}}; {ok, NewUser = #user{username = NewUsername}} -> case NewUsername of - Username -> - rabbit_core_metrics:auth_attempt_succeeded(Host, - Username, - stream), - notify_auth_result(Username, - user_authentication_success, - [], - C1, - S1), - rabbit_log:debug("Successfully updated secret for username '~ts'", [Username]), - {C1#stream_connection{user = NewUser, - authentication_state = done, - connection_step = authenticated}, - {sasl_authenticate, ?RESPONSE_CODE_OK, - <<>>}}; + Username -> + complete_secret_update(NewUser, C1, S1); _ -> rabbit_core_metrics:auth_attempt_failed(Host, Username, @@ -2780,6 +2766,32 @@ handle_frame_post_auth(Transport, increase_protocol_counter(?UNKNOWN_FRAME), {Connection#stream_connection{connection_step = close_sent}, State}. +complete_secret_update(NewUser = #user{username = Username}, + #stream_connection{host = Host, + socket = S, + virtual_host = VH} = C1, S1) -> + notify_auth_result(Username, user_authentication_success, [], C1, S1), + rabbit_core_metrics:auth_attempt_succeeded(Host, Username, stream), + rabbit_log_connection:debug("Stream connection has successfully checked updated secret (token) for username '~ts'", + [Username]), + try + rabbit_log_connection:debug("Stream connection: will verify virtual host access after secret (token) update"), + rabbit_access_control:check_vhost_access(NewUser, VH, {socket, S}, #{}), + rabbit_log_connection:debug("Stream connection: successfully re-verified virtual host access"), + + {C1#stream_connection{user = NewUser, + authentication_state = done, + connection_step = authenticated}, + {sasl_authenticate, ?RESPONSE_CODE_OK, + <<>>}} + catch exit:#amqp_error{explanation = Explanation} -> + rabbit_log_connection:warning("Stream connection no longer has the permissions to access its target virtual host ('~ts') after a secret (token) update: ~ts", + [VH, Explanation]), + silent_close_delay(), + {C1#stream_connection{connection_step = failure}, + {sasl_authenticate, ?RESPONSE_VHOST_ACCESS_FAILURE, <<>>}} + end. + process_client_command_versions(C, []) -> C; process_client_command_versions(C, [H | T]) -> diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 0935554bde4a..7f9dc73eb6c6 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -49,6 +49,7 @@ groups() -> cannot_update_username_after_authenticated, cannot_use_another_authmechanism_when_updating_secret, update_secret_should_close_connection_if_wrong_secret, + update_secret_should_close_connection_if_unauthorized_vhost, unauthenticated_client_rejected_tcp_connected, timeout_tcp_connected, unauthenticated_client_rejected_peer_properties_exchanged, @@ -166,6 +167,12 @@ init_per_testcase(cannot_update_username_after_authenticated = TestCase, Config) ok = rabbit_ct_broker_helpers:add_user(Config, <<"other">>), rabbit_ct_helpers:testcase_started(Config, TestCase); +init_per_testcase(update_secret_should_close_connection_if_unauthorized_vhost = TestCase, + Config) -> + ok = rabbit_ct_broker_helpers:add_user(Config, <<"other">>), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, <<"other">>, <<"/">>), + rabbit_ct_helpers:testcase_started(Config, TestCase); + init_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) -> ok = rabbit_ct_broker_helpers:rpc(Config, 0, @@ -201,6 +208,11 @@ end_per_testcase(cannot_update_username_after_authenticated = TestCase, Config) ok = rabbit_ct_broker_helpers:delete_user(Config, <<"other">>), rabbit_ct_helpers:testcase_finished(Config, TestCase); +end_per_testcase(update_secret_should_close_connection_if_unauthorized_vhost = TestCase, + Config) -> + ok = rabbit_ct_broker_helpers:delete_user(Config, <<"other">>), + rabbit_ct_helpers:testcase_finished(Config, TestCase); + end_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) -> ok = rabbit_ct_broker_helpers:rpc(Config, 0, @@ -286,7 +298,7 @@ test_update_secret(Config) -> {S, C0} = connect_and_authenticate(Transport, Config), rabbit_ct_broker_helpers:change_password(Config, <<"guest">>, <<"password">>), C1 = expect_successful_authentication( - try_authenticate(Transport, S, C0, <<"PLAIN">>, <<"guest">>, <<"password">>)), + try_authenticate(Transport, S, C0, <<"PLAIN">>, <<"guest">>, <<"password">>)), _C2 = test_close(Transport, S, C1), closed = wait_for_socket_close(Transport, S, 10), ok. @@ -317,6 +329,22 @@ update_secret_should_close_connection_if_wrong_secret(Config) -> closed = wait_for_socket_close(Transport, S, 10), ok. +update_secret_should_close_connection_if_unauthorized_vhost(Config) -> + T = gen_tcp, + Port = get_port(T, Config), + Opts = get_opts(T), + {ok, S} = T:connect("localhost", Port, Opts), + C0 = rabbit_stream_core:init(0), + C1 = test_peer_properties(T, S, C0), + Username = <<"other">>, + C2 = test_authenticate(T, S, C1, Username), + ok = rabbit_ct_broker_helpers:clear_permissions(Config, Username, <<"/">>), + _C3 = expect_unsuccessful_authentication( + try_authenticate(gen_tcp, S, C2, <<"PLAIN">>, Username, Username), + ?RESPONSE_VHOST_ACCESS_FAILURE), + closed = wait_for_socket_close(T, S, 10), + ok. + test_stream_tls(Config) -> Stream = atom_to_binary(?FUNCTION_NAME, utf8), test_server(ssl, Stream, Config),