diff --git a/UPDATING.md b/UPDATING.md index d50c123cf5fe6..61a6d5c81586b 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -281,6 +281,19 @@ if foo is None: This changes the behaviour if you previously explicitly provided `None` as a default value. If your code expects a `KeyError` to be thrown, then don't pass the `default_var` argument. +### Removal of `airflow_home` config setting + +There were previously two ways of specifying the Airflow "home" directory +(`~/airflow` by default): the `AIRFLOW_HOME` environment variable, and the +`airflow_home` config setting in the `[core]` section. + +If they had two different values different parts of the code base would end up +with different values. The config setting has been deprecated, and you should +remove the value from the config file and set `AIRFLOW_HOME` environment +variable if you need to use a non default value for this. + +(Since this setting is used to calculate what config file to load, it is not +possible to keep just the config option) ## Airflow 1.10.2 diff --git a/airflow/__init__.py b/airflow/__init__.py index c9b0bdd678ddd..e663f160f555e 100644 --- a/airflow/__init__.py +++ b/airflow/__init__.py @@ -37,8 +37,7 @@ from airflow.models import DAG from airflow.exceptions import AirflowException -if settings.DAGS_FOLDER not in sys.path: - sys.path.append(settings.DAGS_FOLDER) # type: ignore +settings.initialize() login = None diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 487f15af63f58..d6819aa7e0aac 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -117,17 +117,13 @@ def setup_logging(filename): def setup_locations(process, pid=None, stdout=None, stderr=None, log=None): if not stderr: - stderr = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), - 'airflow-{}.err'.format(process)) + stderr = os.path.join(settings.AIRFLOW_HOME, 'airflow-{}.err'.format(process)) if not stdout: - stdout = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), - 'airflow-{}.out'.format(process)) + stdout = os.path.join(settings.AIRFLOW_HOME, 'airflow-{}.out'.format(process)) if not log: - log = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), - 'airflow-{}.log'.format(process)) + log = os.path.join(settings.AIRFLOW_HOME, 'airflow-{}.log'.format(process)) if not pid: - pid = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME), - 'airflow-{}.pid'.format(process)) + pid = os.path.join(settings.AIRFLOW_HOME, 'airflow-{}.pid'.format(process)) return pid, stdout, stderr, log diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 34d809d8ac968..c342763ca059d 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -30,9 +30,6 @@ # ----------------------- TEMPLATE BEGINS HERE ----------------------- [core] -# The home folder for airflow, default is ~/airflow -airflow_home = {AIRFLOW_HOME} - # The folder where your airflow pipelines live, most likely a # subfolder in a code repository # This path must be absolute diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg index 0429af3cad2a3..5361cffe9b9cb 100644 --- a/airflow/config_templates/default_test.cfg +++ b/airflow/config_templates/default_test.cfg @@ -31,7 +31,6 @@ [core] unit_test_mode = True -airflow_home = {AIRFLOW_HOME} dags_folder = {TEST_DAGS_FOLDER} plugins_folder = {TEST_PLUGINS_FOLDER} base_log_folder = {AIRFLOW_HOME}/logs diff --git a/airflow/configuration.py b/airflow/configuration.py index 827303d102580..c9fdf22bbd91d 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -544,12 +544,31 @@ def parameterized_config(template): conf.read(AIRFLOW_CONFIG) -DEFAULT_WEBSERVER_CONFIG = _read_default_config_file('default_webserver_config.py') +if conf.has_option('core', 'AIRFLOW_HOME'): + msg = ( + 'Specifying both AIRFLOW_HOME environment variable and airflow_home ' + 'in the config file is deprecated. Please use only the AIRFLOW_HOME ' + 'environment variable and remove the config file entry.' + ) + if 'AIRFLOW_HOME' in os.environ: + warnings.warn(msg, category=DeprecationWarning) + elif conf.get('core', 'airflow_home') == AIRFLOW_HOME: + warnings.warn( + 'Specifying airflow_home in the config file is deprecated. As you ' + 'have left it at the default value you should remove the setting ' + 'from your airflow.cfg and suffer no change in behaviour.', + category=DeprecationWarning, + ) + else: + AIRFLOW_HOME = conf.get('core', 'airflow_home') + warnings.warn(msg, category=DeprecationWarning) + WEBSERVER_CONFIG = AIRFLOW_HOME + '/webserver_config.py' if not os.path.isfile(WEBSERVER_CONFIG): log.info('Creating new FAB webserver config file in: %s', WEBSERVER_CONFIG) + DEFAULT_WEBSERVER_CONFIG = _read_default_config_file('default_webserver_config.py') with open(WEBSERVER_CONFIG, 'w') as f: f.write(DEFAULT_WEBSERVER_CONFIG) diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 17f7e9a2331f1..38abc5ed85a08 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -36,7 +36,7 @@ from airflow.models.kubernetes import KubeResourceVersion, KubeWorkerIdentifier from airflow.utils.state import State from airflow.utils.db import provide_session, create_session -from airflow import configuration +from airflow import configuration, settings from airflow.exceptions import AirflowConfigException, AirflowException from airflow.utils.log.logging_mixin import LoggingMixin @@ -125,7 +125,7 @@ def __init__(self): self.core_configuration = configuration_dict['core'] self.kube_secrets = configuration_dict.get('kubernetes_secrets', {}) self.kube_env_vars = configuration_dict.get('kubernetes_environment_variables', {}) - self.airflow_home = configuration.get(self.core_section, 'airflow_home') + self.airflow_home = settings.AIRFLOW_HOME self.dags_folder = configuration.get(self.core_section, 'dags_folder') self.parallelism = configuration.getint(self.core_section, 'PARALLELISM') self.worker_container_repository = configuration.get( diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index 2a1d7e58c18d7..e8bc7384a0f82 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -140,7 +140,7 @@ def _get_environment(self): env["AIRFLOW__CORE__EXECUTOR"] = "LocalExecutor" if self.kube_config.airflow_configmap: - env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.worker_airflow_home + env['AIRFLOW_HOME'] = self.worker_airflow_home env['AIRFLOW__CORE__DAGS_FOLDER'] = self.worker_airflow_dags if (not self.kube_config.airflow_configmap and 'AIRFLOW__CORE__SQL_ALCHEMY_CONN' not in self.kube_config.kube_secrets): diff --git a/airflow/lineage/__init__.py b/airflow/lineage/__init__.py index 20c4bb64702ed..124cbabbd9a3d 100644 --- a/airflow/lineage/__init__.py +++ b/airflow/lineage/__init__.py @@ -21,7 +21,7 @@ from airflow import configuration as conf from airflow.lineage.datasets import DataSet from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.module_loading import import_string, prepare_classpath +from airflow.utils.module_loading import import_string from itertools import chain @@ -36,7 +36,6 @@ def _get_backend(): try: _backend_str = conf.get("lineage", "backend") - prepare_classpath() backend = import_string(_backend_str) except ImportError as ie: log.debug("Cannot import %s due to %s", _backend_str, ie) diff --git a/airflow/logging_config.py b/airflow/logging_config.py index 4865f0567a571..0cf8f4db0d59b 100644 --- a/airflow/logging_config.py +++ b/airflow/logging_config.py @@ -23,7 +23,7 @@ from airflow import configuration as conf from airflow.exceptions import AirflowConfigException -from airflow.utils.module_loading import import_string, prepare_classpath +from airflow.utils.module_loading import import_string log = logging.getLogger(__name__) @@ -31,10 +31,6 @@ def configure_logging(): logging_class_path = '' try: - # Prepare the classpath so we are sure that the config folder - # is on the python classpath and it is reachable - prepare_classpath() - logging_class_path = conf.get('core', 'logging_config_class') except AirflowConfigException: log.debug('Could not find key logging_config_class in config') diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 3ebecd78f513c..f942ddd90cc03 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -27,11 +27,10 @@ import inspect import os import re -import sys import pkg_resources from typing import List, Any -from airflow import configuration +from airflow import settings from airflow.utils.log.logging_mixin import LoggingMixin log = LoggingMixin().log @@ -122,20 +121,12 @@ def is_valid_plugin(plugin_obj, existing_plugins): return False -plugins_folder = configuration.conf.get('core', 'plugins_folder') -if not plugins_folder: - plugins_folder = configuration.conf.get('core', 'airflow_home') + '/plugins' -plugins_folder = os.path.expanduser(plugins_folder) - -if plugins_folder not in sys.path: - sys.path.append(plugins_folder) - plugins = [] # type: List[AirflowPlugin] norm_pattern = re.compile(r'[/|.]') # Crawl through the plugins folder to find AirflowPlugin derivatives -for root, dirs, files in os.walk(plugins_folder, followlinks=True): +for root, dirs, files in os.walk(settings.PLUGINS_FOLDER, followlinks=True): for f in files: try: filepath = os.path.join(root, f) diff --git a/airflow/settings.py b/airflow/settings.py index c8e0b77b5eed2..f176846c1f95b 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -26,13 +26,13 @@ import logging import os import pendulum - +import sys from sqlalchemy import create_engine, exc from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.pool import NullPool -from airflow import configuration as conf +from airflow.configuration import conf, AIRFLOW_HOME, WEBSERVER_CONFIG # NOQA F401 from airflow.logging_config import configure_logging from airflow.utils.sqlalchemy import setup_event_handlers @@ -67,9 +67,10 @@ LOG_FORMAT = conf.get('core', 'log_format') SIMPLE_LOG_FORMAT = conf.get('core', 'simple_log_format') -AIRFLOW_HOME = None SQL_ALCHEMY_CONN = None DAGS_FOLDER = None +PLUGINS_FOLDER = None +LOGGING_CLASS_PATH = None engine = None Session = None @@ -103,13 +104,18 @@ def policy(task_instance): def configure_vars(): - global AIRFLOW_HOME global SQL_ALCHEMY_CONN global DAGS_FOLDER - AIRFLOW_HOME = os.path.expanduser(conf.get('core', 'AIRFLOW_HOME')) + global PLUGINS_FOLDER SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN') DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER')) + PLUGINS_FOLDER = conf.get( + 'core', + 'plugins_folder', + fallback=os.path.join(AIRFLOW_HOME, 'plugins') + ) + def configure_orm(disable_connection_pool=False): log.debug("Setting up DB connection pool (PID %s)" % os.getpid()) @@ -216,21 +222,44 @@ def configure_action_logging(): pass +def prepare_classpath(): + """ + Ensures that certain subfolders of AIRFLOW_HOME are on the classpath + """ + + if DAGS_FOLDER not in sys.path: + sys.path.append(DAGS_FOLDER) + + # Add ./config/ for loading custom log parsers etc, or + # airflow_local_settings etc. + config_path = os.path.join(AIRFLOW_HOME, 'config') + if config_path not in sys.path: + sys.path.append(config_path) + + if PLUGINS_FOLDER not in sys.path: + sys.path.append(PLUGINS_FOLDER) + + try: from airflow_local_settings import * # noqa F403 F401 log.info("Loaded airflow_local_settings.") except Exception: pass -logging_class_path = configure_logging() -configure_vars() -configure_adapters() -# The webservers import this file from models.py with the default settings. -configure_orm() -configure_action_logging() -# Ensure we close DB connections at scheduler and gunicon worker terminations -atexit.register(dispose_orm) +def initialize(): + configure_vars() + prepare_classpath() + global LOGGING_CLASS_PATH + LOGGING_CLASS_PATH = configure_logging() + configure_adapters() + # The webservers import this file from models.py with the default settings. + configure_orm() + configure_action_logging() + + # Ensure we close DB connections at scheduler and gunicon worker terminations + atexit.register(dispose_orm) + # Const stuff diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index 950c394e2649d..f0adae27b4b24 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -48,7 +48,6 @@ from airflow.dag.base_dag import BaseDag, BaseDagBag from airflow.exceptions import AirflowException from airflow.models import errors -from airflow.settings import logging_class_path from airflow.stats import Stats from airflow.utils import timezone from airflow.utils.db import provide_session @@ -548,8 +547,9 @@ def helper(): os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True' # Replicating the behavior of how logging module was loaded # in logging_config.py - reload_module(import_module(logging_class_path.rsplit('.', 1)[0])) + reload_module(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit('.', 1)[0])) reload_module(airflow.settings) + airflow.settings.initialize() del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] processor_manager = DagFileProcessorManager(dag_directory, file_paths, diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py index 8b0bc978e1aa3..fb7b2fad32013 100644 --- a/airflow/utils/log/file_processor_handler.py +++ b/airflow/utils/log/file_processor_handler.py @@ -21,7 +21,7 @@ import logging import os -from airflow import configuration as conf +from airflow import settings from airflow.utils.helpers import parse_template_string from datetime import datetime @@ -41,7 +41,7 @@ def __init__(self, base_log_folder, filename_template): super(FileProcessorHandler, self).__init__() self.handler = None self.base_log_folder = base_log_folder - self.dag_dir = os.path.expanduser(conf.get('core', 'DAGS_FOLDER')) + self.dag_dir = os.path.expanduser(settings.DAGS_FOLDER) self.filename_template, self.filename_jinja_template = \ parse_template_string(filename_template) diff --git a/airflow/utils/module_loading.py b/airflow/utils/module_loading.py index 51c6adf4e642c..d5b4971b736dd 100644 --- a/airflow/utils/module_loading.py +++ b/airflow/utils/module_loading.py @@ -16,24 +16,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -import os -import sys -from airflow import configuration as conf from importlib import import_module -def prepare_classpath(): - """ - Ensures that the Airflow home directory is on the classpath - """ - config_path = os.path.join(conf.get('core', 'airflow_home'), 'config') - config_path = os.path.expanduser(config_path) - - if config_path not in sys.path: - sys.path.append(config_path) - - def import_string(dotted_path): """ Import a dotted module path and return the attribute/class designated by the diff --git a/airflow/www/app.py b/airflow/www/app.py index f8723cc8de648..449c2d7d2e191 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -18,6 +18,7 @@ # under the License. # import logging +import os import socket from typing import Any @@ -50,9 +51,7 @@ def create_app(config=None, session=None, testing=False, app_name="Airflow"): app.wsgi_app = ProxyFix(app.wsgi_app) app.secret_key = conf.get('webserver', 'SECRET_KEY') - airflow_home_path = conf.get('core', 'AIRFLOW_HOME') - webserver_config_path = airflow_home_path + '/webserver_config.py' - app.config.from_pyfile(webserver_config_path, silent=True) + app.config.from_pyfile(settings.WEBSERVER_CONFIG, silent=True) app.config['APP_NAME'] = app_name app.config['TESTING'] = testing app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg index f9c3181407d21..f630655e3fe54 100644 --- a/scripts/ci/airflow_travis.cfg +++ b/scripts/ci/airflow_travis.cfg @@ -17,7 +17,6 @@ # under the License. [core] -airflow_home = ~/airflow dags_folder = ~/airflow/dags base_log_folder = ~/airflow/logs executor = LocalExecutor diff --git a/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml b/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml index 45b6f855b2b4a..e83884f012e23 100644 --- a/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml +++ b/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml @@ -21,7 +21,6 @@ metadata: data: airflow.cfg: | [core] - airflow_home = /root/airflow dags_folder = {{CONFIGMAP_DAGS_FOLDER}} base_log_folder = /root/airflow/logs logging_level = INFO diff --git a/tests/cli/test_worker_initialisation.py b/tests/cli/test_worker_initialisation.py index 8baa787a61b37..d6a8fd4404323 100644 --- a/tests/cli/test_worker_initialisation.py +++ b/tests/cli/test_worker_initialisation.py @@ -53,7 +53,7 @@ def test_error(self, mock_validate_session): cli.worker(mock_args) self.assertEqual(cm.exception.code, 1) - @mock.patch('airflow.configuration.getboolean') + @mock.patch('airflow.configuration.conf.getboolean') def test_worker_precheck_exception(self, mock_getboolean): """ Test to check the behaviour of validate_session method @@ -65,7 +65,7 @@ def test_worker_precheck_exception(self, mock_getboolean): mock_getboolean.assert_called_once_with('core', 'worker_precheck', fallback=False) @mock.patch('sqlalchemy.orm.session.Session.execute') - @mock.patch('airflow.configuration.getboolean') + @mock.patch('airflow.configuration.conf.getboolean') def test_validate_session_dbapi_exception(self, mock_getboolean, mock_session): """ Test to validate connection failure scenario on SELECT 1 query