Skip to content

Commit

Permalink
Streaming multipart/form-data (#2062)
Browse files Browse the repository at this point in the history
* Streaming multipart/form-data

* Streaming encoding of multipart/form-data and other fixes

* Fix

* Fix 3.2.2

* Update zio-http/src/main/scala/zio/http/forms/Form.scala

Co-authored-by: John A. De Goes <john@degoes.net>

* Fix collectAll/collect

---------

Co-authored-by: John A. De Goes <john@degoes.net>
  • Loading branch information
vigoo and jdegoes authored Mar 27, 2023
1 parent 6024ea0 commit 633d862
Show file tree
Hide file tree
Showing 10 changed files with 468 additions and 120 deletions.
24 changes: 22 additions & 2 deletions zio-http/src/main/scala/zio/http/Body.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,29 @@ trait Body { self =>
def asMultipartForm(implicit trace: Trace): Task[Form] =
for {
bytes <- asChunk
form <- Form.fromMultipartBytes(bytes, Charsets.Http).mapError(_.asException)
form <- Form.fromMultipartBytes(bytes, Charsets.Http)
} yield form

/**
* Returns an effect that decodes the streaming body as a multipart form.
*
* The result is a stream of FormData objects, where each FormData may be a
* StreamingBinary or a Text object. The StreamingBinary object contains a
* stream of bytes, which has to be consumed asynchronously by the user to get
* the next FormData from the stream.
*/
def asMultipartFormStream(implicit trace: Trace): Task[StreamingForm] =
boundary match {
case Some(boundary) =>
ZIO.succeed(
StreamingForm(asStream, Boundary(boundary.toString), Charsets.Http),
)
case None =>
ZIO.fail(
new IllegalStateException("Cannot decode body as streaming multipart/form-data without a known boundary"),
)
}

/**
* Returns a stream that contains the bytes of the body. This method is safe
* to use with large bodies, because the elements of the returned stream are
Expand Down Expand Up @@ -138,7 +158,7 @@ object Body {
case Some(value) => form.encodeAsMultipartBytes(charset, value)
case None => form.encodeAsMultipartBytes(charset)
}
ChunkBody(bytes).withContentType(MediaType.multipart.`form-data`, Some(boundary))
StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(boundary))
}

/**
Expand Down
132 changes: 71 additions & 61 deletions zio-http/src/main/scala/zio/http/forms/Form.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ final case class Form(formData: Chunk[FormData]) {

def append(field: FormData): Form = Form(formData :+ field)

/**
* Runs all streaming form data and stores them in memory, returning a Form
* that has no streaming parts
*/
def collectAll: ZIO[Any, Throwable, Form] =
ZIO
.foreach(formData) {
case streamingBinary: StreamingBinary =>
streamingBinary.collect
case other =>
ZIO.succeed(other)
}
.map(Form(_))

def get(name: String): Option[FormData] = formData.find(_.name == name)

def encodeAsURLEncoded(charset: Charset = StandardCharsets.UTF_8): String = {
Expand All @@ -61,64 +75,82 @@ final case class Form(formData: Chunk[FormData]) {

def encodeAsMultipartBytes(
charset: Charset = StandardCharsets.UTF_8,
rng: () => String = () => new SecureRandom().nextLong().toString(),
): (CharSequence, Chunk[Byte]) =
rng: () => String = () => new SecureRandom().nextLong().toString,
): (CharSequence, ZStream[Any, Nothing, Byte]) =
encodeAsMultipartBytes(charset, Boundary.generate(rng))

def encodeAsMultipartBytes(
charset: Charset,
boundary: Boundary,
): (CharSequence, Chunk[Byte]) = {
): (CharSequence, ZStream[Any, Nothing, Byte]) = {

val encapsulatingBoundary = EncapsulatingBoundary(boundary)
val closingBoundary = ClosingBoundary(boundary)

val ast = formData.flatMap {
val astStreams = formData.map {
case fd @ Simple(name, value) =>
Chunk(
encapsulatingBoundary,
EoL,
Header.contentDisposition(name),
EoL,
Header.contentType(fd.contentType),
EoL,
EoL,
Content(Chunk.fromArray(value.getBytes(charset))),
EoL,
ZStream.fromChunk(
Chunk(
encapsulatingBoundary,
EoL,
Header.contentDisposition(name),
EoL,
Header.contentType(fd.contentType),
EoL,
EoL,
Content(Chunk.fromArray(value.getBytes(charset))),
EoL,
),
)

case Text(name, value, contentType, filename) =>
Chunk(
encapsulatingBoundary,
EoL,
Header.contentDisposition(name, filename),
EoL,
Header.contentType(contentType),
EoL,
EoL,
Content(Chunk.fromArray(value.getBytes(charset))),
EoL,
ZStream.fromChunk(
Chunk(
encapsulatingBoundary,
EoL,
Header.contentDisposition(name, filename),
EoL,
Header.contentType(contentType),
EoL,
EoL,
Content(Chunk.fromArray(value.getBytes(charset))),
EoL,
),
)
case Binary(name, data, contentType, transferEncoding, filename) =>
val xferEncoding =
transferEncoding.map(enc => Chunk(Header.contentTransferEncoding(enc), EoL)).getOrElse(Chunk.empty)

Chunk(
encapsulatingBoundary,
EoL,
Header.contentDisposition(name, filename),
EoL,
Header.contentType(contentType),
EoL,
) ++ xferEncoding ++
ZStream.fromChunk(
Chunk(
encapsulatingBoundary,
EoL,
Header.contentDisposition(name, filename),
EoL,
Header.contentType(contentType),
EoL,
) ++ xferEncoding ++ Chunk(EoL, Content(data), EoL),
)

case StreamingBinary(name, contentType, transferEncoding, filename, data) =>
val xferEncoding =
transferEncoding.map(enc => Chunk(Header.contentTransferEncoding(enc), EoL)).getOrElse(Chunk.empty)

ZStream.fromChunk(
Chunk(
encapsulatingBoundary,
EoL,
Content(data),
Header.contentDisposition(name, filename),
EoL,
)
} ++ Chunk(closingBoundary, EoL)
Header.contentType(contentType),
EoL,
) ++ xferEncoding :+ EoL,
) ++ data.chunks.map(Content(_)) ++ ZStream(EoL)
}

boundary.id -> ast.flatMap(_.bytes)
val stream = ZStream.fromChunk(astStreams).flatten ++ ZStream.fromChunk(Chunk(closingBoundary, EoL))

boundary.id -> stream.map(_.bytes).flattenChunks
}
}

Expand All @@ -133,35 +165,13 @@ object Form {
def fromMultipartBytes(
bytes: Chunk[Byte],
charset: Charset = StandardCharsets.UTF_8,
): ZIO[Any, FormDecodingError, Form] = {
def process(boundary: Boundary) = ZStream
.fromChunk(bytes)
.mapAccum(FormState.fromBoundary(boundary)) { (state, byte) =>
state match {
case BoundaryClosed(tree) => (FormState.fromBoundary(boundary), tree)
case BoundaryEncapsulated(tree) => (FormState.fromBoundary(boundary, Some(byte)), tree)
case buffer: FormStateBuffer =>
val state = buffer.append(byte)
state match {
case BoundaryClosed(prevContent) => (state, prevContent)
case _ => (state, Chunk.empty[FormAST])
}
}
}
.collectZIO {
case chunk if chunk.nonEmpty =>
FormData.fromFormAST(chunk, charset)
}
.runCollect
.map(apply)

): ZIO[Any, Throwable, Form] =
for {
boundary <- ZIO
.fromOption(Boundary.fromContent(bytes, charset))
.mapError(_ => FormDecodingError.BoundaryNotFoundInContent)
form <- process(boundary)
.orElseFail(FormDecodingError.BoundaryNotFoundInContent.asException)
form <- StreamingForm(ZStream.fromChunk(bytes), boundary, charset).collectAll
} yield form
}

def fromURLEncoded(encoded: String, encoding: Charset): ZIO[Any, FormDecodingError, Form] = {
val fields = ZIO.foreach(encoded.split("&")) { pair =>
Expand Down
9 changes: 8 additions & 1 deletion zio-http/src/main/scala/zio/http/forms/FormAST.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@ import zio._
import zio.http.model.Header.ContentTransferEncoding
import zio.http.model._

private[forms] sealed trait FormAST { def bytes: Chunk[Byte] }
private[forms] sealed trait FormAST {
def bytes: Chunk[Byte]

def isContent: Boolean = this match {
case FormAST.Content(_) => true
case _ => false
}
}

private[forms] object FormAST {

Expand Down
69 changes: 66 additions & 3 deletions zio-http/src/main/scala/zio/http/forms/FormData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import java.nio.charset._

import zio._

import zio.stream.{Take, ZStream}

import zio.http.forms.FormAST._
import zio.http.forms.FormDecodingError._
import zio.http.model.Header.ContentTransferEncoding
Expand Down Expand Up @@ -66,6 +68,20 @@ object FormData {
filename: Option[String] = None,
) extends FormData

final case class StreamingBinary(
name: String,
contentType: MediaType,
transferEncoding: Option[ContentTransferEncoding] = None,
filename: Option[String] = None,
data: ZStream[Any, Nothing, Byte],
) extends FormData {
def collect: ZIO[Any, Nothing, Binary] = {
data.runCollect.map { bytes =>
Binary(name, bytes, contentType, transferEncoding, filename)
}
}
}

final case class Text(
name: String,
value: String,
Expand Down Expand Up @@ -96,9 +112,9 @@ object FormData {
}

for {
disposition <- ZIO.fromOption(extract._1).mapError(_ => FormDataMissingContentDisposition)
name <- ZIO.fromOption(extract._1.flatMap(_.fields.get("name"))).mapError(_ => ContentDispositionMissingName)
charset <- ZIO
disposition <- ZIO.fromOption(extract._1).orElseFail(FormDataMissingContentDisposition)
name <- ZIO.fromOption(extract._1.flatMap(_.fields.get("name"))).orElseFail(ContentDispositionMissingName)
charset <- ZIO
.attempt(extract._2.flatMap(x => x.fields.get("charset").map(Charset.forName)).getOrElse(defaultCharset))
.mapError(e => InvalidCharset(e.getMessage))
contentParts = extract._4.tail // Skip the first empty line
Expand All @@ -115,6 +131,45 @@ object FormData {
else Binary(name, content, contentType, transferEncoding, disposition.fields.get("filename"))
}

private[http] def getContentType(ast: Chunk[FormAST]): MediaType =
ast.collectFirst {
case header: Header if header.name == "Content-Type" =>
MediaType.forContentType(header.preposition)
}.flatten.getOrElse(MediaType.text.plain)

private[http] def incomingStreamingBinary(
ast: Chunk[FormAST],
queue: Queue[Take[Nothing, Byte]],
): ZIO[Any, FormDecodingError, FormData] = {
val extract =
ast.foldLeft((Option.empty[Header], Option.empty[Header], Option.empty[Header])) {
case (accum, header: Header) if header.name == "Content-Disposition" =>
(Some(header), accum._2, accum._3)
case (accum, header: Header) if header.name == "Content-Type" =>
(accum._1, Some(header), accum._3)
case (accum, header: Header) if header.name == "Content-Transfer-Encoding" =>
(accum._1, accum._2, Some(header))
case (accum, _) => accum
}

for {
disposition <- ZIO.fromOption(extract._1).orElseFail(FormDataMissingContentDisposition)
name <- ZIO.fromOption(extract._1.flatMap(_.fields.get("name"))).orElseFail(ContentDispositionMissingName)
contentType = extract._2
.flatMap(x => MediaType.forContentType(x.preposition))
.getOrElse(MediaType.text.plain)
transferEncoding = extract._3
.flatMap(x => ContentTransferEncoding.parse(x.preposition).toOption)

} yield StreamingBinary(
name,
contentType,
transferEncoding,
disposition.fields.get("filename"),
ZStream.fromQueue(queue).flattenTake,
)
}

def textField(name: String, value: String, mediaType: MediaType = MediaType.text.plain): FormData =
Text(name, value, mediaType, None)

Expand All @@ -127,4 +182,12 @@ object FormData {
transferEncoding: Option[ContentTransferEncoding] = None,
filename: Option[String] = None,
): FormData = Binary(name, data, mediaType, transferEncoding, filename)

def streamingBinaryField(
name: String,
data: ZStream[Any, Nothing, Byte],
mediaType: MediaType,
transferEncoding: Option[ContentTransferEncoding] = None,
filename: Option[String] = None,
): FormData = StreamingBinary(name, mediaType, transferEncoding, filename, data)
}
7 changes: 5 additions & 2 deletions zio-http/src/main/scala/zio/http/forms/FormState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ private[forms] object FormState {
buffer: Chunk[Byte],
lastByte: Option[Byte],
boundary: Boundary,
dropContents: Boolean,
) extends FormState { self =>

def append(byte: Byte): FormState = {
Expand All @@ -42,7 +43,7 @@ private[forms] object FormState {

def flush(ast: FormAST): FormStateBuffer =
self.copy(
tree = tree :+ ast,
tree = if (ast.isContent && dropContents) tree else tree :+ ast,
buffer = Chunk.empty,
lastByte = None,
phase = phase0,
Expand Down Expand Up @@ -76,14 +77,16 @@ private[forms] object FormState {
}

}

def startIgnoringContents: FormStateBuffer = self.copy(dropContents = true)
}

final case class BoundaryEncapsulated(buffer: Chunk[FormAST]) extends FormState

final case class BoundaryClosed(buffer: Chunk[FormAST]) extends FormState

def fromBoundary(boundary: Boundary, lastByte: Option[Byte] = None): FormState =
FormStateBuffer(Chunk.empty, Phase.Part1, Chunk.empty, lastByte, boundary)
FormStateBuffer(Chunk.empty, Phase.Part1, Chunk.empty, lastByte, boundary, dropContents = false)

sealed trait Phase

Expand Down
Loading

0 comments on commit 633d862

Please sign in to comment.