From ed79baa29ec83326b6aa962771b86fd9a8117e20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ga=C3=ABl=20Ferrachat?= Date: Tue, 23 May 2023 13:06:58 +0200 Subject: [PATCH 1/3] Add deflate compression level in zip flow #2766 --- .../file/impl/archive/ZipArchiveFlow.scala | 9 +++++--- .../file/impl/archive/ZipArchiveManager.scala | 4 ++-- .../stream/alpakka/file/javadsl/Archive.scala | 12 ++++++++-- .../alpakka/file/scaladsl/Archive.scala | 10 +++++++- .../impl/archive/ZipArchiveFlowTest.scala | 23 +++++++++++++++++++ .../scala/docs/scaladsl/ArchiveSpec.scala | 14 +++++++++++ 6 files changed, 64 insertions(+), 8 deletions(-) diff --git a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlow.scala b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlow.scala index 75cdee5dae..c113aa1276 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlow.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlow.scala @@ -16,7 +16,8 @@ import akka.util.{ByteString, ByteStringBuilder} * INTERNAL API */ @InternalApi private[file] final class ZipArchiveFlowStage( - val shape: FlowShape[ByteString, ByteString] + val shape: FlowShape[ByteString, ByteString], + deflateCompression: Option[Int] = None ) extends GraphStageLogic(shape) { private def in = shape.in @@ -77,12 +78,14 @@ import akka.util.{ByteString, ByteStringBuilder} } ) + override def preStart(): Unit = + deflateCompression.foreach(l => zip.setLevel(l)) } /** * INTERNAL API */ -@InternalApi private[file] final class ZipArchiveFlow extends GraphStage[FlowShape[ByteString, ByteString]] { +@InternalApi private[file] final class ZipArchiveFlow(deflateCompression: Option[Int] = None) extends GraphStage[FlowShape[ByteString, ByteString]] { val in: Inlet[ByteString] = Inlet(Logging.simpleName(this) + ".in") val out: Outlet[ByteString] = Outlet(Logging.simpleName(this) + ".out") @@ -93,5 +96,5 @@ import akka.util.{ByteString, ByteStringBuilder} override val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new ZipArchiveFlowStage(shape) + new ZipArchiveFlowStage(shape, deflateCompression) } diff --git a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveManager.scala b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveManager.scala index 60165ea9a9..7db4c119cb 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveManager.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveManager.scala @@ -15,8 +15,8 @@ import akka.util.ByteString */ @InternalApi private[file] object ZipArchiveManager { - def zipFlow(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = { - val archiveZipFlow = new ZipArchiveFlow() + def zipFlow(deflateCompression: Option[Int] = None): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = { + val archiveZipFlow = new ZipArchiveFlow(deflateCompression) Flow[(ArchiveMetadata, Source[ByteString, Any])] .flatMapConcat { case (metadata, stream) => diff --git a/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala b/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala index d7a7f86855..d2906edeb6 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala @@ -22,12 +22,20 @@ object Archive { /** * Flow for compressing multiple files into one ZIP file. + * + * @param deflateCompression see [[java.util.zip.Deflater Deflater]] */ - def zip(): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] = + def zip(deflateCompression: Option[Int]): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] = Flow .create[Pair[ArchiveMetadata, Source[ByteString, NotUsed]]]() .map(func(pair => (pair.first, pair.second.asScala))) - .via(scaladsl.Archive.zip().asJava) + .via(scaladsl.Archive.zip(deflateCompression).asJava) + + /** + * Flow for compressing multiple files into one ZIP file. + */ + def zip(): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] = + zip(None) /** * Flow for reading ZIP files. diff --git a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Archive.scala b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Archive.scala index 85ba3a0992..2be2b0837b 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Archive.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/scaladsl/Archive.scala @@ -18,11 +18,19 @@ import java.nio.charset.{Charset, StandardCharsets} */ object Archive { + /** + * Flow for compressing multiple files into one ZIP file. + * + * @param deflateCompression see [[java.util.zip.Deflater Deflater]] + */ + def zip(deflateCompression: Option[Int]): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = + ZipArchiveManager.zipFlow(deflateCompression) + /** * Flow for compressing multiple files into one ZIP file. */ def zip(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = - ZipArchiveManager.zipFlow() + zip(None) /** * Flow for reading ZIP files. diff --git a/file/src/test/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlowTest.scala b/file/src/test/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlowTest.scala index 9eb1f4b1f5..2285da16c6 100644 --- a/file/src/test/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlowTest.scala +++ b/file/src/test/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlowTest.scala @@ -13,6 +13,8 @@ import akka.util.ByteString import org.scalatest.BeforeAndAfterAll import org.scalatest.wordspec.AnyWordSpecLike +import java.util.zip.Deflater + class ZipArchiveFlowTest extends TestKit(ActorSystem("ziparchive")) with AnyWordSpecLike @@ -40,6 +42,27 @@ class ZipArchiveFlowTest downstream.expectComplete() } } + + "compression flag given and stream ends" should { + "emit element only when downstream requests" in { + val (upstream, downstream) = + TestSource[ByteString]() + .via(new ZipArchiveFlow(Some(Deflater.NO_COMPRESSION))) + .toMat(TestSink())(Keep.both) + .run() + + upstream.sendNext(FileByteStringSeparators.createStartingByteString("test")) + upstream.sendNext(ByteString(1)) + upstream.sendNext(FileByteStringSeparators.createEndingByteString()) + upstream.sendComplete() + + downstream.request(2) + downstream.expectNextN(2) + downstream.request(1) + downstream.expectNextN(1) + downstream.expectComplete() + } + } } override def afterAll(): Unit = { diff --git a/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala b/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala index 2cceb69971..0bc3a11cc3 100644 --- a/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala +++ b/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala @@ -20,6 +20,7 @@ import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike +import java.util.zip.Deflater import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} @@ -113,6 +114,19 @@ class ArchiveSpec archiveHelper.unzip(akkaZipped.futureValue).asScala shouldBe inputFiles } + "archive files with compression flag" in { + val inputFiles = generateInputFiles(5, 100) + val inputStream = filesToStream(inputFiles) + val zipFlow = Archive.zip(Some(Deflater.NO_COMPRESSION)) + + val akkaZipped: Future[ByteString] = + inputStream + .via(zipFlow) + .runWith(Sink.fold(ByteString.empty)(_ ++ _)) + + archiveHelper.unzip(akkaZipped.futureValue).asScala shouldBe inputFiles + } + "unarchive files" in { val inputFiles = generateInputFiles(5, 100) val inputStream = filesToStream(inputFiles) From b38da063a626978fa78e4e269f67e1175359b9ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ga=C3=ABl=20Ferrachat?= Date: Tue, 23 May 2023 13:12:53 +0200 Subject: [PATCH 2/3] Avoid unnecessary ByteString operation in zip flow * Elements pushed to zip flow might be of any size. Decoding them completly using utf8 just to check a few bytes at the beginning can be a big loss. --- .../alpakka/file/impl/archive/FileByteStringSeparators.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/FileByteStringSeparators.scala b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/FileByteStringSeparators.scala index ec9284eba9..f3cdbfdd2e 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/FileByteStringSeparators.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/FileByteStringSeparators.scala @@ -25,10 +25,10 @@ import akka.util.ByteString ByteString(endFileWord) def isStartingByteString(b: ByteString): Boolean = - b.utf8String.startsWith(startFileWord) + b.size >= 7 && b.slice(0, 7).utf8String == startFileWord def isEndingByteString(b: ByteString): Boolean = - b.utf8String == endFileWord + b.size == 5 && b.utf8String == endFileWord def getPathFromStartingByteString(b: ByteString): String = { val splitted = b.utf8String.split(separator) From 3898df76ecc752841d634663dab797950493c195 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Fri, 9 Jun 2023 13:02:26 +0200 Subject: [PATCH 3/3] scalafmtAll --- .../stream/alpakka/file/impl/archive/ZipArchiveFlow.scala | 3 ++- .../stream/alpakka/file/impl/archive/ZipArchiveManager.scala | 4 +++- .../main/scala/akka/stream/alpakka/file/javadsl/Archive.scala | 4 +++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlow.scala b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlow.scala index c113aa1276..df89fd234f 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlow.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveFlow.scala @@ -85,7 +85,8 @@ import akka.util.{ByteString, ByteStringBuilder} /** * INTERNAL API */ -@InternalApi private[file] final class ZipArchiveFlow(deflateCompression: Option[Int] = None) extends GraphStage[FlowShape[ByteString, ByteString]] { +@InternalApi private[file] final class ZipArchiveFlow(deflateCompression: Option[Int] = None) + extends GraphStage[FlowShape[ByteString, ByteString]] { val in: Inlet[ByteString] = Inlet(Logging.simpleName(this) + ".in") val out: Outlet[ByteString] = Outlet(Logging.simpleName(this) + ".out") diff --git a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveManager.scala b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveManager.scala index 7db4c119cb..ac383bade0 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveManager.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/impl/archive/ZipArchiveManager.scala @@ -15,7 +15,9 @@ import akka.util.ByteString */ @InternalApi private[file] object ZipArchiveManager { - def zipFlow(deflateCompression: Option[Int] = None): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = { + def zipFlow( + deflateCompression: Option[Int] = None + ): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = { val archiveZipFlow = new ZipArchiveFlow(deflateCompression) Flow[(ArchiveMetadata, Source[ByteString, Any])] .flatMapConcat { diff --git a/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala b/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala index d2906edeb6..671a8e186a 100644 --- a/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala +++ b/file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala @@ -25,7 +25,9 @@ object Archive { * * @param deflateCompression see [[java.util.zip.Deflater Deflater]] */ - def zip(deflateCompression: Option[Int]): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] = + def zip( + deflateCompression: Option[Int] + ): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] = Flow .create[Pair[ArchiveMetadata, Source[ByteString, NotUsed]]]() .map(func(pair => (pair.first, pair.second.asScala)))