Allow file names to include = when converting to Delta

Signed-off-by: Yishuang Lu <luystu@gmail.com>
lys0716 committed Dec 5, 2019
1 parent 26e474b commit dce373a
Showing 3 changed files with 28 additions and 2 deletions.
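
Background: Hive-style partition discovery reads every key=value path segment as a partition. Before this change, ConvertToDeltaCommand handed the full file path to the partition parser, so a file whose name contained = (for example data-id=1.snappy.parquet) could be misread as an extra partition value during conversion. Below is a minimal toy sketch of that failure mode, not Spark's actual PartitionUtils:

object PartitionParseSketch {
  // Collect key=value pairs from each slash-separated segment of a path.
  def parsePartitions(path: String): Seq[(String, String)] =
    path.split("/").toSeq.flatMap { segment =>
      segment.split("=", 2) match {
        case Array(k, v) => Some(k -> v)
        case _           => None
      }
    }

  def main(args: Array[String]): Unit = {
    // Parsing the full file path picks up the file name as a bogus partition:
    println(parsePartitions("/table/part=1/data-id=1.snappy.parquet"))
    // -> List((part,1), (data-id,1.snappy.parquet))

    // Parsing only the parent directory, as this commit does, is safe:
    println(parsePartitions("/table/part=1"))
    // -> List((part,1))
  }
}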
@@ -16,7 +16,7 @@

 package io.delta.tables.execution
 
-import org.apache.spark.sql.delta.commands.{ConvertToDeltaCommand, ConvertToDeltaCommandBase}
+import org.apache.spark.sql.delta.commands.ConvertToDeltaCommand
 import io.delta.tables.DeltaTable
 
 import org.apache.spark.sql.SparkSession
@@ -199,8 +199,9 @@ abstract class ConvertToDeltaCommandBase(
     val dateFormatter = DateFormatter()
     val timestampFormatter =
       TimestampFormatter(timestampPartitionPattern, java.util.TimeZone.getDefault)
+    val dir = if (file.isDir) file.getPath else file.getPath.getParent
     val (partitionOpt, _) = PartitionUtils.parsePartition(
-      path,
+      dir,
       typeInference = false,
       basePaths = Set.empty,
       userSpecifiedDataTypes = Map.empty,
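
The fix computes the path to parse per file: a directory is parsed as-is, while a regular file contributes only its parent directory, so the file name itself can never be interpreted as a partition. A small sketch of that behavior using Hadoop's Path API (the isDir flag here stands in for the file-status object used in the real code):

import org.apache.hadoop.fs.Path

object DirForPartitionParsing {
  // Mirrors the added line: partitions are parsed from the directory,
  // never from the file name itself.
  def dirToParse(path: Path, isDir: Boolean): Path =
    if (isDir) path else path.getParent

  def main(args: Array[String]): Unit = {
    val file = new Path("/table/part=1/data-id=1.snappy.parquet")
    println(dirToParse(file, isDir = false))           // /table/part=1
    println(dirToParse(file.getParent, isDir = true))  // /table/part=1
  }
}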
@@ -296,6 +296,31 @@ abstract class ConvertToDeltaSuiteBase extends QueryTest
     }
   }
 
+  test("allow file names to have = character") {
+    withTempDir { dir =>
+      val tempDir = dir.getCanonicalPath
+      writeFiles(tempDir + "/part=1/", Seq(1).toDF("id"))
+
+      val sessionHadoopConf = spark.sessionState.newHadoopConf
+      val fs = new Path(tempDir).getFileSystem(sessionHadoopConf)
+      def listFileNames: Array[String] =
+        fs.listStatus(new Path(tempDir + "/part=1/"))
+          .map(_.getPath)
+          .filter(path => !path.getName.startsWith("_") && !path.getName.startsWith("."))
+          .map(_.toUri.toString)
+
+      val fileNames = listFileNames
+      assert(fileNames.size == 1)
+      fs.rename(new Path(fileNames.head), new Path(fileNames.head
+        .stripSuffix(".snappy.parquet").concat("-id=1.snappy.parquet")))
+
+      val newFileNames = listFileNames
+      assert(newFileNames.head.endsWith("-id=1.snappy.parquet"))
+      convertToDelta(s"parquet.`$tempDir`", Some("part string"))
+      checkAnswer(spark.read.format("delta").load(tempDir), Row(1, "1"))
+    }
+  }
+
   test("allow file names to not have .parquet suffix") {
     withTempDir { dir =>
       val tempDir = dir.getCanonicalPath
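The test drives conversion through the suite's convertToDelta helper; outside the test suite the equivalent entry point is the public DeltaTable API. A usage sketch, assuming Delta Lake on the classpath and a partitioned Parquet table at the hypothetical path /tmp/events:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

object ConvertExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("convert-to-delta")
      .master("local[*]")
      .getOrCreate()

    // Convert a partitioned Parquet table in place; after this commit, data
    // files whose names contain '=' no longer break partition inference.
    DeltaTable.convertToDelta(spark, "parquet.`/tmp/events`", "part string")

    spark.read.format("delta").load("/tmp/events").show()
    spark.stop()
  }
}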
