Closed
Labels
area:core, kind:feature, needs-triage
Description
I would still like to be able to write DAGs with low-level access to the Airflow database for operational purposes in Airflow 3.
It is convenient to access the database session programmatically in a PythonOperator for specific cleanup and operational tasks.
Example:
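    # Airflow 2.x import paths; SRC_FOLDER is assumed to be defined elsewhere.
    from os import path

    from sqlalchemy import delete

    from airflow import settings
    from airflow.models import DAG, DagBag, DagRun
    from airflow.operators.python import PythonOperator, get_current_context
    from airflow.utils.db import get_sqla_model_classes


    def clear_dag_runs(dag_id, status_to_clear):
        context = get_current_context()
        session = settings.Session()
        # Find every run of the DAG that is in the requested state.
        query = session.query(DagRun).filter(
            DagRun.state == status_to_clear, DagRun.dag_id == dag_id)
        rst = query.all()
        dag_bag = DagBag(dag_folder=path.join(SRC_FOLDER, 'dags'), include_examples=False)
        dag: DAG = dag_bag.get_dag(dag_id, session)
        # Clear the task instances of each matching run.
        for dag_run in rst:
            dag.clear(
                start_date=dag_run.logical_date,
                end_date=dag_run.logical_date,
                task_ids=None,
                include_subdags=True,
                include_parentdag=True,
                only_failed=False,
            )
        session.close()


    def delete_dag(dag_id):
        context = get_current_context()
        session = settings.Session()
        keep_records_in_log = False
        # Delete the DAG's rows from every model that has a dag_id column.
        for model in get_sqla_model_classes():
            if hasattr(model, "dag_id") and (not keep_records_in_log or model.__name__ != "Log"):
                session.execute(
                    delete(model)
                    .where(model.dag_id == dag_id)
                    .execution_options(synchronize_session="fetch")
                )
        # Persist the deletes and release the session.
        session.commit()
        session.close()


    PythonOperator(
        task_id="delete",
        python_callable=delete_dag,
        op_kwargs={"dag_id": "{{params.dag_name}}"}
    )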
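For readers without an Airflow installation, the delete-by-`dag_id` pattern used in `delete_dag` can be sketched with the standard-library `sqlite3` module. This is only an illustration under stated assumptions: the three tables (`dag_run`, `log`, `variable`) and the names `delete_dag_rows` and `demo` are hypothetical and are not Airflow's real schema or API.

```python
import sqlite3


def delete_dag_rows(conn: sqlite3.Connection, dag_id: str, keep_log: bool = False) -> int:
    """Delete rows matching dag_id from every table that has a dag_id column."""
    deleted = 0
    # Materialize the table list first so we are not iterating a live cursor.
    tables = [row[0] for row in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'")]
    for table in tables:
        # PRAGMA table_info yields (cid, name, type, notnull, dflt_value, pk).
        columns = [col[1] for col in conn.execute(f'PRAGMA table_info("{table}")')]
        if "dag_id" not in columns:
            continue
        if keep_log and table == "log":
            continue
        cur = conn.execute(f'DELETE FROM "{table}" WHERE dag_id = ?', (dag_id,))
        deleted += cur.rowcount
    conn.commit()
    return deleted


def demo(keep_log: bool = False) -> int:
    """Build a hypothetical in-memory schema and delete one DAG's rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE dag_run (id INTEGER PRIMARY KEY, dag_id TEXT, state TEXT);
        CREATE TABLE log (id INTEGER PRIMARY KEY, dag_id TEXT, event TEXT);
        CREATE TABLE variable (id INTEGER PRIMARY KEY, name TEXT);
        INSERT INTO dag_run (dag_id, state)
            VALUES ('old_dag', 'failed'), ('other_dag', 'success');
        INSERT INTO log (dag_id, event) VALUES ('old_dag', 'trigger');
        INSERT INTO variable (name) VALUES ('x');
    """)
    try:
        return delete_dag_rows(conn, "old_dag", keep_log=keep_log)
    finally:
        conn.close()
```

Calling `demo()` removes the `old_dag` rows from both `dag_run` and `log`, while `demo(keep_log=True)` leaves the `log` table untouched, mirroring the `keep_records_in_log` flag above.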
What do you think?
Use case/motivation
No response
Related issues
No response
Are you willing to submit a PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct