Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a synchronous web socket interface #1913

Merged
merged 4 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 0 additions & 68 deletions core/src/main/scala/sttp/client4/SttpApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -255,72 +255,4 @@ trait SttpApi extends SttpExtensions with UriInterpolator {
StreamBody(s)(b),
contentType = Some(MediaType.ApplicationOctetStream)
)

// websocket response specifications

def asWebSocket[F[_], T](f: WebSocket[F] => F[T]): WebSocketResponseAs[F, Either[String, T]] =
asWebSocketEither(asStringAlways, asWebSocketAlways(f))

def asWebSocketWithMetadata[F[_], T](
f: (WebSocket[F], ResponseMetadata) => F[T]
): WebSocketResponseAs[F, Either[String, T]] =
asWebSocketEither(asStringAlways, asWebSocketAlwaysWithMetadata(f))

def asWebSocketAlways[F[_], T](f: WebSocket[F] => F[T]): WebSocketResponseAs[F, T] =
asWebSocketAlwaysWithMetadata((w, _) => f(w))

def asWebSocketAlwaysWithMetadata[F[_], T](f: (WebSocket[F], ResponseMetadata) => F[T]): WebSocketResponseAs[F, T] =
WebSocketResponseAs(ResponseAsWebSocket(f))

def asWebSocketUnsafe[F[_]]: WebSocketResponseAs[F, Either[String, WebSocket[F]]] =
asWebSocketEither(asStringAlways, asWebSocketAlwaysUnsafe)

def asWebSocketAlwaysUnsafe[F[_]]: WebSocketResponseAs[F, WebSocket[F]] =
WebSocketResponseAs(ResponseAsWebSocketUnsafe())

def fromMetadata[F[_], T](
default: ResponseAs[T],
conditions: ConditionalResponseAs[WebSocketResponseAs[F, T]]*
): WebSocketResponseAs[F, T] =
WebSocketResponseAs(ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate))

/** Uses the `onSuccess` response specification for 101 responses (switching protocols) on JVM/Native, 200 responses
* on JS. Otherwise, use the `onError` specification.
*/
def asWebSocketEither[F[_], A, B](
onError: ResponseAs[A],
onSuccess: WebSocketResponseAs[F, B]
): WebSocketResponseAs[F, Either[A, B]] =
SttpExtensions.asWebSocketEitherPlatform(onError, onSuccess)

// websocket stream response specifications

def asWebSocketStream[S](
s: Streams[S]
)(p: s.Pipe[WebSocketFrame.Data[_], WebSocketFrame]): WebSocketStreamResponseAs[Either[String, Unit], S] =
asWebSocketEither(asStringAlways, asWebSocketStreamAlways(s)(p))

def asWebSocketStreamAlways[S](s: Streams[S])(
p: s.Pipe[WebSocketFrame.Data[_], WebSocketFrame]
): WebSocketStreamResponseAs[Unit, S] = WebSocketStreamResponseAs[Unit, S](ResponseAsWebSocketStream(s, p))

def fromMetadata[T, S](
default: ResponseAs[T],
conditions: ConditionalResponseAs[WebSocketStreamResponseAs[T, S]]*
): WebSocketStreamResponseAs[T, S] =
WebSocketStreamResponseAs[T, S](
ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate)
)

/** Uses the `onSuccess` response specification for 101 responses (switching protocols), and the `onError`
* specification otherwise.
*/
def asWebSocketEither[A, B, S](
onError: ResponseAs[A],
onSuccess: WebSocketStreamResponseAs[B, S]
): WebSocketStreamResponseAs[Either[A, B], S] =
fromMetadata(
onError.map(Left(_)),
ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_)))
).showAs(s"either(${onError.show}, ${onSuccess.show})")
}
41 changes: 41 additions & 0 deletions core/src/main/scala/sttp/client4/SttpWebSocketAsyncApi.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package sttp.client4

import sttp.model.ResponseMetadata
import sttp.ws.WebSocket

