Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updater cleanup and improvements #416

Merged
merged 1 commit into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package suwayomi.tachidesk.manga.controller

import io.javalin.http.HttpCode
import io.javalin.websocket.WsConfig
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import org.kodein.di.DI
import org.kodein.di.conf.global
Expand All @@ -15,6 +14,7 @@ import suwayomi.tachidesk.manga.impl.update.UpdateStatus
import suwayomi.tachidesk.manga.impl.update.UpdaterSocket
import suwayomi.tachidesk.manga.model.dataclass.CategoryDataClass
import suwayomi.tachidesk.manga.model.dataclass.MangaChapterDataClass
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
import suwayomi.tachidesk.manga.model.dataclass.PaginatedList
import suwayomi.tachidesk.server.JavalinSetup.future
import suwayomi.tachidesk.server.util.formParam
Expand Down Expand Up @@ -68,22 +68,18 @@ object UpdateController {
}
},
behaviorOf = { ctx, categoryId ->
val categoriesForUpdate = ArrayList<CategoryDataClass>()
if (categoryId == null) {
logger.info { "Adding Library to Update Queue" }
categoriesForUpdate.addAll(Category.getCategoryList())
addCategoriesToUpdateQueue(Category.getCategoryList(), true)
} else {
val category = Category.getCategoryById(categoryId)
if (category != null) {
categoriesForUpdate.add(category)
addCategoriesToUpdateQueue(listOf(category), true)
} else {
logger.info { "No Category found" }
ctx.status(HttpCode.BAD_REQUEST)
return@handler
}
}
addCategoriesToUpdateQueue(categoriesForUpdate, true)
ctx.status(HttpCode.OK)
},
withResults = {
httpCode(HttpCode.OK)
Expand All @@ -94,14 +90,15 @@ object UpdateController {
private fun addCategoriesToUpdateQueue(categories: List<CategoryDataClass>, clear: Boolean = false) {
val updater by DI.global.instance<IUpdater>()
if (clear) {
runBlocking { updater.reset() }
updater.reset()
}
categories.forEach { category ->
val mangas = CategoryManga.getCategoryMangaList(category.id)
mangas.forEach { manga ->
categories
.flatMap { CategoryManga.getCategoryMangaList(it.id) }
.distinctBy { it.id }
.sortedWith(compareBy(String.CASE_INSENSITIVE_ORDER, MangaDataClass::title))
.forEach { manga ->
updater.addMangaToQueue(manga)
}
}
}

fun categoryUpdateWS(ws: WsConfig) {
Expand All @@ -125,7 +122,7 @@ object UpdateController {
},
behaviorOf = { ctx ->
val updater by DI.global.instance<IUpdater>()
ctx.json(updater.getStatus().value.getJsonSummary())
ctx.json(updater.status.value)
},
withResults = {
json<UpdateStatus>(HttpCode.OK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass

interface IUpdater {
fun addMangaToQueue(manga: MangaDataClass)
fun getStatus(): StateFlow<UpdateStatus>
suspend fun reset(): Unit
val status: StateFlow<UpdateStatus>
fun reset()
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ enum class JobStatus {
FAILED
}

class UpdateJob(val manga: MangaDataClass, var status: JobStatus = JobStatus.PENDING) {

override fun toString(): String {
return "UpdateJob(status=$status, manga=${manga.title})"
}
}
data class UpdateJob(
val manga: MangaDataClass,
val status: JobStatus = JobStatus.PENDING
)
Original file line number Diff line number Diff line change
@@ -1,33 +1,23 @@
package suwayomi.tachidesk.manga.impl.update

import com.fasterxml.jackson.annotation.JsonIgnore
import mu.KotlinLogging
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass

var logger = KotlinLogging.logger {}
class UpdateStatus(
var statusMap: MutableMap<JobStatus, MutableList<MangaDataClass>> = mutableMapOf<JobStatus, MutableList<MangaDataClass>>(),
var running: Boolean = false,
val logger = KotlinLogging.logger {}
data class UpdateStatus(
val statusMap: Map<JobStatus, List<MangaDataClass>> = emptyMap(),
val running: Boolean = false,
@JsonIgnore
val numberOfJobs: Int = 0
) {
var numberOfJobs: Int = 0

constructor(jobs: List<UpdateJob>, running: Boolean) : this(
mutableMapOf<JobStatus, MutableList<MangaDataClass>>(),
running
) {
this.numberOfJobs = jobs.size
jobs.forEach {
val list = statusMap.getOrDefault(it.status, mutableListOf())
list.add(it.manga)
statusMap[it.status] = list
}
}

override fun toString(): String {
return "UpdateStatus(statusMap=${statusMap.map { "${it.key} : ${it.value.size}" }.joinToString("; ")}, running=$running)"
}

// serialize to summary json
fun getJsonSummary(): String {
return """{"statusMap":{${statusMap.map { "\"${it.key}\" : ${it.value.size}" }.joinToString(",")}}, "running":$running}"""
}
statusMap = jobs.groupBy { it.status }
.mapValues { entry ->
entry.value.map { it.manga }
},
running = running,
numberOfJobs = jobs.size
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,74 +3,76 @@ package suwayomi.tachidesk.manga.impl.update
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import mu.KotlinLogging
import suwayomi.tachidesk.manga.impl.Chapter
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
import java.util.concurrent.ConcurrentHashMap

class Updater : IUpdater {
private val logger = KotlinLogging.logger {}
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

private var tracker = mutableMapOf<String, UpdateJob>()
private var updateChannel = Channel<UpdateJob>()
private val statusChannel = MutableStateFlow(UpdateStatus())
private var updateJob: Job? = null
private val _status = MutableStateFlow(UpdateStatus())
override val status = _status.asStateFlow()

init {
updateJob = createUpdateJob()
}
private val tracker = ConcurrentHashMap<Int, UpdateJob>()
private var updateChannel = createUpdateChannel()

private fun createUpdateJob(): Job {
return scope.launch {
while (true) {
val job = updateChannel.receive()
process(job)
statusChannel.value = UpdateStatus(tracker.values.toList(), !updateChannel.isEmpty)
private fun createUpdateChannel(): Channel<UpdateJob> {
val channel = Channel<UpdateJob>(Channel.UNLIMITED)
channel.consumeAsFlow()
.onEach { job ->
_status.value = UpdateStatus(
process(job),
tracker.any { (_, job) ->
job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING
}
)
}
}
.catch { logger.error(it) { "Error during updates" } }
.launchIn(scope)
return channel
}

private suspend fun process(job: UpdateJob) {
job.status = JobStatus.RUNNING
tracker["${job.manga.id}"] = job
statusChannel.value = UpdateStatus(tracker.values.toList(), true)
try {
private suspend fun process(job: UpdateJob): List<UpdateJob> {
tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING)
_status.update { UpdateStatus(tracker.values.toList(), true) }
tracker[job.manga.id] = try {
logger.info { "Updating ${job.manga.title}" }
Chapter.getChapterList(job.manga.id, true)
job.status = JobStatus.COMPLETE
job.copy(status = JobStatus.COMPLETE)
} catch (e: Exception) {
if (e is CancellationException) throw e
logger.error(e) { "Error while updating ${job.manga.title}" }
job.status = JobStatus.FAILED
job.copy(status = JobStatus.FAILED)
}
tracker["${job.manga.id}"] = job
return tracker.values.toList()
}

override fun addMangaToQueue(manga: MangaDataClass) {
scope.launch {
updateChannel.send(UpdateJob(manga))
}
tracker["${manga.id}"] = UpdateJob(manga)
statusChannel.value = UpdateStatus(tracker.values.toList(), true)
}

override fun getStatus(): StateFlow<UpdateStatus> {
return statusChannel
tracker[manga.id] = UpdateJob(manga)
_status.update { UpdateStatus(tracker.values.toList(), true) }
}

override suspend fun reset() {
override fun reset() {
scope.coroutineContext.cancelChildren()
tracker.clear()
_status.update { UpdateStatus() }
updateChannel.cancel()
statusChannel.value = UpdateStatus()
updateJob?.cancel("Reset")
updateChannel = Channel()
updateJob = createUpdateJob()
updateChannel = createUpdateChannel()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,26 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import mu.KotlinLogging
import org.kodein.di.DI
import org.kodein.di.conf.global
import org.kodein.di.instance

object UpdaterSocket : Websocket() {
object UpdaterSocket : Websocket<UpdateStatus>() {
private val logger = KotlinLogging.logger {}
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val updater by DI.global.instance<IUpdater>()
private var job: Job? = null

override fun notifyClient(ctx: WsContext) {
ctx.send(updater.getStatus().value.getJsonSummary())
override fun notifyClient(ctx: WsContext, value: UpdateStatus?) {
ctx.send(value ?: updater.status.value)
}

override fun handleRequest(ctx: WsMessageContext) {
when (ctx.message()) {
"STATUS" -> notifyClient(ctx)
"STATUS" -> notifyClient(ctx, updater.status.value)
else -> ctx.send(
"""
|Invalid command.
Expand All @@ -40,7 +40,7 @@ object UpdaterSocket : Websocket() {
override fun addClient(ctx: WsContext) {
logger.info { ctx.sessionId }
super.addClient(ctx)
if (job == null) {
if (job?.isActive != true) {
job = start()
}
}
Expand All @@ -54,12 +54,10 @@ object UpdaterSocket : Websocket() {
}

fun start(): Job {
return scope.launch {
while (true) {
updater.getStatus().collectLatest {
notifyAllClients()
}
return updater.status
.onEach {
notifyAllClients(it)
}
}
.launchIn(scope)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ import io.javalin.websocket.WsContext
import io.javalin.websocket.WsMessageContext
import java.util.concurrent.ConcurrentHashMap

abstract class Websocket {
abstract class Websocket<T> {
protected val clients = ConcurrentHashMap<String, WsContext>()
open fun addClient(ctx: WsContext) {
clients[ctx.sessionId] = ctx
notifyClient(ctx)
notifyClient(ctx, null)
}
open fun removeClient(ctx: WsContext) {
clients.remove(ctx.sessionId)
}
open fun notifyAllClients() {
clients.values.forEach { notifyClient(it) }
open fun notifyAllClients(value: T) {
clients.values.forEach { notifyClient(it, value) }
}
abstract fun notifyClient(ctx: WsContext)
abstract fun notifyClient(ctx: WsContext, value: T?)
abstract fun handleRequest(ctx: WsMessageContext)
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ internal class UpdateControllerTest : ApplicationTest() {
UpdateController.categoryUpdate(ctx)
verify { ctx.status(HttpCode.BAD_REQUEST) }
val updater by DI.global.instance<IUpdater>()
assertEquals(0, updater.getStatus().value.numberOfJobs)
assertEquals(0, updater.status.value.numberOfJobs)
}

@Test
Expand All @@ -44,7 +44,7 @@ internal class UpdateControllerTest : ApplicationTest() {
UpdateController.categoryUpdate(ctx)
verify { ctx.status(HttpCode.OK) }
val updater by DI.global.instance<IUpdater>()
assertEquals(1, updater.getStatus().value.numberOfJobs)
assertEquals(1, updater.status.value.numberOfJobs)
}

@Test
Expand All @@ -60,7 +60,7 @@ internal class UpdateControllerTest : ApplicationTest() {
UpdateController.categoryUpdate(ctx)
verify { ctx.status(HttpCode.OK) }
val updater by DI.global.instance<IUpdater>()
assertEquals(3, updater.getStatus().value.numberOfJobs)
assertEquals(3, updater.status.value.numberOfJobs)
}

private fun createLibraryManga(
Expand Down
Loading