Skip to content

Commit

Permalink
#63 Fix race condition when an attachment is saved
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Jul 25, 2018
1 parent 5fb10d8 commit 4e1c723
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 28 deletions.
62 changes: 35 additions & 27 deletions app/org/elastic4play/services/AttachmentSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package org.elastic4play.services

import java.io.InputStream
import java.nio.file.Files
import javax.inject.{ Inject, Singleton }

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ ExecutionContext, Future }
Expand All @@ -13,16 +12,18 @@ import play.api.libs.json.Json.toJsFieldJsValueWrapper
import play.api.libs.json._

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{ FileIO, Sink, Source, StreamConverters }
import akka.util.ByteString
import javax.inject.{ Inject, Singleton }

import org.elastic4play.controllers.JsonFormat.{ attachmentInputValueReads, fileInputValueFormat }
import org.elastic4play.controllers.{ AttachmentInputValue, FileInputValue, JsonInputValue }
import org.elastic4play.database.{ DBCreate, DBFind, DBRemove }
import org.elastic4play.models.{ AttributeDef, BaseModelDef, EntityDef, ModelDef, AttributeFormat F }
import org.elastic4play.services.JsonFormat.attachmentFormat
import org.elastic4play.utils.{ Hash, Hasher }
import org.elastic4play.utils.{ Hash, Hasher, RetryOnError }
import org.elastic4play.{ AttributeCheckingError, InvalidFormatAttributeError, MissingAttributeError }

case class Attachment(name: String, hashes: Seq[Hash], size: Long, contentType: String, id: String)
Expand Down Expand Up @@ -54,6 +55,7 @@ class AttachmentSrv(
getSrv: GetSrv,
findSrv: FindSrv,
attachmentModel: AttachmentModel,
implicit val system: ActorSystem,
implicit val ec: ExecutionContext,
implicit val mat: Materializer) {

Expand All @@ -65,6 +67,7 @@ class AttachmentSrv(
dbFind: DBFind,
findSrv: FindSrv,
attachmentModel: AttachmentModel,
system: ActorSystem,
ec: ExecutionContext,
mat: Materializer) =
this(
Expand All @@ -77,6 +80,7 @@ class AttachmentSrv(
getSrv,
findSrv,
attachmentModel,
system,
ec,
mat)

Expand Down Expand Up @@ -123,37 +127,41 @@ class AttachmentSrv(
val hashes = extraHashers.fromByteArray(data)

for {
attachment getSrv[AttachmentModel, AttachmentChunk](attachmentModel, hash + "_0")
.fallbackTo { // it it doesn't exist, create it
Source.fromIterator(() data.grouped(chunkSize))
.zip(Source.unfold(0)(i Some((i + 1) i)))
.mapAsync(5) {
case (buffer, index)
val data = java.util.Base64.getEncoder.encodeToString(buffer)
dbCreate(attachmentModel.modelName, None, Json.obj("binary" data, "_id" s"${hash}_$index"))
}
.runWith(Sink.ignore)
}
.map(_ Attachment(filename, hashes, data.length, contentType, hash))
attachment RetryOnError() {
getSrv[AttachmentModel, AttachmentChunk](attachmentModel, hash + "_0")
.fallbackTo { // it it doesn't exist, create it
Source.fromIterator(() data.grouped(chunkSize))
.zip(Source.unfold(0)(i Some((i + 1) i)))
.mapAsync(5) {
case (buffer, index)
val data = java.util.Base64.getEncoder.encodeToString(buffer)
dbCreate(attachmentModel.modelName, None, Json.obj("binary" data, "_id" s"${hash}_$index"))
}
.runWith(Sink.ignore)
}
.map(_ Attachment(filename, hashes, data.length, contentType, hash))
}
} yield attachment
}

def save(fiv: FileInputValue): Future[Attachment] = {
for {
hash mainHasher.fromPath(fiv.filepath).map(_.head.toString())
hashes extraHashers.fromPath(fiv.filepath)
attachment getSrv[AttachmentModel, AttachmentChunk](attachmentModel, hash + "_0")
.fallbackTo { // it it doesn't exist, create it
FileIO.fromPath(fiv.filepath, chunkSize)
.zip(Source.fromIterator { () Iterator.iterate(0)(_ + 1) })
.mapAsync(5) {
case (buffer, index)
val data = java.util.Base64.getEncoder.encodeToString(buffer.toArray)
dbCreate(attachmentModel.modelName, None, Json.obj("binary" data, "_id" s"${hash}_$index"))
}
.runWith(Sink.ignore)
}
.map { _ Attachment(hash, hashes, fiv) }
attachment RetryOnError() {
getSrv[AttachmentModel, AttachmentChunk](attachmentModel, hash + "_0")
.fallbackTo { // it it doesn't exist, create it
FileIO.fromPath(fiv.filepath, chunkSize)
.zip(Source.fromIterator { () Iterator.iterate(0)(_ + 1) })
.mapAsync(5) {
case (buffer, index)
val data = java.util.Base64.getEncoder.encodeToString(buffer.toArray)
dbCreate(attachmentModel.modelName, None, Json.obj("binary" data, "_id" s"${hash}_$index"))
}
.runWith(Sink.ignore)
}
.map { _ Attachment(hash, hashes, fiv) }
}
} yield attachment
}

Expand Down Expand Up @@ -184,7 +192,7 @@ class AttachmentSrv(
}

def cleanup: Future[Unit] = {
import com.sksamuel.elastic4s.ElasticDsl.{ search, RichString }
import com.sksamuel.elastic4s.ElasticDsl.{ RichString, search }
dbFind(Some("all"), Nil)(index search(index / attachmentModel.modelName).fetchSource(false))._1
.mapConcat(o (o \ "_id").asOpt[String].toList)
.collect { case id if id.endsWith("_0") id.dropRight(2) }
Expand Down
2 changes: 1 addition & 1 deletion app/org/elastic4play/services/TempSrv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class TempSrv @Inject() (
def releaseTemporaryFiles()(implicit authContext: AuthContext): Unit = {
releaseTemporaryFiles(authContext.requestId)
}

def releaseTemporaryFiles(request: RequestHeader): Unit = {
releaseTemporaryFiles(Instance.getRequestId(request))
}
Expand Down
20 changes: 20 additions & 0 deletions app/org/elastic4play/utils/RetryOnError.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.elastic4play.utils

import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.DurationInt

import akka.actor.ActorSystem

object RetryOnError {
def apply[A](cond: Throwable Boolean = _ true, maxRetry: Int = 5, initialDelay: FiniteDuration = 1.second)(body: Future[A])(implicit system: ActorSystem, ec: ExecutionContext): Future[A] = {
body.recoverWith {
case e if maxRetry > 0 && cond(e)
val resultPromise = Promise[A]
system.scheduler.scheduleOnce(initialDelay) {
resultPromise.completeWith(apply(cond, maxRetry - 1, initialDelay * 2)(body))
}
resultPromise.future
}
}
}

0 comments on commit 4e1c723

Please sign in to comment.