feat(api): support time travel
mfatihaktas committed Mar 21, 2024
1 parent 32b7514 commit 38c7e9b
Showing 15 changed files with 590 additions and 11 deletions.
2 changes: 2 additions & 0 deletions compose.yaml
@@ -139,6 +139,8 @@ services:
AZURE_ABFS_OAUTH_SECRET: ""
AZURE_ABFS_OAUTH_ENDPOINT: ""
AZURE_WASB_ACCESS_KEY: ""
ports:
- 9083:9083
depends_on:
hive-metastore-db:
condition: service_healthy
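The new ports mapping publishes the Hive metastore's Thrift endpoint (9083 is its conventional port) to the host, presumably so clients outside the compose network, such as the Flink setup configured below, can reach it. A quick way to confirm the metastore is reachable once the stack is up (a sketch; host and port are taken from this diff, everything else is standard library):

import socket

# Open and immediately close a TCP connection to the published metastore port.
# Raises OSError if the hive-metastore service is not up or the port is not mapped.
socket.create_connection(("localhost", 9083), timeout=5).close()
print("Hive metastore Thrift port is reachable on localhost:9083")
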
32 changes: 32 additions & 0 deletions docker/flink/conf/hive-site.xml
@@ -0,0 +1,32 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Hive configuration for Impala quickstart docker cluster.
-->
<configuration>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>

<property>
<name>hive.metastore.uris</name>
<value>thrift://localhost:9083</value>
</property>
</configuration>
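This file lands in the Flink container's conf directory, so Hive-metastore-backed catalogs created from that container can pick up the Thrift URI above. As a rough illustration, not part of this commit and assuming the Flink Hive connector jars are on the classpath, a PyFlink session could register a catalog against this metastore like so (the conf directory path is a placeholder):

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.catalog import HiveCatalog

# Create a table environment and point a Hive catalog at the directory
# containing the hive-site.xml above; the catalog then talks to the
# metastore at thrift://localhost:9083.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
catalog = HiveCatalog("hive", None, "/opt/flink/conf")  # hypothetical conf dir inside the container
t_env.register_catalog("hive", catalog)
t_env.use_catalog("hive")
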
28 changes: 28 additions & 0 deletions ibis/backends/bigquery/compiler.py
@@ -7,6 +7,7 @@
import sqlglot as sg
import sqlglot.expressions as sge
from sqlglot.dialects import BigQuery
from typing import TYPE_CHECKING

import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
@@ -22,6 +23,12 @@
)
from ibis.common.temporal import DateUnit, IntervalUnit, TimestampUnit, TimeUnit

if TYPE_CHECKING:
from typing import Any

import ibis.expr.schema as sch


_NAME_REGEX = re.compile(r'[^!"$()*,./;?@[\\\]^`{}~\n]+')


@@ -713,3 +720,24 @@ def visit_CountDistinct(self, op, *, arg, where):
if where is not None:
arg = self.if_(where, arg, NULL)
return self.f.count(sge.Distinct(expressions=[arg]))

def visit_TimeTravelDatabaseTable(
self,
op,
*,
name: str,
schema: sch.Schema,
source: Any,
namespace: ops.Namespace,
timestamp: ops.Literal,
):
table = sg.table(
name, db=namespace.schema, catalog=namespace.database, quoted=self.quoted
)
return sge.Table(
this=table.this,
db=table.db,
catalog=table.catalog,
alias=table.alias,
version=sge.Version(this="SYSTEM_TIME", kind="AS OF", expression=timestamp),
)
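The new visit_TimeTravelDatabaseTable hook attaches a sqlglot Version node to the table reference, which BigQuery renders as a FOR SYSTEM_TIME AS OF clause. A minimal standalone sketch of that rendering, using sqlglot only, with the table names and a plain string literal invented for illustration (the real compiler passes the compiled ibis timestamp literal instead):

import sqlglot as sg
import sqlglot.expressions as sge

# Build a fully qualified, quoted table reference, then attach a Version node,
# mirroring what the visitor above does with its `timestamp` argument.
table = sg.table("my_table", db="my_dataset", catalog="my_project", quoted=True)
versioned = sge.Table(
    this=table.this,
    db=table.db,
    catalog=table.catalog,
    version=sge.Version(
        this="SYSTEM_TIME",
        kind="AS OF",
        expression=sge.convert("2023-01-02T03:04:05"),
    ),
)
print(versioned.sql(dialect="bigquery"))
# roughly: `my_project`.`my_dataset`.`my_table` FOR SYSTEM_TIME AS OF '2023-01-02T03:04:05'
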
55 changes: 55 additions & 0 deletions ibis/backends/bigquery/tests/unit/test_compiler.py
@@ -632,3 +632,58 @@ def test_unnest(snapshot):
).select(level_two=lambda t: t.level_one.unnest())
)
snapshot.assert_match(result, "out_two_unnests.sql")


def test_time_travel(alltypes):
from ibis.selectors import all

# TODO (mehmet): Setting schema from `alltypes` as
# schema = alltypes.schema
# fails with this error
# `schema`: <bound method Table.schema of UnboundTable: functional_alltypes ... > is not coercible to a Schema
schema = ibis.schema(
dict(
id="int32",
bool_col="boolean",
tinyint_col="int8",
smallint_col="int16",
int_col="int32",
bigint_col="int64",
float_col="float32",
double_col="float64",
date_string_col="string",
string_col="string",
timestamp_col=dt.Timestamp(timezone="UTC"),
year="int32",
month="int32",
)
)

table = ops.DatabaseTable(
name="my_table",
schema=schema,
source="bigquery",
namespace=ops.Namespace(schema="my_dataset", database="my_project"),
).to_expr()
table = table.time_travel(ibis.timestamp("2023-01-02T03:04:05"))
expr = table.select(all())

sql = ibis.bigquery.compile(expr)

expected_sql = """SELECT
`t0`.`id`,
`t0`.`bool_col`,
`t0`.`tinyint_col`,
`t0`.`smallint_col`,
`t0`.`int_col`,
`t0`.`bigint_col`,
`t0`.`float_col`,
`t0`.`double_col`,
`t0`.`date_string_col`,
`t0`.`string_col`,
`t0`.`timestamp_col`,
`t0`.`year`,
`t0`.`month`
FROM my_project.my_dataset.`my_table` FOR SYSTEM_TIME AS OF datetime('2023-01-02T03:04:05') AS `t0`"""

assert sql == expected_sql
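The user-facing entry point exercised above is Table.time_travel, which produces the new TimeTravelDatabaseTable op that the BigQuery compiler turns into FOR SYSTEM_TIME AS OF. Against a live connection the flow would look roughly like the sketch below; the project, dataset, and table names are placeholders. As a side note on the TODO above, Table.schema is a method, so calling alltypes.schema() would return an actual Schema rather than the bound method shown in the error message.

import ibis

# Hypothetical BigQuery connection; project/dataset/table names are placeholders.
con = ibis.bigquery.connect(project_id="my_project", dataset_id="my_dataset")
t = con.table("my_table")

# Read the table as of a past point in time.
snapshot = t.time_travel(ibis.timestamp("2023-01-02T03:04:05"))
print(ibis.to_sql(snapshot, dialect="bigquery"))  # emits ... FOR SYSTEM_TIME AS OF ...
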
17 changes: 16 additions & 1 deletion ibis/backends/flink/__init__.py
@@ -250,6 +250,7 @@ def table(
f"`database` must be a string; got {type(database)}"
)
schema = self.get_schema(name, catalog=catalog, database=database)

node = ops.DatabaseTable(
name,
schema=schema,
@@ -842,6 +843,8 @@ def insert(
self,
table_name: str,
obj: pa.Table | pd.DataFrame | ir.Table | list | dict,
*,
schema: sch.Schema | None = None,
database: str | None = None,
catalog: str | None = None,
overwrite: bool = False,
@@ -854,6 +857,8 @@
The name of the table to insert data into.
obj
The source data or expression to insert.
schema
The schema of the data being inserted; used when converting in-memory data (e.g. a pandas DataFrame) into a PyFlink table.
database
Name of the attached database that the table is located in.
catalog
@@ -876,6 +881,8 @@
import pyarrow as pa
import pyarrow_hotfix # noqa: F401

from ibis.backends.flink.datatypes import FlinkRowSchema

if isinstance(obj, ir.Table):
statement = InsertSelect(
table_name,
@@ -891,7 +898,15 @@
if isinstance(obj, dict):
obj = pd.DataFrame.from_dict(obj)
if isinstance(obj, pd.DataFrame):
table = self._table_env.from_pandas(obj)
if schema:
schema_ = FlinkRowSchema.from_ibis(schema)
table = self._table_env.from_pandas(obj, schema_)
else:
table = self._table_env.from_pandas(obj)

pyflink_schema = FlinkRowSchema.from_ibis(schema)
table = self._table_env.from_pandas(obj, pyflink_schema)

return table.execute_insert(table_name, overwrite=overwrite)

if isinstance(obj, list):
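With the new schema parameter, callers can hand insert an explicit Ibis schema when inserting in-memory data; the schema flows through FlinkRowSchema.from_ibis into TableEnvironment.from_pandas instead of relying on pandas type inference. A small usage sketch; the table environment, sink table, and column names are illustrative only:

import pandas as pd
import ibis

from pyflink.table import EnvironmentSettings, TableEnvironment

# Assumed setup: an existing streaming TableEnvironment wrapped by the Flink backend.
table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
con = ibis.flink.connect(table_env)

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
con.insert(
    "my_sink_table",  # hypothetical existing sink table
    df,
    schema=ibis.schema({"id": "int64", "name": "string"}),
    overwrite=False,
)
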
8 changes: 8 additions & 0 deletions ibis/backends/flink/tests/conftest.py
@@ -167,3 +167,11 @@ def generate_csv_configs(csv_file):
}

return generate_csv_configs


@pytest.fixture(scope="module")
def tempdir_sink_configs():
def generate_tempdir_configs(tempdir):
return {"connector": "filesystem", "path": tempdir, "format": "csv"}

return generate_tempdir_configs
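The fixture is now module-scoped in conftest.py, so any Flink test module can build a temporary filesystem sink. A hedged sketch of how a test might consume it; the test name, table name, and schema are illustrative and not from this commit, while con and tmp_path are existing fixtures:

import ibis

def test_insert_dataframe_into_filesystem_sink(con, tempdir_sink_configs, tmp_path):
    # Build connector properties pointing at a pytest-managed temp directory.
    sink_configs = tempdir_sink_configs(str(tmp_path))
    con.create_table(
        "tempdir_sink",
        schema=ibis.schema({"x": "int64"}),
        tbl_properties=sink_configs,
    )
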
8 changes: 0 additions & 8 deletions ibis/backends/flink/tests/test_ddl.py
@@ -17,14 +17,6 @@
from ibis.backends.tests.errors import Py4JJavaError


@pytest.fixture
def tempdir_sink_configs():
def generate_tempdir_configs(tempdir):
return {"connector": "filesystem", "path": tempdir, "format": "csv"}

return generate_tempdir_configs


@pytest.mark.parametrize("temp", [True, False])
def test_list_tables(con, temp):
assert len(con.list_tables(temp=temp))