|
17 | 17 |
|
18 | 18 | package org.apache.spark.sql.hive.execution |
19 | 19 |
|
| 20 | +import java.io.File |
| 21 | + |
| 22 | +import com.google.common.io.Files |
| 23 | + |
20 | 24 | import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} |
21 | 25 | import org.apache.spark.sql.catalyst.TableIdentifier |
22 | 26 | import org.apache.spark.sql.catalyst.analysis.NoSuchTableException |
@@ -232,31 +236,40 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto |
232 | 236 | sql("""LOAD DATA LOCAL INPATH "/non-existing/data.txt" INTO TABLE non_part_table""") |
233 | 237 | } |
234 | 238 |
|
235 | | - val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath |
| 239 | + val testData = hiveContext.getHiveFile("data/files/employee.dat") |
236 | 240 |
|
237 | 241 | // Non-local inpath: without URI Scheme and Authority |
238 | | - sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""") |
| 242 | + withCopy(testData) { tmp => |
| 243 | + sql(s"""LOAD DATA INPATH "${tmp.getCanonicalPath()}" INTO TABLE non_part_table""") |
| 244 | + } |
| 245 | + |
239 | 246 | checkAnswer( |
240 | 247 | sql("SELECT * FROM non_part_table WHERE employeeID = 16"), |
241 | 248 | Row(16, "john") :: Nil) |
242 | 249 |
|
243 | 250 | // Use URI as LOCAL inpath: |
244 | 251 | // file:/path/to/data/files/employee.dat |
245 | | - val uri = "file:" + testData |
| 252 | + val uri = "file:" + testData.getCanonicalPath() |
246 | 253 | sql(s"""LOAD DATA LOCAL INPATH "$uri" INTO TABLE non_part_table""") |
247 | 254 |
|
248 | 255 | checkAnswer( |
249 | 256 | sql("SELECT * FROM non_part_table WHERE employeeID = 16"), |
250 | 257 | Row(16, "john") :: Row(16, "john") :: Nil) |
251 | 258 |
|
252 | 259 | // Use URI as non-LOCAL inpath |
253 | | - sql(s"""LOAD DATA INPATH "$uri" INTO TABLE non_part_table""") |
| 260 | + withCopy(testData) { tmp => |
| 261 | + val tmpUri = "file:" + tmp.getCanonicalPath() |
| 262 | + sql(s"""LOAD DATA INPATH "$tmpUri" INTO TABLE non_part_table""") |
| 263 | + } |
254 | 264 |
|
255 | 265 | checkAnswer( |
256 | 266 | sql("SELECT * FROM non_part_table WHERE employeeID = 16"), |
257 | 267 | Row(16, "john") :: Row(16, "john") :: Row(16, "john") :: Nil) |
258 | 268 |
|
259 | | - sql(s"""LOAD DATA INPATH "$uri" OVERWRITE INTO TABLE non_part_table""") |
| 269 | + withCopy(testData) { tmp => |
| 270 | + val tmpUri = "file:" + tmp.getCanonicalPath() |
| 271 | + sql(s"""LOAD DATA INPATH "$tmpUri" OVERWRITE INTO TABLE non_part_table""") |
| 272 | + } |
260 | 273 |
|
261 | 274 | checkAnswer( |
262 | 275 | sql("SELECT * FROM non_part_table WHERE employeeID = 16"), |
@@ -418,4 +431,19 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto |
418 | 431 | assert(sql("SHOW PARTITIONS part_datasrc").count() == 3) |
419 | 432 | } |
420 | 433 | } |
| 434 | + |
| 435 | + /** |
| 436 | +  * Run `fn` on a temporary copy of `source`; the copy is deleted afterwards if it still exists. |
| 437 | +  * Use this for tests that use "LOAD DATA" (instead of "LOAD DATA LOCAL") since, according to |
| 438 | +  * Hive's semantics, files are moved into the target location and the original must be preserved. |
| 439 | +  */ |
| 440 | + private def withCopy(source: File)(fn: File => Unit): Unit = { |
| 441 | +   val tmp = File.createTempFile(source.getName(), ".tmp") |
| 442 | +   try { |
| 443 | +     Files.copy(source, tmp) // Inside the try so a failed copy does not leak the temp file. |
| 444 | +     fn(tmp) |
| 445 | +   } finally { |
| 446 | +     tmp.delete() // Best effort: `fn` may already have moved the file away. |
| 447 | +   } |
| 448 | + } |
421 | 449 | } |
0 commit comments