Skip to content

Commit

Permalink
Improve static file serving and blocking IO
Browse files Browse the repository at this point in the history
  • Loading branch information
nsychev committed Nov 17, 2024
1 parent fab9307 commit 9b23ca2
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 29 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ dependencies {
implementation(libs.kotlinx.datetime)
implementation(libs.kotlinx.serialization.json)
implementation(libs.h2)
implementation(libs.jooq.kotlin)
implementation(libs.bundles.jooq)
implementation(libs.bcrypt)

testImplementation(libs.ktor.server.test.host)
Expand Down
1 change: 1 addition & 0 deletions frontend/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ fun TaskContainerScope.pnpmBuild(name: String, configure: PnpmTask.(Directory) -
outputs.cacheIf { true }
environment.set(mapOf("BUILD_PATH" to "build"))
inputs.dir(layout.projectDirectory.dir("src"))
inputs.file(layout.projectDirectory.file("index.html"))
inputs.file(layout.projectDirectory.file("package.json"))
inputs.file(layout.projectDirectory.file("pnpm-lock.yaml"))
inputs.file(layout.projectDirectory.file("tsconfig.json"))
Expand Down
10 changes: 8 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ logback = "1.5.12"
[libraries]
ktor-server-core = { module = "io.ktor:ktor-server-core-jvm", version.ref = "ktor" }
ktor-server-auth = { module = "io.ktor:ktor-server-auth", version.ref = "ktor" }
ktor-server-caching-headers = { module = "io.ktor:ktor-server-caching-headers", version.ref = "ktor" }
ktor-server-compression = { module = "io.ktor:ktor-server-compression", version.ref = "ktor" }
ktor-server-conditional-headers = { module = "io.ktor:ktor-server-conditional-headers", version.ref = "ktor" }
ktor-server-content-negotiation = { module = "io.ktor:ktor-server-content-negotiation", version.ref = "ktor" }
ktor-server-auth-jwt = { module = "io.ktor:ktor-server-auth-jwt", version.ref = "ktor" }
ktor-server-netty = { module = "io.ktor:ktor-server-netty-jvm", version.ref = "ktor" }
Expand All @@ -31,12 +34,15 @@ junit-launcher = { module = "org.junit.platform:junit-platform-launcher" }
h2 = { module = "com.h2database:h2", version.ref = "h2" }

jooq-kotlin = { module = "org.jooq:jooq-kotlin", version.ref = "jooq" }
jooq-kotlin-coroutines = { module = "org.jooq:jooq-kotlin-coroutines", version.ref = "jooq" }

[bundles]
ktor = [
"ktor-server-core", "ktor-server-auth", "ktor-server-auth-jwt", "ktor-server-content-negotiation", "ktor-server-netty",
"ktor-server-websockets", "ktor-serialization-kotlinx-json"
"ktor-server-core", "ktor-server-auth", "ktor-server-auth-jwt", "ktor-server-caching-headers", "ktor-server-compression",
"ktor-server-conditional-headers", "ktor-server-content-negotiation", "ktor-server-netty", "ktor-server-websockets",
"ktor-serialization-kotlinx-json"
]
jooq = ["jooq-kotlin", "jooq-kotlin-coroutines"]

[plugins]
jooq = { id = "nu.studer.jooq", version = "9.0" }
Expand Down
51 changes: 46 additions & 5 deletions src/main/kotlin/org/icpclive/balloons/Application.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,25 @@ import com.github.ajalt.clikt.core.findOrSetObject
import com.github.ajalt.clikt.core.main
import com.github.ajalt.clikt.core.subcommands
import com.github.ajalt.clikt.parameters.groups.provideDelegate
import io.ktor.http.CacheControl
import io.ktor.http.content.CachingOptions
import io.ktor.serialization.kotlinx.json.json
import io.ktor.server.application.install
import io.ktor.server.engine.connector
import io.ktor.server.engine.embeddedServer
import io.ktor.server.http.content.react
import io.ktor.server.http.content.singlePageApplication
import io.ktor.server.netty.Netty
import io.ktor.server.plugins.cachingheaders.CachingHeaders
import io.ktor.server.plugins.compression.Compression
import io.ktor.server.plugins.compression.condition
import io.ktor.server.plugins.compression.deflate
import io.ktor.server.plugins.compression.gzip
import io.ktor.server.plugins.compression.identity
import io.ktor.server.plugins.conditionalheaders.ConditionalHeaders
import io.ktor.server.plugins.contentnegotiation.ContentNegotiation
import io.ktor.server.request.uri
import io.ktor.server.routing.route
import io.ktor.server.routing.routing
import io.ktor.server.websocket.WebSockets
import kotlinx.serialization.json.Json
Expand Down Expand Up @@ -63,7 +75,28 @@ object Application : CliktCommand("balloons") {
val eventStream = EventStream(balloonRepository)
val cdsFetcher = CDSFetcher(eventStream, cdsSettings)

embeddedServer(Netty, port = balloonSettings.port) {
embeddedServer(
Netty,
configure = {
connector { port = balloonSettings.port }
callGroupSize = parallelism * 2
runningLimit = parallelism * 16
},
) {
install(Compression) {
gzip {
condition {
request.uri.startsWith("/assets/")
}
}
deflate {
condition {
request.uri.startsWith("/assets/")
}
}
identity()
}

install(ContentNegotiation) {
json(
Json {
Expand All @@ -79,13 +112,21 @@ object Application : CliktCommand("balloons") {
launchCDSFetcher(cdsFetcher)

routing {
singlePageApplication {
react("frontend")
useResources = true
}
adminController(volunteerRepository)
authController(secretKeyRepository, volunteerRepository, balloonSettings.disableRegistration)
contestController(eventStream, webSocketAuthenticator, balloonSettings)
route("/") {
install(CachingHeaders) {
options { _, _ -> CachingOptions(CacheControl.MaxAge(maxAgeSeconds = 30)) }
}

install(ConditionalHeaders)

singlePageApplication {
react("frontend")
useResources = true
}
}
}
}.start(wait = true)
}
Expand Down
18 changes: 10 additions & 8 deletions src/main/kotlin/org/icpclive/balloons/db/BalloonRepository.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.icpclive.balloons.db

import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle
import org.icpclive.balloons.db.tables.references.BALLOON
import org.icpclive.balloons.db.tables.references.VOLUNTEER
import org.icpclive.balloons.event.Balloon
Expand All @@ -10,20 +12,20 @@ class BalloonRepository(private val jooq: DSLContext) {
/**
* @return record containing delivery status (`BALLOON.DELIVERED`) and responsible volunteer login (`VOLUNTEER.LOGIN`)
*/
fun getDelivery(balloon: Balloon): Record? =
suspend fun getDelivery(balloon: Balloon): Record? =
jooq.select(BALLOON.DELIVERED, VOLUNTEER.LOGIN)
.from(BALLOON)
.leftJoin(VOLUNTEER).on(BALLOON.VOLUNTEER_ID.eq(VOLUNTEER.ID))
.where(
BALLOON.PROBLEM_ID.eq(balloon.problemId),
BALLOON.TEAM_ID.eq(balloon.teamId),
)
.fetchOne()
.awaitFirstOrNull()

/**
* @return `true` if balloon is reserved for this volunteer (even if it already was), `false` otherwise
*/
fun reserveBalloon(
suspend fun reserveBalloon(
balloon: Balloon,
volunteerId: Long,
): Boolean =
Expand All @@ -39,12 +41,12 @@ class BalloonRepository(private val jooq: DSLContext) {
.set(BALLOON.PROBLEM_ID, balloon.problemId)
.set(BALLOON.TEAM_ID, balloon.teamId)
.set(BALLOON.VOLUNTEER_ID, volunteerId)
.execute() > 0
.awaitSingle() > 0

/**
* @return `true` if balloon was dropped, `false` otherwise
*/
fun dropBalloon(
suspend fun dropBalloon(
balloon: Balloon,
volunteerId: Long,
): Boolean =
Expand All @@ -56,12 +58,12 @@ class BalloonRepository(private val jooq: DSLContext) {
BALLOON.VOLUNTEER_ID.eq(volunteerId),
BALLOON.DELIVERED.eq(false),
)
.execute() > 0
.awaitSingle() > 0

/**
* @return `true` if balloon is delivered (even if it already was), `false` otherwise
*/
fun deliverBalloon(
suspend fun deliverBalloon(
balloon: Balloon,
volunteerId: Long,
): Boolean =
Expand All @@ -72,5 +74,5 @@ class BalloonRepository(private val jooq: DSLContext) {
BALLOON.TEAM_ID.eq(balloon.teamId),
BALLOON.VOLUNTEER_ID.eq(volunteerId),
)
.execute() > 0
.awaitSingle() > 0
}
12 changes: 11 additions & 1 deletion src/main/kotlin/org/icpclive/balloons/db/DatabaseModule.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package org.icpclive.balloons.db

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asExecutor
import org.jooq.ExecutorProvider
import org.jooq.SQLDialect
import org.jooq.impl.DSL
import org.jooq.impl.DefaultConfiguration

data class DatabaseModule(
val balloonRepository: BalloonRepository,
Expand All @@ -11,7 +15,13 @@ data class DatabaseModule(

fun databaseModule(databaseConfig: DatabaseConfig): DatabaseModule {
val dbConnection = databaseConfig.createConnection()
val jooq = DSL.using(dbConnection, SQLDialect.H2)
val jooq =
DSL.using(
DefaultConfiguration()
.set(ExecutorProvider { Dispatchers.IO.asExecutor() })
.set(dbConnection)
.set(SQLDialect.H2),
)
val balloonRepository = BalloonRepository(jooq)
val secretKeyRepository = SecretKeyRepository(jooq)
val volunteerRepository = VolunteerRepository(jooq)
Expand Down
18 changes: 13 additions & 5 deletions src/main/kotlin/org/icpclive/balloons/event/ContestController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.ktor.server.websocket.webSocket
import io.ktor.websocket.Frame
import io.ktor.websocket.readText
import io.ktor.websocket.send
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.serialization.Serializable
Expand Down Expand Up @@ -55,7 +56,12 @@ fun Route.contestController(
}

webSocket("/api/balloons") {
val principal = webSocketAuthenticator.authenticate(this)
val principal =
try {
webSocketAuthenticator.authenticate(this)
} catch (exc: ClosedReceiveChannelException) {
return@webSocket
}

if (principal?.volunteer?.canAccess != true) {
send("""{"error": "access denied"}""")
Expand All @@ -76,7 +82,7 @@ fun Route.contestController(
}
}

runCatching {
try {
incoming.consumeEach { frame ->
if (frame !is Frame.Text) {
return@consumeEach
Expand All @@ -87,9 +93,11 @@ fun Route.contestController(
send("""{"error": "command failed"}""")
}
}
}.onFailure { exception ->
logger.warning { "WebSocket exception: ${exception.localizedMessage}" }
}.also {
} catch (ignored: ClosedReceiveChannelException) {
} catch (exc: Exception) {
logger.warning { "WebSocket exception: ${exc.localizedMessage}" }
throw exc
} finally {
outgoingStream.cancel()
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/main/kotlin/org/icpclive/balloons/event/EventStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ class EventStream(private val balloonRepository: BalloonRepository) {
/**
* @return `true` if command succeeded, `false` otherwise (in case of concurrent modification, etc.)
*/
fun processCommand(
suspend fun processCommand(
command: Command,
volunteerId: Long,
): Boolean =
when (command) {
is BalloonCommand -> processBalloonCommand(command, volunteerId)
}

private fun processBalloonCommand(
private suspend fun processBalloonCommand(
command: BalloonCommand,
volunteerId: Long,
): Boolean {
Expand Down Expand Up @@ -75,7 +75,7 @@ class EventStream(private val balloonRepository: BalloonRepository) {
}

// This can be written in non-concurrent fashion.
fun processRun(runInfo: RunInfo) {
suspend fun processRun(runInfo: RunInfo) {
val runId = runInfo.id.value

val existingRun = runs.value.find { it.runId == runId }
Expand Down Expand Up @@ -108,7 +108,7 @@ class EventStream(private val balloonRepository: BalloonRepository) {
/**
* Recalculates current state by [runs] and commits it to [sink].
*/
private fun synchronizeProblemState(
private suspend fun synchronizeProblemState(
problemId: String,
teamId: String,
) {
Expand All @@ -124,7 +124,7 @@ class EventStream(private val balloonRepository: BalloonRepository) {
}

if (actualRun != null) {
val targetBalloon = actualRun.toBalloon(isFTS = actualRun.runId == actualFTS?.runId).withDelivery()
val targetBalloon = actualRun.toBalloon(isFTS = actualRun.runId == actualFTS?.runId)

if (existingBalloon != targetBalloon) {
updateSink(BalloonUpdated(targetBalloon))
Expand Down Expand Up @@ -173,11 +173,11 @@ class EventStream(private val balloonRepository: BalloonRepository) {
is RunResult.InProgress -> false
}

private fun Run.toBalloon(isFTS: Boolean) =
private suspend fun Run.toBalloon(isFTS: Boolean) =
Balloon(runId, isFTS, teamId, problemId, time)
.withDelivery()

private fun Balloon.withDelivery(): Balloon {
private suspend fun Balloon.withDelivery(): Balloon {
val delivery = balloonRepository.getDelivery(this)
return this.copy(
takenBy = delivery?.get(VOLUNTEER.LOGIN),
Expand Down

0 comments on commit 9b23ca2

Please sign in to comment.