diff --git a/compose.yaml b/compose.yaml
index 0600801f9727a..0ee9d6a9ffd1b 100644
--- a/compose.yaml
+++ b/compose.yaml
@@ -139,6 +139,8 @@ services:
AZURE_ABFS_OAUTH_SECRET: ""
AZURE_ABFS_OAUTH_ENDPOINT: ""
AZURE_WASB_ACCESS_KEY: ""
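+      # Expose the Hive metastore Thrift endpoint (thrift://localhost:9083, see
+      # docker/flink/conf/hive-site.xml) so that Flink tests running on the local,
+      # non-dockerized environment can reach the metastore.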
+ ports:
+ - 9083:9083
depends_on:
hive-metastore-db:
condition: service_healthy
diff --git a/docker/flink/conf/hive-site.xml b/docker/flink/conf/hive-site.xml
new file mode 100644
index 0000000000000..eff583ee89550
--- /dev/null
+++ b/docker/flink/conf/hive-site.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+  <property>
+    <name>hive.metastore.local</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>hive.metastore.uris</name>
+    <value>thrift://localhost:9083</value>
+  </property>
+</configuration>
diff --git a/ibis/backends/bigquery/compiler.py b/ibis/backends/bigquery/compiler.py
index eb24ccfdf1d7b..a63cad89f8d7c 100644
--- a/ibis/backends/bigquery/compiler.py
+++ b/ibis/backends/bigquery/compiler.py
@@ -7,6 +7,7 @@
import sqlglot as sg
import sqlglot.expressions as sge
from sqlglot.dialects import BigQuery
+from typing import TYPE_CHECKING
import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
@@ -22,6 +23,12 @@
)
from ibis.common.temporal import DateUnit, IntervalUnit, TimestampUnit, TimeUnit
+if TYPE_CHECKING:
+ from typing import Any
+
+ import ibis.expr.schema as sch
+
+
_NAME_REGEX = re.compile(r'[^!"$()*,./;?@[\\\]^`{}~\n]+')
@@ -713,3 +720,24 @@ def visit_CountDistinct(self, op, *, arg, where):
if where is not None:
arg = self.if_(where, arg, NULL)
return self.f.count(sge.Distinct(expressions=[arg]))
+
+ def visit_TimeTravelDatabaseTable(
+ self,
+ op,
+ *,
+ name: str,
+ schema: sch.Schema,
+ source: Any,
+ namespace: ops.Namespace,
+ timestamp: ops.Literal,
+ ):
+ table = sg.table(
+ name, db=namespace.schema, catalog=namespace.database, quoted=self.quoted
+ )
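+        # Attach a `FOR SYSTEM_TIME AS OF <timestamp>` version clause to the table
+        # reference; see the expected SQL in the BigQuery compiler unit test below.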
+ return sge.Table(
+ this=table.this,
+ db=table.db,
+ catalog=table.catalog,
+ alias=table.alias,
+ version=sge.Version(this="SYSTEM_TIME", kind="AS OF", expression=timestamp),
+ )
diff --git a/ibis/backends/bigquery/tests/unit/test_compiler.py b/ibis/backends/bigquery/tests/unit/test_compiler.py
index 0d95438df2c62..2780566b2ad39 100644
--- a/ibis/backends/bigquery/tests/unit/test_compiler.py
+++ b/ibis/backends/bigquery/tests/unit/test_compiler.py
@@ -632,3 +632,58 @@ def test_unnest(snapshot):
).select(level_two=lambda t: t.level_one.unnest())
)
snapshot.assert_match(result, "out_two_unnests.sql")
+
+
+def test_time_travel(alltypes):
+ from ibis.selectors import all
+
+    # TODO (mehmet): Setting the schema from `alltypes` via
+    #   schema = alltypes.schema
+    # fails with the error "`schema`: is not coercible to a Schema".
+ schema = ibis.schema(
+ dict(
+ id="int32",
+ bool_col="boolean",
+ tinyint_col="int8",
+ smallint_col="int16",
+ int_col="int32",
+ bigint_col="int64",
+ float_col="float32",
+ double_col="float64",
+ date_string_col="string",
+ string_col="string",
+ timestamp_col=dt.Timestamp(timezone="UTC"),
+ year="int32",
+ month="int32",
+ )
+ )
+
+ table = ops.DatabaseTable(
+ name="my_table",
+ schema=schema,
+ source="bigquery",
+ namespace=ops.Namespace(schema="my_dataset", database="my_project"),
+ ).to_expr()
+ table = table.time_travel(ibis.timestamp("2023-01-02T03:04:05"))
+ expr = table.select(all())
+
+ sql = ibis.bigquery.compile(expr)
+
+ expected_sql = """SELECT
+ `t0`.`id`,
+ `t0`.`bool_col`,
+ `t0`.`tinyint_col`,
+ `t0`.`smallint_col`,
+ `t0`.`int_col`,
+ `t0`.`bigint_col`,
+ `t0`.`float_col`,
+ `t0`.`double_col`,
+ `t0`.`date_string_col`,
+ `t0`.`string_col`,
+ `t0`.`timestamp_col`,
+ `t0`.`year`,
+ `t0`.`month`
+FROM my_project.my_dataset.`my_table` FOR SYSTEM_TIME AS OF datetime('2023-01-02T03:04:05') AS `t0`"""
+
+ assert sql == expected_sql
diff --git a/ibis/backends/flink/__init__.py b/ibis/backends/flink/__init__.py
index 64c43ad21d526..d516a2b76408e 100644
--- a/ibis/backends/flink/__init__.py
+++ b/ibis/backends/flink/__init__.py
@@ -250,6 +250,7 @@ def table(
f"`database` must be a string; got {type(database)}"
)
schema = self.get_schema(name, catalog=catalog, database=database)
+
node = ops.DatabaseTable(
name,
schema=schema,
@@ -842,6 +843,8 @@ def insert(
self,
table_name: str,
obj: pa.Table | pd.DataFrame | ir.Table | list | dict,
+ *,
+ schema: sch.Schema | None = None,
database: str | None = None,
catalog: str | None = None,
overwrite: bool = False,
@@ -854,6 +857,8 @@ def insert(
The name of the table to insert data into.
obj
The source data or expression to insert.
+ schema
+            Schema of the data to insert; used to override the types inferred
+            for in-memory data such as pandas DataFrames.
database
Name of the attached database that the table is located in.
catalog
@@ -876,6 +881,8 @@ def insert(
import pyarrow as pa
import pyarrow_hotfix # noqa: F401
+ from ibis.backends.flink.datatypes import FlinkRowSchema
+
if isinstance(obj, ir.Table):
statement = InsertSelect(
table_name,
@@ -891,7 +898,15 @@ def insert(
if isinstance(obj, dict):
obj = pd.DataFrame.from_dict(obj)
if isinstance(obj, pd.DataFrame):
- table = self._table_env.from_pandas(obj)
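+            # If given, `schema` overrides the types `from_pandas` would infer
+            # (the Flink time-travel tests use it to pin the timestamp precision).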
+ if schema:
+ schema_ = FlinkRowSchema.from_ibis(schema)
+ table = self._table_env.from_pandas(obj, schema_)
+ else:
+ table = self._table_env.from_pandas(obj)
+
return table.execute_insert(table_name, overwrite=overwrite)
if isinstance(obj, list):
diff --git a/ibis/backends/flink/tests/conftest.py b/ibis/backends/flink/tests/conftest.py
index 02fe0ade77e9b..6d51f4637926a 100644
--- a/ibis/backends/flink/tests/conftest.py
+++ b/ibis/backends/flink/tests/conftest.py
@@ -167,3 +167,11 @@ def generate_csv_configs(csv_file):
}
return generate_csv_configs
+
+
+@pytest.fixture(scope="module")
+def tempdir_sink_configs():
+ def generate_tempdir_configs(tempdir):
+ return {"connector": "filesystem", "path": tempdir, "format": "csv"}
+
+ return generate_tempdir_configs
diff --git a/ibis/backends/flink/tests/test_ddl.py b/ibis/backends/flink/tests/test_ddl.py
index 44742bc7c8213..96dc65fddcd27 100644
--- a/ibis/backends/flink/tests/test_ddl.py
+++ b/ibis/backends/flink/tests/test_ddl.py
@@ -17,14 +17,6 @@
from ibis.backends.tests.errors import Py4JJavaError
-@pytest.fixture
-def tempdir_sink_configs():
- def generate_tempdir_configs(tempdir):
- return {"connector": "filesystem", "path": tempdir, "format": "csv"}
-
- return generate_tempdir_configs
-
-
@pytest.mark.parametrize("temp", [True, False])
def test_list_tables(con, temp):
assert len(con.list_tables(temp=temp))
diff --git a/ibis/backends/flink/tests/test_time_travel.py b/ibis/backends/flink/tests/test_time_travel.py
new file mode 100644
index 0000000000000..cba62a0bb78b9
--- /dev/null
+++ b/ibis/backends/flink/tests/test_time_travel.py
@@ -0,0 +1,274 @@
+"""Tests in this module tests if
+(1) Ibis generates the correct SQL for time travel,
+(2) The generated SQL is executed by Flink without errors.
+They do NOT compare the time travel results against the expected results.
+"""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import pandas as pd
+import pytest
+
+import ibis
+import ibis.expr.datatypes as dt
+import ibis.expr.schema as sch
+import ibis.expr.types as ir
+from ibis.backends.flink.tests.utils import download_jar_for_package, get_catalogs
+
+if TYPE_CHECKING:
+ from pathlib import Path
+
+
+def create_temp_table(
+ table_name: str,
+ con,
+ data_dir: Path,
+ tempdir_sink_configs,
+ tmp_path_factory,
+):
+ # Subset of `functional_alltypes_schema`
+ schema = sch.Schema(
+ {
+ "id": dt.int32,
+ "bool_col": dt.bool,
+ "smallint_col": dt.int16,
+ "int_col": dt.int32,
+ "timestamp_col": dt.timestamp(scale=3),
+ }
+ )
+
+ df = pd.read_parquet(f"{data_dir}/parquet/functional_alltypes.parquet")
+ df = df[list(schema.names)]
+ df = df.head(20)
+
+ temp_path = tmp_path_factory.mktemp(table_name)
+ tbl_properties = tempdir_sink_configs(temp_path)
+
+ # Note: Paimon catalog supports 'warehouse'='file:...' only for temporary tables.
+ table = con.create_table(
+ table_name,
+ schema=schema,
+ tbl_properties=tbl_properties,
+ temp=True,
+ )
+ con.insert(
+ table_name,
+ obj=df,
+ schema=schema,
+ ).wait()
+
+ return table
+
+
+@pytest.fixture(scope="module")
+def temp_table(con, data_dir, tempdir_sink_configs, tmp_path_factory) -> ir.Table:
+ table_name = "test_table"
+
+ yield create_temp_table(
+ table_name=table_name,
+ con=con,
+ data_dir=data_dir,
+ tempdir_sink_configs=tempdir_sink_configs,
+ tmp_path_factory=tmp_path_factory,
+ )
+
+ con.drop_table(name=table_name, temp=True, force=True)
+
+
+@pytest.fixture(
+ params=[
+ (
+ ibis.timestamp("2023-01-02T03:04:05"),
+ "CAST('2023-01-02 03:04:05.000000' AS TIMESTAMP)",
+ ),
+ (
+ ibis.timestamp("2023-01-01 12:34:56.789 UTC"),
+ "CAST('2023-01-01 12:34:56.789000' AS TIMESTAMP)",
+ ),
+ (
+ ibis.timestamp("2023-01-02T03:04:05") + ibis.interval(days=3),
+ "CAST('2023-01-02 03:04:05.000000' AS TIMESTAMP) + INTERVAL '3' DAY(2)",
+ ),
+ (
+ ibis.timestamp("2023-01-02T03:04:05")
+ + ibis.interval(days=3)
+ + ibis.interval(hours=6),
+ "(CAST('2023-01-02 03:04:05.000000' AS TIMESTAMP) + INTERVAL '3' DAY(2)) + INTERVAL '6' HOUR(2)",
+ ),
+ (
+ ibis.timestamp("2023-01-02 03:04:05", timezone="EST"),
+ "CAST('2023-01-02 03:04:05.000000' AS TIMESTAMP)",
+ ),
+ ]
+)
+def timestamp_and_sql(request):
+ return request.param
+
+
+def test_time_travel(temp_table, timestamp_and_sql):
+ timestamp, _ = timestamp_and_sql
+ expr = temp_table.time_travel(timestamp)
+
+ assert expr.op().timestamp == timestamp.op()
+
+
+@pytest.fixture
+def time_travel_expr_and_expected_sql(
+ temp_table, timestamp_and_sql
+) -> tuple[ir.Expr, str]:
+ from ibis.selectors import all
+
+ timestamp, timestamp_sql = timestamp_and_sql
+
+ expr = temp_table.time_travel(timestamp).select(all())
+ expected_sql = f"""SELECT `t0`.`id`, `t0`.`bool_col`, `t0`.`smallint_col`, `t0`.`int_col`, `t0`.`timestamp_col` FROM `{temp_table.get_name()}` FOR SYSTEM_TIME AS OF {timestamp_sql} AS `t0`"""
+
+ return expr, expected_sql
+
+
+def test_time_travel_compile(con, time_travel_expr_and_expected_sql):
+ expr, expected_sql = time_travel_expr_and_expected_sql
+ sql = con.compile(expr)
+ assert sql == expected_sql
+
+
+@pytest.fixture
+def use_hive_catalog(con):
+ # Flink related
+ download_jar_for_package(
+ package_name="apache-flink",
+ jar_name="flink-sql-connector-hive-3.1.3_2.12-1.18.1",
+ jar_url="https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.18.1/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar",
+ )
+
+ # Hadoop related
+ download_jar_for_package(
+ package_name="apache-flink",
+ jar_name="woodstox-core-5.3.0",
+ jar_url="https://repo1.maven.org/maven2/com/fasterxml/woodstox/woodstox-core/5.3.0/woodstox-core-5.3.0.jar",
+ )
+ download_jar_for_package(
+ package_name="apache-flink",
+ jar_name="commons-logging-1.1.3",
+ jar_url="https://repo1.maven.org/maven2/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar",
+ )
+ download_jar_for_package(
+ package_name="apache-flink",
+ jar_name="commons-configuration2-2.1.1",
+ jar_url="https://repo1.maven.org/maven2/org/apache/commons/commons-configuration2/2.1.1/commons-configuration2-2.1.1.jar",
+ )
+ download_jar_for_package(
+ package_name="apache-flink",
+ jar_name="hadoop-auth-3.3.2",
+ jar_url="https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-auth/3.3.2/hadoop-auth-3.3.2.jar",
+ )
+ download_jar_for_package(
+ package_name="apache-flink",
+ jar_name="hadoop-common-3.3.2",
+ jar_url="https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/3.3.2/hadoop-common-3.3.2.jar",
+ )
+ download_jar_for_package(
+ package_name="apache-flink",
+ jar_name="hadoop-hdfs-client-3.3.2",
+ jar_url="https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs-client/3.3.2/hadoop-hdfs-client-3.3.2.jar",
+ )
+ download_jar_for_package(
+ package_name="apache-flink",
+ jar_name="hadoop-mapreduce-client-core-3.3.2",
+ jar_url="https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-core/3.3.2/hadoop-mapreduce-client-core-3.3.2.jar",
+ )
+ download_jar_for_package(
+ package_name="apache-flink",
+ jar_name="hadoop-shaded-guava-1.1.1",
+ jar_url="https://repo1.maven.org/maven2/org/apache/hadoop/thirdparty/hadoop-shaded-guava/1.1.1/hadoop-shaded-guava-1.1.1.jar",
+ )
+ download_jar_for_package(
+ package_name="apache-flink",
+ jar_name="stax2-api-4.2.1",
+ jar_url="https://repo1.maven.org/maven2/org/codehaus/woodstox/stax2-api/4.2.1/stax2-api-4.2.1.jar",
+ )
+
+    hive_catalog = "hive_catalog"
+    catalog_list = get_catalogs(con)
+    if hive_catalog not in catalog_list:
+        sql = """
+        CREATE CATALOG hive_catalog WITH (
+            'type' = 'hive',
+            'hive-conf-dir' = './docker/flink/conf/'
+        );
+        """
+        con.raw_sql(sql)
+        catalog_list = get_catalogs(con)
+    assert hive_catalog in catalog_list
+
+ con.raw_sql(f"USE CATALOG {hive_catalog};")
+
+
+@pytest.fixture
+def use_paimon_catalog(con):
+    # Note: Downloading JAR files at test time is not ideal. However, adding
+    # the JARs to the Flink container would not help here, because the
+    # Flink-specific tests run on the local environment rather than on the
+    # dockerized one.
+ download_jar_for_package(
+ package_name="apache-flink",
+ jar_name="paimon-flink-1.18-0.8-20240301.002155-30",
+ jar_url="https://repository.apache.org/content/groups/snapshots/org/apache/paimon/paimon-flink-1.18/0.8-SNAPSHOT/paimon-flink-1.18-0.8-20240301.002155-30.jar",
+ )
+ download_jar_for_package(
+ package_name="apache-flink",
+ jar_name="flink-shaded-hadoop-2-uber-2.8.3-10.0",
+ jar_url="https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar",
+ )
+
+ paimon_catalog = "paimon_catalog"
+ catalog_list = get_catalogs(con)
+ if paimon_catalog not in catalog_list:
+ sql = """
+ CREATE CATALOG paimon_catalog WITH (
+ 'type'='paimon',
+ 'warehouse'='file:/tmp/paimon'
+ );
+ """
+ con.raw_sql(sql)
+ catalog_list = get_catalogs(con)
+ assert paimon_catalog in catalog_list
+
+ con.raw_sql(f"USE CATALOG {paimon_catalog};")
+
+
+@pytest.fixture
+def time_travel_expr(
+ con, data_dir, tempdir_sink_configs, tmp_path_factory, timestamp_and_sql
+) -> ir.Expr:
+ from ibis.selectors import all
+
+ table_name = "test_table"
+ table = create_temp_table(
+ con=con,
+ table_name=table_name,
+ data_dir=data_dir,
+ tempdir_sink_configs=tempdir_sink_configs,
+ tmp_path_factory=tmp_path_factory,
+ )
+
+ timestamp, _ = timestamp_and_sql
+
+ yield table.time_travel(timestamp).select(all())
+
+ con.drop_table(name=table_name, temp=True, force=True)
+
+
+# Note: The `test_time_travel_w_*_catalog()` tests rely on the catalog fixture
+# (`use_hive_catalog` / `use_paimon_catalog`) appearing before `time_travel_expr`
+# in the test signature, because fixtures are evaluated in order of presence in
+# test function arguments, from left to right. This ensures the table is created
+# in the intended catalog.
+def test_time_travel_w_hive_catalog(con, use_hive_catalog, time_travel_expr):
+ # Start the Flink env and Hive metastore first
+ # $ docker compose up flink hive-metastore --force-recreate
+ con.execute(time_travel_expr)
+
+
+def test_time_travel_w_paimon_catalog(con, use_paimon_catalog, time_travel_expr):
+ con.execute(time_travel_expr)
diff --git a/ibis/backends/flink/tests/utils.py b/ibis/backends/flink/tests/utils.py
new file mode 100644
index 0000000000000..8363054def1d3
--- /dev/null
+++ b/ibis/backends/flink/tests/utils.py
@@ -0,0 +1,48 @@
+from __future__ import annotations
+
+
+def download_jar_for_package(
+ package_name: str,
+ jar_name: str,
+ jar_url: str,
+):
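+    """Download `jar_name` from `jar_url` into the `pyflink/lib` directory of
+    the installed `package_name` distribution, unless the JAR is already there.
+
+    Returns the path to the JAR file.
+    """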
+ import os
+ from importlib import metadata
+
+ import requests
+
+    # Find the installation path of the package
+    try:
+        distribution = metadata.distribution(package_name)
+        lib_path = distribution.locate_file("")
+    except metadata.PackageNotFoundError:
+        raise SystemError(f"Package `{package_name}` is not installed") from None
+
+ # Check if the JAR already exists
+ jar_path = os.path.join(lib_path, "pyflink/lib", f"{jar_name}.jar")
+ if os.path.exists(jar_path):
+ return jar_path
+
+ # Download the JAR
+ response = requests.get(jar_url, stream=True)
+ if response.status_code != 200:
+ raise SystemError(
+ f"Failed to download the JAR file \n"
+ f"\t jar_url= {jar_url} \n"
+ f"\t response.status_code= {response.status_code}"
+ )
+
+ # Save the JAR
+ with open(jar_path, "wb") as jar_file:
+ for chunk in response.iter_content(chunk_size=128):
+ jar_file.write(chunk)
+
+ return jar_path
+
+
+# TODO (mehmet): Why does the Flink backend not implement `list_catalogs()`?
+def get_catalogs(con) -> list[str]:
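+    """Return the catalog names reported by `SHOW CATALOGS` on `con`."""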
+ show_catalogs = con.raw_sql("show catalogs")
+ catalog_list = []
+ with show_catalogs.collect() as results:
+ for result in results:
+ catalog_list.append(result._values[0])
+
+ return catalog_list
diff --git a/ibis/backends/sql/compiler.py b/ibis/backends/sql/compiler.py
index a929e2afc9080..6b7f5c4d77821 100644
--- a/ibis/backends/sql/compiler.py
+++ b/ibis/backends/sql/compiler.py
@@ -16,6 +16,7 @@
import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
+from ibis.backends.sql.expressions import TimeTravelTable
from ibis.backends.sql.rewrites import (
FirstValue,
LastValue,
@@ -1137,6 +1138,27 @@ def visit_DatabaseTable(
name, db=namespace.schema, catalog=namespace.database, quoted=self.quoted
)
+ def visit_TimeTravelDatabaseTable(
+ self,
+ op,
+ *,
+ name: str,
+ schema: sch.Schema,
+ source: Any,
+ namespace: ops.Namespace,
+ timestamp: ops.Literal,
+ ) -> TimeTravelTable:
+ table = sg.table(
+ name, db=namespace.schema, catalog=namespace.database, quoted=self.quoted
+ )
+ return TimeTravelTable(
+ this=table.this,
+ db=table.db,
+ catalog=table.catalog,
+ alias=table.alias,
+ timestamp=timestamp,
+ )
+
def visit_SelfReference(self, op, *, parent, identifier):
return parent
diff --git a/ibis/backends/sql/dialects.py b/ibis/backends/sql/dialects.py
index bd9e77d148dbe..695b8e64c4257 100644
--- a/ibis/backends/sql/dialects.py
+++ b/ibis/backends/sql/dialects.py
@@ -19,6 +19,8 @@
)
from sqlglot.dialects.dialect import rename_func
+from ibis.backends.sql.expressions import TimeTravelTable
+
ClickHouse.Generator.TRANSFORMS |= {
sge.ArraySize: rename_func("length"),
sge.ArraySort: rename_func("arraySort"),
@@ -133,8 +135,15 @@ class Generator(Hive.Generator):
sge.DayOfWeek: rename_func("dayofweek"),
sge.DayOfMonth: rename_func("dayofmonth"),
sge.Interval: _interval_with_precision,
+ TimeTravelTable: lambda self, expr: self.time_travel_table_sql(expr),
}
+ def time_travel_table_sql(self, expr: TimeTravelTable) -> str:
+ this_sql = self.sql(expr, "this")
+ alias_sql = self.sql(expr, "alias")
+ timestamp_sql = self.sql(expr, "timestamp")
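+        # Renders, e.g. (matching the Flink time-travel tests):
+        #   `test_table` FOR SYSTEM_TIME AS OF CAST('2023-01-02 03:04:05.000000' AS TIMESTAMP) AS `t0`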
+ return f"{this_sql} FOR SYSTEM_TIME AS OF {timestamp_sql} AS {alias_sql}"
+
class Tokenizer(Hive.Tokenizer):
# In Flink, embedded single quotes are escaped like most other SQL
# dialects: doubling up the single quote
diff --git a/ibis/backends/sql/expressions.py b/ibis/backends/sql/expressions.py
new file mode 100644
index 0000000000000..d5488126530b3
--- /dev/null
+++ b/ibis/backends/sql/expressions.py
@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+from typing import Any
+
+import sqlglot.expressions as sge
+
+
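+# A custom sqlglot table node: identical to `sge.Table` except for the extra
+# `timestamp` argument, which dialect generators (e.g. Flink's
+# `time_travel_table_sql`) render as a `FOR SYSTEM_TIME AS OF <timestamp>` clause.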
+class TimeTravelTable(sge.Table):
+ arg_types = {
+ "this": True,
+ "alias": False,
+ "db": False,
+ "catalog": False,
+ "laterals": False,
+ "joins": False,
+ "pivots": False,
+ "hints": False,
+ "system_time": False,
+ "version": False,
+ "format": False,
+ "pattern": False,
+ "ordinality": False,
+ "when": False,
+ "timestamp": True, # Added only this, rest copied from sge.Table.
+ }
+
+ @property
+ def timestamp(self) -> Any:
+ """Retrieves the argument with key 'timestamp'."""
+
+ return self.args.get("timestamp")
diff --git a/ibis/expr/operations/relations.py b/ibis/expr/operations/relations.py
index 2e9ac577622a4..fe3229ab4ca32 100644
--- a/ibis/expr/operations/relations.py
+++ b/ibis/expr/operations/relations.py
@@ -330,6 +330,23 @@ class DatabaseTable(PhysicalTable):
namespace: Namespace = Namespace()
+@public
+class VersionedDatabaseTable(DatabaseTable):
+ """Table that is versioned with snapshots by the backend."""
+ at_time: Optional[Column] = None
+
+
+@public
+class TimeTravelDatabaseTable(DatabaseTable):
+ """Table for time travel."""
+ timestamp: Value[dt.Timestamp]
+
+ def to_expr(self):
+ from ibis.expr.types.temporal import TimeTravelTable
+
+ return TimeTravelTable(self)
+
+
@public
class InMemoryTable(PhysicalTable):
schema: Schema
diff --git a/ibis/expr/types/relations.py b/ibis/expr/types/relations.py
index f839ce88f2a30..2e3cccfc80324 100644
--- a/ibis/expr/types/relations.py
+++ b/ibis/expr/types/relations.py
@@ -38,6 +38,7 @@
from ibis.expr.schema import SchemaLike
from ibis.expr.types import Table
from ibis.expr.types.groupby import GroupedTable
+ from ibis.expr.types.temporal import TimestampValue, TimeTravelTable
from ibis.expr.types.tvf import WindowedTable
from ibis.formats.pyarrow import PyArrowData
from ibis.selectors import IfAnyAll
@@ -3083,6 +3084,46 @@ def asof_join(
right, on, predicates, tolerance=tolerance, lname=lname, rname=rname
)
+ def time_travel(
+ self: Table,
+ timestamp: TimestampValue,
+ ) -> TimeTravelTable:
+ """Time travels the table back to `timestamp`.
+
+ Returns a time-travel table that represents a snapshot of the caller
+ table, where the snapshot corresponds to the given `timestamp`. Enables
+ building queries against the historical table snapshots. How far the `timestamp`
+ can go back in time depends on the specific backend support. If the timestamp
+ specifies a point in time prior to the allowed time travel window or prior
+ to the creation of the table, then the query built off of the returned table
+ will fail while getting executed by the bounded backend.
+
+ Parameters
+ ----------
+ timestamp
+            Timestamp to travel the table back to.
+
+ Returns
+ -------
+        TimeTravelTable
+            Table expression representing the snapshot at `timestamp`
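+
+        Examples
+        --------
+        A minimal sketch; `orders` and `con` below are assumed to be a table and
+        a connection for a backend that supports time travel:
+
+        >>> snapshot = orders.time_travel(ibis.timestamp("2023-01-02T03:04:05"))  # doctest: +SKIP
+        >>> con.execute(snapshot)  # doctest: +SKIP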
+ """
+
+ op = self.op()
+ # TODO (mehmet): Does it make sense to support `UnboundTable` as well?
+ if not isinstance(op, ops.DatabaseTable):
+ raise com.IbisInputError(
+ "`time_travel()` is supported for only tables of type `DatabaseTable`."
+ )
+
+ return ops.TimeTravelDatabaseTable(
+ name=op.name,
+ schema=op.schema,
+ source=op.source,
+ namespace=op.namespace,
+ timestamp=timestamp,
+ ).to_expr()
+
def cross_join(
left: Table,
right: Table,
diff --git a/ibis/expr/types/temporal.py b/ibis/expr/types/temporal.py
index c43fe6780c37c..e656245f4eb63 100644
--- a/ibis/expr/types/temporal.py
+++ b/ibis/expr/types/temporal.py
@@ -8,19 +8,19 @@
import ibis.expr.datashape as ds
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
+import ibis.expr.types as ir
from ibis import util
from ibis.common.annotations import annotated
from ibis.common.temporal import IntervalUnit
from ibis.expr.types.core import _binop
from ibis.expr.types.generic import Column, Scalar, Value
+from ibis.expr.types.relations import Table
if TYPE_CHECKING:
import datetime
import pandas as pd
- import ibis.expr.types as ir
-
class _DateComponentMixin:
"""Temporal expressions that have a date component."""
@@ -994,3 +994,8 @@ def full_name(self):
The name of the day of the week
"""
return ops.DayOfWeekName(self._expr).to_expr()
+
+
+@public
+class TimeTravelTable(Table):
+    """Table expression representing a snapshot of a table at a point in time."""