Use named loggers instead of root logger #37801

Merged: 1 commit, Feb 29, 2024
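This PR replaces direct calls on the root logger (`logging.info(...)`, `logging.warning(...)`, and friends) with module-level named loggers, switches `BaseSensorOperator` to its task-scoped `self.log`, and adds a Ruff banned-api rule so new root-logger calls fail lint. Named loggers inherit the root configuration but carry their module path as the logger name, which makes records filterable and attributable per module. A minimal sketch of the pattern (module and function names here are illustrative):

```python
import logging

# One named logger per module; the logger name records where messages come from.
logger = logging.getLogger(__name__)


def do_work() -> None:
    # Before: logging.info("working")  -- implicitly the root logger
    # After: the module's named logger
    logger.info("working")
```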
2 changes: 1 addition & 1 deletion airflow/example_dags/example_python_decorator.py
```diff
@@ -57,7 +57,7 @@ def print_context(ds=None, **kwargs):
     # [START howto_operator_python_render_sql]
     @task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
     def log_sql(**kwargs):
-        logging.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))
+        log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

     log_the_sql = log_sql()
     # [END howto_operator_python_render_sql]
```
2 changes: 1 addition & 1 deletion airflow/example_dags/example_python_operator.py
```diff
@@ -60,7 +60,7 @@ def print_context(ds=None, **kwargs):

     # [START howto_operator_python_render_sql]
     def log_sql(**kwargs):
-        logging.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))
+        log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

     log_the_sql = PythonOperator(
         task_id="log_sql_query",
```
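Both example DAGs now call `log.info` rather than `logging.info`. The hunks do not show where `log` is defined; since each file records only one addition and one deletion, the pattern relies on a module-level named logger already present near the imports, roughly:

```python
import logging

# Module-level named logger that the `log.info(...)` calls above rely on;
# assumed to be defined once near the imports of each example DAG file.
log = logging.getLogger(__name__)
```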
4 changes: 2 additions & 2 deletions airflow/metrics/otel_logger.py
```diff
@@ -305,7 +305,7 @@ def _create_counter(self, name):
         else:
             counter = self.meter.create_counter(name=otel_safe_name)

-        logging.debug("Created %s as type: %s", otel_safe_name, _type_as_str(counter))
+        log.debug("Created %s as type: %s", otel_safe_name, _type_as_str(counter))
         return counter

     def get_counter(self, name: str, attributes: Attributes = None):
@@ -400,7 +400,7 @@ def get_otel_logger(cls) -> SafeOtelLogger:
     protocol = "https" if ssl_active else "http"
     endpoint = f"{protocol}://{host}:{port}/v1/metrics"

-    logging.info("[Metric Exporter] Connecting to OpenTelemetry Collector at %s", endpoint)
+    log.info("[Metric Exporter] Connecting to OpenTelemetry Collector at %s", endpoint)
     readers = [
         PeriodicExportingMetricReader(
             OTLPMetricExporter(
```
8 changes: 5 additions & 3 deletions airflow/providers/amazon/aws/auth_manager/views/auth.py
```diff
@@ -39,6 +39,8 @@
         "pip install apache-airflow-providers-amazon[python3-saml]"
     )

+logger = logging.getLogger(__name__)
+

 class AwsAuthManagerAuthenticationViews(AirflowBaseView):
     """
@@ -81,9 +83,9 @@ def login_callback(self):
         is_authenticated = saml_auth.is_authenticated()
         if not is_authenticated:
             error_reason = saml_auth.get_last_error_reason()
-            logging.error("Failed to authenticate")
-            logging.error("Errors: %s", errors)
-            logging.error("Error reason: %s", error_reason)
+            logger.error("Failed to authenticate")
+            logger.error("Errors: %s", errors)
+            logger.error("Error reason: %s", error_reason)
             raise AirflowException(f"Failed to authenticate: {error_reason}")

         attributes = saml_auth.get_attributes()
```
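With a named logger in place, the SAML errors above can be dialed up or down per module through the standard logger hierarchy. A small sketch of that knob (the logger names follow real module paths; the configuration itself is illustrative):

```python
import logging

logging.basicConfig(level=logging.WARNING)  # root handler, default threshold

# Dot-separated logger names form a hierarchy, so one line raises verbosity
# for the whole provider package without touching the root logger.
logging.getLogger("airflow.providers.amazon").setLevel(logging.DEBUG)

# A module logger inside that package now passes DEBUG records upward,
# while every other logger still filters at WARNING.
log = logging.getLogger("airflow.providers.amazon.aws.auth_manager.views.auth")
log.debug("visible")                               # emitted
logging.getLogger("airflow.www").debug("hidden")   # filtered out
```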
3 changes: 1 addition & 2 deletions airflow/sensors/base.py
```diff
@@ -20,7 +20,6 @@
 import datetime
 import functools
 import hashlib
-import logging
 import time
 import traceback
 from datetime import timedelta
@@ -257,7 +256,7 @@ def run_duration() -> float:
                 raise e
             except Exception as e:
                 if self.silent_fail:
-                    logging.error("Sensor poke failed: \n %s", traceback.format_exc())
+                    self.log.error("Sensor poke failed: \n %s", traceback.format_exc())
                     poke_return = False
                 elif self.soft_fail:
                     raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
```
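The sensor takes a different route: instead of a module-level logger it uses `self.log`, which Airflow operators inherit from `LoggingMixin`, so the message lands in the task's own log. A simplified sketch of the mixin idea (Airflow's real implementation also caches the logger and integrates with the task log handlers):

```python
import logging


class LoggingMixin:
    """Simplified sketch of a per-class named logger, in the spirit of Airflow's LoggingMixin."""

    @property
    def log(self) -> logging.Logger:
        # Name the logger after the concrete class, e.g. "my_pkg.MySensor",
        # so records are attributable to the emitting component.
        return logging.getLogger(f"{type(self).__module__}.{type(self).__name__}")


class MySensor(LoggingMixin):
    def poke(self) -> bool:
        self.log.error("Sensor poke failed")  # routed through the instance's named logger
        return False
```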
2 changes: 1 addition & 1 deletion airflow/settings.py
```diff
@@ -466,7 +466,7 @@ def get_session_lifetime_config():
         session_lifetime_days = 30
         session_lifetime_minutes = minutes_per_day * session_lifetime_days

-    logging.debug("User session lifetime is set to %s minutes.", session_lifetime_minutes)
+    log.debug("User session lifetime is set to %s minutes.", session_lifetime_minutes)

     return int(session_lifetime_minutes)
```
18 changes: 10 additions & 8 deletions airflow/utils/cli_action_loggers.py
```diff
@@ -27,6 +27,8 @@
 import logging
 from typing import Callable

+logger = logging.getLogger(__name__)
+

 def register_pre_exec_callback(action_logger):
     """Register more action_logger function callback for pre-execution.
@@ -38,7 +40,7 @@ def register_pre_exec_callback(action_logger):
     :param action_logger: An action logger function
     :return: None
     """
-    logging.debug("Adding %s to pre execution callback", action_logger)
+    logger.debug("Adding %s to pre execution callback", action_logger)
     __pre_exec_callbacks.append(action_logger)

@@ -52,7 +54,7 @@ def register_post_exec_callback(action_logger):
     :param action_logger: An action logger function
     :return: None
     """
-    logging.debug("Adding %s to post execution callback", action_logger)
+    logger.debug("Adding %s to post execution callback", action_logger)
     __post_exec_callbacks.append(action_logger)

@@ -64,12 +66,12 @@ def on_pre_execution(**kwargs):
     :param kwargs:
     :return: None
     """
-    logging.debug("Calling callbacks: %s", __pre_exec_callbacks)
+    logger.debug("Calling callbacks: %s", __pre_exec_callbacks)
     for callback in __pre_exec_callbacks:
         try:
             callback(**kwargs)
         except Exception:
-            logging.exception("Failed on pre-execution callback using %s", callback)
+            logger.exception("Failed on pre-execution callback using %s", callback)


 def on_post_execution(**kwargs):
@@ -82,12 +84,12 @@ def on_post_execution(**kwargs):
     :param kwargs:
     :return: None
     """
-    logging.debug("Calling callbacks: %s", __post_exec_callbacks)
+    logger.debug("Calling callbacks: %s", __post_exec_callbacks)
     for callback in __post_exec_callbacks:
         try:
             callback(**kwargs)
         except Exception:
-            logging.exception("Failed on post-execution callback using %s", callback)
+            logger.exception("Failed on post-execution callback using %s", callback)


 def default_action_log(sub_command, user, task_id, dag_id, execution_date, host_name, full_command, **_):
@@ -131,9 +133,9 @@ def default_action_log(sub_command, user, task_id, dag_id, execution_date, host_name, full_command, **_):
         ]
         error_is_ok = e.args and any(x in e.args[0] for x in expected)
         if not error_is_ok:
-            logging.warning("Failed to log action %s", e)
+            logger.warning("Failed to log action %s", e)
     except Exception as e:
-        logging.warning("Failed to log action %s", e)
+        logger.warning("Failed to log action %s", e)


 __pre_exec_callbacks: list[Callable] = []
```
6 changes: 4 additions & 2 deletions airflow/utils/log/file_processor_handler.py
```diff
@@ -28,6 +28,8 @@
 from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler

+logger = logging.getLogger(__name__)
+

 class FileProcessorHandler(logging.Handler):
     """
@@ -124,13 +126,13 @@ def _symlink_latest_log_directory(self):
                     os.unlink(latest_log_directory_path)
                 os.symlink(rel_link_target, latest_log_directory_path)
             elif os.path.isdir(latest_log_directory_path) or os.path.isfile(latest_log_directory_path):
-                logging.warning(
+                logger.warning(
                     "%s already exists as a dir/file. Skip creating symlink.", latest_log_directory_path
                 )
             else:
                 os.symlink(rel_link_target, latest_log_directory_path)
         except OSError:
-            logging.warning("OSError while attempting to symlink the latest log directory")
+            logger.warning("OSError while attempting to symlink the latest log directory")

     def _init_file(self, filename):
         """
```
2 changes: 1 addition & 1 deletion airflow/utils/log/file_task_handler.py
```diff
@@ -538,7 +538,7 @@ def _init_file(self, ti, *, identifier: str | None = None):
             try:
                 os.chmod(full_path, new_file_permissions)
             except OSError as e:
-                logging.warning("OSError while changing ownership of the log file. ", e)
+                logger.warning("OSError while changing ownership of the log file. ", e)

         return full_path
```
5 changes: 4 additions & 1 deletion airflow/utils/log/task_handler_with_custom_formatter.py
```diff
@@ -29,6 +29,9 @@
 from airflow.models.taskinstance import TaskInstance


+logger = logging.getLogger(__name__)
+
+
 class TaskHandlerWithCustomFormatter(logging.StreamHandler):
     """Custom implementation of StreamHandler, a class which writes logging records for Airflow."""

@@ -58,5 +61,5 @@ def _render_prefix(self, ti: TaskInstance) -> str:
         if self.prefix_jinja_template:
             jinja_context = ti.get_template_context()
             return render_template_to_string(self.prefix_jinja_template, jinja_context)
-        logging.warning("'task_log_prefix_template' is in invalid format, ignoring the variable value")
+        logger.warning("'task_log_prefix_template' is in invalid format, ignoring the variable value")
         return ""
```
4 changes: 3 additions & 1 deletion airflow/www/extensions/init_jinja_globals.py
```diff
@@ -27,6 +27,8 @@
 from airflow.utils.platform import get_airflow_git_version
 from airflow.www.extensions.init_auth_manager import get_auth_manager

+logger = logging.getLogger(__name__)
+

 def init_jinja_globals(app):
     """Add extra globals variable to Jinja context."""
@@ -51,7 +53,7 @@ def init_jinja_globals(app):
         airflow_version = airflow.__version__
     except Exception as e:
         airflow_version = None
-        logging.error(e)
+        logger.error(e)

     git_version = get_airflow_git_version()
```
20 changes: 11 additions & 9 deletions airflow/www/views.py
```diff
@@ -163,6 +163,8 @@

 SENSITIVE_FIELD_PLACEHOLDER = "RATHER_LONG_SENSITIVE_FIELD_PLACEHOLDER"

+logger = logging.getLogger(__name__)
+

 def sanitize_args(args: dict[str, str]) -> dict[str, str]:
     """
@@ -1358,7 +1360,7 @@ def rendered_templates(self, session):
         form = DateTimeForm(data={"execution_date": dttm})
         root = request.args.get("root", "")

-        logging.info("Retrieving rendered templates.")
+        logger.info("Retrieving rendered templates.")
         dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
         dag_run = dag.get_dagrun(execution_date=dttm)
         raw_task = dag.get_task(task_id).prepare_for_execution()
@@ -1477,14 +1479,14 @@ def rendered_k8s(self, *, session: Session = NEW_SESSION):
         dag_id = request.args.get("dag_id")
         task_id = request.args.get("task_id")
         if task_id is None:
-            logging.warning("Task id not passed in the request")
+            logger.warning("Task id not passed in the request")
             abort(400)
         execution_date = request.args.get("execution_date")
         dttm = _safe_parse_datetime(execution_date)
         form = DateTimeForm(data={"execution_date": dttm})
         root = request.args.get("root", "")
         map_index = request.args.get("map_index", -1, type=int)
-        logging.info("Retrieving rendered templates.")
+        logger.info("Retrieving rendered templates.")

         dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
         task = dag.get_task(task_id)
@@ -4006,7 +4008,7 @@ def delete(self, pk):
         # Maintains compatibility but refuses to delete on GET methods if CSRF is enabled
         if not self.is_get_mutation_allowed():
             self.update_redirect()
-            logging.warning("CSRF is enabled and a delete using GET was invoked")
+            logger.warning("CSRF is enabled and a delete using GET was invoked")
             flash(as_unicode(FLAMSG_ERR_SEC_ACCESS_DENIED), "danger")
             return self.post_delete_redirect()
         pk = self._deserialize_pk_if_composite(pk)
@@ -4507,7 +4509,7 @@ def prefill_form(self, form, pk):
             extra_dictionary = {}

         if not isinstance(extra_dictionary, dict):
-            logging.warning("extra field for %s is not a dictionary", form.data.get("conn_id", "<unknown>"))
+            logger.warning("extra field for %s is not a dictionary", form.data.get("conn_id", "<unknown>"))
             return

         for field_key, field_name, is_sensitive in self._iter_extra_field_names_and_sensitivity():
@@ -4894,20 +4896,20 @@ def varimport(self, session):
         if action_on_existing == "fail" and existing_keys:
             failed_repr = ", ".join(repr(k) for k in sorted(existing_keys))
             flash(f"Failed. The variables with these keys: {failed_repr} already exists.")
-            logging.error(f"Failed. The variables with these keys: {failed_repr} already exists.")
+            logger.error(f"Failed. The variables with these keys: {failed_repr} already exists.")
             self.update_redirect()
             return redirect(self.get_redirect())
         skipped = set()
         suc_count = fail_count = 0
         for k, v in variable_dict.items():
             if action_on_existing == "skip" and k in existing_keys:
-                logging.warning("Variable: %s already exists, skipping.", k)
+                logger.warning("Variable: %s already exists, skipping.", k)
                 skipped.add(k)
                 continue
             try:
                 models.Variable.set(k, v, serialize_json=not isinstance(v, str))
             except Exception as exc:
-                logging.info("Variable import failed: %r", exc)
+                logger.info("Variable import failed: %r", exc)
                 fail_count += 1
             else:
                 suc_count += 1
@@ -5841,7 +5843,7 @@ def add_user_permissions_to_dag(sender, template, context, **extra):
 def restrict_to_dev(f):
     def wrapper(*args, **kwargs):
         if not os.environ.get("AIRFLOW_ENV", None) == "development":
-            logging.error(
+            logger.error(
                 "You can only access this view in development mode. Set AIRFLOW_ENV=development to view it."
             )
             return abort(404)
```
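One `varimport` hunk keeps an f-string inside `logger.error`. That works, but the logging module's usual convention (followed by the `%s`-style calls elsewhere in this PR) is lazy %-formatting, which defers interpolation until a handler actually emits the record. A sketch of the difference (`failed_repr` value is illustrative):

```python
import logging

logger = logging.getLogger(__name__)
failed_repr = "'key_a', 'key_b'"  # illustrative value

# Eager: the f-string is built even when ERROR records are filtered out.
logger.error(f"Failed. The variables with these keys: {failed_repr} already exists.")

# Lazy: interpolation happens only if the record is actually emitted.
logger.error("Failed. The variables with these keys: %s already exists.", failed_repr)
```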
10 changes: 10 additions & 0 deletions pyproject.toml
```diff
@@ -1507,6 +1507,16 @@ banned-module-level-imports = ["numpy", "pandas"]
 "flask.escape".msg = "Use markupsafe.escape instead. Deprecated in Flask 2.3, removed in Flask 3.0"
 "flask.Markup".msg = "Use markupsafe.Markup instead. Deprecated in Flask 2.3, removed in Flask 3.0"
 "flask.signals_available".msg = "Signals are always available. Deprecated in Flask 2.3, removed in Flask 3.0"
+# Use of the root logger by mistake / IDE autosuggestion
+# If for some reason the root logger is required, it can be obtained via logging.getLogger("root")
+"logging.debug".msg = "Instantiate new `logger = logging.getLogger(__name__)` and use it instead of root logger"
+"logging.info".msg = "Instantiate new `logger = logging.getLogger(__name__)` and use it instead of root logger"
+"logging.warning".msg = "Instantiate new `logger = logging.getLogger(__name__)` and use it instead of root logger"
+"logging.error".msg = "Instantiate new `logger = logging.getLogger(__name__)` and use it instead of root logger"
+"logging.exception".msg = "Instantiate new `logger = logging.getLogger(__name__)` and use it instead of root logger"
+"logging.fatal".msg = "Instantiate new `logger = logging.getLogger(__name__)` and use it instead of root logger"
+"logging.critical".msg = "Instantiate new `logger = logging.getLogger(__name__)` and use it instead of root logger"
+"logging.log".msg = "Instantiate new `logger = logging.getLogger(__name__)` and use it instead of root logger"
 # Some specific cases
 "unittest.TestCase".msg = "Use pytest compatible classes"
```
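These entries extend Ruff's flake8-tidy-imports banned-api table, so any direct call through the `logging` module now fails lint under rule TID251 with the message configured above. A quick sketch of what the rule rejects and the compliant form, assuming this pyproject.toml is active:

```python
import logging

logging.warning("disk almost full")  # TID251: `logging.warning` is banned here

logger = logging.getLogger(__name__)  # the replacement the lint message prescribes
logger.warning("disk almost full")    # passes lint
```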
15 changes: 7 additions & 8 deletions tests/dag_processing/test_job_runner.py
```diff
@@ -65,9 +65,8 @@

 pytestmark = pytest.mark.db_test


-
+logger = logging.getLogger(__name__)
 TEST_DAG_FOLDER = pathlib.Path(__file__).parents[1].resolve() / "dags"
-
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
@@ -902,7 +901,7 @@ def keep_pipe_full(pipe, exit_event):
                     break

                 req = CallbackRequest(str(dag_filepath))
-                logging.info("Sending CallbackRequests %d", n)
+                logger.info("Sending CallbackRequests %d", n)
                 try:
                     pipe.send(req)
                 except TypeError:
@@ -911,7 +910,7 @@ def keep_pipe_full(pipe, exit_event):
                     break
                 except OSError:
                     break
-                logging.debug(" Sent %d CallbackRequests", n)
+                logger.debug(" Sent %d CallbackRequests", n)

         thread = threading.Thread(target=keep_pipe_full, args=(parent_pipe, exit_event))
@@ -943,13 +942,13 @@ def fake_processor_(*args, **kwargs):
             manager._run_parsing_loop()
             exit_event.set()
         finally:
-            logging.info("Closing pipes")
+            logger.info("Closing pipes")
             parent_pipe.close()
             child_pipe.close()
-            logging.info("Closed pipes")
-            logging.info("Joining thread")
+            logger.info("Closed pipes")
+            logger.info("Joining thread")
             thread.join(timeout=1.0)
-            logging.info("Joined thread")
+            logger.info("Joined thread")

     @conf_vars({("core", "load_examples"): "False"})
     @mock.patch("airflow.dag_processing.manager.Stats.timing")
```
3 changes: 2 additions & 1 deletion tests/dags/test_task_view_type_check.py
```diff
@@ -27,6 +27,7 @@
 from airflow.models.dag import DAG
 from airflow.operators.python import PythonOperator

+logger = logging.getLogger(__name__)
 DEFAULT_DATE = datetime(2016, 1, 1)
 default_args = dict(start_date=DEFAULT_DATE, owner="airflow")

@@ -47,7 +48,7 @@ def a_function(_, __):
 partial_function = functools.partial(a_function, arg_x=1)
 class_instance = CallableClass()

-logging.info("class_instance type: %s", type(class_instance))
+logger.info("class_instance type: %s", type(class_instance))

 dag = DAG(dag_id="test_task_view_type_check", default_args=default_args)
```