Skip to content

Commit

Permalink
Apply per-run log templates to log handlers (#24153)
Browse files Browse the repository at this point in the history
  • Loading branch information
uranusjr authored Jun 7, 2022
1 parent 7498fba commit c238269
Show file tree
Hide file tree
Showing 25 changed files with 232 additions and 155 deletions.
1 change: 0 additions & 1 deletion airflow/config_templates/airflow_local_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'filename_template': FILENAME_TEMPLATE,
'filters': ['mask_secrets'],
},
'processor': {
Expand Down
1 change: 0 additions & 1 deletion airflow/config_templates/default_test.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ base_log_folder = {AIRFLOW_HOME}/logs
logging_level = INFO
celery_logging_level = WARN
fab_logging_level = WARN
log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log
log_processor_filename_template = {{{{ filename }}}}.log
dag_processor_manager_log_location = {AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log
worker_log_server_port = 8793
Expand Down
15 changes: 12 additions & 3 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1065,14 +1065,23 @@ def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = NEW_SES
return count

@provide_session
def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str:
def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate:
if self.log_template_id is None: # DagRun created before LogTemplate introduction.
template = session.query(LogTemplate.filename).order_by(LogTemplate.id).limit(1).scalar()
template = session.query(LogTemplate).order_by(LogTemplate.id).first()
else:
template = session.query(LogTemplate.filename).filter_by(id=self.log_template_id).scalar()
template = session.query(LogTemplate).get(self.log_template_id)
if template is None:
raise AirflowException(
f"No log_template entry found for ID {self.log_template_id!r}. "
f"Please make sure you set up the metadatabase correctly."
)
return template

@provide_session
def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str:
warnings.warn(
"This method is deprecated. Please use get_log_template instead.",
DeprecationWarning,
stacklevel=2,
)
return self.get_log_template(session=session).filename
2 changes: 1 addition & 1 deletion airflow/providers/alibaba/cloud/log/oss_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
uploads to and reads from OSS remote storage.
"""

def __init__(self, base_log_folder, oss_log_folder, filename_template):
def __init__(self, base_log_folder, oss_log_folder, filename_template=None):
self.log.info("Using oss_task_handler for remote logging...")
super().__init__(base_log_folder, filename_template)
(self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder)
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
import sys
from datetime import datetime
from typing import Optional

import watchtower

Expand All @@ -42,7 +43,7 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
:param filename_template: template for file name (local storage) or log stream name (remote)
"""

def __init__(self, base_log_folder: str, log_group_arn: str, filename_template: str):
def __init__(self, base_log_folder: str, log_group_arn: str, filename_template: Optional[str] = None):
super().__init__(base_log_folder, filename_template)
split_arn = log_group_arn.split(':')

Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/amazon/aws/log/s3_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import os
import pathlib
import sys
from typing import Optional

if sys.version_info >= (3, 8):
from functools import cached_property
Expand All @@ -36,7 +37,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
uploads to and reads from S3 remote storage.
"""

def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: str):
def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: Optional[str] = None):
super().__init__(base_log_folder, filename_template)
self.remote_base = s3_log_folder
self.log_relative_path = ''
Expand Down
33 changes: 27 additions & 6 deletions airflow/providers/elasticsearch/log/es_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import logging
import sys
import warnings
from collections import defaultdict
from datetime import datetime
from operator import attrgetter
Expand All @@ -31,15 +32,22 @@
from elasticsearch_dsl import Search

from airflow.configuration import conf
from airflow.models import TaskInstance
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.json_formatter import JSONFormatter
from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
from airflow.utils.session import create_session

# Elasticsearch hosted log type
EsLogMsgType = List[Tuple[str, str]]

# Compatibility: Airflow 2.3.3 and up uses this method, which accesses the
# LogTemplate model to record the log ID template used. If this function does
# not exist, the task handler should use the log_id_template attribute instead.
USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")


class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
"""
Expand All @@ -65,8 +73,6 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
def __init__(
self,
base_log_folder: str,
filename_template: str,
log_id_template: str,
end_of_log_mark: str,
write_stdout: bool,
json_format: bool,
Expand All @@ -76,6 +82,9 @@ def __init__(
host: str = "localhost:9200",
frontend: str = "localhost:5601",
es_kwargs: Optional[dict] = conf.getsection("elasticsearch_configs"),
*,
filename_template: Optional[str] = None,
log_id_template: Optional[str] = None,
):
"""
:param base_log_folder: base folder to store logs locally
Expand All @@ -88,7 +97,13 @@ def __init__(

self.client = elasticsearch.Elasticsearch([host], **es_kwargs) # type: ignore[attr-defined]

self.log_id_template = log_id_template
if USE_PER_RUN_LOG_ID and log_id_template is not None:
warnings.warn(
"Passing log_id_template to ElasticsearchTaskHandler is deprecated and has no effect",
DeprecationWarning,
)

self.log_id_template = log_id_template # Only used on Airflow < 2.3.2.
self.frontend = frontend
self.mark_end_on_close = True
self.end_of_log_mark = end_of_log_mark
Expand All @@ -103,7 +118,13 @@ def __init__(
self.handler: Union[logging.FileHandler, logging.StreamHandler] # type: ignore[assignment]

def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
dag_run = ti.get_dagrun()
with create_session() as session:
dag_run = ti.get_dagrun(session=session)
if USE_PER_RUN_LOG_ID:
log_id_template = dag_run.get_log_template(session=session).elasticsearch_id
else:
log_id_template = self.log_id_template

dag = ti.task.dag
assert dag is not None # For Mypy.
try:
Expand All @@ -126,7 +147,7 @@ def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
data_interval_end = ""
execution_date = dag_run.execution_date.isoformat()

return self.log_id_template.format(
return log_id_template.format(
dag_id=ti.dag_id,
task_id=ti.task_id,
run_id=getattr(ti, "run_id", ""),
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/log/gcs_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(
*,
base_log_folder: str,
gcs_log_folder: str,
filename_template: str,
filename_template: Optional[str] = None,
gcp_key_path: Optional[str] = None,
gcp_keyfile_dict: Optional[dict] = None,
gcp_scopes: Optional[Collection[str]] = _DEFAULT_SCOPESS,
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/microsoft/azure/log/wasb_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ def __init__(
base_log_folder: str,
wasb_log_folder: str,
wasb_container: str,
filename_template: str,
delete_local_copy: str,
*,
filename_template: Optional[str] = None,
) -> None:
super().__init__(base_log_folder, filename_template)
self.wasb_container = wasb_container
Expand Down
26 changes: 18 additions & 8 deletions airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""File logging handler for tasks."""
import logging
import os
import warnings
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Tuple
Expand All @@ -28,6 +29,7 @@
from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.session import create_session

if TYPE_CHECKING:
from airflow.models import TaskInstance
Expand All @@ -44,11 +46,15 @@ class FileTaskHandler(logging.Handler):
:param filename_template: template filename string
"""

def __init__(self, base_log_folder: str, filename_template: str):
def __init__(self, base_log_folder: str, filename_template: Optional[str] = None):
super().__init__()
self.handler: Optional[logging.FileHandler] = None
self.local_base = base_log_folder
self.filename_template, self.filename_jinja_template = parse_template_string(filename_template)
if filename_template is not None:
warnings.warn(
"Passing filename_template to FileTaskHandler is deprecated and has no effect",
DeprecationWarning,
)

def set_context(self, ti: "TaskInstance"):
"""
Expand All @@ -75,15 +81,19 @@ def close(self):
self.handler.close()

def _render_filename(self, ti: "TaskInstance", try_number: int) -> str:
if self.filename_jinja_template:
with create_session() as session:
dag_run = ti.get_dagrun(session=session)
template = dag_run.get_log_template(session=session).filename
str_tpl, jinja_tpl = parse_template_string(template)

if jinja_tpl:
if hasattr(ti, "task"):
context = ti.get_template_context()
else:
context = Context(ti=ti, ts=ti.get_dagrun().logical_date.isoformat())
context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
context["try_number"] = try_number
return render_template_to_string(self.filename_jinja_template, context)
elif self.filename_template:
dag_run = ti.get_dagrun()
return render_template_to_string(jinja_tpl, context)
elif str_tpl:
dag = ti.task.dag
assert dag is not None # For Mypy.
try:
Expand All @@ -98,7 +108,7 @@ def _render_filename(self, ti: "TaskInstance", try_number: int) -> str:
data_interval_end = data_interval[1].isoformat()
else:
data_interval_end = ""
return self.filename_template.format(
return str_tpl.format(
dag_id=ti.dag_id,
task_id=ti.task_id,
run_id=ti.run_id,
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/log/log_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,6 @@ def render_log_filename(
attachment_filename = render_log_filename(
ti=ti,
try_number="all" if try_number is None else try_number,
filename_template=dagrun.get_log_filename_template(session=session),
filename_template=dagrun.get_log_template(session=session).filename,
)
return attachment_filename
8 changes: 4 additions & 4 deletions tests/api_connexion/endpoints/test_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def setup_attrs(self, configured_app, configure_loggers, dag_maker, session) ->
self.ti.hostname = 'localhost'

@pytest.fixture
def configure_loggers(self, tmp_path):
def configure_loggers(self, tmp_path, create_log_template):
self.log_dir = tmp_path

dir_path = tmp_path / self.DAG_ID / self.TASK_ID / self.default_time.replace(':', '.')
Expand All @@ -112,9 +112,9 @@ def configure_loggers(self, tmp_path):
logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
logging_config['handlers']['task']['base_log_folder'] = self.log_dir

logging_config['handlers']['task'][
'filename_template'
] = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log'
create_log_template(
'{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log'
)

logging.config.dictConfig(logging_config)

Expand Down
21 changes: 21 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
os.environ["AWS_DEFAULT_REGION"] = os.environ.get("AWS_DEFAULT_REGION") or "us-east-1"
os.environ["CREDENTIALS_DIR"] = os.environ.get('CREDENTIALS_DIR') or "/files/airflow-breeze-config/keys"

from airflow import settings # noqa: E402
from airflow.models.tasklog import LogTemplate # noqa: E402

from tests.test_utils.perf.perf_kit.sqlalchemy import ( # noqa isort:skip
count_queries,
trace_queries,
Expand Down Expand Up @@ -769,3 +772,21 @@ def _get(dag_id):
return dag

return _get


@pytest.fixture()
def create_log_template(request):
session = settings.Session()

def _create_log_template(filename_template, elasticsearch_id=""):
log_template = LogTemplate(filename=filename_template, elasticsearch_id=elasticsearch_id)
session.add(log_template)
session.commit()

def _delete_log_template():
session.delete(log_template)
session.commit()

request.addfinalizer(_delete_log_template)

return _create_log_template
5 changes: 1 addition & 4 deletions tests/providers/alibaba/cloud/log/test_oss_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ class TestOSSTaskHandler(unittest.TestCase):
def setUp(self):
self.base_log_folder = 'local/airflow/logs/1.log'
self.oss_log_folder = f'oss://{MOCK_BUCKET_NAME}/airflow/logs'
self.filename_template = '{try_number}.log'
self.oss_task_handler = OSSTaskHandler(
self.base_log_folder, self.oss_log_folder, self.filename_template
)
self.oss_task_handler = OSSTaskHandler(self.base_log_folder, self.oss_log_folder)

@mock.patch(OSS_TASK_HANDLER_STRING.format('conf.get'))
@mock.patch(OSS_TASK_HANDLER_STRING.format('OSSHook'))
Expand Down
Loading

0 comments on commit c238269

Please sign in to comment.