
[AIRFLOW-3743] Unify different methods of working out AIRFLOW_HOME #4705

Merged 1 commit on Mar 25, 2019
13 changes: 13 additions & 0 deletions UPDATING.md
@@ -281,6 +281,19 @@ if foo is None:

This changes the behaviour if you previously explicitly provided `None` as a default value. If your code expects a `KeyError` to be thrown, then don't pass the `default_var` argument.

### Removal of `airflow_home` config setting

There were previously two ways of specifying the Airflow "home" directory
(`~/airflow` by default): the `AIRFLOW_HOME` environment variable, and the
`airflow_home` config setting in the `[core]` section.

If they had two different values, different parts of the code base would end
up with different values. The config setting has been deprecated, and you
should remove it from the config file and set the `AIRFLOW_HOME` environment
variable if you need to use a non-default value for this.

(Since this setting is used to determine which config file to load, it is not
possible to keep just the config option.)
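
As a rough illustration (a minimal sketch, not the exact code in airflow/configuration.py), the resolution order after this change is:

import os

# The AIRFLOW_HOME environment variable wins; otherwise the ~/airflow default
# is used. The real code also still honours a deprecated [core] airflow_home
# entry, emitting a DeprecationWarning.
AIRFLOW_HOME = os.path.expanduser(os.environ.get('AIRFLOW_HOME', '~/airflow'))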
Member

@ashb I'm wondering what will happen if I have both the AIRFLOW_HOME and AIRFLOW__CORE__AIRFLOW_HOME environment variables, but set to different values?

Member Author

To the code, the second one is the same as setting it in the config file.
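
(For context: an AIRFLOW__<SECTION>__<KEY> environment variable is read as an override for the matching option in airflow.cfg, so AIRFLOW__CORE__AIRFLOW_HOME lands on the same deprecated code path as the config entry. A rough sketch of that lookup, not the exact implementation:)

import os

def _env_override(section, key):
    # AIRFLOW__CORE__AIRFLOW_HOME is treated like `airflow_home` under [core].
    return os.environ.get('AIRFLOW__{}__{}'.format(section.upper(), key.upper()))

print(_env_override('core', 'airflow_home'))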

Member Author

Having re-read this: this is the very problem I ran into that caused me to open this PR. If they are set to different values then webserver_config.py would be looked for in one, but other things would come from the other.

Specifically, the problem I had was that webserver_config.py was written to one, but read from the other! So airflow create_user etc. targeted an sqlite://:memory: DB!
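
(A hypothetical illustration of that mismatch - the paths are made up:)

import os

env_home = '/opt/airflow'        # from the AIRFLOW_HOME environment variable
cfg_home = '/usr/local/airflow'  # from [core] airflow_home in airflow.cfg

# One code path built the webserver config location from one value...
written_to = os.path.join(env_home, 'webserver_config.py')
# ...while another built it from the other, so commands like
# `airflow create_user` never saw the file that had just been written.
read_from = os.path.join(cfg_home, 'webserver_config.py')
assert written_to != read_from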


## Airflow 1.10.2

3 changes: 1 addition & 2 deletions airflow/__init__.py
@@ -37,8 +37,7 @@
from airflow.models import DAG
from airflow.exceptions import AirflowException

if settings.DAGS_FOLDER not in sys.path:
    sys.path.append(settings.DAGS_FOLDER)  # type: ignore
settings.initialize()

login = None

12 changes: 4 additions & 8 deletions airflow/bin/cli.py
@@ -117,17 +117,13 @@ def setup_logging(filename):

def setup_locations(process, pid=None, stdout=None, stderr=None, log=None):
    if not stderr:
        stderr = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME),
                              'airflow-{}.err'.format(process))
        stderr = os.path.join(settings.AIRFLOW_HOME, 'airflow-{}.err'.format(process))
    if not stdout:
        stdout = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME),
                              'airflow-{}.out'.format(process))
        stdout = os.path.join(settings.AIRFLOW_HOME, 'airflow-{}.out'.format(process))
    if not log:
        log = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME),
                           'airflow-{}.log'.format(process))
        log = os.path.join(settings.AIRFLOW_HOME, 'airflow-{}.log'.format(process))
    if not pid:
        pid = os.path.join(os.path.expanduser(settings.AIRFLOW_HOME),
                           'airflow-{}.pid'.format(process))
        pid = os.path.join(settings.AIRFLOW_HOME, 'airflow-{}.pid'.format(process))

    return pid, stdout, stderr, log

3 changes: 0 additions & 3 deletions airflow/config_templates/default_airflow.cfg
@@ -30,9 +30,6 @@
# ----------------------- TEMPLATE BEGINS HERE -----------------------

[core]
# The home folder for airflow, default is ~/airflow
airflow_home = {AIRFLOW_HOME}

# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# This path must be absolute
1 change: 0 additions & 1 deletion airflow/config_templates/default_test.cfg
@@ -31,7 +31,6 @@

[core]
unit_test_mode = True
airflow_home = {AIRFLOW_HOME}
dags_folder = {TEST_DAGS_FOLDER}
plugins_folder = {TEST_PLUGINS_FOLDER}
base_log_folder = {AIRFLOW_HOME}/logs
21 changes: 20 additions & 1 deletion airflow/configuration.py
@@ -544,12 +544,31 @@ def parameterized_config(template):

conf.read(AIRFLOW_CONFIG)

DEFAULT_WEBSERVER_CONFIG = _read_default_config_file('default_webserver_config.py')
if conf.has_option('core', 'AIRFLOW_HOME'):
    msg = (
        'Specifying both AIRFLOW_HOME environment variable and airflow_home '
        'in the config file is deprecated. Please use only the AIRFLOW_HOME '
        'environment variable and remove the config file entry.'
    )
    if 'AIRFLOW_HOME' in os.environ:
        warnings.warn(msg, category=DeprecationWarning)
    elif conf.get('core', 'airflow_home') == AIRFLOW_HOME:
        warnings.warn(
            'Specifying airflow_home in the config file is deprecated. As you '
            'have left it at the default value you should remove the setting '
            'from your airflow.cfg and suffer no change in behaviour.',
            category=DeprecationWarning,
        )
    else:
        AIRFLOW_HOME = conf.get('core', 'airflow_home')
Member

What's the purpose of having this line?

Member

Double-checking on this: so your intention is that if the user is still using airflow_home in the .cfg file, only a warning will be given (no hard stop) and the value in the .cfg file will be used eventually?

Member Author

Yes, I was trying to make this change as easy as possible - warn but carry on using the setting (for 99% of people it will be the same).

Though the one case I know this will break is the Puckel Docker container. So perhaps if the two settings are different I should throw an exception.
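
(Side note: DeprecationWarning is silenced by default in Python, so anyone wanting to check whether their setup hits the deprecated path could surface it with something like this sketch, run before the config module is imported:)

import warnings

# Show DeprecationWarnings that Python normally hides.
warnings.simplefilter('always', DeprecationWarning)

import airflow.configuration  # noqa: E402 - importing triggers the airflow_home check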

Member

Or throw an exception once conf.has_option('core', 'AIRFLOW_HOME') == True?

Personally I think it's better to make it as explicit as possible, and have a clean cut.

Member Author

Throwing an exception in this case would be a breaking change (as this was in the default template, so every install would end up not working), which I am trying to avoid.

Member

In this case, is it really necessary to have an if-else here to check whether AIRFLOW_HOME is in env? The two warning messages are not that different to me.

Member

Hi @ashb, I had a double-check on this. Let me correct myself:

  • I agree it's necessary to have the if-else on 'AIRFLOW_HOME' in os.environ inside if conf.has_option('core', 'AIRFLOW_HOME'):, to decide whether AIRFLOW_HOME should be based on the value in the .cfg file.
  • But I don't think it's necessary to have the warning message twice in the code. The two warning messages here are very similar to each other, and either of them will be invoked anyway. We can have something like:
if conf.has_option('core', 'AIRFLOW_HOME'):
    warnings.warn('<warning msg>', category=DeprecationWarning,)

    if 'AIRFLOW_HOME' not in os.environ:
        AIRFLOW_HOME = conf.get('core', 'airflow_home')

Member Author

I've changed it to:

if conf.has_option('core', 'AIRFLOW_HOME'):
    msg = (
        'Specifying both AIRFLOW_HOME environment variable and airflow_home '
        'in the config file is deprecated. Please use only the AIRFLOW_HOME '
        'environment variable and remove the config file entry.'
    )
    if 'AIRFLOW_HOME' in os.environ:
        warnings.warn(msg, category=DeprecationWarning,)
    elif conf.get('core', 'airflow_home') == AIRFLOW_HOME:
        warnings.warn(
            'Specifying airflow_home in the config file is deprecated. As you '
            'have left it at the default value you should remove the setting '
            'from your airflow.cfg and suffer no change in behaviour.',
            category=DeprecationWarning,
        )
    else:
        AIRFLOW_HOME = conf.get('core', 'airflow_home')
        warnings.warn(msg, category=DeprecationWarning,)

I thought about adding an extra case to say which value it was.

        warnings.warn(msg, category=DeprecationWarning)


WEBSERVER_CONFIG = AIRFLOW_HOME + '/webserver_config.py'

if not os.path.isfile(WEBSERVER_CONFIG):
    log.info('Creating new FAB webserver config file in: %s', WEBSERVER_CONFIG)
    DEFAULT_WEBSERVER_CONFIG = _read_default_config_file('default_webserver_config.py')
    with open(WEBSERVER_CONFIG, 'w') as f:
        f.write(DEFAULT_WEBSERVER_CONFIG)

4 changes: 2 additions & 2 deletions airflow/contrib/executors/kubernetes_executor.py
@@ -36,7 +36,7 @@
from airflow.models.kubernetes import KubeResourceVersion, KubeWorkerIdentifier
from airflow.utils.state import State
from airflow.utils.db import provide_session, create_session
from airflow import configuration
from airflow import configuration, settings
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin

@@ -125,7 +125,7 @@ def __init__(self):
        self.core_configuration = configuration_dict['core']
        self.kube_secrets = configuration_dict.get('kubernetes_secrets', {})
        self.kube_env_vars = configuration_dict.get('kubernetes_environment_variables', {})
        self.airflow_home = configuration.get(self.core_section, 'airflow_home')
        self.airflow_home = settings.AIRFLOW_HOME
        self.dags_folder = configuration.get(self.core_section, 'dags_folder')
        self.parallelism = configuration.getint(self.core_section, 'PARALLELISM')
        self.worker_container_repository = configuration.get(
2 changes: 1 addition & 1 deletion airflow/contrib/kubernetes/worker_configuration.py
@@ -140,7 +140,7 @@ def _get_environment(self):
env["AIRFLOW__CORE__EXECUTOR"] = "LocalExecutor"

if self.kube_config.airflow_configmap:
env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.worker_airflow_home
env['AIRFLOW_HOME'] = self.worker_airflow_home
env['AIRFLOW__CORE__DAGS_FOLDER'] = self.worker_airflow_dags
if (not self.kube_config.airflow_configmap and
'AIRFLOW__CORE__SQL_ALCHEMY_CONN' not in self.kube_config.kube_secrets):
3 changes: 1 addition & 2 deletions airflow/lineage/__init__.py
@@ -21,7 +21,7 @@
from airflow import configuration as conf
from airflow.lineage.datasets import DataSet
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string, prepare_classpath
from airflow.utils.module_loading import import_string

from itertools import chain

@@ -36,7 +36,6 @@ def _get_backend():

    try:
        _backend_str = conf.get("lineage", "backend")
        prepare_classpath()
        backend = import_string(_backend_str)
    except ImportError as ie:
        log.debug("Cannot import %s due to %s", _backend_str, ie)
6 changes: 1 addition & 5 deletions airflow/logging_config.py
@@ -23,18 +23,14 @@

from airflow import configuration as conf
from airflow.exceptions import AirflowConfigException
from airflow.utils.module_loading import import_string, prepare_classpath
from airflow.utils.module_loading import import_string

log = logging.getLogger(__name__)


def configure_logging():
    logging_class_path = ''
    try:
        # Prepare the classpath so we are sure that the config folder
        # is on the python classpath and it is reachable
        prepare_classpath()

        logging_class_path = conf.get('core', 'logging_config_class')
    except AirflowConfigException:
        log.debug('Could not find key logging_config_class in config')
13 changes: 2 additions & 11 deletions airflow/plugins_manager.py
@@ -27,11 +27,10 @@
import inspect
import os
import re
import sys
import pkg_resources
from typing import List, Any

from airflow import configuration
from airflow import settings
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log
@@ -122,20 +121,12 @@ def is_valid_plugin(plugin_obj, existing_plugins):
return False


plugins_folder = configuration.conf.get('core', 'plugins_folder')
if not plugins_folder:
    plugins_folder = configuration.conf.get('core', 'airflow_home') + '/plugins'
plugins_folder = os.path.expanduser(plugins_folder)

if plugins_folder not in sys.path:
    sys.path.append(plugins_folder)

plugins = [] # type: List[AirflowPlugin]

norm_pattern = re.compile(r'[/|.]')

# Crawl through the plugins folder to find AirflowPlugin derivatives
for root, dirs, files in os.walk(plugins_folder, followlinks=True):
for root, dirs, files in os.walk(settings.PLUGINS_FOLDER, followlinks=True):
    for f in files:
        try:
            filepath = os.path.join(root, f)
55 changes: 42 additions & 13 deletions airflow/settings.py
@@ -26,13 +26,13 @@
import logging
import os
import pendulum

import sys

from sqlalchemy import create_engine, exc
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

from airflow import configuration as conf
from airflow.configuration import conf, AIRFLOW_HOME, WEBSERVER_CONFIG # NOQA F401
Member

@XD-DENG Mar 23, 2019

If WEBSERVER_CONFIG is not used here in airflow/settings.py, why do we have to import it? Possibly I missed something?

Member Author

It's so we can do from airflow.settings import WEBSERVER_CONFIG
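
(A minimal usage sketch of what that enables, assuming the re-export above - callers go through settings instead of rebuilding paths from the config:)

from airflow import settings

# Both values are resolved once in airflow/configuration.py and re-exported here,
# rather than each caller calling conf.get('core', 'airflow_home') and joining paths.
home = settings.AIRFLOW_HOME
webserver_config = settings.WEBSERVER_CONFIG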

from airflow.logging_config import configure_logging
from airflow.utils.sqlalchemy import setup_event_handlers

@@ -67,9 +67,10 @@
LOG_FORMAT = conf.get('core', 'log_format')
SIMPLE_LOG_FORMAT = conf.get('core', 'simple_log_format')

AIRFLOW_HOME = None
SQL_ALCHEMY_CONN = None
DAGS_FOLDER = None
PLUGINS_FOLDER = None
LOGGING_CLASS_PATH = None

engine = None
Session = None
@@ -103,13 +104,18 @@ def policy(task_instance):


def configure_vars():
    global AIRFLOW_HOME
    global SQL_ALCHEMY_CONN
    global DAGS_FOLDER
    AIRFLOW_HOME = os.path.expanduser(conf.get('core', 'AIRFLOW_HOME'))
    global PLUGINS_FOLDER
    SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')
    DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))

    PLUGINS_FOLDER = conf.get(
        'core',
        'plugins_folder',
        fallback=os.path.join(AIRFLOW_HOME, 'plugins')
    )


def configure_orm(disable_connection_pool=False):
log.debug("Setting up DB connection pool (PID %s)" % os.getpid())
@@ -216,21 +222,44 @@ def configure_action_logging():
pass


def prepare_classpath():
"""
Ensures that certain subfolders of AIRFLOW_HOME are on the classpath
"""

if DAGS_FOLDER not in sys.path:
sys.path.append(DAGS_FOLDER)

# Add ./config/ for loading custom log parsers etc, or
# airflow_local_settings etc.
config_path = os.path.join(AIRFLOW_HOME, 'config')
if config_path not in sys.path:
sys.path.append(config_path)

if PLUGINS_FOLDER not in sys.path:
sys.path.append(PLUGINS_FOLDER)


try:
    from airflow_local_settings import * # noqa F403 F401
    log.info("Loaded airflow_local_settings.")
except Exception:
    pass

logging_class_path = configure_logging()
configure_vars()
configure_adapters()
# The webservers import this file from models.py with the default settings.
configure_orm()
configure_action_logging()

# Ensure we close DB connections at scheduler and gunicon worker terminations
atexit.register(dispose_orm)


def initialize():
    configure_vars()
    prepare_classpath()
    global LOGGING_CLASS_PATH
    LOGGING_CLASS_PATH = configure_logging()
    configure_adapters()
    # The webservers import this file from models.py with the default settings.
    configure_orm()
    configure_action_logging()

    # Ensure we close DB connections at scheduler and gunicon worker terminations
    atexit.register(dispose_orm)


# Const stuff

4 changes: 2 additions & 2 deletions airflow/utils/dag_processing.py
@@ -48,7 +48,6 @@
from airflow.dag.base_dag import BaseDag, BaseDagBag
from airflow.exceptions import AirflowException
from airflow.models import errors
from airflow.settings import logging_class_path
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.db import provide_session
@@ -548,8 +547,9 @@ def helper():
os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
# Replicating the behavior of how logging module was loaded
# in logging_config.py
reload_module(import_module(logging_class_path.rsplit('.', 1)[0]))
reload_module(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit('.', 1)[0]))
reload_module(airflow.settings)
airflow.settings.initialize()
del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
processor_manager = DagFileProcessorManager(dag_directory,
file_paths,
4 changes: 2 additions & 2 deletions airflow/utils/log/file_processor_handler.py
@@ -21,7 +21,7 @@
import logging
import os

from airflow import configuration as conf
from airflow import settings
from airflow.utils.helpers import parse_template_string
from datetime import datetime

@@ -41,7 +41,7 @@ def __init__(self, base_log_folder, filename_template):
        super(FileProcessorHandler, self).__init__()
        self.handler = None
        self.base_log_folder = base_log_folder
        self.dag_dir = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
        self.dag_dir = os.path.expanduser(settings.DAGS_FOLDER)
        self.filename_template, self.filename_jinja_template = \
            parse_template_string(filename_template)

14 changes: 0 additions & 14 deletions airflow/utils/module_loading.py
@@ -16,24 +16,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import sys

from airflow import configuration as conf
from importlib import import_module


def prepare_classpath():
"""
Ensures that the Airflow home directory is on the classpath
"""
config_path = os.path.join(conf.get('core', 'airflow_home'), 'config')
config_path = os.path.expanduser(config_path)

if config_path not in sys.path:
sys.path.append(config_path)


def import_string(dotted_path):
"""
Import a dotted module path and return the attribute/class designated by the
5 changes: 2 additions & 3 deletions airflow/www/app.py
@@ -18,6 +18,7 @@
# under the License.
#
import logging
import os
import socket
from typing import Any

@@ -50,9 +51,7 @@ def create_app(config=None, session=None, testing=False, app_name="Airflow"):
    app.wsgi_app = ProxyFix(app.wsgi_app)
    app.secret_key = conf.get('webserver', 'SECRET_KEY')

    airflow_home_path = conf.get('core', 'AIRFLOW_HOME')
    webserver_config_path = airflow_home_path + '/webserver_config.py'
    app.config.from_pyfile(webserver_config_path, silent=True)
    app.config.from_pyfile(settings.WEBSERVER_CONFIG, silent=True)
    app.config['APP_NAME'] = app_name
    app.config['TESTING'] = testing
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
1 change: 0 additions & 1 deletion scripts/ci/airflow_travis.cfg
@@ -17,7 +17,6 @@
# under the License.

[core]
airflow_home = ~/airflow
dags_folder = ~/airflow/dags
base_log_folder = ~/airflow/logs
executor = LocalExecutor