Problem using async=True #46
Comments
Hmmm, this looks like an issue with multiprocessing initialization on Windows. It is a good bug to catch. I guess we have to take care of platform-specific issues on Windows (as documented at http://docs.python.org/2/library/multiprocessing.html#windows). I will work on it and let you know.
Thank you - happy to test something at the appropriate time if it helps.
Looked into it further. It looks like on Windows we cannot use methods as the 'target' for Process.__init__().
async=False is fine for now. My deployment target is likely to be Linux.
On 26 September 2013 08:39, Mahendra M notifications@github.com wrote:
I have made a tentative fix. I cannot try it since I do not have Windows. https://github.com/mahendra/kafka-python/tree/windows
Unfortunately it's still not going:
@sweetcheeks24 also had this issue on Windows. I'm marking as wontfix since I'm not really sure there's anything we can do about it. If you're on Windows, don't use async I guess :-/
Checked in detail. This can be fixed, but we will hold off on it until the zookeeper branch is checked in. Some of those patches will be required for it.
@mumrah - we have to be careful with this feature. If multiprocessing is a problem on Windows, things like MultiProcessConsumer will not work. We need to address this issue.
@watchforstock can you give my branch one more try? I have tried to ensure that the following feature will work on Windows
Give it a quick check. If things work, I will send a merge request to @mumrah.
Looks like I'm still getting the same error unfortunately, about not being able to pickle method recvfrom_into of _socket.socket.
Oh! OK. Let me look into it. A trace would be helpful. I will try and find myself a Windows box for debugging. If I can't, I will keep bothering you. :-)
I was able to get around the issue by switching multiprocessing.Process to threading.Thread. I am not sure of the overall impact that may have, as I am only using the producer in my efforts to integrate with tcollector. (I should also add that I have not actually produced anything successfully yet, since I am troubleshooting other tcollector/Windows/Python compatibility issues.)
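For context, a minimal sketch of why the thread swap sidesteps the pickling error: a thread shares memory with its parent, so nothing is pickled and a bound method is an acceptable target. `ThreadedSender` and its `sent` list are hypothetical stand-ins for illustration, not kafka-python code.

```python
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2


class ThreadedSender(object):
    """Hypothetical stand-in for an async producer; not kafka-python's API."""

    _STOP = object()  # sentinel telling the worker thread to exit

    def __init__(self):
        self.queue = queue.Queue()
        self.sent = []  # stands in for messages written to a broker socket
        # A bound method works as a thread target: nothing is pickled.
        self.thread = threading.Thread(target=self._drain)
        self.thread.daemon = True
        self.thread.start()

    def _drain(self):
        # Runs in the worker thread and shares self with the caller.
        while True:
            msg = self.queue.get()
            if msg is self._STOP:
                break
            self.sent.append(msg)

    def send(self, msg):
        self.queue.put(msg)

    def stop(self):
        self.queue.put(self._STOP)
        self.thread.join()
```

The trade-off is that the worker now runs inside the same interpreter as the caller, which is what the process-based design was meant to avoid.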
Thread will not exactly solve the problem. This module was for folks who did not want to use threads. I have a new feature adding driver support to kafka, where you can switch between gevent, thread and process. Coming up soon. Meanwhile, I figured out this problem (I hope). The issue is that the socket object is not pickle-able. So, I have written some hacks for it and pushed them to my "windows" branch. Could you give it one more try please? Sorry about this. The branch is available here:
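To illustrate the pickling problem described above, here is a minimal sketch, not the code in the "windows" branch. It assumes a hypothetical `Connection` wrapper that drops its socket when pickled and reconnects on demand; the class name, `connect()` and `is_picklable()` are made up for this example.

```python
import pickle
import socket


class Connection(object):
    """Hypothetical connection wrapper; not kafka-python's KafkaConnection."""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self._sock = None  # created lazily by connect()

    def connect(self):
        # Open the socket on first use, e.g. inside the child process.
        if self._sock is None:
            self._sock = socket.create_connection((self.host, self.port))
        return self._sock

    def __getstate__(self):
        # Drop the socket so the object can be pickled; keep host and port.
        state = self.__dict__.copy()
        state["_sock"] = None
        return state


def is_picklable(obj):
    """Return True if pickle can serialise obj."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False
```

With this shape, a raw `socket.socket()` fails `is_picklable`, while a `Connection` holding one can be handed to a child process, which then calls `connect()` to open its own socket.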
Not sure if it is just my setup, but I got this:
Process Process-1:
Thanks for your efforts - I'll give it a try once I'm back in the office on
Thanks
On 4 October 2013 17:07, Mahendra M notifications@github.com wrote:
Actually, it was my mistake. I had not tested it thoroughly. Have fixed it. Do test it once more when you have time. Will try testing it
On Sat, Oct 5, 2013 at 12:51 AM, sweetcheeks24 notifications@github.com wrote:
Mahendra
The error has changed now, so definitely making progress:
It appears that although the code runs, no messages are actually making it to the Kafka broker.
I have done some testing from my side and made a pull request for this: #61. @watchforstock, if you confirm that it works on Windows, I will merge the ticket.
Ensure that async producer works in windows. Fixes #46
As per the multiprocessing module's documentation, the objects passed to the Process() class must be pickle-able on Windows. So, the async producer did not work on Windows. To fix this, code which uses multiprocessing has to follow certain rules:
* The target=func should not be a member function
* We cannot pass objects like socket() to multiprocessing
This ticket fixes these issues. For KafkaClient and KafkaConnection objects, we make copies of the object and reinit() them inside the child processes.
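The two rules and the copy-then-reinit approach can be sketched roughly as follows. This is an illustration under assumptions, not the code from the pull request: `Client`, its `copy()`/`reinit()`/`send()` methods, `send_upstream` and `start_async_sender` are hypothetical names, and the "connection" is a plain string rather than a socket.

```python
import multiprocessing


class Client(object):
    """Hypothetical stand-in for KafkaClient; method names are assumptions."""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.conn = None   # would hold a socket in real code
        self.sent = []

    def copy(self):
        # Return a copy that carries only picklable settings, no socket.
        return Client(self.host, self.port)

    def reinit(self):
        # Re-create per-process resources (a real client would reconnect).
        self.conn = "connected to %s:%d" % (self.host, self.port)

    def send(self, msg):
        self.sent.append(msg)


def send_upstream(queue, client):
    """Module-level worker: a plain function, not a member function."""
    client.reinit()
    while True:
        msg = queue.get()
        if msg is None:    # None is the stop signal
            break
        client.send(msg)
    return client.sent


def start_async_sender(client):
    """Start send_upstream in a child process with a socket-free client copy."""
    q = multiprocessing.Queue()
    proc = multiprocessing.Process(target=send_upstream, args=(q, client.copy()))
    proc.daemon = True
    proc.start()
    return q, proc
```

On Windows, the script that calls `start_async_sender` should do so under an `if __name__ == "__main__":` guard, since the child process imports the main module when it starts.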
Hi, my Producer is:

import atexit
import kafka.io

class Producer(kafka.io.IO):
    """
    PRODUCE_REQUEST_ID = kafka.request_type.PRODUCE
    def __init__(self, topic, partition=3, host='localhost', port=9092, max_message_sz=104):
    def _pack_payload(self, messages):
    def _pack_kafka_message(self, payload):
    def encode_request(self, messages):
    def send(self, messages):

@contextlib.contextmanager

class BatchProducer(Producer):
    """
    MAX_RESPAWNS = 5
    def __init__(self, topic, batch_interval, partition=3, host='localhost', port=9092):
    def check_timer(self):
    def _interval_timer(self):
    def enqueue(self, message):
    def flush(self):
    def close(self):
Hi,
I'm having trouble using an asynchronous producer. I'm running the latest copy of the code from github.
I have a simple script which exhibits the problem:
The log output is as follows:
The same code works if I change async to False. This is running on Python 2.7.3 on Windows against Kafka 0.8.0. Any suggestions gratefully received.