Skip to content

Commit

Permalink
Avoid unnecessarily creating tuples in non-contextual implementation
Browse files Browse the repository at this point in the history
Attempts to avoid the unnecessary creation of tuples by parameterizing
the internal API and letting callers decide how elements in the flow are
built.
  • Loading branch information
jcazevedo committed Nov 26, 2022
1 parent e58ce3b commit d66f71c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ private[xml] object StreamingXmlParser {
/**
* INTERNAL API
*/
@InternalApi private[xml] class StreamingXmlParser[Ctx](ignoreInvalidChars: Boolean,
configureFactory: AsyncXMLInputFactory => Unit)
extends GraphStage[FlowShape[(ByteString, Ctx), (ParseEvent, Ctx)]] {
val in: Inlet[(ByteString, Ctx)] = Inlet("XMLParser.in")
val out: Outlet[(ParseEvent, Ctx)] = Outlet("XMLParser.out")
override val shape: FlowShape[(ByteString, Ctx), (ParseEvent, Ctx)] = FlowShape(in, out)
@InternalApi private[xml] class StreamingXmlParser[A, B, Ctx](ignoreInvalidChars: Boolean,
configureFactory: AsyncXMLInputFactory => Unit,
getByteString: A => ByteString,
getContext: A => Ctx,
buildOutput: (ParseEvent, Ctx) => B)
extends GraphStage[FlowShape[A, B]] {
val in: Inlet[A] = Inlet("XMLParser.in")
val out: Outlet[B] = Outlet("XMLParser.out")
override val shape: FlowShape[A, B] = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
Expand All @@ -46,9 +49,10 @@ private[xml] object StreamingXmlParser {
setHandlers(in, out, this)

override def onPush(): Unit = {
val (bs, ctx) = grab(in)
val a = grab(in)
val bs = getByteString(a)
context = getContext(a)
val array = bs.toArray
context = ctx
parser.getInputFeeder.feedInput(array, 0, array.length)
advanceParser()
}
Expand All @@ -70,10 +74,10 @@ private[xml] object StreamingXmlParser {

case XMLStreamConstants.START_DOCUMENT =>
started = true
push(out, (StartDocument, context))
push(out, buildOutput(StartDocument, context))

case XMLStreamConstants.END_DOCUMENT =>
push(out, (EndDocument, context))
push(out, buildOutput(EndDocument, context))
completeStage()

case XMLStreamConstants.START_ELEMENT =>
Expand All @@ -94,28 +98,29 @@ private[xml] object StreamingXmlParser {
val optNs = optPrefix.flatMap(prefix => Option(parser.getNamespaceURI(prefix)))
push(
out,
(StartElement(parser.getLocalName,
attributes,
optPrefix.filterNot(_ == ""),
optNs.filterNot(_ == ""),
namespaceCtx = namespaces),
context)
buildOutput(StartElement(parser.getLocalName,
attributes,
optPrefix.filterNot(_ == ""),
optNs.filterNot(_ == ""),
namespaceCtx = namespaces),
context)
)

case XMLStreamConstants.END_ELEMENT =>
push(out, (EndElement(parser.getLocalName), context))
push(out, buildOutput(EndElement(parser.getLocalName), context))

case XMLStreamConstants.CHARACTERS =>
push(out, (Characters(parser.getText), context))
push(out, buildOutput(Characters(parser.getText), context))

case XMLStreamConstants.PROCESSING_INSTRUCTION =>
push(out, (ProcessingInstruction(Option(parser.getPITarget), Option(parser.getPIData)), context))
push(out,
buildOutput(ProcessingInstruction(Option(parser.getPITarget), Option(parser.getPIData)), context))

case XMLStreamConstants.COMMENT =>
push(out, (Comment(parser.getText), context))
push(out, buildOutput(Comment(parser.getText), context))

case XMLStreamConstants.CDATA =>
push(out, (CData(parser.getText), context))
push(out, buildOutput(CData(parser.getText), context))

// DTD, SPACE, NAMESPACE, NOTATION_DECLARATION and ENTITY_DECLARATION events are not supported.
// ATTRIBUTE is handled implicitly as part of START_ELEMENT.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,15 @@ object XmlParsing {
*/
def parser(ignoreInvalidChars: Boolean = false,
configureFactory: AsyncXMLInputFactory => Unit = configureDefault): Flow[ByteString, ParseEvent, NotUsed] =
Flow[ByteString]
.map((_, ()))
.via(Flow.fromGraph(new impl.StreamingXmlParser[Unit](ignoreInvalidChars, configureFactory)))
.map(_._1)
Flow[ByteString].via(
Flow.fromGraph(
new impl.StreamingXmlParser[ByteString, ParseEvent, Unit](ignoreInvalidChars,
configureFactory,
getByteString = identity,
getContext = _ => (),
buildOutput = (parseEvent, _) => parseEvent)
)
)

/**
* Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to
Expand All @@ -53,7 +58,15 @@ object XmlParsing {
ignoreInvalidChars: Boolean = false,
configureFactory: AsyncXMLInputFactory => Unit = configureDefault
): FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] =
FlowWithContext.fromTuples(Flow.fromGraph(new impl.StreamingXmlParser[Ctx](ignoreInvalidChars, configureFactory)))
FlowWithContext.fromTuples(
Flow.fromGraph(
new impl.StreamingXmlParser[(ByteString, Ctx), (ParseEvent, Ctx), Ctx](ignoreInvalidChars,
configureFactory,
getByteString = _._1,
getContext = _._2,
buildOutput = (pe, ctx) => (pe, ctx))
)
)

/**
* A Flow that transforms a stream of XML ParseEvents. This stage coalesces consecutive CData and Characters
Expand Down

0 comments on commit d66f71c

Please sign in to comment.