@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.orc

 import java.io.IOException
 import java.util.{Locale, Properties}
+
 import scala.collection.JavaConversions._

 import org.apache.hadoop.mapred.{JobConf, InputFormat, FileInputFormat}
@@ -28,6 +29,7 @@ import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.hive.ql.io.orc._
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}

+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
 import org.apache.spark.annotation.DeveloperApi
@@ -117,8 +119,15 @@ case class OrcRelation

   def sparkContext = sqlContext.sparkContext

-  // todo: how to calculate this size
-  // override val sizeInBytes = ???
+  // todo: Should calculate per scan size
+  override val sizeInBytes = {
+    val fs = FileSystem.get(new java.net.URI(path), sparkContext.hadoopConfiguration)
+    val fileStatus = fs.getFileStatus(fs.makeQualified(new Path(path)))
+    val leaves = SparkHadoopUtil.get.listLeafStatuses(fs, fileStatus.getPath).filter { f =>
+      !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+    }
+    leaves.map(_.getLen).sum
+  }

   private def initialColumnsNamesTypes(schema: StructType) = {
     val inspector = toInspector(schema).asInstanceOf[StructObjectInspector]
@@ -205,7 +214,10 @@ case class OrcRelation
    * @param relationOutput
    * @param conf
    */
-  private def addColumnIds(output: Seq[Attribute], relationOutput: Seq[Attribute], conf: Configuration) {
+  private def addColumnIds(
+      output: Seq[Attribute],
+      relationOutput: Seq[Attribute],
+      conf: Configuration) {
     val names = output.map(_.name)
     val fieldIdMap = relationOutput.map(_.name.toLowerCase(Locale.ENGLISH)).zipWithIndex.toMap
     val ids = output.map { att =>
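
For context, a minimal standalone sketch (not part of the commit) of the estimation the new sizeInBytes performs: recursively list the leaf files under the relation's path, drop metadata files whose names start with "_" or ".", and sum their lengths. The object name and the path passed on the command line are hypothetical; only standard Hadoop FileSystem calls are used in place of SparkHadoopUtil.listLeafStatuses.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object OrcSizeSketch {
  // Recursively collect leaf (non-directory) files, mirroring SparkHadoopUtil.listLeafStatuses.
  private def listLeaves(fs: FileSystem, status: FileStatus): Seq[FileStatus] =
    if (status.isDirectory) fs.listStatus(status.getPath).toSeq.flatMap(listLeaves(fs, _))
    else Seq(status)

  def main(args: Array[String]): Unit = {
    val path = new Path(args(0))  // e.g. "hdfs:///tmp/orc_table" (hypothetical)
    val fs = path.getFileSystem(new Configuration())
    val leaves = listLeaves(fs, fs.getFileStatus(fs.makeQualified(path)))
      .filterNot(f => f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
    // Same aggregate the relation above reports as sizeInBytes.
    println(s"Estimated size in bytes: ${leaves.map(_.getLen).sum}")
  }
}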