Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support all strategy for incremental #4

Merged
merged 1 commit into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions dbt/adapters/databend/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from databend_sqlalchemy import connector

from dbt.exceptions import (
RuntimeException,
Exception,
)

logger = AdapterLogger("databend")
Expand All @@ -35,7 +35,8 @@ class DatabendCredentials(Credentials):
database: Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None
schema: str = "default"
schema: Optional[str] = None
secure: Optional[bool] = None

# Add credentials members here, like:
# host: str
Expand All @@ -61,7 +62,7 @@ def __post_init__(self):
# databend classifies database and schema as the same thing
self.database = None
if self.database is not None and self.database != self.schema:
raise dbt.exceptions.RuntimeException(
raise dbt.exceptions.Exception(
f" schema: {self.schema} \n"
f" database: {self.database} \n"
f"On Databend, database must be omitted or have the same value as"
Expand Down Expand Up @@ -104,7 +105,7 @@ def exception_handler(self, sql: str):
logger.debug("Error running SQL: {}".format(sql))
logger.debug("Rolling back transaction.")
self.rollback_if_open()
raise dbt.exceptions.RuntimeException(str(e))
raise dbt.exceptions.Exception(str(e))

# except for DML statements where explicitly defined
def add_begin_query(self, *args, **kwargs):
Expand Down Expand Up @@ -141,9 +142,18 @@ def open(cls, connection):
# # user=credentials.username,
# # password=credentials.password,
# )
handle = connector.connect(
f"https://{credentials.username}:{credentials.password}@{credentials.host}:{credentials.port}?secure=true"
)
if credentials.secure is None:
credentials.secure = True

if credentials.secure:
handle = connector.connect(
f"https://{credentials.username}:{credentials.password}@{credentials.host}:{credentials.port}/{credentials.schema}?secure=true "
)
else:
handle = connector.connect(
f"http://{credentials.username}:{credentials.password}@{credentials.host}:{credentials.port}/{credentials.schema}?secure=false "
)

except Exception as e:
logger.debug("Error opening connection: {}".format(e))
connection.handle = None
Expand All @@ -158,7 +168,7 @@ def get_response(cls, _):
return "OK"

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[AdapterResponse, agate.Table]:
# don't apply the query comment here
# it will be applied after ';' queries are split
Expand All @@ -183,7 +193,7 @@ def add_query(self, sql, auto_begin=False, bindings=None, abridge_sql_log=False)
else:
conn_name = conn.name

raise RuntimeException(
raise Exception(
"Tried to run an empty query on model '{}'. If you are "
"conditionally running\nsql, eg. in a model hook, make "
"sure your `else` clause contains valid sql!\n\n"
Expand Down
16 changes: 8 additions & 8 deletions dbt/adapters/databend/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def convert_date_type(cls, agate_table: agate.Table, col_idx: int) -> str:

@classmethod
def convert_time_type(cls, agate_table: agate.Table, col_idx: int) -> str:
raise dbt.exceptions.NotImplementedException(
raise dbt.exceptions.DbtRuntimeError(
"`convert_time_type` is not implemented for this adapter!"
)

Expand All @@ -123,7 +123,7 @@ def list_relations_without_caching(
relations = []
for row in results:
if len(row) != 4:
raise dbt.exceptions.RuntimeException(
raise dbt.exceptions.DbtRuntimeError(
f"Invalid value from 'show table extended ...', "
f"got {len(row)} values, expected 4"
)
Expand Down Expand Up @@ -151,8 +151,8 @@ def _catalog_filter_table(
return table.where(_catalog_filter_schemas(manifest))

def get_relation(self, database: Optional[str], schema: str, identifier: str):
if not self.Relation.include_policy.database:
database = None
# if not self.Relation.include_policy.database:
# database = None

return super().get_relation(database, schema, identifier)

Expand Down Expand Up @@ -182,7 +182,7 @@ def get_columns_in_relation(
def get_catalog(self, manifest):
schema_map = self._get_catalog_schemas(manifest)
if len(schema_map) > 1:
dbt.exceptions.raise_compiler_error(
dbt.exceptions.DbtRuntimeError(
f"Expected only one database in get_catalog, found "
f"{list(schema_map)}"
)
Expand Down Expand Up @@ -211,7 +211,7 @@ def _get_one_catalog(
manifest: Manifest,
) -> agate.Table:
if len(schemas) != 1:
dbt.exceptions.raise_compiler_error(
dbt.exceptions.DbtRuntimeError(
f"Expected only one schema in databend _get_one_catalog, found {schemas}"
)

Expand All @@ -224,7 +224,7 @@ def update_column_sql(
clause: str,
where_clause: Optional[str] = None,
) -> str:
raise dbt.exceptions.NotImplementedException(
raise dbt.exceptions.DbtInternalError(
"`update_column_sql` is not implemented for this adapter!"
)

Expand Down Expand Up @@ -339,4 +339,4 @@ def default_python_submission_method(self):

@property
def python_submission_helpers(self):
raise NotImplementedError("python_submission_helpers is not specified")
raise NotImplementedError("python_submission_helpers is not specified")
37 changes: 19 additions & 18 deletions dbt/adapters/databend/relation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Optional, TypeVar, Any, Type, Dict, Union, Iterator, Tuple, Set
import dbt.exceptions
from dbt.adapters.base.relation import BaseRelation, Policy
Expand Down Expand Up @@ -27,13 +27,15 @@ class DatabendIncludePolicy(Policy):

@dataclass(frozen=True, eq=False, repr=False)
class DatabendRelation(BaseRelation):
quote_policy: DatabendQuotePolicy = DatabendQuotePolicy()
include_policy: DatabendIncludePolicy = DatabendIncludePolicy()
quote_policy: Policy = field(default_factory=lambda: DatabendQuotePolicy())
include_policy: DatabendIncludePolicy = field(
default_factory=lambda: DatabendIncludePolicy()
)
quote_character: str = ""

def __post_init__(self):
if self.database != self.schema and self.database:
raise dbt.exceptions.RuntimeException(
raise dbt.exceptions.DbtRuntimeError(
f" schema: {self.schema} \n"
f" database: {self.database} \n"
f"On Databend, database must be omitted or have the same value as"
Expand All @@ -42,14 +44,13 @@ def __post_init__(self):

@classmethod
def create(
cls: Type[Self],
database: Optional[str] = None,
schema: Optional[str] = None,
identifier: Optional[str] = None,
type: Optional[RelationType] = None,
**kwargs,
cls: Type[Self],
database: Optional[str] = None,
schema: Optional[str] = None,
identifier: Optional[str] = None,
type: Optional[RelationType] = None,
**kwargs,
) -> Self:
cls.database = None
database = None
kwargs.update(
{
Expand All @@ -65,15 +66,15 @@ def create(

def render(self):
if self.include_policy.database and self.include_policy.schema:
raise dbt.exceptions.RuntimeException(
raise dbt.exceptions.DbtRuntimeError(
"Got a databend relation with schema and database set to "
"include, but only one can be set"
)
return super().render()

@classmethod
def get_path(
cls, relation: BaseRelation, information_schema_view: Optional[str]
cls, relation: BaseRelation, information_schema_view: Optional[str]
) -> Path:
Path.database = None
return Path(
Expand All @@ -83,13 +84,13 @@ def get_path(
)

def matches(
self,
database: Optional[str] = None,
schema: Optional[str] = None,
identifier: Optional[str] = None,
self,
database: Optional[str] = None,
schema: Optional[str] = None,
identifier: Optional[str] = None,
):
if database:
raise dbt.exceptions.RuntimeException(
raise dbt.exceptions.DbtRuntimeError(
f"Passed unexpected schema value {schema} to Relation.matches"
)
return self.schema == schema and self.identifier == identifier
2 changes: 1 addition & 1 deletion dbt/include/databend/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: dbt_databend
version: 1.3.0
version: 1.5.0
config-version: 2

macro-paths: ["macros"]
107 changes: 1 addition & 106 deletions dbt/include/databend/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,60 +3,13 @@ postgres adapter macros: https://github.com/dbt-labs/dbt-core/blob/main/plugins/
dbt docs: https://docs.getdbt.com/docs/contributing/building-a-new-adapter
*/

-- {% macro databend__alter_column_type(relation,column_name,new_column_type) -%}
-- '''Changes column name or data type'''
-- /*
-- 1. Create a new column (w/ temp name and correct type)
-- 2. Copy data over to it
-- 3. Drop the existing column (cascade!)
-- 4. Rename the new column to existing column
-- */
-- {% endmacro %}

-- {% macro databend__check_schema_exists(information_schema,schema) -%}
-- '''Checks if schema name exists and returns number or times it shows up.'''
-- /*
-- 1. Check if schemas exist
-- 2. return number of rows or columns that match searched parameter
-- */
-- {% endmacro %}

-- Example from postgres adapter in dbt-core
-- Notice how you can build out other methods than the designated ones for the impl.py file,
-- to make a more robust adapter. ex. (verify_database)

/*

{% macro postgres__create_schema(relation) -%}
{% if relation.database -%}
{{ adapter.verify_database(relation.database) }}
{%- endif -%} {%- call statement('create_schema') -%}
create schema if not exists {{ relation.without_identifier().include(database=False) }}
{%- endcall -%}
{% endmacro %}

*/

{% macro databend__create_schema(relation) -%}
'''Creates a new schema in the target database, if schema already exists, method is a no-op. '''
{%- call statement('create_schema') -%}
create database if not exists {{ relation.without_identifier().include(database=False) }}
{% endcall %}
{% endmacro %}

/*

{% macro postgres__drop_schema(relation) -%}
{% if relation.database -%}
{{ adapter.verify_database(relation.database) }}
{%- endif -%}
{%- call statement('drop_schema') -%}
drop schema if exists {{ relation.without_identifier().include(database=False) }} cascade
{%- endcall -%}
{% endmacro %}

*/

{% macro databend__drop_relation(relation) -%}
'''Deletes relatonship identifer between tables.'''
/*
Expand All @@ -79,30 +32,6 @@ dbt docs: https://docs.getdbt.com/docs/contributing/building-a-new-adapter
{%- endcall -%}
{% endmacro %}

/*

Example of 1 of 3 required macros that does not have a default implementation
{% macro postgres__get_columns_in_relation(relation) -%}
{% call statement('get_columns_in_relation', fetch_result=True) %}
select
column_name,
data_type,
character_maximum_length,
numeric_precision,
numeric_scale
from {{ relation.information_schema('columns') }}
where table_name = '{{ relation.identifier }}'
{% if relation.schema %}
and table_schema = '{{ relation.schema }}'
{% endif %}
order by ordinal_position
{% endcall %}
{% set table = load_result('get_columns_in_relation').table %}
{{ return(sql_convert_columns_in_relation(table)) }}
{% endmacro %}
*/


{% macro databend__get_columns_in_relation(relation) -%}
'''Returns a list of Columns in a table.'''
/*
Expand Down Expand Up @@ -131,31 +60,6 @@ dbt docs: https://docs.getdbt.com/docs/contributing/building-a-new-adapter

-- Example of 2 of 3 required macros that do not come with a default implementation

/*

{% macro postgres__list_relations_without_caching(schema_relation) %}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
select
'{{ schema_relation.database }}' as database,
tablename as name,
schemaname as schema,
'table' as type
from pg_tables
where schemaname ilike '{{ schema_relation.schema }}'
union all
select
'{{ schema_relation.database }}' as database,
viewname as name,
schemaname as schema,
'view' as type
from pg_views
where schemaname ilike '{{ schema_relation.schema }}'
{% endcall %}
{{ return(load_result('list_relations_without_caching').table) }}
{% endmacro %}

*/

{% macro databend__list_relations_without_caching(schema_relation) -%}
'''creates a table of relations withough using local caching.'''
{% call statement('list_relations_without_caching', fetch_result=True) -%}
Expand Down Expand Up @@ -219,15 +123,6 @@ dbt docs: https://docs.getdbt.com/docs/contributing/building-a-new-adapter
{% do return(None) %}
{%- endmacro %}

/*

Example 3 of 3 of required macros that does not have a default implementation.
** Good example of building out small methods ** please refer to impl.py for implementation of now() in postgres plugin
{% macro postgres__current_timestamp() -%}
now()
{%- endmacro %}

*/
{% macro databend__current_timestamp() -%}
NOW()
{%- endmacro %}
Expand All @@ -238,7 +133,7 @@ Example 3 of 3 of required macros that does not have a default implementation.
{{ sql_header if sql_header is not none }}

{% if temporary -%}
create transient table {{ relation.name }}
create transient table {{ relation.name }} if not exist
{%- else %}
create table {{ relation.include(database=False) }}
{{ cluster_by_clause(label="cluster by") }}
Expand Down
Loading