Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f8961e8
init
shujingyang-db Jan 3, 2024
fc0edc9
fix for rdc
shujingyang-db Jan 3, 2024
d98a51b
revert AstBuilder
shujingyang-db Jan 3, 2024
e6654d5
nit
shujingyang-db Jan 3, 2024
d368ba0
ckpt
shujingyang-db Jan 4, 2024
4eb292d
ckpt
shujingyang-db Jan 4, 2024
4506de2
rm startElementName
shujingyang-db Jan 4, 2024
596cc90
revert and style
shujingyang-db Jan 4, 2024
0c0f47c
revert
shujingyang-db Jan 4, 2024
a2cd1ef
addOrUpdate
shujingyang-db Jan 4, 2024
98effee
revert
shujingyang-db Jan 4, 2024
f9e445d
nullType for newline character
shujingyang-db Jan 4, 2024
49f8b4e
currentStructureAsString
shujingyang-db Jan 4, 2024
3cdfe1f
rm valueTag field in a test case
shujingyang-db Jan 4, 2024
e1585cf
simplify whitespace
shujingyang-db Jan 5, 2024
66f2449
whitespace
shujingyang-db Jan 5, 2024
d6cd704
add comments in testcase
shujingyang-db Jan 5, 2024
d4e82fc
badRecord
shujingyang-db Jan 5, 2024
3ea45ae
badrecords
shujingyang-db Jan 5, 2024
1e61fff
badRecord:
shujingyang-db Jan 5, 2024
fe4cf28
comments
shujingyang-db Jan 5, 2024
9c8adb6
cdata and unsupported events
shujingyang-db Jan 5, 2024
e87ab03
comments
shujingyang-db Jan 5, 2024
8f18b64
assert on endElement name
shujingyang-db Jan 5, 2024
685fd8b
endDocument
shujingyang-db Jan 5, 2024
4f56bf5
valuetag case insensitive
shujingyang-db Jan 5, 2024
271e99d
Merge remote-tracking branch 'origin/cpature-values-follow-up' into c…
shujingyang-db Jan 5, 2024
2d10bd0
nit
shujingyang-db Jan 5, 2024
fb5d6a0
Update sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/…
shujingyang-db Jan 5, 2024
c70236c
fix partial results test case
shujingyang-db Jan 5, 2024
0e26f71
Merge branch 'cpature-values-follow-up' of github.com:shujingyang-db/…
shujingyang-db Jan 5, 2024
d0dc7a0
fix StaxXmlParserUtilsSuite
shujingyang-db Jan 6, 2024
ec8d4f0
Merge remote-tracking branch 'spark/master' into cpature-values-follo…
shujingyang-db Jan 7, 2024
ec9ab95
rm unused functions
shujingyang-db Jan 7, 2024
99f5204
rm unused functions
shujingyang-db Jan 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import javax.xml.stream.events._
import javax.xml.transform.stream.StreamSource
import javax.xml.validation.Schema

