Skip to content

Conversation

@gopidesupavan
Copy link
Member

@gopidesupavan gopidesupavan commented Apr 5, 2025

closes: #48820

realted: #48819 #48747

Why

Introducing a separate channel for trigger workloads.
Currently when multiple tasks are entering into trigger, the messages are mixing up with Trigger Workloads. this is due to the read side when we are trying to read same sys.stdin, causing this problem IMHO. When this mixing happens we are loosing the Trigger Workloads as its read in different place.

What

Created two new sockets to handle Trigger Workloads, when main process writes with trigger_stdin and in child process this Workloads will be reading from the trigger_requests_fd

Have observed this behaviour while testing DagStateTrigger/WorkflowTrigger

image

Connections are failing to retrieve in Trigger, when multiple tasks running in tirgger.

image

After

Retrieved connections properly for all the task and their own connection id, have created a separate connection for each task to verify whether messages are mixing up or not. Looking good.

image image image
  • Add tests

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@gopidesupavan
Copy link
Member Author

Does this approach okay?

child_comms, read_msgs = mkpipe()
child_logs, read_logs = mkpipe()

if "TriggerRunnerSupervisor" in cls.__name__:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open these sockets only when in triggerer, for others we dont need it.

@ashb
Copy link
Member

ashb commented Apr 6, 2025

I'll take a closer look at this on Monday

@gopidesupavan
Copy link
Member Author

I'll take a closer look at this on Monday

cool thanks :)

Copy link
Member

@pierrejeambrun pierrejeambrun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just made a couple of nits

Makes sense to me, I'll let Ash have the final word on this.

Comment on lines +543 to +547
""""
It require special case to handle the workloads and api calls to api-server, due to mixing up messages
a separate channel is used to send the workloads from parent process to child process the child process.
connect_stdin will use this channel to read the workloads in read_workload method.
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I remember correctly those should field docstring should go bellow the attr they describe, not above.

)

def _send(self, msg: BaseModel):
self.trigger_stdin.write(msg.model_dump_json().encode("utf-8") + b"\n") # type: ignore[union-attr]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That type ignore seems of. It wasn't necessary before and it seems really similar now:

@gopidesupavan
Copy link
Member Author

Thanks @pierrejeambrun for review, will be closing this in favour of #48880

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Triggerer messages mixing up when the the tasks triggers continuously

3 participants