Skip to content

Commit

Permalink
Add a contextual parser for XML
Browse files Browse the repository at this point in the history
  • Loading branch information
jcazevedo committed Oct 28, 2022
1 parent 4f77c8b commit e58ce3b
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ private[xml] object StreamingXmlParser {
/**
* INTERNAL API
*/
@InternalApi private[xml] class StreamingXmlParser(ignoreInvalidChars: Boolean,
configureFactory: AsyncXMLInputFactory => Unit)
extends GraphStage[FlowShape[ByteString, ParseEvent]] {
val in: Inlet[ByteString] = Inlet("XMLParser.in")
val out: Outlet[ParseEvent] = Outlet("XMLParser.out")
override val shape: FlowShape[ByteString, ParseEvent] = FlowShape(in, out)
@InternalApi private[xml] class StreamingXmlParser[Ctx](ignoreInvalidChars: Boolean,
configureFactory: AsyncXMLInputFactory => Unit)
extends GraphStage[FlowShape[(ByteString, Ctx), (ParseEvent, Ctx)]] {
val in: Inlet[(ByteString, Ctx)] = Inlet("XMLParser.in")
val out: Outlet[(ParseEvent, Ctx)] = Outlet("XMLParser.out")
override val shape: FlowShape[(ByteString, Ctx), (ParseEvent, Ctx)] = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private var started: Boolean = false
private var context: Ctx = _

import javax.xml.stream.XMLStreamConstants

Expand All @@ -45,7 +46,9 @@ private[xml] object StreamingXmlParser {
setHandlers(in, out, this)

override def onPush(): Unit = {
val array = grab(in).toArray
val (bs, ctx) = grab(in)
val array = bs.toArray
context = ctx
parser.getInputFeeder.feedInput(array, 0, array.length)
advanceParser()
}
Expand All @@ -67,10 +70,10 @@ private[xml] object StreamingXmlParser {

case XMLStreamConstants.START_DOCUMENT =>
started = true
push(out, StartDocument)
push(out, (StartDocument, context))

case XMLStreamConstants.END_DOCUMENT =>
push(out, EndDocument)
push(out, (EndDocument, context))
completeStage()

case XMLStreamConstants.START_ELEMENT =>
Expand All @@ -91,27 +94,28 @@ private[xml] object StreamingXmlParser {
val optNs = optPrefix.flatMap(prefix => Option(parser.getNamespaceURI(prefix)))
push(
out,
StartElement(parser.getLocalName,
attributes,
optPrefix.filterNot(_ == ""),
optNs.filterNot(_ == ""),
namespaceCtx = namespaces)
(StartElement(parser.getLocalName,
attributes,
optPrefix.filterNot(_ == ""),
optNs.filterNot(_ == ""),
namespaceCtx = namespaces),
context)
)

case XMLStreamConstants.END_ELEMENT =>
push(out, EndElement(parser.getLocalName))
push(out, (EndElement(parser.getLocalName), context))

case XMLStreamConstants.CHARACTERS =>
push(out, Characters(parser.getText))
push(out, (Characters(parser.getText), context))

case XMLStreamConstants.PROCESSING_INSTRUCTION =>
push(out, ProcessingInstruction(Option(parser.getPITarget), Option(parser.getPIData)))
push(out, (ProcessingInstruction(Option(parser.getPITarget), Option(parser.getPIData)), context))

case XMLStreamConstants.COMMENT =>
push(out, Comment(parser.getText))
push(out, (Comment(parser.getText), context))

case XMLStreamConstants.CDATA =>
push(out, CData(parser.getText))
push(out, (CData(parser.getText), context))

// Do not support DTD, SPACE, NAMESPACE, NOTATION_DECLARATION, ENTITY_DECLARATION (PROCESSING_INSTRUCTION is handled above)
// ATTRIBUTE is handled in START_ELEMENT implicitly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,28 @@ object XmlParsing {
def parser(): akka.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] =
xml.scaladsl.XmlParsing.parser.asJava

/**
* Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to
* SAX. Every incoming ByteString is accompanied by a context value of type `Ctx`, and every emitted ParseEvent is
* paired with a context value of that same type.
*
* Delegates to the Scala DSL `parserWithContext` using its default arguments and converts the result with `asJava`.
*
* @tparam Ctx type of the context value carried alongside each element
* @return a Java DSL `FlowWithContext` from ByteString to ParseEvent that propagates `Ctx`
*/
def parserWithContext[Ctx](): akka.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] =
xml.scaladsl.XmlParsing.parserWithContext().asJava

/**
* Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX.
*
* Delegates to the Scala DSL `parser` and converts the result with `asJava`.
*
* @param ignoreInvalidChars forwarded unchanged to the Scala DSL parser.
*                           NOTE(review): presumably makes the parser tolerate characters that are not valid in
*                           XML instead of failing - confirm against the parser implementation.
* @return a Java DSL `Flow` from ByteString to ParseEvent
*/
def parser(ignoreInvalidChars: Boolean): akka.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] =
xml.scaladsl.XmlParsing.parser(ignoreInvalidChars).asJava

/**
* Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to
* SAX. Every incoming ByteString is accompanied by a context value of type `Ctx`, and every emitted ParseEvent is
* paired with a context value of that same type.
*
* Delegates to the Scala DSL `parserWithContext` and converts the result with `asJava`.
*
* @tparam Ctx type of the context value carried alongside each element
* @param ignoreInvalidChars forwarded unchanged to the Scala DSL parser.
*                           NOTE(review): presumably makes the parser tolerate characters that are not valid in
*                           XML instead of failing - confirm against the parser implementation.
* @return a Java DSL `FlowWithContext` from ByteString to ParseEvent that propagates `Ctx`
*/
def parserWithContext[Ctx](
ignoreInvalidChars: Boolean
): akka.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] =
xml.scaladsl.XmlParsing.parserWithContext(ignoreInvalidChars).asJava

/**
* Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX.
*/
Expand All @@ -37,6 +53,15 @@ object XmlParsing {
): akka.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] =
xml.scaladsl.XmlParsing.parser(false, configureFactory.accept(_)).asJava

/**
* Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to
* SAX. Every incoming ByteString is accompanied by a context value of type `Ctx`, and every emitted ParseEvent is
* paired with a context value of that same type.
*
* Delegates to the Scala DSL `parserWithContext` with `ignoreInvalidChars` fixed to `false` and converts the
* result with `asJava`.
*
* @tparam Ctx type of the context value carried alongside each element
* @param configureFactory Java `Consumer` that is handed the `AsyncXMLInputFactory`; it is adapted to a Scala
*                         function by calling its `accept` method.
*                         NOTE(review): presumably invoked once before the underlying parser is created - confirm.
* @return a Java DSL `FlowWithContext` from ByteString to ParseEvent that propagates `Ctx`
*/
def parserWithContext[Ctx](
configureFactory: Consumer[AsyncXMLInputFactory]
): akka.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] =
xml.scaladsl.XmlParsing.parserWithContext(false, configureFactory.accept(_)).asJava