trait SttpWebSocketAsyncApi {
def asWebSocket[F[_], T](f: WebSocket[F] => F[T]): WebSocketResponseAs[F, Either[String, T]] =
asWebSocketEither(asStringAlways, asWebSocketAlways(f))

def asWebSocketWithMetadata[F[_], T](
f: (WebSocket[F], ResponseMetadata) => F[T]
): WebSocketResponseAs[F, Either[String, T]] =
asWebSocketEither(asStringAlways, asWebSocketAlwaysWithMetadata(f))

def asWebSocketAlways[F[_], T](f: WebSocket[F] => F[T]): WebSocketResponseAs[F, T] =
asWebSocketAlwaysWithMetadata((w, _) => f(w))

def asWebSocketAlwaysWithMetadata[F[_], T](f: (WebSocket[F], ResponseMetadata) => F[T]): WebSocketResponseAs[F, T] =
WebSocketResponseAs(ResponseAsWebSocket(f))

def asWebSocketUnsafe[F[_]]: WebSocketResponseAs[F, Either[String, WebSocket[F]]] =
asWebSocketEither(asStringAlways, asWebSocketAlwaysUnsafe)

def asWebSocketAlwaysUnsafe[F[_]]: WebSocketResponseAs[F, WebSocket[F]] =
WebSocketResponseAs(ResponseAsWebSocketUnsafe())

def fromMetadata[F[_], T](
default: ResponseAs[T],
conditions: ConditionalResponseAs[WebSocketResponseAs[F, T]]*
): WebSocketResponseAs[F, T] =
WebSocketResponseAs(ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate))

/** Uses the `onSuccess` response specification for 101 responses (switching protocols) on JVM/Native, 200 responses
* on JS. Otherwise, use the `onError` specification.
*/
def asWebSocketEither[F[_], A, B](
onError: ResponseAs[A],
onSuccess: WebSocketResponseAs[F, B]
): WebSocketResponseAs[F, Either[A, B]] =
SttpExtensions.asWebSocketEitherPlatform(onError, onSuccess)
}
37 changes: 37 additions & 0 deletions core/src/main/scala/sttp/client4/SttpWebSocketStreamApi.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package sttp.client4

import sttp.capabilities.Streams
import sttp.model.StatusCode
import sttp.ws.WebSocketFrame

trait SttpWebSocketStreamApi {
def asWebSocketStream[S](
s: Streams[S]
)(p: s.Pipe[WebSocketFrame.Data[_], WebSocketFrame]): WebSocketStreamResponseAs[Either[String, Unit], S] =
asWebSocketEither(asStringAlways, asWebSocketStreamAlways(s)(p))

def asWebSocketStreamAlways[S](s: Streams[S])(
p: s.Pipe[WebSocketFrame.Data[_], WebSocketFrame]
): WebSocketStreamResponseAs[Unit, S] = WebSocketStreamResponseAs[Unit, S](ResponseAsWebSocketStream(s, p))

def fromMetadata[T, S](
default: ResponseAs[T],
conditions: ConditionalResponseAs[WebSocketStreamResponseAs[T, S]]*
): WebSocketStreamResponseAs[T, S] =
WebSocketStreamResponseAs[T, S](
ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate)
)

/** Uses the `onSuccess` response specification for 101 responses (switching protocols), and the `onError`
* specification otherwise.
*/
def asWebSocketEither[A, B, S](
onError: ResponseAs[A],
onSuccess: WebSocketStreamResponseAs[B, S]
): WebSocketStreamResponseAs[Either[A, B], S] =
fromMetadata(
onError.map(Left(_)),
ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_)))
).showAs(s"either(${onError.show}, ${onSuccess.show})")

}
44 changes: 44 additions & 0 deletions core/src/main/scala/sttp/client4/SttpWebSocketSyncApi.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package sttp.client4

import sttp.client4.ws.SyncWebSocket
import sttp.model.ResponseMetadata
import sttp.ws.WebSocket

trait SttpWebSocketSyncApi {
def asWebSocket[T](f: SyncWebSocket => T): WebSocketResponseAs[Identity, Either[String, T]] =
asWebSocketEither(asStringAlways, asWebSocketAlways(f))

def asWebSocketWithMetadata[T](
f: (SyncWebSocket, ResponseMetadata) => T
): WebSocketResponseAs[Identity, Either[String, T]] =
asWebSocketEither(asStringAlways, asWebSocketAlwaysWithMetadata(f))

def asWebSocketAlways[T](f: SyncWebSocket => T): WebSocketResponseAs[Identity, T] =
asWebSocketAlwaysWithMetadata((w, _) => f(w))

def asWebSocketAlwaysWithMetadata[T](
f: (SyncWebSocket, ResponseMetadata) => T
): WebSocketResponseAs[Identity, T] =
WebSocketResponseAs[Identity, T](ResponseAsWebSocket[Identity, T]((ws, m) => f(new SyncWebSocket(ws), m)))

def asWebSocketUnsafe: WebSocketResponseAs[Identity, Either[String, SyncWebSocket]] =
asWebSocketEither(asStringAlways, asWebSocketAlwaysUnsafe)

def asWebSocketAlwaysUnsafe: WebSocketResponseAs[Identity, SyncWebSocket] =
WebSocketResponseAs[Identity, WebSocket[Identity]](ResponseAsWebSocketUnsafe()).map(new SyncWebSocket(_))

def fromMetadata[T](
default: ResponseAs[T],
conditions: ConditionalResponseAs[WebSocketResponseAs[Identity, T]]*
): WebSocketResponseAs[Identity, T] =
WebSocketResponseAs(ResponseAsFromMetadata(conditions.map(_.map(_.delegate)).toList, default.delegate))

/** Uses the `onSuccess` response specification for 101 responses (switching protocols) on JVM/Native, 200 responses
* on JS. Otherwise, use the `onError` specification.
*/
def asWebSocketEither[A, B](
onError: ResponseAs[A],
onSuccess: WebSocketResponseAs[Identity, B]
): WebSocketResponseAs[Identity, Either[A, B]] =
SttpExtensions.asWebSocketEitherPlatform(onError, onSuccess)
}
140 changes: 140 additions & 0 deletions core/src/main/scala/sttp/client4/ws/SyncWebSocket.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package sttp.client4.ws

import sttp.client4.Identity
import sttp.model.Headers
import sttp.ws.{WebSocket, WebSocketClosed, WebSocketFrame}

/** Allows interacting with a web socket. Interactions can happen:
*
* - on the frame level, by sending and receiving raw [[WebSocketFrame]] s
* - using the provided `receive*` methods to obtain concatenated data frames, or string/byte payloads, and the
* `send*` method to send string/binary frames.
*
* The `send*` and `receive*` methods may result in a failed effect, with either one of [[sttp.ws.WebSocketException]]
* exceptions, or a backend-specific exception. Specifically, they will fail with [[WebSocketClosed]] if the web socket
* is closed.
*
* See the `either` and `eitherClose` method to lift web socket closed events to the value level.
*/
class SyncWebSocket(val delegate: WebSocket[Identity]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move it to the root client4 package?

The idea is that everything should be available from a single import:

import sttp.client4._

object WebSocketSynchronous extends App {
  def useWebSocket(ws: SyncWebSocket): Unit = ???

  val backend = DefaultSyncBackend()

  try
    basicRequest
      .get(uri"wss://ws.postman-echo.com/raw")
      .response(ws.sync.asWebSocket(useWebSocket))
      .send(backend)
  finally backend.close()
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would rather go to the sttp-shared project, next to WebSocket: https://github.com/softwaremill/sttp-shared/blob/master/ws/src/main/scala/sttp/ws/WebSocket.scala

(it's shared among sttp client / tapir)


/** Receive the next frame from the web socket. This can be a data frame, or a control frame including
* [[WebSocketFrame.Close]]. After receiving a close frame, no further interactions with the web socket should
* happen.
*
* However, not all implementations expose the close frame, and web sockets might also get closed without the proper
* close frame exchange. In such cases, as well as when invoking `receive`/`send` after receiving a close frame, a
* [[WebSocketClosed]] exception will be thrown.
*
* *Should be only called sequentially!* (from a single thread/fiber). Because web socket frames might be fragmented,
* calling this method concurrently might result in threads/fibers receiving fragments of the same frame.
*/
def receive(): WebSocketFrame = delegate.receive()

/** Sends a web socket frame. Can be safely called from multiple threads.
*
* May result in an exception, in case of a network error, or if the socket is closed.
*/
def send(f: WebSocketFrame, isContinuation: Boolean = false): Unit = delegate.send(f, isContinuation)

def isOpen(): Boolean = delegate.isOpen()

/** Receive a single data frame, ignoring others. The frame might be a fragment. Will throw [[WebSocketClosed]] if the
* web socket is closed, or if a close frame is received.
*
* *Should be only called sequentially!* (from a single thread/fiber).
*
* @param pongOnPing
* Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
*/
def receiveDataFrame(pongOnPing: Boolean = true): WebSocketFrame.Data[_] = delegate.receiveDataFrame(pongOnPing)

/** Receive a single text data frame, ignoring others. The frame might be a fragment. To receive whole messages, use
* [[receiveText]]. Will throw [[WebSocketClosed]] if the web socket is closed, or if a close frame is received.
*
* *Should be only called sequentially!* (from a single thread/fiber).
*
* @param pongOnPing
* Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
*/
def receiveTextFrame(pongOnPing: Boolean = true): WebSocketFrame.Text = delegate.receiveTextFrame(pongOnPing)

/** Receive a single binary data frame, ignoring others. The frame might be a fragment. To receive whole messages, use
* [[receiveBinary]]. Will throw [[WebSocketClosed]] if the web socket is closed, or if a close frame is received.
*
* *Should be only called sequentially!* (from a single thread/fiber).
*
* @param pongOnPing
* Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
*/
def receiveBinaryFrame(pongOnPing: Boolean = true): WebSocketFrame.Binary = delegate.receiveBinaryFrame(pongOnPing)

/** Receive a single text message (which might come from multiple, fragmented frames). Ignores non-text frames and
* returns combined results. Will throw [[WebSocketClosed]] if the web socket is closed, or if a close frame is
* received.
*
* *Should be only called sequentially!* (from a single thread/fiber).
*
* @param pongOnPing
* Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
*/
def receiveText(pongOnPing: Boolean = true): String = delegate.receiveText(pongOnPing)

/** Receive a single binary message (which might come from multiple, fragmented frames). Ignores non-binary frames and
* returns combined results. Will throw [[WebSocketClosed]] if the web socket is closed, or if a close frame is
* received.
*
* *Should be only called sequentially!* (from a single thread/fiber).
*
* @param pongOnPing
* Should a [[WebSocketFrame.Pong]] be sent when a [[WebSocketFrame.Ping]] is received.
*/
def receiveBinary(pongOnPing: Boolean): Array[Byte] = delegate.receiveBinary(pongOnPing)

/** Extracts the received close frame (if available) as the left side of an either, or returns the original result on
* the right.
*
* Will throw [[WebSocketClosed]] if the web socket is closed, but no close frame is available.
*
* @param f
* The effect describing web socket interactions.
*/
def eitherClose[T](f: => T): Either[WebSocketFrame.Close, T] =
try Right(f)
catch {
case WebSocketClosed(Some(close)) => Left(close)
}

/** Returns an effect computing a:
*
* - `Left` if the web socket is closed - optionally with the received close frame (if available).
* - `Right` with the original result otherwise.
*
* Will never throw a [[WebSocketClosed]].
*
* @param f
* The effect describing web socket interactions.
*/
def either[T](f: => T): Either[Option[WebSocketFrame.Close], T] =
try Right(f)
catch {
case WebSocketClosed(close) => Left(close)
}

/** Sends a web socket frame with the given payload. Can be safely called from multiple threads.
*
* May result in an exception, in case of a network error, or if the socket is closed.
*/
def sendText(payload: String): Unit = delegate.sendText(payload)

/** Sends a web socket frame with the given payload. Can be safely called from multiple threads.
*
* May result in an exception, in case of a network error, or if the socket is closed.
*/
def sendBinary(payload: Array[Byte]): Unit = delegate.sendBinary(payload)

/** Idempotent when used sequentially. */
def close(): Unit = delegate.close()

def upgradeHeaders: Headers = delegate.upgradeHeaders
}
7 changes: 7 additions & 0 deletions core/src/main/scala/sttp/client4/ws/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package sttp.client4

package object ws {
object async extends SttpWebSocketAsyncApi
object sync extends SttpWebSocketSyncApi
object stream extends SttpWebSocketStreamApi
}
2 changes: 1 addition & 1 deletion core/src/main/scalajs/sttp/client4/SttpExtensions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object SttpExtensions {
onError: ResponseAs[A],
onSuccess: WebSocketResponseAs[F, B]
): WebSocketResponseAs[F, Either[A, B]] =
fromMetadata(
ws.async.fromMetadata(
onError.map(Left(_)),
ConditionalResponseAs(_.code == StatusCode.Ok, onSuccess.map(Right(_)))
).showAs(s"either(${onError.show}, ${onSuccess.show})")
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/scalajvm/sttp/client4/SttpExtensions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ object SttpExtensions {
onError: ResponseAs[A],
onSuccess: WebSocketResponseAs[F, B]
): WebSocketResponseAs[F, Either[A, B]] =
fromMetadata(
onError.map(Left(_)),
ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_)))
).showAs(s"either(${onError.show}, ${onSuccess.show})")
ws.async
.fromMetadata(
onError.map(Left(_)),
ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_)))
)
.showAs(s"either(${onError.show}, ${onSuccess.show})")
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object SttpExtensions {
onError: ResponseAs[A],
onSuccess: WebSocketResponseAs[F, B]
): WebSocketResponseAs[F, Either[A, B]] =
fromMetadata(
sttp.client4.ws.async.fromMetadata(
onError.map(Left(_)),
ConditionalResponseAs(_.code == StatusCode.SwitchingProtocols, onSuccess.map(Right(_)))
).showAs(s"either(${onError.show}, ${onSuccess.show})")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import sttp.client4.SttpClientException.ReadException
import sttp.client4._
import sttp.client4.internal._
import sttp.client4.monad.IdMonad
import sttp.client4.ws.async._
import sttp.model._
import sttp.monad.{FutureMonad, TryMonad}
import sttp.ws.WebSocketFrame
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import org.scalatest.Suite
import org.scalatest.flatspec.AsyncFlatSpecLike
import sttp.client4.testing.HttpTest.wsEndpoint
import sttp.client4._
import sttp.client4.ws.async._
import sttp.monad.MonadError
import sttp.client4.testing.ConvertToFuture

Expand Down
Loading
Loading