fix: Courier message dequeuing race condition #1024
Conversation
This will still keep retrying failing messages (e.g. an unresolvable email address) indefinitely, right? |
Yeah, this just addresses the multiple-sending issue. I admit that saying it fixes #402 is a bit misleading based on the title of that issue, but @aeneasr marked all tickets pertaining to the multiple-sending issue as duplicates of #402 - I think because the discussion was examined most thoroughly there - so I referenced that one here. To fix the issue of perpetually retrying unresolvable email domains, we could verify whether the domain is valid before attempting to send. I could include that fix in this PR if desired, but maybe it would be best to address that issue in a separate PR. For now, I've updated the referenced issues to be more accurate. |
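For illustration only - a minimal sketch (not part of this PR) of the kind of up-front domain check mentioned above, using Go's standard `net` resolver; the `hasMXRecords` helper and its placement in the courier are hypothetical:

```go
package courier

import (
	"context"
	"net"
	"strings"
)

// hasMXRecords is a hypothetical helper: it reports whether the domain part
// of an email address has at least one MX record, so the courier could skip
// or immediately fail messages that can never be delivered.
func hasMXRecords(ctx context.Context, email string) bool {
	at := strings.LastIndex(email, "@")
	if at < 0 || at == len(email)-1 {
		return false // not a plausible address
	}
	domain := email[at+1:]

	var resolver net.Resolver
	records, err := resolver.LookupMX(ctx, domain)
	return err == nil && len(records) > 0
}
```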
Some of these end-to-end tests were failing non-deterministically for me with timeout errors. Eventually they all passed with no code changes. I filed a bug ticket here: #1026 |
Thank you, this already looks great! However, I fear this won't solve the issue. Looking at the code, there is a transaction which queries and updates the message row. Unfortunately, this can't prevent two nodes from sending the same email, because it does not stop a second reader from reading at the same time as the first. The transaction does not lock the table - there are several transaction isolation levels available, but I am not sure we should put a hard lock here, as the mail queue could be queried quite often. What I suggest doing instead is to have a worker node which is a singleton instance (e.g. runs as a background job) that processes the mail queue. That way, we don't have concurrency issues. To make things a bit easier, we could add a flag that controls whether the courier runs inside the server or as this separate worker. What do you think? |
Setting the isolation level per transaction is possible: https://github.com/gobuffalo/pop/blob/5a8f51c37336105490d5c27c25ddf255886ec5d8/db.go#L22-L24 |
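As a quick illustration of the mechanism being pointed at here (pop wraps sqlx, which wraps database/sql), this is roughly what a per-transaction isolation level looks like at the database/sql layer; the function and query are placeholders, not Kratos' actual persister code:

```go
package main

import (
	"context"
	"database/sql"
)

// dequeueWithIsolation sketches requesting an isolation level for a single
// transaction instead of forcing it on the whole database.
func dequeueWithIsolation(ctx context.Context, db *sql.DB) error {
	tx, err := db.BeginTx(ctx, &sql.TxOptions{
		Isolation: sql.LevelRepeatableRead, // or sql.LevelSerializable
	})
	if err != nil {
		return err
	}
	defer tx.Rollback() // a no-op if Commit succeeds

	// ... SELECT queued messages and UPDATE their status here ...

	return tx.Commit()
}
```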
Thanks for getting back. While having a singleton job to process mail would trivially eliminate the race condition issue, my fear is this won’t scale at high load. At work, we’re planning on having instances spread across multiple regions and availability zones, with an expected 100 requests per second per region to begin with, and having only 1 job processing the mail queue would create a major bottleneck. We’ve all had that experience of waiting around for a verification email to come in, and it’s an unpleasant UX. I agree that enforcing a hard DB-level isolation level requirement is not ideal. I can think of a few alternatives:
With all available context, I would probably lean towards option 1, what do you think? |
Great find, @zepatrik, I think this would be the way. |
Damn, always those people who don't add comments to stuff they do (not me, never 😅 ) |
Haha I caught that it was you a little late and tried to edit my comment in time. Only mentioned it to explain why I hadn’t been able to find it when I looked 😅. Nice work. |
Np, pop is missing a lot of codedoc in general... I just always work with the code itself |
Thank you for your thoughts. The problem you describe (long wait times for mail delivery) is not caused by a single worker processing the mail queue, but usually stems from rate limits on upstream mail servers. Golang is very fast and can easily handle 5000 concurrent HTTP/SMTP clients on a small machine, which will be more than enough for what you have in mind. The only problem is failover, but this should be handled by your job scheduler (i.e. Kubernetes).
This will have the inverse effect of what you want to achieve - high throughput. Increasing isolation levels can lead to table locking, which means that writes will constantly be blocked when a mail queue is being processed. This is a no-go approach.
A StatefulSet will scale worse and have more issues (e.g. using NFS) than processing the data in a single node. It is much harder to scale concurrent FS writes than it is to optimize a simple SQL query.
I don't think that throwing a queue system on top is the right approach for the moment. It might make sense if we encounter some serious issues, but as I said above, 100 requests per second is well within what a single worker can handle.
I am open for more ideas but so far I am convinced that a simple job processor is absolutely enough. |
Thanks for your timely and thorough responses, as always. Golang is certainly fast, but this sending of emails to the mail server strikes me as an IO-bound workload; Golang's execution speed would be dwarfed by the network speed. The emails are fetched from the DB 10 at a time, and then sent to the mail server sequentially and synchronously. You mentioned Golang's capacity for handling concurrent HTTP/SMTP clients, and I think this, along with a few other options, could be combined to improve the efficiency of the mail sending:
These three changes in combination would mitigate the latency users would experience before receiving an email in the single-worker situation. Even with multiple workers, they would improve latency and leverage the efficiency of Golang's runtime, as you alluded to. However, even with this latency mitigated, my concern would be having a single point of failure for all email sending.
I can't speak to other orchestrators, but (to the best of my knowledge/experience) Kubernetes failover relies on replication to eliminate single points of failure, so that if the node which a given process is running on fails, there are replicas running on other nodes which can handle requests while the failed node is replaced. In a single-worker setup, if the pod containing the worker were to be evicted, or its node were to fail, there'd be no fallback to send email in its place, resulting in potentially minutes of outage in email sending while the pod containing the email worker is rescheduled elsewhere.
In the most restrictive case, full table locking, the table would only be locked while a courier worker fetches its next batch of emails to send, since the transaction ends after fetching the emails and updating their status to "Processing". While the worker is actually processing and sending the emails, the table would be open to dequeuing by other workers. We wouldn't even need to dive into isolation level semantics; all of the supported DBs support
It might be enough in terms of latency if we were to add some concurrency to the email delivery, but as mentioned, I would still be worried about building a single point of failure into an otherwise highly-available architecture. Eager to hear your thoughts, and do let me know if I've made any oversights or am under any misapprehensions. |
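To make the concurrency idea above concrete, here is a minimal sketch of sending a fetched batch with a bounded number of goroutines; `Message` and `send` are stand-ins rather than Kratos' actual types, and only the pattern is the point:

```go
package courier

import (
	"context"
	"sync"
)

// Message and send are stand-ins for the courier's real message type and
// SMTP call.
type Message struct{ ID, Recipient, Body string }

func send(ctx context.Context, m Message) error { return nil /* SMTP call */ }

// sendBatch dispatches one goroutine per message, bounded by a semaphore so
// a large batch cannot open an unbounded number of SMTP connections.
func sendBatch(ctx context.Context, msgs []Message, maxConcurrent int) []error {
	sem := make(chan struct{}, maxConcurrent)
	errs := make([]error, len(msgs))

	var wg sync.WaitGroup
	for i, m := range msgs {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot
		go func(i int, m Message) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			errs[i] = send(ctx, m)
		}(i, m)
	}
	wg.Wait()
	return errs
}
```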
Thank you for your thoughtful response.
These are all things we could and should do! I think the DB can easily handle fetching thousands of records too. It's really a question of average bandwidth - we want to avoid handling more than e.g. 10MB per query, as that usually causes issues. Most DBs are configured to serve fewer MB per query than that afaik - it is, however, configurable for most. There are projects such as rakyll/hey which can easily create thousands of concurrent clients for load testing. The only thing that limits this is bandwidth, which should usually be 1-10 GBit/s for cluster-internal networking.
Right, I should not have said failover. Failover makes sense when you have a system which accepts requests/calls and you want that system to be reliable. Then you have a second (or third) process which takes over if another one fails (usually SEGFAULT / pod eviction / ...). Job scheduling does not work like that. A job has a state (scheduled, running, ran to completion, error). All job managers are able to restart the job if a certain condition is met (e.g. error). You don't need failover, you need something that makes sure that there is always a job running. Distributing jobs only makes sense if you have things such as massive datasets processed by e.g. map/reduce. We don't have that. All we do is take data from the DB and convert it to a TCP call. This is fast, and unless we're talking GB of data per second, it will never fail to perform IMO. If the job truly fails (e.g. SEGFAULT, OOM, node or pod evicted), the job scheduler is responsible for restarting it. This usually happens within milliseconds unless the cluster state is messed up (e.g. no nodes available), which would be a different problem. I hope this clarifies things, and I am still open to being convinced otherwise, but I will need to see benchmarks proving that we have an issue at hand!
If the worker queries the table every second, you have a full table lock every second. I can forecast with 99% confidence that this will be a very serious bottleneck - much more serious than the proposed solution.
I hope I was able to clarify why it is not a single point of failure, and I think your plan to have several goroutines handling the workload is a really good one! |
Thanks for the response. Makes sense to me. Having one worker scheduled as a job which fetches emails and then dispatches their processing to concurrent goroutines seems like the optimal solution to me as well, in light of everything. I think it makes sense to leave the transaction in place, to make the fetching and status update atomic (let me know if you disagree there), but I will bolster this PR to add the concurrent message sending, disable execution of the courier as a background worker, and add the CLI call to execute the courier as a foreground worker. We'll also need to convey this in the docs, as users will need to add a new Job resource to their k8s config. For now, maybe the easiest way would just be to add this worker to the quickstart config. I figure there's always room to add more detail to the documentation in the future, but what do you think the minimal update would need to be in order to get the feature in? As I see it, we'll at least need:
I can see potential future improvements such as allowing the user to pass arguments to the CLI command to set concurrency limits and batch sizes, and these would necessitate supporting documentation, but this configurability probably wouldn't be necessary for the MVP. Let me know your thoughts and I will get to work! |
In my opinion, we should either have a flag on `kratos serve` that keeps the current background behaviour, or a dedicated subcommand that runs the courier as its own foreground worker.
Documenting the Cobra CLI interface (see examples in
I agree - looking very much forward to this feature! |
Gotcha, sounds good. Thanks again! |
Hey @aeneasr, my colleague had another idea that I wanted to run by you:
This way, we'd eliminate the race condition while being able to support multiple background workers that transparently scale up along with our Kratos instances. What are your thoughts on this? This would still leave us open to migrating the background worker to a foreground Job if desired. I'm intending to move forward with adding some concurrency to the courier's sending either way. |
I thought about a similar idea; the problem with a fixed worker ID would be that you have to replace a failing worker with an exact copy. Also, the number of workers is quite fixed: if there are just too many messages, you can't spin up a few more workers to clean up the queue, as the messages are already assigned. Instead, I would suggest using consistent hashing. Each message gets a UUID (or a value by some other means) and each worker is responsible for a specific range. Adding a new worker only affects a single range and not all of them. Also, you would be able to just schedule a bunch more and they would start working immediately. The question remains how the workers themselves look up which range they are responsible for, and whether this is not too costly in itself to implement. But I think adding every worker to the config by URL should be a feasible solution for that. We have config hot reloading implemented already, and each node needs to read the config anyway. |
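A toy sketch of the consistent-hashing idea described above, assuming the worker list comes from the (hot-reloaded) config; with only one point per worker it omits the virtual nodes a real implementation would add for better balance:

```go
package courier

import (
	"hash/fnv"
	"sort"
)

// ring is a toy consistent-hash ring: each worker owns the arc of the ring
// up to its point, so adding or removing a worker only reassigns the
// messages in one arc rather than reshuffling everything.
type ring struct {
	points  []uint32
	workers map[uint32]string
}

func newRing(workers []string) *ring {
	r := &ring{workers: map[uint32]string{}}
	for _, w := range workers {
		p := hash32(w)
		r.points = append(r.points, p)
		r.workers[p] = w
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// ownerOf returns the worker responsible for a given message UUID.
func (r *ring) ownerOf(messageID string) string {
	h := hash32(messageID)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0 // wrap around the ring
	}
	return r.workers[r.points[i]]
}

func hash32(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}
```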
Good idea RE using consistent hashing. I think that would be a better approach than fixed worker IDs. At this point, though, I think it's worth asking whether our efforts would be best spent building out our own queuing solution or integrating an existing one. Certainly, whether we continue having the courier run as a background worker or provide the option to break it out into its own foreground job, it seems undesirable to be limited to running only one instance. We can increase its efficiency through concurrency, but at a certain scale, there are only so many emails one instance can process per second, right? We could scale by dynamically adding more and more goroutines to a single instance running on a beefed-up node, but why not offload the scaling to Kubernetes? Example scenario: a massively anticipated site or video game has a "countdown to launch", and when it hits zero, tens or hundreds of thousands of people try to sign up all at once, each of them enqueuing a verification email, each of them expecting to receive their verification email near-instantaneously. Can we be assured that a single instance will be able to meet that demand? I'm not sure 🤷 |
Okay, I'm hashing (no pun intended) out a potential distributed consistent hashing solution which I'm pretty excited about. It would provide horizontally scalable, race-free message processing without needing to integrate a queue, designate a master instance, or be limited to a singleton job. I'll provide a proposal tomorrow and see what you guys think. |
I have read through all the comments - thank you! I am really excited about the enthusiasm in this PR and the engineering ingenuity shown here. While I don't want to dampen the excitement and exploration, I want us to take a step back, look at the original problem, and approach it with an easy and robust solution. |
It probably will - the problem will not be the worker, it will be the email delivery. Mass spikes in email can lead to landing on a Gmail spam list, for example. Again, 99.999% of the problems here will not be the DB -> email translation but will be somewhere else - from laggy SMTP servers (and their queues!) to network issues. It will be other SMTP servers that are offline. It will be mail lost somewhere in the network. Email delivery is not reliable: it is slow, things go to spam. The problem with email delivery is the delivery process itself, not telling your SMTP server to send it. Your customers will not receive your mail because their server rejected it, because it landed in spam, or because they entered an incorrect email address. You have probably, at some point, not received a login or verification email and had to resend it - like SMS codes, for example. The problem, again, is delivery.

Even if you have so much mail that 2 CPUs and 4GB of RAM are 100% utilized - I think that would be an insane number of emails (hundreds of millions, maybe billions)! For example, we have a single worker on 2 CPUs and 2GB RAM that processes the 18 billion API requests we handle every month, with heavy data processing and some parallelization. That's about 2TB or more of data per month. We could add 30 more cores if we wanted to, but we don't need to! We could also add 10 workers and use map-reduce, but it would probably be slower and much more expensive than scaling up to more cores and more IOPS, because of the inherent overhead of distributed systems. Adding a 2-CPU node to a distributed system takes away a combined 0.1 CPU for managing the node (don't quote me on that ;) ). Vertical scaling is really good, cheap (in both implementation and overhead), and can be used for this exact use case with very simple means. And again, I don't think you could ever need 32 cores to process a mail queue.
The problem with distributed algorithms such as the various hashing schemes is one of discovery. As proposed here, the shared cluster state comes from a shared filesystem. However, we still need to find each node's URL and cannot use any load-balancing intermediary. I don't even know if that's possible on Kubernetes! Having a shared filesystem which is written to by many nodes is incredibly hard. Sure, we have things like NFS, and it works 99% of the time, but 1% of 365 days is still 3.6 days. Self-healing, resilient, and available cluster-shared state is much more complex! And yes, there are indeed battle-tested algorithms - for example Raft (implemented by e.g. etcd and CockroachDB), which does the leader election required for managing distributed state. There are also memcached and other distributed k/v stores. But do we really need to add e.g. ZooKeeper or etcd as dependencies because of a hypothetical scalability problem?

I really want to urge us to reconsider the complexity of the solution. What we need is something simple, something that will work well and is easy to test, easy to run, easy to configure, and easy to understand. If the simple and obvious solution does not work, because it really does not perform well (which I still doubt!) and cannot be optimized further (e.g. adding more CPU, more parallelism, more RAM), then we need to talk about more complex scenarios such as leader election protocols, shared cluster state (do we use Apache ZooKeeper? etcd? memcached? groupcache?) and more. The problem we are facing today is not one of scale; it is the bug linked in the OP. Let's solve that! We will save a lot of time by not addressing hypothetical issues with premature optimisation, and we can still implement something better if this fails! If scalability becomes an issue, I will be the first one to encourage and lead the discussion and implementation! |
Thanks for getting back and for your very thorough responses. You’re right, premature optimization is the root of all evil, as they say. I will proceed with splitting the mail worker out to a single foreground worker as discussed. At work we’ll actually be going into production with roughly this; we’ve got SMTP disabled on all but 1 instance, so it will be a good proof of concept. If we run into issues of scale, like you said, we can address them then. And I’ll continue exploring that distributed consistent hashing idea as a side project ;) Thanks a bunch! |
Awesome! :)
@vinckr please put @mattbonnell on the list for the next mailout :)
From my side this is good to merge! Any objections? |
None from my side! |
Awesome, thanks @mattbonnell ! |
BREAKING CHANGES: This patch moves the courier watcher (responsible for sending mail) to its own foreground worker, which can be executed as, for example, a Kubernetes job. The previous behaviour, which ran the worker as a background task of `kratos serve`, is still available via the `--watch-courier` flag. To run the foreground worker, use `kratos courier watch -c your/config.yaml`. Closes #1033 Closes #1024 Co-authored-by: Matt Bonnell <mbonnell@wish.com>
@aeneasr @mattbonnell Built off of |
I think that should be enough |
Related issue
#402, #652, #732
Proposed changes
Fixes the courier message dequeuing race condition by modifying `*sql.Persister.NextMessages(ctx context.Context, limit uint8)` to retrieve only messages with status `MessageStatusQueued` and update the status of the retrieved messages to `MessageStatusProcessing` within a transaction. On message send failure, the message's status is reset to `MessageStatusQueued`, so that the message can be dequeued in a subsequent `NextMessages` call. On message send success, the status is updated to `MessageStatusSent` (no change there).
Checklist
I confirm that this pull request does not address a security vulnerability. If this pull request addresses a security vulnerability, I confirm that I got green light (please contact security@ory.sh) from the maintainers to push the changes.
I have added tests that prove my fix is effective or that my feature works.
Further comments
I updated the existing message pulling test to demonstrate that messages won't be dequeued twice.
Caveat: This does necessitate the DB having an isolation level equivalent to repeatable read or better. MySQL's default isolation level is repeatable read and CockroachDB's is serializable, so these two will be free from race conditions by default. PostgreSQL's default isolation level is read committed, so it would still be susceptible to this race condition if the isolation level were not updated. ~~sqlx supports setting per-transaction isolation levels natively, but pop doesn't expose this functionality.~~ pop does support per-transaction isolation levels, as pointed out by @zepatrik, so there's no need to enforce any DB-level configuration.
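For readers who want to picture the pattern described in the proposed changes, here is a hedged sketch of the fetch-and-mark step written against plain database/sql; the table and column names are placeholders, and this is not the actual pop-based persister code from the PR:

```go
package courier

import (
	"context"
	"database/sql"
)

// nextMessages selects a batch of queued messages and flips their status to
// "processing" inside one transaction, so two workers cannot pick up the
// same rows (given repeatable read or stricter isolation, per the caveat).
func nextMessages(ctx context.Context, db *sql.DB, limit int) ([]string, error) {
	tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	rows, err := tx.QueryContext(ctx,
		`SELECT id FROM courier_messages WHERE status = 'queued' LIMIT $1`, limit)
	if err != nil {
		return nil, err
	}
	var ids []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			rows.Close()
			return nil, err
		}
		ids = append(ids, id)
	}
	if err := rows.Close(); err != nil {
		return nil, err
	}

	for _, id := range ids {
		if _, err := tx.ExecContext(ctx,
			`UPDATE courier_messages SET status = 'processing' WHERE id = $1`, id); err != nil {
			return nil, err
		}
	}
	return ids, tx.Commit()
}
```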