diff --git a/apps/ejabberd/include/mod_offline.hrl b/apps/ejabberd/include/mod_offline.hrl new file mode 100644 index 00000000000..2c29b19eef8 --- /dev/null +++ b/apps/ejabberd/include/mod_offline.hrl @@ -0,0 +1 @@ +-record(offline_msg, {us, timestamp, expire, from, to, packet}). diff --git a/apps/ejabberd/priv/mysql.sql b/apps/ejabberd/priv/mysql.sql index 0c5cde5ebed..8faa1bfe9df 100644 --- a/apps/ejabberd/priv/mysql.sql +++ b/apps/ejabberd/priv/mysql.sql @@ -283,3 +283,14 @@ CREATE TABLE mam_muc_message( ); CREATE INDEX i_mam_muc_message_room_name_added_at USING BTREE ON mam_muc_message(room_id, id); + +CREATE TABLE offline_message( + id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + timestamp BIGINT UNSIGNED NOT NULL, + expire BIGINT UNSIGNED, + server varchar(250) NOT NULL, + username varchar(250) NOT NULL, + from_jid varchar(250) NOT NULL, + packet blob NOT NULL +); +CREATE INDEX i_offline_message USING BTREE ON offline_message(server, username, id); diff --git a/apps/ejabberd/priv/pg.sql b/apps/ejabberd/priv/pg.sql index 0ce5622415f..6578cc989d7 100644 --- a/apps/ejabberd/priv/pg.sql +++ b/apps/ejabberd/priv/pg.sql @@ -292,3 +292,17 @@ CREATE INDEX i_mam_muc_message_room_name_added_at ON mam_muc_message USING BTREE (room_id, id); + +CREATE TABLE offline_message( + id SERIAL UNIQUE PRIMARY Key, + timestamp BIGINT NOT NULL, + expire BIGINT, + server varchar(250) NOT NULL, + username varchar(250) NOT NULL, + from_jid varchar(250) NOT NULL, + packet text NOT NULL +); +CREATE INDEX i_offline_message + ON offline_message + USING BTREE + (server, username, id); diff --git a/apps/ejabberd/src/mod_offline.erl b/apps/ejabberd/src/mod_offline.erl index cd379aa0758..82a5b0c933c 100644 --- a/apps/ejabberd/src/mod_offline.erl +++ b/apps/ejabberd/src/mod_offline.erl @@ -1,7 +1,8 @@ %%%---------------------------------------------------------------------- %%% File : mod_offline.erl %%% Author : Alexey Shchepin -%%% Purpose : Store and manage offline messages in Mnesia database. +%%% Purpose : Store and manage offline messages +%%% See : XEP-0160: Best Practices for Handling Offline Messages %%% Created : 5 Jan 2003 by Alexey Shchepin %%% %%% @@ -29,42 +30,59 @@ -behaviour(gen_mod). --export([start/2, - loop/1, - stop/1, - store_packet/3, - resend_offline_messages/2, - pop_offline_messages/3, - get_sm_features/5, - remove_expired_messages/0, - remove_old_messages/1, - remove_user/2, - get_queue_length/2, - webadmin_page/3, - webadmin_user/4, - webadmin_user_parse_query/5]). +%% gen_mod handlers +-export([start/2, stop/1]). + +%% Hook handlers +-export([inspect_packet/3, + resend_offline_messages/2, + pop_offline_messages/3, + get_sm_features/5, + remove_expired_messages/1, + remove_old_messages/2, + remove_user/2]). + +%% Internal exports +-export([start_link/3]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). -include("ejabberd.hrl"). -include("jlib.hrl"). --include("ejabberd_http.hrl"). --include("ejabberd_web_admin.hrl"). - --record(offline_msg, {us, timestamp, expire, from, to, packet}). +-include("mod_offline.hrl"). -define(PROCNAME, ejabberd_offline). --define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). %% default value for the maximum number of user messages -define(MAX_USER_MESSAGES, infinity). +-define(BACKEND, (mod_offline_backend:backend())). + +-record(state, {host, access_max_user_messages}). + +%% ------------------------------------------------------------------ +%% Backend callbacks + +-callback init(Host, Opts) -> ok when + Host :: binary(), + Opts :: list(). + +-callback remove_user(LUser, LServer) -> ok when + LUser :: binary(), + LServer :: binary(). + +%% gen_mod callbacks +%% ------------------------------------------------------------------ start(Host, Opts) -> - mnesia:create_table(offline_msg, - [{disc_only_copies, [node()]}, - {type, bag}, - {attributes, record_info(fields, offline_msg)}]), - update_table(), + AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, + max_user_offline_messages), + start_backend_module(Opts), + ?BACKEND:init(Host, Opts), + start_worker(Host, AccessMaxOfflineMsgs), ejabberd_hooks:add(offline_message_hook, Host, - ?MODULE, store_packet, 50), + ?MODULE, inspect_packet, 50), ejabberd_hooks:add(resend_offline_messages_hook, Host, ?MODULE, pop_offline_messages, 50), ejabberd_hooks:add(remove_user, Host, @@ -75,52 +93,59 @@ start(Host, Opts) -> ?MODULE, get_sm_features, 50), ejabberd_hooks:add(disco_local_features, Host, ?MODULE, get_sm_features, 50), - ejabberd_hooks:add(webadmin_page_host, Host, - ?MODULE, webadmin_page, 50), - ejabberd_hooks:add(webadmin_user, Host, - ?MODULE, webadmin_user, 50), - ejabberd_hooks:add(webadmin_user_parse_query, Host, - ?MODULE, webadmin_user_parse_query, 50), - AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, max_user_offline_messages), - register(gen_mod:get_module_proc(Host, ?PROCNAME), - spawn(?MODULE, loop, [AccessMaxOfflineMsgs])). - -loop(AccessMaxOfflineMsgs) -> - receive - #offline_msg{us=US} = Msg -> - Msgs = receive_all(US, [Msg]), - Len = length(Msgs), - {User, Host} = US, - MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs, - User, Host), - F = fun() -> - %% Only count messages if needed: - Count = if MaxOfflineMsgs =/= infinity -> - Len + p1_mnesia:count_records( - offline_msg, - #offline_msg{us=US, _='_'}); - true -> - 0 - end, - if - Count > MaxOfflineMsgs -> - discard_warn_sender(Msgs); - true -> - if - Len >= ?OFFLINE_TABLE_LOCK_THRESHOLD -> - mnesia:write_lock_table(offline_msg); - true -> - ok - end, - lists:foreach(fun(M) -> - mnesia:write(M) - end, Msgs) - end - end, - mnesia:transaction(F), - loop(AccessMaxOfflineMsgs); - _ -> - loop(AccessMaxOfflineMsgs) + ok. + +stop(Host) -> + ejabberd_hooks:delete(offline_message_hook, Host, + ?MODULE, inspect_packet, 50), + ejabberd_hooks:delete(resend_offline_messages_hook, Host, + ?MODULE, pop_offline_messages, 50), + ejabberd_hooks:delete(remove_user, Host, + ?MODULE, remove_user, 50), + ejabberd_hooks:delete(anonymous_purge_hook, Host, + ?MODULE, remove_user, 50), + ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50), + ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, get_sm_features, 50), + stop_worker(Host), + ok. + + +%% Dynamic modules +%% ------------------------------------------------------------------ + +start_backend_module(Opts) -> + Backend = gen_mod:get_opt(backend, Opts, mnesia), + {Mod, Code} = dynamic_compile:from_string(mod_offline_backend(Backend)), + code:load_binary(Mod, "mod_offline_backend.erl", Code). + +-spec mod_offline_backend(atom()) -> string(). +mod_offline_backend(Backend) when is_atom(Backend) -> + lists:flatten( + ["-module(mod_offline_backend). + -export([backend/0]). + -spec backend() -> atom(). + backend() -> + mod_offline_", + atom_to_list(Backend), + ".\n"]). + +%% Server side functions +%% ------------------------------------------------------------------ + +handle_offline_msg(#offline_msg{us=US} = Msg, AccessMaxOfflineMsgs) -> + {LUser, LServer} = US, + Msgs = receive_all(US, [Msg]), + MaxOfflineMsgs = get_max_user_messages( + AccessMaxOfflineMsgs, LUser, LServer), + case ?BACKEND:write_messages(LUser, LServer, Msgs, MaxOfflineMsgs) of + ok -> + ok; + {discarded, DiscardedMsgs} -> + discard_warn_sender(DiscardedMsgs); + {error, Reason} -> + ?ERROR_MSG("~ts@~ts: write_messages failed with ~p.", + [LUser, LServer, Reason]), + discard_warn_sender(Msgs) end. %% Function copied from ejabberd_sm.erl: @@ -139,69 +164,161 @@ receive_all(US, Msgs) -> Msgs end. - -stop(Host) -> - ejabberd_hooks:delete(offline_message_hook, Host, - ?MODULE, store_packet, 50), - ejabberd_hooks:delete(resend_offline_messages_hook, Host, - ?MODULE, pop_offline_messages, 50), - ejabberd_hooks:delete(remove_user, Host, - ?MODULE, remove_user, 50), - ejabberd_hooks:delete(anonymous_purge_hook, Host, - ?MODULE, remove_user, 50), - ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50), - ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, get_sm_features, 50), - ejabberd_hooks:delete(webadmin_page_host, Host, - ?MODULE, webadmin_page, 50), - ejabberd_hooks:delete(webadmin_user, Host, - ?MODULE, webadmin_user, 50), - ejabberd_hooks:delete(webadmin_user_parse_query, Host, - ?MODULE, webadmin_user_parse_query, 50), - Proc = gen_mod:get_module_proc(Host, ?PROCNAME), - exit(whereis(Proc), stop), - {wait, Proc}. - -get_sm_features(Acc, _From, _To, "", _Lang) -> - Feats = case Acc of - {result, I} -> I; - _ -> [] - end, - {result, Feats ++ [?NS_FEATURE_MSGOFFLINE]}; - +%% Supervision +%% ------------------------------------------------------------------ + +start_worker(Host, AccessMaxOfflineMsgs) -> + Proc = srv_name(Host), + ChildSpec = + {Proc, + {?MODULE, start_link, [Proc, Host, AccessMaxOfflineMsgs]}, + permanent, + 5000, + worker, + [?MODULE]}, + supervisor:start_child(ejabberd_sup, ChildSpec), + ok. + +stop_worker(Host) -> + Proc = srv_name(Host), + supervisor:terminate_child(ejabberd_sup, Proc), + supervisor:delete_child(ejabberd_sup, Proc). + +start_link(Name, Host, AccessMaxOfflineMsgs) -> + gen_server:start_link({local, Name}, ?MODULE, [Host, AccessMaxOfflineMsgs], []). + +srv_name() -> + mod_offline. + +srv_name(Host) -> + gen_mod:get_module_proc(Host, srv_name()). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([Host, AccessMaxOfflineMsgs]) -> + {ok, #state{ + host = Host, + access_max_user_messages = AccessMaxOfflineMsgs}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call(_, _, State) -> + {reply, ok, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- + +handle_cast(Msg, State) -> + ?WARNING_MSG("Strange message ~p.", [Msg]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info(Msg=#offline_msg{}, + State=#state{access_max_user_messages = AccessMaxOfflineMsgs}) -> + handle_offline_msg(Msg, AccessMaxOfflineMsgs), + {noreply, State}; +handle_info(Msg, State) -> + ?WARNING_MSG("Strange message ~p.", [Msg]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%% Handlers +%% ------------------------------------------------------------------ + +get_sm_features(Acc, _From, _To, <<"">> = _Node, _Lang) -> + add_feature(Acc, ?NS_FEATURE_MSGOFFLINE); get_sm_features(_Acc, _From, _To, ?NS_FEATURE_MSGOFFLINE, _Lang) -> %% override all lesser features... {result, []}; - get_sm_features(Acc, _From, _To, _Node, _Lang) -> Acc. - -store_packet(From, To, Packet) -> - Type = xml:get_tag_attr_s("type", Packet), - if - (Type /= "error") and (Type /= "groupchat") and - (Type /= "headline") -> - case check_event_chatstates(From, To, Packet) of - true -> - #jid{luser = LUser, lserver = LServer} = To, - TimeStamp = now(), - #xmlel{children = Els} = Packet, - Expire = find_x_expire(TimeStamp, Els), - gen_mod:get_module_proc(To#jid.lserver, ?PROCNAME) ! - #offline_msg{us = {LUser, LServer}, - timestamp = TimeStamp, - expire = Expire, - from = From, - to = To, - packet = Packet}, - stop; - _ -> - ok - end; - true -> - ok +add_feature({result, Features}, Feature) -> + {result, Features ++ [Feature]}; +add_feature(_, Feature) -> + {result, [Feature]}. + +inspect_packet(From, To, Packet) -> + case is_interesting_packet(Packet) of + true -> + case check_event_chatstates(From, To, Packet) of + true -> + store_packet(From, To, Packet), + stop; + false -> + ok + end; + false -> + ok end. +store_packet( + From, + To = #jid{luser = LUser, lserver = LServer}, + Packet = #xmlel{children = Els}) -> + TimeStamp = now(), + Expire = find_x_expire(TimeStamp, Els), + Pid = srv_name(LServer), + Msg = #offline_msg{us = {LUser, LServer}, + timestamp = TimeStamp, + expire = Expire, + from = From, + to = To, + packet = Packet}, + Pid ! Msg, + ok. + +is_interesting_packet(Packet) -> + Type = xml:get_tag_attr_s(<<"type">>, Packet), + is_interesting_packet_type(Type). + +is_interesting_packet_type(<<"error">>) -> false; +is_interesting_packet_type(<<"groupchat">>) -> false; +is_interesting_packet_type(<<"headline">>) -> false; +is_interesting_packet_type(_) -> true. + %% Check if the packet has any content about XEP-0022 or XEP-0085 check_event_chatstates(From, To, Packet) -> #xmlel{children = Els} = Packet, @@ -218,25 +335,13 @@ check_event_chatstates(From, To, Packet) -> false; %% There was an x:event element, and maybe also other stuff {El, _, _} when El /= false -> - case xml:get_subtag(El, "id") of + case xml:get_subtag(El, <<"id">>) of false -> - case xml:get_subtag(El, "offline") of + case xml:get_subtag(El, <<"offline">>) of false -> true; _ -> - ID = case xml:get_tag_attr_s("id", Packet) of - "" -> - #xmlel{name = "id"}; - S -> - #xmlel{name = "id", - children = [#xmlcdata{content = S}]} - end, - ejabberd_router:route( - To, From, Packet#xmlel{children = [#xmlel{name = "x", - attrs = [{"xmlns", - ?NS_EVENT}], - children = [ID, - #xmlel{name = "offline"}]}]}), + ejabberd_router:route(To, From, patch_offline_message(Packet)), true end; _ -> @@ -244,13 +349,29 @@ check_event_chatstates(From, To, Packet) -> end end. +patch_offline_message(Packet) -> + ID = case xml:get_tag_attr_s(<<"id">>, Packet) of + <<"">> -> + #xmlel{name = <<"id">>}; + S -> + #xmlel{name = <<"id">>, + children = [#xmlcdata{content = S}]} + end, + Packet#xmlel{children = [x_elem(ID)]}. + +x_elem(ID) -> + #xmlel{ + name = <<"x">>, + attrs = [{<<"xmlns">>, ?NS_EVENT}], + children = [ID, #xmlel{name = <<"offline">>}]}. + %% Check if the packet has subelements about XEP-0022, XEP-0085 or other find_x_event_chatstates([], Res) -> Res; find_x_event_chatstates([#xmlcdata{} | Els], Res) -> find_x_event_chatstates(Els, Res); find_x_event_chatstates([El | Els], {A, B, C}) -> - case xml:get_tag_attr_s("xmlns", El) of + case xml:get_tag_attr_s(<<"xmlns">>, El) of ?NS_EVENT -> find_x_event_chatstates(Els, {El, B, C}); ?NS_CHATSTATES -> @@ -264,9 +385,9 @@ find_x_expire(_, []) -> find_x_expire(TimeStamp, [#xmlcdata{} | Els]) -> find_x_expire(TimeStamp, Els); find_x_expire(TimeStamp, [El | Els]) -> - case xml:get_tag_attr_s("xmlns", El) of + case xml:get_tag_attr_s(<<"xmlns">>, El) of ?NS_EXPIRE -> - Val = xml:get_tag_attr_s("seconds", El), + Val = xml:get_tag_attr_s(<<"seconds">>, El), case catch list_to_integer(Val) of {'EXIT', _} -> never; @@ -283,373 +404,75 @@ find_x_expire(TimeStamp, [El | Els]) -> find_x_expire(TimeStamp, Els) end. +pop_offline_messages(Ls, User, Server) -> + Ls ++ pop_offline_messages(User, Server). -resend_offline_messages(User, Server) -> +pop_offline_messages(User, Server) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), - US = {LUser, LServer}, - F = fun() -> - Rs = mnesia:wread({offline_msg, US}), - mnesia:delete({offline_msg, US}), - Rs - end, - case mnesia:transaction(F) of - {atomic, Rs} -> - lists:foreach( - fun(R) -> - XE = #xmlel{children = Els} = R#offline_msg.packet, - ejabberd_sm ! - {route, - R#offline_msg.from, - R#offline_msg.to, - XE#xmlel{children = Els ++ - [jlib:timestamp_to_xml(calendar:now_to_universal_time(R#offline_msg.timestamp), - utc, - jlib:make_jid(<<>>, - Server, - <<>>), - "Offline Storage"),%% TODO: Delete the next three lines once XEP-0091 is Obsolete - jlib:timestamp_to_xml(calendar:now_to_universal_time(R#offline_msg.timestamp))]}} - end, - lists:keysort(#offline_msg.timestamp, Rs)); - _ -> - ok + case ?BACKEND:pop_messages(LUser, LServer) of + {ok, Rs} -> + lists:map(fun(R) -> + Packet = resend_offline_message_packet(Server, R), + compose_offline_message(R, Packet) + end, Rs); + {error, Reason} -> + ?ERROR_MSG("~ts@~ts: pop_messages failed with ~p.", [LUser, LServer, Reason]), + [] end. -pop_offline_messages(Ls, User, Server) -> +resend_offline_messages(User, Server) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), - US = {LUser, LServer}, - F = fun() -> - Rs = mnesia:wread({offline_msg, US}), - mnesia:delete({offline_msg, US}), - Rs - end, - case mnesia:transaction(F) of - {atomic, Rs} -> - TS = now(), - Ls ++ lists:map( - fun(R) -> - XE = #xmlel{children = Els} = R#offline_msg.packet, - {route, - R#offline_msg.from, - R#offline_msg.to, - XE#xmlel{children = Els ++ - [jlib:timestamp_to_xml(calendar:now_to_universal_time(R#offline_msg.timestamp), - utc, - jlib:make_jid(<<>>, - Server, - <<>>), - "Offline Storage"),%% TODO: Delete the next three lines once XEP-0091 is Obsolete - jlib:timestamp_to_xml(calendar:now_to_universal_time(R#offline_msg.timestamp))]}} - end, - lists:filter( - fun(R) -> - case R#offline_msg.expire of - never -> - true; - TimeStamp -> - TS < TimeStamp - end - end, - lists:keysort(#offline_msg.timestamp, Rs))); - _ -> - Ls + case ?BACKEND:pop_messages(LUser, LServer) of + {ok, Rs} -> + lists:foreach(fun(R) -> + Packet = resend_offline_message_packet(Server, R), + route_offline_message(R, Packet) + end, Rs); + {error, _Reason} -> + ok end. +route_offline_message(#offline_msg{from=From, to=To}, Packet) -> + ejabberd_sm:route(From, To, Packet). -remove_expired_messages() -> - TimeStamp = now(), - F = fun() -> - mnesia:write_lock_table(offline_msg), - mnesia:foldl( - fun(Rec, _Acc) -> - case Rec#offline_msg.expire of - never -> - ok; - TS -> - if - TS < TimeStamp -> - mnesia:delete_object(Rec); - true -> - ok - end - end - end, ok, offline_msg) - end, - mnesia:transaction(F). - -remove_old_messages(Days) -> - {MegaSecs, Secs, _MicroSecs} = now(), - S = MegaSecs * 1000000 + Secs - 60 * 60 * 24 * Days, - MegaSecs1 = S div 1000000, - Secs1 = S rem 1000000, - TimeStamp = {MegaSecs1, Secs1, 0}, - F = fun() -> - mnesia:write_lock_table(offline_msg), - mnesia:foldl( - fun(#offline_msg{timestamp = TS} = Rec, _Acc) - when TS < TimeStamp -> - mnesia:delete_object(Rec); - (_Rec, _Acc) -> ok - end, ok, offline_msg) - end, - mnesia:transaction(F). +compose_offline_message(#offline_msg{from=From, to=To}, Packet) -> + {route, From, To, Packet}. -remove_user(User, Server) -> - LUser = jlib:nodeprep(User), - LServer = jlib:nameprep(Server), - US = {LUser, LServer}, - F = fun() -> - mnesia:delete({offline_msg, US}) - end, - mnesia:transaction(F). - -update_table() -> - Fields = record_info(fields, offline_msg), - case mnesia:table_info(offline_msg, attributes) of - Fields -> - ok; - [user, timestamp, expire, from, to, packet] -> - ?INFO_MSG("Converting offline_msg table from " - "{user, timestamp, expire, from, to, packet} format", []), - Host = ?MYNAME, - {atomic, ok} = mnesia:create_table( - mod_offline_tmp_table, - [{disc_only_copies, [node()]}, - {type, bag}, - {local_content, true}, - {record_name, offline_msg}, - {attributes, record_info(fields, offline_msg)}]), - mnesia:transform_table(offline_msg, ignore, Fields), - F1 = fun() -> - mnesia:write_lock_table(mod_offline_tmp_table), - mnesia:foldl( - fun(#offline_msg{us = U} = R, _) -> - mnesia:dirty_write( - mod_offline_tmp_table, - R#offline_msg{us = {U, Host}}) - end, ok, offline_msg) - end, - mnesia:transaction(F1), - mnesia:clear_table(offline_msg), - F2 = fun() -> - mnesia:write_lock_table(offline_msg), - mnesia:foldl( - fun(R, _) -> - mnesia:dirty_write(R) - end, ok, mod_offline_tmp_table) - end, - mnesia:transaction(F2), - mnesia:delete_table(mod_offline_tmp_table); - [user, timestamp, from, to, packet] -> - ?INFO_MSG("Converting offline_msg table from " - "{user, timestamp, from, to, packet} format", []), - Host = ?MYNAME, - {atomic, ok} = mnesia:create_table( - mod_offline_tmp_table, - [{disc_only_copies, [node()]}, - {type, bag}, - {local_content, true}, - {record_name, offline_msg}, - {attributes, record_info(fields, offline_msg)}]), - mnesia:transform_table( - offline_msg, - fun({_, U, TS, F, T, P}) -> - #xmlel{children = Els} = P, - Expire = find_x_expire(TS, Els), - #offline_msg{us = U, - timestamp = TS, - expire = Expire, - from = F, - to = T, - packet = P} - end, Fields), - F1 = fun() -> - mnesia:write_lock_table(mod_offline_tmp_table), - mnesia:foldl( - fun(#offline_msg{us = U} = R, _) -> - mnesia:dirty_write( - mod_offline_tmp_table, - R#offline_msg{us = {U, Host}}) - end, ok, offline_msg) - end, - mnesia:transaction(F1), - mnesia:clear_table(offline_msg), - F2 = fun() -> - mnesia:write_lock_table(offline_msg), - mnesia:foldl( - fun(R, _) -> - mnesia:dirty_write(R) - end, ok, mod_offline_tmp_table) - end, - mnesia:transaction(F2), - mnesia:delete_table(mod_offline_tmp_table); - _ -> - ?INFO_MSG("Recreating offline_msg table", []), - mnesia:transform_table(offline_msg, ignore, Fields) - end. +resend_offline_message_packet(Server, + #offline_msg{timestamp=TimeStamp, packet = Packet}) -> + add_timestamp(TimeStamp, Server, Packet). + +add_timestamp(undefined, _Server, Packet) -> + Packet; +add_timestamp(TimeStamp, Server, Packet) -> + Time = calendar:now_to_universal_time(TimeStamp), + %% TODO: Delete the next element once XEP-0091 is Obsolete + TimeStampLegacyXML = timestamp_legacy_xml(Server, Time), + TimeStampXML = jlib:timestamp_to_xml(Time), + xml:append_subtags(Packet, [TimeStampLegacyXML, TimeStampXML]). + +timestamp_legacy_xml(Server, Time) -> + FromJID = jlib:make_jid(<<>>, Server, <<>>), + jlib:timestamp_to_xml(Time, utc, FromJID, <<"Offline Storage">>). +remove_expired_messages(Host) -> + ?BACKEND:remove_expired_messages(Host). -%% Helper functions: +remove_old_messages(Host, Days) -> + ?BACKEND:remove_expired_messages(Host, Days). + +remove_user(User, Server) -> + ?BACKEND:remove_user(User, Server). %% Warn senders that their messages have been discarded: discard_warn_sender(Msgs) -> lists:foreach( fun(#offline_msg{from=From, to=To, packet=Packet}) -> - ErrText = "Your contact offline message queue is full. The message has been discarded.", - Lang = xml:get_tag_attr_s("xml:lang", Packet), + ErrText = <<"Your contact offline message queue is full. The message has been discarded.">>, + Lang = xml:get_tag_attr_s(<<"xml:lang">>, Packet), Err = jlib:make_error_reply( Packet, ?ERRT_RESOURCE_CONSTRAINT(Lang, ErrText)), - ejabberd_router:route( - To, - From, Err) + ejabberd_router:route(To, From, Err) end, Msgs). - - -webadmin_page(_, Host, - #request{us = _US, - path = ["user", U, "queue"], - q = Query, - lang = Lang} = _Request) -> - Res = user_queue(U, Host, Query, Lang), - {stop, Res}; - -webadmin_page(Acc, _, _) -> Acc. - -user_queue(User, Server, Query, Lang) -> - US = {jlib:nodeprep(User), jlib:nameprep(Server)}, - Res = user_queue_parse_query(US, Query), - MsgsAll = lists:keysort(#offline_msg.timestamp, - mnesia:dirty_read({offline_msg, US})), - Msgs = get_messages_subset(User, Server, MsgsAll), - FMsgs = - lists:map( - fun(#offline_msg{timestamp = TimeStamp, from = From, to = To, - packet = XE = #xmlel{attrs = Attrs}} = Msg) -> - ID = jlib:encode_base64(binary_to_list(term_to_binary(Msg))), - {{Year, Month, Day}, {Hour, Minute, Second}} = - calendar:now_to_local_time(TimeStamp), - Time = lists:flatten( - io_lib:format( - "~w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w", - [Year, Month, Day, Hour, Minute, Second])), - SFrom = jlib:jid_to_binary(From), - STo = jlib:jid_to_binary(To), - Attrs2 = jlib:replace_from_to_attrs(SFrom, STo, Attrs), - Packet = XE#xmlel{attrs = Attrs2}, - FPacket = ejabberd_web_admin:pretty_print_xml(Packet), - ?XE("tr", - [?XAE("td", [{"class", "valign"}], [?INPUT("checkbox", "selected", ID)]), - ?XAC("td", [{"class", "valign"}], Time), - ?XAC("td", [{"class", "valign"}], SFrom), - ?XAC("td", [{"class", "valign"}], STo), - ?XAE("td", [{"class", "valign"}], [?XC("pre", FPacket)])] - ) - end, Msgs), - [?XC("h1", (io_lib:format(?T("~s's Offline Messages Queue"), - [us_to_list(US)])))] ++ - case Res of - ok -> [?XREST("Submitted")]; - nothing -> [] - end ++ - [?XAE("form", [{"action", ""}, {"method", "post"}], - [?XE("table", - [?XE("thead", - [?XE("tr", - [?X("td"), - ?XCT("td", "Time"), - ?XCT("td", "From"), - ?XCT("td", "To"), - ?XCT("td", "Packet") - ])]), - ?XE("tbody", - if - FMsgs == [] -> - [?XE("tr", - [?XAC("td", [{"colspan", "4"}], " ")] - )]; - true -> - FMsgs - end - )]), - ?BR, - ?INPUTT("submit", "delete", "Delete Selected") - ])]. - -user_queue_parse_query(US, Query) -> - case lists:keysearch("delete", 1, Query) of - {value, _} -> - Msgs = lists:keysort(#offline_msg.timestamp, - mnesia:dirty_read({offline_msg, US})), - F = fun() -> - lists:foreach( - fun(Msg) -> - ID = jlib:encode_base64( - binary_to_list(term_to_binary(Msg))), - case lists:member({"selected", ID}, Query) of - true -> - mnesia:delete_object(Msg); - false -> - ok - end - end, Msgs) - end, - mnesia:transaction(F), - ok; - false -> - nothing - end. - -us_to_list({User, Server}) -> - jlib:jid_to_binary({User, Server, ""}). - -get_queue_length(User, Server) -> - length(mnesia:dirty_read({offline_msg, {User, Server}})). - -get_messages_subset(User, Host, MsgsAll) -> - Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages, - max_user_offline_messages), - MaxOfflineMsgs = case get_max_user_messages(Access, User, Host) of - Number when is_integer(Number) -> Number; - _ -> 100 - end, - Length = length(MsgsAll), - get_messages_subset2(MaxOfflineMsgs, Length, MsgsAll). - -get_messages_subset2(Max, Length, MsgsAll) when Length =< Max*2 -> - MsgsAll; -get_messages_subset2(Max, Length, MsgsAll) -> - FirstN = Max, - {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), - MsgsLastN = lists:nthtail(Length - FirstN - FirstN, Msgs2), - NoJID = jlib:make_jid("...", "...", ""), - IntermediateMsg = #offline_msg{timestamp = now(), from = NoJID, to = NoJID, - packet = #xmlel{name = "..."}}, - MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN. - -webadmin_user(Acc, User, Server, Lang) -> - QueueLen = get_queue_length(jlib:nodeprep(User), jlib:nameprep(Server)), - FQueueLen = [?AC("queue/", - integer_to_list(QueueLen))], - Acc ++ [?XCT("h3", "Offline Messages:")] ++ FQueueLen ++ [?C(" "), ?INPUTT("submit", "removealloffline", "Remove All Offline Messages")]. - -webadmin_user_parse_query(_, "removealloffline", User, Server, _Query) -> - US = {User, Server}, - F = fun() -> - mnesia:write_lock_table(offline_msg), - lists:foreach( - fun(Msg) -> - mnesia:delete_object(Msg) - end, mnesia:dirty_read({offline_msg, US})) - end, - case mnesia:transaction(F) of - {aborted, Reason} -> - ?ERROR_MSG("Failed to remove offline messages: ~p", [Reason]), - {stop, error}; - {atomic, ok} -> - ?INFO_MSG("Removed all offline messages for ~s@~s", [User, Server]), - {stop, ok} - end; -webadmin_user_parse_query(Acc, _Action, _User, _Server, _Query) -> - Acc. diff --git a/apps/ejabberd/src/mod_offline_mnesia.erl b/apps/ejabberd/src/mod_offline_mnesia.erl new file mode 100644 index 00000000000..d2e441f307f --- /dev/null +++ b/apps/ejabberd/src/mod_offline_mnesia.erl @@ -0,0 +1,183 @@ +%%%---------------------------------------------------------------------- +%%% Copyright notice from the originam mod_offline.erl +%%% +%%% File : mod_offline.erl +%%% Author : Alexey Shchepin +%%% Purpose : Store and manage offline messages in Mnesia database. +%%% Created : 5 Jan 2003 by Alexey Shchepin +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2011 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA +%%% 02111-1307 USA +%%% +%%%---------------------------------------------------------------------- + +-module(mod_offline_mnesia). +-behaviour(mod_offline). + +-export([init/2, + pop_messages/2, + write_messages/4, + remove_expired_messages/1, + remove_old_messages/2, + remove_user/2]). + +-include("ejabberd.hrl"). +-include("jlib.hrl"). +-include("mod_offline.hrl"). + +-define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). + +init(_Host, _Opts) -> + mnesia:create_table(offline_msg, + [{disc_only_copies, [node()]}, + {type, bag}, + {attributes, record_info(fields, offline_msg)}]), + ok. + +pop_messages(LUser, LServer) -> + US = {LUser, LServer}, + F = fun() -> + Rs = mnesia:wread({offline_msg, US}), + mnesia:delete({offline_msg, US}), + Rs + end, + case mnesia:transaction(F) of + {atomic, Rs} -> + TimeStamp = now(), + {ok, skip_expired_messages(TimeStamp, lists:keysort(#offline_msg.timestamp, Rs))}; + {aborted, Reason} -> + {error, Reason}; + {error, Reason} -> + {error, Reason} + end. + +write_messages(LUser, LServer, Msgs, MaxOfflineMsgs) -> + F = fun() -> write_messages_t(LUser, LServer, Msgs, MaxOfflineMsgs) end, + case mnesia:transaction(F) of + {atomic, Result} -> + Result; + {aborted, Reason} -> + {error, {aborted, Reason}}; + {error, Reason} -> + {error, Reason} + end. + +write_messages_t(LUser, LServer, Msgs, MaxOfflineMsgs) -> + Len = length(Msgs), + case is_message_count_threshold_reached(MaxOfflineMsgs, Len, LUser, LServer) of + false -> + write_all_messages_t(Len, Msgs); + true -> + discard_all_messages_t(Msgs) + end. + +is_message_count_threshold_reached(MaxOfflineMsgs, Len, LUser, LServer) -> + case MaxOfflineMsgs of + infinity -> + false; + MaxOfflineMsgs when Len > MaxOfflineMsgs -> + true; + MaxOfflineMsgs -> + %% Only count messages if needed. + MaxArchivedMsg = MaxOfflineMsgs - Len, + MaxArchivedMsg < count_offline_messages(LUser, LServer) + + %% Do not need to count all messages in archive. + % MaxOfflineMsgs < count_offline_messages(LUser, LServer, MaxArchivedMsg + 1) + end. + +write_all_messages_t(Len, Msgs) -> + if + Len >= ?OFFLINE_TABLE_LOCK_THRESHOLD -> + mnesia:write_lock_table(offline_msg); + true -> + ok + end, + [mnesia:write(M) || M <- Msgs], + ok. + +discard_all_messages_t(Msgs) -> + {discarded, Msgs}. + +count_offline_messages(LUser, LServer) -> + US = {LUser, LServer}, + p1_mnesia:count_records(offline_msg, #offline_msg{us=US, _='_'}). + +remove_user(User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + US = {LUser, LServer}, + F = fun() -> + mnesia:delete({offline_msg, US}) + end, + mnesia:transaction(F). + +remove_expired_messages(_Host) -> + TimeStamp = now(), + F = fun() -> + mnesia:write_lock_table(offline_msg), + mnesia:foldl( + fun(Rec, _Acc) -> + remove_expired_message(TimeStamp, Rec) + end, ok, offline_msg) + end, + mnesia:transaction(F). + +remove_old_messages(_Host, Days) -> + TimeStamp = fallback_timestamp(Days, now()), + F = fun() -> + mnesia:write_lock_table(offline_msg), + mnesia:foldl( + fun(Rec, _Acc) -> + remove_old_message(TimeStamp, Rec) + end, ok, offline_msg) + end, + mnesia:transaction(F). + +fallback_timestamp(Days, {MegaSecs, Secs, _MicroSecs}) -> + S = MegaSecs * 1000000 + Secs - 60 * 60 * 24 * Days, + MegaSecs1 = S div 1000000, + Secs1 = S rem 1000000, + {MegaSecs1, Secs1, 0}. + +remove_expired_message(TimeStamp, Rec) -> + case is_expired_message(TimeStamp, Rec) of + true -> + mnesia:delete_object(Rec); + false -> + ok + end. + +remove_old_message(TimeStamp, Rec) -> + case is_old_message(TimeStamp, Rec) of + true -> + mnesia:delete_object(Rec); + false -> + ok + end. + +skip_expired_messages(TimeStamp, Rs) -> + [R || R <- Rs, not is_expired_message(TimeStamp, R)]. + +is_expired_message(_TimeStamp, #offline_msg{expire=never}) -> + false; +is_expired_message(TimeStamp, #offline_msg{expire=ExpireTimeStamp}) -> + ExpireTimeStamp < TimeStamp. + +is_old_message(MaxAllowedTimeStamp, #offline_msg{timestamp=TimeStamp}) -> + TimeStamp < MaxAllowedTimeStamp. diff --git a/apps/ejabberd/src/mod_offline_odbc.erl b/apps/ejabberd/src/mod_offline_odbc.erl new file mode 100644 index 00000000000..a0afe155d40 --- /dev/null +++ b/apps/ejabberd/src/mod_offline_odbc.erl @@ -0,0 +1,171 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_offline_odbc.erl +%%% Author : Alexey Shchepin +%%% Purpose : Store and manage offline messages in relational database. +%%% Created : 5 Jan 2003 by Alexey Shchepin +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2011 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA +%%% 02111-1307 USA +%%% +%%%---------------------------------------------------------------------- + +-module(mod_offline_odbc). +-behaviour(mod_offline). + +-export([init/2, + pop_messages/2, + write_messages/4, + remove_expired_messages/1, + remove_old_messages/2, + remove_user/2]). + +-include("ejabberd.hrl"). +-include("jlib.hrl"). +-include("mod_offline.hrl"). + +-define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). + +init(_Host, _Opts) -> + ok. + +pop_messages(LUser, LServer) -> + US = {LUser, LServer}, + To = jlib:make_jid(LUser, LServer, <<>>), + SUser = ejabberd_odbc:escape(LUser), + SServer = ejabberd_odbc:escape(LServer), + TimeStamp = now(), + STimeStamp = encode_timestamp(TimeStamp), + case odbc_queries:pop_offline_messages(LServer, SUser, SServer, STimeStamp) of + {atomic, {selected, [<<"timestamp">>,<<"from_jid">>,<<"packet">>], Rows}} -> + {ok, rows_to_records(US, To, Rows)}; + {aborted, Reason} -> + {error, Reason}; + {error, Reason} -> + {error, Reason} + end. + +rows_to_records(US, To, Rows) -> + [row_to_record(US, To, Row) || Row <- Rows]. + +row_to_record(US, To, {STimeStamp, SFrom, SPacket}) -> + Packet = xml_stream:parse_element(SPacket), + TimeStamp = microseconds_to_now(list_to_integer(binary_to_list(STimeStamp))), + From = jlib:binary_to_jid(SFrom), + #offline_msg{us = US, + timestamp = TimeStamp, + expire = undefined, + from = From, + to = To, + packet = Packet}. + + +write_messages(LUser, LServer, Msgs, MaxOfflineMsgs) -> + SUser = ejabberd_odbc:escape(LUser), + SServer = ejabberd_odbc:escape(LServer), + write_messages_t(LServer, SUser, SServer, Msgs, MaxOfflineMsgs). + +write_messages_t(LServer, SUser, SServer, Msgs, MaxOfflineMsgs) -> + case is_message_count_threshold_reached( + LServer, SUser, SServer, Msgs, MaxOfflineMsgs) of + false -> + write_all_messages_t(LServer, SUser, SServer, Msgs); + true -> + discard_all_messages_t(Msgs) + end. + +is_message_count_threshold_reached(LServer, SUser, SServer, Msgs, MaxOfflineMsgs) -> + Len = length(Msgs), + case MaxOfflineMsgs of + infinity -> + false; + MaxOfflineMsgs when Len > MaxOfflineMsgs -> + true; + MaxOfflineMsgs -> + %% Only count messages if needed. + MaxArchivedMsg = MaxOfflineMsgs - Len, + %% Do not need to count all messages in archive. + MaxOfflineMsgs < count_offline_messages( + LServer, SUser, SServer, MaxArchivedMsg + 1) + end. + +write_all_messages_t(LServer, SUser, SServer, Msgs) -> + Rows = [record_to_row(SUser, SServer, Msg) || Msg <- Msgs], + case catch odbc_queries:push_offline_messages(LServer, Rows) of + {updated, _} -> + ok; + {aborted, Reason} -> + {error, {aborted, Reason}}; + {error, Reason} -> + {error, Reason} + end. + +record_to_row(SUser, SServer, #offline_msg{ + from = From, packet = Packet, timestamp = TimeStamp, expire = Expire}) -> + SFrom = ejabberd_odbc:escape(jlib:jid_to_binary(From)), + SPacket = ejabberd_odbc:escape(xml:element_to_binary(Packet)), + STimeStamp = encode_timestamp(TimeStamp), + SExpire = maybe_encode_timestamp(Expire), + odbc_queries:prepare_offline_message(SUser, SServer, STimeStamp, SExpire, SFrom, SPacket). + +discard_all_messages_t(Msgs) -> + {discarded, Msgs}. + +remove_user(LUser, LServer) -> + SUser = ejabberd_odbc:escape(LUser), + SServer = ejabberd_odbc:escape(LServer), + odbc_queries:remove_offline_messages(LServer, SUser, SServer). + +remove_expired_messages(LServer) -> + TimeStamp = now(), + STimeStamp = encode_timestamp(TimeStamp), + odbc_queries:remove_expired_offline_messages(LServer, STimeStamp). + +remove_old_messages(LServer, Days) -> + TimeStamp = fallback_timestamp(Days, now()), + STimeStamp = encode_timestamp(TimeStamp), + odbc_queries:remove_old_offline_messages(LServer, STimeStamp). + +count_offline_messages(LServer, SUser, SServer, Limit) -> + case odbc_queries:count_offline_messages(LServer, SUser, SServer, Limit) of + {selected, [_], [{Count}]} -> + list_to_integer(binary_to_list(Count)); + Error -> + ?ERROR_MSG("count_offline_messages failed ~p", [Error]), + 0 + end. + +fallback_timestamp(Days, {MegaSecs, Secs, _MicroSecs}) -> + S = MegaSecs * 1000000 + Secs - 60 * 60 * 24 * Days, + MegaSecs1 = S div 1000000, + Secs1 = S rem 1000000, + {MegaSecs1, Secs1, 0}. + +encode_timestamp(TimeStamp) -> + integer_to_list(now_to_microseconds(TimeStamp)). + +maybe_encode_timestamp(never) -> + "null"; +maybe_encode_timestamp(TimeStamp) -> + encode_timestamp(TimeStamp). + +now_to_microseconds({Mega, Secs, Micro}) -> + (1000000 * Mega + Secs) * 1000000 + Micro. + +microseconds_to_now(MicroSeconds) when is_integer(MicroSeconds) -> + Seconds = MicroSeconds div 1000000, + {Seconds div 1000000, Seconds rem 1000000, MicroSeconds rem 1000000}. diff --git a/apps/ejabberd/src/mod_offline_odbc_legacy.erl b/apps/ejabberd/src/mod_offline_odbc_legacy.erl new file mode 100644 index 00000000000..0cbb28a26e0 --- /dev/null +++ b/apps/ejabberd/src/mod_offline_odbc_legacy.erl @@ -0,0 +1,139 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_offline_odbc.erl +%%% Author : Alexey Shchepin +%%% Purpose : Store and manage offline messages in relational database. +%%% Created : 5 Jan 2003 by Alexey Shchepin +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2011 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA +%%% 02111-1307 USA +%%% +%%%---------------------------------------------------------------------- + +-module(mod_offline_odbc_legacy). +-behaviour(mod_offline). + +-export([init/2, + pop_messages/2, + write_messages/4, + remove_expired_messages/1, + remove_old_messages/2, + remove_user/2]). + +-include("ejabberd.hrl"). +-include("jlib.hrl"). +-include("mod_offline.hrl"). + +-define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). + +init(_Host, _Opts) -> + ok. + +pop_messages(LUser, LServer) -> + US = {LUser, LServer}, + SUser = ejabberd_odbc:escape(LUser), + case odbc_queries:get_and_del_spool_msg_t(LServer, SUser) of + {atomic, {selected, [<<"username">>, <<"xml">>], Rows}} -> + {ok, rows_to_records(US, Rows)}; + {aborted, Reason} -> + {error, Reason}; + {error, Reason} -> + {error, Reason} + end. + +rows_to_records(US, Rows) -> + [row_to_record(US, Row) || Row <- Rows]. + +row_to_record(US, {_UserName, SPacket}) -> + Packet = xml_stream:parse_element(SPacket), + To = jlib:binary_to_jid(xml:get_tag_attr_s(<<"to">>, Packet)), + From = jlib:binary_to_jid(xml:get_tag_attr_s(<<"from">>, Packet)), + #offline_msg{us = US, + timestamp = undefined, + expire = undefined, + from = From, + to = To, + packet = Packet}. + + +write_messages(LUser, LServer, Msgs, MaxOfflineMsgs) -> + SUser = ejabberd_odbc:escape(LUser), + write_messages_t(LServer, SUser, Msgs, MaxOfflineMsgs). + +write_messages_t(LServer, SUser, Msgs, MaxOfflineMsgs) -> + case is_message_count_threshold_reached( + LServer, SUser, Msgs, MaxOfflineMsgs) of + false -> + write_all_messages_t(LServer, SUser, Msgs); + true -> + discard_all_messages_t(Msgs) + end. + +is_message_count_threshold_reached(LServer, SUser, Msgs, MaxOfflineMsgs) -> + Len = length(Msgs), + case MaxOfflineMsgs of + infinity -> + false; + MaxOfflineMsgs when Len > MaxOfflineMsgs -> + true; + MaxOfflineMsgs -> + %% Only count messages if needed. + MaxOfflineMsgs < count_offline_messages(LServer, SUser) + end. + +write_all_messages_t(LServer, SUser, Msgs) -> + Rows = [record_to_row(SUser, Msg) || Msg <- Msgs], + case catch odbc_queries:add_spool(LServer, Rows) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + {error, {aborted, Reason}}; + {error, Reason} -> + {error, Reason} + end. + +record_to_row(SUser, #offline_msg{from = From, to = To, + timestamp = TimeStamp, packet = Packet}) -> + Time = calendar:now_to_universal_time(TimeStamp), + TimeStampElem = jlib:timestamp_to_xml(Time), + Packet2 = jlib:replace_from_to(From, To, Packet), + Packet3 = xml:append_subtags(Packet2, [TimeStampElem]), + SPacket = ejabberd_odbc:escape(xml:element_to_binary(Packet3)), + odbc_queries:add_spool_sql(SUser, SPacket). + +discard_all_messages_t(Msgs) -> + {discarded, Msgs}. + +remove_user(LUser, LServer) -> + SUser = ejabberd_odbc:escape(LUser), + odbc_queries:del_spool_msg(LServer, SUser). + +remove_expired_messages(_LServer) -> + %% TODO + ok. + +remove_old_messages(_LServer, _Days) -> + %% TODO + ok. + +count_offline_messages(LServer, SUser) -> + case odbc_queries:count_spool_msg(LServer, SUser) of + {selected, [_], [{Count}]} -> + list_to_integer(binary_to_list(Count)); + _ -> + 0 + end. diff --git a/apps/ejabberd/src/odbc_queries.erl b/apps/ejabberd/src/odbc_queries.erl index 7047b8378a4..447bd063dcd 100644 --- a/apps/ejabberd/src/odbc_queries.erl +++ b/apps/ejabberd/src/odbc_queries.erl @@ -46,6 +46,7 @@ add_spool/2, get_and_del_spool_msg_t/2, del_spool_msg/2, + count_spool_msg/2, get_average_roster_size/1, get_average_rostergroup_size/1, clear_rosters/1, @@ -87,7 +88,14 @@ escape_like_string/1, count_records_where/3, get_roster_version/2, - set_roster_version/2]). + set_roster_version/2, + prepare_offline_message/6, + push_offline_messages/2, + pop_offline_messages/4, + count_offline_messages/4, + remove_old_offline_messages/2, + remove_expired_offline_messages/2, + remove_offline_messages/3]). %% We have only two compile time options for db queries: %%-define(generic, true). @@ -362,6 +370,11 @@ del_spool_msg(LServer, Username) -> LServer, [<<"delete from spool where username='">>, Username, "';"]). +count_spool_msg(LServer, Username) -> + ejabberd_odbc:sql_query( + LServer, + [<<"select count(*) from spool where username='">>, Username, "';"]). + get_average_roster_size(Server) -> ejabberd_odbc:sql_query( Server, @@ -700,6 +713,72 @@ set_roster_version(LUser, Version) -> -endif. + +pop_offline_messages(LServer, SUser, SServer, STimeStamp) -> + SelectSQL = select_offline_messages_sql(SUser, SServer, STimeStamp), + DeleteSQL = delete_offline_messages_sql(SUser, SServer), + F = fun() -> + Res = ejabberd_odbc:sql_query_t(SelectSQL), + ejabberd_odbc:sql_query_t(DeleteSQL), + Res + end, + ejabberd_odbc:sql_transaction(LServer, F). + +select_offline_messages_sql(SUser, SServer, STimeStamp) -> + [<<"select timestamp, from_jid, packet from offline_message " + "where server = '">>, SServer, <<"' and " + "username = '">>, SUser, <<"' and " + "(expire is null or expire > ">>, STimeStamp, <<") " + "ORDER BY timestamp">>]. + +delete_offline_messages_sql(SUser, SServer) -> + [<<"delete from offline_message " + "where server = '">>, SServer, <<"' and " + "username = '">>, SUser, <<"'">>]. + +remove_old_offline_messages(LServer, STimeStamp) -> + ejabberd_odbc:sql_query( + LServer, + [<<"delete from offline_message where timestamp < ">>, STimeStamp]). + +remove_expired_offline_messages(LServer, STimeStamp) -> + ejabberd_odbc:sql_query( + LServer, + [<<"delete from offline_message " + "where expire is not null and expire < ">>, STimeStamp]). + +remove_offline_messages(LServer, SUser, SServer) -> + ejabberd_odbc:sql_query( + LServer, + [<<"delete from offline_message " + "where server = '">>, SServer, <<"' and " + "username = '">>, SUser, <<"'">>]). + +prepare_offline_message(SUser, SServer, STimeStamp, SExpire, SFrom, SPacket) -> + [<<"('">>, SUser, + <<"', '">>, SServer, + <<"', ">>, STimeStamp, + <<", ">>, SExpire, + <<", '">>, SFrom, + <<"', '">>, SPacket, + <<"')">>]. + +push_offline_messages(LServer, Rows) -> + ejabberd_odbc:sql_query( + LServer, + [<<"INSERT INTO offline_message " + "(username, server, timestamp, expire, from_jid, packet) " + "VALUES ">>, join(Rows, ", ")]). + +count_offline_messages(LServer, SUser, SServer, Limit) -> + ejabberd_odbc:sql_query( + LServer, + [<<"select count(*) from offline_message " + "where server = '">>, SServer, <<"' and " + "username = '">>, SUser, <<"' " + "limit ">>, integer_to_list(Limit)]). + + %% ----------------- %% MSSQL queries -ifdef(mssql). @@ -811,6 +890,10 @@ del_spool_msg(LServer, Username) -> LServer, ["EXECUTE dbo.del_spool_msg '", Username, "'"]). +count_spool_msg(LServer, Username) -> + %% TODO + 0. + get_roster(LServer, Username) -> ejabberd_odbc:sql_query( LServer,