30 changes: 3 additions & 27 deletions .travis.yml
@@ -5,37 +5,13 @@ cache:
- $HOME/.ivy2
matrix:
include:
# Spark 1.3.0
- jdk: openjdk6
scala: 2.10.5
env: TEST_HADOOP_VERSION="1.2.1" TEST_SPARK_VERSION="1.3.0"
- jdk: openjdk6
scala: 2.11.7
env: TEST_HADOOP_VERSION="1.0.4" TEST_SPARK_VERSION="1.3.0"
# Spark 1.4.1
# We only test Spark 1.4.1 with Hadoop 2.2.0 because
# https://github.com/apache/spark/pull/6599 is not present in 1.4.1,
# so the published Spark Maven artifacts will not work with Hadoop 1.x.
- jdk: openjdk6
scala: 2.10.5
env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="1.4.1"
- jdk: openjdk7
scala: 2.11.7
env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="1.4.1"
# Spark 1.5.0
- jdk: openjdk7
scala: 2.10.5
env: TEST_HADOOP_VERSION="1.0.4" TEST_SPARK_VERSION="1.5.0"
- jdk: openjdk7
scala: 2.11.7
env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="1.5.0"
# Spark 1.6.0
# Spark 2.0.0
- jdk: openjdk7
scala: 2.10.5
env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="1.6.0"
env: TEST_HADOOP_VERSION="2.2.0" TEST_SPARK_VERSION="2.0.0"
- jdk: openjdk7
scala: 2.11.7
env: TEST_HADOOP_VERSION="1.2.1" TEST_SPARK_VERSION="1.6.0"
env: TEST_HADOOP_VERSION="2.6.0" TEST_SPARK_VERSION="2.0.0"
script:
- sbt -Dhadoop.testVersion=$TEST_HADOOP_VERSION -Dspark.testVersion=$TEST_SPARK_VERSION ++$TRAVIS_SCALA_VERSION coverage test
- sbt ++$TRAVIS_SCALA_VERSION assembly
1 change: 0 additions & 1 deletion README.md
@@ -496,4 +496,3 @@ This library is built with [SBT](http://www.scala-sbt.org/0.13/docs/Command-Line
## Acknowledgements

This project was initially created by [HyukjinKwon](https://github.com/HyukjinKwon) and donated to [Databricks](https://databricks.com).

27 changes: 2 additions & 25 deletions build.sbt
@@ -1,6 +1,6 @@
name := "spark-xml"

version := "0.3.4"
version := "0.4.0-SNAPSHOT"

organization := "com.databricks"

@@ -10,7 +10,7 @@ spName := "databricks/spark-xml"

crossScalaVersions := Seq("2.10.5", "2.11.7")

sparkVersion := "1.6.0"
sparkVersion := "2.0.0"

val testSparkVersion = settingKey[String]("The version of Spark to test against.")

@@ -75,26 +75,3 @@ ScoverageSbtPlugin.ScoverageKeys.coverageHighlighting := {
if (scalaBinaryVersion.value == "2.10") false
else true
}

// -- MiMa binary compatibility checks ------------------------------------------------------------

//import com.typesafe.tools.mima.core._
//import com.typesafe.tools.mima.plugin.MimaKeys.binaryIssueFilters
//import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact
//import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
//
//mimaDefaultSettings ++ Seq(
// previousArtifact := Some("org.apache" %% "spark-xml" % "1.2.0"),
// binaryIssueFilters ++= Seq(
// // These classes are not intended to be public interfaces:
// ProblemFilters.excludePackage("org.apache.spark.xml.XmlRelation"),
// ProblemFilters.excludePackage("org.apache.spark.xml.util.InferSchema"),
// ProblemFilters.excludePackage("org.apache.spark.sql.readers"),
// ProblemFilters.excludePackage("org.apache.spark.xml.util.TypeCast"),
// // We allowed the private `XmlRelation` type to leak into the public method signature:
// ProblemFilters.exclude[IncompatibleResultTypeProblem](
// "org.apache.spark.xml.DefaultSource.createRelation")
// )
//)

// ------------------------------------------------------------------------------------------------
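
The Travis script above passes -Dspark.testVersion and -Dhadoop.testVersion into sbt, while build.sbt only shows the testSparkVersion settingKey; the wiring between the two is collapsed out of this diff. A minimal sketch, assuming the usual pattern, of how that key is fed and consumed:

```scala
// build.sbt fragment (sketch only; the real wiring sits in the collapsed part of the file).
// The test Spark version defaults to sparkVersion but can be overridden from the command
// line, which is what -Dspark.testVersion in the Travis script relies on.
testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.value)

// Dependencies then resolve against the test version; the module list here is illustrative.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % testSparkVersion.value % "provided"
)
```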
5 changes: 1 addition & 4 deletions src/main/scala/com/databricks/spark/xml/DefaultSource.scala
@@ -20,7 +20,6 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import com.databricks.spark.xml.util.CompressionCodecs
import com.databricks.spark.xml.util.XmlFile

/**
@@ -90,9 +89,7 @@ class DefaultSource
}
if (doSave) {
// Only save data when the save mode is not ignore.
val codecClass =
CompressionCodecs.getCodecClass(XmlOptions(parameters).codec)
data.saveAsXmlFile(filesystemPath.toString, parameters, codecClass)
XmlFile.saveAsXmlFile(data, filesystemPath.toString, parameters)
Member Author: Here, I moved the codec handling inside saveAsXmlFile so that the codec option in parameters is also taken into account.

}
createRelation(sqlContext, parameters, data.schema)
}
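
XmlFile.scala itself is outside this excerpt, so the following is only a sketch, under stated assumptions, of what the relocated codec handling could look like inside saveAsXmlFile. It reuses CompressionCodecs.getCodecClass and the XmlOptions "codec" option that appear elsewhere in this PR; the toXmlRows helper is a stand-in for the unchanged StaxXmlGenerator-based serialization.

```scala
// Sketch of util/XmlFile.saveAsXmlFile after the refactoring (assumed shape, not the
// actual file): the "codec" option is resolved here once instead of in each caller.
package com.databricks.spark.xml.util

import scala.collection.Map

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import com.databricks.spark.xml.XmlOptions

object XmlFileSketch {
  def saveAsXmlFile(df: DataFrame, path: String, parameters: Map[String, String]): Unit = {
    val options = XmlOptions(parameters.toMap)
    val xmlRDD: RDD[String] = toXmlRows(df, options)
    // Same helper call the old DefaultSource made; a missing codec is assumed to yield
    // null, which falls back to an uncompressed saveAsTextFile.
    Option(CompressionCodecs.getCodecClass(options.codec)) match {
      case Some(codecClass) => xmlRDD.saveAsTextFile(path, codecClass)
      case None             => xmlRDD.saveAsTextFile(path)
    }
  }

  // Stand-in for the row-to-XML generation that this PR moves out of package.scala
  // unchanged; its internals are not the point of this sketch.
  private def toXmlRows(df: DataFrame, options: XmlOptions): RDD[String] =
    df.rdd.map(_.mkString(","))
}
```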
39 changes: 13 additions & 26 deletions src/main/scala/com/databricks/spark/xml/XmlInputFormat.scala
@@ -70,12 +70,7 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] {

override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {
val fileSplit: FileSplit = split.asInstanceOf[FileSplit]
val conf: Configuration = {
// Use reflection to get the Configuration. This is necessary because TaskAttemptContext is
// a class in Hadoop 1.x and an interface in Hadoop 2.x.
val method = context.getClass.getMethod("getConfiguration")
method.invoke(context).asInstanceOf[Configuration]
}
val conf: Configuration = context.getConfiguration
val charset =
Charset.forName(conf.get(XmlInputFormat.ENCODING_KEY, XmlOptions.DEFAULT_CHARSET))
startTag = conf.get(XmlInputFormat.START_TAG_KEY).getBytes(charset)
@@ -97,25 +92,18 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] {
val codec = new CompressionCodecFactory(conf).getCodec(path)
if (null != codec) {
decompressor = CodecPool.getDecompressor(codec)
// Use reflection to get the splittable compression codec and stream. This is necessary
// because SplittableCompressionCodec does not exist in Hadoop 1.0.x.
def isSplitCompressionCodec(obj: Any) = {
val splittableClassName = "org.apache.hadoop.io.compress.SplittableCompressionCodec"
obj.getClass.getInterfaces.map(_.getName).contains(splittableClassName)
}
// Here I made separate variables to avoid to try to find SplitCompressionInputStream at
// runtime.
val (inputStream, seekable) = codec match {
case c: CompressionCodec if isSplitCompressionCodec(c) =>
// At Hadoop 1.0.x, this case would not be executed.
val cIn = {
val sc = c.asInstanceOf[SplittableCompressionCodec]
sc.createInputStream(fsin, decompressor, start,
end, SplittableCompressionCodec.READ_MODE.BYBLOCK)
}
codec match {
case sc: SplittableCompressionCodec =>
val cIn = sc.createInputStream(
fsin,
decompressor,
start,
Member Author: Not a big change. I just removed the reflection that was needed for Hadoop 1.x.
end,
SplittableCompressionCodec.READ_MODE.BYBLOCK)
start = cIn.getAdjustedStart
end = cIn.getAdjustedEnd
(cIn, cIn)
in = cIn
filePosition = cIn
case c: CompressionCodec =>
if (start != 0) {
// So we have a split that is only part of a file stored using
@@ -124,10 +112,9 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] {
codec.getClass.getSimpleName + " compressed stream")
}
val cIn = c.createInputStream(fsin, decompressor)
(cIn, fsin)
in = cIn
filePosition = fsin
}
in = inputStream
filePosition = seekable
} else {
in = fsin
filePosition = fsin
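
The record reader above takes its start tag and charset from the Hadoop Configuration via XmlInputFormat.START_TAG_KEY and XmlInputFormat.ENCODING_KEY. Below is a hedged sketch of how a read path can drive the input format from Spark; the END_TAG_KEY constant, the exact tag strings, and the newAPIHadoopFile plumbing are assumptions, since only the record reader is visible here and XmlInputFormat may be restricted to the library's own package.

```scala
// Sketch only: driving XmlInputFormat from a SparkContext. In the real library this
// plumbing lives inside com.databricks.spark.xml and may not be publicly reachable.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import com.databricks.spark.xml.{XmlInputFormat, XmlOptions}

object XmlInputFormatUsageSketch {
  def xmlRecords(sc: SparkContext, path: String, rowTag: String): RDD[String] = {
    val conf = new Configuration(sc.hadoopConfiguration)
    conf.set(XmlInputFormat.START_TAG_KEY, s"<$rowTag>")   // key read by the reader above
    conf.set(XmlInputFormat.END_TAG_KEY, s"</$rowTag>")    // assumed symmetric constant
    conf.set(XmlInputFormat.ENCODING_KEY, XmlOptions.DEFAULT_CHARSET)
    sc.newAPIHadoopFile(path, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], conf)
      .map { case (_, text) => text.toString }             // one string per <rowTag> element
  }
}
```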
5 changes: 2 additions & 3 deletions src/main/scala/com/databricks/spark/xml/XmlRelation.scala
@@ -24,7 +24,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.sources.{PrunedScan, InsertableRelation, BaseRelation, TableScan}
import org.apache.spark.sql.types._
import com.databricks.spark.xml.util.{CompressionCodecs, InferSchema}
import com.databricks.spark.xml.util.{InferSchema, XmlFile}
import com.databricks.spark.xml.parsers.StaxXmlParser

case class XmlRelation protected[spark] (
@@ -90,8 +90,7 @@ case class XmlRelation protected[spark] (
+ s" to INSERT OVERWRITE a XML table:\n${e.toString}")
}
// Write the data. We assume that schema isn't changed, and we won't update it.
val codecClass = CompressionCodecs.getCodecClass(options.codec)
data.saveAsXmlFile(filesystemPath.toString, parameters, codecClass)
XmlFile.saveAsXmlFile(data, filesystemPath.toString, parameters)
} else {
sys.error("XML tables only support INSERT OVERWRITE for now.")
}
73 changes: 8 additions & 65 deletions src/main/scala/com/databricks/spark/xml/package.scala
@@ -15,23 +15,19 @@
*/
package com.databricks.spark

import java.io.CharArrayWriter
import javax.xml.stream.XMLOutputFactory

import scala.collection.Map

import com.sun.xml.internal.txw2.output.IndentingXMLStreamWriter
import org.apache.hadoop.io.compress.CompressionCodec

import org.apache.spark.sql.{DataFrame, SQLContext}
import com.databricks.spark.xml.util.XmlFile
import com.databricks.spark.xml.parsers.StaxXmlGenerator

package object xml {
/**
* Adds a method, `xmlFile`, to [[SQLContext]] that allows reading XML data.
*/
implicit class XmlContext(sqlContext: SQLContext) extends Serializable {
@deprecated("Use DataFrameReader.read()", "0.4.0")
def xmlFile(
filePath: String,
rowTag: String = XmlOptions.DEFAULT_ROW_TAG,
@@ -82,69 +78,16 @@ package object xml {
// </fieldA>
//
// Namely, roundtrip in writing and reading can end up in different schema structure.
@deprecated("Use DataFrameWriter.write()", "0.4.0")
def saveAsXmlFile(
path: String, parameters: Map[String, String] = Map(),
compressionCodec: Class[_ <: CompressionCodec] = null): Unit = {
val options = XmlOptions(parameters.toMap)
val startElement = s"<${options.rootTag}>"
val endElement = s"</${options.rootTag}>"
val rowSchema = dataFrame.schema
val indent = XmlFile.DEFAULT_INDENT
val rowSeparator = XmlFile.DEFAULT_ROW_SEPARATOR

val xmlRDD = dataFrame.rdd.mapPartitions { iter =>
val factory = XMLOutputFactory.newInstance()
val writer = new CharArrayWriter()
val xmlWriter = factory.createXMLStreamWriter(writer)
val indentingXmlWriter = new IndentingXMLStreamWriter(xmlWriter)
indentingXmlWriter.setIndentStep(indent)

new Iterator[String] {
var firstRow: Boolean = true
var lastRow: Boolean = true

override def hasNext: Boolean = iter.hasNext || firstRow || lastRow

override def next: String = {
if (iter.nonEmpty) {
val xml = {
StaxXmlGenerator(
rowSchema,
indentingXmlWriter,
options)(iter.next())
writer.toString
}
writer.reset()

// Here it needs to add indentations for the start of each line,
// in order to insert the start element and end element.
val indentedXml = indent + xml.replaceAll(rowSeparator, rowSeparator + indent)
if (firstRow) {
firstRow = false
startElement + rowSeparator + indentedXml
} else {
indentedXml
}
} else {
indentingXmlWriter.close()
if (!firstRow) {
lastRow = false
endElement
} else {
// This means the iterator was initially empty.
firstRow = false
lastRow = false
""
}
}
}
}
}

compressionCodec match {
case null => xmlRDD.saveAsTextFile(path)
case codec => xmlRDD.saveAsTextFile(path, codec)
}
val mutableParams = collection.mutable.Map(parameters.toSeq: _*)
val safeCodec = mutableParams.get("codec")
.orElse(Option(compressionCodec).map(_.getCanonicalName))
.orNull
Member Author (@HyukjinKwon, Aug 1, 2016): I moved the code below to src/main/scala/com/databricks/spark/xml/util/XmlFile.scala and deprecated this method.

mutableParams.put("codec", safeCodec)
XmlFile.saveAsXmlFile(dataFrame, path, mutableParams.toMap)
}
}
}
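
Both implicits now carry @deprecated annotations pointing at the standard DataFrameReader and DataFrameWriter entry points. A short migration sketch follows; the "com.databricks.spark.xml" format name and the rowTag/rootTag/codec option keys follow the parameters used throughout this PR, but treat the exact spellings as assumptions rather than something this diff itself specifies.

```scala
// Migration sketch: the generic read/write API replaces the deprecated
// sqlContext.xmlFile(...) and dataFrame.saveAsXmlFile(...) helpers.
import org.apache.spark.sql.SparkSession

object XmlMigrationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("spark-xml-migration").getOrCreate()

    // Old: sqlContext.xmlFile("books.xml", rowTag = "book")
    val df = spark.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "book")
      .load("books.xml")

    // Old: df.saveAsXmlFile("books-out", Map("rootTag" -> "books", "codec" -> "gzip"))
    df.write
      .format("com.databricks.spark.xml")
      .option("rootTag", "books")
      .option("rowTag", "book")
      .option("codec", "gzip")   // handled via the "codec" parameter resolved above
      .save("books-out")

    spark.stop()
  }
}
```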
@@ -85,7 +85,6 @@ private[xml] object StaxXmlGenerator {
case (ByteType, v: Byte) => writer.writeCharacters(v.toString)
case (BooleanType, v: Boolean) => writer.writeCharacters(v.toString)
case (DateType, v) => writer.writeCharacters(v.toString)
case (udt: UserDefinedType[_], v) => writeElement(udt.sqlType, udt.serialize(v))

Member Author: I had to remove this because UserDefinedType was hidden (made non-public) in Spark 2.0.

// For the case roundtrip in reading and writing XML files, [[ArrayType]] cannot have
// [[ArrayType]] as element type. It always wraps the element with [[StructType]]. So,
@@ -106,7 +106,6 @@ private[xml] object StaxXmlParser {
case dt: StructType => convertObject(parser, dt, options)
case MapType(StringType, vt, _) => convertMap(parser, vt, options)
case ArrayType(st, _) => convertField(parser, st, options)
case udt: UserDefinedType[_] => convertField(parser, udt.sqlType, options)
case _: StringType => StaxXmlParserUtils.currentStructureAsString(parser)
}

@@ -153,7 +152,7 @@ private[xml] object StaxXmlParser {
case (v, ByteType) => castTo(v, ByteType)
case (v, ShortType) => castTo(v, ShortType)
case (v, IntegerType) => signSafeToInt(v)
case (v, _: DecimalType) => castTo(v, new DecimalType(None))
case (v, dt: DecimalType) => castTo(v, dt)
case (_, dataType) =>
sys.error(s"Failed to parse a value for data type $dataType.")
}
@@ -36,7 +36,7 @@ private[xml] object InferSchema {

/**
* Copied from internal Spark api
* [[org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion]]
* [[org.apache.spark.sql.catalyst.analysis.TypeCoercion]]
*/
private val numericPrecedence: IndexedSeq[DataType] =
IndexedSeq[DataType](
@@ -47,7 +47,7 @@
FloatType,
DoubleType,
TimestampType,
DecimalType.Unlimited)
DecimalType.SYSTEM_DEFAULT)

val findTightestCommonTypeOfTwo: (DataType, DataType) => Option[DataType] = {
case (t1, t2) if t1 == t2 => Some(t1)
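
This precedence list mirrors Spark 2.0's TypeCoercion; when schema inference meets two different numeric-like types, the wider one (later in the list) wins. A small sketch of that rule follows. The elements before FloatType and the full body of findTightestCommonTypeOfTwo are collapsed out of this diff, so both are assumed here.

```scala
// Sketch of the numeric branch of findTightestCommonTypeOfTwo, reconstructed from the
// precedence list above. The elements before FloatType follow the usual Spark ordering;
// the real method also handles StringType promotion and other cases.
import org.apache.spark.sql.types._

object TypeWideningSketch {
  private val numericPrecedence: IndexedSeq[DataType] =
    IndexedSeq(ByteType, ShortType, IntegerType, LongType,
      FloatType, DoubleType, TimestampType, DecimalType.SYSTEM_DEFAULT)

  def tightestCommonType(t1: DataType, t2: DataType): Option[DataType] = (t1, t2) match {
    case _ if t1 == t2 => Some(t1)
    case _ if numericPrecedence.contains(t1) && numericPrecedence.contains(t2) =>
      // The wider type is simply the one that appears later in the precedence list.
      Some(numericPrecedence(numericPrecedence.indexOf(t1) max numericPrecedence.indexOf(t2)))
    case _ => None
  }
}

// Example: TypeWideningSketch.tightestCommonType(LongType, DoubleType) == Some(DoubleType)
```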