-
Notifications
You must be signed in to change notification settings - Fork 16.3k
AIP-72: Port Variable.set From TaskSDK to Models #48177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AIP-72: Port Variable.set From TaskSDK to Models #48177
Conversation
49268f9 to
51a3a7e
Compare
51a3a7e to
a076fa4
Compare
| def _set_variable(key: str, value: Any, description: str | None = None, serialize_json: bool = False) -> None: | ||
| from airflow.sdk.execution_time.comms import ErrorResponse, PutVariable | ||
| from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS | ||
|
|
||
| if serialize_json: | ||
| import json | ||
|
|
||
| value = json.dumps(value, indent=2) | ||
| else: | ||
| value = str(value) | ||
|
|
||
| SUPERVISOR_COMMS.send_request(log=log, msg=PutVariable(key=key, value=value, description=description)) | ||
| msg = SUPERVISOR_COMMS.get_message() | ||
| if isinstance(msg, ErrorResponse): | ||
| raise AirflowRuntimeError(msg) | ||
| return | ||
|
|
||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I need to port the Secret Backend check_for_write_conflict logic from models ( https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/variable.py#L198-L199 ) to TaskSDK as well?
Just like _get_variable, right?
https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/context.py#L154-L171
cc @amoghrajesh
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thats right we need to do that as well
a076fa4 to
9f83b4c
Compare
|
Just rebased to latest main to resolve conflict. |
amoghrajesh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jason810496 this only handles the top level variables. We will also have to port over / allow this from the task runner level too.
Could you please handle that as well?
| def _set_variable(key: str, value: Any, description: str | None = None, serialize_json: bool = False) -> None: | ||
| from airflow.sdk.execution_time.comms import ErrorResponse, PutVariable | ||
| from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS | ||
|
|
||
| if serialize_json: | ||
| import json | ||
|
|
||
| value = json.dumps(value, indent=2) | ||
| else: | ||
| value = str(value) | ||
|
|
||
| SUPERVISOR_COMMS.send_request(log=log, msg=PutVariable(key=key, value=value, description=description)) | ||
| msg = SUPERVISOR_COMMS.get_message() | ||
| if isinstance(msg, ErrorResponse): | ||
| raise AirflowRuntimeError(msg) | ||
| return | ||
|
|
||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thats right we need to do that as well
9f83b4c to
3ba45df
Compare
|
Discussed with @jason810496 on this one. There is more work left to do on this and it would be nice to do this in RC tomorrow eod. I will be working on this parallely and creating a companion PR that handles the missing gaps in this one too. |
|
Closing this one, as #49005 will address the issue. |
closes: #47920
Why
Setting a variable in Dag is failing due to 'Direct database access via the ORM is not allowed in Airflow 3.0' ( details in issue context )
What
Port
Variable.setfrom TaskSDK toairflow.models.Variable.set