Skip to content

Commit

Permalink
[AIRFLOW-3743] Unify different methods of working out AIRFLOW_HOME (a…
Browse files Browse the repository at this point in the history
…pache#4705)

There were a few ways of getting the AIRFLOW_HOME directory used
throughout the code base, giving possibly conflicting answer if they
weren't kept in sync:

- the AIRFLOW_HOME environment variable
- core/airflow_home from the config
- settings.AIRFLOW_HOME
- configuration.AIRFLOW_HOME

Since the home directory is used to compute the default path of the
config file to load, specifying the home directory Again in the config
file didn't make any sense to me, and I have deprecated that.

This commit makes everything in the code base use
`settings.AIRFLOW_HOME` as the source of truth, and deprecates the
core/airflow_home config option.

There was an import cycle form settings -> logging_config ->
module_loading -> settings that needed to be broken on Python 2 - so I
have moved all adjusting of sys.path in to the settings module

(This issue caused me a problem where the RBAC UI wouldn't work as it
didn't find the right webserver_config.py)
  • Loading branch information
ashb authored and andriisoldatenko committed Jul 26, 2019
1 parent 0bb709e commit 42b4003
Show file tree
Hide file tree
Showing 19 changed files with 95 additions and 74 deletions.
13 changes: 13 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,19 @@ if foo is None:

This changes the behaviour if you previously explicitly provided `None` as a default value. If your code expects a `KeyError` to be thrown, then don't pass the `default_var` argument.

### Removal of `airflow_home` config setting

There were previously two ways of specifying the Airflow "home" directory
(`~/airflow` by default): the `AIRFLOW_HOME` environment variable, and the
`airflow_home` config setting in the `[core]` section.

If they had two different values different parts of the code base would end up
with different values. The config setting has been deprecated, and you should
remove the value from the config file and set `AIRFLOW_HOME` environment
variable if you need to use a non default value for this.

(Since this setting is used to calculate what config file to load, it is not
possible to keep just the config option)

## Airflow 1.10.2

Expand Down
3 changes: 1 addition & 2 deletions airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@
from airflow.models import DAG
from airflow.exceptions import AirflowException

if settings.DAGS_FOLDER not in sys.path:
sys.path.append(settings.DAGS_FOLDER) # type: ignore
settings.initialize()

login = None

Expand Down
12 changes: 4 additions & 8 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,13 @@ def setup_logging(filename):

def setup_locations(process, pid=None, stdout=None, stderr=None, log=None):
if not stderr:
stderr = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME),
'airflow-{}.err'.format(process))
stderr = os.path.join(settings.AIRFLOW_HOME, 'airflow-{}.err'.format(process))
if not stdout:
stdout = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME),
'airflow-{}.out'.format(process))
stdout = os.path.join(settings.AIRFLOW_HOME, 'airflow-{}.out'.format(process))
if not log:
log = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME),
'airflow-{}.log'.format(process))
log = os.path.join(settings.AIRFLOW_HOME, 'airflow-{}.log'.format(process))
if not pid:
pid = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME),
'airflow-{}.pid'.format(process))
pid = os.path.join(settings.AIRFLOW_HOME, 'airflow-{}.pid'.format(process))

return pid, stdout, stderr, log

Expand Down
3 changes: 0 additions & 3 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
# ----------------------- TEMPLATE BEGINS HERE -----------------------

[core]
# The home folder for airflow, default is ~/airflow
airflow_home = {AIRFLOW_HOME}

# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# This path must be absolute
Expand Down
1 change: 0 additions & 1 deletion airflow/config_templates/default_test.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

[core]
unit_test_mode = True
airflow_home = {AIRFLOW_HOME}
dags_folder = {TEST_DAGS_FOLDER}
plugins_folder = {TEST_PLUGINS_FOLDER}
base_log_folder = {AIRFLOW_HOME}/logs
Expand Down
21 changes: 20 additions & 1 deletion airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,12 +544,31 @@ def parameterized_config(template):

conf.read(AIRFLOW_CONFIG)

DEFAULT_WEBSERVER_CONFIG = _read_default_config_file('default_webserver_config.py')
if conf.has_option('core', 'AIRFLOW_HOME'):
msg = (
'Specifying both AIRFLOW_HOME environment variable and airflow_home '
'in the config file is deprecated. Please use only the AIRFLOW_HOME '
'environment variable and remove the config file entry.'
)
if 'AIRFLOW_HOME' in os.environ:
warnings.warn(msg, category=DeprecationWarning)
elif conf.get('core', 'airflow_home') == AIRFLOW_HOME:
warnings.warn(
'Specifying airflow_home in the config file is deprecated. As you '
'have left it at the default value you should remove the setting '
'from your airflow.cfg and suffer no change in behaviour.',
category=DeprecationWarning,
)
else:
AIRFLOW_HOME = conf.get('core', 'airflow_home')
warnings.warn(msg, category=DeprecationWarning)


