Skip to content

Commit

Permalink
Merge remote-tracking branch 'ibis-project/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
xmnlab committed Apr 16, 2018
2 parents 70ba0ec + f917f7c commit c952e95
Show file tree
Hide file tree
Showing 33 changed files with 699 additions and 360 deletions.
13 changes: 13 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import fnmatch
import os
import sys

# Keep pytest from collecting the packaging script.
collect_ignore = ['setup.py']

if sys.version_info.major == 2:
    # The BigQuery UDF modules use Python-3-only syntax, so exclude every
    # .py file under ibis/bigquery/udf from collection on Python 2.
    udf_directory = os.path.join(
        os.path.dirname(__file__), 'ibis', 'bigquery', 'udf')
    for dirpath, _, names in os.walk(udf_directory):
        collect_ignore.extend(
            os.path.join(dirpath, name)
            for name in names
            if fnmatch.fnmatch(name, '*.py')
        )
18 changes: 9 additions & 9 deletions docs/source/developer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Conda Environment Setup
.. code:: sh
# Create a conda environment ready for ibis development
conda env create --name ibis36 --file=ci/requirements_dev-3.6.yml
conda env create --name ibis36 --file=ci/requirements-dev-3.6.yml
# Activate the conda environment
source activate ibis36
Expand All @@ -54,8 +54,8 @@ The following command does three steps:

.. code:: sh
cd testing
bash start-all.sh
cd ci
bash build.sh
To use specific backends follow the instructions below.

Expand All @@ -71,7 +71,7 @@ Download Test Dataset

.. code:: sh
testing/datamgr.py download
ci/datamgr.py download
Setting Up Test Databases
Expand All @@ -81,7 +81,7 @@ To start each backends

.. code:: sh
cd testing
cd ci
docker-compose up
Expand All @@ -99,7 +99,7 @@ Impala (with UDFs)

.. code:: sh
testing/impalamgr.py load --data --data-dir ibis-testing-data
ci/impalamgr.py load --data --data-dir ibis-testing-data
Clickhouse
^^^^^^^^^^
Expand All @@ -115,7 +115,7 @@ Clickhouse

.. code:: sh
testing/datamgr.py clickhouse
ci/datamgr.py clickhouse
PostgreSQL
^^^^^^^^^^
Expand All @@ -127,7 +127,7 @@ Here's how to load test data into PostgreSQL:

.. code:: sh
testing/datamgr.py postgres
ci/datamgr.py postgres
SQLite
^^^^^^
Expand All @@ -137,7 +137,7 @@ instructions above, then SQLite will be available in the conda environment.

.. code:: sh
testing/datamgr.py sqlite
ci/datamgr.py sqlite
Running Tests
Expand Down
73 changes: 71 additions & 2 deletions ibis/bigquery/compiler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from functools import partial

import six

from multipledispatch import Dispatcher

import ibis
import ibis.common as com

Expand All @@ -15,6 +19,8 @@
from ibis.impala.compiler import ImpalaSelect, unary, fixed_arity
from ibis.impala import compiler as impala_compiler

from ibis.bigquery.datatypes import ibis_type_to_bigquery_type


class BigQueryUDFNode(ops.ValueOp):
pass
Expand Down Expand Up @@ -102,12 +108,27 @@ def extract_field_formatter(translator, expr):
}


# Dispatcher keyed on (compiled argument SQL, source ibis type, target ibis
# type); lets specific type pairs override the generic CAST translation.
bigquery_cast = Dispatcher('bigquery_cast')


@bigquery_cast.register(six.string_types, dt.Timestamp, dt.Integer)
def bigquery_cast_timestamp_to_integer(compiled_arg, source_type, target_type):
    """Cast a timestamp to an integer: microseconds since the UNIX epoch."""
    return 'UNIX_MICROS({})'.format(compiled_arg)


@bigquery_cast.register(six.string_types, dt.DataType, dt.DataType)
def bigquery_cast_generate(compiled_arg, source_type, target_type):
    """Fall back to a plain SQL CAST for any other pair of types."""
    type_name = SQL_TYPE_NAMES[target_type.name.lower()]
    return 'CAST({} AS {})'.format(compiled_arg, type_name.upper())


def _cast(translator, expr):
    """Compile an ibis cast expression.

    Dispatches on the (source type, target type) pair via ``bigquery_cast``
    so that special cases such as timestamp -> integer can emit
    ``UNIX_MICROS`` instead of a plain SQL ``CAST``.
    """
    op = expr.op()
    arg, target_type = op.args
    arg_formatted = translator.translate(arg)
    # The inline CAST formatting that used to precede this line made the
    # dispatcher call unreachable dead code; the dispatcher's generic case
    # already performs the plain CAST, so delegating is sufficient.
    return bigquery_cast(arg_formatted, arg.type(), target_type)


