Producer not thread-safe #19

Closed
drice opened this issue Jul 21, 2016 · 9 comments

@drice

drice commented Jul 21, 2016

I am receiving these errors frequently:

Fatal Python error: PyEval_RestoreThread: NULL tstate
Aborted (core dumped)

As well as

Fatal Python error: ceval: tstate mix-up
Aborted (core dumped)

Both cause core dumps.

One potential solution I've seen is to call PyEval_InitThreads() before using threads, which I will try.

@drice
Author

drice commented Jul 21, 2016

I'm on Ubuntu 15.10, running Python 2.7.10.

@edenhill
Contributor

edenhill commented Aug 4, 2016

Hi @tbsf,

did inclusion of PyEval_InitThreads() fix the issue?

If not, can you provide a small script to reproduce the issue?

Thanks

@drice
Author

drice commented Aug 4, 2016

No, it didn't fix it. I have a single thread reading from a Kafka topic and writing out to other topics. It runs for a few seconds and then crashes.

Here is the backtrace from one of the errors if it helps:

Fatal Python error: PyEval_RestoreThread: NULL tstate

Program received signal SIGABRT, Aborted.
[Switching to Thread 0x7fff9b7fe700 (LWP 10875)]
0x00007ffff71171c7 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:55
55 ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) bt
#0 0x00007ffff71171c7 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:55
#1 0x00007ffff7118e2a in __GI_abort () at abort.c:89
#2 0x00000000004e2010 in Py_FatalError (msg=msg@entry=0x55ce90 "PyEval_RestoreThread: NULL tstate") at Python/pythonrun.c:1700
#3 0x00000000004af53a in PyEval_RestoreThread (tstate=0x0) at Python/ceval.c:355
#4 0x00007ffff22e69d9 in Producer_poll0 (tmout=500, self=0x7fffef09b890) at confluent_kafka/src/Producer.c:322
#5 Producer_flush (self=0x7fffef09b890, ignore=) at confluent_kafka/src/Producer.c:354
#6 0x00000000004b7ee3 in call_function (oparg=, pp_stack=0x7fff9b7fd420) at Python/ceval.c:4334
#7 PyEval_EvalFrameEx (f=f@entry=0x7fffec5f0bf0, throwflag=throwflag@entry=0) at Python/ceval.c:2987
#8 0x00000000004b8f6c in PyEval_EvalCodeEx (co=, globals=, locals=locals@entry=0x0, args=, argcount=argcount@entry=2, kws=kws@entry=0x7fffac001cd0, kwcount=2, defs=0x7fffef0b1a40, defcount=2, closure=0x0)
at Python/ceval.c:3582
#9 0x00000000004b7017 in fast_function (nk=, na=2, n=, pp_stack=0x7fff9b7fd5f0, func=0x7fffef0c1938) at Python/ceval.c:4445
#10 call_function (oparg=, pp_stack=0x7fff9b7fd5f0) at Python/ceval.c:4370
#11 PyEval_EvalFrameEx (f=f@entry=0x7fffac001a30, throwflag=throwflag@entry=0) at Python/ceval.c:2987
#12 0x00000000004b8f6c in PyEval_EvalCodeEx (co=, globals=, locals=locals@entry=0x0, args=args@entry=0x7fffec3632f0, argcount=2, kws=kws@entry=0x7ffff7fa7068, kwcount=0, defs=0x0, defcount=0, closure=0x0) at Python/ceval.c:3582
#13 0x000000000052c63e in function_call (func=0x7fffef0c3320, arg=0x7fffec3632d8, kw=0x7fffec37fb40) at Objects/funcobject.c:523
#14 0x000000000042300a in PyObject_Call (func=func@entry=0x7fffef0c3320, arg=arg@entry=0x7fffec3632d8, kw=kw@entry=0x7fffec37fb40) at Objects/abstract.c:2546
#15 0x00000000004b16e6 in ext_do_call (nk=, na=, flags=, pp_stack=0x7fff9b7fd868, func=0x7fffef0c3320) at Python/ceval.c:4664
#16 PyEval_EvalFrameEx (f=f@entry=0x7fffef057de0, throwflag=throwflag@entry=0) at Python/ceval.c:3026
#17 0x00000000004b714c in fast_function (nk=, na=, n=1, pp_stack=0x7fff9b7fd970, func=0x7fffef0c1578) at Python/ceval.c:4435
#18 call_function (oparg=, pp_stack=0x7fff9b7fd970) at Python/ceval.c:4370
#19 PyEval_EvalFrameEx (f=f@entry=0x7fff8c000910, throwflag=throwflag@entry=0) at Python/ceval.c:2987
#20 0x00000000004b714c in fast_function (nk=, na=, n=1, pp_stack=0x7fff9b7fda80, func=0x7ffff7e5fb90) at Python/ceval.c:4435
#21 call_function (oparg=, pp_stack=0x7fff9b7fda80) at Python/ceval.c:4370
#22 PyEval_EvalFrameEx (f=f@entry=0x7fffef04dc90, throwflag=throwflag@entry=0) at Python/ceval.c:2987
#23 0x00000000004b8f6c in PyEval_EvalCodeEx (co=, globals=, locals=locals@entry=0x0, args=args@entry=0x7ffff5f046a8, argcount=1, kws=kws@entry=0x0, kwcount=0, defs=0x0, defcount=0, closure=0x0) at Python/ceval.c:3582
#24 0x000000000052c55c in function_call (func=0x7ffff7e5faa0, arg=0x7ffff5f04690, kw=0x0) at Objects/funcobject.c:523
#25 0x000000000042300a in PyObject_Call (func=func@entry=0x7ffff7e5faa0, arg=arg@entry=0x7ffff5f04690, kw=kw@entry=0x0) at Objects/abstract.c:2546
#26 0x0000000000429e5c in instancemethod_call (func=0x7ffff7e5faa0, arg=0x7ffff5f04690, kw=0x0) at Objects/classobject.c:2602
#27 0x000000000042300a in PyObject_Call (func=func@entry=0x7fffef0aba00, arg=arg@entry=0x7ffff7fa7050, kw=) at Objects/abstract.c:2546
#28 0x00000000004afb97 in PyEval_CallObjectWithKeywords (func=0x7fffef0aba00, arg=0x7ffff7fa7050, kw=) at Python/ceval.c:4219
#29 0x00000000004faff2 in t_bootstrap (boot_raw=0x12925b0) at ./Modules/threadmodule.c:620
#30 0x00007ffff7bc26aa in start_thread (arg=0x7fff9b7fe700) at pthread_create.c:333
#31 0x00007ffff71e913d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109

This is the backtrace for the other message:

Fatal Python error: ceval: tstate mix-up

Program received signal SIGABRT, Aborted.
[Switching to Thread 0x7fff60ff9700 (LWP 11729)]
0x00007ffff71171c7 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:55
55 ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) bt
#0 0x00007ffff71171c7 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:55
#1 0x00007ffff7118e2a in __GI_abort () at abort.c:89
#2 0x00000000004e2010 in Py_FatalError (msg=msg@entry=0x55d5a4 "ceval: tstate mix-up") at Python/pythonrun.c:1700
#3 0x00000000004b0346 in PyEval_EvalFrameEx (f=f@entry=0x7fff80001c00, throwflag=throwflag@entry=0) at Python/ceval.c:1114
#4 0x00000000004b8f6c in PyEval_EvalCodeEx (co=, globals=, locals=locals@entry=0x0, args=args@entry=0x7fffec608848, argcount=2, kws=kws@entry=0x7ffff7fa7068, kwcount=0, defs=0x0, defcount=0, closure=0x0) at Python/ceval.c:3582
#5 0x000000000052c63e in function_call (func=0x7fffef0c3320, arg=0x7fffec608830, kw=0x7fffec61ec58) at Objects/funcobject.c:523
#6 0x000000000042300a in PyObject_Call (func=func@entry=0x7fffef0c3320, arg=arg@entry=0x7fffec608830, kw=kw@entry=0x7fffec61ec58) at Objects/abstract.c:2546
#7 0x00000000004b16e6 in ext_do_call (nk=, na=, flags=, pp_stack=0x7fff60ff8868, func=0x7fffef0c3320) at Python/ceval.c:4664
#8 PyEval_EvalFrameEx (f=f@entry=0x7fffef071a00, throwflag=throwflag@entry=0) at Python/ceval.c:3026
#9 0x00000000004b714c in fast_function (nk=, na=, n=1, pp_stack=0x7fff60ff8970, func=0x7fffef0c1578) at Python/ceval.c:4435
#10 call_function (oparg=, pp_stack=0x7fff60ff8970) at Python/ceval.c:4370
#11 PyEval_EvalFrameEx (f=f@entry=0x7fff80000df0, throwflag=throwflag@entry=0) at Python/ceval.c:2987
#12 0x00000000004b714c in fast_function (nk=, na=, n=1, pp_stack=0x7fff60ff8a80, func=0x7ffff7e5fb90) at Python/ceval.c:4435
#13 call_function (oparg=, pp_stack=0x7fff60ff8a80) at Python/ceval.c:4370
#14 PyEval_EvalFrameEx (f=f@entry=0x7fffef06ae50, throwflag=throwflag@entry=0) at Python/ceval.c:2987
#15 0x00000000004b8f6c in PyEval_EvalCodeEx (co=, globals=, locals=locals@entry=0x0, args=args@entry=0x7fffef06d068, argcount=1, kws=kws@entry=0x0, kwcount=0, defs=0x0, defcount=0, closure=0x0) at Python/ceval.c:3582
#16 0x000000000052c55c in function_call (func=0x7ffff7e5faa0, arg=0x7fffef06d050, kw=0x0) at Objects/funcobject.c:523
#17 0x000000000042300a in PyObject_Call (func=func@entry=0x7ffff7e5faa0, arg=arg@entry=0x7fffef06d050, kw=kw@entry=0x0) at Objects/abstract.c:2546
#18 0x0000000000429e5c in instancemethod_call (func=0x7ffff7e5faa0, arg=0x7fffef06d050, kw=0x0) at Objects/classobject.c:2602
#19 0x000000000042300a in PyObject_Call (func=func@entry=0x7fffef0b3af0, arg=arg@entry=0x7ffff7fa7050, kw=) at Objects/abstract.c:2546
#20 0x00000000004afb97 in PyEval_CallObjectWithKeywords (func=0x7fffef0b3af0, arg=0x7ffff7fa7050, kw=) at Python/ceval.c:4219
#21 0x00000000004faff2 in t_bootstrap (boot_raw=0x10d8e00) at ./Modules/threadmodule.c:620
#22 0x00007ffff7bc26aa in start_thread (arg=0x7fff60ff9700) at pthread_create.c:333
#23 0x00007ffff71e913d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109

@drice
Author

drice commented Aug 4, 2016

I'm using something like:

producer.produce(topic, value=value)
producer.flush()

I think producer.flush() is what is causing the error.

@edenhill
Contributor

edenhill commented Aug 5, 2016

I tried reproducing this with the following snippet, but it works:

from confluent_kafka import Producer

def test_flush():
    p = Producer({'bootstrap.servers':'localhost'})

    for n in range(1, 5):
        p.produce('mytopic', value='somedata %d' % n)
        p.flush()

Can you reproduce it with that snippet?

If not, what configuration are you passing to the Producer?

@drice
Author

drice commented Aug 5, 2016

I have a thread pool sharing a single producer, so it would be more like this (excuse the bad code, it may hang):

import threading
import time
import Queue
import traceback
from workers import ConfluentKafkaProducer as Producer

PRODUCER = Producer()
STOP_THREADS = threading.Event()
QUEUE = Queue.Queue()

def run():
    while not STOP_THREADS.is_set():
        try:
            item = QUEUE.get()
            PRODUCER.send('test', value=item)
        except:
            time.sleep(0.5)


def send(num_messages):
    for ii in range(0, num_messages):
        message = {}
        QUEUE.put(message)

if __name__ == '__main__':
    try:
        threads = []
        for ii in range(0, 100):
            thread = threading.Thread(target=run)
            thread.start()
            threads.append(thread)  # keep a handle so the threads can be joined below
        send(1000)
    except:
        STOP_THREADS.set()
        traceback.print_exc()
        for thread in threads:
            thread.join()

If I put a lock around producer.flush(), it doesn't crash.
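The lock workaround described above can be sketched as a small wrapper. This is a minimal sketch, assuming a shared producer object that exposes produce() and flush() (such as a confluent_kafka.Producer); the class name LockedFlushProducer is hypothetical. It serializes only flush(), matching what was tried here; whether produce() is safe to call concurrently in this client version is not established in this thread.

```python
import threading


class LockedFlushProducer(object):
    """Hypothetical wrapper that serializes flush() on a shared producer.

    `producer` is assumed to be any object exposing produce(topic, key=..., value=...)
    and flush(), for example a confluent_kafka.Producer instance.
    """

    def __init__(self, producer):
        self._producer = producer
        self._flush_lock = threading.Lock()

    def send(self, topic, key=None, value=None):
        self._producer.produce(topic, key=key, value=value)
        # Only one thread at a time may be inside flush().
        with self._flush_lock:
            self._producer.flush()
```

All worker threads would then share one LockedFlushProducer instead of calling flush() on the raw producer directly.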

@drice
Author

drice commented Aug 5, 2016

This is the class ConfluentKafkaProducer:

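import confluent_kafka

# CONFLUENT_KAFKA_PRODUCER_DEFAULTS (a dict of default settings) is defined elsewhere, not shown here.
class ConfluentKafkaProducer(object):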
    def __init__(self, *args, **kwargs):
        kwargs = dict(CONFLUENT_KAFKA_PRODUCER_DEFAULTS.items() + kwargs.items())
        if 'value.serializer' in kwargs:
            self.value_serializer = kwargs.get('value.serializer')
            del kwargs['value.serializer']
        self.producer = confluent_kafka.Producer(**kwargs)

    def send(self, topic, key=None, value=None):
        if hasattr(self, 'value_serializer'):
            value = self.value_serializer(value)
        self.producer.produce(topic, key=key, value=value)
        self.producer.flush()

@drice
Author

drice commented Aug 5, 2016

By the way, is there a way to get the partition information in the producer so I can write my own partitioner?
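One way to approach a custom partitioner is to compute the partition number yourself and pass it through the `partition` argument of `produce()` in confluent-kafka-python. This is a minimal sketch of the mapping step only, assuming the topic's partition count is already known (obtaining it is outside this sketch). choose_partition is a hypothetical helper, and CRC32 is an arbitrary illustrative hash that does not match the client's built-in partitioner.

```python
import zlib


def choose_partition(key, num_partitions):
    """Hypothetical custom partitioner: map a key to a partition number.

    CRC32 is used purely for illustration; keys will generally NOT land on
    the partitions the client's default partitioner would choose.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    if not isinstance(key, bytes):
        key = key.encode("utf-8")
    # Mask to an unsigned 32-bit value (zlib.crc32 can be negative on Python 2).
    return (zlib.crc32(key) & 0xFFFFFFFF) % num_partitions
```

It would then be used as `producer.produce(topic, key=key, value=value, partition=choose_partition(key, num_partitions))`, so the same key always maps to the same partition as long as the partition count does not change.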

@edenhill
Contributor

edenhill commented Aug 8, 2016

It looks like your code calls flush() from separate threads, which is probably the issue here, since the Python client is currently not completely thread-safe, flush() and poll() in particular.

This will be fixed.
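Until the client itself is fixed, one way to guarantee that flush() and poll() are only ever entered from one thread is to give a single dedicated thread sole ownership of the producer and have workers hand it messages through a queue. This is a minimal sketch under that assumption: `producer` is any object with produce() and flush() (such as a confluent_kafka.Producer), and the names ProducerOwnerThread, submit() and stop() are hypothetical.

```python
import threading

try:
    import queue  # Python 3
except ImportError:  # Python 2.7
    import Queue as queue

_STOP = object()  # sentinel telling the owner thread to exit


class ProducerOwnerThread(threading.Thread):
    """Hypothetical helper: the only thread that ever touches `producer`."""

    def __init__(self, producer):
        threading.Thread.__init__(self)
        self.daemon = True
        self._producer = producer
        self._queue = queue.Queue()  # thread-safe hand-off from the workers

    def submit(self, topic, key=None, value=None):
        # Safe to call from any number of worker threads.
        self._queue.put((topic, key, value))

    def stop(self):
        # The sentinel is queued after earlier items, so those are sent first.
        self._queue.put(_STOP)
        self.join()

    def run(self):
        while True:
            item = self._queue.get()
            if item is _STOP:
                break
            topic, key, value = item
            self._producer.produce(topic, key=key, value=value)
            # Mirrors the produce-then-flush wrapper above; flush() now only
            # ever runs on this one thread.
            self._producer.flush()
```

Workers call `owner.submit('test', value=item)` instead of touching the producer; calling flush() less often (for example only when the queue drains) would reduce latency per message, at the cost of a slightly more involved loop.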

@edenhill edenhill added the bug label Aug 8, 2016
@edenhill edenhill changed the title PyEval_RestoreThread: NULL tstate causes core dump Producer not thread-safe Aug 8, 2016
dtheodor pushed a commit to dtheodor/confluent-kafka-python that referenced this issue Sep 4, 2018