Skip to content

Commit

Permalink
Adding previous_raft_opid in previous HLC metadata event
Browse files Browse the repository at this point in the history
Summary:
In order to not have to traverse the previous
file to find the last opid, we start carrying over the last
opid of the previous file into the new file. This opid will
be the opid of the rotate event for binlogs.

For relay logs/follower logs, the rotation opid is passed
in rotate_opid to the rotation function.
For binlogs, we have the opid from the before_commit phase.

Reviewed By: bhatvinay

Differential Revision: D22500383

fbshipit-source-id: fbc8285cd84
  • Loading branch information
anirbanr-fb authored and facebook-github-bot committed Mar 13, 2021
1 parent 623780a commit c607a5a
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 4 deletions.
15 changes: 15 additions & 0 deletions sql/binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4898,6 +4898,13 @@ bool MYSQL_BIN_LOG::open_binlog(const char *log_name,
current_hlc= mysql_bin_log.get_current_hlc();
}
Metadata_log_event metadata_ev(current_hlc);
if (raft_rotate_info)
{
metadata_ev.set_raft_prev_opid(
raft_rotate_info->rotate_opid.first,
raft_rotate_info->rotate_opid.second);
}

if (metadata_ev.write(&log_file))
goto err;
bytes_written+= metadata_ev.data_written;
Expand Down Expand Up @@ -7120,6 +7127,14 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock_log,
my_error(ER_RAFT_FILE_ROTATION_FAILED, MYF(0), 1);
goto end;
}
if (!error)
{
// Store the rotate op_id in the raft_rotate_info for
// open_binlog to use
int64_t r_term, r_index;
current_thd->get_trans_marker(&r_term, &r_index);
raft_rotate_info->rotate_opid= std::make_pair(r_term, r_index);
}
}

// Need flush before updating binlog_end_pos, otherwise dump thread
Expand Down
78 changes: 77 additions & 1 deletion sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16365,6 +16365,12 @@ Metadata_log_event::Metadata_log_event(
set_hlc_time(hlc_time_ns);
}

/**
 * Build a metadata event with no metadata type populated.
 *
 * The event is created as a non-cached, immediately-logged Log_event; the
 * body is intentionally empty so that callers fill in the fields afterwards
 * through the set_* methods.
 */
Metadata_log_event::Metadata_log_event()
  : Log_event(Log_event::EVENT_NO_CACHE, Log_event::EVENT_IMMEDIATE_LOGGING)
{
}

Metadata_log_event::Metadata_log_event(uint64_t prev_hlc_time_ns)
: Log_event(Log_event::EVENT_NO_CACHE,
Log_event::EVENT_IMMEDIATE_LOGGING)
Expand Down Expand Up @@ -16488,6 +16494,17 @@ void Metadata_log_event::set_raft_str(const std::string& raft_str)
(ENCODED_TYPE_SIZE + ENCODED_LENGTH_SIZE + raft_str_.size());
}

void Metadata_log_event::set_raft_prev_opid(int64_t term, int64_t index)
{
prev_raft_term_= term;
prev_raft_index_= index;

set_exist(Metadata_log_event_types::RAFT_PREV_OPID_TYPE);
// Update the size of the event when it gets serialized into the stream.
size_ +=
(ENCODED_TYPE_SIZE + ENCODED_LENGTH_SIZE + ENCODED_RAFT_PREV_OPID_SIZE);
}

int64_t Metadata_log_event::get_raft_term() const
{
return raft_term_;
Expand All @@ -16503,6 +16520,16 @@ const std::string& Metadata_log_event::get_raft_str() const
return raft_str_;
}

int64_t Metadata_log_event::get_raft_prev_opid_term() const
{
return prev_raft_term_;
}

int64_t Metadata_log_event::get_raft_prev_opid_index() const
{
return prev_raft_index_;
}

