@@ -63,6 +63,10 @@ slave_ignored_err_throttle(window_size,
6363
6464#include "sql_digest.h"
6565
66+ #include <boost/property_tree/ptree.hpp>
67+ #include <boost/property_tree/json_parser.hpp>
68+ #include <boost/algorithm/string.hpp>
69+
6670using std::min;
6771using std::max;
6872
@@ -3153,8 +3157,15 @@ Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli)
31533157 if (is_relay_log_event())
31543158 ptr_group->ts= 0;
31553159 else
3160+ {
31563161 ptr_group->ts=
31573162 when.tv_sec + (time_t) exec_time; // Seconds_behind_master related
3163+ ptr_group->ts_millis= (rli->group_timestamp_millis ?
3164+ rli->group_timestamp_millis : when.tv_sec * 1000)
3165+ + exec_time * 1000;
3166+ // reset for next group
3167+ rli->group_timestamp_millis= 0;
3168+ }
31583169 rli->checkpoint_seqno++;
31593170 /*
31603171 Coordinator should not use the main memroot however its not
@@ -3405,8 +3416,17 @@ void Log_event::do_post_end_event(Relay_log_info *rli, Log_event_wrapper *ev)
34053416 rli->checkpoint_seqno++;
34063417
34073418 // seconds_behind_master related
3408- if (is_relay_log_event ()) ptr_group->ts = 0 ;
3409- else ptr_group->ts = when.tv_sec + (time_t ) exec_time;
3419+ if (is_relay_log_event())
3420+ ptr_group->ts= 0;
3421+ else
3422+ {
3423+ ptr_group->ts= when.tv_sec + (time_t) exec_time;
3424+ ptr_group->ts_millis= (rli->group_timestamp_millis ?
3425+ rli->group_timestamp_millis : when.tv_sec * 1000)
3426+ + exec_time * 1000;
3427+ // reset for next group
3428+ rli->group_timestamp_millis= 0;
3429+ }
34103430}
34113431
34123432/**
@@ -3700,6 +3720,16 @@ int Log_event::apply_event(Relay_log_info *rli)
37003720 worker= NULL;
37013721 rli->mts_group_status= Relay_log_info::MTS_IN_GROUP;
37023722
3723+ // milli-sec behind master related for MTS
3724+ // case: this event contains trx metadata, so save the ts in msec for the grp
3725+ if (opt_binlog_trx_meta_data &&
3726+ get_type_code() == ROWS_QUERY_LOG_EVENT &&
3727+ static_cast<Rows_query_log_event*>(this)->has_trx_meta_data())
3728+ {
3729+ rli->group_timestamp_millis=
3730+ static_cast<Rows_query_log_event*>(this)->extract_last_timestamp();
3731+ }
3732+
37033733 if (!opt_mts_dependency_replication)
37043734 worker= (Relay_log_info*)
37053735 (rli->last_assigned_worker= get_slave_worker(rli));
@@ -7636,7 +7666,7 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
76367666 (ulong) rli->get_group_master_log_pos()));
76377667 mysql_mutex_unlock(&rli->data_lock);
76387668 if (rli->is_parallel_exec())
7639- rli->reset_notified_checkpoint (0 , 0 ,
7669+ rli->reset_notified_checkpoint(0, 0, 0,
76407670 true/*need_data_lock=true*/);
76417671
76427672 /*
@@ -14793,6 +14823,25 @@ Rows_query_log_event::write_data_body(IO_CACHE *file)
1479314823 (uint) strlen(m_rows_query)));
1479414824}
1479514825
14826+ inline ulonglong Rows_query_log_event::extract_last_timestamp() const
14827+ {
14828+ boost::property_tree::ptree pt;
14829+ std::string json= extract_trx_meta_data();
14830+ if (json.empty())
14831+ return 0;
14832+ std::istringstream is(json);
14833+ try {
14834+ read_json(is, pt);
14835+ } catch (const std::exception& e) {
14836+ sql_print_error("Error while parsing metadata JSON for timestamps");
14837+ return 0;
14838+ }
14839+
14840+ auto timestamps= pt.get_child("timestamps", boost::property_tree::ptree());
14841+ return timestamps.empty() ? 0 :
14842+ timestamps.back().second.get_value<ulonglong>();
14843+ }
14844+
1479614845#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
1479714846int Rows_query_log_event::do_apply_event(Relay_log_info const *rli)
1479814847{
0 commit comments