Skip to content

Commit

Permalink
Merge pull request #21 from rabbitmq/handle-deleted-backup-items
Browse files Browse the repository at this point in the history
m2k_table_copy: Handle the "delete" backup items
  • Loading branch information
dumbbell authored Sep 12, 2024
2 parents 209080e + 5afe0d1 commit d41e0dd
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 65 deletions.
3 changes: 2 additions & 1 deletion src/m2k_export.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ write(
after
15_000 ->
?LOG_ERROR(
"Mnesia->Khepri data copy: [" ?MODULE_STRING "] timeout: record/~0p",
"Mnesia->Khepri data copy: [" ?MODULE_STRING "] "
"timeout: record/~0p",
[Record],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
{error, timeout}
Expand Down
99 changes: 35 additions & 64 deletions src/m2k_table_copy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -381,29 +381,6 @@ query_table_record_definitions([Table | Rest], RecordDefs) ->
query_table_record_definitions([], RecordDefs) ->
RecordDefs.

is_record_valid(#?MODULE{record_defs = RecordDefs}, Table, Tuple)
when is_tuple(Tuple) ->
#{Table := {RecordName, Arity}} = RecordDefs,
case is_record(Tuple, RecordName, Arity) of
true ->
true;
false ->
?LOG_DEBUG(
"Mnesia->Khepri data copy: "
"the following term is not a valid record for "
"table \"~ts\": ~p",
[Table, Tuple],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
false
end;
is_record_valid(_State, Table, Term) ->
?LOG_DEBUG(
"Mnesia->Khepri data copy: "
"the following term is not a valid record for table \"~ts\": ~p",
[Table, Term],
#{domain => ?KMM_M2K_TABLE_COPY_LOG_DOMAIN}),
false.

do_copy_data(#?MODULE{migration_id = MigrationId, tables = Tables} = State) ->
?LOG_INFO(
"Mnesia->Khepri data copy: "
Expand Down Expand Up @@ -519,46 +496,45 @@ handle_migration_records(
#?MODULE{backup_pid = BackupPid,
converter_mod = Mod,
converter_mod_priv = ModPriv,
tables = Tables} = State0) ->
tables = Tables} = State) ->
receive
{m2k_export, ExportPid, handle_record, Table, Record} ->
{State, Reply} =
case is_record_valid(State0, Table, Record) of
true ->
try
ActualMod = actual_mod(Mod),
Ret = ActualMod:copy_to_khepri(
Table, Record, ModPriv),
case Ret of
{ok, ModPriv1} ->
State1 = State0#?MODULE{
converter_mod_priv =
ModPriv1},
{State1, ok};
Error ->
{State0, Error}
end
catch
Class:Reason:Stacktrace ->
Exception = ?kmm_exception(
converter_mod_exception,
#{converter_mod => Mod,
tables => Tables,
class => Class,
reason => Reason,
stacktrace =>
Stacktrace}),
{State0, {error, Exception}}
end;
false ->
{State0, ok}
{State2, Reply} =
try
ActualMod = actual_mod(Mod),
Ret = case Record of
{_RecordName, Key} ->
ActualMod:delete_from_khepri(
Table, Key, ModPriv);
_ ->
ActualMod:copy_to_khepri(
Table, Record, ModPriv)
end,
case Ret of
{ok, ModPriv1} ->
State1 = State#?MODULE{converter_mod_priv = ModPriv1},
{State1, ok};
Error ->
{State, Error}
end
catch
Class:Reason:Stacktrace ->
Exception = ?kmm_exception(
converter_mod_exception,
#{converter_mod => Mod,
tables => Tables,
class => Class,
reason => Reason,
stacktrace =>
Stacktrace}),
{State, {error, Exception}}
end,
ExportPid ! {self(), record_handled, Reply},
handle_migration_records(State);
handle_migration_records(State2);
{BackupPid, done, Ret} ->
case Ret of
ok ->
State0;
State;
{error,
{'EXIT',
{error,
Expand Down Expand Up @@ -748,14 +724,9 @@ consume_mnesia_events(

consume_mnesia_events1(
[{put, Table, Record} | Rest], Mod, ModPriv, State) ->
ModPriv2 = case is_record_valid(State, Table, Record) of
true ->
case Mod:copy_to_khepri(Table, Record, ModPriv) of
{ok, ModPriv1} -> ModPriv1;
Error -> throw(Error)
end;
false ->
ModPriv
ModPriv2 = case Mod:copy_to_khepri(Table, Record, ModPriv) of
{ok, ModPriv1} -> ModPriv1;
Error -> throw(Error)
end,
Remaining = length(Rest),
if
Expand Down
1 change: 1 addition & 0 deletions src/mnesia_to_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

-include_lib("kernel/include/logger.hrl").

-include("src/kmm_error.hrl").
-include("src/kmm_logging.hrl").

-export([sync_cluster_membership/0, sync_cluster_membership/1,
Expand Down

0 comments on commit d41e0dd

Please sign in to comment.