You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Given the example structure below, persisting a row to a Parquet file works fine, but reading the content back returns null values in the child structure's fields. The following is a full class that reproduces the issue:
import io.eels.component.parquet.{ParquetSink, ParquetSource}
import io.eels.datastream.DataStream
import io.eels.schema._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
object EelBugReproductionTest extends App {

  // Hadoop configuration and filesystem, required implicitly by the eel
  // Parquet source/sink. Reuse the same Configuration for both (the original
  // constructed a second, throwaway Configuration for FileSystem.get).
  private implicit val conf: Configuration = new Configuration()
  private implicit val fs: FileSystem = FileSystem.get(conf)

  // Domain model: a parent holding a map of named children.
  case class Child(foo: String, bar: Array[String]) {
    override def toString: String = s"$foo :: ${bar.mkString(",")}"
  }

  case class Parent(foo: String, children: Map[String, Child]) {
    override def toString: String = s"$foo :: ${children.mkString(" >> ")}"
  }

  // Eel schemas mirroring the case classes above.
  val childStructType = StructType(
    Field("foo", StringType),
    Field("bar", ArrayType(StringType))
  )

  val parentStructType = StructType(
    Field("foo", StringType),
    Field("children", MapType(StringType, childStructType))
  )

  /** Flattens a Child into the positional value sequence eel expects. */
  def toSeq(c: Child): Seq[Any] = Seq(c.foo, c.bar)

  /** Wraps a single Parent row in a DataStream using the schema above. */
  def toDataStream(p: Parent): DataStream = {
    // Use a strict map here: `mapValues` returns a lazy view that re-runs
    // `toSeq` on every access (and is deprecated as of Scala 2.13).
    val values = Seq(p.foo, p.children.map { case (k, v) => k -> toSeq(v) })
    DataStream.fromValues(parentStructType, Seq(values))
  }

  /** Persists a single Parent to a Parquet file, overwriting any existing file. */
  def write(p: Parent, path: Path): Unit = {
    if (fs.exists(path)) fs.delete(path, false)
    toDataStream(p).to(ParquetSink(path))
  }

  /** Rebuilds a Child from the positional values read back from Parquet. */
  def toChild(values: Seq[Any]): Child = {
    val foo = values(0).asInstanceOf[String]
    // Cast to the Seq contract rather than the concrete Vector: the exact
    // collection type returned by the reader is an implementation detail.
    val bar = values(1).asInstanceOf[Seq[String]].toArray
    Child(foo, bar)
  }

  /** Reads the first row of the Parquet file and reconstructs the Parent. */
  def readParent(path: Path): Parent = {
    val row = ParquetSource(path).toDataStream.head
    val foo = row.values(0).asInstanceOf[String]
    // Strict map (not mapValues) so `toChild` runs exactly once per entry.
    val children = row.values(1)
      .asInstanceOf[Map[String, Seq[Any]]]
      .map { case (k, v) => k -> toChild(v) }
    Parent(foo, children)
  }

  // Round-trip: write the sample parent, then read it back and print it.
  // (write() already removes any stale file, so no separate delete is needed.)
  val path = new Path("test.pq")
  val child1 = Child("foo1", Array("bar11", "bar12"))
  val child2 = Child("foo2", Array("bar21", "bar22"))
  val parent = Parent("foo1", Map("child1" -> child1, "child2" -> child2))
  write(parent, path)
  println(readParent(path))
}
I have managed to fix the issue locally by amending RowReadSupport.scala as per the following diff:
diff --git a/eel-core/src/main/scala/io/eels/component/parquet/RowReadSupport.scala b/eel-core/src/main/scala/io/eels/component/parquet/RowReadSupport.scala
index 7ec2501e..ebdb32a3 100644
--- a/eel-core/src/main/scala/io/eels/component/parquet/RowReadSupport.scala
+++ b/eel-core/src/main/scala/io/eels/component/parquet/RowReadSupport.scala
@@ -132,11 +132,13 @@ class MapConverter(index: Int,
   private val keys = new VectorBuilder()
   private val values = new VectorBuilder()
+  private val keysConverter = Converter(mapType.keyType, false, -1, keys)
+  private val valuesConverter = Converter(mapType.valueType, false, -1, values)
   override def getConverter(fieldIndex: Int): Converter = new GroupConverter {
     override def getConverter(fieldIndex: Int): Converter = fieldIndex match {
-      case 0 => Converter(mapType.keyType, false, -1, keys)
-      case 1 => Converter(mapType.valueType, false, -1, values)
+      case 0 => keysConverter
+      case 1 => valuesConverter
     }
     override def start(): Unit = ()
     override def end(): Unit = () // a no-op as each nested group only contains a single element and we want to handle the finished list
I'd appreciate if you could take a look both at the issue and my amendment and merge it if it makes sense to you. Thanks
Regards,
Iñaki
The text was updated successfully, but these errors were encountered:
Inaki-Martin
changed the title
Structs as values in maps not being properly read from Spark files
Structs as values in maps not being properly read from Parquet files
Feb 27, 2018
Given the following example structure
persisting a row to a Parquet file works fine but reading the content returns null values in the child structure fields. The following is a full class to reproduce the issue:
I have managed to fix the issue locally by amending RowReadSupport.scala as per the following diff:
diff --git a/eel-core/src/main/scala/io/eels/component/parquet/RowReadSupport.scala b/eel-core/src/main/scala/io/eels/component/parquet/RowReadSupport.scala
index 7ec2501e..ebdb32a3 100644
--- a/eel-core/src/main/scala/io/eels/component/parquet/RowReadSupport.scala
+++ b/eel-core/src/main/scala/io/eels/component/parquet/RowReadSupport.scala
@@ -132,11 +132,13 @@ class MapConverter(index: Int,
   private val keys = new VectorBuilder()
   private val values = new VectorBuilder()
+  private val keysConverter = Converter(mapType.keyType, false, -1, keys)
+  private val valuesConverter = Converter(mapType.valueType, false, -1, values)
   override def getConverter(fieldIndex: Int): Converter = new GroupConverter {
     override def getConverter(fieldIndex: Int): Converter = fieldIndex match {
-      case 0 => Converter(mapType.keyType, false, -1, keys)
-      case 1 => Converter(mapType.valueType, false, -1, values)
+      case 0 => keysConverter
+      case 1 => valuesConverter
     }
     override def start(): Unit = ()
     override def end(): Unit = () // a no-op as each nested group only contains a single element and we want to handle the finished list
I'd appreciate if you could take a look both at the issue and my amendment and merge it if it makes sense to you. Thanks
Regards,
Iñaki
The text was updated successfully, but these errors were encountered: