Skip to content

Commit

Permalink
Python API for restoring delta table
Browse files Browse the repository at this point in the history
 * Add possibility to restore delta table using version or timestamp from pyspark
   Examples:
   ```
   DeltaTable.forPath(spark, path).restoreToVersion(0)
   DeltaTable.forPath(spark, path).restoreToTimestamp('2021-01-01 01:01-01')
   ```

Tested by unit tests.

Fixes delta-io#890

Signed-off-by: Maksym Dovhal <maksym.dovhal@gmail.com>

Closes delta-io#903

Signed-off-by: Venki Korukanti <venki.korukanti@databricks.com>
GitOrigin-RevId: 8ca6a3643d97b1a95ebf3a48edcb23f4f2adb6f4
  • Loading branch information
Maksym Dovhal authored and jbguerraz committed Jul 6, 2022
1 parent f53b493 commit 2ba2a1d
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 3 deletions.
40 changes: 40 additions & 0 deletions python/delta/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,46 @@ def upgradeTableProtocol(self, readerVersion: int, writerVersion: int) -> None:
type(writerVersion))
jdt.upgradeTableProtocol(readerVersion, writerVersion)

@since(1.2) # type: ignore[arg-type]
def restoreToVersion(self, version: int) -> DataFrame:
"""
Restore the DeltaTable to an older version of the table specified by version number.
Example::
io.delta.tables.DeltaTable.restoreToVersion(1)
:param version: target version of restored table
:return: Dataframe with metrics of restore operation.
:rtype: pyspark.sql.DataFrame
"""

return DataFrame(
self._jdt.restoreToVersion(version),
self._spark._wrapped # type: ignore[attr-defined]
)

@since(1.2) # type: ignore[arg-type]
def restoreToTimestamp(self, timestamp: str) -> DataFrame:
"""
Restore the DeltaTable to an older version of the table specified by a timestamp.
Timestamp can be of the format yyyy-MM-dd or yyyy-MM-dd HH:mm:ss
Example::
io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01')
io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01')
:param timestamp: target timestamp of restored table
:return: Dataframe with metrics of restore operation.
:rtype: pyspark.sql.DataFrame
"""

return DataFrame(
self._jdt.restoreToTimestamp(timestamp),
self._spark._wrapped # type: ignore[attr-defined]
)

@staticmethod
def _dict_to_jmap(
sparkSession: SparkSession,
Expand Down
47 changes: 44 additions & 3 deletions python/delta/tests/test_deltatable.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,42 @@ def test_protocolUpgrade(self) -> None:
with self.assertRaisesRegex(ValueError, "writerVersion"):
dt.upgradeTableProtocol(1, {}) # type: ignore[arg-type]

def test_restore_to_version(self) -> None:
self.__writeDeltaTable([('a', 1), ('b', 2)])
self.__overwriteDeltaTable([('a', 3), ('b', 2)],
schema=["key_new", "value_new"],
overwriteSchema='true')

overwritten = DeltaTable.forPath(self.spark, self.tempFile).toDF()
self.__checkAnswer(overwritten,
[Row(key_new='a', value_new=3), Row(key_new='b', value_new=2)])

DeltaTable.forPath(self.spark, self.tempFile).restoreToVersion(0)
restored = DeltaTable.forPath(self.spark, self.tempFile).toDF()

self.__checkAnswer(restored, [Row(key='a', value=1), Row(key='b', value=2)])

def test_restore_to_timestamp(self) -> None:
self.__writeDeltaTable([('a', 1), ('b', 2)])
timestampToRestore = DeltaTable.forPath(self.spark, self.tempFile) \
.history() \
.head() \
.timestamp \
.strftime('%Y-%m-%d %H:%M:%S.%f')

self.__overwriteDeltaTable([('a', 3), ('b', 2)],
schema=["key_new", "value_new"],
overwriteSchema='true')

overwritten = DeltaTable.forPath(self.spark, self.tempFile).toDF()
self.__checkAnswer(overwritten,
[Row(key_new='a', value_new=3), Row(key_new='b', value_new=2)])

DeltaTable.forPath(self.spark, self.tempFile).restoreToTimestamp(timestampToRestore)

restored = DeltaTable.forPath(self.spark, self.tempFile).toDF()
self.__checkAnswer(restored, [Row(key='a', value=1), Row(key='b', value=2)])

def __checkAnswer(self, df: DataFrame,
expectedAnswer: List[Any],
schema: Union[StructType, List[str]] = ["key", "value"]) -> None:
Expand Down Expand Up @@ -818,9 +854,14 @@ def __writeAsTable(self, datalist: List[Tuple[Any, Any]], tblName: str) -> None:
df = self.spark.createDataFrame(datalist, ["key", "value"])
df.write.format("delta").saveAsTable(tblName)

def __overwriteDeltaTable(self, datalist: List[Tuple[Any, Any]]) -> None:
df = self.spark.createDataFrame(datalist, ["key", "value"])
df.write.format("delta").mode("overwrite").save(self.tempFile)
def __overwriteDeltaTable(self, datalist: List[Tuple[Any, Any]],
schema: Union[StructType, List[str]] = ["key", "value"],
overwriteSchema: str = 'false') -> None:
df = self.spark.createDataFrame(datalist, schema)
df.write.format("delta") \
.option('overwriteSchema', overwriteSchema) \
.mode("overwrite") \
.save(self.tempFile)

def __createFile(self, fileName: str, content: Any) -> None:
with open(os.path.join(self.tempFile, fileName), 'w') as f:
Expand Down

0 comments on commit 2ba2a1d

Please sign in to comment.