@@ -5245,6 +5245,53 @@ int check_temp_dir(char* tmp_file)
5245
5245
DBUG_RETURN (0 );
5246
5246
}
5247
5247
5248
+ static std::pair<ulong, ulonglong> cleanup_worker_jobs (Slave_worker *w)
5249
+ {
5250
+ ulong ii= 0 ;
5251
+ ulong current_event_index;
5252
+ ulong purge_cnt= 0 ;
5253
+ ulonglong purge_size= 0 ;
5254
+ struct slave_job_item job_item;
5255
+ std::vector<Log_event*> log_event_free_list;
5256
+
5257
+ mysql_mutex_lock (&w->jobs_lock );
5258
+
5259
+ log_event_free_list.reserve (w->jobs .avail );
5260
+
5261
+ current_event_index = std::max (w->last_current_event_index ,
5262
+ w->current_event_index );
5263
+ while (de_queue (&w->jobs , &job_item))
5264
+ {
5265
+ DBUG_ASSERT (job_item.data );
5266
+
5267
+ Log_event* log_event= static_cast <Log_event*>(job_item.data );
5268
+
5269
+ ii++;
5270
+ if (ii > current_event_index)
5271
+ {
5272
+ purge_size += log_event->data_written ;
5273
+ purge_cnt++;
5274
+ }
5275
+
5276
+ // Save the freeing for outside the mutex
5277
+ log_event_free_list.push_back (log_event);
5278
+ }
5279
+
5280
+ DBUG_ASSERT (w->jobs .len == 0 );
5281
+
5282
+ mysql_mutex_unlock (&w->jobs_lock );
5283
+
5284
+ // Do all the freeing outside the mutex since freeing causes destructors to
5285
+ // be called and some destructors acquire locks which can cause deadlock
5286
+ // scenarios if we are holding this mutex.
5287
+ for (Log_event* log_event : log_event_free_list)
5288
+ {
5289
+ delete log_event;
5290
+ }
5291
+
5292
+ // Return the number and size of the purged events
5293
+ return std::make_pair (purge_cnt, purge_size);
5294
+ }
5248
5295
/*
5249
5296
Worker thread for the parallel execution of the replication events.
5250
5297
*/
@@ -5257,9 +5304,6 @@ pthread_handler_t handle_slave_worker(void *arg)
5257
5304
Relay_log_info* rli= w->c_rli ;
5258
5305
ulong purge_cnt= 0 ;
5259
5306
ulonglong purge_size= 0 ;
5260
- ulong current_event_index = 0 ;
5261
- ulong i = 0 ;
5262
- struct slave_job_item _item, *job_item= &_item;
5263
5307
/* Buffer lifetime extends across the entire runtime of the THD handle. */
5264
5308
char proc_info_buf[256 ]= {0 };
5265
5309
@@ -5321,25 +5365,7 @@ pthread_handler_t handle_slave_worker(void *arg)
5321
5365
thd->clear_error ();
5322
5366
w->cleanup_context (thd, error);
5323
5367
5324
- mysql_mutex_lock (&w->jobs_lock );
5325
-
5326
- current_event_index = max (w->last_current_event_index ,
5327
- w->current_event_index );
5328
- while (de_queue (&w->jobs , job_item))
5329
- {
5330
- i++;
5331
- if (i > current_event_index)
5332
- {
5333
- purge_size += ((Log_event*) (job_item->data ))->data_written ;
5334
- purge_cnt++;
5335
- }
5336
- DBUG_ASSERT (job_item->data );
5337
- delete static_cast <Log_event*>(job_item->data );
5338
- }
5339
-
5340
- DBUG_ASSERT (w->jobs .len == 0 );
5341
-
5342
- mysql_mutex_unlock (&w->jobs_lock );
5368
+ std::tie (purge_cnt, purge_size)= cleanup_worker_jobs (w);
5343
5369
5344
5370
mysql_mutex_lock (&rli->pending_jobs_lock );
5345
5371
rli->pending_jobs -= purge_cnt;
0 commit comments