
[Kernel] Fix issue querying tables with spaces in the name (delta-io#3291)

## Description
Currently, Kernel uses a mix of paths (file system paths) and URIs (in
string format) in its API interfaces, which causes confusion and bugs.

Context:
A path refers to a file system path, which can contain characters that
must be escaped when it is converted to a URI.
E.g. path: `s3://bucket/path to file/`; the URI for the same path:
`s3://bucket/path%20to%20file/`.

Make it uniform everywhere to use file system paths only.
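
To make the distinction concrete, here is a minimal sketch (not part of this change) of how the same location looks in the two forms, using `java.net.URI`:

```java
import java.net.URI;

public class PathVsUriExample {
    public static void main(String[] args) throws Exception {
        // File system path: the space is a literal character.
        String fsPath = "/path to file/";

        // Building a URI from the raw path percent-encodes the space.
        URI uri = new URI("s3", "bucket", fsPath, null);
        System.out.println(uri);           // s3://bucket/path%20to%20file/
        System.out.println(uri.getPath()); // /path to file/  (decoded form)

        // Bugs appear when one layer hands the encoded string to a layer that
        // expects a raw path (or vice versa): it then looks for a file literally
        // named "path%20to%20file", which does not exist.
    }
}
```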

## How was this patch tested?
Additional tests with table paths containing spaces.
vkorukanti committed Aug 30, 2024
1 parent f6f462a commit 84c71fe
Showing 9 changed files with 39 additions and 18 deletions.
16 changes: 11 additions & 5 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java
@@ -56,7 +56,9 @@ public interface Scan {
* <li>name: {@code add}, type: {@code struct}</li>
* <li>Description: Represents `AddFile` DeltaLog action</li>
* <li><ul>
* <li>name: {@code path}, type: {@code string}, description: location of the file.</li>
* <li>name: {@code path}, type: {@code string}, description: location of the file.
* The path is a URI as specified by RFC 2396 URI Generic Syntax, which needs to be decoded
* to get the data file path.</li>
* <li>name: {@code partitionValues}, type: {@code map(string, string)},
* description: A map from partition column to value for this logical file. </li>
 * <li>name: {@code size}, type: {@code long}, description: size of the file.</li>
@@ -72,7 +74,9 @@ public interface Scan {
* <a href=https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Deletion-Vectors>
* Protocol</a><ul>
* <li>name: {@code storageType}, type: {@code string}</li>
* <li>name: {@code pathOrInlineDv}, type: {@code string}</li>
* <li>name: {@code pathOrInlineDv}, type: {@code string}, description: The path is a
* URI as specified by RFC 2396 URI Generic Syntax, which needs to be decoded to get the
* data file path.</li>
 * <li>name: {@code offset}, type: {@code long}</li>
 * <li>name: {@code sizeInBytes}, type: {@code long}</li>
 * <li>name: {@code cardinality}, type: {@code long}</li>
@@ -81,9 +85,11 @@
* </ul></li>
* <li><ul>
* <li>name: {@code tableRoot}, type: {@code string}</li>
* <li>Description: Absolute path of the table location. NOTE: this is temporary. Will
* be removed in future. @see <a href=https://github.com/delta-io/delta/issues/2089>
* </a></li>
* <li>Description: Absolute path of the table location. The path is a URI as specified by
 * RFC 2396 URI Generic Syntax, which needs to be decoded to get the data file path.
* NOTE: this is temporary. Will be removed in future.
* @see <a href=https://github.com/delta-io/delta/issues/2089></a>
* </li>
* </ul></li>
* </ol>
*/
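
The fields documented above (`add.path`, `pathOrInlineDv`, `tableRoot`) are RFC 2396 encoded URIs, so a consumer must decode them before treating them as file system paths. A minimal decoding sketch, with a hypothetical value (not taken from this commit):

```java
// Hedged sketch: decode an RFC 2396 encoded path string from a scan file row.
import java.net.URI;

public class DecodeScanFilePath {
    public static void main(String[] args) {
        String encoded = "part-00000-with%20space.snappy.parquet"; // hypothetical add.path value
        String decoded = URI.create(encoded).getPath();
        System.out.println(decoded); // part-00000-with space.snappy.parquet
    }
}
```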
@@ -15,6 +15,7 @@
*/
package io.delta.kernel.internal;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

@@ -112,7 +113,9 @@ public static FileStatus getAddFileStatus(Row scanFileInfo) {

// TODO: this is hack until the path in `add.path` is converted to an absolute path
String tableRoot = scanFileInfo.getString(TABLE_ROOT_ORDINAL);
String absolutePath = new Path(tableRoot, path).toString();
String absolutePath = new Path(
new Path(URI.create(tableRoot)),
new Path(URI.create(path))).toString();

return FileStatus.of(absolutePath, size, modificationTime);
}
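
The double `URI.create(...)` wrapping matters because `tableRoot` and `path` come out of the scan file row as encoded URI strings, while Hadoop's `Path(String)` constructor treats its argument as an unencoded path and re-escapes any `%` it finds. A rough illustration with a hypothetical encoded value (not part of the commit):

```java
// Hedged sketch of the Path(URI) vs. Path(String) difference for an encoded input.
import java.net.URI;
import org.apache.hadoop.fs.Path;

public class EncodedPathExample {
    public static void main(String[] args) {
        String encoded = "file:/tmp/table%20root/part-0001.parquet"; // hypothetical URI string

        Path viaUri = new Path(URI.create(encoded));
        System.out.println(viaUri);            // file:/tmp/table root/part-0001.parquet (decoded once)

        Path viaString = new Path(encoded);
        System.out.println(viaString.toUri()); // file:/tmp/table%2520root/... (the '%' is re-encoded)
    }
}
```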
@@ -527,9 +527,11 @@ protected static String serializePartitionValue(Literal literal) {

/**
* Escapes the given string to be used as a partition value in the path. Basically this escapes
* - characters that can't be in a file path. E.g. `a\nb` will be escaped to `a%0Ab`. -
* character that are cause ambiguity in partition value parsing. E.g. For partition column `a`
* having value `b=c`, the path should be `a=b%3Dc`.
* <ul>
* <li>characters that can't be in a file path. E.g. `a\nb` will be escaped to `a%0Ab`.</li>
 * <li>characters that cause ambiguity in partition value parsing.
 * E.g. for partition column `a` having value `b=c`, the path should be `a=b%3Dc`.</li>
* </ul>
*
* @param value The partition value to escape.
* @return The escaped partition value.
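
A rough sketch of the escaping rule the Javadoc above describes; this is an illustrative approximation, not the Kernel implementation, and the exact escape set is an assumption:

```java
// Hedged sketch: percent-encode control characters and characters that are
// ambiguous in Hive-style `column=value` directory names.
public class PartitionValueEscaper {
    // Assumed escape set; the real implementation may differ.
    private static final String CHARS_TO_ESCAPE = "\"#%'*/:=?\\{[]^";

    static String escapePartitionValue(String value) {
        StringBuilder sb = new StringBuilder();
        for (char c : value.toCharArray()) {
            if (c < 0x20 || CHARS_TO_ESCAPE.indexOf(c) >= 0) {
                sb.append(String.format("%%%02X", (int) c)); // '=' -> "%3D", '\n' -> "%0A"
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(escapePartitionValue("a\nb")); // a%0Ab
        System.out.println(escapePartitionValue("b=c"));  // b%3Dc
    }
}
```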
@@ -16,7 +16,6 @@
package io.delta.kernel.defaults.engine;

import java.io.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.*;
import static java.lang.String.format;
@@ -190,7 +189,7 @@ public void writeJsonFileAtomically(
String filePath,
CloseableIterator<Row> data,
boolean overwrite) throws IOException {
Path path = new Path(URI.create(filePath));
Path path = new Path(filePath);
LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
try {
logStore.write(
@@ -17,7 +17,6 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.*;
import static java.lang.String.format;

@@ -104,7 +103,7 @@ public CloseableIterator<DataFileStatus> writeParquetFiles(
CloseableIterator<FilteredColumnarBatch> dataIter,
List<Column> statsColumns) throws IOException {
ParquetFileWriter batchWriter =
new ParquetFileWriter(hadoopConf, new Path(URI.create(directoryPath)), statsColumns);
new ParquetFileWriter(hadoopConf, new Path(directoryPath), statsColumns);
return batchWriter.write(dataIter);
}

@@ -121,7 +120,7 @@ public void writeParquetFileAtomically(
String filePath,
CloseableIterator<FilteredColumnarBatch> data) throws IOException {
try {
Path targetPath = new Path(URI.create(filePath));
Path targetPath = new Path(filePath);
LogStore logStore =
LogStoreProvider.getLogStore(hadoopConf, targetPath.toUri().getScheme());

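The default engine handler changes above drop the `URI.create(...)` wrapping because these methods now receive plain file system paths, and `URI.create` rejects exactly the kind of path this commit is about. A quick sketch with a hypothetical path:

```java
// Hedged sketch: URI.create throws on a raw path containing a space,
// while Hadoop's Path(String) accepts it.
import java.net.URI;
import org.apache.hadoop.fs.Path;

public class RawPathExample {
    public static void main(String[] args) {
        String filePath = "/tmp/table- -path/_delta_log/00000000000000000001.json"; // hypothetical

        Path ok = new Path(filePath); // fine: spaces are legal in a file system path
        System.out.println(ok);

        try {
            URI.create(filePath);     // a space is illegal in a URI
        } catch (IllegalArgumentException e) {
            System.out.println("URI.create rejected the raw path: " + e.getMessage());
        }
    }
}
```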
@@ -16,7 +16,6 @@
package io.delta.kernel.defaults.internal.parquet;

import java.io.IOException;
import java.net.URI;
import java.util.*;
import static java.util.Objects.requireNonNull;

@@ -113,7 +112,7 @@ private void initParquetReaderIfRequired() {
org.apache.parquet.hadoop.ParquetFileReader fileReader = null;
try {
Configuration confCopy = configuration;
Path filePath = new Path(URI.create(path));
Path filePath = new Path(path);

// We need physical schema in order to construct a filter that can be
// pushed into the `parquet-mr` reader. For that reason read the footer
@@ -511,6 +511,18 @@ class DeltaTableReadsSuite extends AnyFunSuite with TestUtils {
)
}

test(s"table with spaces in the table path") {
withTempDir { tempDir =>
val target = tempDir.getCanonicalPath + s"/table- -path"
spark.sql(s"CREATE TABLE delta.`$target` USING DELTA " +
s"SELECT * FROM delta.`${getTestResourceFilePath("basic-with-checkpoint")}`")
checkTable(
path = target,
expectedAnswer = (0 until 150).map(i => TestRow(i.toLong))
)
}
}

test("table with name column mapping mode") {
val expectedAnswer = (0 to 10).map {
case 10 => TestRow(null, null, null, null, null, null, null, null, null, null)
@@ -348,7 +348,8 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}

test("insert into partitioned table - already existing table") {
withTempDirAndEngine { (tblPath, engine) =>
withTempDirAndEngine { (tempTblPath, engine) =>
val tblPath = tempTblPath + "/table+ with special chars"
val partitionCols = Seq("part1", "part2")

{
@@ -239,7 +239,7 @@ class LogReplaySuite extends AnyFunSuite with TestUtils {
assert(scanFileRows.length == 1)
val addFileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRows.head)
// get the relative path to compare
assert(new File(addFileStatus.getPath).getName == "special%20p@%23h")
assert(new File(addFileStatus.getPath).getName == "special p@#h")
}
}
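
The updated assertion reflects that `getAddFileStatus` now returns the decoded file name; the old expected value was simply its percent-encoded form. A quick check (not from the commit):

```java
// "special%20p@%23h" decodes to "special p@#h" under RFC 2396 rules.
String decoded = java.net.URI.create("special%20p@%23h").getPath();
assert decoded.equals("special p@#h");
```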