WEBSERVER_CONFIG = AIRFLOW_HOME + '/webserver_config.py'

if not os.path.isfile(WEBSERVER_CONFIG):
log.info('Creating new FAB webserver config file in: %s', WEBSERVER_CONFIG)
DEFAULT_WEBSERVER_CONFIG = _read_default_config_file('default_webserver_config.py')
with open(WEBSERVER_CONFIG, 'w') as f:
f.write(DEFAULT_WEBSERVER_CONFIG)

Expand Down
4 changes: 2 additions & 2 deletions airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from airflow.models.kubernetes import KubeResourceVersion, KubeWorkerIdentifier
from airflow.utils.state import State
from airflow.utils.db import provide_session, create_session
from airflow import configuration
from airflow import configuration, settings
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin

Expand Down Expand Up @@ -125,7 +125,7 @@ def __init__(self):
self.core_configuration = configuration_dict['core']
self.kube_secrets = configuration_dict.get('kubernetes_secrets', {})
self.kube_env_vars = configuration_dict.get('kubernetes_environment_variables', {})
self.airflow_home = configuration.get(self.core_section, 'airflow_home')
self.airflow_home = settings.AIRFLOW_HOME
self.dags_folder = configuration.get(self.core_section, 'dags_folder')
self.parallelism = configuration.getint(self.core_section, 'PARALLELISM')
self.worker_container_repository = configuration.get(
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/kubernetes/worker_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def _get_environment(self):
env["AIRFLOW__CORE__EXECUTOR"] = "LocalExecutor"

if self.kube_config.airflow_configmap:
env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.worker_airflow_home
env['AIRFLOW_HOME'] = self.worker_airflow_home
env['AIRFLOW__CORE__DAGS_FOLDER'] = self.worker_airflow_dags
if (not self.kube_config.airflow_configmap and
'AIRFLOW__CORE__SQL_ALCHEMY_CONN' not in self.kube_config.kube_secrets):
Expand Down
3 changes: 1 addition & 2 deletions airflow/lineage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from airflow import configuration as conf
from airflow.lineage.datasets import DataSet
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string, prepare_classpath
from airflow.utils.module_loading import import_string

from itertools import chain

Expand All @@ -36,7 +36,6 @@ def _get_backend():

try:
_backend_str = conf.get("lineage", "backend")
prepare_classpath()
backend = import_string(_backend_str)
except ImportError as ie:
log.debug("Cannot import %s due to %s", _backend_str, ie)
Expand Down
6 changes: 1 addition & 5 deletions airflow/logging_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,14 @@

from airflow import configuration as conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.module_loading import import_string, prepare_classpath
from airflow.utils.module_loading import import_string

log = logging.getLogger(__name__)


def configure_logging():
logging_class_path = ''
try:
# Prepare the classpath so we are sure that the config folder
# is on the python classpath and it is reachable
prepare_classpath()

logging_class_path = conf.get('core', 'logging_config_class')
except AirflowConfigException:
log.debug('Could not find key logging_config_class in config')
Expand Down
13 changes: 2 additions & 11 deletions airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@
import inspect
import os
import re
import sys
import pkg_resources
from typing import List, Any

from airflow import configuration
from airflow import settings
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log
Expand Down Expand Up @@ -122,20 +121,12 @@ def is_valid_plugin(plugin_obj, existing_plugins):
return False


plugins_folder = configuration.conf.get('core', 'plugins_folder')
if not plugins_folder:
plugins_folder = configuration.conf.get('core', 'airflow_home') + '/plugins'
plugins_folder = os.path.expanduser(plugins_folder)

if plugins_folder not in sys.path:
sys.path.append(plugins_folder)

plugins = [] # type: List[AirflowPlugin]

norm_pattern = re.compile(r'[/|.]')

# Crawl through the plugins folder to find AirflowPlugin derivatives
for root, dirs, files in os.walk(plugins_folder, followlinks=True):
for root, dirs, files in os.walk(settings.PLUGINS_FOLDER, followlinks=True):
for f in files:
try:
filepath = os.path.join(root, f)
Expand Down
55 changes: 42 additions & 13 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import logging
import os
import pendulum

import sys

from sqlalchemy import create_engine, exc
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

from airflow import configuration as conf
from airflow.configuration import conf, AIRFLOW_HOME, WEBSERVER_CONFIG # NOQA F401
from airflow.logging_config import configure_logging
from airflow.utils.sqlalchemy import setup_event_handlers

Expand Down Expand Up @@ -67,9 +67,10 @@
LOG_FORMAT = conf.get('core', 'log_format')
SIMPLE_LOG_FORMAT = conf.get('core', 'simple_log_format')

AIRFLOW_HOME = None
SQL_ALCHEMY_CONN = None
DAGS_FOLDER = None
PLUGINS_FOLDER = None
LOGGING_CLASS_PATH = None

engine = None
Session = None
Expand Down Expand Up @@ -103,13 +104,18 @@ def policy(task_instance):


def configure_vars():
global AIRFLOW_HOME
global SQL_ALCHEMY_CONN
global DAGS_FOLDER
AIRFLOW_HOME = os.path.expanduser(conf.get('core', 'AIRFLOW_HOME'))
global PLUGINS_FOLDER
SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')
DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))

