
Fix auto-commit issues with multi-threading #29


Merged: 1 commit into dpkp:master on Jun 10, 2013

Conversation

@mahendra (Collaborator) commented Jun 3, 2013

This PR fixes the auto-commit threading issue in the Kafka library. I thought of multiple approaches, but this simple approach seems to solve the problem.

Design considerations

  • The solution must not involve locking, since locking can hurt performance.
  • The solution must be fastest when threads are not used (for commit). For example, adding a lock would be a detriment when auto_commit_every_t is set to None and the code is essentially single threaded.
  • The code should not be overly complex.

Approaches considered

  • Using a thread-safe data structure, such as a dictionary, to map correlation_id to the response data.
  • Using an Event() to notify the waiting thread when its data is available, avoiding thread issues.
  • Using a separate thread/process to send and fetch the data (way too complex).
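The first two approaches above (a correlation_id dictionary plus an Event() for notification) could be sketched roughly like this; the class and method names are illustrative, not code from this PR:

```python
import threading

class PendingResponses:
    """Sketch of the correlation-id approach considered above (not the
    approach this PR takes): each request registers an Event keyed by
    correlation_id, and a reader thread delivers the payload."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = {}  # correlation_id -> (Event, [payload])

    def register(self, correlation_id):
        # Called by the requesting thread before it blocks on the Event.
        event = threading.Event()
        holder = []
        with self._lock:
            self._pending[correlation_id] = (event, holder)
        return event, holder

    def deliver(self, correlation_id, payload):
        # Called by whichever thread reads the response off the socket.
        with self._lock:
            event, holder = self._pending.pop(correlation_id)
        holder.append(payload)
        event.set()
```

Note that this still needs a lock around the shared dictionary, which is exactly the single-threaded penalty the design considerations above want to avoid.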

However, the underlying problem is that the recv operation for a message is not atomic. We first fetch 4 bytes to learn the size of the message, and then fetch the rest of the message based on that size. This code is not re-entrant, and reading a message is not done in a single request, so two threads can each end up reading different pieces of the same message.
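A minimal sketch of such a length-prefixed read, with comments marking the windows where a second thread could interleave (function names are illustrative, not the library's actual code):

```python
import socket
import struct

def read_message(sock):
    # Step 1: read the 4-byte big-endian size prefix.
    header = _read_exact(sock, 4)
    (size,) = struct.unpack(">i", header)
    # Step 2: read the payload. If another thread calls read_message()
    # between the two steps, it consumes bytes belonging to *this*
    # message, which is why the operation is not re-entrant.
    return _read_exact(sock, size)

def _read_exact(sock, n):
    # recv() may return fewer than n bytes, so we must loop; each loop
    # iteration is another window in which an interleaved reader would
    # corrupt the stream.
    chunks = []
    while n > 0:
        chunk = sock.recv(n)
        if not chunk:
            raise ConnectionError("socket closed mid-message")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)
```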

At this point, the simplest solution seems to be letting each thread use its own socket, which means creating multiple sockets.

A simple way to achieve this is to make the KafkaConnection object thread-local, so that it is initialized separately for each thread that is created.
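The thread-local idea can be illustrated with Python's threading.local; this is a sketch only (the actual PR applies the technique to KafkaConnection, and _connect here is a hypothetical placeholder):

```python
import threading

class ThreadLocalConnection(threading.local):
    """Because this class derives from threading.local, each thread
    that touches the object gets its own independent state, so two
    threads can never interleave reads on the same socket."""

    def __init__(self, host, port):
        # For a threading.local subclass, __init__ runs once per
        # thread that first accesses the instance, with the same args.
        self.host = host
        self.port = port
        self.socket = self._connect()

    def _connect(self):
        # Hypothetical placeholder: the real KafkaConnection would open
        # a TCP socket to the broker here.
        return "socket-for-%s" % threading.current_thread().name
```

The cost, as discussed below, is one extra socket (and a little memory) per thread that uses the connection.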

@mumrah (Collaborator) commented Jun 8, 2013

@mahendra separating send from recv might have been a mistake on my part.

What we want is one socket connection per broker, and a way to serialize requests for a given broker. The correlation aspect of Kafka APIs is nice, and would allow us to take advantage of pipelining/multiplexing requests, but it would take some effort to code it up.

One of the goals I have for this library is to be very lightweight. I'm afraid making the KafkaConnection thread-local would result in too many socket connections.

Here is a strawman proposal for serializing requests with threading https://gist.github.com/mumrah/5733579. Ideally, there is a global connection manager/pool that KafkaClient can use.

I'm probably being overly cautious here, so feel free to make a case against the threaded approach.

@sandello (Contributor) commented Jun 8, 2013

@mumrah Can you explain the rationale behind "one socket connection per broker"? It would add extra implementation complexity, and the benefits are quite unclear to me right now. As for "too many socket connections", I don't think this is a real issue: Kafka is designed with multiple clients in mind, so it should scale to at least 10K connections, and at that scale you would have other problems to worry about. :)

@mumrah (Collaborator) commented Jun 8, 2013

@sandello I'm not so concerned with Kafka's ability to maintain the socket connections, but rather with Python's. After thinking about it more, I'm inclined to just say "not thread safe" for the current implementation and leave it up to users to synchronize where they need to (e.g., wrap the consumer iterator in a Queue).

Figuring out multithreaded/async producing is easy, but consumers are a bit trickier. Ideally, we'd have one consumer thread/process per partition, each with a single socket connection to Kafka. This is the main reason I want to separate user-created threads from the library's threads. Basically, we should just copy the design of the Scala client :)

I'll work on some changes this next week, assuming I can find some time.

@mumrah (Collaborator) commented Jun 9, 2013

Here's an API-compatible version of KafkaConnection that is thread-safe

https://gist.github.com/mumrah/961228b501e915c4779c
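The general shape of a lock-serialized, API-compatible connection might look like this; a sketch only, with illustrative names, and the linked gist's actual code may differ:

```python
import threading

class LockedConnection:
    """Sketch of serializing requests with one lock per connection."""

    def __init__(self, conn):
        self._conn = conn              # underlying raw connection
        self._lock = threading.RLock()

    def request(self, payload):
        # Holding the lock across both send and recv keeps a full
        # request/response pair atomic, so two threads can never
        # interleave their reads of length-prefixed responses.
        with self._lock:
            self._conn.send(payload)
            return self._conn.recv()

class EchoConn:
    """Hypothetical stand-in for a raw broker connection; it fails if
    a second request arrives before the first response is read."""

    def __init__(self):
        self._inflight = None

    def send(self, payload):
        assert self._inflight is None, "interleaved request!"
        self._inflight = payload

    def recv(self):
        payload, self._inflight = self._inflight, None
        return payload
```

Unlike the thread-local approach, this keeps one socket total, at the cost of taking a lock on every request even in single-threaded use.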

@mahendra (Collaborator, Author) commented

@mumrah While working on this ticket, I also considered using Event() and synchronizing the data. The reason I did not go that route was simplicity.

First design point that I considered - Python threads are bad and best avoided.

In your approach, you are creating threads with queues and using events to synchronize the data flow. Even when the user does not want to use timed-auto-commit, we will end up creating threads for sending and receiving data. We are adding unwanted complexity for the cases of manual-commits and auto-commit-after-n-msgs.

(I had developed another approach using a thread lock and a dict[correlation_id] to fix this problem, but again I did not want to penalize the single threaded guys)

By using thread-local storage, we are just spending some extra memory and creating one more socket. A Python client will not have a lot of consumer instances, and each consumer instance will have two open sockets. There is some memory overhead in this, as well as OS overhead in keeping an extra socket open (which is not much on modern OSes). I believe this trade-off is better than making the whole library multi-threaded.

I agree that using thread local is not an ideal solution. We need to design something better, but until then we can use some extra memory without penalizing single threaded clients. We also need to see if we can use other asynchronous frameworks like gevent to avoid the timer thread for auto-commit-after-x-secs cases.

@mumrah (Collaborator) commented Jun 10, 2013

> First design point that I considered - Python threads are bad and best avoided.

Yes, true in most cases.

> In your approach, you are creating threads with queues and using events to synchronize the data flow. Even when the user does not want to use timed-auto-commit, we will end up creating threads for sending and receiving data. We are adding unwanted complexity for the cases of manual-commits and auto-commit-after-n-msgs.

Good point. The current consumer was not really designed with multithreading in mind which was really just an oversight on my part.

> I agree that using thread local is not an ideal solution. We need to design something better, but until then we can use some extra memory without penalizing single threaded clients. We also need to see if we can use other asynchronous frameworks like gevent to avoid the timer thread for auto-commit-after-x-secs cases.

I came across https://github.com/benoitc/socketpool, which has a pluggable backend (including gevent). I have not looked at it in detail yet to see if it really addresses our needs.

mumrah added a commit referencing this pull request on Jun 10, 2013: "Fix auto-commit issues with multi-threading"
@mumrah merged commit 77b8301 into dpkp:master on Jun 10, 2013
@mahendra (Collaborator, Author) commented

@mumrah thanks for merging. socketpool looks interesting. Will have a look and see if we can use it.

@mahendra deleted the threading branch June 10, 2013 13:06
@mumrah (Collaborator) commented Jun 10, 2013

@mahendra So I was thinking about making the KafkaClient more stateless and thread/mp friendly. However, there needs to be code somewhere that resolves the topic+partition -> leader -> broker -> socket connection map. I've kept these things intentionally decoupled so that we could handle leader changes and broker disconnects (a TODO). This means KafkaClient needs to be stateful and cannot easily be thread-safe.

I think, for safety, it's best to assert that one KafkaClient goes with exactly one consumer. I'll think about this some more.
