Conversation


@kgw7401 kgw7401 commented Jun 22, 2025

close: #50563

Hi, I implemented the start_from_trigger functionality for DataprocSubmitJobOperator. Please review my PR.

@boring-cyborg boring-cyborg bot added area:providers provider:google Google (including GCP) related issues labels Jun 22, 2025
@kgw7401 kgw7401 marked this pull request as draft June 22, 2025 15:28
Comment on lines +263 to +269
query = session.query(TaskInstance).filter(
TaskInstance.dag_id == self.task_instance.dag_id,
TaskInstance.task_id == self.task_instance.task_id,
TaskInstance.run_id == self.task_instance.run_id,
TaskInstance.map_index == self.task_instance.map_index,
)
task_instance = query.one_or_none()
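The filter above matches a task instance by its four-part identity key (dag_id, task_id, run_id, map_index). As a hedged stdlib sketch of what the lookup amounts to (sqlite3 standing in for the metadata database; table layout and values are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER)"
)
conn.execute(
    "INSERT INTO task_instance VALUES ('example_dag', 'submit_job', 'manual__1', -1)"
)

# Equivalent of query.one_or_none(): at most one row exists for a given
# task-instance identity key, or none at all.
row = conn.execute(
    "SELECT dag_id, task_id, run_id, map_index FROM task_instance "
    "WHERE dag_id=? AND task_id=? AND run_id=? AND map_index=?",
    ("example_dag", "submit_job", "manual__1", -1),
).fetchone()
print(row)
```

The reviewer's objection below is not about the query itself but about where it runs: in AF3, triggers execute in the Task SDK context and are not expected to open metadata-DB sessions directly.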
Contributor
This code will not be compatible with AF3. If you are looking for a solution for this functionality, I think it is better to use an already existing trigger, if I understand correctly. Can you also please share your system test results for this code, for both AF2 and AF3, as screenshots? Thanks!

Contributor Author

Thank you for reviewing.
Actually, when I ran the system test it failed, so I need to fix the code. Also, I can't find a reference explaining why the code you mentioned wouldn't be compatible with AF3. Could you point me to one?

Contributor Author

@kgw7401 kgw7401 Jul 2, 2025


@VladaZakharova Hi, I wrote a system test for the new feature and everything passed. But when I ran it in a real Airflow environment using Breeze, I got the error below. It is raised not only by start_from_trigger but also by a plain deferrable operator. Is there something wrong with AF3 deferrable execution?

Traceback (most recent call last):
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 923, in cleanup_finished_triggers
    result = details["task"].result()
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 1032, in run_trigger
    async for event in trigger.run():
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py", line 187, in run
    job = await self.get_async_hook().get_job(
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py", line 71, in get_async_hook
    return DataprocAsyncHook(
  File "/opt/airflow/providers/google/src/airflow/providers/google/cloud/hooks/dataproc.py", line 1286, in __init__
    super().__init__(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain, **kwargs)
  File "/opt/airflow/providers/google/src/airflow/providers/google/common/hooks/base_google.py", line 280, in __init__
    self.extras: dict = self.get_connection(self.gcp_conn_id).extra_dejson
  File "/opt/airflow/airflow-core/src/airflow/hooks/base.py", line 64, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
  File "/opt/airflow/airflow-core/src/airflow/models/connection.py", line 481, in get_connection_from_secrets
    conn = TaskSDKConnection.get(conn_id=conn_id)
  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 152, in get
    return _get_connection(conn_id)
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 155, in _get_connection
    msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 708, in send
    return async_to_sync(self.asend)(msg)
  File "/usr/local/lib/python3.11/site-packages/asgiref/sync.py", line 186, in __call__
    raise RuntimeError(
RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.
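The error at the bottom is generic asyncio behavior rather than anything Dataproc-specific: a coroutine cannot be driven to completion synchronously while an event loop is already running in the same thread. A minimal stdlib reproduction (using asyncio.run in place of asgiref's async_to_sync, which fails for the same reason; all names here are illustrative):

```python
import asyncio

async def fetch_job():
    # Stand-in for the async hook call made inside the trigger.
    return "job-state"

def sync_wrapper():
    # Conceptually what a sync-over-async bridge does: drive the coroutine
    # synchronously. Inside a running event loop this raises RuntimeError.
    return asyncio.run(fetch_job())

async def trigger_run():
    # The triggerer runs triggers inside an event loop, so any synchronous
    # bridge invoked here hits the same wall as the traceback above.
    try:
        sync_wrapper()
    except RuntimeError as exc:
        return str(exc)

result = asyncio.run(trigger_run())
print(result)
```

The fix direction the message itself suggests is to await the async path directly instead of wrapping it in a sync bridge inside the triggerer.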

Contributor

As for the system test, I managed to run it (example_dataproc_start_from_trigger.py) successfully on AF3.

@kgw7401 kgw7401 marked this pull request as ready for review July 2, 2025 05:37
@eladkal eladkal requested a review from RNHTTR July 9, 2025 18:29
@kgw7401 kgw7401 marked this pull request as draft July 9, 2025 22:46
Contributor Author

kgw7401 commented Jul 9, 2025

@RNHTTR Hi, this PR is related to #52981, so it's still in progress. I'll notify you once I finish it. Thanks!

@kgw7401 kgw7401 marked this pull request as ready for review July 20, 2025 14:10
Contributor Author

kgw7401 commented Jul 20, 2025

@RNHTTR It's ready for review. Please check my PR when you have some time!

Contributor

RNHTTR commented Jul 21, 2025

Hey @kgw7401 -- looks like there's a merge conflict in providers/google/tests/unit/google/cloud/triggers/test_dataproc.py

Contributor Author

kgw7401 commented Jul 22, 2025

> Hey @kgw7401 -- looks like there's a merge conflict in providers/google/tests/unit/google/cloud/triggers/test_dataproc.py

@RNHTTR I've fixed it!

@potiuk potiuk requested a review from VladaZakharova July 22, 2025 16:36
Member

potiuk commented Jul 22, 2025

@VladaZakharova ?

return None
# For other retry objects (like Retry instances), use None as fallback
# since they are complex objects that don't serialize well
return None
Contributor

@olegkachur-e olegkachur-e Jul 25, 2025


I don't think this is what users expect when they provide retries. Often None means to wait forever.

There should be a way to pass the retry through; in the worst case, I'd prefer an exception saying that such params are not supported, forcing the user to pass None explicitly.
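One way to implement the fail-loudly behavior suggested here is to validate trigger kwargs up front instead of silently falling back to None. A hypothetical helper sketching that idea (serialize_trigger_kwarg is not an Airflow API, and JSON stands in for Airflow's serializer; an unpicklable/unserializable Retry-like object is simulated with a plain object()):

```python
import json

def serialize_trigger_kwarg(name, value):
    # Reject kwargs the (stand-in) serializer cannot handle, rather than
    # silently substituting None, so the user sees why the value was dropped.
    try:
        json.dumps(value)
    except TypeError:
        raise ValueError(
            f"Trigger kwarg {name!r} of type {type(value).__name__} is not "
            "serializable; pass None explicitly if no retry is wanted."
        )
    return value

print(serialize_trigger_kwarg("timeout", 30))   # plain values pass through
try:
    serialize_trigger_kwarg("retry", object())  # complex object: rejected
except ValueError as exc:
    print("rejected:", exc)
```

This keeps the "all trigger_kwargs must be serializable" limitation visible at DAG-parse time instead of surprising the user at runtime.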

Contributor

The docs mention this limitation: "trigger_kwargs: Keyword arguments to pass to the trigger_cls when it's initialized. Note that all the arguments need to be serializable by Airflow. It's the main limitation of this feature."
If we don't need it, maybe we should just avoid passing it?

raise e


class DataprocSubmitJobTrigger(DataprocBaseTrigger):
Contributor

If this trigger is to be used exclusively for start_from_trigger, I suggest highlighting that in the class name and docstring.


github-actions bot commented Sep 9, 2025

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Sep 9, 2025
Contributor

RNHTTR commented Sep 16, 2025

Hey @kgw7401 this is at the finish line. Do you want to keep working on it?

@github-actions github-actions bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Sep 22, 2025
Successfully merging this pull request may close these issues.

Implement direct-to-triggerer functionality for DataprocSubmitJobOperator

5 participants