Skip to content

Commit

Permalink
feat(api): support time travel
Browse files Browse the repository at this point in the history
  • Loading branch information
mfatihaktas committed Apr 4, 2024
1 parent 27c9d0f commit 9304ad9
Show file tree
Hide file tree
Showing 15 changed files with 611 additions and 11 deletions.
2 changes: 2 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ services:
AZURE_ABFS_OAUTH_SECRET: ""
AZURE_ABFS_OAUTH_ENDPOINT: ""
AZURE_WASB_ACCESS_KEY: ""
ports:
- 9083:9083
depends_on:
hive-metastore-db:
condition: service_healthy
Expand Down
32 changes: 32 additions & 0 deletions docker/flink/conf/hive-site.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Hive configuration for the Flink docker cluster.
-->
<configuration>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>

<property>
<name>hive.metastore.uris</name>
<value>thrift://localhost:9083</value>
</property>
</configuration>
28 changes: 28 additions & 0 deletions ibis/backends/bigquery/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import re
from typing import TYPE_CHECKING

Check warning on line 6 in ibis/backends/bigquery/compiler.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/bigquery/compiler.py#L6

Added line #L6 was not covered by tests

import sqlglot as sg
import sqlglot.expressions as sge
Expand All @@ -23,6 +24,12 @@
from ibis.common.temporal import DateUnit, IntervalUnit, TimestampUnit, TimeUnit
from ibis.expr.rewrites import rewrite_stringslice

if TYPE_CHECKING:
from typing import Any

import ibis.expr.schema as sch


_NAME_REGEX = re.compile(r'[^!"$()*,./;?@[\\\]^`{}~\n]+')


Expand Down Expand Up @@ -698,3 +705,24 @@ def visit_CountDistinct(self, op, *, arg, where):
if where is not None:
arg = self.if_(where, arg, NULL)
return self.f.count(sge.Distinct(expressions=[arg]))

def visit_TimeTravelDatabaseTable(
    self,
    op,
    *,
    name: str,
    schema: sch.Schema,
    source: Any,
    namespace: ops.Namespace,
    timestamp: ops.Literal,
):
    """Build a table reference that carries a ``SYSTEM_TIME AS OF`` version.

    Parameters
    ----------
    op
        The operation being compiled; not read here.
    name
        Name of the table.
    schema
        Schema of the table; not read here.
    source
        Source of the table; not read here.
    namespace
        Supplies the ``db`` (``namespace.schema``) and ``catalog``
        (``namespace.database``) parts of the table reference.
    timestamp
        Expression placed in the version clause.

    Returns
    -------
    sge.Table
        Table expression with its ``version`` argument set.
    """
    # Let sqlglot assemble the name parts, honouring this compiler's
    # quoting setting.
    base = sg.table(
        name,
        db=namespace.schema,
        catalog=namespace.database,
        quoted=self.quoted,
    )
    as_of = sge.Version(this="SYSTEM_TIME", kind="AS OF", expression=timestamp)
    # Rebuild the table from those parts with the version clause attached.
    return sge.Table(
        this=base.this,
        db=base.db,
        catalog=base.catalog,
        alias=base.alias,
        version=as_of,
    )
55 changes: 55 additions & 0 deletions ibis/backends/bigquery/tests/unit/test_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,3 +632,58 @@ def test_unnest(snapshot):
).select(level_two=lambda t: t.level_one.unnest())
)
snapshot.assert_match(result, "out_two_unnests.sql")


def test_time_travel(alltypes):
    """Check the BigQuery SQL compiled for a time-travelled table.

    A ``DatabaseTable`` is built by hand, ``time_travel`` is applied with a
    fixed timestamp, every column is selected, and the compiled SQL is
    compared against the expected ``FOR SYSTEM_TIME AS OF`` query.
    """
    # Import the module under an alias rather than `from ... import all`,
    # which would shadow the builtin `all` inside this function.
    import ibis.selectors as s

    # TODO (mehmet): Reuse the schema of the `alltypes` fixture instead of
    # spelling it out. Passing `alltypes.schema` fails with
    #   `schema`: <bound method Table.schema of UnboundTable: functional_alltypes ... > is not coercible to a Schema
    # because `schema` is a bound method that was not called.
    # NOTE(review): `alltypes.schema()` is presumably what was intended;
    # confirm the fixture's columns match the ones below before switching.
    schema = ibis.schema(
        dict(
            id="int32",
            bool_col="boolean",
            tinyint_col="int8",
            smallint_col="int16",
            int_col="int32",
            bigint_col="int64",
            float_col="float32",
            double_col="float64",
            date_string_col="string",
            string_col="string",
            timestamp_col=dt.Timestamp(timezone="UTC"),
            year="int32",
            month="int32",
        )
    )

    table = ops.DatabaseTable(
        name="my_table",
        schema=schema,
        source="bigquery",
        namespace=ops.Namespace(schema="my_dataset", database="my_project"),
    ).to_expr()
    table = table.time_travel(ibis.timestamp("2023-01-02T03:04:05"))
    expr = table.select(s.all())

    sql = ibis.bigquery.compile(expr)

    # NOTE(review): the column lines are reproduced verbatim; verify that
    # their leading whitespace matches the compiler's actual output.
    expected_sql = """SELECT
`t0`.`id`,
`t0`.`bool_col`,
`t0`.`tinyint_col`,
`t0`.`smallint_col`,
`t0`.`int_col`,
`t0`.`bigint_col`,
`t0`.`float_col`,
`t0`.`double_col`,
`t0`.`date_string_col`,
`t0`.`string_col`,
`t0`.`timestamp_col`,
`t0`.`year`,
`t0`.`month`
FROM my_project.my_dataset.`my_table` FOR SYSTEM_TIME AS OF datetime('2023-01-02T03:04:05') AS `t0`"""

    assert sql == expected_sql
