From 63c92747e63bc99abc6dcdb232f5230ad3f3cb7a Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Wed, 18 Apr 2018 11:06:12 -0400 Subject: [PATCH 1/9] ENH: Upgrade google-cloud-bigquery dependency to 1.0.0 Upgrade to `google-cloud-bigquery` 1.0.0 cc @tswast Author: Phillip Cloud Closes #1424 from cpcloud/bigquery-1.0.0-upgrade and squashes the following commits: fa7f0f80 [Phillip Cloud] ENH: Upgrade google-cloud-bigquery dependency to 1.0.0 --- ci/requirements-dev-2.7.yml | 2 +- ci/requirements-dev-3.5.yml | 2 +- ci/requirements-dev-3.6.yml | 2 +- ci/requirements-docs-3.6.yml | 2 +- ibis/bigquery/client.py | 238 +++++++------------- ibis/bigquery/tests/conftest.py | 2 +- ibis/bigquery/tests/test_client.py | 38 +++- ibis/bigquery/udf/api.py | 7 +- ibis/bigquery/udf/tests/test_udf_execute.py | 35 ++- ibis/expr/schema.py | 9 +- setup.py | 3 +- 11 files changed, 166 insertions(+), 174 deletions(-) diff --git a/ci/requirements-dev-2.7.yml b/ci/requirements-dev-2.7.yml index 92f1fe379281..0defe3a496e2 100644 --- a/ci/requirements-dev-2.7.yml +++ b/ci/requirements-dev-2.7.yml @@ -10,7 +10,7 @@ dependencies: - flake8 - funcsigs - functools32 - - google-cloud-bigquery<0.28 + - google-cloud-bigquery>=1.0.0 - graphviz - impyla>=0.14.0 - jinja2 diff --git a/ci/requirements-dev-3.5.yml b/ci/requirements-dev-3.5.yml index 287df5ca01f4..ce2af1155945 100644 --- a/ci/requirements-dev-3.5.yml +++ b/ci/requirements-dev-3.5.yml @@ -7,7 +7,7 @@ dependencies: - clickhouse-sqlalchemy - cmake - flake8 - - google-cloud-bigquery<0.28 + - google-cloud-bigquery>=1.0.0 - graphviz - impyla>=0.14.0 - jinja2 diff --git a/ci/requirements-dev-3.6.yml b/ci/requirements-dev-3.6.yml index 6e46ed2f84b7..a59e656de849 100644 --- a/ci/requirements-dev-3.6.yml +++ b/ci/requirements-dev-3.6.yml @@ -7,7 +7,7 @@ dependencies: - clickhouse-sqlalchemy - cmake - flake8 - - google-cloud-bigquery<0.28 + - google-cloud-bigquery>=1.0.0 - graphviz - impyla>=0.14.0 - jinja2 diff --git a/ci/requirements-docs-3.6.yml b/ci/requirements-docs-3.6.yml index 11fb9568c931..c51705ffdf31 100644 --- a/ci/requirements-docs-3.6.yml +++ b/ci/requirements-docs-3.6.yml @@ -7,7 +7,7 @@ dependencies: - clickhouse-sqlalchemy - cmake - flake8 - - google-cloud-bigquery<0.28 + - google-cloud-bigquery>=1.0.0 - graphviz - impyla>=0.14.0 - ipython diff --git a/ibis/bigquery/client.py b/ibis/bigquery/client.py index 3050c9860093..a49ea4efbcd4 100644 --- a/ibis/bigquery/client.py +++ b/ibis/bigquery/client.py @@ -1,8 +1,9 @@ -import regex as re -import time -import collections import datetime +from collections import OrderedDict + +import regex as re + import six import pandas as pd @@ -22,22 +23,20 @@ from ibis.client import Database, Query, SQLClient from ibis.bigquery import compiler as comp -from google.api.core.exceptions import BadRequest - - NATIVE_PARTITION_COL = '_PARTITIONTIME' def _ensure_split(table_id, dataset_id): split = table_id.split('.') if len(split) > 1: - assert len(split) == 2 + assert len(split) == 2, \ + "Found more than 1 '.' 
character in BigQuery table_id" if dataset_id: raise ValueError( - "Can't pass a fully qualified table name *AND* a dataset_id" + "Can't pass a fully qualified table name AND a dataset_id" ) - (dataset_id, table_id) = split - return (table_id, dataset_id) + dataset_id, table_id = split + return table_id, dataset_id _IBIS_TYPE_TO_DTYPE = { @@ -75,7 +74,7 @@ def bigquery_field_to_ibis_dtype(field): typ = field.field_type if typ == 'RECORD': fields = field.fields - assert fields + assert fields, 'RECORD fields are empty' names = [el.name for el in fields] ibis_types = list(map(dt.dtype, fields)) ibis_type = dt.Struct(names, ibis_types) @@ -89,13 +88,16 @@ def bigquery_field_to_ibis_dtype(field): @sch.infer.register(bq.table.Table) def bigquery_schema(table): - pairs = [(el.name, dt.dtype(el)) for el in table.schema] - try: - if table.list_partitions(): - pairs.append((NATIVE_PARTITION_COL, dt.timestamp)) - except BadRequest: - pass - return sch.schema(pairs) + fields = OrderedDict((el.name, dt.dtype(el)) for el in table.schema) + partition_info = table._properties.get('timePartitioning', None) + + # We have a partitioned table + if partition_info is not None: + partition_field = partition_info.get('field', NATIVE_PARTITION_COL) + + # Only add a new column if it's not already a column in the schema + fields.setdefault(partition_field, dt.timestamp) + return sch.schema(fields) class BigQueryCursor(object): @@ -106,11 +108,13 @@ def __init__(self, query): self.query = query def fetchall(self): - return list(self.query.fetch_data()) + result = self.query.result() + return [row.values() for row in result] @property def columns(self): - return [field.name for field in self.query.schema] + result = self.query.result() + return [field.name for field in result.schema] def __enter__(self): # For compatibility when constructed from Query.execute() @@ -157,7 +161,7 @@ def __init__(self, client, ddl, query_parameters=None): ] def _fetch(self, cursor): - df = pd.DataFrame(cursor.fetchall(), columns=cursor.columns) + df = cursor.query.to_dataframe() return self.schema().apply_to(df) def execute(self): @@ -172,46 +176,6 @@ def execute(self): return self._wrap_result(result) -class BigQueryAPIProxy(object): - - def __init__(self, project_id): - self._client = bq.Client(project_id) - - @property - def client(self): - return self._client - - @property - def project_id(self): - return self.client.project - - def get_datasets(self): - return list(self.client.list_datasets()) - - def get_dataset(self, dataset_id): - return self.client.dataset(dataset_id) - - def get_table(self, table_id, dataset_id, reload=True): - (table_id, dataset_id) = _ensure_split(table_id, dataset_id) - table = self.client.dataset(dataset_id).table(table_id) - if reload: - table.reload() - return table - - def get_schema(self, table_id, dataset_id): - return self.get_table(table_id, dataset_id).schema - - def run_sync_query(self, stmt): - query = self.client.run_sync_query(stmt) - query.use_legacy_sql = False - query.run() - # run_sync_query is not really synchronous: there's a timeout - while not query.job.done(): - query.job.reload() - time.sleep(0.1) - return query - - class BigQueryDatabase(Database): pass @@ -219,7 +183,7 @@ class BigQueryDatabase(Database): bigquery_param = Dispatcher('bigquery_param') -@bigquery_param.register(ir.StructScalar, collections.OrderedDict) +@bigquery_param.register(ir.StructScalar, OrderedDict) def bq_param_struct(param, value): field_params = [bigquery_param(param[k], v) for k, v in value.items()] return 
bq.StructQueryParameter(param.get_name(), *field_params) @@ -243,7 +207,7 @@ def bq_param_array(param, value): six.string_types + (datetime.datetime, datetime.date) ) def bq_param_timestamp(param, value): - assert isinstance(param.type(), dt.Timestamp) + assert isinstance(param.type(), dt.Timestamp), str(param.type()) # TODO(phillipc): Not sure if this is the correct way to do this. timestamp_value = pd.Timestamp(value, tz='UTC').to_pydatetime() @@ -290,35 +254,51 @@ class BigQueryTable(ops.DatabaseTable): pass +def rename_partitioned_column(table_expr, bq_table): + partition_info = bq_table._properties.get('timePartitioning', None) + + # If we don't have any partiton information, the table isn't partitioned + if partition_info is None: + return table_expr + + # If we have a partition, but no "field" field in the table properties, + # then use NATIVE_PARTITION_COL as the default + partition_field = partition_info.get('field', NATIVE_PARTITION_COL) + + # The partition field must be in table_expr columns + assert partition_field in table_expr.columns + + # User configured partition column name default + col = ibis.options.bigquery.partition_col + + # No renaming if the config option is set to None + if col is None: + return table_expr + return table_expr.relabel({partition_field: col}) + + class BigQueryClient(SQLClient): sync_query = BigQueryQuery database_class = BigQueryDatabase - proxy_class = BigQueryAPIProxy - dialect = comp.BigQueryDialect table_class = BigQueryTable + dialect = comp.BigQueryDialect def __init__(self, project_id, dataset_id): - self._proxy = type(self).proxy_class(project_id) - self._dataset_id = dataset_id + self.client = bq.Client(project_id) + self.dataset_id = dataset_id @property def project_id(self): - return self._proxy.project_id + return self.client.project_id - @property - def dataset_id(self): - return self._dataset_id - - def table(self, *args, **kwargs): - t = super(BigQueryClient, self).table(*args, **kwargs) - if NATIVE_PARTITION_COL in t.columns: - col = ibis.options.bigquery.partition_col - assert col not in t - return (t - .mutate(**{col: t[NATIVE_PARTITION_COL]}) - .drop([NATIVE_PARTITION_COL])) - return t + def table(self, name, database=None): + t = super(BigQueryClient, self).table(name, database=database) + table_id, dataset_id = _ensure_split(name, database or self.dataset_id) + dataset_ref = self.client.dataset(dataset_id) + table_ref = dataset_ref.table(name) + bq_table = self.client.get_table(table_ref) + return rename_partitioned_column(t, bq_table) def _build_ast(self, expr, context): result = comp.build_ast(expr, context) @@ -339,109 +319,61 @@ def _get_table_schema(self, qualified_name): def _execute(self, stmt, results=True, query_parameters=None): # TODO(phillipc): Allow **kwargs in calls to execute - query = self._proxy.client.run_sync_query(stmt) - query.use_legacy_sql = False - query.query_parameters = query_parameters or [] - query.run() - - # run_sync_query is not really synchronous: there's a timeout - while not query.job.done(): - query.job.reload() - time.sleep(0.1) - + job_config = bq.job.QueryJobConfig() + job_config.query_parameters = query_parameters or [] + job_config.use_legacy_sql = False # False by default in >=0.28 + query = self.client.query(stmt, job_config=job_config) + query.result() # blocks until finished return BigQueryCursor(query) def database(self, name=None): - if name is None: - name = self.dataset_id - return self.database_class(name, self) + return self.database_class(name or self.dataset_id, self) 
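The hunk above swaps the old `run_sync_query` polling loop for the job-based API in `google-cloud-bigquery` 1.0.0. As a rough standalone sketch of that query path (the project id and SQL string below are placeholders, not values from this patch):

.. code-block:: python

    import google.cloud.bigquery as bq

    client = bq.Client(project='my-project')   # placeholder project id
    job_config = bq.job.QueryJobConfig()
    job_config.use_legacy_sql = False          # standard SQL; the default in >= 0.28
    job_config.query_parameters = []           # ibis passes query parameters here
    query_job = client.query('SELECT 1 AS x', job_config=job_config)
    rows = query_job.result()                  # blocks until the job finishes
    df = query_job.to_dataframe()              # what BigQueryQuery._fetch relies on

`result()` returns the row iterator that `BigQueryCursor.fetchall` and `columns` read from, while `to_dataframe()` backs the pandas path.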
@property def current_database(self): return self.database(self.dataset_id) def set_database(self, name): - self._dataset_id = name + self.dataset_id = name def exists_database(self, name): - return self._proxy.get_dataset(name).exists() + return name in self.list_databases() def list_databases(self, like=None): - results = [dataset.name - for dataset in self._proxy.get_datasets()] + results = [ + dataset.dataset_id for dataset in self.client.list_datasets() + ] if like: results = [ dataset_name for dataset_name in results - if re.match(like, dataset_name) + if re.match(like, dataset_name) is not None ] return results def exists_table(self, name, database=None): - (table_id, dataset_id) = _ensure_split(name, database) - return self._proxy.get_table(table_id, dataset_id).exists() + table_id, dataset_id = _ensure_split(name, database) + return table_id in self.list_tables( + database=dataset_id or self.dataset_id + ) def list_tables(self, like=None, database=None): - dataset = self._proxy.get_dataset(database or self.dataset_id) - result = [table.name for table in dataset.list_tables()] + dataset_ref = self.client.dataset(database or self.dataset_id) + result = [ + table.table_id for table in self.client.list_tables(dataset_ref) + ] if like: result = [ table_name for table_name in result - if re.match(like, table_name) + if re.match(like, table_name) is not None ] return result def get_schema(self, name, database=None): - (table_id, dataset_id) = _ensure_split(name, database) - bq_table = self._proxy.get_table(table_id, dataset_id) + table_id, dataset_id = _ensure_split(name, database) + table_ref = self.client.dataset(dataset_id).table(table_id) + bq_table = self.client.get_table(table_ref) return sch.infer(bq_table) @property def version(self): return parse_version(bq.__version__) - - -_DTYPE_TO_IBIS_TYPE = { - 'INT64': dt.int64, - 'FLOAT64': dt.double, - 'BOOL': dt.boolean, - 'STRING': dt.string, - 'DATE': dt.date, - # FIXME: enforce no tz info - 'DATETIME': dt.timestamp, - 'TIME': dt.time, - 'TIMESTAMP': dt.timestamp, - 'BYTES': dt.binary, -} - - -_LEGACY_TO_STANDARD = { - 'INTEGER': 'INT64', - 'FLOAT': 'FLOAT64', - 'BOOLEAN': 'BOOL', -} - - -def _discover_type(field): - typ = field.field_type - if typ == 'RECORD': - fields = field.fields - assert fields - names = [el.name for el in fields] - ibis_types = [_discover_type(el) for el in fields] - ibis_type = dt.Struct(names, ibis_types) - else: - ibis_type = _LEGACY_TO_STANDARD.get(typ, typ) - ibis_type = _DTYPE_TO_IBIS_TYPE.get(ibis_type, ibis_type) - if field.mode == 'REPEATED': - ibis_type = dt.Array(ibis_type) - return ibis_type - - -def bigquery_table_to_ibis_schema(table): - pairs = [(el.name, _discover_type(el)) for el in table.schema] - try: - if table.list_partitions(): - pairs.append((NATIVE_PARTITION_COL, dt.timestamp)) - except BadRequest: - pass - return ibis.schema(pairs) diff --git a/ibis/bigquery/tests/conftest.py b/ibis/bigquery/tests/conftest.py index da2744f01cdf..a202338d7e9d 100644 --- a/ibis/bigquery/tests/conftest.py +++ b/ibis/bigquery/tests/conftest.py @@ -5,7 +5,7 @@ import ibis -PROJECT_ID = os.environ.get('GOOGLE_BIGQUERY_PROJECT_ID') +PROJECT_ID = os.environ.get('GOOGLE_BIGQUERY_PROJECT_ID', 'ibis-gbq') DATASET_ID = 'testing' diff --git a/ibis/bigquery/tests/test_client.py b/ibis/bigquery/tests/test_client.py index 685f425ed486..e36c08ec29fd 100644 --- a/ibis/bigquery/tests/test_client.py +++ b/ibis/bigquery/tests/test_client.py @@ -46,7 +46,8 @@ def test_simple_aggregate_execute(alltypes, df): def 
test_list_tables(client): - assert set(client.list_tables(like='functional_alltypes')) == { + tables = client.list_tables(like='functional_alltypes') + assert set(tables) == { 'functional_alltypes', 'functional_alltypes_parted', } @@ -63,13 +64,6 @@ def test_database(client): assert database.list_tables() == client.list_tables() -def test_database_layer(client): - bq_dataset = client._proxy.get_dataset(client.dataset_id) - actual = client.list_tables() - expected = [el.name for el in bq_dataset.list_tables()] - assert sorted(actual) == sorted(expected) - - def test_compile_toplevel(): t = ibis.table([('foo', 'double')], name='t0') @@ -375,3 +369,31 @@ def test_scalar_param_partition_time(parted_alltypes): expr = t[t.PARTITIONTIME < param] df = expr.execute(params={param: '2017-01-01'}) assert df.empty + + +def test_exists_table(client): + assert client.exists_table('functional_alltypes') + assert not client.exists_table('footable') + + +def test_exists_database(client): + assert client.exists_database('testing') + assert not client.exists_database('foodataset') + + +@pytest.mark.parametrize('kind', ['date', 'timestamp']) +@pytest.mark.parametrize( + ('option', 'expected_fn'), + [ + (None, 'my_{}_parted_col'.format), + ('PARTITIONTIME', lambda kind: 'PARTITIONTIME'), + ('foo_bar', lambda kind: 'foo_bar'), + ] +) +def test_parted_column(client, kind, option, expected_fn): + table_name = '{}_column_parted'.format(kind) + option_key = 'bigquery.partition_col' + with ibis.config.option_context(option_key, option): + t = client.table(table_name) + expected_column = expected_fn(kind) + assert t.columns == [expected_column, 'string_col', 'int_col'] diff --git a/ibis/bigquery/udf/api.py b/ibis/bigquery/udf/api.py index 45cca2748eb8..cc8e04ba833d 100644 --- a/ibis/bigquery/udf/api.py +++ b/ibis/bigquery/udf/api.py @@ -137,7 +137,12 @@ def wrapper(f): (name, Arg(rlz.value(type))) for name, type in zip(sig.parameters.keys(), input_type) ] + [ - ('output_type', output_type.array_type), + ( + 'output_type', + lambda self, output_type=output_type: rlz.shape_like( + self.args, dtype=output_type + ) + ), ('__slots__', ('js',)), ]) udf_node = type(f.__name__, (BigQueryUDFNode,), udf_node_fields) diff --git a/ibis/bigquery/udf/tests/test_udf_execute.py b/ibis/bigquery/udf/tests/test_udf_execute.py index dd2e72440838..dd14286ea229 100644 --- a/ibis/bigquery/udf/tests/test_udf_execute.py +++ b/ibis/bigquery/udf/tests/test_udf_execute.py @@ -1,3 +1,5 @@ +import os + import pytest import pandas as pd @@ -12,10 +14,18 @@ from ibis.bigquery.api import udf # noqa: E402 +PROJECT_ID = os.environ.get('GOOGLE_BIGQUERY_PROJECT_ID', 'ibis-gbq') +DATASET_ID = 'testing' + @pytest.fixture(scope='module') def client(): - return ibis.bigquery.connect('ibis-gbq', 'testing') + ga = pytest.importorskip('google.auth') + + try: + return ibis.bigquery.connect(PROJECT_ID, DATASET_ID) + except ga.exceptions.DefaultCredentialsError: + pytest.skip("no credentials found, skipping") @pytest.fixture(scope='module') @@ -103,3 +113,26 @@ def times_two(x): result = expr.execute() expected = ((df.double_col + 1.0) * 2.0).rename('tmp') tm.assert_series_equal(result, expected) + + +def test_udf_scalar(client): + @udf([dt.double, dt.double], dt.double) + def my_add(x, y): + return x + y + + expr = my_add(1, 2) + sql = client.compile(expr) + assert sql == '''\ +CREATE TEMPORARY FUNCTION my_add(x FLOAT64, y FLOAT64) +RETURNS FLOAT64 +LANGUAGE js AS """ +'use strict'; +function my_add(x, y) { + return (x + y); +} +return my_add(x, y); +"""; + 
+SELECT my_add(1, 2) AS `tmp`''' + result = client.execute(expr) + assert result == 3 diff --git a/ibis/expr/schema.py b/ibis/expr/schema.py index 026ea9512f95..9e7121f37a8c 100644 --- a/ibis/expr/schema.py +++ b/ibis/expr/schema.py @@ -1,3 +1,4 @@ +import collections from multipledispatch import Dispatcher import ibis.common as com @@ -164,16 +165,16 @@ def identity(s): return s -@schema.register(dict) -def schema_from_dict(d): +@schema.register(collections.Mapping) +def schema_from_mapping(d): return Schema.from_dict(d) -@schema.register((tuple, list)) +@schema.register(collections.Iterable) def schema_from_pairs(lst): return Schema.from_tuples(lst) -@schema.register((tuple, list), (tuple, list)) +@schema.register(collections.Iterable, collections.Iterable) def schema_from_names_types(names, types): return Schema(names, types) diff --git a/setup.py b/setup.py index 468a0e571553..fbd044efaed3 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ kerberos_requires = ['requests-kerberos'] visualization_requires = ['graphviz'] clickhouse_requires = ['clickhouse-driver>=0.0.8'] -bigquery_requires = ['google-cloud-bigquery<0.28'] +bigquery_requires = ['google-cloud-bigquery>=1.0.0'] hdf5_requires = ['tables>=3.0.0'] parquet_requires = ['pyarrow>=0.6.0'] @@ -82,7 +82,6 @@ ], 'clickhouse:python_version == "3.4"': clickhouse_requires, 'bigquery': bigquery_requires, - 'bigquery:python_version < "3"': bigquery_requires + ['chainmap'], 'csv:python_version < "3"': ['pathlib2'], 'hdf5': hdf5_requires, 'hdf5:python_version < "3"': hdf5_requires + ['pathlib2'], From 650c57b9c5a98b0216274de21b9b86252a132425 Mon Sep 17 00:00:00 2001 From: Ivan Ogasawara Date: Wed, 18 Apr 2018 20:51:43 -0400 Subject: [PATCH 2/9] Added dateadd/timestampad mapd operations. --- ibis/mapd/README.rst | 82 ++++++++++++++++++++++++++++++ ibis/mapd/compiler.py | 10 ++-- ibis/mapd/operations.py | 108 +++++++++++++++++++++++++++++----------- 3 files changed, 163 insertions(+), 37 deletions(-) diff --git a/ibis/mapd/README.rst b/ibis/mapd/README.rst index 3e75c6419ed7..d40ef33d266a 100644 --- a/ibis/mapd/README.rst +++ b/ibis/mapd/README.rst @@ -226,6 +226,88 @@ a set of reserved words from `MapD` language. `quote_identifiers` is used to put quotes around the string sent if the string match to specific criteria. +Timestamp/Date operations +------------------------- + +**Interval:** + +MapD Interval statement allow just the follow date/time attribute: YEAR, DAY, +MONTH, HOUR, MINUTE, SECOND + +To use the interval statement, it is necessary use a `integer literal/constant` +and use the `to_interval` method: + +.. code-block:: Python + + >>> t['arr_timestamp'] + ibis.literal(1).to_interval('Y') + +.. code-block:: Sql + + SELECT "arr_timestamp" + INTERVAL '1' YEAR AS tmp + FROM mapd.flights_2008_10k + + +**Extract date/time** + +To extract a date part information from a timestamp, `extract` would be used: + +.. code-block:: Python + + >>> t['arr_timestamp'].extract('YEAR') + +The `extract` method is just available on `ibis.mapd` backend. + +The operators allowed are: YEAR, QUARTER, MONTH, DAY, HOUR, MINUTE, SECOND, +DOW, ISODOW, DOY, EPOCH, QUARTERDAY, WEEK + +**Direct functions to extract date/time** + +There is some direct functions to extract date/time, the following shows how +to use that: + +.. 
code-block:: Python + + >>> t['arr_timestamp'].year() + >>> t['arr_timestamp'].month() + >>> t['arr_timestamp'].day() + >>> t['arr_timestamp'].hour() + >>> t['arr_timestamp'].minute() + >>> t['arr_timestamp'].second() + +The result should be: + +.. code-block:: Sql + + SELECT EXTRACT(YEAR FROM "arr_timestamp") AS tmp + FROM mapd.flights_2008_10k + + SELECT EXTRACT(MONTH FROM "arr_timestamp") AS tmp + FROM mapd.flights_2008_10k + + SELECT EXTRACT(DAY FROM "arr_timestamp") AS tmp + FROM mapd.flights_2008_10k + + SELECT EXTRACT(HOUR FROM "arr_timestamp") AS tmp + FROM mapd.flights_2008_10k + + SELECT EXTRACT(MINUTE FROM "arr_timestamp") AS tmp + FROM mapd.flights_2008_10k + + SELECT EXTRACT(SECOND FROM "arr_timestamp") AS tmp + FROM mapd.flights_2008_10k + +**Timestap/Date Truncate** + +A truncate timestamp/data value function is available as `truncate`: + +.. code-block:: Python + + >>> t['arr_timestamp'].truncate(date_part) + +The date part operators allowed are: YEAR or Y, QUARTER or Q, MONTH or M, +DAY or D, HOUR or h, MINUTE or m, SECOND or s, WEEK, MILLENNIUM, CENTURY, +DECADE, QUARTERDAY + Best practices -------------- diff --git a/ibis/mapd/compiler.py b/ibis/mapd/compiler.py index d96c2393f581..e501fb4eea51 100644 --- a/ibis/mapd/compiler.py +++ b/ibis/mapd/compiler.py @@ -1,14 +1,11 @@ from six import StringIO -from .operations import ( - _operation_registry, _name_expr -) from . import operations as mapd_ops import ibis.common as com import ibis.util as util import ibis.expr.operations as ops -import ibis.expr.types as ir import ibis.sql.compiler as compiles +import ibis.expr.types as ir def build_ast(expr, context): @@ -169,11 +166,11 @@ class MapDExprTranslator(compiles.ExprTranslator): """ """ - _registry = _operation_registry + _registry = mapd_ops._operation_registry context_class = MapDQueryContext def name(self, translated, name, force=True): - return _name_expr(translated, name) + return mapd_ops._name_expr(translated, name) class MapDDialect(compiles.Dialect): @@ -186,4 +183,3 @@ class MapDDialect(compiles.Dialect): dialect = MapDDialect compiles = MapDExprTranslator.compiles rewrites = MapDExprTranslator.rewrites - diff --git a/ibis/mapd/operations.py b/ibis/mapd/operations.py index ad205b01343c..3390c5b8a85b 100644 --- a/ibis/mapd/operations.py +++ b/ibis/mapd/operations.py @@ -77,33 +77,36 @@ def unary(func_name): return fixed_arity(func_name, 1) -def _reduction_format(translator, func_name, arg, args, where, distinct=False): +def _reduction_format( + translator, + func_name, + sql_func_name=None, + sql_signature='{}({})', + arg=None, args=None, where=None +): + if not sql_func_name: + sql_func_name = func_name + if where is not None: arg = where.ifelse(arg, ibis.NA) - distinct_ = '' if not distinct else 'DISTINCT ' - return '{}({}{})'.format( - func_name, - distinct_, + return sql_signature.format( + sql_func_name, ', '.join(map(translator.translate, [arg] + list(args))) ) -def _reduction(func_name, distinct=False): +def _reduction(func_name, sql_func_name=None, sql_signature='{}({})'): def formatter(translator, expr): op = expr.op() # HACK: support trailing arguments where = op.where args = [arg for arg in op.args if arg is not where] - distinct_ = distinct - - if hasattr(op, 'approx') and func_name == 'count' and distinct: - func_name == 'APPROX_COUNT_DISTINCT' - distinct_ = False return _reduction_format( - translator, func_name, args[0], args[1:], where, distinct_ + translator, func_name, sql_func_name, sql_signature, + args[0], args[1:], where ) formatter.__name__ 
= func_name @@ -122,7 +125,9 @@ def formatter(translator, expr): func_type = '' if not _is_floating(arg) else '_FLOAT' return _reduction_format( - translator, variants[how].upper() + func_type, arg, [], where + translator, variants[how].upper() + func_type, + None, '{}({})', + arg, [], where ) formatter.__name__ = func @@ -382,7 +387,22 @@ def _interval_from_integer(translator, expr): "MapD doesn't support subsecond interval resolutions") arg_ = translator.translate(arg) - return 'INTERVAL {} {}'.format(arg_, dtype.resolution.upper()) + return '{}, (sign){}'.format(dtype.resolution.upper(), arg_) + + +def _timestamp_op(func, op_sign='+'): + def _formatter(translator, expr): + op = expr.op() + left, right = op.args + formatted_left = translator.translate(left) + formatted_right = translator.translate(right) + + return '{}({}, {})'.format( + func, formatted_right.replace('(sign)', op_sign), + formatted_left + ) + + return _formatter def _set_literal_format(translator, expr): @@ -637,7 +657,7 @@ def _zero_if_null(translator, expr): # AGGREGATION -class ApproxCountDistinct(ops.CountDistinct): +class CountDistinct(ops.CountDistinct): """ Returns the approximate count of distinct values of x with defined expected error rate e @@ -874,20 +894,50 @@ class TimestampAdd(ops.TimestampUnaryOp): TimestampExtract: timestamp_binary_infix_op('EXTRACT', 'FROM'), - ops.DateAdd: binary_infix_op('+'), - ops.DateSub: binary_infix_op('-'), - ops.DateDiff: binary_infix_op('-'), - ops.TimestampAdd: binary_infix_op('+'), - ops.TimestampSub: binary_infix_op('-'), - ops.TimestampDiff: binary_infix_op('-'), + ops.IntervalAdd: _interval_from_integer, + ops.IntervalFromInteger: _interval_from_integer, + + ops.DateAdd: _timestamp_op('DATEADD'), + ops.DateSub: _timestamp_op('DATEADD', '-'), + ops.DateDiff: _timestamp_op('DATEDIFF'), + ops.TimestampAdd: _timestamp_op('TIMESTAMPADD'), + ops.TimestampSub: _timestamp_op('TIMESTAMPADD', '-'), + ops.TimestampDiff: _timestamp_op('TIMESTAMPDIFF'), ops.TimestampFromUNIX: _timestamp_from_unix, - TimestampAdd: lambda field, value, unit: 'TIMESTAMPADD({}, {}, {}) '.format(v, u) + TimestampAdd: ( + lambda field, value, unit: + 'TIMESTAMPADD({}, {}, {}) '.format(value, unit) + ) } + +class ApproxCountDistinct(ops.Reduction): + """Approximate number of unique values + + """ + arg = ops.Arg(rlz.column(rlz.any)) + approx = ops.Arg(rlz.integer, default=1) + where = ops.Arg(rlz.boolean, default=None) + + def output_type(self): + # Impala 2.0 and higher returns a DOUBLE + # return ir.DoubleScalar + return ops.partial(ir.IntegerScalar, dtype=ops.dt.int64) + + +approx_count_distinct = _reduction( + 'approx_nunique', + sql_func_name='approx_count_distinct', + sql_signature='{}({})' +) + +count_distinct = _reduction('count', sql_signature='{}(DISTINCT {})') +count = _reduction('count') + _agg_ops = { - ops.Count: _reduction('count'), - ops.CountDistinct: _reduction('count', distinct=True), - ApproxCountDistinct: _reduction('count', distinct=True), + ops.Count: count, + ops.CountDistinct: count_distinct, + ApproxCountDistinct: approx_count_distinct, ops.Mean: _reduction('avg'), ops.Max: _reduction('max'), ops.Min: _reduction('min'), @@ -966,14 +1016,14 @@ def f(_klass): assign_functions_to_dtype(ir.NumericValue, _trigonometric_ops, forced=True) assign_functions_to_dtype(ir.NumericValue, _math_ops, forced=True) assign_functions_to_dtype(ir.NumericValue, _geometric_ops, forced=True) -assign_functions_to_dtype(ir.NumericValue, _stats_ops, forced=True) -assign_functions_to_dtype(ir.NumericValue, 
_agg_ops, forced=True) - +assign_functions_to_dtype(ir.NumericValue, _stats_ops, forced=False) +assign_functions_to_dtype(ir.ColumnExpr, _agg_ops, forced=True) # string operations assign_functions_to_dtype(ir.StringValue, _string_ops, forced=True) # date/time/timestamp operations assign_functions_to_dtype(ir.TimestampColumn, _date_ops, forced=True) +assign_functions_to_dtype(ir.DateColumn, _date_ops, forced=True) # assign_functions_to_dtype(ir.DateColumn, _date_ops, forced=True) _add_method(ir.TimestampColumn, TimestampTruncate, 'truncate') @@ -981,5 +1031,3 @@ def f(_klass): _add_method(ir.TimestampColumn, TimestampExtract, 'extract') # _add_method(ir.DateColumn, TimestampExtract, 'extract') - -year = ibis.api._timedelta('MY_YEAR', 'Y') From 716494e4777da55702f29aa1ae18ce81d0f775ae Mon Sep 17 00:00:00 2001 From: Ivan Ogasawara Date: Wed, 18 Apr 2018 22:33:15 -0400 Subject: [PATCH 3/9] Added byte_length operation --- ibis/mapd/README.rst | 37 +++++++++++++++++++++++++++++++++++++ ibis/mapd/operations.py | 37 +++++++++++++++++++++++++++++-------- 2 files changed, 66 insertions(+), 8 deletions(-) diff --git a/ibis/mapd/README.rst b/ibis/mapd/README.rst index d40ef33d266a..98249de6dd83 100644 --- a/ibis/mapd/README.rst +++ b/ibis/mapd/README.rst @@ -308,6 +308,43 @@ The date part operators allowed are: YEAR or Y, QUARTER or Q, MONTH or M, DAY or D, HOUR or h, MINUTE or m, SECOND or s, WEEK, MILLENNIUM, CENTURY, DECADE, QUARTERDAY +String operations +----------------- + +- `byte_length` is not part of `ibis` `string` operations, it will work just +for `mapd` backend. + +`Not` operation can be done using `~` operator: + +.. code-block:: Python + + >>> ~t['dest_name'].like('L%') + +`regexp` and `regexp_like` operations can be done using `re_search` operation: + +.. 
code-block:: Python + + >>> t['dest_name'].re_search('L%') + + +Aggregate operations +==================== + +count column +t['taxiin'].count() + +distinct count column +t['taxiin'].distinct().count() + + +distinct count/nunique +t['taxiin'].nunique().name('v') + + +approx distinct count +t['taxiin'].approx_nunique(10) + + Best practices -------------- diff --git a/ibis/mapd/operations.py b/ibis/mapd/operations.py index 3390c5b8a85b..181afdbda609 100644 --- a/ibis/mapd/operations.py +++ b/ibis/mapd/operations.py @@ -134,6 +134,17 @@ def formatter(translator, expr): return formatter +def unary_prefix_op(prefix_op): + def formatter(translator, expr): + op = expr.op() + arg = _parenthesize(translator, op.args[0]) + + return '{0!s} {1!s}'.format(prefix_op.upper(), arg) + + formatter.__name__ = prefix_op + return formatter + + def binary_infix_op(infix_sym): def formatter(translator, expr): op = expr.op() @@ -248,12 +259,15 @@ def compile_cov(translator, expr): ) -def compile_char_length(translator, expr): - # pull out the arguments to the expression - arg = expr.op().args[0] - # compile the argument - compiled_arg = translator.translate(arg) - return 'CHAR_LENGTH({})'.format(compiled_arg) +def compile_length(func_name='length', sql_func_name='CHAR_LENGTH'): + def _compile_lenght(translator, expr): + # pull out the arguments to the expression + arg = expr.op().args[0] + # compile the argument + compiled_arg = translator.translate(arg) + return '{}({})'.format(sql_func_name, compiled_arg) + _compile_lenght.__name__ = func_name + return _compile_lenght def _xor(translator, expr): @@ -839,8 +853,14 @@ class DateTruncate(ops.DateTruncate): Conv_4326_900913_Y: unary('conv_4326_900913_y') } + +class ByteLength(ops.StringLength): + """Returns the length of a string in bytes length""" + + _string_ops = { - ops.StringLength: unary('char_length'), + ops.StringLength: compile_length(), + ByteLength: compile_length('byte_length', 'LENGTH'), ops.RegexSearch: binary_infix_op('REGEXP'), ops.StringSQLLike: binary_infix_op('like'), ops.StringSQLILike: binary_infix_op('ilike'), @@ -931,13 +951,14 @@ def output_type(self): sql_signature='{}({})' ) -count_distinct = _reduction('count', sql_signature='{}(DISTINCT {})') +count_distinct = _reduction('count') count = _reduction('count') _agg_ops = { ops.Count: count, ops.CountDistinct: count_distinct, ApproxCountDistinct: approx_count_distinct, + ops.DistinctColumn: unary_prefix_op('distinct'), ops.Mean: _reduction('avg'), ops.Max: _reduction('max'), ops.Min: _reduction('min'), From 228a8f5ca5a94a5442831c4a988294067b265d84 Mon Sep 17 00:00:00 2001 From: Ivan Ogasawara Date: Fri, 20 Apr 2018 01:03:04 -0400 Subject: [PATCH 4/9] Added timestamp/date diff; improve timestamp/date add/sub; --- ibis/mapd/README.rst | 4 +- ibis/mapd/operations.py | 113 ++++++++++++++++++++++++++++------------ 2 files changed, 82 insertions(+), 35 deletions(-) diff --git a/ibis/mapd/README.rst b/ibis/mapd/README.rst index 98249de6dd83..7ca651f86bdb 100644 --- a/ibis/mapd/README.rst +++ b/ibis/mapd/README.rst @@ -243,9 +243,10 @@ and use the `to_interval` method: .. 
code-block:: Sql - SELECT "arr_timestamp" + INTERVAL '1' YEAR AS tmp + SELECT TIMESTAMPADD(YEAR, 1, "arr_timestamp") AS tmp FROM mapd.flights_2008_10k +Another way to use intervals is using `ibis.interval(years=1)` **Extract date/time** @@ -308,6 +309,7 @@ The date part operators allowed are: YEAR or Y, QUARTER or Q, MONTH or M, DAY or D, HOUR or h, MINUTE or m, SECOND or s, WEEK, MILLENNIUM, CENTURY, DECADE, QUARTERDAY + String operations ----------------- diff --git a/ibis/mapd/operations.py b/ibis/mapd/operations.py index 181afdbda609..cb402ffd4d54 100644 --- a/ibis/mapd/operations.py +++ b/ibis/mapd/operations.py @@ -158,24 +158,13 @@ def formatter(translator, expr): return formatter -def timestamp_binary_infix_op(func_name, infix_sym): +def timestamp_binary_infix_op(func_name, infix_sym, timestamp_code): def formatter(translator, expr): op = expr.op() arg, unit = op.args[0], op.args[1] arg_ = _parenthesize(translator, arg) - timestamp_code = { - 'Y': 'YEAR', - 'M': 'MONTH', - 'D': 'DAY', - 'W': 'WEEK', - 'Q': 'QUARTER', - 'h': 'HOUR', - 'm': 'MINUTE', - 's': 'SECOND', - } - if unit.upper() in [u.upper() for u in timestamp_code.keys()]: converter = timestamp_code[unit].upper() elif unit.upper() in [u.upper() for u in _timestamp_units.values()]: @@ -388,7 +377,7 @@ def _interval_format(translator, expr): raise com.UnsupportedOperationError( "MapD doesn't support subsecond interval resolutions") - return 'INTERVAL {} {}'.format(expr.op().value, dtype.resolution.upper()) + return '{1}, (sign){0}'.format(expr.op().value, dtype.resolution.upper()) def _interval_from_integer(translator, expr): @@ -408,17 +397,49 @@ def _timestamp_op(func, op_sign='+'): def _formatter(translator, expr): op = expr.op() left, right = op.args + formatted_left = translator.translate(left) formatted_right = translator.translate(right) + if isinstance(left, ir.DateValue): + formatted_left = 'CAST({} as timestamp)'.format(formatted_left) + return '{}({}, {})'.format( - func, formatted_right.replace('(sign)', op_sign), + func, + formatted_right.replace('(sign)', op_sign), formatted_left ) return _formatter +def _timestamp_diff(sql_func, timestamp_code): + def _formatter(translator, expr): + op = expr.op() + left, right, unit = op.args + + formatted_left = translator.translate(left) + formatted_right = translator.translate(right) + + formatted_unit = timestamp_code[unit] + + if isinstance(left, ir.DateValue): + formatted_left = 'CAST({} as timestamp)'.format(formatted_left) + + if isinstance(right, ir.DateValue): + formatted_right = 'CAST({} as timestamp)'.format(formatted_right) + + return '{}({}, {}, {})'.format( + sql_func, + formatted_unit, + formatted_left, + formatted_right + ) + + _formatter.__name__ = 'diff' + return _formatter + + def _set_literal_format(translator, expr): value_type = expr.type().value_type @@ -758,6 +779,26 @@ class Conv_4326_900913_Y(ops.UnaryOp): )) +class TimestampDiff(ops.ValueOp): + left = ops.Arg(rlz.timestamp) + right = ops.Arg(rlz.timestamp) + unit = ops.Arg(rlz.isin(_timestamp_units)) + output_type = rlz.shape_like('left', ops.dt.int32) + + def __init__(self, left, right, unit): + super(TimestampDiff, self).__init__(left, right, unit) + + +class DateDiff(ops.ValueOp): + left = ops.Arg(rlz.date) + right = ops.Arg(rlz.date) + unit = ops.Arg(rlz.isin(_timestamp_units)) + output_type = rlz.shape_like('left', ops.dt.int32) + + def __init__(self, left, right, unit): + super(DateDiff, self).__init__(left, right, unit) + + class TimestampExtract(ops.TimestampUnaryOp): unit = 
ops.Arg(rlz.isin(_timestamp_units)) output_type = rlz.shape_like('arg', ops.dt.int32) @@ -884,19 +925,23 @@ class ByteLength(ops.StringLength): _interval_dateadd = [ 'YEAR', 'QUARTER', 'MONTH', 'DAYOFYEAR', 'DAY', 'WEEK', 'WEEKDAY', 'HOUR', - 'MINUTE', 'SECOND', 'MILLISECOND' + 'MINUTE', 'SECOND', 'MILLISECOND' ] _interval_datepart = [ 'YEAR', 'QUARTER', 'MONTH', 'DAYOFYEAR', 'DAY', 'WEEK', 'WEEKDAY', 'HOUR', - 'MINUTE', 'SECOND', 'MILLISECOND' + 'MINUTE', 'SECOND', 'MILLISECOND' ] - -class TimestampAdd(ops.TimestampUnaryOp): - """Truncates x to y decimal places""" - unit = ops.Arg(rlz.isin(_interval_datepart)) - output_type = rlz.shape_like('left', ops.dt.float) - +timestamp_code = { + 'Y': 'YEAR', + 'M': 'MONTH', + 'D': 'DAY', + 'W': 'WEEK', + 'Q': 'QUARTER', + 'h': 'HOUR', + 'm': 'MINUTE', + 's': 'SECOND', +} _date_ops = { ops.Date: unary('toDate'), @@ -912,22 +957,21 @@ class TimestampAdd(ops.TimestampUnaryOp): ops.ExtractMinute: _extract_field('MINUTE'), ops.ExtractSecond: _extract_field('SECOND'), - TimestampExtract: timestamp_binary_infix_op('EXTRACT', 'FROM'), + TimestampExtract: timestamp_binary_infix_op( + 'EXTRACT', 'FROM', timestamp_code=timestamp_code + ), ops.IntervalAdd: _interval_from_integer, ops.IntervalFromInteger: _interval_from_integer, - ops.DateAdd: _timestamp_op('DATEADD'), - ops.DateSub: _timestamp_op('DATEADD', '-'), - ops.DateDiff: _timestamp_op('DATEDIFF'), + ops.DateAdd: _timestamp_op('TIMESTAMPADD'), + ops.DateSub: _timestamp_op('TIMESTAMPADD', '-'), + # ops.DateDiff: _timestamp_op('DATEDIFF'), + TimestampDiff: _timestamp_diff('TIMESTAMPDIFF', timestamp_code), ops.TimestampAdd: _timestamp_op('TIMESTAMPADD'), ops.TimestampSub: _timestamp_op('TIMESTAMPADD', '-'), - ops.TimestampDiff: _timestamp_op('TIMESTAMPDIFF'), - ops.TimestampFromUNIX: _timestamp_from_unix, - TimestampAdd: ( - lambda field, value, unit: - 'TIMESTAMPADD({}, {}, {}) '.format(value, unit) - ) + DateDiff: _timestamp_diff('TIMESTAMPDIFF', timestamp_code), + # ops.TimestampDiff: _timestamp_op('TIMESTAMPDIFF', diff=True), } @@ -1026,7 +1070,9 @@ def f(_klass): Return a lambda function that return to_expr() result from the custom classes. """ - return lambda *args, **kwargs: _klass(*args, **kwargs).to_expr() + def _f(*args, **kwargs): + return _klass(*args, **kwargs).to_expr() + return _f # assign new function to the defined DataType setattr( dtype, func_name, f(klass) @@ -1045,7 +1091,6 @@ def f(_klass): # date/time/timestamp operations assign_functions_to_dtype(ir.TimestampColumn, _date_ops, forced=True) assign_functions_to_dtype(ir.DateColumn, _date_ops, forced=True) -# assign_functions_to_dtype(ir.DateColumn, _date_ops, forced=True) _add_method(ir.TimestampColumn, TimestampTruncate, 'truncate') _add_method(ir.DateColumn, DateTruncate, 'truncate') From ca6e240d79c135b7905f0f52dc8f3c2aabb9b243 Mon Sep 17 00:00:00 2001 From: Ivan Ogasawara Date: Fri, 20 Apr 2018 11:04:13 -0400 Subject: [PATCH 5/9] Removed date/timestamp diff. It should use diff between extracted date part. 
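For illustration only, a difference computed from extracted date parts would look roughly like this (`t` is the `flights_2008_10k` table used in the README; `dep_timestamp` is an assumed second timestamp column, not taken from this patch):

.. code-block:: Python

    >>> year_diff = t['arr_timestamp'].year() - t['dep_timestamp'].year()

which should compile to something like:

.. code-block:: Sql

    SELECT EXTRACT(YEAR FROM "arr_timestamp") - EXTRACT(YEAR FROM "dep_timestamp") AS tmp
    FROM mapd.flights_2008_10k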
--- ibis/mapd/operations.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/ibis/mapd/operations.py b/ibis/mapd/operations.py index cb402ffd4d54..3b663ba8fd09 100644 --- a/ibis/mapd/operations.py +++ b/ibis/mapd/operations.py @@ -779,26 +779,6 @@ class Conv_4326_900913_Y(ops.UnaryOp): )) -class TimestampDiff(ops.ValueOp): - left = ops.Arg(rlz.timestamp) - right = ops.Arg(rlz.timestamp) - unit = ops.Arg(rlz.isin(_timestamp_units)) - output_type = rlz.shape_like('left', ops.dt.int32) - - def __init__(self, left, right, unit): - super(TimestampDiff, self).__init__(left, right, unit) - - -class DateDiff(ops.ValueOp): - left = ops.Arg(rlz.date) - right = ops.Arg(rlz.date) - unit = ops.Arg(rlz.isin(_timestamp_units)) - output_type = rlz.shape_like('left', ops.dt.int32) - - def __init__(self, left, right, unit): - super(DateDiff, self).__init__(left, right, unit) - - class TimestampExtract(ops.TimestampUnaryOp): unit = ops.Arg(rlz.isin(_timestamp_units)) output_type = rlz.shape_like('arg', ops.dt.int32) @@ -966,12 +946,8 @@ class ByteLength(ops.StringLength): ops.DateAdd: _timestamp_op('TIMESTAMPADD'), ops.DateSub: _timestamp_op('TIMESTAMPADD', '-'), - # ops.DateDiff: _timestamp_op('DATEDIFF'), - TimestampDiff: _timestamp_diff('TIMESTAMPDIFF', timestamp_code), ops.TimestampAdd: _timestamp_op('TIMESTAMPADD'), ops.TimestampSub: _timestamp_op('TIMESTAMPADD', '-'), - DateDiff: _timestamp_diff('TIMESTAMPDIFF', timestamp_code), - # ops.TimestampDiff: _timestamp_op('TIMESTAMPDIFF', diff=True), } From 383ddd7e70293be1ed74a5c1f871a6011652ac6a Mon Sep 17 00:00:00 2001 From: Ivan Ogasawara Date: Fri, 20 Apr 2018 12:18:25 -0400 Subject: [PATCH 6/9] Resolves #1431 (ibis) --- ibis/expr/api.py | 8 ++++++++ ibis/expr/operations.py | 12 ++++++++++++ ibis/mapd/operations.py | 17 ++--------------- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/ibis/expr/api.py b/ibis/expr/api.py index 30873eec34d9..599bc733c3a7 100644 --- a/ibis/expr/api.py +++ b/ibis/expr/api.py @@ -1295,11 +1295,15 @@ def _integer_to_interval(arg, unit='s'): abs = _unary_op('abs', ops.Abs) ceil = _unary_op('ceil', ops.Ceil) +degrees = _unary_op('degrees', ops.Degrees) +deg2rad = _unary_op('radians', ops.Radians) exp = _unary_op('exp', ops.Exp) floor = _unary_op('floor', ops.Floor) log2 = _unary_op('log2', ops.Log2) log10 = _unary_op('log10', ops.Log10) ln = _unary_op('ln', ops.Ln) +radians = _unary_op('radians', ops.Radians) +rad2deg = _unary_op('degrees', ops.Degrees) sign = _unary_op('sign', ops.Sign) sqrt = _unary_op('sqrt', ops.Sqrt) @@ -1318,7 +1322,11 @@ def _integer_to_interval(arg, unit='s'): __neg__=negate, abs=abs, ceil=ceil, + degrees=degrees, + deg2rad=deg2rad, floor=floor, + radians=radians, + rad2deg=rad2deg, sign=sign, exp=exp, sqrt=sqrt, diff --git a/ibis/expr/operations.py b/ibis/expr/operations.py index 9f774934c77c..da51114062fd 100644 --- a/ibis/expr/operations.py +++ b/ibis/expr/operations.py @@ -521,6 +521,18 @@ class Log10(Logarithm): """Logarithm base 10""" +class Degrees(UnaryOp): + """Converts radians to degrees""" + arg = Arg(rlz.floating) + output_type = rlz.shape_like('arg', dt.float) + + +class Radians(UnaryOp): + """Converts radians to degrees""" + arg = Arg(rlz.floating) + output_type = rlz.shape_like('arg', dt.float) + + # TRIGONOMETRIC OPERATIONS class TrigonometricUnary(UnaryOp): diff --git a/ibis/mapd/operations.py b/ibis/mapd/operations.py index 3b663ba8fd09..3e66684d2691 100644 --- a/ibis/mapd/operations.py +++ b/ibis/mapd/operations.py @@ -701,25 +701,12 @@ class 
CountDistinct(ops.CountDistinct): # MATH - -class Degrees(ops.UnaryOp): - """Converts radians to degrees""" - arg = ops.Arg(rlz.floating) - output_type = rlz.shape_like('arg', ops.dt.float) - - class Log(ops.Ln): """ """ -class Radians(ops.UnaryOp): - """Converts radians to degrees""" - arg = ops.Arg(rlz.floating) - output_type = rlz.shape_like('arg', ops.dt.float) - - class Truncate(ops.NumericBinaryOp): """Truncates x to y decimal places""" output_type = rlz.shape_like('left', ops.dt.float) @@ -833,7 +820,7 @@ class DateTruncate(ops.DateTruncate): _math_ops = { ops.Abs: unary('abs'), ops.Ceil: unary('ceil'), - Degrees: unary('degrees'), # MapD function + ops.Degrees: unary('degrees'), # MapD function ops.Exp: unary('exp'), ops.Floor: unary('floor'), Log: unary('log'), # MapD Log wrap to IBIS Ln @@ -841,7 +828,7 @@ class DateTruncate(ops.DateTruncate): ops.Log10: unary('log10'), ops.Modulus: fixed_arity('mod', 2), ops.Pi: fixed_arity('pi', 0), - Radians: unary('radians'), + ops.Radians: unary('radians'), ops.Round: _round, ops.Sign: _sign, ops.Sqrt: unary('sqrt'), From efe358761d42b708a58f20e8f369dbc9327a989f Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Fri, 20 Apr 2018 15:35:28 -0400 Subject: [PATCH 7/9] ENH: Enable cross project queries Closes #1427 cc @tswast Author: Phillip Cloud Closes #1428 from cpcloud/cross-project and squashes the following commits: f15ab250 [Phillip Cloud] Fix test 5306e876 [Phillip Cloud] Docs 5b693b3b [Phillip Cloud] ENH: Add cross-project queries --- docs/source/getting-started.rst | 21 ++++ docs/source/release.rst | 2 + ibis/bigquery/client.py | 146 ++++++++++++++++++++------- ibis/bigquery/compiler.py | 17 +++- ibis/bigquery/tests/test_client.py | 116 ++++++++++++++++++++- ibis/bigquery/tests/test_compiler.py | 8 +- 6 files changed, 269 insertions(+), 41 deletions(-) diff --git a/docs/source/getting-started.rst b/docs/source/getting-started.rst index 615b7c2bdca9..4a12829d1c5c 100644 --- a/docs/source/getting-started.rst +++ b/docs/source/getting-started.rst @@ -149,6 +149,27 @@ with: >>> con = ibis.bigquery.connect(project_id='ibis-gbq', dataset_id='testing') +By default ibis assumes that the BigQuery project that's billed for queries is +also the project where the data lives. + +However, it's very easy to query data that does **not** live in the billing +project. + +.. note:: + + When you run queries against data from other projects **the billing project + will still be billed for any and all queries**. + +If you want to query data that lives in a different project than the billing +project you can use the :meth:`~ibis.bigquery.client.BigQueryClient.database` +method of :class:`~ibis.bigquery.client.BigQueryClient` objects: + +.. code-block:: python + + >>> db = con.database('other-data-project.other-dataset') + >>> t = db.my_awesome_table + >>> t.sweet_column.sum().execute() # runs against the billing project + Learning resources ------------------ diff --git a/docs/source/release.rst b/docs/source/release.rst index c7b25d453fe0..833cf5781d29 100644 --- a/docs/source/release.rst +++ b/docs/source/release.rst @@ -25,6 +25,8 @@ New Features * Add support for ``UNION`` in the BigQuery backend (:issue:`1408`, :issue:`1409`) * Support for writing UDFs in BigQuery (:issue:`1377`). See :ref:`the BigQuery UDF docs ` for more details. +* Support for cross-project expressions in the BigQuery backend. 
+ (:issue:`1428`) Bug Fixes ~~~~~~~~~ diff --git a/ibis/bigquery/client.py b/ibis/bigquery/client.py index a49ea4efbcd4..f4cce9a90185 100644 --- a/ibis/bigquery/client.py +++ b/ibis/bigquery/client.py @@ -7,6 +7,8 @@ import six import pandas as pd + +from google.api_core.exceptions import NotFound import google.cloud.bigquery as bq from multipledispatch import Dispatcher @@ -26,19 +28,6 @@ NATIVE_PARTITION_COL = '_PARTITIONTIME' -def _ensure_split(table_id, dataset_id): - split = table_id.split('.') - if len(split) > 1: - assert len(split) == 2, \ - "Found more than 1 '.' character in BigQuery table_id" - if dataset_id: - raise ValueError( - "Can't pass a fully qualified table name AND a dataset_id" - ) - dataset_id, table_id = split - return table_id, dataset_id - - _IBIS_TYPE_TO_DTYPE = { 'string': 'STRING', 'int64': 'INT64', @@ -277,6 +266,53 @@ def rename_partitioned_column(table_expr, bq_table): return table_expr.relabel({partition_field: col}) +def parse_project_and_dataset(project, dataset): + """Figure out the project id under which queries will run versus the + project of where the data live as well as what dataset to use. + + Parameters + ---------- + project : str + A project name + dataset : str + A ``.`` string or just a dataset name + + Returns + ------- + data_project, billing_project, dataset : str, str, str + + Examples + -------- + >>> data_project, billing_project, dataset = parse_project_and_dataset( + ... 'ibis-gbq', + ... 'foo-bar.my_dataset' + ... ) + >>> data_project + 'foo-bar' + >>> billing_project + 'ibis-gbq' + >>> dataset + 'my_dataset' + >>> data_project, billing_project, dataset = parse_project_and_dataset( + ... 'ibis-gbq', + ... 'my_dataset' + ... ) + >>> data_project + 'ibis-gbq' + >>> billing_project + 'ibis-gbq' + >>> dataset + 'my_dataset' + """ + try: + data_project, dataset = dataset.split('.') + except ValueError: + billing_project = data_project = project + else: + billing_project = project + return data_project, billing_project, dataset + + class BigQueryClient(SQLClient): sync_query = BigQueryQuery @@ -285,17 +321,38 @@ class BigQueryClient(SQLClient): dialect = comp.BigQueryDialect def __init__(self, project_id, dataset_id): - self.client = bq.Client(project_id) - self.dataset_id = dataset_id + """ + Parameters + ---------- + project_id : str + A project name + dataset_id : str + A ``.`` string or just a dataset name + """ + (self.data_project, + self.billing_project, + self.dataset) = parse_project_and_dataset(project_id, dataset_id) + self.client = bq.Client(project=self.data_project) + + def _parse_project_and_dataset(self, dataset): + project, _, dataset = parse_project_and_dataset( + self.billing_project, + dataset or '{}.{}'.format(self.data_project, self.dataset), + ) + return project, dataset @property def project_id(self): - return self.client.project_id + return self.data_project + + @property + def dataset_id(self): + return self.dataset def table(self, name, database=None): t = super(BigQueryClient, self).table(name, database=database) - table_id, dataset_id = _ensure_split(name, database or self.dataset_id) - dataset_ref = self.client.dataset(dataset_id) + project, dataset, name = t.op().name.split('.') + dataset_ref = self.client.dataset(dataset, project=project) table_ref = dataset_ref.table(name) bq_table = self.client.get_table(table_ref) return rename_partitioned_column(t, bq_table) @@ -305,39 +362,53 @@ def _build_ast(self, expr, context): return result def _execute_query(self, dml, async=False): + if async: + raise 
NotImplementedError( + 'Asynchronous queries not implemented in the BigQuery backend' + ) klass = self.async_query if async else self.sync_query inst = klass(self, dml, query_parameters=dml.context.params) df = inst.execute() return df def _fully_qualified_name(self, name, database): - dataset_id = database or self.dataset_id - return dataset_id + '.' + name + project, dataset = self._parse_project_and_dataset(database) + return '{}.{}.{}'.format(project, dataset, name) def _get_table_schema(self, qualified_name): - return self.get_schema(qualified_name) + dataset, table = qualified_name.rsplit('.', 1) + return self.get_schema(table, database=dataset) def _execute(self, stmt, results=True, query_parameters=None): - # TODO(phillipc): Allow **kwargs in calls to execute job_config = bq.job.QueryJobConfig() job_config.query_parameters = query_parameters or [] job_config.use_legacy_sql = False # False by default in >=0.28 - query = self.client.query(stmt, job_config=job_config) + query = self.client.query( + stmt, job_config=job_config, project=self.billing_project + ) query.result() # blocks until finished return BigQueryCursor(query) def database(self, name=None): - return self.database_class(name or self.dataset_id, self) + return self.database_class(name or self.dataset, self) @property def current_database(self): - return self.database(self.dataset_id) + return self.database(self.dataset) def set_database(self, name): - self.dataset_id = name + self.data_project, self.dataset = self._parse_project_and_dataset(name) def exists_database(self, name): - return name in self.list_databases() + project, dataset = self._parse_project_and_dataset(name) + client = self.client + dataset_ref = client.dataset(dataset, project=project) + try: + client.get_dataset(dataset_ref) + except NotFound: + return False + else: + return True def list_databases(self, like=None): results = [ @@ -351,13 +422,20 @@ def list_databases(self, like=None): return results def exists_table(self, name, database=None): - table_id, dataset_id = _ensure_split(name, database) - return table_id in self.list_tables( - database=dataset_id or self.dataset_id - ) + project, dataset = self._parse_project_and_dataset(database) + client = self.client + dataset_ref = self.client.dataset(dataset, project=project) + table_ref = dataset_ref.table(name) + try: + client.get_table(table_ref) + except NotFound: + return False + else: + return True def list_tables(self, like=None, database=None): - dataset_ref = self.client.dataset(database or self.dataset_id) + project, dataset = self._parse_project_and_dataset(database) + dataset_ref = self.client.dataset(dataset, project=project) result = [ table.table_id for table in self.client.list_tables(dataset_ref) ] @@ -369,8 +447,8 @@ def list_tables(self, like=None, database=None): return result def get_schema(self, name, database=None): - table_id, dataset_id = _ensure_split(name, database) - table_ref = self.client.dataset(dataset_id).table(table_id) + project, dataset = self._parse_project_and_dataset(database) + table_ref = self.client.dataset(dataset, project=project).table(name) bq_table = self.client.get_table(table_ref) return sch.infer(bq_table) diff --git a/ibis/bigquery/compiler.py b/ibis/bigquery/compiler.py index 457cc1e06b57..a8a1f9ab9512 100644 --- a/ibis/bigquery/compiler.py +++ b/ibis/bigquery/compiler.py @@ -1,5 +1,7 @@ from functools import partial +import regex as re + import six from multipledispatch import Dispatcher @@ -16,7 +18,9 @@ import ibis.expr.operations as ops import 
ibis.expr.lineage as lin -from ibis.impala.compiler import ImpalaSelect, unary, fixed_arity +from ibis.impala.compiler import ( + ImpalaSelect, unary, fixed_arity, ImpalaTableSetFormatter +) from ibis.impala import compiler as impala_compiler from ibis.bigquery.datatypes import ibis_type_to_bigquery_type @@ -473,10 +477,21 @@ def bigquery_rewrite_notall(expr): return (1 - arg.cast(dt.int64)).sum() != 0 +class BigQueryTableSetFormatter(ImpalaTableSetFormatter): + def _quote_identifier(self, name): + if re.match(r'^[A-Za-z][A-Za-z_0-9]*$', name): + return name + return '`{}`'.format(name) + + class BigQuerySelect(ImpalaSelect): translator = BigQueryExprTranslator + @property + def table_set_formatter(self): + return BigQueryTableSetFormatter + @rewrites(ops.IdenticalTo) def identical_to(expr): diff --git a/ibis/bigquery/tests/test_client.py b/ibis/bigquery/tests/test_client.py index e36c08ec29fd..a51ab9050ed3 100644 --- a/ibis/bigquery/tests/test_client.py +++ b/ibis/bigquery/tests/test_client.py @@ -16,6 +16,7 @@ pytestmark = pytest.mark.bigquery pytest.importorskip('google.cloud.bigquery') +exceptions = pytest.importorskip('google.api_core.exceptions') def test_table(alltypes): @@ -204,7 +205,7 @@ def test_subquery_scalar_params(alltypes): SELECT `string_col`, sum(`float_col`) AS `foo` FROM ( SELECT `float_col`, `timestamp_col`, `int_col`, `string_col` - FROM testing.functional_alltypes + FROM `ibis-gbq.testing.functional_alltypes` WHERE `timestamp_col` < @my_param ) t1 GROUP BY 1 @@ -360,7 +361,7 @@ def test_scalar_param_scope(alltypes): mut = t.mutate(param=param).compile(params={param: '2017-01-01'}) assert mut == """\ SELECT *, @param AS `param` -FROM testing.functional_alltypes""" +FROM `ibis-gbq.testing.functional_alltypes`""" def test_scalar_param_partition_time(parted_alltypes): @@ -397,3 +398,114 @@ def test_parted_column(client, kind, option, expected_fn): t = client.table(table_name) expected_column = expected_fn(kind) assert t.columns == [expected_column, 'string_col', 'int_col'] + + +def test_cross_project_query(): + con = ibis.bigquery.connect( + project_id='ibis-gbq', + dataset_id='bigquery-public-data.stackoverflow') + table = con.table('posts_questions') + expr = table[table.tags.contains('ibis')][['title', 'tags']] + result = expr.compile() + expected = """\ +SELECT `title`, `tags` +FROM ( + SELECT * + FROM `bigquery-public-data.stackoverflow.posts_questions` + WHERE STRPOS(`tags`, 'ibis') - 1 >= 0 +) t0""" + assert result == expected + n = 5 + df = expr.limit(n).execute() + assert len(df) == n + assert list(df.columns) == ['title', 'tags'] + assert df.title.dtype == np.object + assert df.tags.dtype == np.object + + +def test_set_database(): + con = ibis.bigquery.connect(project_id='ibis-gbq', dataset_id='testing') + con.set_database('bigquery-public-data.epa_historical_air_quality') + tables = con.list_tables() + assert 'co_daily_summary' in tables + + +def test_exists_table_different_project(client): + name = 'co_daily_summary' + database = 'bigquery-public-data.epa_historical_air_quality' + assert client.exists_table(name, database=database) + assert not client.exists_table('foobar', database=database) + + +def test_exists_table_different_project_fully_qualified(client): + # TODO(phillipc): Should we raise instead? 
+ name = 'bigquery-public-data.epa_historical_air_quality.co_daily_summary' + with pytest.raises(exceptions.BadRequest): + client.exists_table(name) + + +@pytest.mark.parametrize( + ('name', 'expected'), + [ + ('bigquery-public-data.epa_historical_air_quality', True), + ('bigquery-foo-bar-project.baz_dataset', False), + ] +) +def test_exists_database_different_project(client, name, expected): + assert client.exists_database(name) is expected + + +def test_repeated_project_name(): + con = ibis.bigquery.connect( + project_id='ibis-gbq', dataset_id='ibis-gbq.testing') + assert 'functional_alltypes' in con.list_tables() + + +@pytest.mark.xfail(raises=NotImplementedError, reason='async not implemented') +def test_async(client): + expr = ibis.literal(1) + result = client.execute(expr, async=True) + assert result.get_result() == 1 + + +def test_multiple_project_queries(client): + so = client.table( + 'posts_questions', database='bigquery-public-data.stackoverflow') + trips = client.table('trips', database='nyc-tlc.yellow') + join = so.join(trips, so.tags == trips.rate_code)[[so.title]] + result = join.compile() + expected = """\ +SELECT t0.`title` +FROM `bigquery-public-data.stackoverflow.posts_questions` t0 + INNER JOIN `nyc-tlc.yellow.trips` t1 + ON t0.`tags` = t1.`rate_code`""" + assert result == expected + + +def test_multiple_project_queries_database_api(client): + stackoverflow = client.database('bigquery-public-data.stackoverflow') + posts_questions = stackoverflow.posts_questions + yellow = client.database('nyc-tlc.yellow') + trips = yellow.trips + predicate = posts_questions.tags == trips.rate_code + join = posts_questions.join(trips, predicate)[[posts_questions.title]] + result = join.compile() + expected = """\ +SELECT t0.`title` +FROM `bigquery-public-data.stackoverflow.posts_questions` t0 + INNER JOIN `nyc-tlc.yellow.trips` t1 + ON t0.`tags` = t1.`rate_code`""" + assert result == expected + + +def test_multiple_project_queries_execute(client): + stackoverflow = client.database('bigquery-public-data.stackoverflow') + posts_questions = stackoverflow.posts_questions.limit(5) + yellow = client.database('nyc-tlc.yellow') + trips = yellow.trips.limit(5) + predicate = posts_questions.tags == trips.rate_code + cols = [posts_questions.title] + join = posts_questions.left_join(trips, predicate)[cols] + result = join.execute() + assert list(result.columns) == ['title'] + assert len(result) == 5 diff --git a/ibis/bigquery/tests/test_compiler.py b/ibis/bigquery/tests/test_compiler.py index 9f3c4410afcd..3da33bfeb554 100644 --- a/ibis/bigquery/tests/test_compiler.py +++ b/ibis/bigquery/tests/test_compiler.py @@ -12,7 +12,7 @@ def test_timestamp_accepts_date_literals(alltypes): result = expr.compile(params=params) expected = """\ SELECT *, @param AS `param` -FROM testing.functional_alltypes""" +FROM `ibis-gbq.testing.functional_alltypes`""" assert result == expected @@ -28,10 +28,10 @@ def test_union(alltypes, distinct, expected_keyword): result = expr.compile() expected = """\ SELECT * -FROM testing.functional_alltypes +FROM `ibis-gbq.testing.functional_alltypes` UNION {} SELECT * -FROM testing.functional_alltypes""".format(expected_keyword) +FROM `ibis-gbq.testing.functional_alltypes`""".format(expected_keyword) assert result == expected @@ -40,5 +40,5 @@ def test_ieee_divide(alltypes): result = expr.compile() expected = """\ SELECT IEEE_DIVIDE(`double_col`, 0) AS `tmp` -FROM testing.functional_alltypes""" +FROM `ibis-gbq.testing.functional_alltypes`""" assert result == expected From 
14062f2a0af668a0e583458a7d2897ad2efdeedd Mon Sep 17 00:00:00 2001
From: Ivan Ogasawara
Date: Fri, 20 Apr 2018 18:50:42 -0400
Subject: [PATCH 8/9] First refactoring.

---
 ibis/mapd/README.rst    |  41 ++-
 ibis/mapd/compiler.py   |   1 -
 ibis/mapd/operations.py | 565 +++++++---------------------------------
 3 files changed, 116 insertions(+), 491 deletions(-)

diff --git a/ibis/mapd/README.rst b/ibis/mapd/README.rst
index 7ca651f86bdb..2fa81128b784 100644
--- a/ibis/mapd/README.rst
+++ b/ibis/mapd/README.rst
@@ -157,8 +157,7 @@ steps:
 
 1. create a new class
 2. create a new function and assign it to a DataType
-3. create a compiler function to this new function and assign it to the compiler
-   translator
+3. create a compiler function for this new function and assign it to the compiler translator
 
 A new Class database function seems like this (`my_backend_operations.py`):
 
@@ -332,19 +331,13 @@ for `mapd` backend.
 
 Aggregate operations
 ====================
-count column
-t['taxiin'].count()
+The available aggregate operations are: max, min, mean, count, distinct().count(), nunique and approx_nunique.
 
-distinct count column
-t['taxiin'].distinct().count()
+The following examples show how to use the count operations:
 
-
-distinct count/nunique
-t['taxiin'].nunique().name('v')
-
-
-approx distinct count
-t['taxiin'].approx_nunique(10)
+- get the row count of the table: `t['taxiin'].count()`
+- get the distinct count of a field: `t['taxiin'].distinct().count()` or `t['taxiin'].nunique().name('v')`
+- get the approximate distinct count of a field: `t['taxiin'].approx_nunique(10)`
 
 
 Best practices
@@ -354,6 +347,28 @@ Best practices
 
 - Use `format` string function to format a string instead of `%` statement.
 
+History
+-------
+
+New implementations on `ibis` core:
+
+- Trigonometric operations (https://github.com/ibis-project/ibis/issues/893 );
+- Radians and Degrees operations (https://github.com/ibis-project/ibis/issues/1431 );
+- PI constant (https://github.com/ibis-project/ibis/issues/1418 );
+- Correlation and Covariance operations (https://github.com/ibis-project/ibis/issues/1432 );
+- ILIKE operation (https://github.com/ibis-project/ibis/issues/1433 );
+- Distance operation (https://github.com/ibis-project/ibis/issues/1434 );
+
+Known issues:
+
+- `Ibis` `CASE` statement wasn't allowing input and output with different types (https://github.com/ibis-project/ibis/issues/93 )
+- At this time, not all MapD `date parts` are available on `ibis` (https://github.com/ibis-project/ibis/issues/1430 )
+
+
+Pull Requests:
+
+- https://github.com/ibis-project/ibis/pull/1419
+
 References
 ----------
 
diff --git a/ibis/mapd/compiler.py b/ibis/mapd/compiler.py
index e501fb4eea51..ad0c0066753d 100644
--- a/ibis/mapd/compiler.py
+++ b/ibis/mapd/compiler.py
@@ -5,7 +5,6 @@
 import ibis.util as util
 import ibis.expr.operations as ops
 import ibis.sql.compiler as compiles
-import ibis.expr.types as ir
 
 
 def build_ast(expr, context):
diff --git a/ibis/mapd/operations.py b/ibis/mapd/operations.py
index 3e66684d2691..ff5223554f07 100644
--- a/ibis/mapd/operations.py
+++ b/ibis/mapd/operations.py
@@ -1,6 +1,7 @@
 from datetime import date, datetime
 from ibis.mapd.identifiers import quote_identifier
 from six import StringIO
+from ibis.impala import compiler as impala_compiler
 
 import ibis
 import ibis.common as com
@@ -8,7 +9,17 @@
 import ibis.expr.rules as rlz
 import ibis.expr.types as ir
 import ibis.expr.operations as ops
-import ibis.sql.transforms as transforms
+
+_mapd_unit_names = {
+    'Y': 'YEAR',
+    'M': 'MONTH',
+    'D': 'DAY',
+    'W': 
'WEEK', + 'Q': 'QUARTER', + 'h': 'HOUR', + 'm': 'MINUTE', + 's': 'SECOND', +} def _is_floating(*args): @@ -29,26 +40,6 @@ def _cast(translator, expr): return 'CAST({0!s} AS {1!s})'.format(arg_, type_) -def _between(translator, expr): - op = expr.op() - arg_, lower_, upper_ = map(translator.translate, op.args) - return '{0!s} BETWEEN {1!s} AND {2!s}'.format(arg_, lower_, upper_) - - -def _negate(translator, expr): - arg = expr.op().args[0] - if isinstance(expr, ir.BooleanValue): - arg_ = translator.translate(arg) - return 'NOT {0!s}'.format(arg_) - else: - arg_ = _parenthesize(translator, arg) - return '-{0!s}'.format(arg_) - - -def _not(translator, expr): - return 'NOT {}'.format(*map(translator.translate, expr.op().args)) - - def _parenthesize(translator, expr): op = expr.op() op_klass = type(op) @@ -158,24 +149,24 @@ def formatter(translator, expr): return formatter -def timestamp_binary_infix_op(func_name, infix_sym, timestamp_code): - def formatter(translator, expr): - op = expr.op() - - arg, unit = op.args[0], op.args[1] - arg_ = _parenthesize(translator, arg) - - if unit.upper() in [u.upper() for u in timestamp_code.keys()]: - converter = timestamp_code[unit].upper() - elif unit.upper() in [u.upper() for u in _timestamp_units.values()]: - converter = unit.upper() - else: - raise ValueError('`{}` unit is not supported!'.format(unit)) - - return '{0!s}({1!s} {2!s} {3!s})'.format( - func_name, converter, infix_sym, arg_ - ) - return formatter +# def timestamp_binary_infix_op(func_name, infix_sym, timestamp_code): +# def formatter(translator, expr): +# op = expr.op() +# +# arg, unit = op.args[0], op.args[1] +# arg_ = _parenthesize(translator, arg) +# +# if unit.upper() in [u.upper() for u in ops._timestamp_units.keys()]: +# converter = ops._timestamp_units[unit].upper() +# elif unit.upper() in [u.upper() for u in ops._date_units.values()]: +# converter = ops._timestamp_units[unit].upper() +# else: +# raise ValueError('`{}` unit is not supported!'.format(unit)) +# +# return '{0!s}({1!s} {2!s} {3!s})'.format( +# func_name, converter, infix_sym, arg_ +# ) +# return formatter def _call(translator, func, *args): @@ -183,18 +174,6 @@ def _call(translator, func, *args): return '{0!s}({1!s})'.format(func, args_) -def _call_date_trunc(translator, func, *args): - args_ = ', '.join(map(translator.translate, args)) - return 'DATE_TRUNC({0!s}, {1!s})'.format(func, args_) - - -def _aggregate(translator, func, arg, where=None): - if where is not None: - return _call(translator, func + 'If', arg, where) - else: - return _call(translator, func, arg) - - def _extract_field(sql_attr): def extract_field_formatter(translator, expr): op = expr.op() @@ -203,20 +182,9 @@ def extract_field_formatter(translator, expr): return extract_field_formatter -def _general_date_field_add(data_type_name): - if data_type_name not in ('date', 'timestamp'): - raise NotImplemented('{} not implemented.'.format(data_type_name)) - - def __general_date_field_add(translator, expr): - op = expr.op() - self = translator.translate(op.args[0]) - value = translator.translate(op.args[1]) - return '{}({},{},{})'.format(data_type_name, self, value) - - return __general_date_field_add - +# STATS -def compile_corr(translator, expr): +def _corr(translator, expr): # pull out the arguments to the expression args = expr.op().args @@ -231,7 +199,7 @@ def compile_corr(translator, expr): return 'CORR{}({}, {})'.format(f_type, compiled_x, compiled_y) -def compile_cov(translator, expr): +def _cov(translator, expr): # pull out the arguments to the 
expression args = expr.op().args @@ -248,113 +216,23 @@ def compile_cov(translator, expr): ) -def compile_length(func_name='length', sql_func_name='CHAR_LENGTH'): - def _compile_lenght(translator, expr): +# String + +def _length(func_name='length', sql_func_name='CHAR_LENGTH'): + def __lenght(translator, expr): # pull out the arguments to the expression arg = expr.op().args[0] # compile the argument compiled_arg = translator.translate(arg) return '{}({})'.format(sql_func_name, compiled_arg) - _compile_lenght.__name__ = func_name - return _compile_lenght - - -def _xor(translator, expr): - op = expr.op() - left_ = _parenthesize(translator, op.left) - right_ = _parenthesize(translator, op.right) - return 'xor({0}, {1})'.format(left_, right_) + __lenght.__name__ = func_name + return __lenght def _name_expr(formatted_expr, quoted_name): return '{0!s} AS {1!s}'.format(formatted_expr, quoted_name) -def varargs(func_name): - def varargs_formatter(translator, expr): - op = expr.op() - return _call(translator, func_name, *op.arg) - return varargs_formatter - - -def _substring(translator, expr): - # arg_ is the formatted notation - op = expr.op() - arg, start, length = op.args - arg_, start_ = translator.translate(arg), translator.translate(start) - - # MapD is 1-indexed - if length is None or isinstance(length.op(), ops.Literal): - if length is not None: - length_ = length.op().value - return 'substring({0}, {1} + 1, {2})'.format(arg_, start_, length_) - else: - return 'substring({0}, {1} + 1)'.format(arg_, start_) - else: - length_ = translator.translate(length) - return 'substring({0}, {1} + 1, {2})'.format(arg_, start_, length_) - - -def _string_find(translator, expr): - op = expr.op() - arg, substr, start, _ = op.args - if start is not None: - raise com.UnsupportedOperationError( - "String find doesn't support start argument" - ) - - return _call(translator, 'position', arg, substr) + ' - 1' - - -def _regex_extract(translator, expr): - op = expr.op() - arg, pattern, index = op.args - arg_, pattern_ = translator.translate(arg), translator.translate(pattern) - - if index is not None: - index_ = translator.translate(index) - return 'extractAll({0}, {1})[{2} + 1]'.format(arg_, pattern_, index_) - - return 'extractAll({0}, {1})'.format(arg_, pattern_) - - -def _parse_url(translator, expr): - op = expr.op() - arg, extract, key = op.args - - if extract == 'HOST': - return _call(translator, 'domain', arg) - elif extract == 'PROTOCOL': - return _call(translator, 'protocol', arg) - elif extract == 'PATH': - return _call(translator, 'path', arg) - elif extract == 'QUERY': - if key is not None: - return _call(translator, 'extractURLParameter', arg, key) - else: - return _call(translator, 'queryString', arg) - else: - raise com.UnsupportedOperationError( - 'Parse url with extract {0} is not supported'.format(extract) - ) - - -def _index_of(translator, expr): - op = expr.op() - - arg, arr = op.args - arg_formatted = translator.translate(arg) - arr_formatted = ','.join(map(translator.translate, arr)) - return "indexOf([{0}], {1}) - 1".format(arr_formatted, arg_formatted) - - -def _sign(translator, expr): - """Workaround for missing sign function""" - op = expr.op() - arg, = op.args - return 'SIGN({})'.format(translator.translate(arg)) - - def _round(translator, expr): op = expr.op() arg, digits = op.args @@ -413,33 +291,6 @@ def _formatter(translator, expr): return _formatter -def _timestamp_diff(sql_func, timestamp_code): - def _formatter(translator, expr): - op = expr.op() - left, right, unit = op.args - - 
formatted_left = translator.translate(left) - formatted_right = translator.translate(right) - - formatted_unit = timestamp_code[unit] - - if isinstance(left, ir.DateValue): - formatted_left = 'CAST({} as timestamp)'.format(formatted_left) - - if isinstance(right, ir.DateValue): - formatted_right = 'CAST({} as timestamp)'.format(formatted_right) - - return '{}({}, {}, {})'.format( - sql_func, - formatted_unit, - formatted_left, - formatted_right - ) - - _formatter.__name__ = 'diff' - return _formatter - - def _set_literal_format(translator, expr): value_type = expr.type().value_type @@ -532,22 +383,6 @@ def _next_case(self): self.buf.write(' ') -def _simple_case(translator, expr): - op = expr.op() - formatter = CaseFormatter( - translator, op.base, op.cases, op.results, op.default - ) - return formatter.get_result() - - -def _searched_case(translator, expr): - op = expr.op() - formatter = CaseFormatter( - translator, None, op.cases, op.results, op.default - ) - return formatter.get_result() - - def _table_array_view(translator, expr): ctx = translator.context table = expr.op().table @@ -555,60 +390,18 @@ def _table_array_view(translator, expr): return '(\n{0}\n)'.format(util.indent(query, ctx.indent)) -def _timestamp_from_unix(translator, expr): +def _timestamp_truncate(translator, expr): op = expr.op() arg, unit = op.args - if unit in {'ms', 'us', 'ns'}: - raise ValueError('`{}` unit is not supported!'.format(unit)) - - return _call(translator, 'toDateTime', arg) - - -def date_truncate(translator, expr): - op = expr.op() - arg, unit = op.args - - timestamp_code = { - 'Y': 'YEAR', - 'M': 'MONTH', - 'D': 'DAY', - 'W': 'WEEK', - 'Q': 'QUARTER', - 'h': 'HOUR', - 'm': 'MINUTE', - 's': 'SECOND', - } - - if unit.upper() in [u.upper() for u in timestamp_code.keys()]: - converter = timestamp_code[unit].upper() - elif unit.upper() in [u.upper() for u in _timestamp_units.values()]: - converter = unit.upper() + if unit.upper() in [u.upper() for u in _mapd_unit_names.keys()]: + unit_ = _mapd_unit_names[unit].upper() else: raise ValueError('`{}` unit is not supported!'.format(unit)) - return _call_date_trunc(translator, converter, arg) - - -def _exists_subquery(translator, expr): - op = expr.op() - ctx = translator.context - - dummy = ir.literal(1).name(ir.unnamed) - - filtered = op.foreign_table.filter(op.predicates) - expr = filtered.projection([dummy]) - - subquery = ctx.get_compiled_expr(expr) - - if isinstance(op, transforms.ExistsSubquery): - key = 'EXISTS' - elif isinstance(op, transforms.NotExistsSubquery): - key = 'NOT EXISTS' - else: - raise NotImplementedError - - return '{0} (\n{1}\n)'.format(key, util.indent(subquery, ctx.indent)) + # return _call_date_trunc(translator, converter, arg) + arg_ = translator.translate(arg) + return 'DATE_TRUNC({0!s}, {1!s})'.format(unit_, arg_) def _table_column(translator, expr): @@ -633,71 +426,30 @@ def _table_column(translator, expr): return quoted_name -def _string_split(translator, expr): - value, sep = expr.op().args - return 'splitByString({}, {})'.format( - translator.translate(sep), - translator.translate(value) - ) - - -def _string_join(translator, expr): - sep, elements = expr.op().args - assert isinstance(elements.op(), ops.ValueList), \ - 'elements must be a ValueList, got {}'.format(type(elements.op())) - return 'arrayStringConcat([{}], {})'.format( - ', '.join(map(translator.translate, elements)), - translator.translate(sep), - ) - - -def _string_repeat(translator, expr): - value, times = expr.op().args - result = 'arrayStringConcat(arrayMap(x 
-> {}, range({})))'.format( - translator.translate(value), translator.translate(times) - ) - return result - - -def _string_like(translator, expr): - value, pattern = expr.op().args[:2] - return '{} LIKE {}'.format( - translator.translate(value), translator.translate(pattern) - ) - - -def raise_error(translator, expr, *args): - msg = "MapD backend doesn't support {0} operation!" - op = expr.op() - raise com.UnsupportedOperationError(msg.format(type(op))) - - -def _null_literal(translator, expr): - return 'Null' - +# AGGREGATION -def _null_if_zero(translator, expr): - op = expr.op() - arg = op.args[0] - arg_ = translator.translate(arg) - return 'nullIf({0}, 0)'.format(arg_) +class ApproxCountDistinct(ops.Reduction): + """Approximate number of unique values + """ + arg = ops.Arg(rlz.column(rlz.any)) + approx = ops.Arg(rlz.integer, default=1) + where = ops.Arg(rlz.boolean, default=None) -def _zero_if_null(translator, expr): - op = expr.op() - arg = op.args[0] - arg_ = translator.translate(arg) - return 'ifNull({0}, 0)'.format(arg_) + def output_type(self): + # Impala 2.0 and higher returns a DOUBLE + # return ir.DoubleScalar + return ops.partial(ir.IntegerScalar, dtype=ops.dt.int64) -# AGGREGATION +approx_count_distinct = _reduction( + 'approx_nunique', + sql_func_name='approx_count_distinct', + sql_signature='{}({})' +) -class CountDistinct(ops.CountDistinct): - """ - Returns the approximate count of distinct values of x with defined - expected error rate e - """ - approx = ops.Arg(rlz.integer) +count_distinct = _reduction('count') +count = _reduction('count') # MATH @@ -707,7 +459,7 @@ class Log(ops.Ln): """ -class Truncate(ops.NumericBinaryOp): +class NumericTruncate(ops.NumericBinaryOp): """Truncates x to y decimal places""" output_type = rlz.shape_like('left', ops.dt.float) @@ -741,106 +493,40 @@ class Conv_4326_900913_Y(ops.UnaryOp): output_type = rlz.shape_like('arg', ops.dt.float) -# DATE/TIME OPERATIONS - - -_timestamp_units = dict(ops._date_units) -_timestamp_units.update(ops._time_units) -_timestamp_units.update(dict( - millennium='MILLENNIUM', - MILLENNIUM='MILLENNIUM', - - century='CENTURY', - CENTURY='CENTURY', - - DECADE='DECADE', - decade='decade', +# String - quarterday='quarterday', - QUARTERDAY='QUARTERDAY', - - DOW='DOW', - ISODOW='DOW', - DOY='DOY', - EPOCH='EPOCH' -)) - - -class TimestampExtract(ops.TimestampUnaryOp): - unit = ops.Arg(rlz.isin(_timestamp_units)) - output_type = rlz.shape_like('arg', ops.dt.int32) - - -class TimestampTruncate(ops.TimestampTruncate): - unit = ops.Arg(rlz.isin(_timestamp_units)) - - -class DateTruncate(ops.DateTruncate): - unit = ops.Arg(rlz.isin(_timestamp_units)) +class ByteLength(ops.StringLength): + """Returns the length of a string in bytes length""" # https://www.mapd.com/docs/latest/mapd-core-guide/dml/ _binary_infix_ops = { # math - ops.Add: binary_infix_op('+'), - ops.Subtract: binary_infix_op('-'), - ops.Multiply: binary_infix_op('*'), - ops.Divide: binary_infix_op('/'), ops.Power: fixed_arity('power', 2), - # comparison - ops.Equals: binary_infix_op('='), - ops.NotEquals: binary_infix_op('<>'), - ops.GreaterEqual: binary_infix_op('>='), - ops.Greater: binary_infix_op('>'), - ops.LessEqual: binary_infix_op('<='), - ops.Less: binary_infix_op('<'), - # logical - ops.And: binary_infix_op('AND'), - ops.Or: binary_infix_op('OR'), } -_unary_ops = { - # logical - ops.Negate: _negate, - ops.Not: _not, -} +_unary_ops = {} # COMPARISON -_comparison_ops = { - ops.IsNull: unary('is null'), - ops.Between: _between, - ops.NullIf: 
fixed_arity('nullif', 2), - ops.NotNull: unary('is not null'), - ops.Contains: binary_infix_op('in'), - ops.NotContains: binary_infix_op('not in'), -} +_comparison_ops = {} # MATH _math_ops = { - ops.Abs: unary('abs'), - ops.Ceil: unary('ceil'), ops.Degrees: unary('degrees'), # MapD function - ops.Exp: unary('exp'), - ops.Floor: unary('floor'), - Log: unary('log'), # MapD Log wrap to IBIS Ln - ops.Ln: unary('ln'), - ops.Log10: unary('log10'), ops.Modulus: fixed_arity('mod', 2), ops.Pi: fixed_arity('pi', 0), ops.Radians: unary('radians'), ops.Round: _round, - ops.Sign: _sign, - ops.Sqrt: unary('sqrt'), - Truncate: fixed_arity('truncate', 2) + NumericTruncate: fixed_arity('truncate', 2) } # STATS _stats_ops = { - ops.Correlation: compile_corr, + ops.Correlation: _corr, ops.StandardDev: _variance_like('stddev'), ops.Variance: _variance_like('var'), - ops.Covariance: compile_cov, + ops.Covariance: _cov, } # TRIGONOMETRIC @@ -861,60 +547,16 @@ class DateTruncate(ops.DateTruncate): Conv_4326_900913_Y: unary('conv_4326_900913_y') } - -class ByteLength(ops.StringLength): - """Returns the length of a string in bytes length""" - - _string_ops = { - ops.StringLength: compile_length(), - ByteLength: compile_length('byte_length', 'LENGTH'), - ops.RegexSearch: binary_infix_op('REGEXP'), - ops.StringSQLLike: binary_infix_op('like'), + ops.StringLength: _length(), + ByteLength: _length('byte_length', 'LENGTH'), ops.StringSQLILike: binary_infix_op('ilike'), } -_date_part_truncate = [ - 'YEAR', 'QUARTER', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', - 'MILLENNIUM', 'CENTURY', 'DECADE', 'WEEK', 'QUARTERDAY' -] - -_date_part_extract = [ - 'YEAR', 'QUARTER', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', - 'DOW', 'ISODOW', 'DOY', 'EPOCH', 'QUARTERDAY', 'WEEK' - -] - -_date_part_datediff = [ - 'YEAR', 'QUARTER', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', - 'MILLENNIUM', 'CENTURY', 'DECADE', 'WEEK', 'QUARTERDAY' -] - -_interval_dateadd = [ - 'YEAR', 'QUARTER', 'MONTH', 'DAYOFYEAR', 'DAY', 'WEEK', 'WEEKDAY', 'HOUR', - 'MINUTE', 'SECOND', 'MILLISECOND' -] -_interval_datepart = [ - 'YEAR', 'QUARTER', 'MONTH', 'DAYOFYEAR', 'DAY', 'WEEK', 'WEEKDAY', 'HOUR', - 'MINUTE', 'SECOND', 'MILLISECOND' -] - -timestamp_code = { - 'Y': 'YEAR', - 'M': 'MONTH', - 'D': 'DAY', - 'W': 'WEEK', - 'Q': 'QUARTER', - 'h': 'HOUR', - 'm': 'MINUTE', - 's': 'SECOND', -} - _date_ops = { ops.Date: unary('toDate'), - DateTruncate: date_truncate, - ops.TimestampNow: fixed_arity('NOW', 0), - TimestampTruncate: date_truncate, + ops.DateTruncate: _timestamp_truncate, + ops.TimestampTruncate: _timestamp_truncate, # DIRECT EXTRACT OPERATIONS ops.ExtractYear: _extract_field('YEAR'), @@ -924,9 +566,9 @@ class ByteLength(ops.StringLength): ops.ExtractMinute: _extract_field('MINUTE'), ops.ExtractSecond: _extract_field('SECOND'), - TimestampExtract: timestamp_binary_infix_op( - 'EXTRACT', 'FROM', timestamp_code=timestamp_code - ), + # TimestampExtract: timestamp_binary_infix_op( + # 'EXTRACT', 'FROM', timestamp_code=timestamp_code + # ), ops.IntervalAdd: _interval_from_integer, ops.IntervalFromInteger: _interval_from_integer, @@ -938,38 +580,11 @@ class ByteLength(ops.StringLength): } -class ApproxCountDistinct(ops.Reduction): - """Approximate number of unique values - - """ - arg = ops.Arg(rlz.column(rlz.any)) - approx = ops.Arg(rlz.integer, default=1) - where = ops.Arg(rlz.boolean, default=None) - - def output_type(self): - # Impala 2.0 and higher returns a DOUBLE - # return ir.DoubleScalar - return ops.partial(ir.IntegerScalar, dtype=ops.dt.int64) - - 
-approx_count_distinct = _reduction( - 'approx_nunique', - sql_func_name='approx_count_distinct', - sql_signature='{}({})' -) - -count_distinct = _reduction('count') -count = _reduction('count') - _agg_ops = { - ops.Count: count, - ops.CountDistinct: count_distinct, + # ops.Count: count, + # ops.CountDistinct: count_distinct, ApproxCountDistinct: approx_count_distinct, ops.DistinctColumn: unary_prefix_op('distinct'), - ops.Mean: _reduction('avg'), - ops.Max: _reduction('max'), - ops.Min: _reduction('min'), - ops.Sum: _reduction('sum'), } _general_ops = { @@ -978,17 +593,13 @@ def output_type(self): ops.ValueList: _value_list, ops.Cast: _cast, ops.Where: fixed_arity('if', 3), - ops.SimpleCase: _simple_case, - ops.SearchedCase: _searched_case, ops.TableColumn: _table_column, - ops.TableArrayView: _table_array_view, - transforms.ExistsSubquery: _exists_subquery, - transforms.NotExistsSubquery: _exists_subquery, - ops.ArrayLength: unary('length'), - ops.Coalesce: varargs('coalesce'), + # ops.TableArrayView: _table_array_view, + # ops.ArrayLength: unary('length'), + # ops.Coalesce: varargs('coalesce'), } -_operation_registry = {} +_operation_registry = impala_compiler._operation_registry.copy() _operation_registry.update(_general_ops) _operation_registry.update(_binary_infix_ops) @@ -1055,8 +666,8 @@ def _f(*args, **kwargs): assign_functions_to_dtype(ir.TimestampColumn, _date_ops, forced=True) assign_functions_to_dtype(ir.DateColumn, _date_ops, forced=True) -_add_method(ir.TimestampColumn, TimestampTruncate, 'truncate') -_add_method(ir.DateColumn, DateTruncate, 'truncate') -_add_method(ir.TimestampColumn, TimestampExtract, 'extract') +# _add_method(ir.TimestampColumn, ops.TimestampTruncate, 'truncate') +# _add_method(ir.DateColumn, ops.DateTruncate, 'truncate') +# _add_method(ir.TimestampColumn, TimestampExtract, 'extract') # _add_method(ir.DateColumn, TimestampExtract, 'extract') From 834dcfc3ba4f1285e92faed39746cb230be919b0 Mon Sep 17 00:00:00 2001 From: Ivan Ogasawara Date: Sat, 21 Apr 2018 15:03:54 -0400 Subject: [PATCH 9/9] Added ci; docs; tests; --- ci/datamgr.py | 39 ++++++++ ci/docker-compose.yml | 18 +++- ci/requirements-dev-2.7.yml | 1 + ci/requirements-dev-3.5.yml | 1 + ci/requirements-dev-3.6.yml | 1 + ci/requirements-docs-3.6.yml | 1 + ci/schema/mapd.sql | 72 ++++++++++++++ docs/source/developer.rst | 19 ++++ ibis/expr/api.py | 10 +- ibis/expr/operations.py | 14 +++ ibis/mapd/api.py | 12 +-- ibis/mapd/client.py | 29 +++--- ibis/mapd/compiler.py | 10 ++ ibis/mapd/operations.py | 162 ++++++++++++------------------- ibis/mapd/tests/conftest.py | 7 +- ibis/mapd/tests/test_client.py | 68 +++++++++++-- ibis/mapd/tests/test_compiler.py | 2 - setup.py | 3 + 18 files changed, 340 insertions(+), 129 deletions(-) create mode 100644 ci/schema/mapd.sql delete mode 100644 ibis/mapd/tests/test_compiler.py diff --git a/ci/datamgr.py b/ci/datamgr.py index 470bd30403cb..b5178c5f37a6 100755 --- a/ci/datamgr.py +++ b/ci/datamgr.py @@ -7,6 +7,7 @@ import pandas as pd import sqlalchemy as sa +import pymapd from toolz import dissoc from plumbum import local @@ -188,6 +189,44 @@ def sqlite(database, schema, tables, data_directory, **params): insert_tables(engine, tables, data_directory) +@cli.command() +@click.option('-h', '--host', default='localhost') +@click.option('-P', '--port', default=9091, type=int) +@click.option('-u', '--user', default='mapd') +@click.option('-p', '--password', default='HyperInteractive') +@click.option('-D', '--database', default='mapd') +@click.option('-S', '--schema', 
type=click.File('rt'), + default=str(SCRIPT_DIR / 'schema' / 'mapd.sql')) +@click.option('-t', '--tables', multiple=True, default=TEST_TABLES) +@click.option('-d', '--data-directory', default=DATA_DIR) +def mapd(schema, tables, data_directory, **params): + data_directory = Path(data_directory) + click.echo('Initializing MapD...') + + # connection + conn = pymapd.connect( + host=params['host'], user=params['user'], + password=params['password'], + port=params['port'], dbname=params['database'] + ) + + # create database + for stmt in schema.read().split(';'): + stmt = stmt.strip() + if len(stmt): + conn.execute(stmt) + + # import data + query = 'COPY {} FROM \'{}\' WITH(delimiter=\',\', header=\'true\')' + + click.echo('Loading data ...') + for table in tables: + src = data_directory / '{}.csv'.format(table) + click.echo(src) + conn.execute(query.format(table, src)) + conn.close() + + @cli.command() @click.option('-h', '--host', default='localhost') @click.option('-P', '--port', default=3306, type=int) diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index fe42c6c0e36e..8f96752e8986 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -8,6 +8,16 @@ services: environment: POSTGRES_PASSWORD: postgres + mapd: + image: mapd/mapd-ce-cpu + ports: + - 9090-9092:9090-9092 + environment: + - MAPD_HOST=mapd + - MAPD_PORT=9091 + - MAPD_DATABASE=mapd + - MAPD_USER=mapd + mysql: image: mariadb:10.2 ports: @@ -52,7 +62,8 @@ services: waiter: image: jwilder/dockerize command: | - dockerize -wait tcp://mysql:3306 + dockerize -wait tcp://mapd:9091 + -wait tcp://mysql:3306 -wait tcp://postgres:5432 -wait tcp://impala:21050 -wait tcp://impala:50070 @@ -84,6 +95,11 @@ services: - IBIS_TEST_CLICKHOUSE_HOST=clickhouse - IBIS_TEST_CLICKHOUSE_PORT=9000 - IBIS_TEST_CLICKHOUSE_DATABASE=ibis_testing + - IBIS_TEST_MAPD_HOST=mapd + - IBIS_TEST_MAPD_PORT=9091 + - IBIS_TEST_MAPD_DATABASE=mapd + - IBIS_TEST_MAPD_USER=mapd + - IBIS_TEST_MAPD_PASSWORD=HyperInteractive - GOOGLE_BIGQUERY_PROJECT_ID=ibis-gbq - GOOGLE_APPLICATION_CREDENTIALS=/tmp/gcloud-service-key.json volumes: diff --git a/ci/requirements-dev-2.7.yml b/ci/requirements-dev-2.7.yml index 0defe3a496e2..e90a55a87095 100644 --- a/ci/requirements-dev-2.7.yml +++ b/ci/requirements-dev-2.7.yml @@ -23,6 +23,7 @@ dependencies: - plumbum - psycopg2 - pyarrow>=0.6.0 + - pymapd - pymysql - pytables - pytest diff --git a/ci/requirements-dev-3.5.yml b/ci/requirements-dev-3.5.yml index ce2af1155945..f85fa3885537 100644 --- a/ci/requirements-dev-3.5.yml +++ b/ci/requirements-dev-3.5.yml @@ -18,6 +18,7 @@ dependencies: - plumbum - psycopg2 - pyarrow>=0.6.0 + - pymapd - pymysql - pytest - python=3.5 diff --git a/ci/requirements-dev-3.6.yml b/ci/requirements-dev-3.6.yml index a59e656de849..c465737d1e94 100644 --- a/ci/requirements-dev-3.6.yml +++ b/ci/requirements-dev-3.6.yml @@ -18,6 +18,7 @@ dependencies: - plumbum - psycopg2 - pyarrow>=0.6.0 + - pymapd - pymysql - pytables - pytest diff --git a/ci/requirements-docs-3.6.yml b/ci/requirements-docs-3.6.yml index c51705ffdf31..78e84c451873 100644 --- a/ci/requirements-docs-3.6.yml +++ b/ci/requirements-docs-3.6.yml @@ -22,6 +22,7 @@ dependencies: - plumbum - psycopg2 - pyarrow>=0.6.0 + - pymapd - pymysql - pytables - pytest diff --git a/ci/schema/mapd.sql b/ci/schema/mapd.sql new file mode 100644 index 000000000000..8aed19ee34c1 --- /dev/null +++ b/ci/schema/mapd.sql @@ -0,0 +1,72 @@ +DROP TABLE IF EXISTS diamonds; + +CREATE TABLE diamonds ( + carat FLOAT, + cut TEXT, + color TEXT, + clarity TEXT, + depth 
FLOAT, + table_ FLOAT, + price BIGINT, + x FLOAT, + y FLOAT, + z FLOAT +); + +DROP TABLE IF EXISTS batting; + +CREATE TABLE batting ( + playerID VARCHAR(255), + yearID BIGINT, + stint BIGINT, + teamID VARCHAR(7), + lgID VARCHAR(7), + G BIGINT, + AB BIGINT, + R BIGINT, + H BIGINT, + X2B BIGINT, + X3B BIGINT, + HR BIGINT, + RBI BIGINT, + SB BIGINT, + CS BIGINT, + BB BIGINT, + SO BIGINT, + IBB BIGINT, + HBP BIGINT, + SH BIGINT, + SF BIGINT, + GIDP BIGINT +); + +DROP TABLE IF EXISTS awards_players; + +CREATE TABLE awards_players ( + playerID VARCHAR(255), + awardID VARCHAR(255), + yearID BIGINT, + lgID VARCHAR(7), + tie VARCHAR(7), + notes VARCHAR(255) +); + +DROP TABLE IF EXISTS functional_alltypes; + +CREATE TABLE functional_alltypes ( + index BIGINT, + Unnamed_ BIGINT, + id INTEGER, + bool_col BOOLEAN, + tinyint_col SMALLINT, + smallint_col SMALLINT, + int_col INTEGER, + bigint_col BIGINT, + float_col FLOAT, + double_col DOUBLE, + date_string_col TEXT, + string_col TEXT, + timestamp_col TIMESTAMP, + year_ INTEGER, + month_ INTEGER +); diff --git a/docs/source/developer.rst b/docs/source/developer.rst index ab4aaa9cdd24..64825ebe97bc 100644 --- a/docs/source/developer.rst +++ b/docs/source/developer.rst @@ -139,6 +139,25 @@ instructions above, then SQLite will be available in the conda environment. ci/datamgr.py sqlite +MapD +^^^^ + +MapD can be used from either a docker image or from your machine directly. + +#. **Start the MapD Server docker image in another terminal**: + + .. code:: sh + + # Keeping this running as long as you want to test ibis + docker run -d -v $HOME/mapd-docker-storage:/mapd-storage -p 9090-9092:9090-9092 mapd/mapd-ce-cpu + + +Here's how to load test data into MapD: + + .. code:: sh + + ci/datamgr.py mapd + Running Tests ------------- diff --git a/ibis/expr/api.py b/ibis/expr/api.py index 599bc733c3a7..b2000b1f3b42 100644 --- a/ibis/expr/api.py +++ b/ibis/expr/api.py @@ -76,7 +76,7 @@ 'negate', 'ifelse', 'Expr', 'Schema', 'window', 'trailing_window', 'cumulative_window', - 'pi' + 'pi', 'distance' ] @@ -3200,3 +3200,11 @@ def _table_drop(self, fields): _add_methods(ir.TableExpr, _table_methods) + + +# geometric operation + +def distance(from_lon, from_lat, to_lon, to_lat): + """ + """ + return ops.Distance(from_lon, from_lat, to_lon, to_lat).to_expr() diff --git a/ibis/expr/operations.py b/ibis/expr/operations.py index da51114062fd..e5a7d9abb583 100644 --- a/ibis/expr/operations.py +++ b/ibis/expr/operations.py @@ -2782,3 +2782,17 @@ def root_tables(self): def _make_expr(self): dtype = rlz.highest_precedence_dtype(self.values) return ir.ListExpr(self, dtype=dtype) + + +# GEOMETRIC OPERATIONS + +class Distance(ValueOp): + """ + Calculates distance in meters between two WGS-84 positions. 
+ + """ + from_lon = Arg(rlz.column(rlz.numeric)) + from_lat = Arg(rlz.column(rlz.numeric)) + to_lon = Arg(rlz.column(rlz.numeric)) + to_lat = Arg(rlz.column(rlz.numeric)) + output_type = rlz.shape_like('from_lon', dt.float) diff --git a/ibis/mapd/api.py b/ibis/mapd/api.py index e42bb8ce355c..81505df4af4d 100644 --- a/ibis/mapd/api.py +++ b/ibis/mapd/api.py @@ -1,6 +1,6 @@ from ibis.config import options from ibis.mapd.compiler import dialect, compiles, rewrites -from ibis.mapd.client import MapDClient +from ibis.mapd.client import MapDClient, EXECUTION_TYPE_CURSOR import ibis.common as com @@ -31,9 +31,9 @@ def verify(expr, params=None): def connect( - uri: str=None, user: str=None, password: str=None, host: str=None, - port: int=9091, dbname: str=None, protocol: str='binary', - execution_type: int=3 + uri=None, user=None, password=None, host=None, + port=9091, database=None, protocol='binary', + execution_type=EXECUTION_TYPE_CURSOR ): """Create a MapDClient for use with Ibis @@ -44,7 +44,7 @@ def connect( :param password: str :param host: str :param port: int - :param dbname: str + :param database: str :param protocol: str :param execution_type: int Returns @@ -54,7 +54,7 @@ def connect( """ client = MapDClient( uri=uri, user=user, password=password, host=host, - port=port, dbname=dbname, protocol=protocol, + port=port, database=database, protocol=protocol, execution_type=execution_type ) diff --git a/ibis/mapd/client.py b/ibis/mapd/client.py index 8e00336296cd..5d7fb575579d 100644 --- a/ibis/mapd/client.py +++ b/ibis/mapd/client.py @@ -34,6 +34,7 @@ class MapDDataType(object): dtypes = { 'BIGINT': dt.int64, 'BOOLEAN': dt.Boolean, + 'BOOL': dt.Boolean, 'CHAR': dt.string, 'DATE': dt.date, 'DECIMAL': dt.float64, @@ -140,9 +141,9 @@ class MapDClient(SQLClient): dialect = MapDDialect def __init__( - self, uri: str=None, user: str=None, password: str=None, - host: str=None, port: int=9091, dbname: str=None, - protocol: str='binary', execution_type: int=3 + self, uri=None, user=None, password=None, + host=None, port=9091, database=None, + protocol='binary', execution_type=EXECUTION_TYPE_CURSOR ): """ @@ -153,9 +154,11 @@ def __init__( password : str host : str port : int - dbname : str + database : str protocol : {‘binary’, ‘http’, ‘https’} - execution_type : {1, 2, 3} + execution_type : { + EXECUTION_TYPE_ICP, EXECUTION_TYPE_ICP_GPU, EXECUTION_TYPE_CURSOR + } """ self.uri = uri @@ -163,7 +166,7 @@ def __init__( self.password = password self.host = host self.port = port - self.dbname = dbname + self.db_name = database self.protocol = protocol if execution_type not in ( @@ -177,7 +180,7 @@ def __init__( self.con = pymapd.connect( uri=uri, user=user, password=password, host=host, - port=port, dbname=dbname, protocol=protocol + port=port, dbname=database, protocol=protocol ) def log(self, msg): @@ -259,14 +262,14 @@ def database(self, name=None): client_class = type(self) new_client = client_class( uri=self.uri, user=self.user, password=self.password, - host=self.host, port=self.port, dbname=name, + host=self.host, port=self.port, database=name, protocol=self.protocol, execution_type=self.execution_type ) return self.database_class(name, new_client) @property def current_database(self): - return self.dbname + return self.db_name def set_database(self, name): raise NotImplementedError( @@ -280,7 +283,7 @@ def exists_database(self, name): def list_databases(self, like=None): raise NotImplementedError() - def exists_table(self, name: str, database: str=None): + def exists_table(self, name, 
database=None): """ Determine if the indicated table or view exists @@ -296,7 +299,11 @@ def exists_table(self, name: str, database: str=None): return bool(self.list_tables(like=name, database=database)) def list_tables(self, like=None, database=None): - return self.con.get_tables() + tables = self.con.get_tables() + + if like is None: + return tables + return list(filter(lambda t: t == like, tables)) def get_schema(self, table_name, database=None): """ diff --git a/ibis/mapd/compiler.py b/ibis/mapd/compiler.py index ad0c0066753d..b409a746c74d 100644 --- a/ibis/mapd/compiler.py +++ b/ibis/mapd/compiler.py @@ -8,11 +8,13 @@ def build_ast(expr, context): + assert context is not None, 'context is None' builder = MapDQueryBuilder(expr, context=context) return builder.get_result() def _get_query(expr, context): + assert context is not None, 'context is None' ast = build_ast(expr, context) query = ast.queries[0] @@ -20,6 +22,9 @@ def _get_query(expr, context): def to_sql(expr, context=None): + if context is None: + context = MapDDialect.make_context() + assert context is not None, 'context is None' query = _get_query(expr, context) return query.compile() @@ -47,7 +52,10 @@ class MapDQueryContext(compiles.QueryContext): """ """ + always_alias = False + def _to_sql(self, expr, ctx): + ctx.always_alias = False return to_sql(expr, context=ctx) @@ -182,3 +190,5 @@ class MapDDialect(compiles.Dialect): dialect = MapDDialect compiles = MapDExprTranslator.compiles rewrites = MapDExprTranslator.rewrites + +compiles(ops.Distance, mapd_ops.distance) diff --git a/ibis/mapd/operations.py b/ibis/mapd/operations.py index ff5223554f07..a7c165b506d2 100644 --- a/ibis/mapd/operations.py +++ b/ibis/mapd/operations.py @@ -22,6 +22,45 @@ } +def _add_method(dtype, klass, func_name): + """ + + :param dtype: + :param klass: + :param func_name: + :return: + """ + def f(_klass): + """ + Return a lambda function that return to_expr() result from the + custom classes. 
+ """ + def _f(*args, **kwargs): + return _klass(*args, **kwargs).to_expr() + return _f + # assign new function to the defined DataType + setattr( + dtype, func_name, f(klass) + ) + + +def _add_methods(dtype, function_ops, forced=False): + """ + + :param dtype: + :param function_ops: dict + :param forced: + :return: + """ + for klass in function_ops.keys(): + # skip if the class is already in the ibis operations + if klass in ops.__dict__.values() and not forced: + continue + # assign new function to the defined DataType + func_name = _operation_registry[klass].__name__ + _add_method(dtype, klass, func_name) + + def _is_floating(*args): for arg in args: if isinstance(arg, ir.FloatingColumn): @@ -149,26 +188,6 @@ def formatter(translator, expr): return formatter -# def timestamp_binary_infix_op(func_name, infix_sym, timestamp_code): -# def formatter(translator, expr): -# op = expr.op() -# -# arg, unit = op.args[0], op.args[1] -# arg_ = _parenthesize(translator, arg) -# -# if unit.upper() in [u.upper() for u in ops._timestamp_units.keys()]: -# converter = ops._timestamp_units[unit].upper() -# elif unit.upper() in [u.upper() for u in ops._date_units.values()]: -# converter = ops._timestamp_units[unit].upper() -# else: -# raise ValueError('`{}` unit is not supported!'.format(unit)) -# -# return '{0!s}({1!s} {2!s} {3!s})'.format( -# func_name, converter, infix_sym, arg_ -# ) -# return formatter - - def _call(translator, func, *args): args_ = ', '.join(map(translator.translate, args)) return '{0!s}({1!s})'.format(func, args_) @@ -408,6 +427,7 @@ def _table_column(translator, expr): op = expr.op() field_name = op.name quoted_name = quote_identifier(field_name, force=True) + table = op.table ctx = translator.context @@ -417,11 +437,10 @@ def _table_column(translator, expr): proj_expr = table.projection([field_name]).to_array() return _table_array_view(translator, proj_expr) - # TODO(kszucs): table aliasing is partially supported - # if ctx.need_aliases(): - # alias = ctx.get_ref(table) - # if alias is not None: - # quoted_name = '{0}.{1}'.format(alias, quoted_name) + if ctx.need_aliases(): + alias = ctx.get_ref(table) + if alias is not None: + quoted_name = '{}.{}'.format(alias, quoted_name) return quoted_name @@ -452,6 +471,14 @@ def output_type(self): count = _reduction('count') +def distance(translator, expr): + op = expr.op() + values = map(translator.translate, op.args) + return 'DISTANCE_IN_METERS({0})'.format(', '.join(values)) + + +# classes + # MATH class Log(ops.Ln): """ @@ -466,18 +493,6 @@ class NumericTruncate(ops.NumericBinaryOp): # GEOMETRIC -class Distance_In_Meters(ops.ValueOp): - """ - Calculates distance in meters between two WGS-84 positions. - - """ - fromLon = ops.Arg(rlz.column(rlz.numeric)) - fromLat = ops.Arg(rlz.column(rlz.numeric)) - toLon = ops.Arg(rlz.column(rlz.numeric)) - toLat = ops.Arg(rlz.column(rlz.numeric)) - output_type = rlz.shape_like('fromLon', ops.dt.float) - - class Conv_4326_900913_X(ops.UnaryOp): """ Converts WGS-84 latitude to WGS-84 Web Mercator x coordinate. 
@@ -542,7 +557,6 @@ class ByteLength(ops.StringLength): } _geometric_ops = { - Distance_In_Meters: fixed_arity('distance_in_meters', 4), Conv_4326_900913_X: unary('conv_4326_900913_x'), Conv_4326_900913_Y: unary('conv_4326_900913_y') } @@ -566,10 +580,6 @@ class ByteLength(ops.StringLength): ops.ExtractMinute: _extract_field('MINUTE'), ops.ExtractSecond: _extract_field('SECOND'), - # TimestampExtract: timestamp_binary_infix_op( - # 'EXTRACT', 'FROM', timestamp_code=timestamp_code - # ), - ops.IntervalAdd: _interval_from_integer, ops.IntervalFromInteger: _interval_from_integer, @@ -581,22 +591,16 @@ class ByteLength(ops.StringLength): _agg_ops = { - # ops.Count: count, - # ops.CountDistinct: count_distinct, ApproxCountDistinct: approx_count_distinct, ops.DistinctColumn: unary_prefix_op('distinct'), } _general_ops = { - # Unary operations ops.Literal: literal, ops.ValueList: _value_list, ops.Cast: _cast, ops.Where: fixed_arity('if', 3), ops.TableColumn: _table_column, - # ops.TableArrayView: _table_array_view, - # ops.ArrayLength: unary('length'), - # ops.Coalesce: varargs('coalesce'), } _operation_registry = impala_compiler._operation_registry.copy() @@ -614,60 +618,16 @@ class ByteLength(ops.StringLength): _operation_registry.update(_agg_ops) -def assign_functions_to_dtype(dtype, function_ops, forced=False): - """ - - :param dtype: - :param function_ops: dict - :param forced: - :return: - """ - for klass in function_ops.keys(): - # skip if the class is already in the ibis operations - if klass in ops.__dict__.values() and not forced: - continue - # assign new function to the defined DataType - func_name = _operation_registry[klass].__name__ - _add_method(dtype, klass, func_name) - - -def _add_method(dtype, klass, func_name): - """ - - :param dtype: - :param klass: - :param func_name: - :return: - """ - def f(_klass): - """ - Return a lambda function that return to_expr() result from the - custom classes. 
- """ - def _f(*args, **kwargs): - return _klass(*args, **kwargs).to_expr() - return _f - # assign new function to the defined DataType - setattr( - dtype, func_name, f(klass) - ) - - # numeric operations -assign_functions_to_dtype(ir.NumericValue, _trigonometric_ops, forced=True) -assign_functions_to_dtype(ir.NumericValue, _math_ops, forced=True) -assign_functions_to_dtype(ir.NumericValue, _geometric_ops, forced=True) -assign_functions_to_dtype(ir.NumericValue, _stats_ops, forced=False) -assign_functions_to_dtype(ir.ColumnExpr, _agg_ops, forced=True) +_add_methods(ir.NumericValue, _trigonometric_ops, forced=True) +_add_methods(ir.NumericValue, _math_ops, forced=True) +_add_methods(ir.NumericValue, _geometric_ops, forced=True) +_add_methods(ir.NumericValue, _stats_ops, forced=False) +_add_methods(ir.ColumnExpr, _agg_ops, forced=True) + # string operations -assign_functions_to_dtype(ir.StringValue, _string_ops, forced=True) +_add_methods(ir.StringValue, _string_ops, forced=True) # date/time/timestamp operations -assign_functions_to_dtype(ir.TimestampColumn, _date_ops, forced=True) -assign_functions_to_dtype(ir.DateColumn, _date_ops, forced=True) - -# _add_method(ir.TimestampColumn, ops.TimestampTruncate, 'truncate') -# _add_method(ir.DateColumn, ops.DateTruncate, 'truncate') -# _add_method(ir.TimestampColumn, TimestampExtract, 'extract') -# _add_method(ir.DateColumn, TimestampExtract, 'extract') - +_add_methods(ir.TimestampColumn, _date_ops, forced=True) +_add_methods(ir.DateColumn, _date_ops, forced=True) diff --git a/ibis/mapd/tests/conftest.py b/ibis/mapd/tests/conftest.py index f603e48ad711..4db390f927ab 100644 --- a/ibis/mapd/tests/conftest.py +++ b/ibis/mapd/tests/conftest.py @@ -21,10 +21,15 @@ def con(): port=MAPD_PORT, user=MAPD_USER, password=MAPD_PASS, - dbname=MAPD_DB, + database=MAPD_DB, ) +@pytest.fixture(scope='module') +def alltypes(con): + return con.table('functional_alltypes') + + @pytest.fixture def translate(): """ diff --git a/ibis/mapd/tests/test_client.py b/ibis/mapd/tests/test_client.py index d1977af08896..5768e704b510 100644 --- a/ibis/mapd/tests/test_client.py +++ b/ibis/mapd/tests/test_client.py @@ -1,14 +1,70 @@ -import pytest +from ibis.tests.util import assert_equal + import ibis +import ibis.expr.types as ir +import os +import pandas as pd +import pytest pytestmark = pytest.mark.mapd pytest.importorskip('pymapd') -def test_literal_execute(client): - expected = '1234' - expr = ibis.literal(expected) - result = client.execute(expr) - assert result == expected +MAPD_TEST_DB = os.environ.get('IBIS_TEST_MAPD_DATABASE', 'mapd') +IBIS_MAPD_HOST = os.environ.get('IBIS_TEST_MAPD_HOST', 'localhost') +IBIS_MAPD_USER = os.environ.get('IBIS_TEST_MAPD_USER', 'mapd') +IBIS_MAPD_PASS = os.environ.get('IBIS_TEST_MAPD_PASSWORD', 'HyperInteractive') + + +def test_table(alltypes): + assert isinstance(alltypes, ir.TableExpr) + + +def test_array_execute(alltypes): + d = alltypes.limit(10).double_col + s = d.execute() + assert isinstance(s, pd.Series) + assert len(s) == 10 + + +def test_literal_execute(alltypes): + expr = alltypes[alltypes, ibis.literal('1234').name('lit')].limit(1) + result = expr.execute() + assert result.lit[0] == '1234' + + +def test_simple_aggregate_execute(alltypes): + d = alltypes.double_col.sum().name('sum1') + v = d.execute() + assert isinstance(v, float) + + +def test_list_tables(con): + assert len(con.list_tables()) > 0 + assert len(con.list_tables(like='functional_alltypes')) == 1 + + +def test_compile_verify(alltypes): + supported_expr = 
alltypes.double_col.sum() + assert supported_expr.verify() + + +def test_database_layer(con, alltypes): + db = con.database() + t = db.functional_alltypes + + assert_equal(t, alltypes) + + assert db.list_tables() == con.list_tables() + + +def test_compile_toplevel(): + t = ibis.table([('foo', 'double')], name='t0') + + # it works! + expr = t.foo.sum() + result = ibis.mapd.compile(expr) + expected = 'SELECT sum("foo") AS sum\nFROM t0' # noqa + assert str(result) == expected diff --git a/ibis/mapd/tests/test_compiler.py b/ibis/mapd/tests/test_compiler.py deleted file mode 100644 index d4b16ac55087..000000000000 --- a/ibis/mapd/tests/test_compiler.py +++ /dev/null @@ -1,2 +0,0 @@ -import ibis -import ibis.expr.datatypes as dt diff --git a/setup.py b/setup.py index fbd044efaed3..f26042dee081 100644 --- a/setup.py +++ b/setup.py @@ -22,6 +22,7 @@ sqlite_requires = ['sqlalchemy>=1.0.0,<1.1.15'] postgres_requires = sqlite_requires + ['psycopg2'] mysql_requires = sqlite_requires + ['pymysql'] +mapd_requires = ['pymapd'] kerberos_requires = ['requests-kerberos'] visualization_requires = ['graphviz'] clickhouse_requires = ['clickhouse-driver>=0.0.8'] @@ -32,6 +33,7 @@ all_requires = ( impala_requires + postgres_requires + + mapd_requires + mysql_requires + kerberos_requires + visualization_requires + @@ -74,6 +76,7 @@ 'impala:python_version >= "3"': impala_requires, 'kerberos': kerberos_requires, 'postgres': postgres_requires, + 'mapd': mapd_requires, 'mysql': mysql_requires, 'sqlite': sqlite_requires, 'visualization': visualization_requires,
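
For reference, the cross-project BigQuery access exercised by `test_cross_project_query` and the
`test_multiple_project_queries*` tests boils down to the following sketch. It assumes working
Google Cloud credentials plus access to the `ibis-gbq` project and the public Stack Overflow
dataset, mirroring the tests above:

.. code:: python

    import ibis

    # Datasets in other projects are addressed with a fully qualified dataset id.
    con = ibis.bigquery.connect(
        project_id='ibis-gbq',
        dataset_id='bigquery-public-data.stackoverflow',
    )

    posts = con.table('posts_questions')
    expr = posts[posts.tags.contains('ibis')][['title', 'tags']]

    # Compiles to backtick-quoted `project.dataset.table` references.
    print(expr.compile())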
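
The MapD backend added in the last two patches can be smoke-tested against the docker service
configured in ci/docker-compose.yml. A minimal sketch, assuming the default credentials used by
the test suite (`mapd` / `HyperInteractive`) and the test data loaded by `ci/datamgr.py mapd`:

.. code:: python

    import ibis

    # Connection defaults taken from the CI environment variables above.
    con = ibis.mapd.connect(
        host='localhost',
        port=9091,
        user='mapd',
        password='HyperInteractive',
        database='mapd',
    )

    t = con.table('functional_alltypes')
    print(con.list_tables(like='functional_alltypes'))  # ['functional_alltypes']
    print(t.double_col.sum().execute())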
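
The new geometric helper added to ibis/expr/api.py can also be exercised without a live
connection. A sketch, assuming `distance` is re-exported at the top level as the `__all__` change
suggests, and that the MapD translator renders it as `DISTANCE_IN_METERS` per the
`compiles(ops.Distance, mapd_ops.distance)` registration in ibis/mapd/compiler.py:

.. code:: python

    import ibis

    # Hypothetical unbound table with WGS-84 coordinate columns.
    t = ibis.table(
        [('from_lon', 'double'), ('from_lat', 'double'),
         ('to_lon', 'double'), ('to_lat', 'double')],
        name='points',
    )

    d = ibis.distance(t.from_lon, t.from_lat, t.to_lon, t.to_lat)

    # Should produce a DISTANCE_IN_METERS(...) call in the MapD dialect.
    print(ibis.mapd.compile(t.projection([d.name('meters')])))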