Feat: Support INCREMENTAL_BY_TIME_RANGE models on Athena/Hive #3201

Merged
erindru merged 3 commits into main from erin/athena-hive-incremental on Oct 2, 2024

@erindru erindru commented Sep 30, 2024

ref: issue #1315

@erindru erindru force-pushed the erin/athena-hive-incremental branch from 2906dbc to a30f1b4 Compare October 1, 2024 02:14
@erindru erindru marked this pull request as draft October 1, 2024 02:15
@erindru erindru force-pushed the erin/athena-hive-incremental branch 4 times, most recently from a2f9153 to b623489 Compare October 1, 2024 03:59
Collaborator Author

I'll revert these prior to merge.

As an aside, we need a better way of allowing cloud test runs to be triggered on the relevant PRs.

CircleCI appears to be fairly limited in this regard, but we might be able to do something with GitHub Actions triggering a CircleCI workflow in response to a PR comment.
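
One way this idea might look, as a rough sketch only: a GitHub Actions job reacting to a PR comment could run a small script that calls CircleCI's API v2 pipeline trigger endpoint. The `/run-cloud-tests` comment command, the `run_cloud_tests` pipeline parameter, and the project slug are all hypothetical, and the request is only built here, not sent.

```python
import json
import urllib.request

# Hypothetical comment command; nothing in this PR defines it.
TRIGGER_COMMAND = "/run-cloud-tests"


def should_trigger(comment_body: str) -> bool:
    """Return True when the PR comment starts with the trigger command."""
    return comment_body.strip().startswith(TRIGGER_COMMAND)


def build_pipeline_request(project_slug: str, branch: str, token: str) -> urllib.request.Request:
    """Build (but do not send) a CircleCI API v2 pipeline trigger request."""
    url = f"https://circleci.com/api/v2/project/{project_slug}/pipeline"
    payload = {
        "branch": branch,
        # Assumed pipeline parameter; it would have to be declared in the CircleCI config.
        "parameters": {"run_cloud_tests": True},
    }
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Circle-Token": token, "Content-Type": "application/json"},
        method="POST",
    )


if should_trigger("/run-cloud-tests athena"):
    request = build_pipeline_request(
        "gh/example-org/example-repo", "erin/athena-hive-incremental", "<token>"
    )
    print(request.get_method(), request.full_url)
```

Keeping the request construction separate from the sending step makes the sketch easy to check without network access or a real token.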

@erindru erindru marked this pull request as ready for review October 1, 2024 04:50
@erindru erindru requested a review from izeigerman October 1, 2024 20:33
# Truncating a partitioned Hive table is dropping all partitions and deleting the data from S3
if self._is_hive_partitioned_table(table_name):
    self._clear_partition_data(table_name, exp.true())
else:
Contributor

elif s3_location := self._query...
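
The branching under discussion can be pictured with a small standalone sketch. It is not the adapter's code: `plan_truncate` and its step descriptions are made up for illustration, and the handling of an unpartitioned Hive table (clearing its S3 location) is an assumption based on the `s3_location` hint in the comment above.

```python
from typing import List


def plan_truncate(table_type: str, is_partitioned: bool) -> List[str]:
    """Describe the steps a truncate could take for each kind of Athena table."""
    if table_type == "iceberg":
        # Per the code comment in this PR, DELETE FROM only works on Iceberg tables.
        return ["DELETE FROM <table>"]
    if table_type == "hive" and is_partitioned:
        # Per the PR: drop all partitions and delete their data from S3.
        return ["drop all partitions", "delete partition data from S3"]
    if table_type == "hive":
        # Assumption: an unpartitioned Hive table is emptied by clearing its S3 location.
        return ["delete objects under the table's S3 location"]
    raise ValueError(f"Unknown table type: {table_type}")


print(plan_truncate("hive", is_partitioned=True))
```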

@@ -317,6 +342,8 @@ def _query_table_type(
Hit the DB to check if this is a Hive or an Iceberg table
"""
table_name = exp.to_table(table_name)
quote_identifiers(table_name)
Contributor

why was this necessary?

Collaborator Author

It's because the query it's used in, SHOW TBLPROPERTIES {table}, is just a string and not an Expression.

Since we are string building, we need to quote the identifiers manually. I couldn't figure out how to make it an Expression because sqlglot just turns it into an exp.Command string literal anyway.

Contributor

ok, we can remove this and on line 349 do sql(dialect='hive', identify=True)

Collaborator Author

Oh cool, forgot you could do that!
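
To illustrate why quoting matters when a statement such as SHOW TBLPROPERTIES is assembled as a plain string, here is a dependency-free sketch. `quote_hive_identifier` and `show_tblproperties_sql` are hypothetical helpers, not sqlglot or SQLMesh APIs; the reviewer's suggestion in this thread is to let sqlglot handle it via `sql(dialect='hive', identify=True)` instead.

```python
from typing import Sequence


def quote_hive_identifier(name: str) -> str:
    """Wrap one identifier part in backticks, the Hive quoting character."""
    if "`" in name:
        # Escaping rules are out of scope for this sketch, so refuse instead of guessing.
        raise ValueError(f"Backticks are not supported in identifier: {name!r}")
    return f"`{name}`"


def show_tblproperties_sql(table_parts: Sequence[str]) -> str:
    """Build the statement from already-split parts, e.g. ["my_schema", "my-table"]."""
    quoted = ".".join(quote_hive_identifier(part) for part in table_parts)
    return f"SHOW TBLPROPERTIES {quoted}"


# An unquoted name containing a hyphen would likely fail to parse as a single identifier.
print(show_tblproperties_sql(["my_schema", "my-table"]))
```

Delegating the quoting to the SQL generator, as suggested in the review, avoids maintaining a helper like this by hand.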

@@ -326,6 +353,15 @@ def _query_table_type(
return "hive"
return "iceberg"

def _is_hive_partitioned_table(self, table_name: exp.Table) -> bool:
Contributor

if this takes a Table, maybe just call this arg table

Collaborator Author

Updated. Yeah, I kept switching between TableName and exp.Table because the parent classes use TableName, but checking for strings all the time was getting tedious.

The general pattern throughout the codebase of making all the methods take a str or an exp.Expression is nice in some ways but annoying in others.

# Athena doesnt support TRUNCATE TABLE. The closest thing is DELETE FROM <table> but it only works on Iceberg
self.execute(f"DELETE FROM {table.sql(dialect=self.dialect, identify=True)}")
if isinstance(table_name, str):
    table_name = exp.to_table(table_name)
Contributor

I don't think this one is necessary, because you handle it in all the other functions, right?

Collaborator Author

_truncate_table needs this signature (or mypy will fail) because it's defined in the EngineAdapter superclass and I'm just overriding it.
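
The constraint described in this thread can be shown with stand-in types. `Table`, `to_table`, `BaseAdapter` and `HiveLikeAdapter` below are simplified placeholders, not the real sqlglot or SQLMesh classes: the override keeps the superclass's broad signature so a type checker accepts it, normalizes once, and hands a `Table` to adapter-only helpers.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass(frozen=True)
class Table:
    """Stand-in for a parsed table expression."""
    schema: str
    name: str


# Stand-in for the TableName alias: either a raw string or a parsed table.
TableName = Union[str, Table]


def to_table(table_name: TableName) -> Table:
    """Normalize "schema.name" strings; pass Table instances through unchanged."""
    if isinstance(table_name, Table):
        return table_name
    schema, _, name = table_name.partition(".")
    return Table(schema=schema, name=name)


class BaseAdapter:
    def _truncate_table(self, table_name: TableName) -> List[str]:
        raise NotImplementedError


class HiveLikeAdapter(BaseAdapter):
    def _truncate_table(self, table_name: TableName) -> List[str]:
        # Same parameter type as the superclass, so the override stays compatible.
        return self._clear_all_data(to_table(table_name))

    def _clear_all_data(self, table: Table) -> List[str]:
        # Adapter-only helper: it can assume a Table and skip the isinstance checks.
        return [f"clear data for {table.schema}.{table.name}"]


adapter = HiveLikeAdapter()
print(adapter._truncate_table("analytics.events"))
```

Normalizing at the overridden entry point keeps the string checks in one place while the methods that exist only on this adapter work with a single type.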

**kwargs,
)

def _clear_partition_data(self, table: TableName, where: t.Optional[exp.Condition]) -> None:
Contributor

table -> table_name

Contributor

or what you should do is clean up so all internal methods take in exp.Table instead

Collaborator Author

> or what you should do is clean up so all internal methods take in exp.Table instead

Unfortunately, a bunch are defined in the superclass, which means they can't be easily cleaned up. I'll clean up the ones that only exist in this adapter class though.


partition_values = [list(r) for r in self.fetchall(query, quote_identifiers=True)]

if len(partition_values) > 0:
Contributor

if partition_values
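
The suggestion relies on Python treating an empty list as false, so the explicit length check is redundant for a list. A tiny illustration with made-up partition values (`has_partitions` is a hypothetical helper):

```python
from typing import List


def has_partitions(partition_values: List[List[str]]) -> bool:
    """Equivalent to len(partition_values) > 0 for a list."""
    return bool(partition_values)


for values in ([], [["2024-10-01"], ["2024-10-02"]]):
    # Both forms agree for lists, which is why the shorter one is preferred.
    assert has_partitions(values) == (len(values) > 0)
    if values:
        print(f"{len(values)} partition(s) to clear")
    else:
        print("nothing to clear")
```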


return []

def _query_table_s3_location(self, table_name: exp.Table) -> str:
Contributor

table

@erindru erindru force-pushed the erin/athena-hive-incremental branch from b623489 to 94f6e9e Compare October 2, 2024 20:56
@erindru erindru merged commit bffefbe into main Oct 2, 2024
23 checks passed
@erindru erindru deleted the erin/athena-hive-incremental branch October 2, 2024 22:37
justinjoseph89 added a commit to trygforsikring/sqlmesh that referenced this pull request Oct 21, 2024
* Fix: mark kind changes as breaking in forward only plan (TobikoData#3207)

* Feat: add support for parameterized python model names (TobikoData#3208)

* Fix: Bigquery support of complex nested types (TobikoData#3190)

* Feat: Snowflake: Handle forward_only changes to 'clustered_by' (TobikoData#3205)

* Docs: add gateway variables to jinja macros concepts doc (TobikoData#3210)

* Fix: avoid parsing column names into qualified columns in InsertOverwriteWithMergeMixin (TobikoData#3211)

* Chore: bump sqlglot to v25.24.2 (TobikoData#3213)

* Feat: Support INCREMENTAL_BY_TIME_RANGE models on Athena/Hive (TobikoData#3201)

* Fix: load custom materializations on run (TobikoData#3216)

* Fix: Infer column types when data type is omitted in dbt seeds (TobikoData#3215)

* Chore: bump sqlglot to v25.24.3 (TobikoData#3217)

* Fix: DBT seed column order (TobikoData#3221)

* fix: web reloading caused iteration error (TobikoData#3220)

* Fix: Make dbt adapter macros available in the local scope (TobikoData#3219)

* Feat: Support DBT Athena adapter (TobikoData#3222)

* chore: docs

* Feat: Support SQLMesh project generation from dlt pipeline (TobikoData#3218)

* Fix: Broken hive distro link in the test airflow image

* Fix: Prevent loaded context from being used concurrently (TobikoData#3229)

* Fix: Go back to using hive 3.1.3 for the Airflow test image

* Fix: Support of custom roles for Postgres (TobikoData#3230)

* Fix(redshift): regression in varchar length workaround (TobikoData#3225)

* Fix: Force the CircleCI's git to use https links when running pre-commit (TobikoData#3235)

* Fix: reset macro registry *after* loading models (TobikoData#3232)

* Fix: Modify dlt query filter not to use alias reference (TobikoData#3233)

* Fix: Support CLUSTER BY clause for the Databricks engine (TobikoData#3234)

* Feat: BigQuery - Handle forward_only changes to clustered_by (TobikoData#3231)

* chore: Fix typo in model_kinds.md (TobikoData#3239)

* Feat: support custom unit testing schema names (TobikoData#3238)

* Chore: Make the scheduler config extendable (TobikoData#3242)

* Fix: use parentheses for databricks' CLUSTER BY clause (TobikoData#3240)

* Fix: handle Paren in depends_on validator (TobikoData#3243)

* fix: data diff for bigquery project parsing (TobikoData#3248)

* Chore: Reintroduce parallelism in integration tests (TobikoData#3236)

* Feat(databricks): Add OAuth support (TobikoData#3250)

* Chore!: bump sqlglot to v25.25.0 (TobikoData#3252)

* Adding markdown feature to model description (TobikoData#3228)

* Fix: refactor table part parsing for Snowflake (TobikoData#3254)

* Fix: always warn when an audit has failed (TobikoData#3255)

* Chore: bump sqlglot to v25.25.1 (TobikoData#3256)

* Ensure using project instead of execution project for temp table as default (TobikoData#3249)

* Chore: Clarify that restatement plans ignore local changes (TobikoData#3257)

* feat!: run-all bot command errors if anything within it errors (TobikoData#3262)

* Fix(clickhouse): remove fractional seconds when time column is datetime/timestamp type (TobikoData#3261)

* remove risingwave configuration from dbt

* remove sink settings

* remove risngwave sink

* introducing risingwave as state syn engine

* add risingwave connetion as test

* change test case

* Fix: Prevent extraction of dependencies from a rendered query for dbt models (TobikoData#3263)

---------

Co-authored-by: Ben <9087625+benfdking@users.noreply.github.com>
Co-authored-by: Jo <46752250+georgesittas@users.noreply.github.com>
Co-authored-by: Themis Valtinos <73662635+Themiscodes@users.noreply.github.com>
Co-authored-by: Erin Drummond <erin.dru@gmail.com>
Co-authored-by: Trey Spiller <1831878+treysp@users.noreply.github.com>
Co-authored-by: Alexander Butler <41213451+z3z1ma@users.noreply.github.com>
Co-authored-by: Toby Mao <toby.mao@gmail.com>
Co-authored-by: Iaroslav Zeigerman <zeigerman.ia@gmail.com>
Co-authored-by: Vincent Chan <vchan@users.noreply.github.com>
Co-authored-by: Vaggelis Danias <daniasevangelos@gmail.com>
Co-authored-by: Harmuth94 <86912694+Harmuth94@users.noreply.github.com>
Co-authored-by: Christophe Oudar <kayrnt@gmail.com>
Co-authored-by: Chris Rericha <67359577+crericha@users.noreply.github.com>
Co-authored-by: Ryan Eakman <6326532+eakmanrq@users.noreply.github.com>
Co-authored-by: Justin Joseph <justin.joseph@tryg.dk>
2 participants