17 changes: 16 additions & 1 deletion ibis/backends/flink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def table(
f"`database` must be a string; got {type(database)}"
)
schema = self.get_schema(name, catalog=catalog, database=database)

node = ops.DatabaseTable(
name,
schema=schema,
Expand Down Expand Up @@ -843,6 +844,8 @@ def insert(
self,
table_name: str,
obj: pa.Table | pd.DataFrame | ir.Table | list | dict,
*,
schema: sch.Schema | None = None,
database: str | None = None,
catalog: str | None = None,
overwrite: bool = False,
Expand All @@ -855,6 +858,8 @@ def insert(
The name of the table to insert data into.
obj
The source data or expression to insert.
schema
The schema for the table.
database
Name of the attached database that the table is located in.
catalog
Expand All @@ -877,6 +882,8 @@ def insert(
import pyarrow as pa
import pyarrow_hotfix # noqa: F401

from ibis.backends.flink.datatypes import FlinkRowSchema

Check warning on line 885 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L885

Added line #L885 was not covered by tests

if isinstance(obj, ir.Table):
statement = InsertSelect(
table_name,
Expand All @@ -892,7 +899,15 @@ def insert(
if isinstance(obj, dict):
obj = pd.DataFrame.from_dict(obj)
if isinstance(obj, pd.DataFrame):
table = self._table_env.from_pandas(obj)
if schema:
schema_ = FlinkRowSchema.from_ibis(schema)
table = self._table_env.from_pandas(obj, schema_)

Check warning on line 904 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L903-L904

Added lines #L903 - L904 were not covered by tests
else:
table = self._table_env.from_pandas(obj)

Check warning on line 906 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L906

Added line #L906 was not covered by tests

pyflink_schema = FlinkRowSchema.from_ibis(schema)
table = self._table_env.from_pandas(obj, pyflink_schema)

Check warning on line 909 in ibis/backends/flink/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/__init__.py#L908-L909

Added lines #L908 - L909 were not covered by tests

return table.execute_insert(table_name, overwrite=overwrite)

if isinstance(obj, list):
Expand Down
8 changes: 8 additions & 0 deletions ibis/backends/flink/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,11 @@ def generate_csv_configs(csv_file):
}

return generate_csv_configs


@pytest.fixture(scope="module")
def tempdir_sink_configs():
    """Provide a factory that builds filesystem/CSV sink options for a path."""

    def generate_tempdir_configs(tempdir):
        # `tempdir` is passed through unchanged as the sink's "path".
        options = {"connector": "filesystem"}
        options["path"] = tempdir
        options["format"] = "csv"
        return options

    return generate_tempdir_configs

Check warning on line 198 in ibis/backends/flink/tests/conftest.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/tests/conftest.py#L198

Added line #L198 was not covered by tests
8 changes: 0 additions & 8 deletions ibis/backends/flink/tests/test_ddl.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@
from ibis.backends.tests.errors import Py4JJavaError


@pytest.fixture
def tempdir_sink_configs():
    """Return a callable mapping a directory to filesystem/CSV sink options."""

    def generate_tempdir_configs(tempdir):
        # Keyword form of the same three-entry mapping, in the same key order.
        return dict(connector="filesystem", path=tempdir, format="csv")

    return generate_tempdir_configs


@pytest.mark.parametrize("temp", [True, False])
def test_list_tables(con, temp):
assert len(con.list_tables(temp=temp))
Expand Down
Loading

0 comments on commit 9304ad9

Please sign in to comment.