PLUGINS_FOLDER = conf.get(
'core',
'plugins_folder',
fallback=os.path.join(AIRFLOW_HOME, 'plugins')
)


def configure_orm(disable_connection_pool=False):
log.debug("Setting up DB connection pool (PID %s)" % os.getpid())
Expand Down Expand Up @@ -216,21 +222,44 @@ def configure_action_logging():
pass


def prepare_classpath():
"""
Ensures that certain subfolders of AIRFLOW_HOME are on the classpath
"""

if DAGS_FOLDER not in sys.path:
sys.path.append(DAGS_FOLDER)

# Add ./config/ for loading custom log parsers etc, or
# airflow_local_settings etc.
config_path = os.path.join(AIRFLOW_HOME, 'config')
if config_path not in sys.path:
sys.path.append(config_path)

if PLUGINS_FOLDER not in sys.path:
sys.path.append(PLUGINS_FOLDER)


try:
from airflow_local_settings import * # noqa F403 F401
log.info("Loaded airflow_local_settings.")
except Exception:
pass

logging_class_path = configure_logging()
configure_vars()
configure_adapters()
# The webservers import this file from models.py with the default settings.
configure_orm()
configure_action_logging()

# Ensure we close DB connections at scheduler and gunicon worker terminations
atexit.register(dispose_orm)
def initialize():
configure_vars()
prepare_classpath()
global LOGGING_CLASS_PATH
LOGGING_CLASS_PATH = configure_logging()
configure_adapters()
# The webservers import this file from models.py with the default settings.
configure_orm()
configure_action_logging()

# Ensure we close DB connections at scheduler and gunicon worker terminations
atexit.register(dispose_orm)


# Const stuff

Expand Down
4 changes: 2 additions & 2 deletions airflow/utils/dag_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
from airflow.dag.base_dag import BaseDag, BaseDagBag
from airflow.exceptions import AirflowException
from airflow.models import errors
from airflow.settings import logging_class_path
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.db import provide_session
Expand Down Expand Up @@ -548,8 +547,9 @@ def helper():
os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
# Replicating the behavior of how logging module was loaded
# in logging_config.py
reload_module(import_module(logging_class_path.rsplit('.', 1)[0]))
reload_module(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit('.', 1)[0]))
reload_module(airflow.settings)
airflow.settings.initialize()
del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
processor_manager = DagFileProcessorManager(dag_directory,
file_paths,
Expand Down
4 changes: 2 additions & 2 deletions airflow/utils/log/file_processor_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import logging
import os

from airflow import configuration as conf
from airflow import settings
from airflow.utils.helpers import parse_template_string
from datetime import datetime

Expand All @@ -41,7 +41,7 @@ def __init__(self, base_log_folder, filename_template):
super(FileProcessorHandler, self).__init__()
self.handler = None
self.base_log_folder = base_log_folder
self.dag_dir = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
self.dag_dir = os.path.expanduser(settings.DAGS_FOLDER)
self.filename_template, self.filename_jinja_template = \
parse_template_string(filename_template)

Expand Down
14 changes: 0 additions & 14 deletions airflow/utils/module_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import sys

from airflow import configuration as conf
from importlib import import_module


def prepare_classpath():
"""
Ensures that the Airflow home directory is on the classpath
"""
config_path = os.path.join(conf.get('core', 'airflow_home'), 'config')
config_path = os.path.expanduser(config_path)

if config_path not in sys.path:
sys.path.append(config_path)


def import_string(dotted_path):
"""
Import a dotted module path and return the attribute/class designated by the
Expand Down
5 changes: 2 additions & 3 deletions airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# under the License.
#
import logging
import os
import socket
from typing import Any

Expand Down Expand Up @@ -50,9 +51,7 @@ def create_app(config=None, session=None, testing=False, app_name="Airflow"):
app.wsgi_app = ProxyFix(app.wsgi_app)
app.secret_key = conf.get('webserver', 'SECRET_KEY')

airflow_home_path = conf.get('core', 'AIRFLOW_HOME')
webserver_config_path = airflow_home_path + '/webserver_config.py'
app.config.from_pyfile(webserver_config_path, silent=True)
app.config.from_pyfile(settings.WEBSERVER_CONFIG, silent=True)
app.config['APP_NAME'] = app_name
app.config['TESTING'] = testing
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
Expand Down
1 change: 0 additions & 1 deletion scripts/ci/airflow_travis.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
# under the License.

[core]
airflow_home = ~/airflow
dags_folder = ~/airflow/dags
base_log_folder = ~/airflow/logs
executor = LocalExecutor
Expand Down
Loading

0 comments on commit 42b4003

Please sign in to comment.