uint Metadata_log_event::read_type(
Metadata_log_event_types type, char const* buffer)
{
Expand All @@ -16513,6 +16540,7 @@ uint Metadata_log_event::read_type(
uint value_length= uint2korr(buffer);
int64_t term= -1, index= -1;
std::string generic_str;
int64_t prev_term= -1, prev_index= -1;

switch (type)
{
Expand All @@ -16536,6 +16564,13 @@ uint Metadata_log_event::read_type(
generic_str.assign(buffer + ENCODED_LENGTH_SIZE, value_length);
set_raft_str(generic_str);
break;
case MLET::RAFT_PREV_OPID_TYPE:
DBUG_ASSERT(value_length == ENCODED_RAFT_PREV_OPID_SIZE);
prev_term= uint8korr(buffer + ENCODED_LENGTH_SIZE);
prev_index= uint8korr(buffer + ENCODED_LENGTH_SIZE
+ sizeof(prev_raft_term_));
set_raft_prev_opid(prev_term, prev_index);
break;
default:
// This is a event which we do not know about. Just skip this
size_ += (ENCODED_TYPE_SIZE + ENCODED_LENGTH_SIZE + value_length);
Expand Down Expand Up @@ -16568,6 +16603,9 @@ bool Metadata_log_event::write_data_body(IO_CACHE *file)
if (write_raft_str(file))
DBUG_RETURN(1);

if (write_raft_prev_opid(file))
DBUG_RETURN(1);

DBUG_RETURN(0);
}

Expand Down Expand Up @@ -16675,6 +16713,36 @@ bool Metadata_log_event::write_raft_str(IO_CACHE* file)
DBUG_RETURN(ret);
}

/**
 * Serialize the previous raft opid (term, then index) into the stream.
 *
 * Writes nothing and reports success when RAFT_PREV_OPID_TYPE is not set on
 * this event. Otherwise writes the type/length header followed by the two
 * encoded values.
 *
 * @param file - file to write into
 *
 * @returns - 0 on success (or nothing to write), 1 if the header write
 *            fails; otherwise the result of the payload write
 */
bool Metadata_log_event::write_raft_prev_opid(IO_CACHE* file)
{
  DBUG_ENTER("Metadata_log_event::write_raft_prev_opid");

  const bool has_prev_opid=
    does_exist(Metadata_log_event_types::RAFT_PREV_OPID_TYPE);
  if (!has_prev_opid)
    DBUG_RETURN(0); /* Previous opid was never set: nothing to emit */

  /* Header first: type tag plus the length of the payload that follows */
  const bool header_failed= write_type_and_length(
    file,
    Metadata_log_event_types::RAFT_PREV_OPID_TYPE,
    sizeof(prev_raft_term_) + sizeof(prev_raft_index_));
  if (header_failed)
    DBUG_RETURN(1);

  /* Payload layout: the term is stored first, the index right after it */
  char payload[ENCODED_RAFT_PREV_OPID_SIZE];
  char* const term_pos= payload;
  char* const index_pos= term_pos + sizeof(prev_raft_term_);

  int8store(term_pos, prev_raft_term_);
  int8store(index_pos, prev_raft_index_);

  /* The two fields must fill the payload buffer exactly */
  DBUG_ASSERT((index_pos + sizeof(prev_raft_index_)) ==
              (payload + sizeof(payload)));

  bool ret= wrapper_my_b_safe_write(file, (uchar *) payload, sizeof(payload));
  DBUG_RETURN(ret);
}

bool Metadata_log_event::write_type_and_length(
IO_CACHE* file, Metadata_log_event_types type, uint32_t length)
{
Expand Down Expand Up @@ -16764,6 +16832,10 @@ Metadata_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
", Raft Index: " + std::to_string(raft_index_));
if (does_exist(Metadata_log_event_types::RAFT_GENERIC_STR_TYPE))
buffer.append("\t Raft string: '" + raft_str_ + "'");
if (does_exist(Metadata_log_event_types::RAFT_PREV_OPID_TYPE))
buffer.append(
"\tRaft Prev OPID term: " + std::to_string(prev_raft_term_) +
", Raft Prev OPID Index: " + std::to_string(prev_raft_index_));


print_header(head, print_event_info, FALSE);
Expand Down Expand Up @@ -16808,11 +16880,15 @@ Log_event::enum_skip_reason Metadata_log_event::do_shall_skip(
{
/*
* Metadata event containing previous hlc timestamp has no meaning for slave.
* hence slave should skip such events
* hence slave should skip such events.
* Same with raft previous opid types
*/
if (does_exist(Metadata_log_event_types::PREV_HLC_TYPE))
return Log_event::EVENT_SKIP_IGNORE;

if (does_exist(Metadata_log_event_types::RAFT_PREV_OPID_TYPE))
return Log_event::EVENT_SKIP_IGNORE;

/*
* A metadata event not in the context of a transaction
* can be skipped as it is for a rotate/no-op event. Do this only if MTS is
Expand Down
56 changes: 53 additions & 3 deletions sql/log_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -5300,6 +5300,13 @@ class Metadata_log_event : public Log_event
*/
Metadata_log_event(THD *thd_arg, bool using_trans, uint64_t hlc_time_ns);

/**
* Use this constructor to create a Metadata Log Event which
* will carry multiple types from the list below; populate its
* contents afterwards with the corresponding set operations.
*/
Metadata_log_event();

/**
* Create a new metadata event which contains Previous HLC. Previous HLC is
* max HLC that could have been potentially stored in all the previous binlog
Expand Down Expand Up @@ -5418,12 +5425,26 @@ class Metadata_log_event : public Log_event
*/
const std::string& get_raft_str() const;

/**
* Get raft previous opid term
*
* @return prev_raft_term if present. -1 otherwise
*/
int64_t get_raft_prev_opid_term() const;

/**
* Get raft previous opid index
*
* @return prev_raft_index if present. -1 otherwise
*/
int64_t get_raft_prev_opid_index() const;

/**
* Set raft_term and raft_index and update internal state needed later to
* write this to stream
*
* @param term - Raft term to sert
* @param index - Raft index to sert
* @param term - Raft term to set
* @param index - Raft index to set
*/
void set_raft_term_and_index(int64_t term, int64_t index);

Expand All @@ -5434,6 +5455,15 @@ class Metadata_log_event : public Log_event
*/
void set_raft_str(const std::string& str);

/**
* Set previous file's last raft_term and raft_index, i.e.
* the opid of the rotate event to the metadata event.
*
* @param term - Raft term to set
* @param index - Raft index to set
*/
void set_raft_prev_opid(int64_t term, int64_t index);

/**
* The spec for different 'types' supported by this event
*/
Expand All @@ -5452,6 +5482,8 @@ class Metadata_log_event : public Log_event
RAFT_TERM_INDEX_TYPE= 2,
/* Config added by raft consensus plugin */
RAFT_GENERIC_STR_TYPE= 3,
/* Raft term and index for the last file*/
RAFT_PREV_OPID_TYPE= 4,
METADATA_EVENT_TYPE_MAX,
};

Expand Down Expand Up @@ -5497,7 +5529,7 @@ class Metadata_log_event : public Log_event
bool write_prev_hlc_time(IO_CACHE* file);

/**
* Write praft term and index to file
* Write raft term and index to file
*
* @param file - file to write into
*
Expand All @@ -5514,6 +5546,15 @@ class Metadata_log_event : public Log_event
*/
bool write_raft_str(IO_CACHE* file);

/**
* Write previous opid term and index to file
*
* @param file - file to write into
*
* @returns - 0 on success, 1 on failure
*/
bool write_raft_prev_opid(IO_CACHE* file);

/**
* Write type and length to file
*
Expand Down Expand Up @@ -5553,6 +5594,15 @@ class Metadata_log_event : public Log_event
*/
std::string raft_str_;

/* Previous (term, index). Provided and interpreted by the raft consensus
* plugin. Since rotate events are consensus sync points, the previous term
* and previous index are committed.
* The type corresponding to this is RAFT_PREV_OPID_TYPE */
int64_t prev_raft_term_= -1;
int64_t prev_raft_index_= -1;
static const uint32_t ENCODED_RAFT_PREV_OPID_SIZE=
sizeof(prev_raft_term_) + sizeof(prev_raft_index_);

/* Total size of this event when encoded into the stream */
uint32_t size_= 0;

Expand Down
1 change: 1 addition & 0 deletions sql/raft_listener_queue_if.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class RaftListenerCallbackArg
std::pair<std::string, unsigned int> master_instance;
std::string val_str;
std::map<std::string, unsigned int> val_sys_var_uint;
std::pair<int64_t, int64_t> val_opid;
};

/* Result of the callback execution in the server. This will be set in the
Expand Down
1 change: 1 addition & 0 deletions sql/rpl_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,7 @@ pthread_handler_t process_raft_queue(void *arg)
raft_rotate_info.noop= flags & RaftListenerQueue::RAFT_FLAGS_NOOP;
raft_rotate_info.post_append= flags &
RaftListenerQueue::RAFT_FLAGS_POSTAPPEND;
// raft_rotate_info.rotate_opid= element.arg.val_opid;
result.error= rotate_relay_log_for_raft(&raft_rotate_info);
#endif
break;
Expand Down
5 changes: 5 additions & 0 deletions sql/rpl_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -399,4 +399,9 @@ struct RaftRotateInfo {
// rotation to be initiated by server to get consensus on a
// config change (add/remove/modify of the ring)
bool config_change_rotate= false;
// This is the opid of the rotate event which is either
// passed in by plugin or obtained from before_flush.
// During rotation of raft logs, this is put into Metadata event
// as previous opid
std::pair<int64_t, int64_t> rotate_opid= std::make_pair(0,0);
};

0 comments on commit c607a5a

Please sign in to comment.