import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import scala.util.Try
Expand Down Expand Up @@ -151,12 +150,7 @@ class StaxXmlParser(
}
val parser = StaxXmlParserUtils.filteredReader(xml)
val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser)
// A structure object is an attribute-only element
// if it only consists of attributes and valueTags.
val isRootAttributesOnly = schema.fields.forall { f =>
f.name == options.valueTag || f.name.startsWith(options.attributePrefix)
}
val result = Some(convertObject(parser, schema, rootAttributes, isRootAttributesOnly))
val result = Some(convertObject(parser, schema, rootAttributes))
parser.close()
result
} catch {
Expand Down Expand Up @@ -195,69 +189,60 @@ class StaxXmlParser(
private[xml] def convertField(
parser: XMLEventReader,
dataType: DataType,
startElementName: String,
attributes: Array[Attribute] = Array.empty): Any = {

def convertComplicatedType(dt: DataType, attributes: Array[Attribute]): Any = dt match {
def convertComplicatedType(
dt: DataType,
startElementName: String,
attributes: Array[Attribute]): Any = dt match {
case st: StructType => convertObject(parser, st)
case MapType(StringType, vt, _) => convertMap(parser, vt, attributes)
case ArrayType(st, _) => convertField(parser, st)
case ArrayType(st, _) => convertField(parser, st, startElementName)
case _: StringType =>
convertTo(StaxXmlParserUtils.currentStructureAsString(parser), StringType)
convertTo(
StaxXmlParserUtils.currentStructureAsString(
parser, startElementName, options),
StringType)
}

(parser.peek, dataType) match {
case (_: StartElement, dt: DataType) => convertComplicatedType(dt, attributes)
case (_: StartElement, dt: DataType) =>
convertComplicatedType(dt, startElementName, attributes)
case (_: EndElement, _: StringType) =>
StaxXmlParserUtils.skipNextEndElement(parser, startElementName, options)
// Empty. It's null if "" is the null value
if (options.nullValue == "") {
null
} else {
UTF8String.fromString("")
}
case (_: EndElement, _: DataType) => null
case (_: EndElement, _: DataType) =>
StaxXmlParserUtils.skipNextEndElement(parser, startElementName, options)
null
case (c: Characters, ArrayType(st, _)) =>
// For `ArrayType`, it needs to return the type of element. The values are merged later.
parser.next
convertTo(c.getData, st)
case (c: Characters, st: StructType) =>
parser.next
parser.peek match {
case _: EndElement =>
// It couldn't be an array of value tags
// as the opening tag is immediately followed by a closing tag.
if (c.isWhiteSpace) {
return null
}
val indexOpt = getFieldNameToIndex(st).get(options.valueTag)
indexOpt match {
case Some(index) =>
convertTo(c.getData, st.fields(index).dataType)
case None => null
}
case _ =>
val row = convertObject(parser, st)
if (!c.isWhiteSpace) {
addOrUpdate(row.toSeq(st).toArray, st, options.valueTag, c.getData, addToTail = false)
} else {
row
}
}
val value = convertTo(c.getData, st)
StaxXmlParserUtils.skipNextEndElement(parser, startElementName, options)
value
case (_: Characters, st: StructType) =>
convertObject(parser, st)
case (_: Characters, _: StringType) =>
convertTo(StaxXmlParserUtils.currentStructureAsString(parser), StringType)
convertTo(
StaxXmlParserUtils.currentStructureAsString(
parser, startElementName, options),
StringType)
case (c: Characters, _: DataType) if c.isWhiteSpace =>
// When `Characters` is found, we need to look further to decide
// if this is really data or space between other elements.
val data = c.getData
parser.next
parser.peek match {
case _: StartElement => convertComplicatedType(dataType, attributes)
case _: EndElement if data.isEmpty => null
case _: EndElement => convertTo(data, dataType)
case _ => convertField(parser, dataType, attributes)
}
convertField(parser, dataType, startElementName, attributes)
case (c: Characters, dt: DataType) =>
val value = convertTo(c.getData, dt)
parser.next
convertTo(c.getData, dt)
StaxXmlParserUtils.skipNextEndElement(parser, startElementName, options)
value
case (e: XMLEvent, dt: DataType) =>
throw new IllegalArgumentException(
s"Failed to parse a value for data type $dt with event ${e.toString}")
Expand All @@ -280,16 +265,16 @@ class StaxXmlParser(
while (!shouldStop) {
parser.nextEvent match {
case e: StartElement =>
val key = StaxXmlParserUtils.getName(e.asStartElement.getName, options)
kvPairs +=
(UTF8String.fromString(StaxXmlParserUtils.getName(e.asStartElement.getName, options)) ->
convertField(parser, valueType))
(UTF8String.fromString(key) -> convertField(parser, valueType, key))
case c: Characters if !c.isWhiteSpace =>
// Create a value tag field for it
kvPairs +=
// TODO: We don't support an array of value tags in a map yet.
(UTF8String.fromString(options.valueTag) -> convertTo(c.getData, valueType))
case _: EndElement =>
shouldStop = StaxXmlParserUtils.checkEndElement(parser)
case _: EndElement | _: EndDocument =>
shouldStop = true
case _ => // do nothing
}
}
Expand Down Expand Up @@ -321,6 +306,7 @@ class StaxXmlParser(
private def convertObjectWithAttributes(
parser: XMLEventReader,
schema: StructType,
startElementName: String,
attributes: Array[Attribute] = Array.empty): InternalRow = {
// TODO: This method might have to be removed. Some logics duplicate `convertObject()`
val row = new Array[Any](schema.length)
Expand All @@ -329,7 +315,7 @@ class StaxXmlParser(
val attributesMap = convertAttributes(attributes, schema)

// Then, we read elements here.
val fieldsMap = convertField(parser, schema) match {
val fieldsMap = convertField(parser, schema, startElementName) match {
case internalRow: InternalRow =>
Map(schema.map(_.name).zip(internalRow.toSeq(schema)): _*)
case v if schema.fieldNames.contains(options.valueTag) =>
Expand Down Expand Up @@ -363,8 +349,7 @@ class StaxXmlParser(
private def convertObject(
parser: XMLEventReader,
schema: StructType,
rootAttributes: Array[Attribute] = Array.empty,
isRootAttributesOnly: Boolean = false): InternalRow = {
rootAttributes: Array[Attribute] = Array.empty): InternalRow = {
val row = new Array[Any](schema.length)
val nameToIndex = getFieldNameToIndex(schema)
// If there are attributes, then we process them first.
Expand All @@ -388,29 +373,29 @@ class StaxXmlParser(
nameToIndex.get(field) match {
case Some(index) => schema(index).dataType match {
case st: StructType =>
row(index) = convertObjectWithAttributes(parser, st, attributes)
row(index) = convertObjectWithAttributes(parser, st, field, attributes)

case ArrayType(dt: DataType, _) =>
val values = Option(row(index))
.map(_.asInstanceOf[ArrayBuffer[Any]])
.getOrElse(ArrayBuffer.empty[Any])
val newValue = dt match {
case st: StructType =>
convertObjectWithAttributes(parser, st, attributes)
convertObjectWithAttributes(parser, st, field, attributes)
case dt: DataType =>
convertField(parser, dt)
convertField(parser, dt, field)
}
row(index) = values :+ newValue

case dt: DataType =>
row(index) = convertField(parser, dt, attributes)
row(index) = convertField(parser, dt, field, attributes)
}

case None =>
if (hasWildcard) {
// Special case: there's an 'any' wildcard element that matches anything else
// as a string (or array of strings, to parse multiple ones)
val newValue = convertField(parser, StringType)
val newValue = convertField(parser, StringType, field)
val anyIndex = schema.fieldIndex(wildcardColName)
schema(wildcardColName).dataType match {
case StringType =>
Expand All @@ -423,19 +408,21 @@ class StaxXmlParser(
}
} else {
StaxXmlParserUtils.skipChildren(parser)
StaxXmlParserUtils.skipNextEndElement(parser, field, options)
}
}
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(e) =>
// TODO: we don't support partial results now
badRecordException = badRecordException.orElse(Some(e))
}

case c: Characters if !c.isWhiteSpace =>
addOrUpdate(row, schema, options.valueTag, c.getData)

case _: EndElement =>
shouldStop = parseAndCheckEndElement(row, schema, parser)
case _: EndElement | _: EndDocument =>
shouldStop = true

case _ => // do nothing
}
Expand Down Expand Up @@ -599,24 +586,6 @@ class StaxXmlParser(
}
}

/**
 * Looks ahead in the event stream to decide whether the element currently being read
 * has ended.
 *
 * The next event is inspected with `peek`, so an `EndElement`, `EndDocument` or
 * `StartElement` is left in the stream for the caller. Every other event is consumed;
 * non-whitespace character data is passed to `addOrUpdate` under the `options.valueTag`
 * field name before the scan continues.
 *
 * @param row    the row values being populated for the current element
 * @param schema the schema describing `row`
 * @param parser the XML event reader, positioned after an end element
 * @return `true` if the next structural event is an `EndElement` or `EndDocument`,
 *         `false` if it is a `StartElement`
 */
@tailrec
private def parseAndCheckEndElement(
    row: Array[Any],
    schema: StructType,
    parser: XMLEventReader): Boolean = {
  val upcoming = parser.peek
  upcoming match {
    // Nothing more to read for this element: tell the caller to stop.
    case _: EndElement | _: EndDocument =>
      true

    // A sibling element follows: the caller has to keep reading.
    case _: StartElement =>
      false

    // Text that trails a child element: record it, then keep scanning.
    case text: Characters if !text.isWhiteSpace =>
      parser.nextEvent()
      val valueTagName = options.valueTag
      addOrUpdate(row, schema, valueTagName, text.getData)
      parseAndCheckEndElement(row, schema, parser)

    // Anything else (e.g. whitespace-only text) is dropped.
    case _ =>
      parser.nextEvent()
      parseAndCheckEndElement(row, schema, parser)
  }
}

private def addOrUpdate(
row: Array[Any],
schema: StructType,
Expand All @@ -628,17 +597,14 @@ class StaxXmlParser(
schema(index).dataType match {
case ArrayType(elementType, _) =>
val value = convertTo(data, elementType)
val result = if (row(index) == null) {
ArrayBuffer(value)
} else {
val genericArrayData = row(index).asInstanceOf[GenericArrayData]
if (addToTail) {
genericArrayData.toArray(elementType) :+ value
val values = Option(row(index))
.map(_.asInstanceOf[ArrayBuffer[Any]])
.getOrElse(ArrayBuffer.empty[Any])
row(index) = if (addToTail) {
values :+ value
} else {
value +: genericArrayData.toArray(elementType)
value +: values
}
}
row(index) = new GenericArrayData(result)
case dataType =>
row(index) = convertTo(data, dataType)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ object StaxXmlParserUtils {
def filteredReader(xml: String): XMLEventReader = {
val filter = new EventFilter {
override def accept(event: XMLEvent): Boolean =
// Ignore comments and processing instructions
event.getEventType match {
// Ignore comments and processing instructions
case XMLStreamConstants.COMMENT | XMLStreamConstants.PROCESSING_INSTRUCTION => false
// unsupported events
case XMLStreamConstants.DTD |
XMLStreamConstants.ENTITY_DECLARATION |
XMLStreamConstants.ENTITY_REFERENCE |
XMLStreamConstants.NOTATION_DECLARATION => false
case _ => true
}
}
Expand Down Expand Up @@ -121,7 +126,10 @@ object StaxXmlParserUtils {
/**
* Convert the current structure of XML document to a XML string.
*/
def currentStructureAsString(parser: XMLEventReader): String = {
def currentStructureAsString(
parser: XMLEventReader,
startElementName: String,
options: XmlOptions): String = {
val xmlString = new StringBuilder()
var indent = 0
do {
Expand Down Expand Up @@ -151,6 +159,7 @@ object StaxXmlParserUtils {
indent > 0
case _ => true
})
skipNextEndElement(parser, startElementName, options)
xmlString.toString()
}

Expand Down Expand Up @@ -178,4 +187,21 @@ object StaxXmlParserUtils {
}
}
}

/**
 * Consumes events from `parser` up to and including the next `EndElement`.
 *
 * Whitespace-only character events in front of the end element are skipped. Once the
 * end element is reached, its name (as resolved by `getName` with `options`) is asserted
 * to equal `expectedNextEndElementName`.
 *
 * @param parser                     the XML event reader to advance
 * @param expectedNextEndElementName the name the closing tag is expected to carry
 * @param options                    the XML options forwarded to `getName`
 * @throws IllegalStateException if an event other than whitespace-only characters or an
 *                               `EndElement` is encountered
 */
@tailrec
def skipNextEndElement(
    parser: XMLEventReader,
    expectedNextEndElementName: String,
    options: XmlOptions): Unit = {
  // Both failure paths report the same message; it is only built when actually needed.
  def expectation: String = s"Expected EndElement </$expectedNextEndElementName>"

  val event = parser.nextEvent()
  event match {
    case whitespace: Characters if whitespace.isWhiteSpace =>
      // Formatting whitespace before the closing tag carries no data: keep looking.
      skipNextEndElement(parser, expectedNextEndElementName, options)

    case closing: EndElement =>
      val actualName = getName(closing.getName, options)
      assert(actualName == expectedNextEndElementName, expectation)

    case _ =>
      throw new IllegalStateException(expectation)
  }
}
}
Loading