def _struct_field(translator, expr):
Expand Down Expand Up @@ -307,6 +328,13 @@ def _formatter(translator, expr):
return _formatter


# Maps an ibis temporal type (the class itself, not an instance) to the
# BigQuery FORMAT_* function family used when compiling strftime.
STRFTIME_FORMAT_FUNCTIONS = {
    dt.Date: 'DATE',
    dt.Time: 'TIME',
    dt.Timestamp: 'TIMESTAMP',
}


_operation_registry = impala_compiler._operation_registry.copy()
_operation_registry.update({
ops.ExtractYear: _extract_field('year'),
Expand Down Expand Up @@ -400,6 +428,27 @@ def bigquery_compiles_divide(t, e):
return 'IEEE_DIVIDE({}, {})'.format(*map(t.translate, e.op().args))


@compiles(ops.Strftime)
def compiles_strftime(translator, expr):
    """Compile strftime to the matching BigQuery FORMAT_* function.

    Timestamps carry an additional timezone argument, defaulting to
    ``'UTC'`` when the ibis type has no timezone.
    """
    arg, format_string = expr.op().args
    arg_type = arg.type()
    format_func = 'FORMAT_{}'.format(STRFTIME_FORMAT_FUNCTIONS[type(arg_type)])
    translated_format = translator.translate(format_string)
    translated_arg = translator.translate(arg)
    if not isinstance(arg_type, dt.Timestamp):
        return '{}({}, {})'.format(
            format_func, translated_format, translated_arg)
    timezone = arg_type.timezone
    if timezone is None:
        timezone = 'UTC'
    return '{}({}, {}, {!r})'.format(
        format_func, translated_format, translated_arg, timezone)


@rewrites(ops.Any)
def bigquery_rewrite_any(expr):
arg, = expr.op().args
Expand Down Expand Up @@ -441,6 +490,26 @@ def log2(expr):
return arg.log(2)


# Suffix of the BigQuery TIMESTAMP_* conversion function for each supported
# UNIX-time unit; units outside this mapping raise KeyError at compile time.
UNIT_FUNCS = {
    's': 'SECONDS',
    'ms': 'MILLIS',
    'us': 'MICROS',
}


@compiles(ops.TimestampFromUNIX)
def compiles_timestamp_from_unix(translator, expr):
    """Compile a UNIX epoch value to a timestamp via TIMESTAMP_<UNIT>."""
    value, unit = expr.op().args
    function_name = 'TIMESTAMP_{}'.format(UNIT_FUNCS[unit])
    return '{}({})'.format(function_name, translator.translate(value))


@compiles(ops.Floor)
def compiles_floor(translator, expr):
    """Compile floor(), casting the result to the expression's ibis type."""
    result_type = ibis_type_to_bigquery_type(expr.type())
    translated_arg = translator.translate(expr.op().arg)
    return 'CAST(FLOOR({}) AS {})'.format(translated_arg, result_type)


class BigQueryDialect(impala_compiler.ImpalaDialect):

translator = BigQueryExprTranslator
Expand Down
68 changes: 68 additions & 0 deletions ibis/bigquery/datatypes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from multipledispatch import Dispatcher

import ibis.expr.datatypes as dt


class TypeTranslationContext(object):
    """A tag class to allow alteration of the way a particular type is
    translated.

    Notes
    -----
    This is used to translate INT64 types to FLOAT64 when INT64 is used in the
    definition of a UDF.
    """
    # No state: instances exist only as dispatch keys for multipledispatch.
    __slots__ = ()


# Dispatch table for ibis -> BigQuery type translation; the registrations
# below key on (ibis type, translation context).
ibis_type_to_bigquery_type = Dispatcher('ibis_type_to_bigquery_type')


@ibis_type_to_bigquery_type.register(dt.DataType)
def trans_default(ibis_type):
    """Translate *ibis_type* under the default (non-UDF) context."""
    default_context = TypeTranslationContext()
    return ibis_type_to_bigquery_type(ibis_type, default_context)


@ibis_type_to_bigquery_type.register(dt.Floating, TypeTranslationContext)
def trans_float64(ibis_type, context):
    """All ibis floating-point types map to BigQuery's FLOAT64."""
    return 'FLOAT64'


@ibis_type_to_bigquery_type.register(dt.Integer, TypeTranslationContext)
def trans_integer(ibis_type, context):
    """All ibis integer types map to BigQuery's INT64."""
    return 'INT64'


@ibis_type_to_bigquery_type.register(dt.Array, TypeTranslationContext)
def trans_array(ibis_type, context):
    """Translate an ibis array type to a BigQuery ARRAY<...> type."""
    element = ibis_type_to_bigquery_type(ibis_type.value_type, context)
    return 'ARRAY<{}>'.format(element)


@ibis_type_to_bigquery_type.register(dt.Struct, TypeTranslationContext)
def trans_struct(ibis_type, context):
    """Translate an ibis struct type to a BigQuery STRUCT<...> type."""
    fields = []
    for field_name, field_type in zip(ibis_type.names, ibis_type.types):
        translated = ibis_type_to_bigquery_type(dt.dtype(field_type), context)
        fields.append('{} {}'.format(field_name, translated))
    return 'STRUCT<{}>'.format(', '.join(fields))


@ibis_type_to_bigquery_type.register(dt.Date, TypeTranslationContext)
def trans_date(ibis_type, context):
    """ibis dates map to BigQuery's DATE type."""
    return 'DATE'


@ibis_type_to_bigquery_type.register(dt.Timestamp, TypeTranslationContext)
def trans_timestamp(ibis_type, context):
    """Translate a timestamp; BigQuery TIMESTAMP carries no timezone."""
    if ibis_type.timezone is not None:
        raise TypeError('BigQuery does not support timestamps with timezones')
    return 'TIMESTAMP'


@ibis_type_to_bigquery_type.register(dt.DataType, TypeTranslationContext)
def trans_type(ibis_type, context):
    """Fallback: the uppercased ibis type name is the BigQuery type name."""
    return str(ibis_type).upper()
5 changes: 1 addition & 4 deletions ibis/bigquery/udf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
# Re-export the BigQuery UDF decorator as this package's public API.
# The former ``try``/``except ImportError: pass`` guard duplicated the
# import below and silently swallowed genuine import errors; Python 2
# compatibility is handled by skipping collection of this package in
# conftest.py instead.
from ibis.bigquery.udf.api import udf  # noqa: F401
33 changes: 24 additions & 9 deletions ibis/bigquery/udf/api.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
import functools
import collections
import inspect

import ibis.expr.rules as rlz
import ibis.expr.datatypes as dt

from ibis.compat import functools, signature
from ibis.expr.signature import Argument as Arg

from ibis.bigquery.compiler import BigQueryUDFNode, compiles

from ibis.bigquery.udf.core import (
PythonToJavaScriptTranslator,
ibis_type_to_bigquery_type,
UDFContext
)
from ibis.bigquery.datatypes import ibis_type_to_bigquery_type


__all__ = 'udf',
Expand Down Expand Up @@ -124,11 +132,15 @@ def wrapper(f):
if not callable(f):
raise TypeError('f must be callable, got {}'.format(f))

udf_node = type(
f.__name__,
(BigQueryUDFNode,),
dict(input_type=input_type, output_type=output_type.array_type),
)
sig = signature(f)
udf_node_fields = collections.OrderedDict([
(name, Arg(rlz.value(type)))
for name, type in zip(sig.parameters.keys(), input_type)
] + [
('output_type', output_type.array_type),
('__slots__', ('js',)),
])
udf_node = type(f.__name__, (BigQueryUDFNode,), udf_node_fields)

@compiles(udf_node)
def compiles_udf_node(t, expr):
Expand All @@ -138,6 +150,7 @@ def compiles_udf_node(t, expr):
)

source = PythonToJavaScriptTranslator(f).compile()
type_translation_context = UDFContext()
js = '''\
CREATE TEMPORARY FUNCTION {name}({signature})
RETURNS {return_type}
Expand All @@ -146,12 +159,14 @@ def compiles_udf_node(t, expr):
return {name}({args});
""";'''.format(
name=f.__name__,
return_type=ibis_type_to_bigquery_type(output_type),
return_type=ibis_type_to_bigquery_type(
dt.dtype(output_type), type_translation_context),
source=source,
signature=', '.join(
'{name} {type}'.format(
name=name,
type=ibis_type_to_bigquery_type(type)
type=ibis_type_to_bigquery_type(
dt.dtype(type), type_translation_context)
) for name, type in zip(
inspect.signature(f).parameters.keys(), input_type
)
Expand Down
Loading

0 comments on commit c952e95

Please sign in to comment.