Conversation

@dabla
Contributor

@dabla dabla commented Apr 7, 2025

As I already explained to @potiuk and @ashb, I'm experimenting with streamable/iterable XComs. This means an XCom could be an iterable (not supported yet) that is evaluated at runtime, as the consumer operator iterates over the results of the producing operator. The advantage is that the producer operator does not need to fetch all pages (and thus create all task instances) before passing them to the next operator, which reduces memory usage, avoids unnecessary waiting, and should lead to much higher throughput.

While testing this, I discovered that XComs are missing one important feature for this use case: filtering. We already have several operations such as map, zip and concat, which are very handy, but no filter. Without it, I would again have to process the full XCom iterable by applying the filter through a PythonOperator before passing the result to the consuming operator, which removes the advantage of the streaming/iterable functionality. Hence this PR.

Of course, this PR doesn't add the streaming functionality, as that is still a WIP/POC, but it would at least provide the filter operation on an XCom, which is missing today.
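To illustrate why a lazy filter matters for the streaming case, here is a minimal plain-Python sketch with no Airflow involved. The names `fetch_pages` and `lazy_filter` are hypothetical stand-ins for a paginated producer and the proposed filter operation; they are not part of this PR or of the Airflow API.

```python
from typing import Callable, Iterable, Iterator


def fetch_pages(n_pages: int, fetched: list[int]) -> Iterator[int]:
    """Hypothetical producer: yields one page at a time and records each fetch."""
    for page in range(n_pages):
        fetched.append(page)  # stands in for an expensive remote call
        yield page


def lazy_filter(predicate: Callable[[int], bool], items: Iterable[int]) -> Iterator[int]:
    """Yield only matching items, pulling from the producer on demand."""
    for item in items:
        if predicate(item):
            yield item


fetched: list[int] = []
stream = lazy_filter(lambda page: page % 2 == 0, fetch_pages(1000, fetched))

first = next(stream)   # pulls page 0 only
second = next(stream)  # pulls page 1 (rejected) and page 2 (kept)

print(first, second, fetched)
```

Only three of the 1000 pages are fetched before the consumer has its first two results. Filtering in a separate PythonOperator would instead require materializing all pages first.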


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@dabla dabla marked this pull request as draft April 7, 2025 11:55
@dabla dabla marked this pull request as ready for review April 7, 2025 14:11
@dabla dabla marked this pull request as draft April 7, 2025 14:14
@uranusjr
Member

Also I think this is not a refactor PR, but a feature addition.

@dabla dabla changed the title from "refactor: Implemented filter operation on XCom" to "Implemented filter operation on XCom's" Apr 16, 2025
@uranusjr
Member

Question about the get_task_map_length implementation: It seems that filter() does not change the length (since the call is simply passed onto the wrapped XComArg). This means for

@task
def g():
    return [0] * 4

@task
def p(v):
    print(v)

p.expand(v=g().filter(lambda x: x / 2))

The scheduler would still create 4 task instances for p. What would happen in each task instance? Do tis[1] and tis[3] get skipped, or tis[2] and tis[3]? Or something else?
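To make the question concrete, here is a plain-Python sketch of the mismatch, without Airflow. It assumes that filter() keeps the items for which the callable returns a truthy value, like the built-in `filter`; the variable names are illustrative only.

```python
values = [0] * 4             # what g() returns in the example above
predicate = lambda x: x / 2  # 0 / 2 == 0.0, which is falsy

# Length the scheduler would see if the call is passed through to the wrapped arg.
declared_length = len(values)

# Items that actually survive the filter at runtime.
kept = [v for v in values if predicate(v)]

print(declared_length, len(kept))
```

Under that assumption, the scheduler would plan 4 task instances while the filter yields no values at all, so every planned task instance would have nothing to consume.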


Also, I think it would be a good idea to split this into two PRs: one that solely does the refactoring, and another that adds filter and the related mechanism. This would make reviewing a lot easier.

@dabla
Contributor Author

dabla commented Apr 16, 2025

> Question about the get_task_map_length implementation: It seems that filter() does not change the length (since the call is simply passed onto the wrapped XComArg). This means for

Wouldn't get_task_map_length derive the length from the _FilterResult, since there the len would be lazily calculated when needed in conjunction with expanded tasks?

Never mind, found it. You're right, I will fix it:

@get_task_map_length.register
def _(xcom_arg: SchedulerFilterXComArg, run_id: str, *, session: Session):
    return get_task_map_length(xcom_arg.arg, run_id, session=session)

@dabla
Contributor Author

dabla commented Apr 16, 2025

Hmm, I don't know if it's possible to determine the length, as you need the context to resolve the arg and do the actual filtering. Of course, when the streaming functionality is used in combination with filtering, we don't have that issue, as we don't need to know the task map length in advance. Maybe we could raise an AttributeError in that case. This is probably also a reason why the filter operation was never implemented before.

@dabla
Contributor Author

dabla commented Apr 16, 2025

Maybe do something like:

@get_task_map_length.register
def _(xcom_arg: SchedulerFilterXComArg, run_id: str, *, session: Session):
    raise NotImplementedError(
        "Cannot determine map length for FilterXComArg until filtered values are fully evaluated"
    )
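As a self-contained sketch of how the two registrations above differ, the following uses `functools.singledispatch` with stand-in classes. `PlainArg`, `FilterArg` and `map_length` are hypothetical names chosen for illustration; they only mimic the shape of the XComArg classes and of `get_task_map_length`, without the `run_id` and `session` parameters.

```python
from dataclasses import dataclass
from functools import singledispatch
from typing import Callable


@dataclass
class PlainArg:
    """Stand-in for an XComArg whose length is known up front."""
    values: list


@dataclass
class FilterArg:
    """Stand-in for a filter wrapper around another arg."""
    arg: PlainArg
    predicate: Callable[[object], bool]


@singledispatch
def map_length(xcom_arg) -> int:
    raise TypeError(f"Unsupported arg type: {type(xcom_arg).__name__}")


@map_length.register
def _(xcom_arg: PlainArg) -> int:
    return len(xcom_arg.values)


@map_length.register
def _(xcom_arg: FilterArg) -> int:
    # The predicate can only be evaluated at runtime, so the length is unknown here.
    raise NotImplementedError("Cannot determine map length before the filter is evaluated")


plain = PlainArg([0, 1, 2, 3])
filtered = FilterArg(plain, lambda x: x % 2 == 1)

print(map_length(plain))
try:
    map_length(filtered)
    raised = False
except NotImplementedError:
    raised = True
print(raised)
```

Raising here makes the problem explicit instead of reporting a length that may be too large, but it leaves the scheduler without a task count for the mapped task.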

@uranusjr
Member

We need a length value somehow because the scheduler needs to know how many tis to run. Since it is not possible to actually know the real length, I think one possibility would be to just use the original length and create potentially too many tis, and just mark the ones not needed as skipped afterwards.
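A rough plain-Python sketch of that idea, under the assumption that each task instance keeps the map index of its original upstream item. `plan_mapped_tis` and the state strings are hypothetical and do not correspond to actual scheduler code.

```python
from typing import Callable, Sequence


def plan_mapped_tis(
    values: Sequence[object], predicate: Callable[[object], object]
) -> list[tuple[int, str]]:
    """Create one entry per upstream item and mark filtered-out items as skipped."""
    return [
        (map_index, "run" if predicate(value) else "skipped")
        for map_index, value in enumerate(values)
    ]


plan = plan_mapped_tis([0, 1, 2, 3], lambda x: x % 2)
print(plan)
```

With this interpretation the number of task instances always equals the original length, and the skipped ones sit at the positions of the removed items rather than at the end.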

@dabla
Contributor Author

dabla commented Jun 4, 2025

> We need a length value somehow because the scheduler needs to know how many tis to run. Since it is not possible to actually know the real length, I think one possibility would be to just use the original length and create potentially too many tis, and just mark the ones not needed as skipped afterwards.

The length issue for filter would no longer be a problem if the following PR were accepted, so it's best to wait and see what comes out of it.

@dabla
Contributor Author

dabla commented Jul 2, 2025

Filtering with XComs will only be possible once AIP-88 is implemented in the following PR. Until then this PR makes no sense: in the current implementation, XComs have to know their length in advance, which is impossible with filtering, as filtering can alter the length of the XCom.

@dabla dabla closed this Jul 2, 2025