Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ember wsclient #7196

Merged
merged 50 commits into from
Aug 25, 2023
Merged

Ember wsclient #7196

merged 50 commits into from
Aug 25, 2023

Conversation

danghieutrung
Copy link
Contributor

@danghieutrung danghieutrung commented Jul 1, 2023


This is my current draft PR. I will later create the WSConnection for the WSClient

Copy link

@yilinwei yilinwei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left some comments. I'm super excited to have this in ember, so thank you very much for looking into it!

unixSockets
.orElse(defaultUnixSockets)
.liftTo(
new RuntimeException(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe IllegalStateException or UnsupportedOperationException? It'd be nice if we didn't need to throw at all though.

I'm guessing that this is thrown if the JDK doesn't include the unix socket support or we don't have fs2 netty or something? Maybe we can have a reference to how to fix that in the exception.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I think this code is copy-pasted from elsewhere.

In any case, currently this exception is thrown only on Scala Native, and that will be solved very soon.

// call it repeatedly until all frames in the buffer are decoded.
val frames = ArrayBuffer.empty[WebSocketFrame]
var byteBuffer = ByteBuffer.wrap(buffer)
val transcoder = new FrameTranscoder(isClient)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can use FrameTranscoder.getMsgLength to peek at the frame rather than this try catch approach?

It will mean that we don't have to throw exceptions as part of the normal control flow, which is a little expensive.

case Some((chunk, next)) =>
ApplicativeThrow[Pull[F, WebSocketFrame, *]]
.catchNonFatal { // `bufferToFrame` might throw
val buffer = acc ++ chunk.toArray[Byte]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it's worth breaking up the flow into two separate flows, since I think that the single frame case is possibly common enough to optimize; I'd need to check the runtime profile of some of the websocket apps that we're using though.

Copy link
Contributor

@diesalbla diesalbla left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few suggestions

Comment on lines +394 to +398
tlsContextOptWithDefault <-
tlsContextOpt
.fold(Network[F].tlsContext.systemResource.attempt.map(_.toOption))(
_.some.pure[Resource[F, *]]
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO once an option fold takes more than two lines, a match may come simpler:

Suggested change
tlsContextOptWithDefault <-
tlsContextOpt
.fold(Network[F].tlsContext.systemResource.attempt.map(_.toOption))(
_.some.pure[Resource[F, *]]
)
tlsContextOptWithDefault <- tlsContextOpt match {
case None => Network[F].tlsContext.systemResource.attempt.map(_.toOption)
case Some(tlsContext) => Resource.pure(Some(tlsContext))
}

Comment on lines 37 to 41
Stream.eval(read).flatMap {
case Some(bytes) =>
Stream.chunk(bytes) ++ readStream(read)
case None => Stream.empty
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought there was a Stream factory method for this kind of loop, F[Option[Chunk[A]]] => Stream[F, A], but there is not.

Copy link

@yilinwei yilinwei Jul 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Steam.eval(read).repeat.unNoneTerminate.unchunks?

Comment on lines 86 to 88
def receive: F[Option[org.http4s.client.websocket.WSFrame]] = ???
def send(wsf: org.http4s.client.websocket.WSFrame): F[Unit] = ???
def sendMany[G[_], A <: org.http4s.client.websocket.WSFrame](wsfs: G[A])(implicit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add an import org.http4s.client.websocket.WSFrame at top?

Comment on lines 97 to 101
for {
_ <- emberClient
.use { client =>
client
.run(wsRequest(s"ws://${server.address.getHostName}:${server.address.getPort}/ws-echo"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for {
_ <- emberClient
.use { client =>
client
.run(wsRequest(s"ws://${server.address.getHostName}:${server.address.getPort}/ws-echo"))
val req = s"ws://${server.address.getHostName}:${server.address.getPort}/ws-echo"
for {
_ <- emberClient
.use { client =>
client
.run(wsRequest(req))

Comment on lines 60 to 62
Resource.eval(validateServerHandshake(res, exampleSecWebSocketKey)).map { isValid =>
isValid match {
case Left(_) => None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the body of a map, or flatMap, or other higher-order-function, has the form of a pattern-matching, it is possible to omit the variable and just write the cases.

Suggested change
Resource.eval(validateServerHandshake(res, exampleSecWebSocketKey)).map { isValid =>
isValid match {
case Left(_) => None
Resource.eval(validateServerHandshake(res, exampleSecWebSocketKey)).map {
case Left(_) => None

Comment on lines +464 to +466
) { case ((response, drain), exitCase) =>
exitCase match {
case Resource.ExitCase.Succeeded =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise, if we have a pattern-match as the body of a case, we can flatten the pattern-matches by nesting the cases:

Suggested change
) { case ((response, drain), exitCase) =>
exitCase match {
case Resource.ExitCase.Succeeded =>
) {
case ((response, drain), Resource.ExitCase.Succeeded) =>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all the suggestions!

Copy link
Member

@armanbilge armanbilge left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

awesome!

def send(wsf: org.http4s.client.websocket.WSFrame): F[Unit] = ???
def sendMany[G[_], A <: org.http4s.client.websocket.WSFrame](wsfs: G[A])(implicit
def receive: F[Option[WSFrame]] = for {
_ <- socket.reads.through(decodeFrames(true)).evalTap(clientReceiveQueue.offer(_)).compile.drain
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should move this one into the for and run it with .background.

} yield frame
def send(wsf: WSFrame): F[Unit] = for {
_ <- toWebSocketFrame(wsf).map(clientSendQueue.offer(_))
_ <- clientSendQueue.take.map(f => frameToBytes(f, true).traverse_(c => socket.write(c)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same with this one, I think you might need to do foreverM.background so that it loops forever.

Comment on lines 64 to 75
def getSocket[F[_]](client: Client[F], request: Request[F])(implicit
F: MonadCancel[F, Throwable]
): Resource[F, Resource[F, Option[Socket[F]]]] = {
val webSocketKey = createWebSocketKey[F]
client
.run(request)
.map { res =>
Resource
.eval(validateServerHandshake(res, exampleSecWebSocketKey))
.map(isValid => isValid.toOption *> res.attributes.lookup(webSocketKey))
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type could be simplified a bit here

Suggested change
def getSocket[F[_]](client: Client[F], request: Request[F])(implicit
F: MonadCancel[F, Throwable]
): Resource[F, Resource[F, Option[Socket[F]]]] = {
val webSocketKey = createWebSocketKey[F]
client
.run(request)
.map { res =>
Resource
.eval(validateServerHandshake(res, exampleSecWebSocketKey))
.map(isValid => isValid.toOption *> res.attributes.lookup(webSocketKey))
}
}
def getSocket[F[_]](client: Client[F], request: Request[F])(implicit
F: MonadCancel[F, Throwable]
): Resource[F, Option[Socket[F]]] = {
val webSocketKey = createWebSocketKey[F]
client
.run(request)
.flatMap { res =>
Resource
.eval(validateServerHandshake(res, exampleSecWebSocketKey))
.map(isValid => isValid.toOption *> res.attributes.lookup(webSocketKey))
}
}

Comment on lines 107 to 108
socketResource <- getSocket(emberClient, httpWSRequest)
socketOption <- socketResource
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
socketResource <- getSocket(emberClient, httpWSRequest)
socketOption <- socketResource
socketOption <- getSocket(emberClient, httpWSRequest)

Comment on lines 70 to 75
.run(request)
.flatMap { res =>
Resource
.eval(validateServerHandshake(res, exampleSecWebSocketKey))
.map(isValid => isValid.toOption *> res.attributes.lookup(webSocketKey))
}
Copy link
Contributor

@diesalbla diesalbla Jul 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • The inner Resource.eval(ioa).map(a2b) should be equivalent to a Resource.eval(ioa.map(a2b))
  • The outer Resource.flatMap(res => Resource.eval(resToB)) should be equivalent to a Resource.evalMap.
  • Also, that isValid.toOption *> is easier as an if-then-else or an ifF.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks will edit this part of code!

Comment on lines +39 to +42
val httpWSRequest = Request[F]()
.withUri(wsRequest.uri)
.withHeaders(wsRequest.headers)
.withMethod(Method.GET)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a bug here: the Uri for wsRequest will be either ws:// or wss:// but this is not correct for a HTTP request. So we should rewrite that to http:// or https:// before making the request.

Comment on lines 67 to 71
_ <- clientSendQueue.take
.flatMap(f => frameToBytes(f, true).traverse_(c => socket.write(c)))
.foreverM
.void
.background
Copy link
Member

@armanbilge armanbilge Aug 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's change this to an FS2 Channel so that it can have graceful closure. Then, we should close the Channel after sending a CloseFrame.

Finally, to make sure that all the frames are sent before closing the connection, we should wait for the process to complete naturally.

So something like this:

        sendingFinished <- clientSendChannel
          .foreach(f => frameToBytes(f, true).traverse_(c => socket.write(c)))
          .compile
          .drain
          .background
       _ <- Resource.onFinalize(sendingFinished.void) // wait for sending to finish

@armanbilge armanbilge merged commit 3d739a9 into http4s:ember-wsclient Aug 25, 2023
@armanbilge
Copy link
Member

Published as 0.23.23-90-3d739a9-SNAPSHOT. Please give it a try!

https://github.com/http4s/http4s/actions/runs/5977900248/attempts/2#summary-16220706487

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants