Skip to content

Commit

Permalink
Display row information in processlist for rbr threads
Browse files Browse the repository at this point in the history
Summary: Set row_query using the fields unpacked from the row event. Use row_query in show processlist.

Test Plan: mtr

Reviewers: tianx

Reviewed By: tianx

Subscribers: webscalesql-eng, ebergen

Differential Revision: https://reviews.facebook.net/D61833
  • Loading branch information
santoshbanda committed Oct 14, 2016
1 parent 2306b6e commit 564824a
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 9 deletions.
3 changes: 3 additions & 0 deletions sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12016,6 +12016,9 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
if (thd->slave_thread)
free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
}
mysql_mutex_lock(&thd->LOCK_thd_data);
thd->row_query.clear();
mysql_mutex_unlock(&thd->LOCK_thd_data);
DBUG_RETURN(error);

err:
Expand Down
35 changes: 33 additions & 2 deletions sql/log_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -4346,11 +4346,42 @@ class Rows_log_event : public Log_event
int unpack_current_row(const Relay_log_info *const rli,
MY_BITMAP const *cols)
{
mysql_mutex_lock(&thd->LOCK_thd_data);
DBUG_ASSERT(m_table);
// For a update event we should not clear the query when unpacking after
// image. This can be checked using the cols parameter.
if (cols != &m_cols_ai)
thd->row_query.clear();

Log_event_type type = get_type_code();
if (type == WRITE_ROWS_EVENT || type == WRITE_ROWS_EVENT_V1)
thd->row_query = "INSERT INTO ";
else if (type == DELETE_ROWS_EVENT || type == DELETE_ROWS_EVENT_V1)
thd->row_query = "DELETE FROM ";
else
{
DBUG_ASSERT(type == UPDATE_ROWS_EVENT || type == UPDATE_ROWS_EVENT_V1);
if (cols != &m_cols_ai)
{
thd->row_query = "UPDATE ";
}
}

if (cols != &m_cols_ai)
{
thd->row_query.append(m_table->s->db.str, m_table->s->db.length);
thd->row_query.append(".");
thd->row_query.append(m_table->s->table_name.str,
m_table->s->table_name.length);
thd->row_query.append(" ");
}

ASSERT_OR_RETURN_ERROR(m_curr_row <= m_rows_end, HA_ERR_CORRUPT_EVENT);
return ::unpack_row(rli, m_table, m_width, m_curr_row, cols,
&m_curr_row_end, &m_master_reclength, m_rows_end);
int res = ::unpack_row(rli, m_table, m_width, m_curr_row, cols,
&m_curr_row_end, &m_master_reclength, m_rows_end,
&thd->row_query);
mysql_mutex_unlock(&thd->LOCK_thd_data);
return res;
}

/*
Expand Down
52 changes: 49 additions & 3 deletions sql/rpl_record.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,41 @@ pack_row(TABLE *table, MY_BITMAP const* cols,
#endif

#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
static void insert_row_fields(std::string* row_query, TABLE* table)
{
static uint32_t max_field_len= 100;
row_query->append("(");
String buf;
#ifndef DBUG_OFF
uint index= 0;
#endif
for (Field **ptr=table->field; *ptr; ptr++)
{
#ifndef DBUG_OFF
// Set the read bit to avoid assertions in val_str().
bool flip= false;
if (table->read_set && !bitmap_is_set(table->read_set, index))
{
flip= true;
bitmap_flip_bit(table->read_set, index);
}
#endif
String *s = (*ptr)->val_str(&buf, &buf);
if (!s)
row_query->append("NULL");
else
row_query->append(s->ptr(), min(max_field_len, s->length()));
row_query->append(",");
#ifndef DBUG_OFF
if (flip)
bitmap_flip_bit(table->read_set, index);
++index;
#endif
}
if (row_query->back() == ',')
row_query->back() = ')';
}

/**
Unpack a row into @c table->record[0].
Expand All @@ -181,6 +216,8 @@ pack_row(TABLE *table, MY_BITMAP const* cols,
@param row_end
Pointer to variable that will hold the value of the
end position for the data in the row event
@param row_query
Append the fields to this string
@retval 0 No error
Expand All @@ -195,7 +232,8 @@ int unpack_row_with_column_info(TABLE *table, uint const colcnt,
ulong *const master_reclength,
uchar const *const row_end,
table_def* tabledef,
TABLE *conv_table)
TABLE *conv_table,
std::string* row_query)
{
DBUG_ENTER("unpack_row_with_column_info");
uchar const *null_bits= row_data;
Expand Down Expand Up @@ -273,6 +311,9 @@ int unpack_row_with_column_info(TABLE *table, uint const colcnt,
}
++null_bit_index;
}

insert_row_fields(row_query, table);

*current_row_end = pack_ptr;
if (master_reclength)
{
Expand Down Expand Up @@ -318,6 +359,9 @@ int unpack_row_with_column_info(TABLE *table, uint const colcnt,
Pointer to variable that will hold the value of the
end position for the data in the row event
@param row_query
Append the fields to this string
@retval 0 No error
@retval HA_ERR_GENERIC
Expand All @@ -328,7 +372,7 @@ unpack_row(Relay_log_info const *rli,
TABLE *table, uint const colcnt,
uchar const *const row_data, MY_BITMAP const *cols,
uchar const **const current_row_end, ulong *const master_reclength,
uchar const *const row_end)
uchar const *const row_end, std::string* row_query)
{
DBUG_ENTER("unpack_row");
DBUG_ASSERT(row_data);
Expand Down Expand Up @@ -382,7 +426,8 @@ unpack_row(Relay_log_info const *rli,
{
DBUG_RETURN(unpack_row_with_column_info(table, colcnt, row_data, cols,
current_row_end, master_reclength,
row_end, tabledef, conv_table));
row_end, tabledef, conv_table,
row_query));
}

for (field_ptr= begin_ptr ; field_ptr < end_ptr && *field_ptr ; ++field_ptr)
Expand Down Expand Up @@ -568,6 +613,7 @@ unpack_row(Relay_log_info const *rli,
}
}

insert_row_fields(row_query, table);
/*
We should now have read all the null bytes, otherwise something is
really wrong.
Expand Down
2 changes: 1 addition & 1 deletion sql/rpl_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ int unpack_row(Relay_log_info const *rli,
TABLE *table, uint const colcnt,
uchar const *const row_data, MY_BITMAP const *cols,
uchar const **const curr_row_end, ulong *const master_reclength,
uchar const *const row_end);
uchar const *const row_end, std::string* row_query= nullptr);

// Fill table's record[0] with default values.
int prepare_record(TABLE *const table, const MY_BITMAP *cols, const bool check);
Expand Down
2 changes: 2 additions & 0 deletions sql/sql_class.h
Original file line number Diff line number Diff line change
Expand Up @@ -4612,6 +4612,8 @@ class THD :public MDL_context_owner,

// propagate value from rli
bool skip_unique_check();
// protected by LOCK_thd_data
std::string row_query;
};

/**
Expand Down
17 changes: 14 additions & 3 deletions sql/sql_show.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2197,10 +2197,21 @@ static void set_thread_info_common(thread_info *thd_info, THD *thd, THD *tmp,
thd_info->rows_examined= tmp->get_examined_row_count();
thd_info->rows_sent= tmp->get_sent_row_count();
/* Lock THD mutex that protects its data when looking at it. */
if (tmp->query())
const char* query = NULL;
uint length = 0;
if (!tmp->row_query.empty())
{
query = tmp->row_query.c_str();
length = tmp->row_query.length();
}
else if (tmp->query())
{
uint length= min<uint>(max_query_length, tmp->query_length());
char *q= thd->strmake(tmp->query(),length);
query = tmp->query();
length = tmp->query_length();
}
if (query) {
length= min<uint>(max_query_length, length);
char *q= thd->strmake(query,length);
/* Safety: in case strmake failed, we set length to 0. */
thd_info->query_string=
CSET_STRING(q, q ? length : 0, tmp->query_charset());
Expand Down

0 comments on commit 564824a

Please sign in to comment.