diff --git a/big_tests/tests/rdbms_SUITE.erl b/big_tests/tests/rdbms_SUITE.erl index 36f5e863a9f..1039044e63a 100644 --- a/big_tests/tests/rdbms_SUITE.erl +++ b/big_tests/tests/rdbms_SUITE.erl @@ -67,6 +67,7 @@ rdbms_queries_cases() -> insert_batch_with_null_case, test_cast_insert, test_request_insert, + test_incremental_upsert, arguments_from_two_tables]. suite() -> @@ -85,9 +86,15 @@ init_per_suite(Config) -> end_per_suite(Config) -> escalus:end_per_suite(Config). +init_per_testcase(test_incremental_upsert, Config) -> + sql_query(Config, <<"TRUNCATE TABLE inbox">>), + escalus:init_per_testcase(test_incremental_upsert, Config); init_per_testcase(CaseName, Config) -> escalus:init_per_testcase(CaseName, Config). +end_per_testcase(test_incremental_upsert, Config) -> + sql_query(Config, <<"TRUNCATE TABLE inbox">>), + escalus:end_per_testcase(test_incremental_upsert, Config); end_per_testcase(CaseName, Config) -> escalus:end_per_testcase(CaseName, Config). @@ -367,6 +374,28 @@ test_request_insert(Config) -> selected_to_sorted(SelectResult)) end, ok, #{name => request_queries}). +test_incremental_upsert(Config) -> + case is_odbc() of + true -> + ok; + false -> + do_test_incremental_upsert(Config) + end. + +do_test_incremental_upsert(Config) -> + KeyFields = [<<"luser">>, <<"lserver">>, <<"remote_bare_jid">>], + InsertFields = KeyFields ++ [<<"msg_id">>, <<"content">>, <<"unread_count">>, <<"timestamp">>], + + Key = [<<"alice">>, <<"localhost">>, <<"bob@localhost">>], + Insert = [<<"alice">>, <<"localhost">>, <<"bob@localhost">>, <<"msg_id">>, <<"content">>, 1], + sql_prepare_upsert(Config, upsert_incr, inbox, + InsertFields, [<<"timestamp">>], KeyFields, <<"timestamp">>), + sql_execute_upsert(Config, upsert_incr, Insert ++ [42], [42], Key), + sql_execute_upsert(Config, upsert_incr, Insert ++ [43], [43], Key), + sql_execute_upsert(Config, upsert_incr, Insert ++ [0], [0], Key), + SelectResult = sql_query(Config, <<"SELECT timestamp FROM inbox">>), + ?assertEqual({selected, [{<<"43">>}]}, selected_to_binary(SelectResult)). + %%-------------------------------------------------------------------- %% Text searching %%-------------------------------------------------------------------- @@ -388,6 +417,9 @@ sql_query(_Config, Query) -> sql_prepare(_Config, Name, Table, Fields, Query) -> escalus_ejabberd:rpc(mongoose_rdbms, prepare, [Name, Table, Fields, Query]). +sql_prepare_upsert(_Config, Name, Table, Insert, Update, Unique, Incr) -> + escalus_ejabberd:rpc(rdbms_queries, prepare_upsert, [host_type(), Name, Table, Insert, Update, Unique, Incr]). + sql_execute(_Config, Name, Parameters) -> slow_rpc(mongoose_rdbms, execute, [host_type(), Name, Parameters]). @@ -400,6 +432,9 @@ sql_query_cast(_Config, Query) -> sql_execute_request(_Config, Name, Parameters) -> slow_rpc(mongoose_rdbms, execute_request, [host_type(), Name, Parameters]). +sql_execute_upsert(_Config, Name, Insert, Update, Unique) -> + slow_rpc(rdbms_queries, execute_upsert, [host_type(), Name, Insert, Update, Unique]). + sql_query_request(_Config, Query) -> slow_rpc(mongoose_rdbms, sql_query_request, [host_type(), Query]). diff --git a/doc/modules/mod_inbox.md b/doc/modules/mod_inbox.md index 601f7efdb46..a92e319468c 100644 --- a/doc/modules/mod_inbox.md +++ b/doc/modules/mod_inbox.md @@ -11,7 +11,9 @@ To use it, enable mod\_inbox in the config file. * **Default:** `"rdbms"` * **Example:** `backend = "rdbms_async"` -Only RDBMS storage is supported, but `rdbms` means flushes to inbox are synchronous with each message, while `rdbms_async` is instead asynchronous. For performance reasons, `rdbms_async` is almost always preferred, but, it doesn't ensure the same consistency characteristics, as different MongooseIM nodes might race on updates. +Only RDBMS storage is supported, but `rdbms` means flushes to DB are synchronous with each message, while `rdbms_async` is instead asynchronous. + +Regular `rdbms` has worse performance characteristics, but it has better consistency properties, as events aren't lost nor reordered. `rdbms_async` processes events asynchronously and potentially unloading a lot of aggregation from the DB. Like the case of the asynchronous workers for MAM, it is the preferred method, with the risk messages being lost on an ungraceful shutdown. #### `modules.mod_inbox.async_writer.pool_size` * **Syntax:** non-negative integer diff --git a/doc/modules/mod_smart_markers.md b/doc/modules/mod_smart_markers.md index cbcd441b51e..86d987ca7ca 100644 --- a/doc/modules/mod_smart_markers.md +++ b/doc/modules/mod_smart_markers.md @@ -12,9 +12,13 @@ Strategy to handle incoming IQ requests. For details, please refer to [IQ processing policies](../configuration/Modules.md#iq-processing-policies). ### `modules.mod_smart_markers.backend` -* **Syntax:** string, only `"rdbms"` is supported at the moment. +* **Syntax:** string, one of `"rdbms"`, `"rdbms_async"` * **Default:** `"rdbms"` -* **Example:** `backend = "rdbms"` +* **Example:** `backend = "rdbms_async"` + +Only RDBMS storage is supported, but `rdbms` means flushes to DB are synchronous with each message, while `rdbms_async` is instead asynchronous. + +Regular `rdbms` has worse performance characteristics, but it has better consistency properties, as events aren't lost nor reordered. `rdbms_async` processes events asynchronously and potentially unloading a lot of aggregation from the DB. Like the case of the asynchronous workers for MAM, it is the preferred method, with the risk messages being lost on an ungraceful shutdown. ### `modules.mod_smart_markers.keep_private` * **Syntax:** boolean diff --git a/src/inbox/mod_inbox_rdbms.erl b/src/inbox/mod_inbox_rdbms.erl index f0fd5b31167..b01f193d183 100644 --- a/src/inbox/mod_inbox_rdbms.erl +++ b/src/inbox/mod_inbox_rdbms.erl @@ -60,18 +60,23 @@ init(HostType, _Options) -> [lserver], <<"DELETE FROM inbox WHERE lserver = ?">>), % upserts UniqueKeyFields = [<<"luser">>, <<"lserver">>, <<"remote_bare_jid">>], - UpdateFields = [<<"msg_id">>, <<"box = CASE WHEN inbox.box='archive' THEN 'inbox' ELSE inbox.box END">>, - <<"content">>, <<"timestamp">>, <<"unread_count">>], - InsertFields = UniqueKeyFields ++ [<<"msg_id">>, <<"content">>, <<"timestamp">>, <<"unread_count">>], + InsertFields = UniqueKeyFields ++ [<<"msg_id">>, <<"content">>, <<"unread_count">>, <<"timestamp">>], rdbms_queries:prepare_upsert(HostType, inbox_upsert, inbox, - InsertFields, UpdateFields, UniqueKeyFields), + InsertFields, + [<<"msg_id">>, + {assignment, <<"box">>, <<"CASE WHEN inbox.box='archive' THEN 'inbox' ELSE inbox.box END">>}, + <<"content">>, + <<"unread_count">>, + <<"timestamp">>], + UniqueKeyFields, <<"timestamp">>), rdbms_queries:prepare_upsert(HostType, inbox_upsert_incr_unread, inbox, InsertFields, [<<"msg_id">>, - <<"box = CASE WHEN inbox.box='archive' THEN 'inbox' ELSE inbox.box END">>, - <<"content">>, <<"timestamp">>, - {<<"unread_count">>, <<"unread_count = inbox.unread_count + ?">>}], - UniqueKeyFields), + {assignment, <<"box">>, <<"CASE WHEN inbox.box='archive' THEN 'inbox' ELSE inbox.box END">>}, + <<"content">>, + {expression, <<"unread_count">>, <<"inbox.unread_count + ?">>}, + <<"timestamp">>], + UniqueKeyFields, <<"timestamp">>), ok. -spec get_inbox(HostType :: mongooseim:host_type(), @@ -105,8 +110,8 @@ get_inbox_unread(HostType, {LUser, LServer, RemBareJID}) -> Timestamp :: integer(). set_inbox(HostType, {LUser, LServer, LToBareJid}, Content, Count, MsgId, Timestamp) -> Unique = [LUser, LServer, LToBareJid], - Update = [MsgId, Content, Timestamp, Count], - Insert = [LUser, LServer, LToBareJid, MsgId, Content, Timestamp, Count], + Update = [MsgId, Content, Count, Timestamp], + Insert = [LUser, LServer, LToBareJid, MsgId, Content, Count, Timestamp], Res = rdbms_queries:execute_upsert(HostType, inbox_upsert, Insert, Update, Unique), %% MySQL returns 1 when an upsert is an insert %% and 2, when an upsert acts as update @@ -138,8 +143,8 @@ set_inbox_incr_unread(HostType, Entry, Content, MsgId, Timestamp) -> Incrs :: pos_integer()) -> mod_inbox:count_res(). set_inbox_incr_unread(HostType, {LUser, LServer, LToBareJid}, Content, MsgId, Timestamp, Incrs) -> Unique = [LUser, LServer, LToBareJid], - Update = [MsgId, Content, Timestamp, Incrs], - Insert = [LUser, LServer, LToBareJid, MsgId, Content, Timestamp, Incrs], + Update = [MsgId, Content, Incrs, Timestamp], + Insert = [LUser, LServer, LToBareJid, MsgId, Content, Incrs, Timestamp], Res = rdbms_queries:execute_upsert(HostType, inbox_upsert_incr_unread, Insert, Update, Unique), check_result(Res). diff --git a/src/inbox/mod_inbox_rdbms_async.erl b/src/inbox/mod_inbox_rdbms_async.erl index 34bf8d6544e..436f953476f 100644 --- a/src/inbox/mod_inbox_rdbms_async.erl +++ b/src/inbox/mod_inbox_rdbms_async.erl @@ -58,14 +58,14 @@ request(Task, _Extra = #{host_type := HostType}) -> request_one(HostType, {set_inbox, {LUser, LServer, LToBareJid}, Content, Count, MsgId, Timestamp}) -> Unique = [LUser, LServer, LToBareJid], - Update = [MsgId, Content, Timestamp, Count], - Insert = [LUser, LServer, LToBareJid, MsgId, Content, Timestamp, Count], + Update = [MsgId, Content, Count, Timestamp], + Insert = [LUser, LServer, LToBareJid, MsgId, Content, Count, Timestamp], rdbms_queries:request_upsert(HostType, inbox_upsert, Insert, Update, Unique); request_one(HostType, {set_inbox_incr_unread, {LUser, LServer, LToBareJid}, Content, MsgId, Timestamp, Incrs}) -> Unique = [LUser, LServer, LToBareJid], - Update = [MsgId, Content, Timestamp, Incrs], - Insert = [LUser, LServer, LToBareJid, MsgId, Content, Timestamp, Incrs], + Update = [MsgId, Content, Incrs, Timestamp], + Insert = [LUser, LServer, LToBareJid, MsgId, Content, Incrs, Timestamp], rdbms_queries:request_upsert(HostType, inbox_upsert_incr_unread, Insert, Update, Unique); request_one(HostType, {remove_inbox_row, {LUser, LServer, LToBareJid}}) -> diff --git a/src/rdbms/rdbms_queries.erl b/src/rdbms/rdbms_queries.erl index e59e1482418..a9546e42e56 100644 --- a/src/rdbms/rdbms_queries.erl +++ b/src/rdbms/rdbms_queries.erl @@ -42,12 +42,12 @@ -export([join/2, prepare_upsert/6, + prepare_upsert/7, execute_upsert/5, request_upsert/5]). -ignore_xref([ - request_upsert/5, count_records_where/3, - get_db_specific_limits/1, get_db_specific_offset/2, get_db_type/0 + count_records_where/3, get_db_specific_limits/1, get_db_specific_offset/2, get_db_type/0 ]). %% We have only two compile time options for db queries: @@ -55,7 +55,6 @@ %%-define(mssql, true). -ifndef(mssql). -undef(generic). --define(generic, true). -endif. -define(RDBMS_TYPE, (mongoose_rdbms:db_type())). @@ -116,11 +115,22 @@ request_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) -> QueryName :: atom(), TableName :: atom(), InsertFields :: [binary()], - Updates :: [binary() | {binary(), binary()}], + Updates :: [binary() | {assignment | expression, binary(), binary()}], UniqueKeyFields :: [binary()]) -> {ok, QueryName :: atom()} | {error, already_exists}. prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields) -> - SQL = upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields), + prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields, none). + +-spec prepare_upsert(Host :: mongoose_rdbms:server(), + QueryName :: atom(), + TableName :: atom(), + InsertFields :: [ColumnName :: binary()], + Updates :: [binary() | {assignment | expression, binary(), binary()}], + UniqueKeyFields :: [binary()], + IncrementalField :: none | binary()) -> + {ok, QueryName :: atom()} | {error, already_exists}. +prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> + SQL = upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField), Query = iolist_to_binary(SQL), ?LOG_DEBUG(#{what => rdbms_upsert_query, name => Name, query => Query}), Fields = prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields), @@ -134,12 +144,12 @@ prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) -> _ -> InsertFields ++ UpdateFields end. -upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields) -> +upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of {mysql, _} -> - upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields); + upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField); {pgsql, _} -> - upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields); + upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField); {odbc, mssql} -> upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields); NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) @@ -154,22 +164,39 @@ mysql_and_pgsql_insert(Table, Columns) -> join(Placeholders, ", "), ")"]. -upsert_mysql_query(Table, InsertFields, Updates, [Key | _]) -> +upsert_mysql_query(Table, InsertFields, Updates, [Key | _], IncrementalField) -> Insert = mysql_and_pgsql_insert(Table, InsertFields), - OnConflict = mysql_on_conflict(Updates, Key), + OnConflict = mysql_on_conflict(Table, Updates, Key, IncrementalField), [Insert, OnConflict]. -upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields) -> +upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> Insert = mysql_and_pgsql_insert(Table, InsertFields), OnConflict = pgsql_on_conflict(Updates, UniqueKeyFields), - [Insert, OnConflict]. + WhereIncrements = pgsql_ensure_increments(Table, IncrementalField), + [Insert, OnConflict, WhereIncrements]. -mysql_on_conflict([], Key) -> +mysql_on_conflict(_Table, [], Key, _) -> %% Update field to itself (no-op), there is no 'DO NOTHING' in MySQL [" ON DUPLICATE KEY UPDATE ", Key, " = ", Key]; -mysql_on_conflict(UpdateFields, _) -> +mysql_on_conflict(_Table, UpdateFields, _, none) -> [" ON DUPLICATE KEY UPDATE ", - update_fields_on_conflict(UpdateFields)]. + update_fields_on_conflict(UpdateFields)]; +mysql_on_conflict(Table, UpdateFields, _, IncrementalField) -> + TableName = atom_to_list(Table), + FieldsWithPlaceHolders = [mysql_fields_with_placeholders(TableName, Update, IncrementalField) + || Update <- UpdateFields], + IncrUpdates = join(FieldsWithPlaceHolders, ", "), + [" AS alias ON DUPLICATE KEY UPDATE ", IncrUpdates]. + +mysql_fields_with_placeholders(TableName, UpdateField, IncrementalField) -> + Alternatives = case UpdateField of + {Op, Column, Expression} when Op =:= assignment; Op =:= expression -> + [Expression, ", ", TableName, ".", Column, ")"]; + Column -> + ["? , ", TableName, ".", Column, ")"] + end, + [ Column, " = IF(", TableName, ".", IncrementalField, " < alias.", IncrementalField, ", " + | Alternatives]. pgsql_on_conflict([], UniqueKeyFields) -> JoinedKeys = join(UniqueKeyFields, ", "), @@ -181,9 +208,15 @@ pgsql_on_conflict(UpdateFields, UniqueKeyFields) -> update_fields_on_conflict(UpdateFields)]. update_fields_on_conflict(Updates) -> - FieldsWithPlaceHolders = [update_field_expression(Update) || Update <- Updates], + FieldsWithPlaceHolders = [get_field_expression(Update) || Update <- Updates], join(FieldsWithPlaceHolders, ", "). +pgsql_ensure_increments(_Table, none) -> + []; +pgsql_ensure_increments(Table, IncrementalField) -> + TableName = atom_to_list(Table), + [" WHERE ", TableName, ".", IncrementalField, " < EXCLUDED.", IncrementalField]. + upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields) -> UniqueKeysInSelect = [[" ? AS ", Key] || Key <- UniqueKeyFields], BinTab = atom_to_binary(Table, utf8), @@ -202,28 +235,17 @@ mssql_on_conflict([]) -> ";"; mssql_on_conflict(Updates) -> [" WHEN MATCHED THEN UPDATE SET ", update_fields_on_conflict(Updates), ";"]. -update_field_expression(Update) -> - case get_field_expression(Update) of - {true, Expr} -> Expr; - true -> [Update, " = ?"]; - false -> Update - end. - -get_field_expression({_, FieldExpr}) -> - case binary:match(FieldExpr, <<"=">>) of - nomatch -> false; - _ -> {true, FieldExpr} - end; -get_field_expression(FieldExpr) -> - binary:match(FieldExpr, <<"=">>) =:= nomatch. - -get_field_name({Field, _}) -> - case binary:match(Field, <<"=">>) of - nomatch -> {true, Field}; - _ -> false - end; -get_field_name(FieldExpr) -> - binary:match(FieldExpr, <<"=">>) =:= nomatch. +get_field_expression({Op, ColumnName, Expr}) when Op =:= assignment; Op =:= expression -> + [ColumnName, " = ", Expr]; +get_field_expression(Field) when is_binary(Field) -> + [Field, " = ?"]. + +get_field_name({assignment, Field, _}) when is_binary(Field) -> + false; +get_field_name({expression, Field, _}) when is_binary(Field) -> + {true, Field}; +get_field_name(Field) when is_binary(Field) -> + true. %% F can be either a fun or a list of queries %% TODO: We should probably move the list of queries transaction diff --git a/src/smart_markers/mod_smart_markers_rdbms.erl b/src/smart_markers/mod_smart_markers_rdbms.erl index 36de987bf08..1d7f26bfa27 100644 --- a/src/smart_markers/mod_smart_markers_rdbms.erl +++ b/src/smart_markers/mod_smart_markers_rdbms.erl @@ -24,7 +24,7 @@ init(HostType, _) -> UpdateFields = [<<"msg_id">>, <<"timestamp">>], InsertFields = KeyFields ++ UpdateFields, rdbms_queries:prepare_upsert(HostType, smart_markers_upsert, smart_markers, - InsertFields, UpdateFields, KeyFields), + InsertFields, UpdateFields, KeyFields, <<"timestamp">>), mongoose_rdbms:prepare(smart_markers_select_conv, smart_markers, [lserver, luser, to_jid, thread, timestamp], <<"SELECT lserver, luser, to_jid, thread, type, msg_id, timestamp FROM smart_markers " diff --git a/src/smart_markers/mod_smart_markers_rdbms_async.erl b/src/smart_markers/mod_smart_markers_rdbms_async.erl index b0303e0ba6f..ce0a975d802 100644 --- a/src/smart_markers/mod_smart_markers_rdbms_async.erl +++ b/src/smart_markers/mod_smart_markers_rdbms_async.erl @@ -113,4 +113,3 @@ verify(Answer, Marker, _Extra) -> marker => Marker}); _ -> ok end. -