diff --git a/app/org/elastic4play/services/AttachmentSrv.scala b/app/org/elastic4play/services/AttachmentSrv.scala index bebd190..99c1db7 100644 --- a/app/org/elastic4play/services/AttachmentSrv.scala +++ b/app/org/elastic4play/services/AttachmentSrv.scala @@ -2,7 +2,6 @@ package org.elastic4play.services import java.io.InputStream import java.nio.file.Files -import javax.inject.{ Inject, Singleton } import scala.concurrent.duration.DurationInt import scala.concurrent.{ ExecutionContext, Future } @@ -13,16 +12,18 @@ import play.api.libs.json.Json.toJsFieldJsValueWrapper import play.api.libs.json._ import akka.NotUsed +import akka.actor.ActorSystem import akka.stream.Materializer import akka.stream.scaladsl.{ FileIO, Sink, Source, StreamConverters } import akka.util.ByteString +import javax.inject.{ Inject, Singleton } import org.elastic4play.controllers.JsonFormat.{ attachmentInputValueReads, fileInputValueFormat } import org.elastic4play.controllers.{ AttachmentInputValue, FileInputValue, JsonInputValue } import org.elastic4play.database.{ DBCreate, DBFind, DBRemove } import org.elastic4play.models.{ AttributeDef, BaseModelDef, EntityDef, ModelDef, AttributeFormat ⇒ F } import org.elastic4play.services.JsonFormat.attachmentFormat -import org.elastic4play.utils.{ Hash, Hasher } +import org.elastic4play.utils.{ Hash, Hasher, RetryOnError } import org.elastic4play.{ AttributeCheckingError, InvalidFormatAttributeError, MissingAttributeError } case class Attachment(name: String, hashes: Seq[Hash], size: Long, contentType: String, id: String) @@ -54,6 +55,7 @@ class AttachmentSrv( getSrv: GetSrv, findSrv: FindSrv, attachmentModel: AttachmentModel, + implicit val system: ActorSystem, implicit val ec: ExecutionContext, implicit val mat: Materializer) { @@ -65,6 +67,7 @@ class AttachmentSrv( dbFind: DBFind, findSrv: FindSrv, attachmentModel: AttachmentModel, + system: ActorSystem, ec: ExecutionContext, mat: Materializer) = this( @@ -77,6 +80,7 @@ class AttachmentSrv( getSrv, findSrv, attachmentModel, + system, ec, mat) @@ -123,18 +127,20 @@ class AttachmentSrv( val hashes = extraHashers.fromByteArray(data) for { - attachment ← getSrv[AttachmentModel, AttachmentChunk](attachmentModel, hash + "_0") - .fallbackTo { // it it doesn't exist, create it - Source.fromIterator(() ⇒ data.grouped(chunkSize)) - .zip(Source.unfold(0)(i ⇒ Some((i + 1) → i))) - .mapAsync(5) { - case (buffer, index) ⇒ - val data = java.util.Base64.getEncoder.encodeToString(buffer) - dbCreate(attachmentModel.modelName, None, Json.obj("binary" → data, "_id" → s"${hash}_$index")) - } - .runWith(Sink.ignore) - } - .map(_ ⇒ Attachment(filename, hashes, data.length, contentType, hash)) + attachment ← RetryOnError() { + getSrv[AttachmentModel, AttachmentChunk](attachmentModel, hash + "_0") + .fallbackTo { // it it doesn't exist, create it + Source.fromIterator(() ⇒ data.grouped(chunkSize)) + .zip(Source.unfold(0)(i ⇒ Some((i + 1) → i))) + .mapAsync(5) { + case (buffer, index) ⇒ + val data = java.util.Base64.getEncoder.encodeToString(buffer) + dbCreate(attachmentModel.modelName, None, Json.obj("binary" → data, "_id" → s"${hash}_$index")) + } + .runWith(Sink.ignore) + } + .map(_ ⇒ Attachment(filename, hashes, data.length, contentType, hash)) + } } yield attachment } @@ -142,18 +148,20 @@ class AttachmentSrv( for { hash ← mainHasher.fromPath(fiv.filepath).map(_.head.toString()) hashes ← extraHashers.fromPath(fiv.filepath) - attachment ← getSrv[AttachmentModel, AttachmentChunk](attachmentModel, hash + "_0") - .fallbackTo { // it it doesn't exist, create it - FileIO.fromPath(fiv.filepath, chunkSize) - .zip(Source.fromIterator { () ⇒ Iterator.iterate(0)(_ + 1) }) - .mapAsync(5) { - case (buffer, index) ⇒ - val data = java.util.Base64.getEncoder.encodeToString(buffer.toArray) - dbCreate(attachmentModel.modelName, None, Json.obj("binary" → data, "_id" → s"${hash}_$index")) - } - .runWith(Sink.ignore) - } - .map { _ ⇒ Attachment(hash, hashes, fiv) } + attachment ← RetryOnError() { + getSrv[AttachmentModel, AttachmentChunk](attachmentModel, hash + "_0") + .fallbackTo { // it it doesn't exist, create it + FileIO.fromPath(fiv.filepath, chunkSize) + .zip(Source.fromIterator { () ⇒ Iterator.iterate(0)(_ + 1) }) + .mapAsync(5) { + case (buffer, index) ⇒ + val data = java.util.Base64.getEncoder.encodeToString(buffer.toArray) + dbCreate(attachmentModel.modelName, None, Json.obj("binary" → data, "_id" → s"${hash}_$index")) + } + .runWith(Sink.ignore) + } + .map { _ ⇒ Attachment(hash, hashes, fiv) } + } } yield attachment } @@ -184,7 +192,7 @@ class AttachmentSrv( } def cleanup: Future[Unit] = { - import com.sksamuel.elastic4s.ElasticDsl.{ search, RichString } + import com.sksamuel.elastic4s.ElasticDsl.{ RichString, search } dbFind(Some("all"), Nil)(index ⇒ search(index / attachmentModel.modelName).fetchSource(false))._1 .mapConcat(o ⇒ (o \ "_id").asOpt[String].toList) .collect { case id if id.endsWith("_0") ⇒ id.dropRight(2) } diff --git a/app/org/elastic4play/services/TempSrv.scala b/app/org/elastic4play/services/TempSrv.scala index 8305c9a..8195e5e 100644 --- a/app/org/elastic4play/services/TempSrv.scala +++ b/app/org/elastic4play/services/TempSrv.scala @@ -59,7 +59,7 @@ class TempSrv @Inject() ( def releaseTemporaryFiles()(implicit authContext: AuthContext): Unit = { releaseTemporaryFiles(authContext.requestId) } - + def releaseTemporaryFiles(request: RequestHeader): Unit = { releaseTemporaryFiles(Instance.getRequestId(request)) } diff --git a/app/org/elastic4play/utils/RetryOnError.scala b/app/org/elastic4play/utils/RetryOnError.scala new file mode 100644 index 0000000..a08b767 --- /dev/null +++ b/app/org/elastic4play/utils/RetryOnError.scala @@ -0,0 +1,20 @@ +package org.elastic4play.utils + +import scala.concurrent.{ ExecutionContext, Future, Promise } +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.DurationInt + +import akka.actor.ActorSystem + +object RetryOnError { + def apply[A](cond: Throwable ⇒ Boolean = _ ⇒ true, maxRetry: Int = 5, initialDelay: FiniteDuration = 1.second)(body: ⇒ Future[A])(implicit system: ActorSystem, ec: ExecutionContext): Future[A] = { + body.recoverWith { + case e if maxRetry > 0 && cond(e) ⇒ + val resultPromise = Promise[A] + system.scheduler.scheduleOnce(initialDelay) { + resultPromise.completeWith(apply(cond, maxRetry - 1, initialDelay * 2)(body)) + } + resultPromise.future + } + } +}