Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #272 #274

Closed
wants to merge 2 commits into from
Closed

Conversation

NicolasRouquette
Copy link
Contributor

With the fix, the example in the issue produces the following behavior:

[INFO] [05/06/2019 10:08:13.366] [main] [skuber.api] Using following context for connecting to Kubernetes cluster: Context(Cluster(v1,https://192.168.99.100:8443,false,Some(Left(/home/rouquett/.minikube/ca.crt))),CertAuth(clientCertificate=/home/rouquett/.minikube/client.crt clientKey=/home/rouquett/.minikube/client.key userName= ),Namespace(Namespace,v1,ObjectMeta(default,,,,,,None,None,None,Map(),Map(),List(),0,None,None),None,None))
[INFO] [05/06/2019 10:08:14.128] [KubeJobSystem-akka.actor.default-dispatcher-2] [skuber.api] [ { reqId=aa257631-a6b5-4360-ad7c-3adef1474cb8} } - about to send HTTP request: POST https://192.168.99.100:8443/apis/batch/v1/namespaces/default/jobs]
[INFO] [05/06/2019 10:08:14.435] [KubeJobSystem-akka.actor.default-dispatcher-2] [skuber.api] [ { reqId=aa257631-a6b5-4360-ad7c-3adef1474cb8} } - received response with HTTP status 201]
[INFO] [05/06/2019 10:08:17.515] [KubeJobSystem-akka.actor.default-dispatcher-12] [skuber.api] [ { reqId=544dd3c6-dc6d-43e3-90da-66342c8b173b} } - about to send HTTP request: GET https://192.168.99.100:8443/apis/batch/v1/namespaces/default/jobs/pi-0262cfe4-74a2-4440-9307-356e92ff8400]
[INFO] [05/06/2019 10:08:17.543] [KubeJobSystem-akka.actor.default-dispatcher-5] [skuber.api] [ { reqId=544dd3c6-dc6d-43e3-90da-66342c8b173b} } - received response with HTTP status 200]
[INFO] [05/06/2019 10:08:20.564] [KubeJobSystem-akka.actor.default-dispatcher-12] [skuber.api] [ { reqId=be225dc9-3d77-4f67-8f61-c4e67f8bfe7c} } - about to send HTTP request: GET https://192.168.99.100:8443/apis/batch/v1/namespaces/default/jobs/pi-0262cfe4-74a2-4440-9307-356e92ff8400]
[INFO] [05/06/2019 10:08:20.587] [KubeJobSystem-akka.actor.default-dispatcher-12] [skuber.api] [ { reqId=be225dc9-3d77-4f67-8f61-c4e67f8bfe7c} } - received response with HTTP status 200]
[INFO] [05/06/2019 10:08:23.604] [KubeJobSystem-akka.actor.default-dispatcher-3] [skuber.api] [ { reqId=d748bf19-6c30-44e0-9ad4-528c933782a7} } - about to send HTTP request: GET https://192.168.99.100:8443/apis/batch/v1/namespaces/default/jobs/pi-0262cfe4-74a2-4440-9307-356e92ff8400]
[INFO] [05/06/2019 10:08:23.630] [KubeJobSystem-akka.actor.default-dispatcher-5] [skuber.api] [ { reqId=d748bf19-6c30-44e0-9ad4-528c933782a7} } - received response with HTTP status 200]
[INFO] [05/06/2019 10:08:26.654] [KubeJobSystem-akka.actor.default-dispatcher-4] [skuber.api] [ { reqId=d820ef81-c0c4-4de6-93ed-5e796e87931f} } - about to send HTTP request: GET https://192.168.99.100:8443/apis/batch/v1/namespaces/default/jobs/pi-0262cfe4-74a2-4440-9307-356e92ff8400]
[INFO] [05/06/2019 10:08:26.681] [KubeJobSystem-akka.actor.default-dispatcher-5] [skuber.api] [ { reqId=d820ef81-c0c4-4de6-93ed-5e796e87931f} } - received response with HTTP status 200]
[INFO] [05/06/2019 10:08:26.688] [KubeJobSystem-akka.actor.default-dispatcher-3] [akka://KubeJobSystem/user/main/KubeJob] Success: 2019-05-06T17:08:24Z
[INFO] [05/06/2019 10:08:26.690] [KubeJobSystem-akka.actor.default-dispatcher-4] [akka://KubeJobSystem/user/main] Terminated: Actor[akka://KubeJobSystem/user/main/KubeJob#841428984]
[INFO] [05/06/2019 10:08:26.691] [KubeJobSystem-akka.actor.default-dispatcher-4] [akka://KubeJobSystem/user/main] Terminate!

Clean shutdown, no warnings, no errors.

@NicolasRouquette
Copy link
Contributor Author

The git comment is incorrect.
Fixes #273

val http: HttpExt = Http()
val flow: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
if (request.uri.scheme == "https")
http.outgoingConnectionHttps(request.uri.authority.host.address(), request.uri.effectivePort, connectionContext = connectionContext, settings = settings.connectionSettings)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The outgoingConnectionHttps and outgoingConnection calls create new connections for each request as I understand it? The current singleRequest call benefits from connection pooling, so I'd be concerned about the additional overhead with this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, there's an overhead with using the low-level connection API but it has a nice feature: the lifecycle of the connection is tied to the lifecycle of the Akka flow graph that uses it. This makes for clean shutdown behavior!

The high-level host API has the advantage of connection polling but there are two caveats to address:

What do you think?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For watch commands (which use long polling GET) skuber uses a tailored pool (see https://github.com/doriordan/skuber/blob/master/client/src/main/scala/skuber/api/watch/LongPollingPool.scala) to avoid the issue you mention with respect to blocking other requests and also avoid timeouts. For other use cases the requests are generally expected to not be long-running.
The Akka docs generally recommend using singleRequest over outgoingConnection... - see https://doc.akka.io/docs/akka-http/current/client-side/connection-level.html

 // The `outgoingConnection` API is very low-level. Use it only if you already have a `Source[HttpRequest, _]`
      // (other than Source.single) available that you want to use to run requests on a single persistent HTTP
      // connection.
      //
      // Unfortunately, this case is so uncommon, that we couldn't come up with a good example.
      //
      // In almost all cases it is better to use the `Http().singleRequest()` API instead.

Is the AbruptTerminationException causing serious concern?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main concern is that AbruptTerminationException makes it difficult to tell whether there is a genuine problem somewhere or not.

I would be OK in keeping the host-level client connection strategy as long as the code is updated to ensure that the connection pools are shutdown. This should then avoid these exceptions.

Thanks for mentioning the tailored pool. It's a really neat trick!

I haven't used this part of skuber yet but it seems to me that it contributes to the AbruptTerminationException problem since the tailored pools are not currently shutdown.

Looking at this code: https://github.com/doriordan/skuber/blob/master/client/src/main/scala/skuber/api/client/package.scala#L26

The pool signature is:

  type Pool[T] = Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed]

Based on the Akka doc (https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-a-host-connection-pool), it seems to me that it ought to be instead:

  type Pool[T] = Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]

My understanding is that at the end of the flow graph, we would get the materialized value -- i.e., the connection pool that was used -- and this would allow us to cleanly shut it down.

What about then applying the same trick for other kinds of requests?

The difference really is that for watch-related operations, there's a connection pool created per invocation of the watch APIs in Skuber whereas other operations would reuse the same connection pool. The reused connection pool could be shutdown at skuber's close.

It seems to me that this ought to solve the original problem -- i.e., no AbruptTerminationException for any happy path -- while retaining the host-level connection benefits.

@NicolasRouquette
Copy link
Contributor Author

@doriordan This is a complete rewrite of the PR that restores the original invoke() logic based on singleRequest; on the other hand, this update exposes the pool used or created in variants of watch... that, previously, always created a pool without any possibility of shutting it down.

See examples/src/main/scala/skuber/examples/job/PiJobs.scala

To achieve this, I had to make API-breaking changes to the skuber API.

@NicolasRouquette
Copy link
Contributor Author

Closing because the revised PR is for a different issue: #280

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants