Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KTOR-7186 Improve the docs by showcasing broadcasting messages to WebSocket clients via SharedFlow #495

Merged
merged 9 commits into from
Jul 17, 2024
2 changes: 1 addition & 1 deletion codeSnippets/settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ module("snippets", "client-http-send")
module("snippets", "deployment-ktor-plugin")
module("snippets", "tutorial-website-interactive")
module("snippets", "tutorial-website-static")
module("snippets", "tutorial-websockets-server")
module("snippets", "server-websockets-sharedflow")
module("snippets", "tutorial-websockets-client")
module("snippets", "tutorial-client-get-started")
module("snippets", "tutorial-server-get-started")
Expand Down
27 changes: 27 additions & 0 deletions codeSnippets/snippets/server-websockets-sharedflow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# WebSockets chat server

A sample Ktor project showing how to create
a WebSocket chat server using Ktor.

> This sample is a part of the [codeSnippets](../../README.md) Gradle project.

## Run

To run the service, execute the following command in the repository's root directory:

```bash
./gradlew :server-websockets-sharedflow:run
```

The service will be running at `http://0.0.0.0:8080`. To test the functionality,
connect multiple WebSocket clients to `ws://localhost:8080/ws` and send messages to observe broadcasting.

You can use the `websocket.http` file to run the requests directly from the IDE.

## Unit Testing

Unit tests are defined in `src/test/kotlin/com/example`. To run the tests, use the following command:

```bash
./gradlew :server-websockets-sharedflow:test
```
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ plugins {
application
kotlin("jvm")
id("io.ktor.plugin") version "2.3.12"
kotlin("plugin.serialization").version("1.9.10")
}

application {
Expand All @@ -21,6 +22,7 @@ dependencies {
implementation("io.ktor:ktor-server-core-jvm")
implementation("io.ktor:ktor-server-websockets-jvm")
implementation("io.ktor:ktor-server-netty-jvm")
implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version")
implementation("ch.qos.logback:logback-classic:$logback_version")
testImplementation("io.ktor:ktor-server-tests-jvm")
testImplementation("org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.example.models

import kotlinx.serialization.Serializable

@Serializable
data class MessageResponse(val message: String)
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.example.plugins

import com.example.*
import com.example.models.MessageResponse
import io.ktor.server.application.*
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import io.ktor.websocket.*
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
import java.time.*

fun Application.configureSockets() {
install(WebSockets) {
pingPeriod = Duration.ofSeconds(15)
timeout = Duration.ofSeconds(15)
maxFrameSize = Long.MAX_VALUE
masking = false
}
routing {
val messageResponseFlow = MutableSharedFlow<MessageResponse>()
val sharedFlow = messageResponseFlow.asSharedFlow()

webSocket("/ws") {
send("You are connected to WebSocket!")

val job = launch {
sharedFlow.collect { message ->
send(message.message)
}
}

runCatching {
incoming.consumeEach { frame ->
if (frame is Frame.Text) {
val receivedText = frame.readText()
val messageResponse = MessageResponse(receivedText)
messageResponseFlow.emit(messageResponse)
}
}
}.onFailure { exception ->
println("WebSocket exception: ${exception.localizedMessage}")
}.also {
job.cancel()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.example

import io.ktor.client.plugins.websocket.*
import io.ktor.server.testing.*
import io.ktor.websocket.*
import kotlin.test.*
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

class ApplicationTest {
@Test
fun testConversation() {
testApplication {
coroutineScope {
val client1 = createClient {
install(WebSockets)
}
val client2 = createClient {
install(WebSockets)
}

launch {
client1.webSocket("/ws") {
assertEquals("You are connected to WebSocket!", (incoming.receive() as Frame.Text).readText())
send(Frame.Text("Hello from client1"))
assertEquals("Hello from client1", (incoming.receive() as Frame.Text).readText())
assertEquals("Hello from client2", (incoming.receive() as Frame.Text).readText())
}
}

launch {
client2.webSocket("/ws") {
assertEquals("You are connected to WebSocket!", (incoming.receive() as Frame.Text).readText())
delay(100) // Ensure client1's message is sent first
send(Frame.Text("Hello from client2"))
assertEquals("Hello from client1", (incoming.receive() as Frame.Text).readText())
assertEquals("Hello from client2", (incoming.receive() as Frame.Text).readText())
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
WEBSOCKET ws://0.0.0.0:8080/chat
WEBSOCKET ws://0.0.0.0:8080/ws
Content-Type: text/plain

Hello, there!

###
WEBSOCKET ws://0.0.0.0:8080/chat
WEBSOCKET ws://0.0.0.0:8080/ws
Content-Type: text/plain

Hi!
11 changes: 0 additions & 11 deletions codeSnippets/snippets/tutorial-websockets-server/README.md

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

25 changes: 17 additions & 8 deletions topics/server-websockets.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,25 +123,34 @@ see [server-websockets](https://github.com/ktorio/ktor-documentation/tree/%ktor_

### Example: Handle multiple sessions {id="handle-multiple-session"}

To handle multiple WebSocket sessions, you need to
store each session on a server. To do this, you define a connection with a unique name and associate it with a specified
session. A sample `Connection` class below shows how to do this:
To efficiently manage multiple WebSocket sessions and handle broadcasting, you can utilize Kotlin's
[`SharedFlow`](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-shared-flow/).
This approach provides a scalable and concurrency-friendly method for managing WebSocket communications. Here's how to
implement this pattern:

1. Define a `SharedFlow` for broadcasting messages:

```kotlin
```

{src="snippets/tutorial-websockets-server/src/main/kotlin/com/example/Connection.kt" include-lines="3-11"}
{src="snippets/server-websockets-sharedflow/src/main/kotlin/com/example/plugins/Sockets.kt" include-lines="23-24"}

Then, you can create a new connection inside the `webSocket` handler when a new client connects to the WebSocket
endpoint:
2. In your WebSocket route, implement the broadcasting and message handling logic:

```kotlin
```

{src="snippets/tutorial-websockets-server/src/main/kotlin/com/example/plugins/Sockets.kt" include-lines="19-42"}
{src="snippets/server-websockets-sharedflow/src/main/kotlin/com/example/plugins/Sockets.kt" include-lines="25-48"}

The `runCatching` block processes
incoming messages and emits them to the `SharedFlow`, which then broadcasts to all collectors.

By using this pattern, you can efficiently manage multiple WebSocket sessions without manually tracking individual
connections. This approach scales well for applications with many concurrent WebSocket connections and provides a clean,
reactive way to handle message broadcasting.

For the full example,
see [tutorial-websockets-server](https://github.com/ktorio/ktor-documentation/tree/%ktor_version%/codeSnippets/snippets/tutorial-websockets-server).
see [server-websockets-sharedflow](https://github.com/ktorio/ktor-documentation/tree/%ktor_version%/codeSnippets/snippets/server-websockets-sharedflow).

## The WebSocket API and Ktor {id="websocket-api"}

Expand Down