diff --git a/.gitignore b/.gitignore
index 36ccbeba..585a673d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,9 @@
 # Compiled class file
 *.class
 
+# vscode configs
+.vscode/*
+
 # Log file
 *.log
 
@@ -41,3 +44,4 @@ target/
 *.crc
 demo/jars/*
 demo/notebook/.ipynb_checkpoints/*
+/.history
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 00000000..7b016a89
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,3 @@
+{
+  "java.compile.nullAnalysis.mode": "automatic"
+}
\ No newline at end of file
diff --git a/core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java b/core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
index 120ee0e0..0d1b029c 100644
--- a/core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
+++ b/core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
@@ -32,6 +32,8 @@ import lombok.ToString;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.expressions.Literal;
 import org.apache.spark.sql.types.StructField;
@@ -65,12 +67,13 @@ import org.apache.xtable.model.storage.TableFormat;
 
 import org.apache.xtable.spi.sync.ConversionTarget;
 
+// used to read the table's min reader/writer protocol versions dynamically
+import io.delta.tables.DeltaTable;
+
 public class DeltaConversionTarget implements ConversionTarget {
-  private static final String MIN_READER_VERSION = String.valueOf(1);
-  // gets access to generated columns.
-  private static final String MIN_WRITER_VERSION = String.valueOf(4);
 
   private DeltaLog deltaLog;
+  private DeltaTable deltaTable;
   private DeltaSchemaExtractor schemaExtractor;
   private DeltaPartitionExtractor partitionExtractor;
   private DeltaDataFileUpdatesExtractor dataFileUpdatesExtractor;
@@ -79,6 +85,9 @@ public class DeltaConversionTarget implements ConversionTarget {
   private int logRetentionInHours;
   private TransactionState transactionState;
 
+  private String minReaderVersion;
+  private String minWriterVersion;
+
   public DeltaConversionTarget() {}
 
   public DeltaConversionTarget(PerTableConfig perTableConfig, SparkSession sparkSession) {
@@ -121,6 +131,15 @@ private void _init(
       DeltaPartitionExtractor partitionExtractor,
       DeltaDataFileUpdatesExtractor dataFileUpdatesExtractor) {
     DeltaLog deltaLog = DeltaLog.forTable(sparkSession, tableDataPath);
+    // Defaults used until the protocol can be read from the table itself; writer
+    // version 4 is the minimum required for generated columns.
+    minReaderVersion = String.valueOf(1);
+    minWriterVersion = String.valueOf(4);
     boolean deltaTableExists = deltaLog.tableExists();
     if (!deltaTableExists) {
       deltaLog.ensureLogDirectoryExist();
     }
+    // DeltaTable.forPath fails for a table that does not exist yet (first sync), so
+    // the handle is only created for existing tables; getConfigurationsForDeltaSync
+    // falls back to the defaults above when it is null.
+    DeltaTable deltaTable =
+        deltaTableExists ? DeltaTable.forPath(sparkSession, tableDataPath) : null;
@@ -129,6 +146,7 @@ private void _init(
     this.partitionExtractor = partitionExtractor;
     this.dataFileUpdatesExtractor = dataFileUpdatesExtractor;
     this.deltaLog = deltaLog;
+    this.deltaTable = deltaTable;
     this.tableName = tableName;
     this.logRetentionInHours = logRetentionInHours;
   }
@@ -267,10 +281,21 @@ private void commitTransaction() {
         new DeltaOperations.Update(Option.apply(Literal.fromObject("xtable-delta-sync")))));
   }
 
   private Map<String, String> getConfigurationsForDeltaSync() {
+    // detail() returns a single-row Dataset describing the table, including the
+    // protocol columns minReaderVersion and minWriterVersion. The DeltaTable handle
+    // is created in _init(); it is null on the first sync, when the table does not
+    // exist yet and the defaults assigned in _init() still apply.
+    if (deltaTable != null) {
+      // Limit the result to the two attributes needed.
+      Dataset<Row> detail = deltaTable.detail().select("minWriterVersion", "minReaderVersion");
+      // detail() always yields exactly one row, so first() is safe here.
+      Row row = detail.first();
+      minWriterVersion = row.getAs("minWriterVersion").toString();
+      minReaderVersion = row.getAs("minReaderVersion").toString();
+    }
     Map<String, String> configMap = new HashMap<>();
-    configMap.put(DeltaConfigs.MIN_READER_VERSION().key(), MIN_READER_VERSION);
-    configMap.put(DeltaConfigs.MIN_WRITER_VERSION().key(), MIN_WRITER_VERSION);
+    configMap.put(DeltaConfigs.MIN_READER_VERSION().key(), minReaderVersion);
+    configMap.put(DeltaConfigs.MIN_WRITER_VERSION().key(), minWriterVersion);
     configMap.put(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
     // Sets retention for the Delta Log, does not impact underlying files in the table
     configMap.put(
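Note on the change above: reading minReaderVersion/minWriterVersion from DeltaTable.detail() instead of hard-coding them avoids downgrading the protocol of tables that already use newer features. A minimal standalone sketch of that lookup, assuming Delta Lake 2.4+ on Spark 3.4; the class name and table path below are illustrative, not part of this PR:

    import io.delta.tables.DeltaTable;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ProtocolVersionSketch {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder()
                .master("local[2]")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .config(
                    "spark.sql.catalog.spark_catalog",
                    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .getOrCreate();
        // detail() returns a one-row Dataset describing the table; minReaderVersion
        // and minWriterVersion are two of its columns.
        Row row =
            DeltaTable.forPath(spark, "/tmp/example_delta_table") // hypothetical path
                .detail()
                .select("minReaderVersion", "minWriterVersion")
                .first();
        System.out.println(
            "reader=" + row.getAs("minReaderVersion")
                + ", writer=" + row.getAs("minWriterVersion"));
        spark.stop();
      }
    }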
diff --git a/core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
index fa425ef2..450fd7e6 100644
--- a/core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
+++ b/core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
@@ -60,6 +60,11 @@ public class DeltaSchemaExtractor {
   private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id";
   private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor();
 
+  // Timestamps in Delta are microsecond precision by default
+  private static final Map<InternalSchema.MetadataKey, Object> DEFAULT_TIMESTAMP_PRECISION_METADATA =
+      Collections.singletonMap(
+          InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);
+
   public static DeltaSchemaExtractor getInstance() {
     return INSTANCE;
   }
@@ -86,7 +91,6 @@ private DataType convertFieldType(InternalField field) {
       case INT:
         return DataTypes.IntegerType;
       case LONG:
-      case TIMESTAMP_NTZ:
         return DataTypes.LongType;
       case BYTES:
       case FIXED:
@@ -99,6 +103,8 @@ private DataType convertFieldType(InternalField field) {
         return DataTypes.DateType;
       case TIMESTAMP:
         return DataTypes.TimestampType;
+      case TIMESTAMP_NTZ:
+        return DataTypes.TimestampNTZType;
       case DOUBLE:
         return DataTypes.DoubleType;
       case DECIMAL:
@@ -183,10 +189,11 @@ private InternalSchema toInternalSchema(
       case "timestamp":
         type = InternalType.TIMESTAMP;
         // Timestamps in Delta are microsecond precision by default
-        metadata =
-            Collections.singletonMap(
-                InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
-                InternalSchema.MetadataValue.MICROS);
+        metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
+        break;
+      case "timestamp_ntz":
+        type = InternalType.TIMESTAMP_NTZ;
+        metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
         break;
       case "struct":
         StructType structType = (StructType) dataType;
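With the mapping above, a TimestampNTZ field survives the Delta round trip as a real timestamp type instead of being flattened to LongType. A small sketch of the Spark side of this, assuming Spark 3.4+ (the class name is illustrative); "timestamp_ntz" is Spark's type name for the type, which is presumably what the new switch case keys on:

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class TimestampNtzTypeNameSketch {
      public static void main(String[] args) {
        StructType schema =
            new StructType().add("event_time", DataTypes.TimestampNTZType, false);
        // Prints "timestamp_ntz", the Spark type name for TimestampNTZType
        System.out.println(schema.fields()[0].dataType().typeName());
        // Prints "struct<event_time:timestamp_ntz>"
        System.out.println(schema.simpleString());
      }
    }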
diff --git a/core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java b/core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
index a458070b..27826d0e 100644
--- a/core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
+++ b/core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
@@ -44,8 +44,11 @@
 import io.delta.tables.DeltaTable;
 
+import org.apache.spark.sql.functions;
 import org.apache.xtable.delta.TestDeltaHelper;
 
+import static org.apache.spark.sql.types.DataTypes.TimestampNTZType;
+
 @Getter
 public class TestSparkDeltaTable implements GenericTable, Closeable {
   // typical inserts or upserts do not use this partition value.
@@ -110,6 +113,22 @@ public List<Row> insertRowsForPartition(int numRows, Integer year, String level)
     return rows;
   }
 
+  public List<Row> insertRowsForTimestampPartition() {
+    List<Row> rows = testDeltaHelper.generateRows(10);
+    Dataset<Row> df =
+        sparkSession.createDataFrame(rows, testDeltaHelper.getTableStructSchema());
+    // StructType.add only returns a new schema object and never modifies the
+    // DataFrame, so the TimestampNTZ column is added through the DataFrame API.
+    df = df.withColumn("timestamp_ntz", functions.current_timestamp().cast(TimestampNTZType));
+    // Append to the Delta table; the TimestampNTZ column is what triggers the
+    // timestampNtz table feature. The table's partitioning was fixed at creation
+    // time, so no partitionBy is passed here.
+    df.write().format("delta").option("mergeSchema", "true").mode("append").save(basePath);
+    return rows;
+  }
+
   @Override
   public List<Row> insertRecordsForSpecialPartition(int numRows) {
     return insertRowsForPartition(numRows, SPECIAL_DATE_PARTITION_VALUE, SPECIAL_PARTITION_VALUE);
diff --git a/core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java b/core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
index 3e2d61f5..a3fcd328 100644
--- a/core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
+++ b/core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
@@ -35,10 +35,13 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import io.delta.tables.DeltaTable;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.serializer.KryoSerializer;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -731,4 +734,23 @@ private boolean checkIfFileIsRemoved(String activePath, TableChange tableChange)
         .collect(Collectors.toSet());
     return filePathsRemoved.contains(activePath);
   }
+
+  @Test
+  public void getConfigurationsForDeltaSync() {
+    String tableName = GenericTable.getTableName();
+    TestSparkDeltaTable testSparkDeltaTable =
+        new TestSparkDeltaTable(tableName, tempDir, sparkSession, "timestamp_field", false);
+    testSparkDeltaTable.insertRowsForTimestampPartition();
+    DeltaTable deltaTable = testSparkDeltaTable.getDeltaTable();
+    Dataset<Row> detail = deltaTable.detail().select("minWriterVersion", "minReaderVersion");
+    // detail() always yields exactly one row, so first() is safe here.
+    Row row = detail.first();
+    String minWriterVersion = row.getAs("minWriterVersion").toString();
+    String minReaderVersion = row.getAs("minReaderVersion").toString();
+    // The TimestampNTZ column bumps the table to the table-features protocol.
+    assertEquals("7", minWriterVersion);
+    assertEquals("3", minReaderVersion);
+  }
 }
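The assertions in the new integration test line up with Delta's table-features protocol: timestampNtz is a reader-writer table feature, and a table that uses it reports minReaderVersion=3 and minWriterVersion=7. The same one-row summary that DeltaTable.detail() returns is also available through SQL; a sketch with a hypothetical table path and class name:

    import org.apache.spark.sql.SparkSession;

    public class DescribeDetailSketch {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder()
                .master("local[2]")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .config(
                    "spark.sql.catalog.spark_catalog",
                    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .getOrCreate();
        // DESCRIBE DETAIL returns the same columns as DeltaTable.detail().
        spark
            .sql("DESCRIBE DETAIL delta.`/tmp/example_delta_table`") // hypothetical path
            .select("minReaderVersion", "minWriterVersion")
            .show();
        spark.stop();
      }
    }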
diff --git a/core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java b/core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java
index a677f57e..09f5f118 100644
--- a/core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java
+++ b/core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java
@@ -154,6 +154,29 @@ public void createTable(SparkSession sparkSession, String tableName, String basePath) {
           .partitionedBy("yearOfBirth");
     } else if ("level".equals(partitionField)) {
       tableBuilder.partitionedBy(partitionField);
+    } else if ("timestamp_field".equals(partitionField)) {
+      tableBuilder
+          .addColumn(
+              DeltaTable.columnBuilder("YEAR")
+                  .dataType(IntegerType)
+                  .generatedAlwaysAs("YEAR(timestamp_field)")
+                  .build())
+          .addColumn(
+              DeltaTable.columnBuilder("MONTH")
+                  .dataType(IntegerType)
+                  .generatedAlwaysAs("MONTH(timestamp_field)")
+                  .build())
+          .addColumn(
+              DeltaTable.columnBuilder("DAY")
+                  .dataType(IntegerType)
+                  .generatedAlwaysAs("DAY(timestamp_field)")
+                  .build())
+          .addColumn(
+              DeltaTable.columnBuilder("HOUR")
+                  .dataType(IntegerType)
+                  .generatedAlwaysAs("HOUR(timestamp_field)")
+                  .build())
+          .partitionedBy("timestamp_field");
     } else if (partitionField != null) {
       throw new IllegalArgumentException("Unexpected partition field: " + partitionField);
     }
diff --git a/core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java b/core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
index 45c90660..05aa84ab 100644
--- a/core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
+++ b/core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
@@ -391,8 +391,8 @@ public void testTimestamps() {
     StructType structRepresentationTimestampNtz =
         new StructType()
-            .add("requiredTimestampNtz", DataTypes.LongType, false)
-            .add("optionalTimestampNtz", DataTypes.LongType, true);
+            .add("requiredTimestampNtz", DataTypes.TimestampNTZType, false)
+            .add("optionalTimestampNtz", DataTypes.TimestampNTZType, true);
 
     Assertions.assertEquals(
         structRepresentationTimestamp,
diff --git a/core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java b/core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
index 135ce995..1699f10e 100644
--- a/core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
+++ b/core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
@@ -57,7 +57,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-
+import org.apache.spark.sql.delta.DeltaConfigs;
 import org.apache.spark.sql.delta.GeneratedColumn;
 
 import scala.collection.JavaConverters;
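The TestDeltaHelper branch above creates the source table with generated partition columns derived from timestamp_field; generated columns are the feature that motivated the old hard-coded minWriterVersion of 4 in DeltaConversionTarget. A standalone sketch of the same builder pattern, assuming Delta Lake 2.4+ (the location and names are illustrative):

    import io.delta.tables.DeltaTable;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;

    public class GeneratedColumnTableSketch {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder()
                .master("local[2]")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .config(
                    "spark.sql.catalog.spark_catalog",
                    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .getOrCreate();
        DeltaTable.createOrReplace(spark)
            .location("/tmp/example_generated_table") // hypothetical location
            .addColumn("id", DataTypes.LongType)
            .addColumn("timestamp_field", DataTypes.TimestampType)
            .addColumn(
                DeltaTable.columnBuilder("HOUR")
                    .dataType(DataTypes.IntegerType)
                    .generatedAlwaysAs("HOUR(timestamp_field)")
                    .build())
            .partitionedBy("timestamp_field")
            .execute();
        // Creating a table with generated columns upgrades its protocol to at least
        // minWriterVersion 4, matching the defaults kept in DeltaConversionTarget.
        spark.stop();
      }
    }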