Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.connector.catalog.*;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.delta.DeltaTableUtils;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
Expand All @@ -55,6 +57,47 @@ public class SparkTable implements Table, SupportsRead {
private final StructType partitionSchema;
private final Column[] columns;
private final Transform[] partitionTransforms;
private final Optional<CatalogTable> catalogTable;

/**
 * Creates a path-based SparkTable with no backing catalog entry and no extra options.
 *
 * @param identifier logical table identifier used by Spark's catalog
 * @param tablePath filesystem path to the Delta table root
 * @throws NullPointerException if identifier or tablePath is null
 */
public SparkTable(Identifier identifier, String tablePath) {
  this(identifier, tablePath, Collections.emptyMap(), Optional.empty());
}

/**
 * Creates a path-based SparkTable with no backing catalog entry, configured by the given
 * options.
 *
 * @param identifier logical table identifier used by Spark's catalog
 * @param tablePath filesystem path to the Delta table root
 * @param options table options used to configure the Hadoop conf, table reads and writes
 * @throws NullPointerException if identifier or tablePath is null
 */
public SparkTable(Identifier identifier, String tablePath, Map<String, String> options) {
  this(identifier, tablePath, options, Optional.empty());
}

/**
 * Creates a SparkTable from a Spark CatalogTable and user-provided options. The table location
 * is taken from the catalog entry and URL-decoded; the delegated constructor then merges the
 * catalog's file-system storage properties with the user options, with user options taking
 * precedence on conflicts.
 *
 * @param identifier logical table identifier used by Spark's catalog
 * @param catalogTable the Spark CatalogTable containing table metadata including location
 * @param options user-provided options to override catalog properties
 * @throws NullPointerException if identifier or catalogTable is null
 */
public SparkTable(Identifier identifier, CatalogTable catalogTable, Map<String, String> options) {
this(
identifier,
getDecodedPath(requireNonNull(catalogTable, "catalogTable is null").location()),
options,
Optional.of(catalogTable));
}

/**
* Creates a SparkTable backed by a Delta Kernel snapshot manager and initializes Spark-facing
Expand All @@ -67,21 +110,37 @@ public class SparkTable implements Table, SupportsRead {
* <p>Notes: - Partition column order from the snapshot is preserved for partitioning and appended
* after data columns in the public Spark schema, per Spark conventions. - Read-time scan options
* are later merged with these options.
*
* @param identifier logical table identifier used by Spark's catalog
* @param tablePath filesystem path to the Delta table root
* @param options table options used to configure the Hadoop conf, table reads and writes
* @throws NullPointerException if identifier or tablePath is null
*/
public SparkTable(Identifier identifier, String tablePath, Map<String, String> options) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we keep the previous constructor? Otherwise we can't read file path table with options

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added back

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if(catalogTable.properties.exists(kv => kv.key == "delta.feature.catalogOwned-preview" && kv.value == "supported")){
//catalog owned table, we should get the schema and partitioning from it.
we will handle it in the next pr
} else {
// we still need to learn it from snapshot
}

private SparkTable(
Identifier identifier,
String tablePath,
Map<String, String> userOptions,
Optional<CatalogTable> catalogTable) {
this.identifier = requireNonNull(identifier, "identifier is null");
this.options = options;
this.catalogTable = catalogTable;
// Merge options: file system options from catalog + user options (user takes precedence)
// This follows the same pattern as DeltaTableV2 in delta-spark
Map<String, String> merged = new HashMap<>();
// Only extract file system options from table storage properties
catalogTable.ifPresent(
table ->
scala.collection.JavaConverters.mapAsJavaMap(table.storage().properties())
.forEach(
(key, value) -> {
if (DeltaTableUtils.validDeltaTableHadoopPrefixes()
.exists(prefix -> key.startsWith(prefix))) {
merged.put(key, value);
}
}));
// User options override catalog properties
merged.putAll(userOptions);
this.options = Collections.unmodifiableMap(merged);

this.hadoopConf =
SparkSession.active().sessionState().newHadoopConfWithOptions(toScalaMap(options));
this.snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf);
// Load the initial snapshot through the manager
this.initialSnapshot = snapshotManager.loadLatestSnapshot();

this.schema = SchemaUtils.convertKernelSchemaToSparkSchema(initialSnapshot.getSchema());
this.partColNames =
Collections.unmodifiableList(new ArrayList<>(initialSnapshot.getPartitionColumnNames()));
Expand Down Expand Up @@ -122,15 +181,20 @@ public SparkTable(Identifier identifier, String tablePath, Map<String, String> o
}

/**
 * Decodes a table-location URI into a filesystem path, un-escaping percent-encoded characters
 * correctly. E.g., converts "spark%25dir%25prefix" to "spark%dir%prefix".
 *
 * <p>NOTE(review): {@code new java.io.File(URI)} accepts only absolute URIs with the "file"
 * scheme, so a catalog location such as s3:// or hdfs:// would throw IllegalArgumentException
 * here — confirm callers only pass local (file-scheme) locations.
 *
 * @param location the table location URI taken from the catalog entry
 * @return the decoded filesystem path
 */
private static String getDecodedPath(java.net.URI location) {
return new java.io.File(location).getPath();
}

/**
* Returns the CatalogTable if this SparkTable was created from a catalog table.
*
* @param identifier logical table identifier used by Spark's catalog
* @param tablePath filesystem path to the Delta table root
* @throws NullPointerException if identifier or tablePath is null
* @return Optional containing the CatalogTable, or empty if this table was created from a path
*/
public SparkTable(Identifier identifier, String tablePath) {
this(identifier, tablePath, Collections.emptyMap());
public Optional<CatalogTable> getCatalogTable() {
  // Present only when this table was resolved through a catalog rather than a raw path.
  return this.catalogTable;
}

@Override
Expand All @@ -156,7 +220,6 @@ public Transform[] partitioning() {
@Override
public Map<String, String> properties() {
// Start from the table properties recorded in the initially loaded Delta snapshot.
Map<String, String> props = new HashMap<>(initialSnapshot.getTableProperties());
// Layer the merged table options on top so they override snapshot-level properties.
props.putAll(this.options);
return Collections.unmodifiableMap(props);
}

Expand Down
Loading
Loading