From e58ce3bc25e34219910313ab51b248229b78337f Mon Sep 17 00:00:00 2001 From: Joao Azevedo Date: Fri, 28 Oct 2022 18:15:19 +0100 Subject: [PATCH 1/7] Add a contextual parser for XML --- .../alpakka/xml/impl/StreamingXmlParser.scala | 42 ++++++++++--------- .../alpakka/xml/javadsl/XmlParsing.scala | 35 ++++++++++++++++ .../alpakka/xml/scaladsl/XmlParsing.scala | 17 +++++++- .../docs/scaladsl/XmlProcessingSpec.scala | 41 +++++++++++++++++- 4 files changed, 113 insertions(+), 22 deletions(-) diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala b/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala index faf0ad104e..d02da49a61 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala @@ -22,16 +22,17 @@ private[xml] object StreamingXmlParser { /** * INTERNAL API */ -@InternalApi private[xml] class StreamingXmlParser(ignoreInvalidChars: Boolean, - configureFactory: AsyncXMLInputFactory => Unit) - extends GraphStage[FlowShape[ByteString, ParseEvent]] { - val in: Inlet[ByteString] = Inlet("XMLParser.in") - val out: Outlet[ParseEvent] = Outlet("XMLParser.out") - override val shape: FlowShape[ByteString, ParseEvent] = FlowShape(in, out) +@InternalApi private[xml] class StreamingXmlParser[Ctx](ignoreInvalidChars: Boolean, + configureFactory: AsyncXMLInputFactory => Unit) + extends GraphStage[FlowShape[(ByteString, Ctx), (ParseEvent, Ctx)]] { + val in: Inlet[(ByteString, Ctx)] = Inlet("XMLParser.in") + val out: Outlet[(ParseEvent, Ctx)] = Outlet("XMLParser.out") + override val shape: FlowShape[(ByteString, Ctx), (ParseEvent, Ctx)] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { private var started: Boolean = false + private var context: Ctx = _ import javax.xml.stream.XMLStreamConstants @@ -45,7 +46,9 @@ private[xml] object StreamingXmlParser { setHandlers(in, out, this) override def onPush(): Unit = { - val array = grab(in).toArray + val (bs, ctx) = grab(in) + val array = bs.toArray + context = ctx parser.getInputFeeder.feedInput(array, 0, array.length) advanceParser() } @@ -67,10 +70,10 @@ private[xml] object StreamingXmlParser { case XMLStreamConstants.START_DOCUMENT => started = true - push(out, StartDocument) + push(out, (StartDocument, context)) case XMLStreamConstants.END_DOCUMENT => - push(out, EndDocument) + push(out, (EndDocument, context)) completeStage() case XMLStreamConstants.START_ELEMENT => @@ -91,27 +94,28 @@ private[xml] object StreamingXmlParser { val optNs = optPrefix.flatMap(prefix => Option(parser.getNamespaceURI(prefix))) push( out, - StartElement(parser.getLocalName, - attributes, - optPrefix.filterNot(_ == ""), - optNs.filterNot(_ == ""), - namespaceCtx = namespaces) + (StartElement(parser.getLocalName, + attributes, + optPrefix.filterNot(_ == ""), + optNs.filterNot(_ == ""), + namespaceCtx = namespaces), + context) ) case XMLStreamConstants.END_ELEMENT => - push(out, EndElement(parser.getLocalName)) + push(out, (EndElement(parser.getLocalName), context)) case XMLStreamConstants.CHARACTERS => - push(out, Characters(parser.getText)) + push(out, (Characters(parser.getText), context)) case XMLStreamConstants.PROCESSING_INSTRUCTION => - push(out, ProcessingInstruction(Option(parser.getPITarget), Option(parser.getPIData))) + push(out, (ProcessingInstruction(Option(parser.getPITarget), Option(parser.getPIData)), context)) case XMLStreamConstants.COMMENT => - push(out, Comment(parser.getText)) + push(out, (Comment(parser.getText), context)) case XMLStreamConstants.CDATA => - push(out, CData(parser.getText)) + push(out, (CData(parser.getText), context)) // Do not support DTD, SPACE, NAMESPACE, NOTATION_DECLARATION, ENTITY_DECLARATION, PROCESSING_INSTRUCTION // ATTRIBUTE is handled in START_ELEMENT implicitly diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala b/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala index 85826fc6ec..ae0d3c0a67 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala @@ -23,12 +23,28 @@ object XmlParsing { def parser(): akka.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] = xml.scaladsl.XmlParsing.parser.asJava + /** + * Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to + * SAX. + */ + def parserWithContext[Ctx](): akka.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = + xml.scaladsl.XmlParsing.parserWithContext().asJava + /** * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX. */ def parser(ignoreInvalidChars: Boolean): akka.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] = xml.scaladsl.XmlParsing.parser(ignoreInvalidChars).asJava + /** + * Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to + * SAX. + */ + def parserWithContext[Ctx]( + ignoreInvalidChars: Boolean + ): akka.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = + xml.scaladsl.XmlParsing.parserWithContext(ignoreInvalidChars).asJava + /** * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX. */ @@ -37,6 +53,15 @@ object XmlParsing { ): akka.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] = xml.scaladsl.XmlParsing.parser(false, configureFactory.accept(_)).asJava + /** + * Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to + * SAX. + */ + def parserWithContext[Ctx]( + configureFactory: Consumer[AsyncXMLInputFactory] + ): akka.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = + xml.scaladsl.XmlParsing.parserWithContext(false, configureFactory.accept(_)).asJava + /** * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX. */ @@ -46,6 +71,16 @@ object XmlParsing { ): akka.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] = xml.scaladsl.XmlParsing.parser(ignoreInvalidChars, configureFactory.accept(_)).asJava + /** + * Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to + * SAX. + */ + def parserWithContext[Ctx]( + ignoreInvalidChars: Boolean, + configureFactory: Consumer[AsyncXMLInputFactory] + ): akka.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = + xml.scaladsl.XmlParsing.parserWithContext(ignoreInvalidChars, configureFactory.accept(_)).asJava + /** * A Flow that transforms a stream of XML ParseEvents. This stage coalesces consequitive CData and Characters * events into a single Characters event or fails if the buffered string is larger than the maximum defined. diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala b/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala index 7f0ee8c11c..de72f894db 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala @@ -7,7 +7,7 @@ package akka.stream.alpakka.xml.scaladsl import akka.NotUsed import akka.stream.alpakka.xml.ParseEvent import akka.stream.alpakka.xml.impl -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.{Flow, FlowWithContext} import akka.util.ByteString import com.fasterxml.aalto.AsyncXMLInputFactory import org.w3c.dom.Element @@ -40,7 +40,20 @@ object XmlParsing { */ def parser(ignoreInvalidChars: Boolean = false, configureFactory: AsyncXMLInputFactory => Unit = configureDefault): Flow[ByteString, ParseEvent, NotUsed] = - Flow.fromGraph(new impl.StreamingXmlParser(ignoreInvalidChars, configureFactory)) + Flow[ByteString] + .map((_, ())) + .via(Flow.fromGraph(new impl.StreamingXmlParser[Unit](ignoreInvalidChars, configureFactory))) + .map(_._1) + + /** + * Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to + * SAX. + */ + def parserWithContext[Ctx]( + ignoreInvalidChars: Boolean = false, + configureFactory: AsyncXMLInputFactory => Unit = configureDefault + ): FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = + FlowWithContext.fromTuples(Flow.fromGraph(new impl.StreamingXmlParser[Ctx](ignoreInvalidChars, configureFactory))) /** * A Flow that transforms a stream of XML ParseEvents. This stage coalesces consecutive CData and Characters diff --git a/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala b/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala index d1dfbac428..7a6a834baa 100644 --- a/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala +++ b/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala @@ -8,7 +8,7 @@ import akka.actor.ActorSystem import akka.stream.alpakka.testkit.scaladsl.LogCapturing import akka.stream.alpakka.xml._ import akka.stream.alpakka.xml.scaladsl.XmlParsing -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} +import akka.stream.scaladsl.{Flow, Framing, Keep, Sink, Source} import akka.util.ByteString import org.scalatest.concurrent.ScalaFutures import org.scalatest.BeforeAndAfterAll @@ -356,6 +356,45 @@ class XmlProcessingSpec extends AnyWordSpec with Matchers with ScalaFutures with configWasCalled shouldBe true } + "properly parse XML contextually" in { + val doc = """| + | + | elem1 + | + | + | elem2 + | + |""".stripMargin + val resultFuture = Source + .single(ByteString(doc)) + .via( + Framing.delimiter(delimiter = ByteString(System.lineSeparator), + maximumFrameLength = 65536, + allowTruncation = true) + ) + .zipWithIndex + .runWith(XmlParsing.parserWithContext[Long]().asFlow.toMat(Sink.seq)(Keep.right)) + + resultFuture.futureValue should ===( + List( + (StartDocument, 0L), + (StartElement("doc"), 0L), + (Characters(" "), 1L), + (StartElement("elem"), 1L), + (Characters(" elem1"), 2L), + (Characters(" "), 3L), + (EndElement("elem"), 3L), + (Characters(" "), 4L), + (StartElement("elem"), 4L), + (Characters(" elem2"), 5L), + (Characters(" "), 6L), + (EndElement("elem"), 6L), + (EndElement("doc"), 7L), + (EndDocument, 7L) + ) + ) + } + } override protected def afterAll(): Unit = system.terminate() From d66f71cb67a5e6e2cbec298dc9e01c6df21f6832 Mon Sep 17 00:00:00 2001 From: Joao Azevedo Date: Sat, 26 Nov 2022 00:02:33 +0000 Subject: [PATCH 2/7] Avoid unnecessarily creating tuples in non-contextual implementation Attempts to avoid the unnecessary creation of tuples by parameterizing the internal API and letting callers decide how elements in the flow are built. --- .../alpakka/xml/impl/StreamingXmlParser.scala | 47 ++++++++++--------- .../alpakka/xml/scaladsl/XmlParsing.scala | 23 +++++++-- 2 files changed, 44 insertions(+), 26 deletions(-) diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala b/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala index d02da49a61..88d28aa7c2 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala @@ -22,12 +22,15 @@ private[xml] object StreamingXmlParser { /** * INTERNAL API */ -@InternalApi private[xml] class StreamingXmlParser[Ctx](ignoreInvalidChars: Boolean, - configureFactory: AsyncXMLInputFactory => Unit) - extends GraphStage[FlowShape[(ByteString, Ctx), (ParseEvent, Ctx)]] { - val in: Inlet[(ByteString, Ctx)] = Inlet("XMLParser.in") - val out: Outlet[(ParseEvent, Ctx)] = Outlet("XMLParser.out") - override val shape: FlowShape[(ByteString, Ctx), (ParseEvent, Ctx)] = FlowShape(in, out) +@InternalApi private[xml] class StreamingXmlParser[A, B, Ctx](ignoreInvalidChars: Boolean, + configureFactory: AsyncXMLInputFactory => Unit, + getByteString: A => ByteString, + getContext: A => Ctx, + buildOutput: (ParseEvent, Ctx) => B) + extends GraphStage[FlowShape[A, B]] { + val in: Inlet[A] = Inlet("XMLParser.in") + val out: Outlet[B] = Outlet("XMLParser.out") + override val shape: FlowShape[A, B] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -46,9 +49,10 @@ private[xml] object StreamingXmlParser { setHandlers(in, out, this) override def onPush(): Unit = { - val (bs, ctx) = grab(in) + val a = grab(in) + val bs = getByteString(a) + context = getContext(a) val array = bs.toArray - context = ctx parser.getInputFeeder.feedInput(array, 0, array.length) advanceParser() } @@ -70,10 +74,10 @@ private[xml] object StreamingXmlParser { case XMLStreamConstants.START_DOCUMENT => started = true - push(out, (StartDocument, context)) + push(out, buildOutput(StartDocument, context)) case XMLStreamConstants.END_DOCUMENT => - push(out, (EndDocument, context)) + push(out, buildOutput(EndDocument, context)) completeStage() case XMLStreamConstants.START_ELEMENT => @@ -94,28 +98,29 @@ private[xml] object StreamingXmlParser { val optNs = optPrefix.flatMap(prefix => Option(parser.getNamespaceURI(prefix))) push( out, - (StartElement(parser.getLocalName, - attributes, - optPrefix.filterNot(_ == ""), - optNs.filterNot(_ == ""), - namespaceCtx = namespaces), - context) + buildOutput(StartElement(parser.getLocalName, + attributes, + optPrefix.filterNot(_ == ""), + optNs.filterNot(_ == ""), + namespaceCtx = namespaces), + context) ) case XMLStreamConstants.END_ELEMENT => - push(out, (EndElement(parser.getLocalName), context)) + push(out, buildOutput(EndElement(parser.getLocalName), context)) case XMLStreamConstants.CHARACTERS => - push(out, (Characters(parser.getText), context)) + push(out, buildOutput(Characters(parser.getText), context)) case XMLStreamConstants.PROCESSING_INSTRUCTION => - push(out, (ProcessingInstruction(Option(parser.getPITarget), Option(parser.getPIData)), context)) + push(out, + buildOutput(ProcessingInstruction(Option(parser.getPITarget), Option(parser.getPIData)), context)) case XMLStreamConstants.COMMENT => - push(out, (Comment(parser.getText), context)) + push(out, buildOutput(Comment(parser.getText), context)) case XMLStreamConstants.CDATA => - push(out, (CData(parser.getText), context)) + push(out, buildOutput(CData(parser.getText), context)) // Do not support DTD, SPACE, NAMESPACE, NOTATION_DECLARATION, ENTITY_DECLARATION, PROCESSING_INSTRUCTION // ATTRIBUTE is handled in START_ELEMENT implicitly diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala b/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala index de72f894db..e15c9daf2c 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala @@ -40,10 +40,15 @@ object XmlParsing { */ def parser(ignoreInvalidChars: Boolean = false, configureFactory: AsyncXMLInputFactory => Unit = configureDefault): Flow[ByteString, ParseEvent, NotUsed] = - Flow[ByteString] - .map((_, ())) - .via(Flow.fromGraph(new impl.StreamingXmlParser[Unit](ignoreInvalidChars, configureFactory))) - .map(_._1) + Flow[ByteString].via( + Flow.fromGraph( + new impl.StreamingXmlParser[ByteString, ParseEvent, Unit](ignoreInvalidChars, + configureFactory, + getByteString = identity, + getContext = _ => (), + buildOutput = (parseEvent, _) => parseEvent) + ) + ) /** * Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to @@ -53,7 +58,15 @@ object XmlParsing { ignoreInvalidChars: Boolean = false, configureFactory: AsyncXMLInputFactory => Unit = configureDefault ): FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = - FlowWithContext.fromTuples(Flow.fromGraph(new impl.StreamingXmlParser[Ctx](ignoreInvalidChars, configureFactory))) + FlowWithContext.fromTuples( + Flow.fromGraph( + new impl.StreamingXmlParser[(ByteString, Ctx), (ParseEvent, Ctx), Ctx](ignoreInvalidChars, + configureFactory, + getByteString = _._1, + getContext = _._2, + buildOutput = (pe, ctx) => (pe, ctx)) + ) + ) /** * A Flow that transforms a stream of XML ParseEvents. This stage coalesces consecutive CData and Characters From 2d980fb120140da7e9ba5eddcef5d45c9cc50e2d Mon Sep 17 00:00:00 2001 From: Joao Azevedo Date: Tue, 29 Nov 2022 00:43:43 +0000 Subject: [PATCH 3/7] Use a data type to contain all transform methods --- .../alpakka/xml/impl/StreamingXmlParser.scala | 57 +++++++++++++------ .../alpakka/xml/scaladsl/XmlParsing.scala | 14 ++--- 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala b/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala index 88d28aa7c2..ddc7b43c6c 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala @@ -17,6 +17,28 @@ import scala.annotation.tailrec private[xml] object StreamingXmlParser { lazy val withStreamingFinishedException = new IllegalStateException("Stream finished before event was fully parsed.") + + sealed trait Transform[A, B, Ctx] { + def getByteString(a: A): ByteString + def getContext(a: A): Ctx + def buildOutput(pe: ParseEvent, ctx: Ctx): B + } + + object Transform { + final val uncontextual: Transform[ByteString, ParseEvent, Unit] = + new Transform[ByteString, ParseEvent, Unit] { + def getByteString(a: ByteString): ByteString = a + def getContext(a: ByteString): Unit = () + def buildOutput(pe: ParseEvent, ctx: Unit): ParseEvent = pe + } + + final def contextual[Ctx]: Transform[(ByteString, Ctx), (ParseEvent, Ctx), Ctx] = + new Transform[(ByteString, Ctx), (ParseEvent, Ctx), Ctx] { + def getByteString(a: (ByteString, Ctx)): ByteString = a._1 + def getContext(a: (ByteString, Ctx)): Ctx = a._2 + def buildOutput(pe: ParseEvent, ctx: Ctx): (ParseEvent, Ctx) = (pe, ctx) + } + } } /** @@ -24,9 +46,7 @@ private[xml] object StreamingXmlParser { */ @InternalApi private[xml] class StreamingXmlParser[A, B, Ctx](ignoreInvalidChars: Boolean, configureFactory: AsyncXMLInputFactory => Unit, - getByteString: A => ByteString, - getContext: A => Ctx, - buildOutput: (ParseEvent, Ctx) => B) + transform: StreamingXmlParser.Transform[A, B, Ctx]) extends GraphStage[FlowShape[A, B]] { val in: Inlet[A] = Inlet("XMLParser.in") val out: Outlet[B] = Outlet("XMLParser.out") @@ -50,8 +70,8 @@ private[xml] object StreamingXmlParser { override def onPush(): Unit = { val a = grab(in) - val bs = getByteString(a) - context = getContext(a) + val bs = transform.getByteString(a) + context = transform.getContext(a) val array = bs.toArray parser.getInputFeeder.feedInput(array, 0, array.length) advanceParser() @@ -74,10 +94,10 @@ private[xml] object StreamingXmlParser { case XMLStreamConstants.START_DOCUMENT => started = true - push(out, buildOutput(StartDocument, context)) + push(out, transform.buildOutput(StartDocument, context)) case XMLStreamConstants.END_DOCUMENT => - push(out, buildOutput(EndDocument, context)) + push(out, transform.buildOutput(EndDocument, context)) completeStage() case XMLStreamConstants.START_ELEMENT => @@ -98,29 +118,30 @@ private[xml] object StreamingXmlParser { val optNs = optPrefix.flatMap(prefix => Option(parser.getNamespaceURI(prefix))) push( out, - buildOutput(StartElement(parser.getLocalName, - attributes, - optPrefix.filterNot(_ == ""), - optNs.filterNot(_ == ""), - namespaceCtx = namespaces), - context) + transform.buildOutput(StartElement(parser.getLocalName, + attributes, + optPrefix.filterNot(_ == ""), + optNs.filterNot(_ == ""), + namespaceCtx = namespaces), + context) ) case XMLStreamConstants.END_ELEMENT => - push(out, buildOutput(EndElement(parser.getLocalName), context)) + push(out, transform.buildOutput(EndElement(parser.getLocalName), context)) case XMLStreamConstants.CHARACTERS => - push(out, buildOutput(Characters(parser.getText), context)) + push(out, transform.buildOutput(Characters(parser.getText), context)) case XMLStreamConstants.PROCESSING_INSTRUCTION => push(out, - buildOutput(ProcessingInstruction(Option(parser.getPITarget), Option(parser.getPIData)), context)) + transform.buildOutput(ProcessingInstruction(Option(parser.getPITarget), Option(parser.getPIData)), + context)) case XMLStreamConstants.COMMENT => - push(out, buildOutput(Comment(parser.getText), context)) + push(out, transform.buildOutput(Comment(parser.getText), context)) case XMLStreamConstants.CDATA => - push(out, buildOutput(CData(parser.getText), context)) + push(out, transform.buildOutput(CData(parser.getText), context)) // Do not support DTD, SPACE, NAMESPACE, NOTATION_DECLARATION, ENTITY_DECLARATION, PROCESSING_INSTRUCTION // ATTRIBUTE is handled in START_ELEMENT implicitly diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala b/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala index e15c9daf2c..14309f80d7 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala @@ -44,9 +44,7 @@ object XmlParsing { Flow.fromGraph( new impl.StreamingXmlParser[ByteString, ParseEvent, Unit](ignoreInvalidChars, configureFactory, - getByteString = identity, - getContext = _ => (), - buildOutput = (parseEvent, _) => parseEvent) + impl.StreamingXmlParser.Transform.uncontextual) ) ) @@ -60,11 +58,11 @@ object XmlParsing { ): FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = FlowWithContext.fromTuples( Flow.fromGraph( - new impl.StreamingXmlParser[(ByteString, Ctx), (ParseEvent, Ctx), Ctx](ignoreInvalidChars, - configureFactory, - getByteString = _._1, - getContext = _._2, - buildOutput = (pe, ctx) => (pe, ctx)) + new impl.StreamingXmlParser[(ByteString, Ctx), (ParseEvent, Ctx), Ctx]( + ignoreInvalidChars, + configureFactory, + impl.StreamingXmlParser.Transform.contextual + ) ) ) From 781b374534e9838f13c8db60da6fe89ccde6327b Mon Sep 17 00:00:00 2001 From: Joao Azevedo Date: Thu, 1 Dec 2022 21:59:41 +0000 Subject: [PATCH 4/7] Rename test example --- xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala b/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala index 7a6a834baa..20ff795021 100644 --- a/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala +++ b/xml/src/test/scala/docs/scaladsl/XmlProcessingSpec.scala @@ -356,7 +356,7 @@ class XmlProcessingSpec extends AnyWordSpec with Matchers with ScalaFutures with configWasCalled shouldBe true } - "properly parse XML contextually" in { + "parse XML and attach line numbers as context" in { val doc = """| | | elem1 From 3729e3151593afd3a107688a06d7dcec859ef8ff Mon Sep 17 00:00:00 2001 From: Joao Azevedo Date: Thu, 1 Dec 2022 22:03:38 +0000 Subject: [PATCH 5/7] Reword Scaladoc around contextual flows --- .../stream/alpakka/xml/javadsl/XmlParsing.scala | 16 ++++++++-------- .../stream/alpakka/xml/scaladsl/XmlParsing.scala | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala b/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala index ae0d3c0a67..b131a86dbe 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala @@ -24,8 +24,8 @@ object XmlParsing { xml.scaladsl.XmlParsing.parser.asJava /** - * Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to - * SAX. + * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX while keeping + * a context attached. */ def parserWithContext[Ctx](): akka.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = xml.scaladsl.XmlParsing.parserWithContext().asJava @@ -37,8 +37,8 @@ object XmlParsing { xml.scaladsl.XmlParsing.parser(ignoreInvalidChars).asJava /** - * Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to - * SAX. + * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX while keeping + * a context attached. */ def parserWithContext[Ctx]( ignoreInvalidChars: Boolean @@ -54,8 +54,8 @@ object XmlParsing { xml.scaladsl.XmlParsing.parser(false, configureFactory.accept(_)).asJava /** - * Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to - * SAX. + * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX while keeping + * a context attached. */ def parserWithContext[Ctx]( configureFactory: Consumer[AsyncXMLInputFactory] @@ -72,8 +72,8 @@ object XmlParsing { xml.scaladsl.XmlParsing.parser(ignoreInvalidChars, configureFactory.accept(_)).asJava /** - * Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to - * SAX. + * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX while keeping + * a context attached. */ def parserWithContext[Ctx]( ignoreInvalidChars: Boolean, diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala b/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala index 14309f80d7..d9a1b385f2 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala @@ -49,8 +49,8 @@ object XmlParsing { ) /** - * Contextual version of a parser Flow that takes a stream of ByteStrings and parses them to XML events similar to - * SAX. + * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX while keeping + * a context attached. */ def parserWithContext[Ctx]( ignoreInvalidChars: Boolean = false, From cf232d12cfd3ef0aab2d754dbfe8459c148e3ddd Mon Sep 17 00:00:00 2001 From: Joao Azevedo Date: Thu, 1 Dec 2022 22:04:38 +0000 Subject: [PATCH 6/7] Reduce the number of parameter list variants for `parserWithContext` --- .../akka/stream/alpakka/xml/javadsl/XmlParsing.scala | 9 --------- 1 file changed, 9 deletions(-) diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala b/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala index b131a86dbe..ab9b0bd517 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/javadsl/XmlParsing.scala @@ -53,15 +53,6 @@ object XmlParsing { ): akka.stream.javadsl.Flow[ByteString, ParseEvent, NotUsed] = xml.scaladsl.XmlParsing.parser(false, configureFactory.accept(_)).asJava - /** - * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX while keeping - * a context attached. - */ - def parserWithContext[Ctx]( - configureFactory: Consumer[AsyncXMLInputFactory] - ): akka.stream.javadsl.FlowWithContext[ByteString, Ctx, ParseEvent, Ctx, NotUsed] = - xml.scaladsl.XmlParsing.parserWithContext(false, configureFactory.accept(_)).asJava - /** * Parser Flow that takes a stream of ByteStrings and parses them to XML events similar to SAX. */ From 7fd0c69773b8cc125eeff5a377fa6085a05fdb93 Mon Sep 17 00:00:00 2001 From: Joao Azevedo Date: Thu, 1 Dec 2022 22:07:45 +0000 Subject: [PATCH 7/7] Rename `Transform` to `ContextHandler` --- .../alpakka/xml/impl/StreamingXmlParser.scala | 14 +++++++------- .../stream/alpakka/xml/scaladsl/XmlParsing.scala | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala b/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala index ddc7b43c6c..cac45b0a06 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/impl/StreamingXmlParser.scala @@ -18,22 +18,22 @@ import scala.annotation.tailrec private[xml] object StreamingXmlParser { lazy val withStreamingFinishedException = new IllegalStateException("Stream finished before event was fully parsed.") - sealed trait Transform[A, B, Ctx] { + sealed trait ContextHandler[A, B, Ctx] { def getByteString(a: A): ByteString def getContext(a: A): Ctx def buildOutput(pe: ParseEvent, ctx: Ctx): B } - object Transform { - final val uncontextual: Transform[ByteString, ParseEvent, Unit] = - new Transform[ByteString, ParseEvent, Unit] { + object ContextHandler { + final val uncontextual: ContextHandler[ByteString, ParseEvent, Unit] = + new ContextHandler[ByteString, ParseEvent, Unit] { def getByteString(a: ByteString): ByteString = a def getContext(a: ByteString): Unit = () def buildOutput(pe: ParseEvent, ctx: Unit): ParseEvent = pe } - final def contextual[Ctx]: Transform[(ByteString, Ctx), (ParseEvent, Ctx), Ctx] = - new Transform[(ByteString, Ctx), (ParseEvent, Ctx), Ctx] { + final def contextual[Ctx]: ContextHandler[(ByteString, Ctx), (ParseEvent, Ctx), Ctx] = + new ContextHandler[(ByteString, Ctx), (ParseEvent, Ctx), Ctx] { def getByteString(a: (ByteString, Ctx)): ByteString = a._1 def getContext(a: (ByteString, Ctx)): Ctx = a._2 def buildOutput(pe: ParseEvent, ctx: Ctx): (ParseEvent, Ctx) = (pe, ctx) @@ -46,7 +46,7 @@ private[xml] object StreamingXmlParser { */ @InternalApi private[xml] class StreamingXmlParser[A, B, Ctx](ignoreInvalidChars: Boolean, configureFactory: AsyncXMLInputFactory => Unit, - transform: StreamingXmlParser.Transform[A, B, Ctx]) + transform: StreamingXmlParser.ContextHandler[A, B, Ctx]) extends GraphStage[FlowShape[A, B]] { val in: Inlet[A] = Inlet("XMLParser.in") val out: Outlet[B] = Outlet("XMLParser.out") diff --git a/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala b/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala index d9a1b385f2..9c1f6fb1a8 100644 --- a/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala +++ b/xml/src/main/scala/akka/stream/alpakka/xml/scaladsl/XmlParsing.scala @@ -44,7 +44,7 @@ object XmlParsing { Flow.fromGraph( new impl.StreamingXmlParser[ByteString, ParseEvent, Unit](ignoreInvalidChars, configureFactory, - impl.StreamingXmlParser.Transform.uncontextual) + impl.StreamingXmlParser.ContextHandler.uncontextual) ) ) @@ -61,7 +61,7 @@ object XmlParsing { new impl.StreamingXmlParser[(ByteString, Ctx), (ParseEvent, Ctx), Ctx]( ignoreInvalidChars, configureFactory, - impl.StreamingXmlParser.Transform.contextual + impl.StreamingXmlParser.ContextHandler.contextual ) ) )