Skip to content

Commit

Permalink
return error from Raft_replication_delegate when plugin is not available
Browse files Browse the repository at this point in the history
Summary:
The new macro is used to call into raft plugin. If plugin gets unloaded
accidentally when enable_raft_plugin is ON, then this STRICT version returns
failure. This is to be called only by raft plugin currently

Reviewed By: anirbanr-fb

Differential Revision: D23065441

fbshipit-source-id: 7d6f3dd77f0
  • Loading branch information
bhatvinay authored and facebook-github-bot committed Oct 6, 2020
1 parent a852fd2 commit b9067f7
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 29 deletions.
43 changes: 24 additions & 19 deletions sql/binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1506,8 +1506,8 @@ binlog_cache_data::flush(THD *thd, my_off_t *bytes_written, bool *wrote_xid,
error= gtid_before_write_cache(thd, this);

if (!error && enable_raft_plugin_save && !mysql_bin_log.is_apply_log) {
error= RUN_HOOK(raft_replication, before_flush,
(thd, &cache_log));
error= RUN_HOOK_STRICT(raft_replication, before_flush,
(thd, &cache_log));

DBUG_EXECUTE_IF("fail_binlog_flush_raft", {error= 1;});

Expand Down Expand Up @@ -5845,7 +5845,7 @@ int MYSQL_BIN_LOG::purge_logs(const char *to_log,
if (!included && file_index_pair.second == 0)
goto err;

error= RUN_HOOK(
error= RUN_HOOK_STRICT(
raft_replication, purge_logs, (current_thd, file_index_pair.second));

if (error)
Expand Down Expand Up @@ -6738,13 +6738,14 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock_log,
else if (config_change_rotate)
op_type= RaftReplicateMsgOpType::OP_TYPE_CHANGE_CONFIG;

error= RUN_HOOK(raft_replication, before_flush,
(current_thd, &raft_io_cache, op_type));
error= RUN_HOOK_STRICT(raft_replication, before_flush,
(current_thd, &raft_io_cache, op_type));

// time to safely readjust the cur_log_ext back to expected value
cur_log_ext++;
if (!error) {
error= RUN_HOOK(raft_replication, before_commit, (current_thd, false));
error= RUN_HOOK_STRICT(
raft_replication, before_commit, (current_thd, false));
}
}

Expand Down Expand Up @@ -6828,7 +6829,8 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock_log,
if (!error && rotate_via_raft && !no_op) {
// not trapping return code, because this is the existing
// pattern in most places of after_commit hook (TODO)
(void)RUN_HOOK(raft_replication, after_commit, (current_thd, false));
(void)RUN_HOOK_STRICT(
raft_replication, after_commit, (current_thd, false));
}

end:
Expand Down Expand Up @@ -9281,7 +9283,8 @@ MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first,
if (!enable_raft_plugin)
(void) RUN_HOOK(transaction, after_commit, (head, all));
else
error = error || RUN_HOOK(raft_replication, after_commit, (head, all));
error = error || RUN_HOOK_STRICT(
raft_replication, after_commit, (head, all));

/*
When after_commit finished for the transaction, clear the run_hooks flag.
Expand Down Expand Up @@ -9348,8 +9351,9 @@ MYSQL_BIN_LOG::process_semisync_stage_queue(THD *queue_head)
}
else
{
error= RUN_HOOK(raft_replication, before_commit,
(last_thd, last_thd->transaction.flags.real_commit));
error= RUN_HOOK_STRICT(
raft_replication, before_commit,
(last_thd, last_thd->transaction.flags.real_commit));
}
DBUG_EXECUTE_IF("simulate_before_commit_error", {error= 1;});

Expand Down Expand Up @@ -9644,13 +9648,13 @@ MYSQL_BIN_LOG::finish_commit(THD *thd, bool async)
*/
if ((thd->commit_error == THD::CE_NONE) && thd->transaction.flags.run_hooks)
{
int error = 0;
int error= 0;

// semi-sync plugin only called when raft is not enabled
if (!enable_raft_plugin)
(void) RUN_HOOK(transaction, after_commit, (thd, all));
else
error= RUN_HOOK(raft_replication, after_commit, (thd, all));
error= RUN_HOOK_STRICT(raft_replication, after_commit, (thd, all));

if (!error &&
opt_raft_signal_async_dump_threads == AFTER_ENGINE_COMMIT &&
Expand Down Expand Up @@ -9848,10 +9852,10 @@ int MYSQL_BIN_LOG::register_log_entities(THD *thd,
mysql_mutex_lock(&LOCK_log);
else
mysql_mutex_assert_owner(&LOCK_log);
int err= RUN_HOOK(raft_replication, setup_flush,
(thd, is_relay_log, &log_file, name, log_file_name,
&LOCK_log, &LOCK_index, &update_cond, &cur_log_ext,
context));
int err= RUN_HOOK_STRICT(raft_replication, setup_flush,
(thd, is_relay_log, &log_file, name, log_file_name,
&LOCK_log, &LOCK_index, &update_cond, &cur_log_ext,
context));
if (need_lock)
mysql_mutex_unlock(&LOCK_log);
return err;
Expand Down Expand Up @@ -9956,9 +9960,10 @@ int ask_server_to_register_with_raft(Raft_Registration_Item item)
s_log_dir.assign(log_dir);

THD *thd= current_thd;
err= RUN_HOOK(raft_replication, register_paths,
(thd, server_uuid, s_wal_dir, s_log_dir,
log_bin_basename, glob_hostname, (uint64_t)mysqld_port));
err= RUN_HOOK_STRICT(raft_replication, register_paths,
(thd, server_uuid, s_wal_dir, s_log_dir,
log_bin_basename, glob_hostname,
(uint64_t)mysqld_port));
break;
}
default:
Expand Down
2 changes: 1 addition & 1 deletion sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7691,7 +7691,7 @@ bool Rotate_log_event::write(IO_CACHE* file)
int Rotate_log_event::do_apply_event(Relay_log_info const *rli)
{
if (enable_raft_plugin)
return RUN_HOOK(raft_replication, after_commit, (thd, false));
return RUN_HOOK_STRICT(raft_replication, after_commit, (thd, false));
return 0;
}

Expand Down
7 changes: 5 additions & 2 deletions sql/mysqld.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2151,8 +2151,11 @@ static void close_connections(void)

static void plugin_shutdown_prework()
{
sql_print_information("Sending shutdown call to raft plugin");
RUN_HOOK(raft_replication, before_shutdown, (nullptr));
if (enable_raft_plugin)
{
sql_print_information("Sending shutdown call to raft plugin");
RUN_HOOK_STRICT(raft_replication, before_shutdown, (nullptr));
}
}

#ifdef HAVE_CLOSE_SERVER_SOCK
Expand Down
77 changes: 70 additions & 7 deletions sql/rpl_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,69 @@ void delegates_destroy()
raft_replication_delegate->~Raft_replication_delegate();
}

/*
This macro is used by raft Delegate methods to call into raft plugin
The only difference is that this is a 'stricter' version which will return
failure if the plugin hooks were not called
*/
#define FOREACH_OBSERVER_STRICT(r, f, thd, args) \
if (thd) { \
param.server_id= thd->server_id; \
param.host_or_ip= thd->security_ctx->host_or_ip; \
} \
/*
Use a struct to make sure that they are allocated adjacent, check
delete_dynamic().
*/ \
struct { \
DYNAMIC_ARRAY plugins; \
/* preallocate 8 slots */ \
plugin_ref plugins_buffer[8]; \
} s; \
DYNAMIC_ARRAY *plugins= &s.plugins; \
plugin_ref *plugins_buffer= s.plugins_buffer; \
my_init_dynamic_array2(plugins, sizeof(plugin_ref), \
plugins_buffer, 8, 8); \
read_lock(); \
Observer_info_iterator iter= observer_info_iter(); \
Observer_info *info= iter++; \
(r)= 1; /* Assume failure by default */ \
for (; info; info= iter++) \
{ \
plugin_ref plugin= \
my_plugin_lock(0, &info->plugin); \
if (!plugin) \
{ \
/* plugin is not intialized or deleted, this is an error when
* enable_raft_plugin is ON */ \
enable_raft_plugin ? (r)= 1 : (r)= 0; \
break; \
} \
insert_dynamic(plugins, &plugin); \
if (((Observer *)info->observer)->f \
&& ((Observer *)info->observer)->f args) \
{ \
(r)= 1; \
sql_print_error("Run function '" #f "' in plugin '%s' failed", \
info->plugin_int->name.str); \
break; \
} \
/* Plugin is successfully called, set return status to 0
* indicating success */ \
(r)= 0; \
} \
unlock(); \
/*
Unlock plugins should be done after we released the Delegate lock
to avoid possible deadlock when this is the last user of the
plugin, and when we unlock the plugin, it will try to
deinitialize the plugin, which will try to lock the Delegate in
order to remove the observers.
*/ \
plugin_unlock_list(0, (plugin_ref*)plugins->buffer, \
plugins->elements); \
delete_dynamic(plugins)

/*
This macro is used by almost all the Delegate methods to iterate
over all the observers running given callback function of the
Expand Down Expand Up @@ -558,7 +621,7 @@ int Raft_replication_delegate::before_flush(THD *thd, IO_CACHE* io_cache,

int ret= 0;

FOREACH_OBSERVER(ret, before_flush, thd, (&param, io_cache, op_type));
FOREACH_OBSERVER_STRICT(ret, before_flush, thd, (&param, io_cache, op_type));

DBUG_PRINT("return", ("term: %ld, index: %ld", param.term, param.index));

Expand All @@ -579,7 +642,7 @@ int Raft_replication_delegate::before_commit(THD *thd, bool all)
DBUG_PRINT("enter", ("term: %ld, index: %ld", param.term, param.index));

int ret= 0;
FOREACH_OBSERVER(ret, before_commit, thd, (&param));
FOREACH_OBSERVER_STRICT(ret, before_commit, thd, (&param));

DEBUG_SYNC(thd, "after_call_after_sync_observer");
DBUG_RETURN(ret);
Expand All @@ -598,7 +661,7 @@ int Raft_replication_delegate::setup_flush(

int ret= 0;

FOREACH_OBSERVER(ret, setup_flush, thd,
FOREACH_OBSERVER_STRICT(ret, setup_flush, thd,
(is_relay_log, log_file_cache, log_prefix, log_name,
lock_log, lock_index, update_cond, cur_log_ext, context)
);
Expand All @@ -612,7 +675,7 @@ int Raft_replication_delegate::before_shutdown(THD *thd)
int ret= 0;
Raft_replication_param param;

FOREACH_OBSERVER(ret, before_shutdown, thd, ());
FOREACH_OBSERVER_STRICT(ret, before_shutdown, thd, ());

DBUG_RETURN(ret);
}
Expand All @@ -628,7 +691,7 @@ int Raft_replication_delegate::register_paths(
int ret= 0;
Raft_replication_param param;

FOREACH_OBSERVER(ret, register_paths, thd,
FOREACH_OBSERVER_STRICT(ret, register_paths, thd,
(&raft_listener_queue, s_uuid, wal_dir_parent,
log_dir_parent, raft_log_path_prefix,
s_hostname, port));
Expand All @@ -644,7 +707,7 @@ int Raft_replication_delegate::after_commit(THD *thd, bool all)
thd->get_trans_marker(&param.term, &param.index);

int ret= 0;
FOREACH_OBSERVER(ret, after_commit, thd, (&param));
FOREACH_OBSERVER_STRICT(ret, after_commit, thd, (&param));
DBUG_RETURN(ret);
}

Expand All @@ -654,7 +717,7 @@ int Raft_replication_delegate::purge_logs(THD *thd, uint64_t file_ext)
Raft_replication_param param;
param.purge_file_ext = file_ext;
int ret= 0;
FOREACH_OBSERVER(ret, purge_logs, thd, (&param));
FOREACH_OBSERVER_STRICT(ret, purge_logs, thd, (&param));

// Set the safe purge file that was sent back by the plugin
thd->set_safe_purge_file(param.purge_file);
Expand Down
7 changes: 7 additions & 0 deletions sql/rpl_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,13 @@ extern Raft_replication_delegate *raft_replication_delegate;
(group ##_delegate->is_empty() ? \
0 : group ##_delegate->hook args)

/*
This is same as RUN_HOOK, but return 1 if there are no observers
*/
#define RUN_HOOK_STRICT(group, hook, args) \
(group ##_delegate->is_empty() ? \
1 : group ##_delegate->hook args)

#endif /* RPL_HANDLER_H */

class RaftListenerQueue : public RaftListenerQueueIf
Expand Down

0 comments on commit b9067f7

Please sign in to comment.