From e58ce3bc25e34219910313ab51b248229b78337f Mon Sep 17 00:00:00 2001 From: Joao Azevedo Date: Fri, 28 Oct 2022 18:15:19 +0100 Subject: [PATCH] Add a contextual parser for XML --- .../alpakka/xml/impl/StreamingXmlParser.scala | 42 ++++++++++--------- .../alpakka/xml/javadsl/XmlParsing.scala | 35 ++++++++++++++++ .../alpakka/xml/scaladsl/XmlParsing.scala | 17 +++++++- .../docs/scaladsl/XmlProcessingSpec.scala | 41 +++++++++++++++++- 4 files changed, 113 insertions(+), 22 deletions(-) diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala b/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala index faf0ad104e..d02da49a61 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala @@ -22,16 +22,17 @@ private[xml] object StreamingXmlParser { /** * INTERNAL API */ -@InternalApi private[xml] class StreamingXmlParser(ignoreInvalidChars: Boolean, - configureFactory: AsyncXMLInputFactory => Unit) - extends GraphStage[FlowShape[ByteString, ParseEvent]] { - val in: Inlet[ByteString] = Inlet("XMLParser.in") - val out: Outlet[ParseEvent] = Outlet("XMLParser.out") - override val shape: FlowShape[ByteString, ParseEvent] = FlowShape(in, out) +@InternalApi private[xml] class StreamingXmlParser[Ctx](ignoreInvalidChars: Boolean, + configureFactory: AsyncXMLInputFactory => Unit) + extends GraphStage[FlowShape[(ByteString, Ctx), (ParseEvent, Ctx)]] { + val in: Inlet[(ByteString, Ctx)] = Inlet("XMLParser.in") + val out: Outlet[(ParseEvent, Ctx)] = Outlet("XMLParser.out") + override val shape: FlowShape[(ByteString, Ctx), (ParseEvent, Ctx)] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { private var started: Boolean = false + private var context: Ctx = _ import javax.xml.stream.XMLStreamConstants @@ -45,7 +46,9 @@ 
private[xml] object StreamingXmlParser { setHandlers(in, out, this) override def onPush(): Unit = { - val array = grab(in).toArray + val (bs, ctx) = grab(in) + val array = bs.toArray + context = ctx parser.getInputFeeder.feedInput(array, 0, array.length) advanceParser() } @@ -67,10 +70,10 @@ private[xml] object StreamingXmlParser { case XMLStreamConstants.START_DOCUMENT => started = true - push(out, StartDocument) + push(out, (StartDocument, context)) case XMLStreamConstants.END_DOCUMENT => - push(out, EndDocument) + push(out, (EndDocument, context)) completeStage() case XMLStreamConstants.START_ELEMENT => @@ -91,27 +94,28 @@ private[xml] object StreamingXmlParser { val optNs = optPrefix.flatMap(prefix => Option(parser.getNamespaceURI(prefix))) push( out, - StartElement(parser.getLocalName, - attributes, - optPrefix.filterNot(_ == ""), - optNs.filterNot(_ == ""), - namespaceCtx = namespaces) + (StartElement(parser.getLocalName, + attributes, + optPrefix.filterNot(_ == ""), + optNs.filterNot(_ == ""), + namespaceCtx = namespaces), + context) ) case XMLStreamConstants.END_ELEMENT => - push(out, EndElement(parser.getLocalName)) + push(out, (EndElement(parser.getLocalName), context)) case XMLStreamConstants.CHARACTERS => - push(out, Characters(parser.getText)) + push(out, (Characters(parser.getText), context)) case XMLStreamConstants.PROCESSING_INSTRUCTION => - push(out, ProcessingInstruction(Option(parser.getPITarget), Option(parser.getPIData))) + push(out, (ProcessingInstruction(Option(parser.getPITarget), Option(parser.getPIData)), context)) case XMLStreamConstants.COMMENT => - push(out, Comment(parser.getText)) + push(out, (Comment(parser.getText), context)) case XMLStreamConstants.CDATA => - push(out, CData(parser.getText)) + push(out, (CData(parser.getText), context)) // Do not support DTD, SPACE, NAMESPACE, NOTATION_DECLARATION, ENTITY_DECLARATION, PROCESSING_INSTRUCTION // ATTRIBUTE is handled in START_ELEMENT implicitly diff --git 
a/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala b/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala index 85826fc6ec..ae0d3c0a67 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala @@ -23,12 +23,28 @@ object XmlParsing { def parser(): akka.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] = xml.scaladsl.XmlParsing.parser.asJava + /** + * Contextual version of the parser Flow: parses a stream of ByteStrings to XML events similar to SAX, emitting + * each event together with the context of the most recently consumed ByteString. + */ + def parserWithContext[Ctx](): akka.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = + xml.scaladsl.XmlParsing.parserWithContext().asJava + /** * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX. */ def parser(ignoreInvalidChars: Boolean): akka.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] = xml.scaladsl.XmlParsing.parser(ignoreInvalidChars).asJava + /** + * Contextual version of the parser Flow: parses a stream of ByteStrings to XML events similar to SAX, emitting + * each event together with the context of the most recently consumed ByteString. + */ + def parserWithContext[Ctx]( + ignoreInvalidChars: Boolean + ): akka.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = + xml.scaladsl.XmlParsing.parserWithContext(ignoreInvalidChars).asJava + /** * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX. */ @@ -37,6 +53,15 @@ object XmlParsing { ): akka.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] = xml.scaladsl.XmlParsing.parser(false, configureFactory.accept(_)).asJava + /** + * Contextual version of the parser Flow: parses a stream of ByteStrings to XML events similar to SAX, emitting + * each event together with the context of the most recently consumed ByteString.
+ */ + def parserWithContext[Ctx]( + configureFactory: Consumer[AsyncXMLInputFactory] + ): akka.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = + xml.scaladsl.XmlParsing.parserWithContext(false, configureFactory.accept(_)).asJava + /** * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX. */ @@ -46,6 +71,16 @@ object XmlParsing { ): akka.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] = xml.scaladsl.XmlParsing.parser(ignoreInvalidChars, configureFactory.accept(_)).asJava + /** + * Contextual version of the parser Flow: parses a stream of ByteStrings to XML events similar to SAX, emitting + * each event together with the context of the most recently consumed ByteString. + */ + def parserWithContext[Ctx]( + ignoreInvalidChars: Boolean, + configureFactory: Consumer[AsyncXMLInputFactory] + ): akka.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = + xml.scaladsl.XmlParsing.parserWithContext(ignoreInvalidChars, configureFactory.accept(_)).asJava + /** * A Flow that transforms a stream of XML ParseEvents. This stage coalesces consequitive CData and Characters * events into a single Characters event or fails if the buffered string is larger than the maximum defined.
diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala b/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala index 7f0ee8c11c..de72f894db 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala @@ -7,7 +7,7 @@ package akka.stream.alpakka.xml.scaladsl import akka.NotUsed import akka.stream.alpakka.xml.ParseEvent import akka.stream.alpakka.xml.impl -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.{Flow, FlowWithContext} import akka.util.ByteString import com.fasterxml.aalto.AsyncXMLInputFactory import org.w3c.dom.Element @@ -40,7 +40,20 @@ object XmlParsing { */ def parser(ignoreInvalidChars: Boolean = false, configureFactory: AsyncXMLInputFactory => Unit = configureDefault): Flow[ByteString, ParseEvent, NotUsed] = - Flow.fromGraph(new impl.StreamingXmlParser(ignoreInvalidChars, configureFactory)) + Flow[ByteString] + .map((_, ())) + .via(Flow.fromGraph(new impl.StreamingXmlParser[Unit](ignoreInvalidChars, configureFactory))) + .map(_._1) + + /** + * Contextual version of the parser Flow: parses a stream of ByteStrings to XML events similar to SAX, emitting + * each event together with the context of the most recently consumed ByteString. + */ + def parserWithContext[Ctx]( + ignoreInvalidChars: Boolean = false, + configureFactory: AsyncXMLInputFactory => Unit = configureDefault + ): FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = + FlowWithContext.fromTuples(Flow.fromGraph(new impl.StreamingXmlParser[Ctx](ignoreInvalidChars, configureFactory))) /** * A Flow that transforms a stream of XML ParseEvents.
This stage coalesces consecutive CData and Characters diff --git a/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala b/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala index d1dfbac428..7a6a834baa 100644 --- a/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala +++ b/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala @@ -8,7 +8,7 @@ import akka.actor.ActorSystem import akka.stream.alpakka.testkit.scaladsl.LogCapturing import akka.stream.alpakka.xml._ import akka.stream.alpakka.xml.scaladsl.XmlParsing -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} +import akka.stream.scaladsl.{Flow, Framing, Keep, Sink, Source} import akka.util.ByteString import org.scalatest.concurrent.ScalaFutures import org.scalatest.BeforeAndAfterAll @@ -356,6 +356,45 @@ class XmlProcessingSpec extends AnyWordSpec with Matchers with ScalaFutures with configWasCalled shouldBe true } + "properly parse XML contextually" in { + val doc = """| + | + | elem1 + | + | + | elem2 + | + |""".stripMargin + val resultFuture = Source + .single(ByteString(doc)) + .via( + Framing.delimiter(delimiter = ByteString(System.lineSeparator), + maximumFrameLength = 65536, + allowTruncation = true) + ) + .zipWithIndex + .runWith(XmlParsing.parserWithContext[Long]().asFlow.toMat(Sink.seq)(Keep.right)) + + resultFuture.futureValue should ===( + List( + (StartDocument, 0L), + (StartElement("doc"), 0L), + (Characters(" "), 1L), + (StartElement("elem"), 1L), + (Characters(" elem1"), 2L), + (Characters(" "), 3L), + (EndElement("elem"), 3L), + (Characters(" "), 4L), + (StartElement("elem"), 4L), + (Characters(" elem2"), 5L), + (Characters(" "), 6L), + (EndElement("elem"), 6L), + (EndElement("doc"), 7L), + (EndDocument, 7L) + ) + ) + } + } override protected def afterAll(): Unit = system.terminate()