feat(api): time travel query #8517

Closed
2 changes: 2 additions & 0 deletions compose.yaml
@@ -139,6 +139,8 @@ services:
      AZURE_ABFS_OAUTH_SECRET: ""
      AZURE_ABFS_OAUTH_ENDPOINT: ""
      AZURE_WASB_ACCESS_KEY: ""
    ports:
      - 9083:9083
Member commented:

Why was this necessary? If you're looking to expose the hive metastore to the flink container, do that by adding this container to the flink network, not by exposing the port to the host.

Contributor (author) replied:

Thanks. I actually first tried adding `networks: - flink` for `hive-metastore`, but that led to:

E   Caused by: java.net.ConnectException: Connection refused (Connection refused)

Digging deeper into this, I found that `TestConfForStreaming` (the `TestConf` used by the tests in `ibis/backends/flink/tests/`) creates a local Flink environment, which means the tests do not run against the dockerized environment. This is why we have to expose the metastore to the local env.

I have put the env creation code in a new function `get_table_env(remote_env: bool, streaming_mode: bool)` to make it more transparent and configurable. I switched to the remote env in `TestConfForStreaming` to see if we can run the tests without exposing the metastore port, but that gave me:

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 8065e44d3abe8da591f9c169cde351cd)
...
Caused by: java.io.IOException: Failed to create the parent directory: /private/var/folders/9g/7ncp4wjj22q25spxdy7t44080000gn/T/pytest-of-mehmet/pytest-520/test_table0

The error seems to be due to the Flink client being on a file system that is not shared with the Flink TaskManager. I guess this is why the env was left local in `TestConfForStreaming` at the time.
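
For illustration, a minimal sketch of what such a `get_table_env` helper could look like, assuming pyflink's `Configuration` and `EnvironmentSettings` APIs and a made-up `localhost:8081` REST endpoint for the dockerized JobManager; the actual implementation in this PR may differ:

# A sketch of the helper described above -- not this PR's exact code.
# The REST host/port are assumed defaults for a dockerized JobManager.
from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment


def get_table_env(remote_env: bool, streaming_mode: bool) -> TableEnvironment:
    config = Configuration()
    if remote_env:
        # Target an external cluster's REST endpoint instead of the
        # local mini-cluster that pyflink spins up by default.
        config.set_string("rest.address", "localhost")
        config.set_string("rest.port", "8081")
    builder = EnvironmentSettings.new_instance().with_configuration(config)
    settings = (
        builder.in_streaming_mode() if streaming_mode else builder.in_batch_mode()
    ).build()
    return TableEnvironment.create(settings)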

    depends_on:
      hive-metastore-db:
        condition: service_healthy
32 changes: 32 additions & 0 deletions docker/flink/conf/hive-site.xml
@@ -0,0 +1,32 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Hive configuration for Impala quickstart docker cluster.
-->
<configuration>
  <property>
    <name>hive.metastore.local</name>
    <value>false</value>
  </property>

  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
  </property>
</configuration>
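
For context, a Flink table environment would consume this file roughly as follows (a sketch; the catalog name `hive` and the conf dir `/opt/flink/conf` are assumptions, not taken from this PR):

# Sketch: register a Hive catalog that reads the hive-site.xml above.
# `table_env` is a pyflink TableEnvironment (e.g., from get_table_env).
table_env.execute_sql(
    """
    CREATE CATALOG hive WITH (
        'type' = 'hive',
        'hive-conf-dir' = '/opt/flink/conf'
    )
    """
)
table_env.use_catalog("hive")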
28 changes: 28 additions & 0 deletions ibis/backends/bigquery/compiler.py
@@ -3,6 +3,7 @@
from __future__ import annotations

import re
from typing import TYPE_CHECKING

import sqlglot as sg
import sqlglot.expressions as sge
@@ -23,6 +24,12 @@
from ibis.common.temporal import DateUnit, IntervalUnit, TimestampUnit, TimeUnit
from ibis.expr.rewrites import rewrite_stringslice

if TYPE_CHECKING:
    from typing import Any

    import ibis.expr.schema as sch


_NAME_REGEX = re.compile(r'[^!"$()*,./;?@[\\\]^`{}~\n]+')


@@ -698,3 +705,24 @@ def visit_CountDistinct(self, op, *, arg, where):
        if where is not None:
            arg = self.if_(where, arg, NULL)
        return self.f.count(sge.Distinct(expressions=[arg]))

    def visit_TimeTravelDatabaseTable(
        self,
        op,
        *,
        name: str,
        schema: sch.Schema,
        source: Any,
        namespace: ops.Namespace,
        timestamp: ops.Literal,
    ):
        table = sg.table(
            name, db=namespace.database, catalog=namespace.catalog, quoted=self.quoted
        )
        return sge.Table(
            this=table.this,
            db=table.db,
            catalog=table.catalog,
            alias=table.alias,
            version=sge.Version(this="SYSTEM_TIME", kind="AS OF", expression=timestamp),
        )
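
As a standalone illustration of the sqlglot pieces used in this method (a sketch with made-up identifiers; in the compiler, `timestamp` arrives as an already-compiled sqlglot expression, so `sge.convert` stands in for it here):

import sqlglot as sg
import sqlglot.expressions as sge

table = sg.table("my_table", db="my_dataset", catalog="my_project", quoted=True)
versioned = sge.Table(
    this=table.this,
    db=table.db,
    catalog=table.catalog,
    version=sge.Version(
        this="SYSTEM_TIME",
        kind="AS OF",
        expression=sge.convert("2023-01-02T03:04:05"),
    ),
)
# Renders a table reference with a time-travel clause, roughly:
#   my_project.my_dataset.`my_table` FOR SYSTEM_TIME AS OF '2023-01-02T03:04:05'
print(versioned.sql(dialect="bigquery"))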
@@ -0,0 +1,15 @@
SELECT
  `t0`.`id`,
  `t0`.`bool_col`,
  `t0`.`tinyint_col`,
  `t0`.`smallint_col`,
  `t0`.`int_col`,
  `t0`.`bigint_col`,
  `t0`.`float_col`,
  `t0`.`double_col`,
  `t0`.`date_string_col`,
  `t0`.`string_col`,
  `t0`.`timestamp_col`,
  `t0`.`year`,
  `t0`.`month`
FROM my_project.my_dataset.`my_table` FOR SYSTEM_TIME AS OF datetime('2023-01-02T03:04:05') AS `t0`
39 changes: 39 additions & 0 deletions ibis/backends/bigquery/tests/unit/test_compiler.py
@@ -632,3 +632,42 @@ def test_unnest(snapshot):
        ).select(level_two=lambda t: t.level_one.unnest())
    )
    snapshot.assert_match(result, "out_two_unnests.sql")


def test_time_travel(alltypes, snapshot):
    from ibis.selectors import all

    # TODO (mehmet): Setting schema from `alltypes` as
    #   schema = alltypes.schema
    # fails with this error:
    #   `schema`: <bound method Table.schema of UnboundTable: functional_alltypes ... > is not coercible to a Schema
    schema = ibis.schema(
        dict(
            id="int32",
            bool_col="boolean",
            tinyint_col="int8",
            smallint_col="int16",
            int_col="int32",
            bigint_col="int64",
            float_col="float32",
            double_col="float64",
            date_string_col="string",
            string_col="string",
            timestamp_col=dt.Timestamp(timezone="UTC"),
            year="int32",
            month="int32",
        )
    )

    table = ops.DatabaseTable(
        name="my_table",
        schema=schema,
        source="bigquery",
        namespace=ops.Namespace(catalog="my_project", database="my_dataset"),
    ).to_expr()
    table = table.time_travel(ibis.timestamp("2023-01-02T03:04:05"))
    expr = table.select(all())

    sql = ibis.bigquery.compile(expr)

    snapshot.assert_match(sql, "out.sql")
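
At the user level, the API exercised by this test would be used roughly like this (a hypothetical sketch; the connection parameters and table name are made up):

# Hypothetical end-to-end usage of the new time-travel API.
import ibis

con = ibis.bigquery.connect(project_id="my_project", dataset_id="my_dataset")
t = con.table("my_table")
# Read the table as it existed at a given point in time.
snapshot_view = t.time_travel(ibis.timestamp("2023-01-02T03:04:05"))
df = snapshot_view.execute()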
17 changes: 16 additions & 1 deletion ibis/backends/flink/__init__.py
@@ -250,6 +250,7 @@ def table(
                f"`database` must be a string; got {type(database)}"
            )
        schema = self.get_schema(name, catalog=catalog, database=database)

        node = ops.DatabaseTable(
            name,
            schema=schema,
@@ -843,6 +844,8 @@ def insert(
        self,
        table_name: str,
        obj: pa.Table | pd.DataFrame | ir.Table | list | dict,
        *,
        schema: sch.Schema | None = None,
        database: str | None = None,
        catalog: str | None = None,
        overwrite: bool = False,
@@ -855,6 +858,8 @@
            The name of the table to insert data into.
        obj
            The source data or expression to insert.
        schema
            The schema for the table.
        database
            Name of the attached database that the table is located in.
        catalog
@@ -877,6 +882,8 @@
        import pyarrow as pa
        import pyarrow_hotfix  # noqa: F401

        from ibis.backends.flink.datatypes import FlinkRowSchema

        if isinstance(obj, ir.Table):
            statement = InsertSelect(
                table_name,
@@ -892,7 +899,15 @@
        if isinstance(obj, dict):
            obj = pd.DataFrame.from_dict(obj)
        if isinstance(obj, pd.DataFrame):
            if schema:
                schema_ = FlinkRowSchema.from_ibis(schema)
                table = self._table_env.from_pandas(obj, schema_)
            else:
                table = self._table_env.from_pandas(obj)

            return table.execute_insert(table_name, overwrite=overwrite)

        if isinstance(obj, list):
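
With the new `schema=` argument, inserting an in-memory frame could look like this (a hypothetical sketch; the connection, table name, and column types are made up):

import pandas as pd

import ibis

# Assumes an existing pyflink TableEnvironment named `table_env`.
con = ibis.flink.connect(table_env)

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
con.insert(
    "events",  # hypothetical target table
    df,
    schema=ibis.schema({"id": "int64", "name": "string"}),
    overwrite=False,
)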
8 changes: 8 additions & 0 deletions ibis/backends/flink/tests/conftest.py
@@ -188,3 +188,11 @@ def generate_csv_configs(csv_file):
        }

    return generate_csv_configs


@pytest.fixture(scope="module")
def tempdir_sink_configs():
    def generate_tempdir_configs(tempdir):
        return {"connector": "filesystem", "path": tempdir, "format": "csv"}

    return generate_tempdir_configs
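
A test can then request the module-scoped fixture and build a filesystem sink from it, for example (a sketch; the table name, schema, and use of `create_table`'s `tbl_properties` are assumptions):

# Sketch of a test consuming the fixture above.
import ibis


def test_filesystem_sink(con, tempdir_sink_configs, tmp_path):
    sink_configs = tempdir_sink_configs(str(tmp_path))
    con.create_table(
        "sink_table",
        schema=ibis.schema({"x": "int64"}),
        tbl_properties=sink_configs,
    )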
8 changes: 0 additions & 8 deletions ibis/backends/flink/tests/test_ddl.py
@@ -17,14 +17,6 @@
from ibis.backends.tests.errors import Py4JJavaError


@pytest.fixture
def tempdir_sink_configs():
    def generate_tempdir_configs(tempdir):
        return {"connector": "filesystem", "path": tempdir, "format": "csv"}

    return generate_tempdir_configs


@pytest.mark.parametrize("temp", [True, False])
def test_list_tables(con, temp):
    assert len(con.list_tables(temp=temp))