Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming multipart/form-data #2062

Merged
merged 6 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions zio-http/src/main/scala/zio/http/Body.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,29 @@ trait Body { self =>
def asMultipartForm(implicit trace: Trace): Task[Form] =
for {
bytes <- asChunk
form <- Form.fromMultipartBytes(bytes, Charsets.Http).mapError(_.asException)
form <- Form.fromMultipartBytes(bytes, Charsets.Http)
} yield form

/**
 * Decodes the body as a streaming multipart form.
 *
 * The resulting StreamingForm is built from this body's byte stream, its
 * boundary and the HTTP charset. As with any streaming form, the FormData
 * elements it produces may be StreamingBinary or Text parts; the bytes of a
 * StreamingBinary part have to be consumed by the caller before the next
 * FormData can be obtained from the stream.
 *
 * Fails with an IllegalStateException when the body has no known boundary.
 */
def asMultipartFormStream(implicit trace: Trace): Task[StreamingForm] =
  boundary.fold[Task[StreamingForm]](
    ZIO.fail(
      new IllegalStateException("Cannot decode body as streaming multipart/form-data without a known boundary"),
    ),
  ) { knownBoundary =>
    ZIO.succeed(
      StreamingForm(asStream, Boundary(knownBoundary.toString), Charsets.Http),
    )
  }

/**
* Returns a stream that contains the bytes of the body. This method is safe
* to use with large bodies, because the elements of the returned stream are
Expand Down Expand Up @@ -138,7 +158,7 @@ object Body {
case Some(value) => form.encodeAsMultipartBytes(charset, value)
case None => form.encodeAsMultipartBytes(charset)
}
ChunkBody(bytes).withContentType(MediaType.multipart.`form-data`, Some(boundary))
StreamBody(bytes, Some(MediaType.multipart.`form-data`), Some(boundary))
}

/**
Expand Down
132 changes: 71 additions & 61 deletions zio-http/src/main/scala/zio/http/forms/Form.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ final case class Form(formData: Chunk[FormData]) {

/** Returns a new Form with `field` added after the existing form data. */
def append(field: FormData): Form = copy(formData = formData :+ field)

/**
 * Materialises every streaming part of this form in memory.
 *
 * Each StreamingBinary part is replaced by the result of its `collect`; all
 * other parts are passed through untouched. The returned Form therefore
 * contains no streaming parts.
 */
def collectAll: ZIO[Any, Throwable, Form] = {
  // Turns a single part into its in-memory counterpart.
  def materialize(part: FormData): ZIO[Any, Nothing, FormData] =
    part match {
      case streaming: StreamingBinary => streaming.collect
      case inMemory                   => ZIO.succeed(inMemory)
    }

  ZIO.foreach(formData)(materialize(_)).map(parts => Form(parts))
}

/** Returns the first form data entry whose name equals `name`, if any. */
def get(name: String): Option[FormData] =
  formData.collectFirst { case entry if entry.name == name => entry }

def encodeAsURLEncoded(charset: Charset = StandardCharsets.UTF_8): String = {
Expand All @@ -61,64 +75,82 @@ final case class Form(formData: Chunk[FormData]) {

def encodeAsMultipartBytes(
charset: Charset = StandardCharsets.UTF_8,
rng: () => String = () => new SecureRandom().nextLong().toString(),
): (CharSequence, Chunk[Byte]) =
rng: () => String = () => new SecureRandom().nextLong().toString,
): (CharSequence, ZStream[Any, Nothing, Byte]) =
encodeAsMultipartBytes(charset, Boundary.generate(rng))

def encodeAsMultipartBytes(
charset: Charset,
boundary: Boundary,
): (CharSequence, Chunk[Byte]) = {
): (CharSequence, ZStream[Any, Nothing, Byte]) = {

val encapsulatingBoundary = EncapsulatingBoundary(boundary)
val closingBoundary = ClosingBoundary(boundary)

val ast = formData.flatMap {
val astStreams = formData.map {
case fd @ Simple(name, value) =>
Chunk(
encapsulatingBoundary,
EoL,
Header.contentDisposition(name),
EoL,
Header.contentType(fd.contentType),
EoL,
EoL,
Content(Chunk.fromArray(value.getBytes(charset))),
EoL,
ZStream.fromChunk(
Chunk(
encapsulatingBoundary,
EoL,
Header.contentDisposition(name),
EoL,
Header.contentType(fd.contentType),
EoL,
EoL,
Content(Chunk.fromArray(value.getBytes(charset))),
EoL,
),
)

case Text(name, value, contentType, filename) =>
Chunk(
encapsulatingBoundary,
EoL,
Header.contentDisposition(name, filename),
EoL,
Header.contentType(contentType),
EoL,
EoL,
Content(Chunk.fromArray(value.getBytes(charset))),
EoL,
ZStream.fromChunk(
Chunk(
encapsulatingBoundary,
EoL,
Header.contentDisposition(name, filename),
EoL,
Header.contentType(contentType),
EoL,
EoL,
Content(Chunk.fromArray(value.getBytes(charset))),
EoL,
),
)
case Binary(name, data, contentType, transferEncoding, filename) =>
val xferEncoding =
transferEncoding.map(enc => Chunk(Header.contentTransferEncoding(enc), EoL)).getOrElse(Chunk.empty)

Chunk(
encapsulatingBoundary,
EoL,
Header.contentDisposition(name, filename),
EoL,
Header.contentType(contentType),
EoL,
) ++ xferEncoding ++
ZStream.fromChunk(
Chunk(
encapsulatingBoundary,
EoL,
Header.contentDisposition(name, filename),
EoL,
Header.contentType(contentType),
EoL,
) ++ xferEncoding ++ Chunk(EoL, Content(data), EoL),
)

case StreamingBinary(name, contentType, transferEncoding, filename, data) =>
val xferEncoding =
transferEncoding.map(enc => Chunk(Header.contentTransferEncoding(enc), EoL)).getOrElse(Chunk.empty)

ZStream.fromChunk(
Chunk(
encapsulatingBoundary,
EoL,
Content(data),
Header.contentDisposition(name, filename),
EoL,
)
} ++ Chunk(closingBoundary, EoL)
Header.contentType(contentType),
EoL,
) ++ xferEncoding :+ EoL,
) ++ data.chunks.map(Content(_)) ++ ZStream(EoL)
}

boundary.id -> ast.flatMap(_.bytes)
val stream = ZStream.fromChunk(astStreams).flatten ++ ZStream.fromChunk(Chunk(closingBoundary, EoL))

boundary.id -> stream.map(_.bytes).flattenChunks
}
}

Expand All @@ -133,35 +165,13 @@ object Form {
def fromMultipartBytes(
bytes: Chunk[Byte],
charset: Charset = StandardCharsets.UTF_8,
): ZIO[Any, FormDecodingError, Form] = {
def process(boundary: Boundary) = ZStream
.fromChunk(bytes)
.mapAccum(FormState.fromBoundary(boundary)) { (state, byte) =>
state match {
case BoundaryClosed(tree) => (FormState.fromBoundary(boundary), tree)
case BoundaryEncapsulated(tree) => (FormState.fromBoundary(boundary, Some(byte)), tree)
case buffer: FormStateBuffer =>
val state = buffer.append(byte)
state match {
case BoundaryClosed(prevContent) => (state, prevContent)
case _ => (state, Chunk.empty[FormAST])
}
}
}
.collectZIO {
case chunk if chunk.nonEmpty =>
FormData.fromFormAST(chunk, charset)
}
.runCollect
.map(apply)

): ZIO[Any, Throwable, Form] =
for {
boundary <- ZIO
.fromOption(Boundary.fromContent(bytes, charset))
.mapError(_ => FormDecodingError.BoundaryNotFoundInContent)
form <- process(boundary)
.orElseFail(FormDecodingError.BoundaryNotFoundInContent.asException)
form <- StreamingForm(ZStream.fromChunk(bytes), boundary, charset).collectAll
} yield form
}

def fromURLEncoded(encoded: String, encoding: Charset): ZIO[Any, FormDecodingError, Form] = {
val fields = ZIO.foreach(encoded.split("&")) { pair =>
Expand Down
9 changes: 8 additions & 1 deletion zio-http/src/main/scala/zio/http/forms/FormAST.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@ import zio._
import zio.http.model.Header.ContentTransferEncoding
import zio.http.model._

private[forms] sealed trait FormAST { def bytes: Chunk[Byte] }
/** Element of the form AST; every element exposes its bytes. */
private[forms] sealed trait FormAST {

  /** The bytes of this element. */
  def bytes: Chunk[Byte]

  /** `true` only when this element is a `FormAST.Content`. */
  def isContent: Boolean =
    this match {
      case _: FormAST.Content => true
      case _                  => false
    }
}

private[forms] object FormAST {

Expand Down
69 changes: 66 additions & 3 deletions zio-http/src/main/scala/zio/http/forms/FormData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import java.nio.charset._

import zio._

import zio.stream.{Take, ZStream}

import zio.http.forms.FormAST._
import zio.http.forms.FormDecodingError._
import zio.http.model.Header.ContentTransferEncoding
Expand Down Expand Up @@ -66,6 +68,20 @@ object FormData {
filename: Option[String] = None,
) extends FormData

/**
 * Form data whose payload is provided as a stream of bytes (`data`) instead
 * of an in-memory chunk.
 */
final case class StreamingBinary(
  name: String,
  contentType: MediaType,
  transferEncoding: Option[ContentTransferEncoding] = None,
  filename: Option[String] = None,
  data: ZStream[Any, Nothing, Byte],
) extends FormData {

  /**
   * Runs `data` to completion and returns a Binary carrying the collected
   * bytes together with the same name, content type, transfer encoding and
   * filename.
   */
  def collect: ZIO[Any, Nothing, Binary] =
    for {
      payload <- data.runCollect
    } yield Binary(name, payload, contentType, transferEncoding, filename)
}

final case class Text(
name: String,
value: String,
Expand Down Expand Up @@ -96,9 +112,9 @@ object FormData {
}

for {
disposition <- ZIO.fromOption(extract._1).mapError(_ => FormDataMissingContentDisposition)
name <- ZIO.fromOption(extract._1.flatMap(_.fields.get("name"))).mapError(_ => ContentDispositionMissingName)
charset <- ZIO
disposition <- ZIO.fromOption(extract._1).orElseFail(FormDataMissingContentDisposition)
name <- ZIO.fromOption(extract._1.flatMap(_.fields.get("name"))).orElseFail(ContentDispositionMissingName)
charset <- ZIO
.attempt(extract._2.flatMap(x => x.fields.get("charset").map(Charset.forName)).getOrElse(defaultCharset))
.mapError(e => InvalidCharset(e.getMessage))
contentParts = extract._4.tail // Skip the first empty line
Expand All @@ -115,6 +131,45 @@ object FormData {
else Binary(name, content, contentType, transferEncoding, disposition.fields.get("filename"))
}

/**
 * Resolves the media type from the first "Content-Type" header found in
 * `ast`.
 *
 * Falls back to `text/plain` when there is no such header or when
 * `MediaType.forContentType` yields nothing for its value.
 */
private[http] def getContentType(ast: Chunk[FormAST]): MediaType = {
  val declared = ast.collectFirst {
    case header: Header if header.name == "Content-Type" => header.preposition
  }

  declared
    .flatMap(raw => MediaType.forContentType(raw))
    .getOrElse(MediaType.text.plain)
}

/**
 * Builds a StreamingBinary from the headers found in `ast`, with its data
 * read from `queue`.
 *
 * Fails with FormDataMissingContentDisposition when no "Content-Disposition"
 * header is present, and with ContentDispositionMissingName when that header
 * has no "name" field. The content type defaults to `text/plain` when it is
 * absent or cannot be resolved; the transfer encoding is optional.
 */
private[http] def incomingStreamingBinary(
  ast: Chunk[FormAST],
  queue: Queue[Take[Nothing, Byte]],
): ZIO[Any, FormDecodingError, FormData] = {
  // When a header name occurs more than once, the last occurrence wins.
  def lastHeader(headerName: String): Option[Header] =
    ast.collect { case header: Header if header.name == headerName => header }.lastOption

  val dispositionHeader = lastHeader("Content-Disposition")
  val typeHeader        = lastHeader("Content-Type")
  val encodingHeader    = lastHeader("Content-Transfer-Encoding")

  for {
    disposition <- ZIO.fromOption(dispositionHeader).orElseFail(FormDataMissingContentDisposition)
    name        <- ZIO.fromOption(disposition.fields.get("name")).orElseFail(ContentDispositionMissingName)
  } yield {
    val contentType =
      typeHeader
        .flatMap(header => MediaType.forContentType(header.preposition))
        .getOrElse(MediaType.text.plain)
    val transferEncoding =
      encodingHeader.flatMap(header => ContentTransferEncoding.parse(header.preposition).toOption)

    StreamingBinary(
      name,
      contentType,
      transferEncoding,
      disposition.fields.get("filename"),
      // The part's bytes are delivered through the queue as Take values.
      ZStream.fromQueue(queue).flattenTake,
    )
  }
}

/**
 * Creates a Text form field from `name`, `value` and `mediaType`
 * (`text/plain` by default); the last constructor argument is passed as
 * `None`.
 */
def textField(name: String, value: String, mediaType: MediaType = MediaType.text.plain): FormData =
Text(name, value, mediaType, None)

Expand All @@ -127,4 +182,12 @@ object FormData {
transferEncoding: Option[ContentTransferEncoding] = None,
filename: Option[String] = None,
): FormData = Binary(name, data, mediaType, transferEncoding, filename)

/**
 * Creates a StreamingBinary form field.
 *
 * @param name
 *   name of the field
 * @param data
 *   stream providing the bytes of the field
 * @param mediaType
 *   used as the content type of the field
 * @param transferEncoding
 *   optional content transfer encoding
 * @param filename
 *   optional filename
 */
def streamingBinaryField(
  name: String,
  data: ZStream[Any, Nothing, Byte],
  mediaType: MediaType,
  transferEncoding: Option[ContentTransferEncoding] = None,
  filename: Option[String] = None,
): FormData =
  StreamingBinary(
    name = name,
    contentType = mediaType,
    transferEncoding = transferEncoding,
    filename = filename,
    data = data,
  )
}
7 changes: 5 additions & 2 deletions zio-http/src/main/scala/zio/http/forms/FormState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ private[forms] object FormState {
buffer: Chunk[Byte],
lastByte: Option[Byte],
boundary: Boundary,
dropContents: Boolean,
) extends FormState { self =>

def append(byte: Byte): FormState = {
Expand All @@ -42,7 +43,7 @@ private[forms] object FormState {

def flush(ast: FormAST): FormStateBuffer =
self.copy(
tree = tree :+ ast,
tree = if (ast.isContent && dropContents) tree else tree :+ ast,
buffer = Chunk.empty,
lastByte = None,
phase = phase0,
Expand Down Expand Up @@ -76,14 +77,16 @@ private[forms] object FormState {
}

}

/** Returns a copy of this buffer with `dropContents` set to `true`. */
def startIgnoringContents: FormStateBuffer =
  copy(dropContents = true)
}

final case class BoundaryEncapsulated(buffer: Chunk[FormAST]) extends FormState

final case class BoundaryClosed(buffer: Chunk[FormAST]) extends FormState

def fromBoundary(boundary: Boundary, lastByte: Option[Byte] = None): FormState =
FormStateBuffer(Chunk.empty, Phase.Part1, Chunk.empty, lastByte, boundary)
FormStateBuffer(Chunk.empty, Phase.Part1, Chunk.empty, lastByte, boundary, dropContents = false)

sealed trait Phase

Expand Down
Loading