Fix Client hanging on unexpected Channel closures #2610
Conversation
Upon some further investigation, I'm beginning to suspect that this is related to how ZPool handles the "minimum" number of items in the pool. Without disabling the connection pool, using
By x=5 it's more or less guaranteed to be flaky.
I don't think this is correct regarding
I think you just need to use the live clock.
@adamgfraser my bad, you're right regarding that. Note that disabling the connection pool, or setting a dynamic one with a lower bound of 0 or 1, in the code below works though:

```scala
import zio.*
import zio.http.*
import zio.http.netty.NettyConfig
import zio.http.netty.client.NettyClientDriver

object Main extends ZIOAppDefault {
  private val untrusted = URL.decode("https://untrusted-root.badssl.com/").toOption.get

  override def run = {
    ZIO.foreach(1 to 20) { _ =>
      ZIO.scoped[Client](Client.request(Request.get(untrusted))).exit.debug
    }
  }.provide(
    ZLayer.succeed(ZClient.Config.default.ssl(sslConfig)),
    Client.customized,
    NettyClientDriver.live,
    DnsResolver.default,
    ZLayer.succeed(NettyConfig.default),
  )

  val sslConfig = ClientSSLConfig.FromTrustStoreResource(
    trustStorePath = "truststore.jks",
    trustStorePassword = "changeit",
  )
}
```
@kyri-petrou Can we create a reproducer that doesn't involve ZIO HTTP? I think we need to resolve the underlying issue versus disabling connection pooling.
Agree 👍
Ok this took me some time (and a lot of banging my head on the wall) but I managed to isolate the issue. The problem is that if a ZPool has a minimum size requirement, and the effect used to create the resource fails, then the pool hangs.

The very interesting thing in this case is that initially the effect that creates the channel succeeds - it's only after some time that it starts failing. The reason for it is that Netty connects to the host lazily, so during the initial creation of the pool the effect succeeds. Once we start invalidating entries, however, and need to create a new Channel, Netty (likely due to caching) immediately returns an error, and then the client hangs because it's not able to obtain a channel from the ZPool. I can provide a reproducer tomorrow demonstrating how ZPool hangs when the resource effect fails.

PS: I have a fix in mind that doesn't require changes to ZPool - I'll update the PR tomorrow. However, I do think it's probably good if ZPool didn't indefinitely hang if it's unable to obtain the minimum amount of resources.
Nice investigation, thank you!
Thanks! Will take a look!
I'm not sure this is correct. I created the following to reproduce the circumstances you describe. We create a pool with a minimum size and a resource that succeeds the first thousand times it is created but fails thereafter. We then get items from the pool a hundred thousand times, invalidating each one that is successfully created. This program completes normally. It is obviously hard to prove a negative, so maybe there is some other logic path that is creating an issue. But in general the pool maintains a minimum number of items, whether successful or not, to avoid exactly this issue. So definitely looking forward to seeing the reproducer. 😄

```scala
import zio._

sealed trait Item {
  def isValid: Boolean
}

object Item {
  def make: ZIO[Any, String, Item] =
    ZIO.suspendSucceed {
      if (counter.incrementAndGet() > 1000) ZIO.fail("Couldn't create item")
      else ZIO.succeed(new Item { def isValid = false })
    }

  private val counter = new java.util.concurrent.atomic.AtomicInteger(0)
}

object Example extends ZIOAppDefault {

  def invalidate(pool: ZPool[String, Item], item: Item): ZIO[Any, Nothing, Unit] =
    if (item.isValid) ZIO.unit
    else pool.invalidate(item)

  def worker(pool: ZPool[String, Item]): ZIO[Any, String, Unit] =
    ZIO.scoped {
      pool.get.flatMap { item =>
        invalidate(pool, item)
      }
    }

  val run =
    for {
      pool <- ZPool.make(Item.make, 2 to 10, 1.second)
      _    <- ZIO.sleep(1.second)
      _    <- ZIO.foreach(1 to 100000)(n => worker(pool).exit.debug(n.toString))
    } yield ()
}
```
@adamgfraser first of all, apologies for sending you down the wrong track. Seems that I was wrong about the cause of the connection pool hanging. Turns out it was caused by us manually invalidating the entries via the forked effect, which meant that entries kept invalidating themselves after being put in the pool. I updated the PR to fix the issue in these ways:
Please let me know what you think. While the PR fixes the issue, it's a bit rough around the edges, so I'm happy to hear any recommendations you might have.
Codecov Report: All modified and coverable lines are covered by tests ✅

```
@@            Coverage Diff             @@
##             main    #2610      +/-   ##
==========================================
+ Coverage   64.76%   64.87%   +0.11%
==========================================
  Files         140      140
  Lines        8488     8487       -1
  Branches     1574     1567       -7
==========================================
+ Hits         5497     5506       +9
+ Misses       2991     2981      -10
```
@kyri-petrou So I think the issue is more fundamental here. We need to invalidate the entries, or have confirmed that we do not need to invalidate them because we successfully cleaned them up, before we return them to the pool. Otherwise we are just creating a race condition when someone else uses them.
I think we could do that by adding a finalizer to the

EDIT: @adamgfraser I added the finalizer to the
So I think we need to think about why the connection is invalid.

If the connection becomes invalid because of something we did with it when we used it before, then we need to check for that before returning it to the pool. If we check that the connection was successfully cleaned up after we used it, and invalidate it if not, then another user of the pool should never observe an invalid connection because of our prior use.

If the connection becomes invalid just because connections randomly become invalid at some frequency, then that is a problem that the pool cannot solve. Even if we checked whether the connection was valid one moment, it could become invalid the next. So if that is a possibility, it has to be built into the use of the connection that it may essentially become invalid at any point, and the user has to handle that, likely by invalidating the connection, returning it to the pool, and retrying.
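The get → validate → invalidate-and-retry loop described here can be sketched without ZIO. In this plain-Scala model, `Conn`, `Pool`, and `useWithRetry` are hypothetical stand-ins for illustration only, not ZPool's or ZClient's actual API:

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration; not ZPool's / ZClient's API.
final case class Conn(id: Int, var open: Boolean)

final class Pool(initial: Seq[Conn]) {
  private val items  = mutable.Queue.from(initial)
  private var nextId = initial.size

  // Checkout hands back whatever is queued; an empty queue triggers a
  // fresh (valid) acquisition, mimicking pool replenishment.
  def get(): Conn =
    if (items.nonEmpty) items.dequeue()
    else { nextId += 1; Conn(nextId, open = true) }

  def invalidate(c: Conn): Unit = () // dropped, never handed out again
  def release(c: Conn): Unit    = items.enqueue(c)
}

// The caller must handle both failure modes: invalid on checkout
// (invalidate and retry) and failure mid-use (invalidate and retry).
def useWithRetry[A](pool: Pool, maxRetries: Int)(f: Conn => A): Either[String, A] = {
  var attempts = 0
  while (attempts <= maxRetries) {
    val conn = pool.get()
    if (!conn.open) pool.invalidate(conn)
    else
      try { val a = f(conn); pool.release(conn); return Right(a) }
      catch { case _: java.io.IOException => pool.invalidate(conn) }
    attempts += 1
  }
  Left(s"gave up after $maxRetries retries")
}

// Two stale connections in the pool: the loop discards both, then succeeds
// with a freshly acquired connection.
val pool = new Pool(Seq(Conn(0, open = false), Conn(1, open = false)))
val res  = useWithRetry(pool, maxRetries = 3)(c => s"used ${c.id}")
```

The point of the sketch is that validity is only checked at the moments the caller can observe; the connection may still die between the check and the use, which is why the mid-use failure path exists at all.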
This is the case we're facing here. There are a couple of reasons that the connection might become invalid while in the pool, where we have no control over it:
This is what I had in mind when adding the check during
Okay, so I think we have a situation where one part of the system is expecting guarantees that another part of the system does not and cannot provide. Given what you said above, it is not an accurate assumption that "any connection you get from the pool is valid", and in fact even if you get a connection and determine it is valid, it may become invalid by the time you try to do anything with it. So the code that uses the connection needs to fail if the connection is invalid instead of suspending indefinitely.
I agree. This is the purpose of this addition.

PS: I wasn't suggesting that this needs to be handled by ZPool in its entirety. I was only suggesting that it might be good if it provided some guarantee that what is retrieved from it is valid. Nonetheless, ZClient still needs to be robust enough to handle cases where the connection is dropped across the whole lifecycle of the request.
Right. I guess what I'm saying is that that guarantee is not possible in the presence of nondeterministic failures, because even if an item was valid immediately before being gotten, it could be invalid by the time that it is gotten. So the best you can do is get an item and then check whether it is valid, though even then it could become invalid immediately after you check, so you still have to handle the possibility of failure afterwards. You could also have some other process that periodically gets items and invalidates them if necessary, though that would also be at most probabilistic. Anyway, I think you're on the right track making the use of the connection robust to the connection becoming invalid.
```scala
      if (channel.isOpen) ZIO.succeed(channel)
      else invalidate(channel) *> ZIO.fail(None)
    }
    .retry(retrySchedule(minSize(key) + 1))
```
I'm not sure it makes sense to have this logic, because we already know that the channel might become invalid immediately after we check it, so we still need to fail if we use a channel that is invalid, and we can handle all of the logic there.

Also, if we do have something like this, I don't think there is any logical relationship between the pool size and the number of times we should retry. This seems to be based on the idea that we're checking "all" the values in the pool before giving up, but the items in the pool are potentially constantly changing, so a fixed number of retries might make more sense here.
Yeah, I think you might be right. I was thinking of covering a potential edge case where all the channels in the pool are closed at the same time and there is only a single thread executing a request. However, I think it's likely better for the behaviour of the pool to be more deterministic by having a fixed number of retries. I used 3 (for no specific reason) but I'm happy to change it to something lower / higher.
@adamgfraser I had to change the code back to having dynamic retries. I added a test that showcases that a fixed number of retries would cause issues in cases where all channels in the connection pool are closed (e.g., in case of server idle timeouts).
> This seems to be based on the idea that we're checking "all" the values in the pool before giving up but the items in the pool are potentially constantly changing so a fixed number of retries might make more sense here.

PS: I agree with your point here; having the number of retries equal the max number of items that can possibly exist in the pool doesn't guarantee that we checked every single item. However, it does guarantee that the `get` effect of ZPool is executed at least once.
```diff
@@ -49,11 +49,11 @@ object ClientHttpsSpec extends ZIOHttpSpec {
     test("respond Ok") {
       val actual = Client.request(Request.get(waterAerobics))
       assertZIO(actual)(anything)
-    },
+    } @@ ignore,
```
I think you can fix this one and the one below it by using a different URL. The water aerobics one has an expired SSL certificate.
@adamgfraser just going through the code to double-check everything and I stumbled across this. Do you reckon we should only do
@kyri-petrou You're saying only call

And we want to have a situation where we know, by the time the promise is completed, that the user is completely done with the connection, since as soon as the promise is completed the connection may be reallocated to someone else who is working under the assumption that they have exclusive access to it.
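The ordering constraint here — complete the promise only after cleanup, because completion makes the connection eligible for reallocation — can be modeled with a plain Scala `Promise`. The `Lease` class below is a hypothetical illustration, not zio-http's actual internals:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical `Lease`: the promise is completed only after cleanup,
// so whoever waits on `released` never sees a half-released connection.
final class Lease(val conn: String) {
  private val done = Promise[Unit]()
  @volatile private var cleaned = false

  def use(f: String => Unit): Unit = f(conn)
  def close(): Unit = { cleaned = true; done.success(()) } // clean up, then signal
  def released: Future[Unit] = done.future
  def isCleaned: Boolean = cleaned
}

val lease = new Lease("channel-1")
// A stand-in for the pool: it may reallocate only once `released` fires,
// at which point cleanup is guaranteed to have happened already.
val realloc = lease.released.map(_ => assert(lease.isCleaned))

lease.use(_ => ()) // caller's work with the connection
lease.close()      // caller fully done: cleanup completes, promise fires
Await.result(realloc, 5.seconds)
```

Completing the promise before cleanup would invert this ordering and reopen exactly the race described above: the next holder could observe a connection that is still being torn down.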
@adamgfraser thanks a lot for the pointers. I think the PR is ready to be re-reviewed now. Just some notes on the most recent changes:
I think perhaps we could benefit from adding a "ConnectionManager" that monitors items in the pool and invalidates / refreshes them asynchronously, but I think we can flag that as a separate issue.
Good to merge when tests are passing!
@adamgfraser do you happen to have any idea why the 2 tests that I re-enabled (which are not related to this PR) are passing when I run them locally but fail with

My gut tells me it seems like an issue with the DNS configuration of the GHA workers. I tried different endpoints (google.com, example.com, jsonplaceholder.typicode.com/posts) and they all seem to be failing with the same error.
OK, I ended up using
/claim #2479
/fixes #2479
The fix to the flaky test might seem trivial but it actually identifies a much more severe issue with the client's connection pool.
When a channel is marked as `Invalid`, the invalidation happens via ZPool's `invalidate` method (see here). However, the method (as pointed out in the Scaladoc) doesn't guarantee that the invalidation will take place immediately. This means that once the scope is closed (and the channel returned back to the pool) it is possible for an invalid channel to be picked out of the connection pool prior to it being actually invalidated.

I'm not 100% sure what is the best way forward with this - I think perhaps the issue should be addressed in ZIO, where ZPool should either expose an `invalidateNow` method, or mark items that have been invalidated "to be discarded" and ensure that they will not be used again.

Thoughts on this? Is this indeed an issue / limitation to be addressed in ZPool's implementation, or should we add some interim patch for this behaviour in zio-http?
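The race described above can be sketched with a toy pool whose invalidation is deferred. Everything here is hypothetical for illustration (it is not ZPool's implementation); the point is only the ordering: mark invalid, return to the pool, get again before the mark is ever processed.

```scala
import scala.collection.mutable

// Toy pool with *deferred* invalidation (hypothetical; not ZPool's
// implementation): `invalidate` only marks the item, and nothing
// consults the marks until `processInvalidations` runs.
final class LazyInvalidatingPool[A](initial: Seq[A]) {
  private val items   = mutable.Queue.from(initial)
  private val pending = mutable.Set.empty[A]

  def invalidate(a: A): Unit = pending += a    // deferred, not immediate
  def release(a: A): Unit    = items.enqueue(a)
  def get(): A               = items.dequeue() // does NOT check `pending`
  def processInvalidations(): Unit = {
    items.filterInPlace(a => !pending.contains(a))
    pending.clear()
  }
}

val pool = new LazyInvalidatingPool(Seq.empty[String])
pool.invalidate("closed-channel") // channel marked Invalid...
pool.release("closed-channel")    // ...then the scope closes, returning it
val handedOut = pool.get()        // handed out before invalidation runs
// `handedOut` is the invalid channel: exactly the race in question.
```

An `invalidateNow`-style operation in this model would amount to calling `processInvalidations` eagerly inside `invalidate`, or to `get` consulting `pending` before handing anything out; either closes the window between marking and discarding.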