@@ -174,19 +174,7 @@ _release_xid_data(_PyCrossInterpreterData *data, int ignoreexc)
174
174
}
175
175
int res = _PyCrossInterpreterData_Release (data );
176
176
if (res < 0 ) {
177
- // XXX Fix this!
178
- /* The owning interpreter is already destroyed.
179
- * Ideally, this shouldn't ever happen. When an interpreter is
180
- * about to be destroyed, we should clear out all of its objects
181
- * from every channel associated with that interpreter.
182
- * For now we hack around that to resolve refleaks, by decref'ing
183
- * the released object here, even if its the wrong interpreter.
184
- * The owning interpreter has already been destroyed
185
- * so we should be okay, especially since the currently
186
- * shareable types are all very basic, with no GC.
187
- * That said, it becomes much messier once interpreters
188
- * no longer share a GIL, so this needs to be fixed before then. */
189
- _PyCrossInterpreterData_Clear (NULL , data );
177
+ /* The owning interpreter is already destroyed. */
190
178
if (ignoreexc ) {
191
179
// XXX Emit a warning?
192
180
PyErr_Clear ();
@@ -489,6 +477,30 @@ _channelqueue_get(_channelqueue *queue)
489
477
return _channelitem_popped (item );
490
478
}
491
479
480
+ static void
481
+ _channelqueue_drop_interpreter (_channelqueue * queue , int64_t interp )
482
+ {
483
+ _channelitem * prev = NULL ;
484
+ _channelitem * next = queue -> first ;
485
+ while (next != NULL ) {
486
+ _channelitem * item = next ;
487
+ next = item -> next ;
488
+ if (item -> data -> interp == interp ) {
489
+ if (prev == NULL ) {
490
+ queue -> first = item -> next ;
491
+ }
492
+ else {
493
+ prev -> next = item -> next ;
494
+ }
495
+ _channelitem_free (item );
496
+ queue -> count -= 1 ;
497
+ }
498
+ else {
499
+ prev = item ;
500
+ }
501
+ }
502
+ }
503
+
492
504
/* channel-interpreter associations */
493
505
494
506
struct _channelend ;
@@ -693,6 +705,20 @@ _channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
693
705
return 0 ;
694
706
}
695
707
708
+ static void
709
+ _channelends_drop_interpreter (_channelends * ends , int64_t interp )
710
+ {
711
+ _channelend * end ;
712
+ end = _channelend_find (ends -> send , interp , NULL );
713
+ if (end != NULL ) {
714
+ _channelends_close_end (ends , end , 1 );
715
+ }
716
+ end = _channelend_find (ends -> recv , interp , NULL );
717
+ if (end != NULL ) {
718
+ _channelends_close_end (ends , end , 0 );
719
+ }
720
+ }
721
+
696
722
static void
697
723
_channelends_close_all (_channelends * ends , int which , int force )
698
724
{
@@ -841,6 +867,18 @@ _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
841
867
return res ;
842
868
}
843
869
870
+ static void
871
+ _channel_drop_interpreter (_PyChannelState * chan , int64_t interp )
872
+ {
873
+ PyThread_acquire_lock (chan -> mutex , WAIT_LOCK );
874
+
875
+ _channelqueue_drop_interpreter (chan -> queue , interp );
876
+ _channelends_drop_interpreter (chan -> ends , interp );
877
+ chan -> open = _channelends_is_open (chan -> ends );
878
+
879
+ PyThread_release_lock (chan -> mutex );
880
+ }
881
+
844
882
static int
845
883
_channel_close_all (_PyChannelState * chan , int end , int force )
846
884
{
@@ -1213,6 +1251,21 @@ _channels_list_all(_channels *channels, int64_t *count)
1213
1251
return cids ;
1214
1252
}
1215
1253
1254
+ static void
1255
+ _channels_drop_interpreter (_channels * channels , int64_t interp )
1256
+ {
1257
+ PyThread_acquire_lock (channels -> mutex , WAIT_LOCK );
1258
+
1259
+ _channelref * ref = channels -> head ;
1260
+ for (; ref != NULL ; ref = ref -> next ) {
1261
+ if (ref -> chan != NULL ) {
1262
+ _channel_drop_interpreter (ref -> chan , interp );
1263
+ }
1264
+ }
1265
+
1266
+ PyThread_release_lock (channels -> mutex );
1267
+ }
1268
+
1216
1269
/* support for closing non-empty channels */
1217
1270
1218
1271
struct _channel_closing {
@@ -1932,6 +1985,19 @@ _global_channels(void) {
1932
1985
}
1933
1986
1934
1987
1988
+ static void
1989
+ clear_interpreter (void * data )
1990
+ {
1991
+ if (_globals .module_count == 0 ) {
1992
+ return ;
1993
+ }
1994
+ PyInterpreterState * interp = (PyInterpreterState * )data ;
1995
+ assert (interp == _get_current_interp ());
1996
+ int64_t id = PyInterpreterState_GetID (interp );
1997
+ _channels_drop_interpreter (& _globals .channels , id );
1998
+ }
1999
+
2000
+
1935
2001
static PyObject *
1936
2002
channel_create (PyObject * self , PyObject * Py_UNUSED (ignored ))
1937
2003
{
@@ -2339,6 +2405,10 @@ module_exec(PyObject *mod)
2339
2405
goto error ;
2340
2406
}
2341
2407
2408
+ // Make sure chnnels drop objects owned by this interpreter
2409
+ PyInterpreterState * interp = _get_current_interp ();
2410
+ _Py_AtExit (interp , clear_interpreter , (void * )interp );
2411
+
2342
2412
return 0 ;
2343
2413
2344
2414
error :
0 commit comments