-
Notifications
You must be signed in to change notification settings - Fork 14.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove Provider Deprecations in Trino #44717
Conversation
Can you fix the tests please ? |
Would you please tell me how to fix this test ? There is only one deprecated class When I fixed To fix I have added that files again but but it failed here. |
This tests uses TrinoOperator: @mock.patch.dict("os.environ", AIRFLOW_CONN_TRINO_DEFAULT="trino://airflow@trino:8080/")
def test_openlineage_methods(self):
op = TrinoOperator(task_id="trino_test", sql="SELECT name FROM tpch.sf1.customer LIMIT 3")
op.execute({})
lineage = op.get_openlineage_facets_on_start()
assert lineage.inputs[0].namespace == "trino://trino:8080"
assert lineage.inputs[0].name == "tpch.sf1.customer"
assert "schema" in lineage.inputs[0].facets
assert lineage.job_facets["sql"].query == "SELECT name FROM tpch.sf1.customer LIMIT 3" Replace with SQLExecuteQueryOperator -as suggested by deprecation message. |
Done! |
def test_execute_openlineage_events(): | ||
DB_NAME = "tpch" | ||
DB_SCHEMA_NAME = "sf1" | ||
|
||
class TrinoHookForTests(TrinoHook): | ||
get_conn = mock.MagicMock(name="conn") | ||
get_connection = mock.MagicMock() | ||
|
||
def get_first(self, *_): | ||
return [f"{DB_NAME}.{DB_SCHEMA_NAME}"] | ||
|
||
dbapi_hook = TrinoHookForTests() | ||
|
||
sql = "SELECT name FROM tpch.sf1.customer LIMIT 3" | ||
op = SQLExecuteQueryOperator(task_id="trino-operator", sql=sql, conn_id=TRINO_DEFAULT) | ||
op._hook = dbapi_hook | ||
rows = [ | ||
(DB_SCHEMA_NAME, "customer", "custkey", 1, "bigint", DB_NAME), | ||
(DB_SCHEMA_NAME, "customer", "name", 2, "varchar(25)", DB_NAME), | ||
(DB_SCHEMA_NAME, "customer", "address", 3, "varchar(40)", DB_NAME), | ||
(DB_SCHEMA_NAME, "customer", "nationkey", 4, "bigint", DB_NAME), | ||
(DB_SCHEMA_NAME, "customer", "phone", 5, "varchar(15)", DB_NAME), | ||
(DB_SCHEMA_NAME, "customer", "acctbal", 6, "double", DB_NAME), | ||
] | ||
dbapi_hook.get_connection.return_value = Connection( | ||
conn_id=TRINO_DEFAULT, | ||
conn_type="trino", | ||
host="trino", | ||
port=8080, | ||
) | ||
dbapi_hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = [rows, []] | ||
|
||
lineage = op.get_openlineage_facets_on_start() | ||
assert lineage.inputs == [ | ||
Dataset( | ||
namespace="trino://trino:8080", | ||
name=f"{DB_NAME}.{DB_SCHEMA_NAME}.customer", | ||
facets={ | ||
"schema": SchemaDatasetFacet( | ||
fields=[ | ||
SchemaDatasetFacetFields(name="custkey", type="bigint"), | ||
SchemaDatasetFacetFields(name="name", type="varchar(25)"), | ||
SchemaDatasetFacetFields(name="address", type="varchar(40)"), | ||
SchemaDatasetFacetFields(name="nationkey", type="bigint"), | ||
SchemaDatasetFacetFields(name="phone", type="varchar(15)"), | ||
SchemaDatasetFacetFields(name="acctbal", type="double"), | ||
] | ||
) | ||
}, | ||
) | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if this was suppose to be removed?
this is testing open lineage integration with TrinoHook
cc @kacpermuda @mobuchowski
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, i think it should be adjusted after the removal of deprecated code and not fully removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this was probably removed by mistake.
@Prab-27 can you revert that test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was probably removed as the operators/trino.py
file has been removed, so it makes sense that the corresponding test file is removed, but IMHO we should still keep this test in the trino provider (maybe in a differently named file).
@@ -50,7 +50,7 @@ def test_should_record_records_with_kerberos_auth(self): | |||
|
|||
@mock.patch.dict("os.environ", AIRFLOW_CONN_TRINO_DEFAULT="trino://airflow@trino:8080/") | |||
def test_openlineage_methods(self): | |||
op = TrinoOperator(task_id="trino_test", sql="SELECT name FROM tpch.sf1.customer LIMIT 3") | |||
op = SQLExecuteQueryOperator(task_id="trino_test", sql="SELECT name FROM tpch.sf1.customer LIMIT 3") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should add conn_id="trino_default"
as this test is failing now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right it was removed by mistake.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ya I'll correct that id
I am Sorry |
That would be ok |
Would you please tell me which test I should run to check if it is okay or not ? Does this work ? it passed at that time |
Here is an example where this test failed. It was triggered with |
Ah, sorry you meant the other test. Yes, the breeze command looks good. |
No, you are right. I am asking about the provider test, not Breeze, so I can know about that. |
@eladkal ,I am confused. Would You please tell me if I should add this method here providers/tests/integration/trino/hooks/test_trino.py because this file contains similar methods |
I'm not sure I follow the issue. Why are we discussing system tests path? |
Sorry , You are right I have noticed a similar method |
Related: #44559
Remove deprecated code from trino provider from operators and test. Update Changelog
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.