/**
* Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX.
*/
Expand All @@ -46,6 +71,16 @@ object XmlParsing {
): akka.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] =
xml.scaladsl.XmlParsing.parser(ignoreInvalidChars, configureFactory.accept(_)).asJava

/**
* Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to
* SAX. Every incoming ByteString is accompanied by a context value of type `Ctx`, and every emitted ParseEvent is
* paired with a context value of that same type.
*
* Delegates to the Scala DSL `parserWithContext` and converts the result with `asJava`.
*
* @tparam Ctx type of the context value carried alongside each element
* @param ignoreInvalidChars forwarded unchanged to the Scala DSL parser.
*                           NOTE(review): presumably makes the parser tolerate characters that are not valid in
*                           XML instead of failing - confirm against the parser implementation.
* @param configureFactory Java `Consumer` that is handed the `AsyncXMLInputFactory`; it is adapted to a Scala
*                         function by calling its `accept` method.
*                         NOTE(review): presumably invoked once before the underlying parser is created - confirm.
* @return a Java DSL `FlowWithContext` from ByteString to ParseEvent that propagates `Ctx`
*/
def parserWithContext[Ctx](
ignoreInvalidChars: Boolean,
configureFactory: Consumer[AsyncXMLInputFactory]
): akka.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] =
xml.scaladsl.XmlParsing.parserWithContext(ignoreInvalidChars, configureFactory.accept(_)).asJava

