[WIP] Add a public interface for custom weight_rule implementation #36029
Conversation
airflow/migrations/versions/0132_2_8_0_add_priority_weight_strategy_to_task_.py
…ntation (apache#35210)" (apache#36066)" This reverts commit f60d458.
…35210)

* Add a public interface for custom weight_rule implementation
* Remove _weight_strategy attribute
* Move priority weight calculation to TI to support advanced strategies
* Fix loading the var from mapped operators and simplify loading it from task
* Update default value and deprecated the other one
* Update task endpoint API spec
* fix tests
* Update docs and add dag example
* Fix serialization test
* revert change in spark provider
* Update unit tests
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
I generally like the idea of custom weight rules, and I have had a couple of cases where I thought it would be cool to inject this function. The PR looks mostly good, but I'd like to understand if there is really a need for:
- Adding a DB column per task instance and serializing (new) context information for every task instance. That sounds like a lot of overhead for potentially millions of tuples in the DB for a very special use case. I'd prefer to assume that in 95% of cases the task context is sufficient for the calculation.
- Renaming the parameter on the task. If it serves the same purpose and has the same config property as before, renaming forces all users to change their DAGs, whereas for the vast majority of users the rename has no functional effect.
- The docs are very brief. For the complexity added, especially in terms of the serialization needed, I'd expect more details and a pointer to the example provided.
Please don't take this negatively, but I assume we need a proper review, and I feel the complexity added needs a rationale, as the DB schema changes might have a negative side impact on large, high-throughput setups. Other opinions welcome. To me this feels like a lot of overhead for a niche feature.
```python
def upgrade():
    """Apply add priority_weight_strategy to task_instance"""
    with op.batch_alter_table("task_instance") as batch_op:
        batch_op.add_column(sa.Column("_priority_weight_strategy", sa.JSON()))
```
I understand we need to store some context information about the selected priority weight strategy, but do we really need to add this to the DB? TaskInstance is the largest table in the DB schema and potentially contains millions of rows. Do we really want to store the same values in what will mostly be millions of cases? Or can we leave it NULL and store a value only when this special rule is used and data actually needs to be stored?
I like the approach of this PR in general but fear it will create a lot of overhead in the DB, especially as it is a JSON field.
```
@@ -575,6 +579,11 @@ class derived from this one results in the creation of a task object,
    significantly speeding up the task creation process as for very large
    DAGs. Options can be set as string or using the constants defined in
    the static class ``airflow.utils.WeightRule``
:param priority_weight_strategy: weighting method used for the effective total priority weight
```
I do not fully understand why we need to rename the field. Even if we are using a custom implementation, does the old name no longer fit? Especially as the same parameters apply.
This way we force all users to change their DAG definitions as we migrate the parameter name, while in 95% of cases the parameter and its function do not change.
```python
importable_string = qualname(priority_weight_strategy_class)
if _get_registered_priority_weight_strategy(importable_string) is None:
    raise _PriorityWeightStrategyNotRegistered(importable_string)
return {Encoding.TYPE: importable_string, Encoding.VAR: var.serialize()}
```
Do we assume we need to store state for the strategy? Would it not be enough to just use the task context information and the Python code? Do we expect that real "data" needs to be serialized, stored and retrieved for a custom strategy? This feels like a lot of overhead for very special use cases where strategy state information needs to be persisted.
Do you have a use case in mind where per-task data (other than the context) needs to be persisted in a custom strategy? I feel this is a niche use case, but maybe I just don't have the use case in mind.
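To illustrate the point above: most strategies can likely be written statelessly, deriving the weight purely from TaskInstance fields with nothing to serialize. The following is a minimal sketch of such a stateless strategy; the base class and the `TaskInstance` fields below are stand-ins for the interface proposed in this PR, not the actual Airflow implementation, and the retry-escalation logic is a hypothetical example.

```python
# Hedged sketch: a stateless custom strategy that derives the weight purely
# from TaskInstance fields, with no serialized state. The classes below are
# illustrative stand-ins for the interface proposed in this PR.
from dataclasses import dataclass


@dataclass
class TaskInstance:  # stand-in for airflow.models.TaskInstance
    priority_weight: int
    try_number: int


class PriorityWeightStrategy:  # stand-in for the proposed base class
    def get_weight(self, ti: TaskInstance) -> int:
        raise NotImplementedError

    def serialize(self) -> dict:
        # No state: nothing to persist in the DB.
        return {}


class EscalateOnRetryStrategy(PriorityWeightStrategy):
    """Hypothetical example: bump the weight on every retry so retried tasks run sooner."""

    def get_weight(self, ti: TaskInstance) -> int:
        return ti.priority_weight + 10 * (ti.try_number - 1)


strategy = EscalateOnRetryStrategy()
print(strategy.get_weight(TaskInstance(priority_weight=5, try_number=1)))  # 5
print(strategy.get_weight(TaskInstance(priority_weight=5, try_number=3)))  # 25
print(strategy.serialize())  # {}
```

Nothing here needs a per-row JSON blob: the class's importable path alone would be enough to reconstruct it.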
```python
def get_weight(self, ti: TaskInstance):
    """Get the priority weight of a task."""
```
Two suggestions:
- Return type missing? What is returned?
- Is it a relative or absolute weight?
```suggestion
def get_weight(self, ti: TaskInstance) -> int:
    """Get the absolute priority weight of a task."""
```
```python
``deserialize`` when the DAG is deserialized. The default implementation returns
an empty dict.
"""
return {}
```
If there is a real use case where state data needs to be persisted (see my other comments: do we really need this?), can we optimize so that None is returned when no data needs to be persisted? That would at least leave the DB column empty in 95% of cases, I feel.
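A minimal sketch of that suggestion: let `serialize()` default to `None` so that only genuinely stateful strategies produce a non-NULL column value. The class names and the `column_value` helper below are illustrative assumptions, not the actual Airflow code.

```python
# Hedged sketch of the suggestion above: serialize() returns None when a
# strategy carries no state, so the DB column can stay NULL in the common
# case. All names here are illustrative, not the real Airflow implementation.
from typing import Optional


class PriorityWeightStrategy:  # illustrative stand-in for the proposed base class
    def serialize(self) -> Optional[dict]:
        # Default: no state, nothing to persist -> column stays NULL.
        return None


class StatefulStrategy(PriorityWeightStrategy):
    """Hypothetical strategy that actually carries state worth persisting."""

    def __init__(self, boost: int) -> None:
        self.boost = boost

    def serialize(self) -> Optional[dict]:
        return {"boost": self.boost}


def column_value(strategy: PriorityWeightStrategy) -> Optional[dict]:
    # Only stateful strategies would write a non-NULL value to the column.
    return strategy.serialize()


print(column_value(PriorityWeightStrategy()))   # None
print(column_value(StatefulStrategy(boost=7)))  # {'boost': 7}
```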
```python
class AbsolutePriorityWeightStrategy(PriorityWeightStrategy):
    """Priority weight strategy that uses the task's priority weight directly."""

    def get_weight(self, ti: TaskInstance):
```
```suggestion
def get_weight(self, ti: TaskInstance) -> int:
```
```python
class DownstreamPriorityWeightStrategy(PriorityWeightStrategy):
    """Priority weight strategy that uses the sum of the priority weights of all downstream tasks."""

    def get_weight(self, ti: TaskInstance):
```
```suggestion
def get_weight(self, ti: TaskInstance) -> int:
```
```python
class UpstreamPriorityWeightStrategy(PriorityWeightStrategy):
    """Priority weight strategy that uses the sum of the priority weights of all upstream tasks."""

    def get_weight(self, ti: TaskInstance):
```
```suggestion
def get_weight(self, ti: TaskInstance) -> int:
```
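For readers following along, the three built-in strategies quoted above (absolute, downstream, upstream) differ only in which neighbours' weights they sum. A rough sketch on a toy task graph, with plain dicts standing in for the DAG's dependency resolution (the graph, weights, and flat — non-transitive — neighbour lists are simplifying assumptions for illustration):

```python
# Hedged sketch of what the three built-in strategies compute, on a toy
# task graph. Real Airflow walks the DAG's task relations; here plain dicts
# stand in, and downstream/upstream lists are given directly.
weights = {"a": 1, "b": 2, "c": 3}
downstream = {"a": ["b", "c"], "b": ["c"], "c": []}
upstream = {"a": [], "b": ["a"], "c": ["a", "b"]}


def absolute(task: str) -> int:
    # Absolute: the task's own priority weight, unchanged.
    return weights[task]


def downstream_weight(task: str) -> int:
    # Downstream: own weight plus the weights of all downstream tasks.
    return weights[task] + sum(weights[t] for t in downstream[task])


def upstream_weight(task: str) -> int:
    # Upstream: own weight plus the weights of all upstream tasks.
    return weights[task] + sum(weights[t] for t in upstream[task])


print(absolute("a"))           # 1
print(downstream_weight("a"))  # 1 + 2 + 3 = 6
print(upstream_weight("c"))    # 3 + 1 + 2 = 6
```

None of these three needs any serialized state, which is the crux of the persistence debate in this thread.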
```python
# def test_deserialization_across_process(self):
#     """A serialized DAG can be deserialized in another process."""
#
#     # Since we need to parse the dags twice here (once in the subprocess,
#     # and once here to get a DAG to compare to) we don't want to load all
#     # dags.
#     queue = multiprocessing.Queue()
#     proc = multiprocessing.Process(target=serialize_subprocess, args=(queue, "airflow/example_dags"))
#     proc.daemon = True
#     proc.start()
#
#     stringified_dags = {}
#     while True:
#         v = queue.get()
#         if v is None:
#             break
#         dag = SerializedDAG.from_json(v)
#         assert isinstance(dag, DAG)
#         stringified_dags[dag.dag_id] = dag
#
#     dags = collect_dags("airflow/example_dags")
#     assert set(stringified_dags.keys()) == set(dags.keys())
#
#     # Verify deserialized DAGs.
#     for dag_id in stringified_dags:
#         self.validate_deserialized_dag(stringified_dags[dag_id], dags[dag_id])
```
Commented-out code: does this need to be fixed, or should it be deleted?
I don't think this is a niche feature. This is the heart and core of what an orchestrator needs to have.
"niche" in context: we are at version 2.9.0 now and so far most of the demand is covered, in contrast to adding a persistent blob to every task instance, which we need a million times over in the DB and which has a high performance impact. Even if it is experimental, we are extending the DB model at the core, where we need to think carefully about every byte we spend, because it will be multiplied a million times across instances. Even if experimental (which I think is rather about the interface/programming model), I doubt that once established we will roll back the DB schema changes. Even the example supplied does not require persistence of any information, and the use cases I have in mind also do not need a blob/JSON persisted. On top of that: how is the persisted data filled initially when the "class" is attached to the task? At parse time?

On the term "niche": assume 10% of installations will implement a custom weight strategy (if it is more, we might consider adding common strategies to core anyway), and in these setups 10% of tasks need this strategy (today, priority is typically considered at all in less than 10% of tasks). I can imagine only a few use cases where state for the weight strategy needs to be persisted, say 10% of those (others might rather be based on context or other criteria). Then in 99.9% of cases we have an empty persisted state column for the strategy.

Also consider that the overall DAG flow we implemented today is in general "state-free" during execution, except for DAG params, context parameters (static at parse time except Jinja templating) and upstream XCom... there might equally be a very high demand for storing state of an execution for a retry or similar. Today the task model does not allow storing additional state for retries either, other than misusing Variables or external systems (temp files on S3). Why are we then adding persistence across executions just for the weight strategy?
(I understand that the Scheduler has no access to DAGs, and besides Python code this might be the only way to bring parameters into the Scheduler.) Which are the cases that need persistence? Is it that a weight rule just needs a parameter with 10 options? Then you could work around that by adding 10 weight strategies instead.

I'd favor, if the need is not known, making a first step without additional persistence and just supplying the option of a Python-code-only extension. If there turns out to be strong demand for persistence, think about how it can be implemented without impacting all other use cases. See XCom for example: DB tuples and storage are only consumed if XCom is produced, not on every task. But I'd be OK if you tell me the use case that urgently needs persistence across task schedules; then the DB overhead might be reasonable. Then the example should also include this.
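The "10 weight strategies instead of one parameter" workaround mentioned above can be sketched roughly as follows: generate one parameterless, importable strategy class per option, so only a class path (not serialized state) identifies the choice. Everything here — the base class, the factory, the registry — is a hypothetical illustration, not Airflow's actual registration mechanism.

```python
# Hedged sketch of the workaround: one registered, parameterless strategy
# class per option, so only an importable class name needs to be stored
# instead of a serialized per-task parameter. All names are illustrative.
class PriorityWeightStrategy:  # illustrative stand-in for the proposed base class
    def get_weight(self, base_weight: int) -> int:
        raise NotImplementedError


def make_boost_strategy(boost: int) -> type:
    """Create a parameterless strategy subclass with the boost baked in."""

    class BoostStrategy(PriorityWeightStrategy):
        def get_weight(self, base_weight: int) -> int:
            return base_weight + boost

    BoostStrategy.__name__ = f"Boost{boost}Strategy"
    return BoostStrategy


# A registry keyed by class name: nothing to persist per task instance.
REGISTRY = {cls.__name__: cls for cls in (make_boost_strategy(b) for b in (10, 20, 30))}

print(sorted(REGISTRY))  # ['Boost10Strategy', 'Boost20Strategy', 'Boost30Strategy']
print(REGISTRY["Boost20Strategy"]().get_weight(5))  # 25
```

The trade-off is an enumerated option set instead of a free parameter, which is exactly the limitation the serialization support in this PR was meant to avoid.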
The feature was actually merged for 2.8 and then reverted due to failing tests, so it missed the 2.8 cut. It's not a recent new change. I'd rather we make an attempt to solve the pain if we can than wait for 2.10. We can find solutions and warn about possible performance issues, like having the feature off by default so users must set it explicitly in the settings... I am not so worried about this part. We always find ways to make everyone happy :)
I was thinking of use cases where persistence is needed and still have doubts. The use cases that came to my mind:
I understand that it was merged and reverted in 2.8, but that was regarding security. The performance concerns remain, and they are a valid reason for this to be experimental. My concerns here are mainly performance: adding millions of blobs to the biggest table in the DB schema needs some consideration. Otherwise all Airflow users need to throw additional hardware at their environments to compensate for the overhead of this feature. Extending the DB schema is not optional for users; it might impact all of them. Meaning: I am totally fine with everything except the persistence (plus the other comment about the parameter rename).
@jscheffl The goal was to make it generic and flexible, allowing users to implement any type of strategy; even if an implemented strategy impacts performance, we can simply recommend against it rather than block it. Anyway, I like @eladkal's suggestion of making it experimental. In that case we can implement it in stages: for example, in this release (2.9.0) we can support instantiating the class without parameters (without the need for serialization), and in the next release we can decide whether to support the fully serialized instance or not.
related: #35210