Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File: Add zip compression level #2980

Merged
merged 3 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import akka.util.ByteString
ByteString(endFileWord)

def isStartingByteString(b: ByteString): Boolean =
b.utf8String.startsWith(startFileWord)
b.size >= 7 && b.slice(0, 7).utf8String == startFileWord

def isEndingByteString(b: ByteString): Boolean =
b.utf8String == endFileWord
b.size == 5 && b.utf8String == endFileWord

def getPathFromStartingByteString(b: ByteString): String = {
val splitted = b.utf8String.split(separator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import akka.util.{ByteString, ByteStringBuilder}
* INTERNAL API
*/
@InternalApi private[file] final class ZipArchiveFlowStage(
val shape: FlowShape[ByteString, ByteString]
val shape: FlowShape[ByteString, ByteString],
deflateCompression: Option[Int] = None
) extends GraphStageLogic(shape) {

private def in = shape.in
Expand Down Expand Up @@ -77,12 +78,14 @@ import akka.util.{ByteString, ByteStringBuilder}
}
)

override def preStart(): Unit =
deflateCompression.foreach(l => zip.setLevel(l))
}

/**
* INTERNAL API
*/
@InternalApi private[file] final class ZipArchiveFlow extends GraphStage[FlowShape[ByteString, ByteString]] {
@InternalApi private[file] final class ZipArchiveFlow(deflateCompression: Option[Int] = None) extends GraphStage[FlowShape[ByteString, ByteString]] {

val in: Inlet[ByteString] = Inlet(Logging.simpleName(this) + ".in")
val out: Outlet[ByteString] = Outlet(Logging.simpleName(this) + ".out")
Expand All @@ -93,5 +96,5 @@ import akka.util.{ByteString, ByteStringBuilder}
override val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new ZipArchiveFlowStage(shape)
new ZipArchiveFlowStage(shape, deflateCompression)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import akka.util.ByteString
*/
@InternalApi private[file] object ZipArchiveManager {

def zipFlow(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = {
val archiveZipFlow = new ZipArchiveFlow()
def zipFlow(deflateCompression: Option[Int] = None): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = {
val archiveZipFlow = new ZipArchiveFlow(deflateCompression)
Flow[(ArchiveMetadata, Source[ByteString, Any])]
.flatMapConcat {
case (metadata, stream) =>
Expand Down
12 changes: 10 additions & 2 deletions file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,20 @@ object Archive {

/**
* Flow for compressing multiple files into one ZIP file.
*
* @param deflateCompression see [[java.util.zip.Deflater Deflater]]
*/
def zip(): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] =
def zip(deflateCompression: Option[Int]): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] =
Flow
.create[Pair[ArchiveMetadata, Source[ByteString, NotUsed]]]()
.map(func(pair => (pair.first, pair.second.asScala)))
.via(scaladsl.Archive.zip().asJava)
.via(scaladsl.Archive.zip(deflateCompression).asJava)

/**
* Flow for compressing multiple files into one ZIP file.
*/
def zip(): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] =
zip(None)

/**
* Flow for reading ZIP files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@ import java.nio.charset.{Charset, StandardCharsets}
*/
object Archive {

/**
* Flow for compressing multiple files into one ZIP file.
*
* @param deflateCompression see [[java.util.zip.Deflater Deflater]]
*/
def zip(deflateCompression: Option[Int]): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] =
ZipArchiveManager.zipFlow(deflateCompression)

/**
* Flow for compressing multiple files into one ZIP file.
*/
def zip(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] =
ZipArchiveManager.zipFlow()
zip(None)

/**
* Flow for reading ZIP files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import akka.util.ByteString
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike

import java.util.zip.Deflater

class ZipArchiveFlowTest
extends TestKit(ActorSystem("ziparchive"))
with AnyWordSpecLike
Expand Down Expand Up @@ -40,6 +42,27 @@ class ZipArchiveFlowTest
downstream.expectComplete()
}
}

"compression flag given and stream ends" should {
"emit element only when downstream requests" in {
val (upstream, downstream) =
TestSource[ByteString]()
.via(new ZipArchiveFlow(Some(Deflater.NO_COMPRESSION)))
.toMat(TestSink())(Keep.both)
.run()

upstream.sendNext(FileByteStringSeparators.createStartingByteString("test"))
upstream.sendNext(ByteString(1))
upstream.sendNext(FileByteStringSeparators.createEndingByteString())
upstream.sendComplete()

downstream.request(2)
downstream.expectNextN(2)
downstream.request(1)
downstream.expectNextN(1)
downstream.expectComplete()
}
}
}

override def afterAll(): Unit = {
Expand Down
14 changes: 14 additions & 0 deletions file/src/test/scala/docs/scaladsl/ArchiveSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import java.util.zip.Deflater
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -113,6 +114,19 @@ class ArchiveSpec
archiveHelper.unzip(akkaZipped.futureValue).asScala shouldBe inputFiles
}

"archive files with compression flag" in {
val inputFiles = generateInputFiles(5, 100)
val inputStream = filesToStream(inputFiles)
val zipFlow = Archive.zip(Some(Deflater.NO_COMPRESSION))

val akkaZipped: Future[ByteString] =
inputStream
.via(zipFlow)
.runWith(Sink.fold(ByteString.empty)(_ ++ _))

archiveHelper.unzip(akkaZipped.futureValue).asScala shouldBe inputFiles
}

"unarchive files" in {
val inputFiles = generateInputFiles(5, 100)
val inputStream = filesToStream(inputFiles)
Expand Down