Description
Problem
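With the new Task SDK, log lines are meant to be JSON, but they are hard to parse out of the API response.
- `content` is a single string that holds what looks like the Python `repr` of a list of tuples. Each tuple pairs a key string (in the example below, the hostname `05958247049c`) with one large log string.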
- A header is then prepended to the logs, and that header is not in JSON format. From airflow/airflow/utils/log/file_task_handler.py, lines 414 to 420 at commit 4b65163:
```python
messages = " INFO - ::group::Log message source details\n"
messages += "".join([f"*** {x}\n" for x in messages_list])
messages += " INFO - ::endgroup::\n"
end_of_log = ti.try_number != try_number or ti.state not in (
    TaskInstanceState.RUNNING,
    TaskInstanceState.DEFERRED,
)
```
- When the logs response is turned into a string in FastAPI, log lines are separated by newline characters, but JSON nested inside a log line (for example the `json` field of the `Sending request` events) also ends with a newline. The UI cannot even rely on splitting the logs on `\n`.
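The plain-text header could instead be emitted in the same shape as the Task SDK log lines. A minimal sketch, assuming the header keeps its `::group::`/`::endgroup::` markers and reuses the `timestamp`, `level`, and `event` keys seen in the example response; `render_source_details_header` is a hypothetical helper, not existing Airflow code:

```python
import json
from datetime import datetime, timezone


def render_source_details_header(messages_list: list) -> str:
    """Hypothetical JSON-lines replacement for the plain-text source-details header.

    Emits one JSON object per line, using the same keys (timestamp, level, event)
    that the Task SDK log lines already carry.
    """
    now = datetime.now(timezone.utc).isoformat()
    records = [{"timestamp": now, "level": "info", "event": "::group::Log message source details"}]
    # One record per source message (e.g. "Found local files:", then each file path).
    records += [{"timestamp": now, "level": "info", "event": msg} for msg in messages_list]
    records.append({"timestamp": now, "level": "info", "event": "::endgroup::"})
    # json.dumps escapes any newline inside a message, so each record stays on one line.
    return "".join(json.dumps(record) + "\n" for record in records)
```

Because every header record is a JSON object, a consumer can treat the header exactly like the rest of the log lines instead of special-casing it.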
Today, the logs response can look like this:
{
"content": "[('05958247049c', ' INFO - ::group::Log message source details\\n*** Found local files:\\n*** * /root/airflow/logs/dag_id=example_python_operator/run_id=manual__2025-02-06T17:38:58.899941+00:00/task_id=print_the_context/attempt=1.log\\n INFO - ::endgroup::\\n{\"timestamp\":\"2025-02-06T17:38:59.534108\",\"level\":\"info\",\"event\":\"DAG bundles loaded: dags-folder, example_dags\",\"logger\":\"airflow.dag_processing.bundles.manager.DagBundlesManager\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.534920\",\"level\":\"info\",\"event\":\"Filling up the DagBag from /opt/airflow/airflow/example_dags/example_python_operator.py\",\"logger\":\"airflow.models.dagbag.DagBag\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.538546\",\"level\":\"debug\",\"event\":\"Importing /opt/airflow/airflow/example_dags/example_python_operator.py\",\"logger\":\"airflow.models.dagbag.DagBag\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.554567\",\"level\":\"debug\",\"event\":\"Loaded DAG <DAG: example_python_operator>\",\"logger\":\"airflow.models.dagbag.DagBag\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.554924\",\"level\":\"debug\",\"event\":\"DAG file parsed\",\"file\":\"example_python_operator.py\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.555454\",\"level\":\"debug\",\"event\":\"Sending request\",\"json\":\"{\\\\\"rendered_fields\\\\\":{\\\\\"templates_dict\\\\\":null,\\\\\"op_args\\\\\":\\\\\"()\\\\\",\\\\\"op_kwargs\\\\\":{}},\\\\\"type\\\\\":\\\\\"SetRenderedFields\\\\\"}\\\\n\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.592930\",\"level\":\"warning\",\"event\":\"PythonOperator.execute cannot be called outside TaskInstance!\",\"logger\":\"airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.593413Z\",\"level\":\"info\",\"event\":\"::group::All kwargs\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.594224Z\",\"level\":\"info\",\"event\":\"{\\'conn\\': 
<ConnectionAccessor (dynamic access)>,\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.594278Z\",\"level\":\"info\",\"event\":\" \\'dag\\': <DAG: example_python_operator>,\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.594672Z\",\"level\":\"info\",\"event\":\" \\'dag_run\\': DagRun(dag_id=\\'example_python_operator\\', run_id=\\'manual__2025-02-06T17:38:58.899941+00:00\\', logical_date=datetime.datetime(2025, 2, 6, 17, 38, 58, 909230, tzinfo=TzInfo(UTC)), data_interval_start=datetime.datetime(2025, 2, 6, 17, 38, 58, 909230, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2025, 2, 6, 17, 38, 58, 909230, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2025, 2, 6, 17, 38, 59, 283838, tzinfo=TzInfo(UTC)), end_date=None, run_type=<DagRunType.MANUAL: \\'manual\\'>, conf={}, external_trigger=False),\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.594812Z\",\"level\":\"info\",\"event\":\" \\'data_interval_end\\': datetime.datetime(2025, 2, 6, 17, 38, 58, 909230, tzinfo=TzInfo(UTC)),\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.594899Z\",\"level\":\"info\",\"event\":\" \\'data_interval_start\\': datetime.datetime(2025, 2, 6, 17, 38, 58, 909230, tzinfo=TzInfo(UTC)),\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.594921Z\",\"level\":\"info\",\"event\":\" \\'ds_nodash\\': \\'20250206\\',\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.594947Z\",\"level\":\"info\",\"event\":\" \\'inlets\\': [],\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.595067Z\",\"level\":\"info\",\"event\":\" \\'logical_date\\': datetime.datetime(2025, 2, 6, 17, 38, 58, 909230, tzinfo=TzInfo(UTC)),\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.595215Z\",\"level\":\"info\",\"event\":\" \\'macros\\': <MacrosAccessor 
(dynamic access to macros)>,\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.595258Z\",\"level\":\"info\",\"event\":\" \\'map_index_template\\': None,\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.595279Z\",\"level\":\"info\",\"event\":\" \\'outlet_events\\': <airflow.sdk.execution_time.context.OutletEventAccessors object at 0xffff835e4550>,\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.595297Z\",\"level\":\"info\",\"event\":\" \\'outlets\\': [],\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.595314Z\",\"level\":\"info\",\"event\":\" \\'params\\': {},\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.594893\",\"level\":\"debug\",\"event\":\"Sending request\",\"json\":\"{\\\\\"ti_id\\\\\":\\\\\"0194dc57-f2f9-7f8e-9a3a-0b11c0b2c7b8\\\\\",\\\\\"type\\\\\":\\\\\"GetPrevSuccessfulDagRun\\\\\"}\\\\n\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606175Z\",\"level\":\"info\",\"event\":\" \\'prev_data_interval_end_success\\': <Proxy at 0xffff9b7e1dc0 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffff9b7fc940>>,\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606258Z\",\"level\":\"info\",\"event\":\" \\'prev_data_interval_start_success\\': <Proxy at 0xffff9b86f080 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffff9b7fc9d0>>,\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606281Z\",\"level\":\"info\",\"event\":\" \\'prev_end_date_success\\': <Proxy at 0xffff9b8a4540 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffff9b7fcca0>>,\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606300Z\",\"level\":\"info\",\"event\":\" \\'prev_start_date_success\\': <Proxy at 
0xffff9b9741c0 with factory <function RuntimeTaskInstance.get_template_context.<locals>.<lambda> at 0xffff9b7fcc10>>,\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606317Z\",\"level\":\"info\",\"event\":\" \\'run_id\\': \\'manual__2025-02-06T17:38:58.899941+00:00\\',\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606333Z\",\"level\":\"info\",\"event\":\" \\'task\\': <Task(PythonOperator): print_the_context>,\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606350Z\",\"level\":\"info\",\"event\":\" \\'task_instance\\': RuntimeTaskInstance(id=UUID(\\'0194dc57-f2f9-7f8e-9a3a-0b11c0b2c7b8\\'), task_id=\\'print_the_context\\', dag_id=\\'example_python_operator\\', run_id=\\'manual__2025-02-06T17:38:58.899941+00:00\\', try_number=1, map_index=-1, hostname=\\'05958247049c\\', task=<Task(PythonOperator): print_the_context>, max_tries=0),\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606369Z\",\"level\":\"info\",\"event\":\" \\'task_instance_key_str\\': \\'example_python_operator__print_the_context__20250206\\',\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606390Z\",\"level\":\"info\",\"event\":\" \\'templates_dict\\': None,\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606410Z\",\"level\":\"info\",\"event\":\" \\'ti\\': RuntimeTaskInstance(id=UUID(\\'0194dc57-f2f9-7f8e-9a3a-0b11c0b2c7b8\\'), task_id=\\'print_the_context\\', dag_id=\\'example_python_operator\\', run_id=\\'manual__2025-02-06T17:38:58.899941+00:00\\', try_number=1, map_index=-1, hostname=\\'05958247049c\\', task=<Task(PythonOperator): print_the_context>, max_tries=0),\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606424Z\",\"level\":\"info\",\"event\":\" \\'ts\\': 
\\'2025-02-06T17:38:58.909230+00:00\\',\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606448Z\",\"level\":\"info\",\"event\":\" \\'ts_nodash\\': \\'20250206T173858\\',\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606464Z\",\"level\":\"info\",\"event\":\" \\'ts_nodash_with_tz\\': \\'20250206T173858.909230+0000\\',\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606478Z\",\"level\":\"info\",\"event\":\" \\'var\\': {\\'json\\': <VariableAccessor (dynamic access)>,\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606491Z\",\"level\":\"info\",\"event\":\" \\'value\\': <VariableAccessor (dynamic access)>}}\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606504Z\",\"level\":\"info\",\"event\":\"::endgroup::\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606518Z\",\"level\":\"info\",\"event\":\"::group::Context variable ds\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606533Z\",\"level\":\"info\",\"event\":\"2025-02-06\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606546Z\",\"level\":\"info\",\"event\":\"::endgroup::\",\"chan\":\"stdout\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.605803\",\"level\":\"info\",\"event\":\"Done. 
Returned value was: Whatever you return gets printed in the logs\",\"logger\":\"airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.606060\",\"level\":\"debug\",\"event\":\"Sending request\",\"json\":\"{\\\\\"key\\\\\":\\\\\"return_value\\\\\",\\\\\"value\\\\\":\\\\\"\\\\\\\\\\\\\"Whatever you return gets printed in the logs\\\\\\\\\\\\\"\\\\\",\\\\\"dag_id\\\\\":\\\\\"example_python_operator\\\\\",\\\\\"run_id\\\\\":\\\\\"manual__2025-02-06T17:38:58.899941+00:00\\\\\",\\\\\"task_id\\\\\":\\\\\"print_the_context\\\\\",\\\\\"map_index\\\\\":-1,\\\\\"mapped_length\\\\\":null,\\\\\"type\\\\\":\\\\\"SetXCom\\\\\"}\\\\n\",\"logger\":\"task\"}\\n{\"timestamp\":\"2025-02-06T17:38:59.607180\",\"level\":\"debug\",\"event\":\"Sending request\",\"json\":\"{\\\\\"state\\\\\":\\\\\"success\\\\\",\\\\\"end_date\\\\\":\\\\\"2025-02-06T17:38:59.606095Z\\\\\",\\\\\"task_outlets\\\\\":[],\\\\\"outlet_events\\\\\":[],\\\\\"type\\\\\":\\\\\"SucceedTask\\\\\"}\\\\n\",\"logger\":\"task\"}')]",
"continuation_token": "eyJlbmRfb2ZfbG9nIjp0cnVlLCJsb2dfcG9zIjo4NjY4fQ.ROimXexWdP3hshTwW7eSlpgyzHM"
}
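To illustrate why this is awkward, here is a best-effort client-side parse of the current `content` field. It assumes `content` is the Python `repr` of a list of `(host, log_text)` tuples, as the example suggests; that is an observation, not a documented contract. Note that it depends on Python's `ast.literal_eval`, which a browser UI does not have. `parse_legacy_log_content` is a hypothetical name:

```python
from __future__ import annotations

import ast
import json


def parse_legacy_log_content(content: str) -> list:
    """Best-effort parse of the current `content` string (assumed repr of [(host, text), ...]).

    JSON-object lines become dicts; anything else (such as the plain-text
    header) is kept as a raw string.
    """
    parsed: list = []
    for _host, log_text in ast.literal_eval(content):
        for line in log_text.split("\n"):
            if not line:
                continue  # skip the empty string after a trailing newline
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                parsed.append(line)  # e.g. " INFO - ::group::Log message source details"
                continue
            # Keep only JSON objects as records; bare JSON scalars stay as text.
            parsed.append(record if isinstance(record, dict) else line)
    return parsed
```

The result mixes dicts and strings, which is exactly the ambiguity a typed API response would remove.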
Desired outcome
- The log header should be rewritten to use the same JSON format as the log lines.
- If the log lines are JSON, FastAPI should return them as a JSON array, ideally with defined types.
- If the logs are not valid JSON, the API should return them as a single string, so the UI knows to display them as-is without formatting.
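The desired behaviour can be sketched as an all-or-nothing decision: return typed records when every line is a JSON object, otherwise return the raw string. This uses stdlib dataclasses to stay self-contained; in FastAPI the type would more naturally be a Pydantic response model. `StructuredLogMessage`, `KNOWN_KEYS`, and `build_log_content` are hypothetical names, and the field set is an assumption based on the keys in the example:

```python
from __future__ import annotations

import json
from dataclasses import dataclass, field
from typing import Any


@dataclass
class StructuredLogMessage:
    """Hypothetical typed log record; field names mirror the JSON lines in the example."""

    timestamp: str | None
    event: str
    level: str | None = None
    logger: str | None = None
    extra: dict[str, Any] = field(default_factory=dict)  # e.g. chan, file, json


KNOWN_KEYS = {"timestamp", "event", "level", "logger"}


def build_log_content(log_text: str) -> list[StructuredLogMessage] | str:
    """Return typed records if every non-empty line is a JSON object with an event, else the raw text."""
    records: list[StructuredLogMessage] = []
    for line in log_text.split("\n"):
        if not line:
            continue
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            return log_text  # not JSON: the UI should display it unformatted
        if not isinstance(obj, dict) or "event" not in obj:
            return log_text
        records.append(
            StructuredLogMessage(
                timestamp=obj.get("timestamp"),
                event=obj["event"],
                level=obj.get("level"),
                logger=obj.get("logger"),
                # Keep any other keys so no information is lost.
                extra={k: v for k, v in obj.items() if k not in KNOWN_KEYS},
            )
        )
    return records
```

With this rule, a single plain-text line (such as today's header) pushes the whole response back to string mode, which is why the header needs to become JSON for the structured path to be usable.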