Skip to content

Commit

Permalink
fix: prevent room queries with too big collections
Browse files Browse the repository at this point in the history
  • Loading branch information
mytlogos committed Feb 23, 2022
1 parent f0cd8b9 commit 213bc2f
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ class RoomConverter @JvmOverloads constructor(private val loadedData: LoadData =
return this.convert(medium) { inWait: MediumInWait -> this.convert(inWait) }
}

fun convertEditEvents(events: Collection<EditEvent>?): Collection<RoomEditEvent?> {
return this.convert(events) { event: EditEvent? -> this.convert(event) }
fun convertEditEvents(events: Collection<EditEvent>): Collection<RoomEditEvent> {
return this.convert(events) { event: EditEvent -> this.convert(event) }.filterNotNull()
}

fun convertToc(tocs: Collection<Toc>): List<RoomToc> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ import com.mytlogos.enterprise.background.room.model.*
import com.mytlogos.enterprise.background.room.model.RoomExternalMediaList.ExternalListMediaJoin
import com.mytlogos.enterprise.background.room.model.RoomMediaList.MediaListMediaJoin
import com.mytlogos.enterprise.model.*
import com.mytlogos.enterprise.tools.doPartitionedExSuspend
import com.mytlogos.enterprise.tools.doPartitionedSuspend
import com.mytlogos.enterprise.tools.transformFlow
import kotlinx.coroutines.async
import com.mytlogos.enterprise.tools.*
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import org.joda.time.DateTime
Expand Down Expand Up @@ -127,8 +124,7 @@ class RoomStorage(application: Application) : DatabaseStorage {
if (mediaIds.isEmpty()) {
return
}
val converter = RoomConverter()
roomDanglingDao.insertBulk(converter.convertToDangling(mediaIds))
RoomConverter().convertToDangling(mediaIds).doChunked { roomDanglingDao.insertBulk(it) }
}

override suspend fun getListSettingNow(id: Int, isExternal: Boolean): MediaListSetting {
Expand All @@ -142,11 +138,12 @@ class RoomStorage(application: Application) : DatabaseStorage {
}

override suspend fun getSimpleEpisodes(ids: Collection<Int>): List<SimpleEpisode> {
return episodeDao.getSimpleEpisodes(ids)
return ids.mapChunked { episodeDao.getSimpleEpisodes(it) }
}

override suspend fun updateProgress(episodeIds: Collection<Int>, progress: Float) {
episodeDao.updateProgress(episodeIds, progress, DateTime.now())
val now = DateTime.now()
episodeIds.doChunked { episodeDao.updateProgress(it, progress, now) }
}

override fun getReadTodayEpisodes(): Flow<PagingData<ReadEpisode>> {
Expand All @@ -161,7 +158,7 @@ class RoomStorage(application: Application) : DatabaseStorage {
for (id in ids) {
joins.add(MediaListMediaJoin(listId, id))
}
mediaListDao.addJoin(joins)
joins.doChunked { mediaListDao.addJoin(it) }
}

override fun getListSuggestion(name: String): LiveData<MutableList<MediaList>> {
Expand Down Expand Up @@ -192,7 +189,7 @@ class RoomStorage(application: Application) : DatabaseStorage {
}

override suspend fun removeItemFromList(listId: Int, mediumId: Collection<Int>) {
mediaListDao.removeJoin(listId, mediumId)
mediumId.doChunked { mediaListDao.removeJoin(listId, it) }
}

override suspend fun moveItemsToList(oldListId: Int, newListId: Int, ids: Collection<Int>) {
Expand Down Expand Up @@ -244,11 +241,11 @@ class RoomStorage(application: Application) : DatabaseStorage {
override suspend fun insertEditEvent(events: Collection<EditEvent>) {
val converter = RoomConverter()
val roomEditEvent = converter.convertEditEvents(events)
editDao.insertBulk(roomEditEvent)
roomEditEvent.doChunked { editDao.insertBulk(it) }
}

override suspend fun getReadEpisodes(episodeIds: Collection<Int>, read: Boolean): List<Int> {
return episodeDao.getReadEpisodes(episodeIds, read)
return episodeIds.mapChunked { episodeDao.getReadEpisodes(it, read) }
}

override suspend fun getEditEvents(): MutableList<out EditEvent> {
Expand All @@ -257,7 +254,7 @@ class RoomStorage(application: Application) : DatabaseStorage {

override suspend fun removeEditEvents(editEvents: Collection<EditEvent>) {
val converter = RoomConverter()
editDao.deleteBulk(converter.convertEditEvents(editEvents))
converter.convertEditEvents(editEvents).doChunked { editDao.deleteBulk(it) }
}

override suspend fun checkReload(parsedStat: ParsedStat): ReloadStat {
Expand Down Expand Up @@ -363,12 +360,7 @@ class RoomStorage(application: Application) : DatabaseStorage {
true
)
try {
episodeIds.doPartitionedExSuspend { ids: List<Int> ->
async {
episodeDao.updateProgress(ids, 1f, DateTime.now())
false
}
}
episodeIds.doChunked { episodeDao.updateProgress(it, 1f, DateTime.now()) }
} catch (e: Exception) {
e.printStackTrace()
}
Expand All @@ -386,7 +378,7 @@ class RoomStorage(application: Application) : DatabaseStorage {

override suspend fun persistReleases(releases: Collection<ClientRelease>): ClientModelPersister {
val converter = RoomConverter(loadedData)
episodeDao.insertBulkRelease(converter.convertReleases(releases))
converter.convertReleases(releases).doChunked { episodeDao.insertBulkRelease(it) }
return this
}

Expand All @@ -395,8 +387,8 @@ class RoomStorage(application: Application) : DatabaseStorage {
val list = converter.convertEpisodes(filteredEpisodes.newEpisodes)
val update = converter.convertEpisodesClient(filteredEpisodes.updateEpisodes)

episodeDao.insertBulk(list)
episodeDao.updateBulkClient(update)
list.doChunked { episodeDao.insertBulk(it) }
update.doChunked { episodeDao.updateBulkClient(it) }

for (episode in list) {
loadedData.episodes.add(episode.episodeId)
Expand All @@ -406,7 +398,7 @@ class RoomStorage(application: Application) : DatabaseStorage {
value.episodeId,
value.title,
value.url,
value.isLocked,
value.locked,
value.releaseDate
)
})
Expand Down Expand Up @@ -443,8 +435,8 @@ class RoomStorage(application: Application) : DatabaseStorage {
val list = converter.convertMediaList(filteredMediaList.newList)
val update = converter.convertMediaList(filteredMediaList.updateList)

mediaListDao.insertBulk(list)
mediaListDao.updateBulk(update)
list.doChunked { mediaListDao.insertBulk(it) }
update.doChunked { mediaListDao.updateBulk(it) }

for (mediaList in list) {
loadedData.mediaList.add(mediaList.listId)
Expand All @@ -469,8 +461,8 @@ class RoomStorage(application: Application) : DatabaseStorage {
val list = converter.convertExternalMediaList(filteredExtMediaList.newList)
val update = converter.convertExternalMediaList(filteredExtMediaList.updateList)

externalMediaListDao.insertBulk(list)
externalMediaListDao.updateBulk(update)
list.doChunked { externalMediaListDao.insertBulk(it) }
update.doChunked { externalMediaListDao.updateBulk(it) }

for (mediaList in list) {
loadedData.externalMediaList.add(mediaList.externalListId)
Expand All @@ -495,8 +487,8 @@ class RoomStorage(application: Application) : DatabaseStorage {
val newUser = converter.convertExternalUser(filteredExternalUser.newUser)
val updatedUser = converter.convertExternalUser(filteredExternalUser.updateUser)

externalUserDao.insertBulk(newUser)
externalUserDao.updateBulk(updatedUser)
newUser.doChunked { externalUserDao.insertBulk(it) }
updatedUser.doChunked { externalUserDao.updateBulk(it) }

for (user in newUser) {
loadedData.externalUser.add(user.uuid)
Expand All @@ -517,12 +509,12 @@ class RoomStorage(application: Application) : DatabaseStorage {
val updatedMedia = converter.convertSimpleMedia(filteredMedia.updateMedia)

try {
mediumDao.insertBulk(newMedia)
newMedia.doChunked { mediumDao.insertBulk(newMedia) }
} catch (e: SQLiteConstraintException) {
e.printStackTrace()
throw e
}
mediumDao.updateBulk(updatedMedia)
updatedMedia.doChunked { mediumDao.updateBulk(updatedMedia) }

for (medium in newMedia) {
loadedData.media.add(medium.mediumId)
Expand All @@ -543,8 +535,8 @@ class RoomStorage(application: Application) : DatabaseStorage {
}
}

newsDao.insertNews(newNews)
newsDao.updateNews(updatedNews)
newNews.doChunked { newsDao.insertNews(it) }
updatedNews.doChunked { newsDao.updateNews(it) }

for (roomNews in newNews) {
loadedData.news.add(roomNews.newsId)
Expand All @@ -562,8 +554,8 @@ class RoomStorage(application: Application) : DatabaseStorage {
val newParts = converter.convertParts(filteredParts.newParts)
val updatedParts = converter.convertParts(filteredParts.updateParts)

partDao.insertBulk(newParts)
partDao.updateBulk(updatedParts)
newParts.doChunked { partDao.insertBulk(newParts) }
updatedParts.doChunked { partDao.updateBulk(updatedParts) }

for (part in newParts) {
loadedData.part.add(part.partId)
Expand Down Expand Up @@ -605,7 +597,7 @@ class RoomStorage(application: Application) : DatabaseStorage {
override suspend fun persistToDownloads(toDownloads: Collection<ToDownload>): ClientModelPersister {
val roomToDownloads = RoomConverter().convertToDownload(toDownloads)

toDownloadDao.insertBulk(roomToDownloads)
roomToDownloads.doChunked { toDownloadDao.insertBulk(roomToDownloads) }

return this
}
Expand All @@ -632,7 +624,7 @@ class RoomStorage(application: Application) : DatabaseStorage {
}

override suspend fun persistMediaInWait(medium: List<ClientMediumInWait>) {
mediumInWaitDao.insertBulk(RoomConverter().convertClientMediaInWait(medium))
RoomConverter().convertClientMediaInWait(medium).doChunked { mediumInWaitDao.insertBulk(it) }
}

override suspend fun persist(user: ClientSimpleUser?): ClientModelPersister {
Expand All @@ -658,8 +650,7 @@ class RoomStorage(application: Application) : DatabaseStorage {
}

override suspend fun deleteLeftoverEpisodes(partEpisodes: Map<Int, List<Int>>) {
val partIds = partEpisodes.keys
val episodes = episodeDao.getEpisodes(partIds)
val episodes = partEpisodes.keys.mapChunked { episodeDao.getEpisodes(it) }
val deleteEpisodes: List<Int> = episodes.mapNotNull { roomPartEpisode ->
val episodeIds = partEpisodes[roomPartEpisode.partId]

Expand All @@ -669,18 +660,11 @@ class RoomStorage(application: Application) : DatabaseStorage {
return@mapNotNull null
}
}
coroutineScope {
deleteEpisodes.doPartitionedSuspend { ids: List<Int> ->
async {
episodeDao.deletePerId(ids)
false
}
}
}
deleteEpisodes.doChunked { ids: List<Int> -> episodeDao.deletePerId(ids) }
}

override suspend fun deleteLeftoverReleases(partReleases: Map<Int, List<ClientSimpleRelease>>): Collection<Int> {
val roomReleases = episodeDao.getReleases(partReleases.keys)
val roomReleases = partReleases.keys.mapChunked { episodeDao.getReleases(it) }
val deleteRelease: MutableList<RoomRelease> = LinkedList()
val now = DateTime.now()
val unmatchedReleases: MutableCollection<ClientSimpleRelease> = HashSet()
Expand Down Expand Up @@ -709,26 +693,26 @@ class RoomStorage(application: Application) : DatabaseStorage {
for (release in unmatchedReleases) {
episodesToLoad.add(release.episodeId)
}
episodeDao.deleteBulkRelease(deleteRelease)
deleteRelease.doChunked { episodeDao.deleteBulkRelease(it) }
return episodesToLoad
}

override suspend fun deleteLeftoverTocs(mediaTocs: Map<Int, List<String>>) {
val previousTocs = tocDao.getTocs(mediaTocs.keys)
val previousTocs = mediaTocs.keys.mapChunked { tocDao.getTocs(it) }
val removeTocs: MutableList<RoomToc> = ArrayList()
for (entry in previousTocs) {
val currentTocLinks = mediaTocs[entry.mediumId]
if (currentTocLinks == null || !currentTocLinks.contains(entry.link)) {
removeTocs.add(entry)
}
}
tocDao.deleteBulk(removeTocs)
removeTocs.doChunked { tocDao.deleteBulk(it) }
}

override suspend fun persistTocs(tocs: Collection<Toc>): ClientModelPersister {
val roomTocs = RoomConverter().convertToc(tocs)

tocDao.insertBulk(roomTocs)
roomTocs.doChunked { tocDao.insertBulk(it) }

return this
}
Expand Down Expand Up @@ -793,13 +777,13 @@ class RoomStorage(application: Application) : DatabaseStorage {
deletedExLists.add(roomListUser.listId)
}
}
externalMediaListDao.removeJoin(toDeleteExternalJoins)
mediaListDao.removeJoin(toDeleteInternalJoins)
externalMediaListDao.addJoin(newExternalJoins)
mediaListDao.addJoin(newInternalJoins)
externalMediaListDao.delete(deletedExLists)
mediaListDao.delete(deletedLists)
externalUserDao.delete(deletedExUser)
toDeleteExternalJoins.doChunked { externalMediaListDao.removeJoin(it) }
toDeleteInternalJoins.doChunked { mediaListDao.removeJoin(it) }
newExternalJoins.doChunked { externalMediaListDao.addJoin(it) }
newInternalJoins.doChunked { mediaListDao.addJoin(it) }
deletedExLists.doChunked { externalMediaListDao.delete(it) }
deletedLists.doChunked { mediaListDao.delete(it) }
deletedExUser.doChunked { externalUserDao.delete(it) }
return this
}

Expand Down
70 changes: 69 additions & 1 deletion app/src/main/java/com/mytlogos/enterprise/tools/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import kotlinx.coroutines.flow.Flow
import retrofit2.Response
import java.io.IOException
import java.net.URI
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.regex.Pattern
import kotlin.collections.ArrayList

fun getDomain(url: String?): String? {
val host = URI.create(url).host ?: return null
Expand Down Expand Up @@ -85,6 +85,39 @@ suspend fun <E : Any> Collection<E>.doPartitionedExSuspend(
} while (minItem < list.size && maxItem <= list.size)
}

@Throws(Exception::class)
suspend fun <E : Any, R: Any> Collection<E>.mapPartitionedExSuspend(
consumer: (List<E>) -> Deferred<Pair<List<R>, Boolean?>>,
): List<R> {
val list: List<E> = ArrayList(this)
val result: MutableList<R> = ArrayList()
val steps = 100
var minItem = 0
var maxItem = minItem + steps

do {
if (maxItem > list.size) {
maxItem = list.size
}
val subList = list.subList(minItem, maxItem)
val (midResult, retry) = consumer(subList).await()
result.addAll(midResult)

if (retry == true) {
continue
} else if (retry == null) {
break
}
minItem += steps
maxItem = minItem + steps
if (maxItem > list.size) {
maxItem = list.size
}
} while (minItem < list.size && maxItem <= list.size)

return result
}


@Throws(Exception::class)
fun <E : Any> Collection<E>.doPartitionedEx(consumer: (List<E>) -> Boolean?) {
Expand Down Expand Up @@ -202,6 +235,41 @@ suspend fun <T : Any> Collection<T>.doPartitionedRethrowSuspend(
}
}

@Throws(IOException::class)
suspend fun <T : Any, R: Any> Collection<T>.mapPartitionedSuspend(
functionEx: (List<T>) -> Deferred<Pair<List<R>, Boolean?>>,
): List<R> {
try {
return this.mapPartitionedExSuspend(functionEx)
} catch (e: IOException) {
throw e
} catch (e: Exception) {
throw RuntimeException(e)
}
}

suspend fun <T : Any, R: Any> Collection<T>.mapChunked(
functionEx: suspend (List<T>) -> List<R>,
): List<R> {
val that = this
val result = ArrayList<R>()

for (chunk in that.chunked(100)) {
result.addAll(functionEx(chunk))
}

return result
}

suspend fun <T : Any> Collection<T>.doChunked(
functionEx: suspend (List<T>) -> Unit,
) {
val that = this
for (chunk in that.chunked(100)) {
functionEx(chunk)
}
}


fun <T> Collection<CompletableFuture<T>>.finishAll(): CompletableFuture<List<T>> {
val allFuturesResult =
Expand Down

0 comments on commit 213bc2f

Please sign in to comment.