Commit f2c246f

Orc support through datasource api
1 parent 1c8633f commit f2c246f

4 files changed: 599 additions, 20 deletions

core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 17 additions & 19 deletions
@@ -42,20 +42,20 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   with SparkHadoopMapRedUtil
   with Serializable {
 
-  private val now = new Date()
-  private val conf = new SerializableWritable(jobConf)
+  protected val now = new Date()
+  protected val conf = new SerializableWritable(jobConf)
 
-  private var jobID = 0
-  private var splitID = 0
-  private var attemptID = 0
-  private var jID: SerializableWritable[JobID] = null
-  private var taID: SerializableWritable[TaskAttemptID] = null
+  protected var jobID = 0
+  protected var splitID = 0
+  protected var attemptID = 0
+  protected var jID: SerializableWritable[JobID] = null
+  protected var taID: SerializableWritable[TaskAttemptID] = null
 
-  @transient private var writer: RecordWriter[AnyRef,AnyRef] = null
-  @transient private var format: OutputFormat[AnyRef,AnyRef] = null
-  @transient private var committer: OutputCommitter = null
-  @transient private var jobContext: JobContext = null
-  @transient private var taskContext: TaskAttemptContext = null
+  @transient protected var writer: RecordWriter[AnyRef,AnyRef] = null
+  @transient protected var format: OutputFormat[AnyRef,AnyRef] = null
+  @transient protected var committer: OutputCommitter = null
+  @transient protected var jobContext: JobContext = null
+  @transient protected var taskContext: TaskAttemptContext = null
 
   def preSetup() {
     setIDs(0, 0, 0)
@@ -160,38 +160,36 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     cmtr.commitJob(getJobContext())
   }
 
-  // ********* Private Functions *********
-
-  private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = {
+  def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = {
     if (format == null) {
       format = conf.value.getOutputFormat()
        .asInstanceOf[OutputFormat[AnyRef,AnyRef]]
     }
     format
   }
 
-  private def getOutputCommitter(): OutputCommitter = {
+  def getOutputCommitter(): OutputCommitter = {
     if (committer == null) {
       committer = conf.value.getOutputCommitter
     }
     committer
   }
 
-  private def getJobContext(): JobContext = {
+  def getJobContext(): JobContext = {
     if (jobContext == null) {
       jobContext = newJobContext(conf.value, jID.value)
     }
     jobContext
   }
 
-  private def getTaskContext(): TaskAttemptContext = {
+  def getTaskContext(): TaskAttemptContext = {
     if (taskContext == null) {
       taskContext = newTaskAttemptContext(conf.value, taID.value)
     }
     taskContext
   }
 
-  private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
+  def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
     jobID = jobid
     splitID = splitid
     attemptID = attemptid
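
The visibility changes above widen SparkHadoopWriter's formerly private state and helpers to protected (or public), which is what lets a format-specific writer subclass it and reuse its Hadoop job/task bookkeeping instead of duplicating it. The ORC writer this commit adds lives in files not shown in this excerpt; the sketch below is only a hypothetical illustration of the kind of subclass these changes enable (class and method names are invented, and it is placed under org.apache.spark in case the parent class is package-private).

// Hypothetical sketch: not part of this commit.
package org.apache.spark

import java.text.SimpleDateFormat

import org.apache.hadoop.mapred.JobConf

class FormatSpecificHadoopWriter(jobConf: JobConf)
  extends SparkHadoopWriter(jobConf) {

  // `now` is reachable here because it is now protected in the parent.
  def jobTimestamp: String =
    new SimpleDateFormat("yyyyMMddHHmm").format(now)

  // The formerly private helpers (getOutputFormat, getOutputCommitter,
  // getJobContext, getTaskContext, setIDs) can likewise be called by a
  // subclass that manages its own RecordWriter.
}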

sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

Lines changed: 1 addition & 1 deletion
@@ -275,7 +275,7 @@ private[hive] object HadoopTableReader extends HiveInspectors {
 
     val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
     val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) =>
-      soi.getStructFieldRef(attr.name) -> ordinal
+      soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
     }.unzip
 
     // Builds specific unwrappers ahead of time according to object inspector types to avoid pattern
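
A plausible reason for the toLowerCase added above (an inference, not stated in the diff) is that the deserializer's object inspector can expose struct field names in lower case, while Catalyst attribute names may keep their original mixed case, so an unnormalized lookup can miss the field. Below is a plain-Scala illustration of that normalization pattern, with invented field names standing in for soi.getStructFieldRef.

object CaseNormalizedLookup {
  def main(args: Array[String]): Unit = {
    // Struct field refs as a lower-casing object inspector might expose them.
    val structFieldRefs =
      Map("usercount" -> "ref(usercount)", "username" -> "ref(username)")

    // Catalyst attributes can carry the original mixed-case names.
    val nonPartitionKeyAttrs = Seq("userCount" -> 0, "userName" -> 1)

    // Mirrors `soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal`:
    // normalize the attribute name before the lookup.
    val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (name, ordinal) =>
      structFieldRefs(name.toLowerCase) -> ordinal
    }.unzip

    println(fieldRefs)     // List(ref(usercount), ref(username))
    println(fieldOrdinals) // List(0, 1)
  }
}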
