diff --git a/BUILD.bazel b/BUILD.bazel index 927f637..9f26976 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -107,6 +107,8 @@ erlang_app( license_files = [":license_files"], priv = [":priv"], deps = [ + "@khepri//:erlang_app", + "@ra//:erlang_app", "@rabbitmq-server//deps/rabbit:erlang_app", "@rabbitmq-server//deps/rabbit_common:erlang_app", ], @@ -127,6 +129,8 @@ test_erlang_app( license_files = [":license_files"], priv = [":priv"], deps = [ + "@khepri//:erlang_app", + "@ra//:erlang_app", "@rabbitmq-server//deps/rabbit:erlang_app", "@rabbitmq-server//deps/rabbit_common:erlang_app", ], diff --git a/Makefile b/Makefile index c13f053..b9c4791 100644 --- a/Makefile +++ b/Makefile @@ -2,10 +2,10 @@ PROJECT = rabbitmq_delayed_message_exchange PROJECT_DESCRIPTION = RabbitMQ Delayed Message Exchange PROJECT_MOD = rabbit_delayed_message_app -RABBITMQ_VERSION ?= v3.10.x +RABBITMQ_VERSION ?= main define PROJECT_APP_EXTRA_KEYS - {broker_version_requirements, ["3.10.0"]} + {broker_version_requirements, ["3.13.0"]} endef dep_amqp_client = git_rmq-subfolder rabbitmq-erlang-client $(RABBITMQ_VERSION) @@ -14,7 +14,11 @@ dep_rabbit = git_rmq-subfolder rabbitmq-server $(RABBITMQ_VE dep_rabbitmq_ct_client_helpers = git_rmq-subfolder rabbitmq-ct-client-helpers $(RABBITMQ_VERSION) dep_rabbitmq_ct_helpers = git_rmq-subfolder rabbitmq-ct-helpers $(RABBITMQ_VERSION) -DEPS = rabbit_common rabbit +dep_khepri = git https://github.com/rabbitmq/khepri.git main +dep_leveldb = git git@github.com:martinsumner/leveled.git +dep_lz4 = git git@github.com:martinsumner/erlang-lz4.git + +DEPS = rabbit_common rabbit khepri leveldb lz4 TEST_DEPS = ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client dep_ct_helper = git https://github.com/extend/ct_helper.git master diff --git a/app.bzl b/app.bzl index 4b61719..0a275f0 100644 --- a/app.bzl +++ b/app.bzl @@ -7,6 +7,9 @@ def all_srcs(name = "all_srcs"): srcs = [ "src/rabbit_delayed_message.erl", "src/rabbit_delayed_message_app.erl", + "src/rabbit_delayed_message_khepri.erl", + "src/rabbit_delayed_message_kv_store.erl", + "src/rabbit_delayed_message_machine.erl", "src/rabbit_delayed_message_sup.erl", "src/rabbit_delayed_message_utils.erl", "src/rabbit_exchange_type_delayed_message.erl", @@ -39,6 +42,9 @@ def all_beam_files(name = "all_beam_files"): srcs = [ "src/rabbit_delayed_message.erl", "src/rabbit_delayed_message_app.erl", + "src/rabbit_delayed_message_khepri.erl", + "src/rabbit_delayed_message_kv_store.erl", + "src/rabbit_delayed_message_machine.erl", "src/rabbit_delayed_message_sup.erl", "src/rabbit_delayed_message_utils.erl", "src/rabbit_exchange_type_delayed_message.erl", @@ -47,7 +53,7 @@ def all_beam_files(name = "all_beam_files"): app_name = "rabbitmq_delayed_message_exchange", dest = "ebin", erlc_opts = "//:erlc_opts", - deps = ["@rabbitmq-server//deps/rabbit:erlang_app", "@rabbitmq-server//deps/rabbit_common:erlang_app"], + deps = ["@khepri//:erlang_app", "@ra//:erlang_app", "@rabbitmq-server//deps/rabbit:erlang_app", "@rabbitmq-server//deps/rabbit_common:erlang_app"], ) filegroup( name = "beam_files", @@ -61,6 +67,9 @@ def all_test_beam_files(name = "all_test_beam_files"): srcs = [ "src/rabbit_delayed_message.erl", "src/rabbit_delayed_message_app.erl", + "src/rabbit_delayed_message_khepri.erl", + "src/rabbit_delayed_message_kv_store.erl", + "src/rabbit_delayed_message_machine.erl", "src/rabbit_delayed_message_sup.erl", "src/rabbit_delayed_message_utils.erl", "src/rabbit_exchange_type_delayed_message.erl", @@ -69,7 +78,7 @@ def all_test_beam_files(name = "all_test_beam_files"): app_name = "rabbitmq_delayed_message_exchange", dest = "test", erlc_opts = "//:test_erlc_opts", - deps = ["@rabbitmq-server//deps/rabbit:erlang_app", "@rabbitmq-server//deps/rabbit_common:erlang_app"], + deps = ["@khepri//:erlang_app", "@ra//:erlang_app", "@rabbitmq-server//deps/rabbit:erlang_app", "@rabbitmq-server//deps/rabbit_common:erlang_app"], ) filegroup( name = "test_beam_files", diff --git a/moduleindex.yaml b/moduleindex.yaml index 9e067dc..39fb65f 100644 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -3,6 +3,8 @@ amqp10_common: - amqp10_binary_parser - amqp10_framing - amqp10_framing0 +app: +- appup_src aten: - aten - aten_app @@ -11,6 +13,76 @@ aten: - aten_emitter - aten_sink - aten_sup +bazdep: +- bazdep +bbmustache: +- bbmustache +check_undefined_test: +- check_undefined_test_app +- check_undefined_test_sup +chunked_hello_world: +- chunked_hello_world_app +- chunked_hello_world_sup +- toppage_h +codepath: +- codepath +compress_response: +- compress_response_app +- compress_response_sup +- toppage_h +cookie: +- cookie_app +- cookie_sup +- toppage_h +cover: +- foo +cowboy: +- cowboy +- cowboy_app +- cowboy_bstr +- cowboy_children +- cowboy_clear +- cowboy_clock +- cowboy_compress_h +- cowboy_constraints +- cowboy_handler +- cowboy_http +- cowboy_http2 +- cowboy_loop +- cowboy_metrics_h +- cowboy_middleware +- cowboy_req +- cowboy_rest +- cowboy_router +- cowboy_static +- cowboy_stream +- cowboy_stream_h +- cowboy_sub_protocol +- cowboy_sup +- cowboy_tls +- cowboy_tracer_h +- cowboy_websocket +cowlib: +- cow_base64url +- cow_cookie +- cow_date +- cow_hpack +- cow_http +- cow_http2 +- cow_http2_machine +- cow_http_hd +- cow_http_struct_hd +- cow_http_te +- cow_iolists +- cow_link +- cow_mimetypes +- cow_multipart +- cow_qs +- cow_spdy +- cow_sse +- cow_uri +- cow_uri_template +- cow_ws credentials_obfuscation: - credentials_obfuscation - credentials_obfuscation_app @@ -41,12 +113,103 @@ cuttlefish: - cuttlefish_validator - cuttlefish_variable - cuttlefish_vmargs +dummy: +- dummy_app +- dummy_server +- dummy_sup +echo_get: +- echo_get_app +- echo_get_sup +- toppage_h +echo_post: +- echo_post_app +- echo_post_sup +- toppage_h enough: - enough +erlc: +- first_erl +- foo +- foo_app +- foo_test_worker +- foo_worker +eunit: +- foo +eunit_surefire: +- foo +eventsource: +- eventsource_app +- eventsource_h +- eventsource_sup +extended_script_tests: +- extended_script_tests +extension_tests: +- extension_tests +file_server: +- directory_h +- directory_lister +- file_server_app +- file_server_sup +foo: +- java +- lisp +- pascal +- perl +foodep: +- foodep +force_nodetool_tests: +- force_nodetool_tests_app +- force_nodetool_tests_sup gen_batch_server: - gen_batch_server getopt: - getopt +gpb: +- gpb +- gpb_compile +hello_world: +- hello_world_app +- hello_world_sup +- toppage_h +hooks_tests: +- hooks_tests +horus: +- horus +- horus_cover +- horus_utils +katana_code: +- ktn_code +- ktn_dodger +- ktn_io_string +khepri: +- khepri +- khepri_adv +- khepri_app +- khepri_cluster +- khepri_condition +- khepri_event_handler +- khepri_evf +- khepri_export_erlang +- khepri_import_export +- khepri_machine +- khepri_path +- khepri_pattern_tree +- khepri_payload +- khepri_projection +- khepri_sproc +- khepri_sup +- khepri_tree +- khepri_tx +- khepri_tx_adv +- khepri_utils +markdown_middleware: +- erlmarkdown +- markdown_converter +- markdown_middleware_app +- markdown_middleware_sup +neotoma: +- neotoma +- neotoma_parse observer_cli: - observer_cli - observer_cli_application @@ -65,6 +228,7 @@ osiris: - osiris - osiris_app - osiris_bench +- osiris_bloom - osiris_counters - osiris_ets - osiris_log @@ -78,6 +242,23 @@ osiris: - osiris_tracking - osiris_util - osiris_writer +overlay_error_test: +- overlay_error_test_app +- overlay_error_test_sup +powershell_release: +- powershell_release_app +- powershell_release_sup +proto_gpb: +- foo +- foo_app +- foo_sup +proto_protobuffs: +- foo +- foo_app +- foo_sup +protobuffs: +- protobuffs +- protobuffs_compile ra: - ra - ra_app @@ -117,6 +298,11 @@ ra: - ra_system - ra_system_sup - ra_systems_sup +ra_kv_store: +- ra_kv_store +- ra_kv_store_app +- ra_kv_store_handler +- ra_kv_store_sup rabbit: - amqqueue - background_gc @@ -396,6 +582,9 @@ rabbit_common: rabbitmq_delayed_message_exchange: - rabbit_delayed_message - rabbit_delayed_message_app +- rabbit_delayed_message_khepri +- rabbit_delayed_message_kv_store +- rabbit_delayed_message_machine - rabbit_delayed_message_sup - rabbit_delayed_message_utils - rabbit_exchange_type_delayed_message @@ -436,6 +625,60 @@ ranch: - ranch_sup - ranch_tcp - ranch_transport +rebar: +- rebar +- rebar_abnfc_compiler +- rebar_app_utils +- rebar_appups +- rebar_asn1_compiler +- rebar_base_compiler +- rebar_cleaner +- rebar_config +- rebar_core +- rebar_cover_utils +- rebar_ct +- rebar_deps +- rebar_dia_compiler +- rebar_dialyzer +- rebar_edoc +- rebar_erlc_compiler +- rebar_erlydtl_compiler +- rebar_escripter +- rebar_eunit +- rebar_file_utils +- rebar_getopt +- rebar_lfe_compiler +- rebar_log +- rebar_metacmds +- rebar_mustache +- rebar_neotoma_compiler +- rebar_otp_app +- rebar_otp_appup +- rebar_port_compiler +- rebar_proto_compiler +- rebar_proto_gpb_compiler +- rebar_protobuffs_compiler +- rebar_qc +- rebar_rand_compat +- rebar_rel_utils +- rebar_reltool +- rebar_require_vsn +- rebar_shell +- rebar_subdirs +- rebar_templater +- rebar_upgrade +- rebar_utils +- rebar_xref +- rmemo +rebar3_format: +- default_formatter +- erlfmt_formatter +- otp_formatter +- rebar3_ast_formatter +- rebar3_format +- rebar3_format_prv +- rebar3_formatter +- sr_formatter recon: - recon - recon_alloc @@ -450,11 +693,47 @@ redbug: - redbug_lexer - redbug_parser - redbug_targ +relx: +- relx +- rlx_app_info +- rlx_assemble +- rlx_config +- rlx_file_utils +- rlx_log +- rlx_overlay +- rlx_release +- rlx_relup +- rlx_resolve +- rlx_state +- rlx_string +- rlx_tar +- rlx_util +replace_os_vars_tests: +- replace_os_vars_tests +rest_basic_auth: +- rest_basic_auth_app +- rest_basic_auth_sup +- toppage_h +rest_hello_world: +- rest_hello_world_app +- rest_hello_world_sup +- toppage_h +rest_pastebin: +- rest_pastebin_app +- rest_pastebin_sup +- toppage_h seshat: - seshat - seshat_app - seshat_counters_server - seshat_sup +shellcheck_test: +- shellcheck_test_app +- shellcheck_test_sup +ssl_hello_world: +- ssl_hello_world_app +- ssl_hello_world_sup +- toppage_h stdout_formatter: - stdout_formatter - stdout_formatter_paragraph @@ -476,6 +755,9 @@ sysmon_handler: - sysmon_handler_filter - sysmon_handler_sup - sysmon_handler_testhandler +system_libs_tests: +- system_libs_tests_app +- system_libs_tests_sup systemd: - systemd - systemd_app @@ -485,7 +767,30 @@ systemd: - systemd_socket - systemd_sup - systemd_watchdog +tcp_echo: +- echo_protocol +- tcp_echo_app +- tcp_echo_sup +tcp_reverse: +- reverse_protocol +- tcp_reverse_app +- tcp_reverse_sup thoas: - thoas - thoas_decode - thoas_encode +unuseddep: +- unuseddep +upgrade_test: +- upgrade_test_app +- upgrade_test_sup +upload: +- upload_app +- upload_h +- upload_sup +vm_args_inclusion_tests: +- vm_args_inclusion_tests +websocket: +- websocket_app +- websocket_sup +- ws_h diff --git a/src/rabbit_delayed_message.erl b/src/rabbit_delayed_message.erl index 43804e5..413a192 100644 --- a/src/rabbit_delayed_message.erl +++ b/src/rabbit_delayed_message.erl @@ -1,4 +1,4 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public +% This Source Code Form is subject to the terms of the Mozilla Public %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% @@ -14,26 +14,25 @@ -module(rabbit_delayed_message). -include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("khepri/include/khepri.hrl"). + -rabbit_boot_step({?MODULE, [{description, "exchange delayed message mnesia setup"}, - {mfa, {?MODULE, setup_mnesia, []}}, + {mfa, {?MODULE, setup, []}}, {cleanup, {?MODULE, disable_plugin, []}}, - {requires, external_infrastructure}, - {enables, rabbit_registry}]}). + {requires, recovery}]}). + %% {requires, external_infrastructure}, + %% {enables, rabbit_registry}]}). -behaviour(gen_server). --export([start_link/0, delay_message/3, setup_mnesia/0, disable_plugin/0, go/0]). +-export([start_link/0, delay_message/3, setup/0, disable_plugin/0, go/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([messages_delayed/1]). +-export([messages_delayed/1, route/3]). %% For testing, debugging and manual use --export([refresh_config/0, - table_name/0, - index_table_name/0]). - --import(rabbit_delayed_message_utils, [swap_delay_header/1]). +-export([refresh_config/0]). -type t_reference() :: reference(). -type delay() :: non_neg_integer(). @@ -50,28 +49,12 @@ delay()) -> nodelay | {ok, t_reference()}. --define(TABLE_NAME, append_to_atom(?MODULE, node())). --define(INDEX_TABLE_NAME, append_to_atom(?TABLE_NAME, "_index")). +-define(Timeout, 5000). -record(state, {timer, + khepri_mod, stats_state}). --record(delay_key, - { timestamp, %% timestamp delay - exchange %% rabbit_types:exchange() - }). - --record(delay_entry, - { delay_key, %% delay_key record - delivery, %% the message delivery - ref %% ref to make records distinct for 'bag' semantics. - }). - --record(delay_index, - { delay_key, %% delay_key record - const %% record must have two fields - }). - %%-------------------------------------------------------------------- start_link() -> @@ -84,29 +67,14 @@ delay_message(Exchange, Message, Delay) -> gen_server:call(?MODULE, {delay_message, Exchange, Message, Delay}, infinity). -setup_mnesia() -> - _ = mnesia:create_table(?TABLE_NAME, [{record_name, delay_entry}, - {attributes, - record_info(fields, delay_entry)}, - {type, bag}, - {disc_copies, [node()]}]), - _ = mnesia:create_table(?INDEX_TABLE_NAME, [{record_name, delay_index}, - {attributes, - record_info(fields, delay_index)}, - {type, ordered_set}, - {disc_copies, [node()]}]), - rabbit_table:wait([?TABLE_NAME, ?INDEX_TABLE_NAME]). +setup() -> + ok. disable_plugin() -> - _ = mnesia:delete_table(?INDEX_TABLE_NAME), - _ = mnesia:delete_table(?TABLE_NAME), ok. -messages_delayed(Exchange) -> - ExchangeName = Exchange#exchange.name, - MatchHead = #delay_entry{delay_key = make_key('_', #exchange{name = ExchangeName, _ = '_'}), - delivery = '_', ref = '_'}, - Delays = mnesia:dirty_select(?TABLE_NAME, [{MatchHead, [], [true]}]), +messages_delayed(_Exchange) -> + Delays = [], length(Delays). refresh_config() -> @@ -116,13 +84,13 @@ refresh_config() -> init([]) -> _ = recover(), - {ok, #state{timer = not_set}}. + {ok, #state{}}. handle_call({delay_message, Exchange, Message, Delay}, - _From, State = #state{timer = CurrTimer}) -> - Reply = {ok, NewTimer} = internal_delay_message(CurrTimer, Exchange, Message, Delay), - State2 = State#state{timer = NewTimer}, - {reply, Reply, State2}; + _From, State) -> + %% TODO perhaps write and read should be handled in different processes. + ok = internal_delay_message(State, Exchange, Message, Delay), + {reply, ok, State}; handle_call(refresh_config, _From, State) -> {reply, ok, refresh_config(State)}; handle_call(_Req, _From, State) -> @@ -130,20 +98,33 @@ handle_call(_Req, _From, State) -> handle_cast(go, State) -> State2 = refresh_config(State), - {noreply, State2#state{timer = maybe_delay_first()}}; + Ref = erlang:send_after(?Timeout, self(), check_msgs), + rabbit_delayed_stream_handler:setup(), + {noreply, State2#state{timer = Ref}}; handle_cast(_C, State) -> {noreply, State}. -handle_info({timeout, _TimerRef, {deliver, Key}}, State) -> - case mnesia:dirty_read(?TABLE_NAME, Key) of - [] -> - mnesia:dirty_delete(?INDEX_TABLE_NAME, Key); - Deliveries -> - _ = route(Key, Deliveries, State), - mnesia:dirty_delete(?TABLE_NAME, Key), - mnesia:dirty_delete(?INDEX_TABLE_NAME, Key) +handle_info(check_msgs, #state{khepri_mod = Mod} = State) -> + case ra_leaderboard:lookup_leader(Mod:get_store_id()) of + {_Name, Node} when Node == node() -> + {ok, Es} = + Mod:match( + [delayed_message_exchange, + #if_path_matches{regex=any}, + #if_all{conditions = + [delivery_time, + #if_data_matches{pattern = '$1', + conditions = [{'<', '$1', erlang:system_time(milli_seconds)}]} + ] + } + ]), + Keys = [[delayed_message_exchange, Key] || {[delayed_message_exchange, Key|_], _} <- maps:to_list(Es)], + route_messages(Keys, State); + _ -> + ok end, - {noreply, State#state{timer = maybe_delay_first()}}; + Ref = erlang:send_after(?Timeout, self(), check_msgs), + {noreply, State#state{timer = Ref}}; handle_info(_I, State) -> {noreply, State}. @@ -154,84 +135,52 @@ code_change(_, State, _) -> {ok, State}. %%-------------------------------------------------------------------- -maybe_delay_first() -> - case mnesia:dirty_first(?INDEX_TABLE_NAME) of - %% destructuring to prevent matching '$end_of_table' - #delay_key{timestamp = FirstTS} = Key2 -> - %% there are messages that will expire and need to be delivered - Now = erlang:system_time(milli_seconds), - start_timer(FirstTS - Now, Key2); - _ -> - %% nothing to do - not_set - end. - -route(#delay_key{exchange = Ex}, Deliveries, State) -> +route_messages([], State) -> + State; +route_messages([Key|Keys], #state{khepri_mod = Mod} = State) -> + {ok, Exchange} = Mod:get(Key++[exchange]), + [_, MsgKey] = Key, + V = rabbit_delayed_message_kv_store:do_take(MsgKey), + route(Exchange, [V], State), + %%Send tombstone msg on the stream + rabbit_delayed_stream_handler:delete_msg_with_key(MsgKey), + %%Delete msg from Khepri + Mod:delete(Key), + route_messages(Keys, State). + +route(Ex, Deliveries, State) -> ExName = Ex#exchange.name, - lists:map(fun (#delay_entry{delivery = Msg0}) -> + rabbit_log:debug(">>> Delayed message exchange:~nEX:~n~pDevs:~n~p",[Ex, Deliveries]), + lists:map(fun (Msg0) -> Msg1 = case Msg0 of - #delivery{message = BasicMessage} -> + #delivery{message = BasicMessage} -> BasicMessage; - _MC -> - Msg0 - end, - Msg2 = swap_delay_header(Msg1), + _MC -> + Msg0 + end, + Msg2 = rabbit_delayed_message_utils:swap_delay_header(Msg1), Dests = rabbit_exchange:route(Ex, Msg2), Qs = rabbit_amqqueue:lookup_many(Dests), _ = rabbit_queue_type:deliver(Qs, Msg2, #{}, stateless), bump_routed_stats(ExName, Qs, State) end, Deliveries). -internal_delay_message(CurrTimer, Exchange, Message, Delay) -> +internal_delay_message(#state{khepri_mod = Mod}, Exchange, Message, Delay) -> Now = erlang:system_time(milli_seconds), - %% keys are timestamps in milliseconds,in the future DelayTS = Now + Delay, - mnesia:dirty_write(?INDEX_TABLE_NAME, - make_index(DelayTS, Exchange)), - mnesia:dirty_write(?TABLE_NAME, - make_delay(DelayTS, Exchange, Message)), - case CurrTimer of - not_set -> - %% No timer in progress, so we start our own. - {ok, maybe_delay_first()}; - _ -> - case erlang:read_timer(CurrTimer) of - false -> - %% Timer is already expired. Handler will be invoked soon. - {ok, CurrTimer}; - CurrMS when Delay < CurrMS -> - %% Current timer lasts longer that new message delay - _ = erlang:cancel_timer(CurrTimer), - {ok, start_timer(Delay, make_key(DelayTS, Exchange))}; - _ -> - %% Timer is set to expire sooner than this - %% message's scheduled delivery time. - {ok, CurrTimer} - end - end. - -%% Key will be used upon message receipt to fetch -%% the deliveries from the database -start_timer(Delay, Key) -> - erlang:start_timer(erlang:max(0, Delay), self(), {deliver, Key}). - -make_delay(DelayTS, Exchange, Delivery) -> - #delay_entry{delay_key = make_key(DelayTS, Exchange), - delivery = Delivery, - ref = make_ref()}. - -make_index(DelayTS, Exchange) -> - #delay_index{delay_key = make_key(DelayTS, Exchange), - const = true}. - -make_key(DelayTS, Exchange) -> - #delay_key{timestamp = DelayTS, - exchange = Exchange}. + Key = make_key(DelayTS, Exchange), + Mod:put([delayed_message_exchange, Key, delivery_time], DelayTS), + Mod:put([delayed_message_exchange, Key, exchange], Exchange), + rabbit_delayed_stream_handler:store_msg_with_key(Message, Key), + ok. -append_to_atom(Atom, Append) when is_atom(Append) -> - append_to_atom(Atom, atom_to_list(Append)); -append_to_atom(Atom, Append) when is_list(Append) -> - list_to_atom(atom_to_list(Atom) ++ Append). +make_key(_DelayTS, _Exchange) -> + %% TODO: make unique, or store more than one msg/exchange data with the key. + %% BinDelayTS = integer_to_binary(DelayTS), + %% ExchangeName = Exchange#exchange.name#resource.name, + %% <>. + %% TODO: Just a uuid for now. Any need to make the key actually matter? + rabbit_guid:gen(). recover() -> %% topology recovery has already happened, we have to recover state for any durable @@ -251,6 +200,11 @@ recover() -> end. list_exchanges() -> + %% TODO Change to fetch from khperi. + %% I think like this: + %% rabbit_db_exchange:match(#exchange{durable = true, + %% type = 'x-delayed-message', + %% _ = '_'}) case mnesia:transaction( fun () -> mnesia:match_object( @@ -265,10 +219,11 @@ list_exchanges() -> end. recover_exchange_and_bindings(#exchange{name = XName} = X) -> + %%TODO Use khepri here ofc. mnesia:transaction( fun () -> Bindings = rabbit_binding:list_for_source(XName), - _ = [rabbit_exchange_type_delayed_message:add_binding(transaction, X, B) + _ = [rabbit_exchange_type_delayed_message:add_binding(transaction, X, B) || B <- lists:usort(Bindings)], rabbit_log:debug("Delayed message exchange: " "recovered bindings for ~s", @@ -314,11 +269,14 @@ bump_routed_stats(ExName, Qs, State) -> ok end. -refresh_config(State) -> - rabbit_event:init_stats_timer(State, #state.stats_state). - -table_name() -> - ?TABLE_NAME. - -index_table_name() -> - ?INDEX_TABLE_NAME. +refresh_config(State0) -> + Mod = case rabbit_feature_flags:is_enabled(khepri_db) of + true -> + rabbit_khepri; + false -> + ok = rabbit_delayed_message_khepri:setup(), + rabbit_delayed_message_khepri + end, + + State = rabbit_event:init_stats_timer(State0, #state.stats_state), + State#state{khepri_mod = Mod}. diff --git a/src/rabbit_delayed_message_khepri.erl b/src/rabbit_delayed_message_khepri.erl new file mode 100644 index 0000000..6a27390 --- /dev/null +++ b/src/rabbit_delayed_message_khepri.erl @@ -0,0 +1,135 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +%% NOTE that this module uses os:timestamp/0 but in the future Erlang +%% will have a new time API. +%% See: +%% https://www.erlang.org/documentation/doc-7.0-rc1/erts-7.0/doc/html/erlang.html#now-0 +%% and +%% https://www.erlang.org/documentation/doc-7.0-rc1/erts-7.0/doc/html/time_correction.html + +-module(rabbit_delayed_message_khepri). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("kernel/include/logger.hrl"). +-include_lib("rabbit_common/include/logging.hrl"). + +-export([setup/0, + get_store_id/0, + put/2, + delete/1, + get/1, + match/1, + get_many/1]). + +-define(RA_SYSTEM, dmx_coordination). +-define(RA_CLUSTER_NAME, rabbitmq_dmx_metadata). +-define(RA_FRIENDLY_NAME, "RabbitMQ Delayed Message Exchange metadata store"). +-define(STORE_ID, ?RA_CLUSTER_NAME). + +setup() -> + RaServerConfig = #{cluster_name => ?RA_CLUSTER_NAME, + friendly_name => ?RA_FRIENDLY_NAME}, + ok = ensure_ra_system_started(?RA_SYSTEM), + case khepri:start(?RA_SYSTEM, RaServerConfig) of + {ok, ?STORE_ID} -> + wait_for_leader(), + %register_projections() would this be needed for dmx? + ok; + {error, _} = Error -> + exit(Error) + end. + +get_store_id() -> + ?STORE_ID. + +ensure_ra_system_started(RaSystem) -> + RaSystemConfig = get_config(RaSystem), + ?LOG_DEBUG( + "Starting Ra system called \"~ts\" with configuration:~n~tp", + [RaSystem, RaSystemConfig], + #{domain => ?RMQLOG_DOMAIN_GLOBAL}), + case ra_system:start(RaSystemConfig) of + {ok, _} -> + ?LOG_DEBUG( + "Ra system \"~ts\" ready", + [RaSystem], + #{domain => ?RMQLOG_DOMAIN_GLOBAL}), + ok; + {error, {already_started, _}} -> + ?LOG_DEBUG( + "Ra system \"~ts\" ready", + [RaSystem], + #{domain => ?RMQLOG_DOMAIN_GLOBAL}), + ok; + Error -> + ?LOG_ERROR( + "Failed to start Ra system \"~ts\": ~tp", + [RaSystem, Error], + #{domain => ?RMQLOG_DOMAIN_GLOBAL}), + throw(Error) + end. + +-define(COORD_WAL_MAX_SIZE_B, 64_000_000). +get_config(dmx_coordination = RaSystem) -> + DefaultConfig = get_default_config(), + CoordDataDir = filename:join( + [rabbit:data_dir(), "dmx_coordination", node()]), + DefaultConfig#{name => RaSystem, + data_dir => CoordDataDir, + wal_data_dir => CoordDataDir, + wal_max_size_bytes => ?COORD_WAL_MAX_SIZE_B, + names => ra_system:derive_names(RaSystem)}. + +get_default_config() -> + ra_system:default_config(). + +wait_for_leader() -> + wait_for_leader(retry_timeout(), retry_limit()). + +retry_timeout() -> + case application:get_env(rabbit, khepri_leader_wait_retry_timeout) of + {ok, T} -> T; + undefined -> 30000 + end. + +retry_limit() -> + case application:get_env(rabbit, khepri_leader_wait_retry_limit) of + {ok, T} -> T; + undefined -> 10 + end. + +wait_for_leader(_Timeout, 0) -> + exit(timeout_waiting_for_leader); +wait_for_leader(Timeout, Retries) -> + rabbit_log:info("DMX: Waiting for Khepri leader for ~tp ms, ~tp retries left", + [Timeout, Retries - 1]), + Options = #{timeout => Timeout, + favor => compromise}, + case khepri:exists(?STORE_ID, [], Options) of + Exists when is_boolean(Exists) -> + rabbit_log:info("DMX: Khepri leader elected"), + ok; + {error, {timeout, _ServerId}} -> + wait_for_leader(Timeout, Retries -1); + {error, Reason} -> + throw(Reason) + end. + +put(PathPattern, Data) -> + khepri:put(?STORE_ID, PathPattern, Data). + +get(PathPattern) -> + khepri:get(?STORE_ID, PathPattern). + +delete(PathPattern) -> + khepri:delete(?STORE_ID, PathPattern). + +get_many(PathPattern) -> + khepri:get_many(?STORE_ID, PathPattern). + +match(PathPattern) -> + khepri:get_many(?STORE_ID, PathPattern). diff --git a/src/rabbit_delayed_message_kv_store.erl b/src/rabbit_delayed_message_kv_store.erl new file mode 100644 index 0000000..5f8abd1 --- /dev/null +++ b/src/rabbit_delayed_message_kv_store.erl @@ -0,0 +1,81 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +%% NOTE that this module uses os:timestamp/0 but in the future Erlang +%% will have a new time API. +%% See: +%% https://www.erlang.org/documentation/doc-7.0-rc1/erts-7.0/doc/html/erlang.html#now-0 +%% and +%% https://www.erlang.org/documentation/doc-7.0-rc1/erts-7.0/doc/html/time_correction.html + +-module(rabbit_delayed_message_kv_store). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("kernel/include/logger.hrl"). +-include_lib("rabbit_common/include/logging.hrl"). + +%% TODO due to using streams etc, maybe a bit overkill to make this a server... +-behaviour(gen_server). + +-export([start_link/0, leveled_bookie_start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([ + do_write/2, + do_delete/1, + do_take/1 + ]). + +-record(state, + {kv_store_pid + }). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +leveled_bookie_start_link() -> + DataDir = filename:join( + [rabbit:data_dir(), "dmx", node()]), + Result = {ok, Pid} = leveled_bookie:book_start([{root_path, DataDir}, + {log_level, error}]), + register(rabbit_leveled_bookie, Pid), + Result. + +init([]) -> + {ok, #state{kv_store_pid = rabbit_leveled_bookie}}. + + +do_write(Key, Value) -> + gen_server:cast(?MODULE, {write, Key, Value}). + +do_delete(Key) -> + gen_server:cast(?MODULE, {delete, Key}). + +do_take(Key) -> + gen_server:call(?MODULE, {take, Key}). + +handle_call({take, Key}, _From, #state{kv_store_pid = Ref} = State) -> + Value = case leveled_bookie:book_get(whereis(Ref), "foo", Key) of + {ok, V} -> V; + not_found -> not_found + end, + {reply, Value, State}. + +handle_cast({write, Key, Value}, #state{kv_store_pid = Ref} = State) -> + leveled_bookie:book_put(whereis(Ref), "foo", Key, Value, []), + {noreply, State}; +handle_cast({delete, Key}, #state{kv_store_pid = Ref} = State) -> + leveled_bookie:book_delete(whereis(Ref), "foo", Key, []), + {noreply, State}. + +handle_info(_I, State) -> + {noreply, State}. + +terminate(_, _) -> + ok. + +code_change(_, State, _) -> {ok, State}. diff --git a/src/rabbit_delayed_message_sup.erl b/src/rabbit_delayed_message_sup.erl index e4af8d3..024fba1 100644 --- a/src/rabbit_delayed_message_sup.erl +++ b/src/rabbit_delayed_message_sup.erl @@ -24,13 +24,32 @@ {enables, rabbit_exchange_type_delayed_message}, {cleanup, {?MODULE, stop, []}}]}). +child(rabbit_leveled_bookie = Name) -> + child(Name, + rabbit_delayed_message_kv_store, + leveled_bookie_start_link); +child(Name) -> + child(Name, Name, start_link). +child(Name, M, F) -> + #{id => Name, + start => {M, F, []}, + restart => transient, + shutdown => ?WORKER_WAIT, + type => worker, + modules => [M]}. + +children() -> + [child(N) || N <- [rabbit_delayed_message, + rabbit_delayed_message_kv_store, + rabbit_leveled_bookie, + rabbit_delayed_stream_handler]]. + start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). init([]) -> {ok, {{one_for_one, 3, 10}, - [{rabbit_delayed_message, {rabbit_delayed_message, start_link, []}, - transient, ?WORKER_WAIT, worker, [rabbit_delayed_message]}]}}. + children()}}. stop() -> ok = supervisor:terminate_child(rabbit_sup, ?MODULE), diff --git a/src/rabbit_delayed_message_utils.erl b/src/rabbit_delayed_message_utils.erl index fcdd664..a03fd08 100644 --- a/src/rabbit_delayed_message_utils.erl +++ b/src/rabbit_delayed_message_utils.erl @@ -50,7 +50,7 @@ swap_delay_header(Delivery) -> try_convert_to_int(Type, Delay) -> case lists:member(Type, ?STRING_ARG_TYPES) of true -> {ok, rabbit_data_coercion:to_integer(Delay)}; - false -> + false -> case lists:member(Type, ?FLOAT_ARG_TYPES) of true -> {ok, trunc(Delay)}; false -> {error, {unacceptable_type, Type}} diff --git a/src/rabbit_delayed_stream_handler.erl b/src/rabbit_delayed_stream_handler.erl new file mode 100644 index 0000000..8421ad0 --- /dev/null +++ b/src/rabbit_delayed_stream_handler.erl @@ -0,0 +1,200 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +%% NOTE that this module uses os:timestamp/0 but in the future Erlang +%% will have a new time API. +%% See: +%% https://www.erlang.org/documentation/doc-7.0-rc1/erts-7.0/doc/html/erlang.html#now-0 +%% and +%% https://www.erlang.org/documentation/doc-7.0-rc1/erts-7.0/doc/html/time_correction.html + +-module(rabbit_delayed_stream_handler). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("kernel/include/logger.hrl"). +-include_lib("rabbit_common/include/logging.hrl"). + +-behaviour(gen_server). + +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + + +-export([setup/0, store_msg_with_key/2, delete_msg_with_key/1]). + +-record(state, + {offset = 0, queue_type}). + +-define(STREAM_QUEUE_NAME, <<"internal-dmx-queue">>). +-define(STREAM_QUEUE, + rabbit_misc:r(<<"/">>, queue, ?STREAM_QUEUE_NAME)). +-define(CTAG, <<"dmx-stream-handler">>). + +setup() -> + gen_server:cast(?MODULE, setup). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + {ok, #state{}}. + +handle_call(_M, _From, State) -> + rabbit_log:debug(">>> CALL ~p", [_M]), + {reply, ok, State}. + +handle_cast(setup, #state{offset = Offset} = State) -> + QName = ?STREAM_QUEUE, + case rabbit_amqqueue:exists(QName) of + true -> + {ok, TmpQ} = rabbit_db_queue:get(QName), + #{name := StreamId} = amqqueue:get_type_state(TmpQ), + %% TODO must be some smarter way to figure out if the stream is up and running on this host + case rabbit_stream_coordinator:stream_overview(StreamId) of + {error, noproc} -> + erlang:send_after(5000, self(), call_setup_again), + {noreply, State}; + _ -> + Spec = #{args => [{<<"x-stream-offset">>,long, Offset}], + prefetch_count => 10,channel_pid => self(), + consumer_tag => ?CTAG, exclusive_consume => false, + no_ack => false,ok_msg => undefined}, + InitQType = rabbit_queue_type:init(), + {ok, QType} = rabbit_amqqueue:with( + QName, + fun(Q) -> + rabbit_queue_type:consume(Q, Spec, InitQType) + end), + Offset = get_offset(), + {noreply, State#state{queue_type = QType, offset = Offset}} + end; + false -> + erlang:send_after(5000, self(), call_setup_again), + {noreply, State} + end; +handle_cast({queue_event, _,_} = Event, State) -> + rabbit_log:debug(">>> CAST ~p", [Event]), + case handle_queue_event(Event, State) of + {ok, NState} -> + {noreply, NState}; + {error, Reason, NState} -> + {stop, Reason, NState} + end. + +handle_info(call_setup_again, State) -> + handle_cast(setup, State); +handle_info(_I, State) -> + rabbit_log:debug(">>> INFO ~p", [_I]), + {noreply, State}. + +terminate(_, _) -> + ok. + +code_change(_, State, _) -> {ok, State}. + + +handle_queue_event({queue_event, QName, Evt}, State0 = #state{queue_type = QType0}) -> + case rabbit_queue_type:handle_event(QName, Evt, QType0) of + {ok, QType, Actions} -> + State1 = State0#state{queue_type = QType}, + %% TODO Update offset, both in state, but also in kv store? + State = handle_queue_actions(Actions, State1), + {ok, State}; + {eol, Actions} -> + State1 = handle_queue_actions(Actions, State0), + QType = rabbit_queue_type:remove(QName, QType0), + State = State1#state{queue_type = QType}, + {ok, State}; + {protocol_error, _Type, _Reason, _ReasonArgs} = Error -> + {error, Error, State0} + end. + +handle_queue_actions(Actions, State) -> + rabbit_log:debug(">>> handle actions ~p", [Actions]), + lists:foldl( + fun ({deliver, ?CTAG, Ack, Msgs}, S) -> + read_msgs(Msgs, Ack, S); + ({settled, _QName, _PktIds}, S) -> + S; + ({rejected, _QName, _PktIds}, S) -> + S; + ({block, _QName}, S) -> + S; + ({unblock, _QName}, S) -> + S; + ({queue_down, _QName}, S) -> + S + end, State, Actions). + +read_msgs(Msgs, Ack, State) -> + lists:foldl(fun(Msg, S = #state{queue_type = _QType}) -> + read_msg(Msg, Ack, S) + end, State, Msgs). + + +read_msg({QNameOrType, _QPid, QMsgId, _Redelivered, Mc} = _Delivery, + _Ack, S = #state{queue_type = QType}) -> + case mc:x_header(<<"x-tombstone-key">>, Mc) of + undefined -> + {binary, Key} = mc:x_header(<<"x-delay-key">>, Mc), + rabbit_delayed_message_kv_store:do_write(Key, Mc); + {_, TKey} -> + rabbit_delayed_message_kv_store:do_delete(TKey) + end, + NewOffset = mc:get_annotation(<<"x-stream-offset">>, Mc) + 1, + set_offset(NewOffset), + {ok, QType0, Actions} = + rabbit_queue_type:settle(QNameOrType, none, ?CTAG, [QMsgId], QType), + handle_queue_actions(Actions, S#state{queue_type = QType0, + offset = NewOffset}). + + +%% {ok, QueueType} = rabbit_queue_type:init(). +%% QN = #resource{virtual_host = <<"/">>,kind = queue, +%% name = <<"test">>} +%% Result2 = rabbit_amqqueue:with(QN, fun(Q1) -> rabbit_queue_type:consume(Q1, Spec5, S5) end). +%% rabbit_queue_type:handle_event(QName, E1, S666). +%% rabbit_queue_type:settle(QName, none, <<"foobar">>, NewDevs, S2003). +declare_queue() -> + %%TODO maybe max age and max length bytes from config. + rabbit_amqqueue:declare(?STREAM_QUEUE, + true, + false, + [{<<"x-queue-type">>, longstr, <<"stream">>},{<<"x-max-age">>, longstr, <<"5D">>}], + none, <<"dmx">>, node()). + +add_to_stream(Message) -> + Dests = [?STREAM_QUEUE], + Qs = rabbit_amqqueue:lookup_many(Dests), + _ = rabbit_queue_type:deliver(Qs, Message, #{}, stateless). + +store_msg_with_key(Message, Key) -> + case rabbit_amqqueue:exists(?STREAM_QUEUE) of + true -> + ok; + false -> + declare_queue() + end, + MsgWithKey = mc:set_annotation(<<"x-delay-key">>, Key, Message), + add_to_stream(MsgWithKey). + +delete_msg_with_key(MsgKey) -> + Ann = #{x => <<"">>, rk => [?STREAM_QUEUE_NAME], + <<"x-tombstone-key">> => MsgKey}, + Msg = mc:init(mc_amqp, [], Ann), + add_to_stream(Msg). + +set_offset(Offset) -> + rabbit_delayed_message_kv_store:do_write(offset, Offset). + +get_offset() -> + case rabbit_delayed_message_kv_store:do_take(offset) of + not_found -> + 0; + V -> + V + end. diff --git a/src/rabbit_exchange_type_delayed_message.erl b/src/rabbit_exchange_type_delayed_message.erl index 5979b7b..d4372ae 100644 --- a/src/rabbit_exchange_type_delayed_message.erl +++ b/src/rabbit_exchange_type_delayed_message.erl @@ -48,7 +48,7 @@ route(X = #exchange{name = Name}, %% route the message using proxy module case ?EXCHANGE(X) of rabbit_exchange_type_direct -> - RKs = mc:get_annotation(routing_keys, Message), + RKs = mc:get_annotation(rk, Message), %% Exchange type x-delayed-message routes via "direct exchange routing v1" %% even when feature flag direct_exchange_routing_v2 is enabled because %% table rabbit_index_route only stores bindings whose source exchange @@ -112,7 +112,7 @@ info(Exchange, Items) -> delay_message(Exchange, Message) -> case get_delay(Message) of - {ok, Delay} when Delay > 0, Delay =< ?ERL_MAX_T -> + {ok, Delay} when Delay > 0 -> rabbit_delayed_message:delay_message(Exchange, Message, Delay); _ -> nodelay