/**
* A Flow that transforms a stream of XML ParseEvents. This stage coalesces consecutive CData and Characters
* events into a single Characters event or fails if the buffered string is larger than the maximum defined.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.stream.alpakka.xml.scaladsl
import akka.NotUsed
import akka.stream.alpakka.xml.ParseEvent
import akka.stream.alpakka.xml.impl
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.{Flow, FlowWithContext}
import akka.util.ByteString
import com.fasterxml.aalto.AsyncXMLInputFactory
import org.w3c.dom.Element
Expand Down Expand Up @@ -40,7 +40,20 @@ object XmlParsing {
*/
def parser(ignoreInvalidChars: Boolean = false,
configureFactory: AsyncXMLInputFactory => Unit = configureDefault): Flow[ByteString, ParseEvent, NotUsed] =
Flow.fromGraph(new impl.StreamingXmlParser(ignoreInvalidChars, configureFactory))
Flow[ByteString]
.map((_, ()))
.via(Flow.fromGraph(new impl.StreamingXmlParser[Unit](ignoreInvalidChars, configureFactory)))
.map(_._1)

/**
 * Context-propagating variant of the XML parser. Consumes ByteStrings that each carry a context value of type
 * `Ctx` and emits SAX-like ParseEvents, each paired with a context value of the same type.
 *
 * @tparam Ctx type of the context value carried alongside each element
 * @param ignoreInvalidChars handed to the underlying `StreamingXmlParser` stage; defaults to `false`
 * @param configureFactory function handed to the underlying `StreamingXmlParser` stage together with the
 *                         `AsyncXMLInputFactory` type it operates on; defaults to `configureDefault`
 * @return a `FlowWithContext` built from the tuple-based parser stage
 */
def parserWithContext[Ctx](
    ignoreInvalidChars: Boolean = false,
    configureFactory: AsyncXMLInputFactory => Unit = configureDefault
): FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = {
  // The stage itself works on (element, context) tuples ...
  val parserStage = new impl.StreamingXmlParser[Ctx](ignoreInvalidChars, configureFactory)
  val tupledFlow = Flow.fromGraph(parserStage)
  // ... which are then exposed through the FlowWithContext API.
  FlowWithContext.fromTuples(tupledFlow)
}

/**
* A Flow that transforms a stream of XML ParseEvents. This stage coalesces consecutive CData and Characters
Expand Down
41 changes: 40 additions & 1 deletion xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import akka.actor.ActorSystem
import akka.stream.alpakka.testkit.scaladsl.LogCapturing
import akka.stream.alpakka.xml._
import akka.stream.alpakka.xml.scaladsl.XmlParsing
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.scaladsl.{Flow, Framing, Keep, Sink, Source}
import akka.util.ByteString
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.BeforeAndAfterAll
Expand Down Expand Up @@ -356,6 +356,45 @@ class XmlProcessingSpec extends AnyWordSpec with Matchers with ScalaFutures with
configWasCalled shouldBe true
}

"properly parse XML contextually" in {
  // The document is split into one frame per line and each frame is tagged with its line index, so the
  // expected events below carry the index of the line that was most recently fed to the parser.
  //
  // The line breaks inside the literal come from the source file, not from the platform the test runs on,
  // so they are normalised to "\n" and the framing delimiter is the same fixed "\n". Using
  // System.lineSeparator as the delimiter would fail to split the document whenever the two differ.
  val doc = """|<doc>
               | <elem>
               | elem1
               | </elem>
               | <elem>
               | elem2
               | </elem>
               |</doc>""".stripMargin.replace("\r\n", "\n")
  val resultFuture = Source
    .single(ByteString(doc))
    .via(
      Framing.delimiter(delimiter = ByteString("\n"), maximumFrameLength = 65536, allowTruncation = true)
    )
    .zipWithIndex
    .runWith(XmlParsing.parserWithContext[Long]().asFlow.toMat(Sink.seq)(Keep.right))

  resultFuture.futureValue should ===(
    List(
      (StartDocument, 0L),
      (StartElement("doc"), 0L),
      (Characters(" "), 1L),
      (StartElement("elem"), 1L),
      (Characters(" elem1"), 2L),
      (Characters(" "), 3L),
      (EndElement("elem"), 3L),
      (Characters(" "), 4L),
      (StartElement("elem"), 4L),
      (Characters(" elem2"), 5L),
      (Characters(" "), 6L),
      (EndElement("elem"), 6L),
      (EndElement("doc"), 7L),
      (EndDocument, 7L)
    )
  )
}

}

override protected def afterAll(): Unit = system.terminate()
Expand Down

0 comments on commit e58ce3b

Please sign in to comment.