Skip to content

Commit

Permalink
[AIRFLOW-833] Add an airflow default_config sub-command
Browse files Browse the repository at this point in the history
- Generate the default airflow.cfg explicitly with a command,
  not when simply importing airflow.
- The default airflow.cfg is also created on `airflow initdb`
  if it doesn't exist already.
- Don't create the unittests.cfg unless unit_test_mode is true.
  • Loading branch information
gsakkis committed Feb 12, 2017
1 parent eea5ff8 commit 7de2eff
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 80 deletions.
23 changes: 23 additions & 0 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ def get_dag(args):
return dagbag.dags[args.dag_id]


def default_config(args):
if os.path.isfile(args.filename):
if args.overwrite:
os.unlink(args.filename)
else:
raise AirflowException('{} already exists; pass --overwrite to '
'overwrite it'.format(args.filename))
if not conf.load_config_file(args.filename):
conf.save_config_file(args.filename, conf.DEFAULT_CONFIG)


def backfill(args, dag=None):
logging.basicConfig(
level=settings.LOGGING_LEVEL,
Expand Down Expand Up @@ -1146,6 +1157,14 @@ class CLIFactory(object):
'log_file': Arg(
("-l", "--log-file"), "Location of the log file"),

# default_config
'filename': Arg(
("-f", "--filename"),
"File to write the default config to",
default=conf.AIRFLOW_CONFIG),
'overwrite': Arg(
("-o", "--overwrite"),
"Overwrite file if it exists", "store_true"),
# backfill
'mark_success': Arg(
("-m", "--mark_success"),
Expand Down Expand Up @@ -1425,6 +1444,10 @@ class CLIFactory(object):
}
subparsers = (
{
'func': default_config,
'help': "Write the default config file",
'args': ('filename', 'overwrite'),
}, {
'func': backfill,
'help': "Run subsections of a DAG for a specified date range",
'args': (
Expand Down
125 changes: 50 additions & 75 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,20 +521,12 @@ class AirflowConfigParser(ConfigParser):

def __init__(self, *args, **kwargs):
ConfigParser.__init__(self, *args, **kwargs)
self.read_string(parameterized_config(DEFAULT_CONFIG))
self.is_validated = False

def read_string(self, string, source='<string>'):
"""
Read configuration from a string.
A backwards-compatible version of the ConfigParser.read_string()
method that was introduced in Python 3.
"""
# Python 3 added read_string() method
def read_parameterized_string(self, string, source='<string>'):
string = parameterized_config(string)
if six.PY3:
ConfigParser.read_string(self, string, source=source)
# Python 2 requires StringIO buffer
else:
import StringIO
self.readfp(StringIO.StringIO(string))
Expand Down Expand Up @@ -688,18 +680,12 @@ def as_dict(self, display_source=False, display_sensitive=False):

return cfg

def load_test_config(self):
"""
Load the unit test configuration.
Note: this is not reversible.
"""
# override any custom settings with defaults
self.read_string(parameterized_config(DEFAULT_CONFIG))
# then read test config
self.read_string(parameterized_config(TEST_CONFIG))
# then read any "custom" test settings
self.read(TEST_CONFIG_FILE)
def load_config_file(self, config_file):
if os.path.isfile(config_file):
self.read(config_file)
return True
else:
return False


def mkdir_p(path):
Expand All @@ -715,42 +701,34 @@ def mkdir_p(path):
# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "~/airflow/airflow.cfg" respectively as defaults.

if 'AIRFLOW_HOME' not in os.environ:
AIRFLOW_HOME = expand_env_var('~/airflow')
else:
if 'AIRFLOW_HOME' in os.environ:
AIRFLOW_HOME = expand_env_var(os.environ['AIRFLOW_HOME'])
else:
AIRFLOW_HOME = expand_env_var('~/airflow')

mkdir_p(AIRFLOW_HOME)

if 'AIRFLOW_CONFIG' not in os.environ:
if os.path.isfile(expand_env_var('~/airflow.cfg')):
AIRFLOW_CONFIG = expand_env_var('~/airflow.cfg')
else:
AIRFLOW_CONFIG = AIRFLOW_HOME + '/airflow.cfg'
else:
if 'AIRFLOW_CONFIG' in os.environ:
AIRFLOW_CONFIG = expand_env_var(os.environ['AIRFLOW_CONFIG'])
elif os.path.isfile(expand_env_var('~/airflow.cfg')):
AIRFLOW_CONFIG = expand_env_var('~/airflow.cfg')
else:
AIRFLOW_CONFIG = os.path.join(AIRFLOW_HOME, 'airflow.cfg')

# Set up dags folder for unit tests
# this directory won't exist if users install via pip
_TEST_DAGS_FOLDER = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
'tests',
'dags')
if os.path.exists(_TEST_DAGS_FOLDER):
TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
_ROOT_FOLDER = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
TEST_DAGS_FOLDER = os.path.join(_ROOT_FOLDER, 'tests', 'dags')
if not os.path.exists(TEST_DAGS_FOLDER):
TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, 'dags')

# Set up plugins folder for unit tests
_TEST_PLUGINS_FOLDER = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
'tests',
'plugins')
if os.path.exists(_TEST_PLUGINS_FOLDER):
TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
TEST_PLUGINS_FOLDER = os.path.join(_ROOT_FOLDER, 'tests', 'plugins')
if not os.path.exists(TEST_PLUGINS_FOLDER):
TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, 'plugins')

TEST_CONFIG_FILE = os.path.join(AIRFLOW_HOME, 'unittests.cfg')


def parameterized_config(template):
"""
Expand All @@ -762,38 +740,10 @@ def parameterized_config(template):
all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()}
return template.format(**all_vars)

TEST_CONFIG_FILE = AIRFLOW_HOME + '/unittests.cfg'
if not os.path.isfile(TEST_CONFIG_FILE):
logging.info("Creating new airflow config file for unit tests in: " +
TEST_CONFIG_FILE)
with open(TEST_CONFIG_FILE, 'w') as f:
f.write(parameterized_config(TEST_CONFIG))

if not os.path.isfile(AIRFLOW_CONFIG):
# These configuration options are used to generate a default configuration
# when it is missing. The right way to change your configuration is to
# alter your configuration file, not this code.
logging.info("Creating new airflow config file in: " + AIRFLOW_CONFIG)
with open(AIRFLOW_CONFIG, 'w') as f:
f.write(parameterized_config(DEFAULT_CONFIG))

logging.info("Reading the config from " + AIRFLOW_CONFIG)


conf = AirflowConfigParser()
conf.read(AIRFLOW_CONFIG)


def load_test_config():
"""
Load the unit test configuration.
Note: this is not reversible.
"""
conf.load_test_config()

if conf.getboolean('core', 'unit_test_mode'):
load_test_config()
conf.read_parameterized_string(DEFAULT_CONFIG)
conf.load_config_file(AIRFLOW_CONFIG)


def get(section, key, **kwargs):
Expand Down Expand Up @@ -828,3 +778,28 @@ def as_dict(display_source=False, display_sensitive=False):

def set(section, option, value): # noqa
return conf.set(section, option, value)


def load_config_file(config_file):
return conf.load_config_file(config_file)


def save_config_file(config_file, config_string):
logging.info("Creating new airflow config file in: %s", config_file)
with open(config_file, 'w') as f:
f.write(parameterized_config(config_string))


def load_test_config():
"""
Load the unit test configuration.
Note: this is not reversible.
"""
conf.read_parameterized_string(DEFAULT_CONFIG)
conf.read_parameterized_string(TEST_CONFIG)
if not conf.load_config_file(TEST_CONFIG_FILE):
save_config_file(TEST_CONFIG_FILE, TEST_CONFIG)

if conf.getboolean('core', 'unit_test_mode'):
load_test_config()
5 changes: 5 additions & 0 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from sqlalchemy import event, exc
from sqlalchemy.pool import Pool

from airflow import configuration as conf
from airflow import settings


Expand Down Expand Up @@ -278,6 +279,10 @@ def initdb():
session.add(chart)
session.commit()

# save the default airflow.cfg if it doesn't exist
if not os.path.isfile(conf.AIRFLOW_CONFIG):
conf.save_config_file(conf.AIRFLOW_CONFIG, conf.DEFAULT_CONFIG)


def upgradedb():
logging.info("Creating tables")
Expand Down
8 changes: 6 additions & 2 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2615,8 +2615,12 @@ def conf(self):
title = "Airflow Configuration"
subtitle = conf.AIRFLOW_CONFIG
if conf.getboolean("webserver", "expose_config"):
with open(conf.AIRFLOW_CONFIG, 'r') as f:
config = f.read()
if os.path.isfile(conf.AIRFLOW_CONFIG):
with open(conf.AIRFLOW_CONFIG, 'r') as f:
config = f.read()
else:
config = conf.parameterized_config(conf.DEFAULT_CONFIG)
subtitle = None
table = [(section, key, value, source)
for section, parameters in conf.as_dict(True, True).items()
for key, (value, source) in parameters.items()]
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ building a production-grade environment requires a bit more work!
Setting Configuration Options
'''''''''''''''''''''''''''''

The first time you run Airflow, it will create a file called ``airflow.cfg`` in
The first time you run ``airflow initdb``, it will create a file called ``airflow.cfg`` in
your ``$AIRFLOW_HOME`` directory (``~/airflow`` by default). This file contains Airflow's configuration and you
can edit it to change any of the settings. You can also set options with environment variables by using this format:
``$AIRFLOW__{SECTION}__{KEY}`` (note the double underscores).
Expand Down
28 changes: 26 additions & 2 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
from airflow.operators import sensors
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.sqlite_hook import SqliteHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.bin import cli
from airflow.www import app as application
from airflow.settings import Session
Expand Down Expand Up @@ -1057,7 +1056,32 @@ def test_cli_list_tasks(self):
cli.list_tasks(args)

def test_cli_initdb(self):
cli.initdb(self.parser.parse_args(['initdb']))
temp_config_file = tempfile.mktemp()
self.assertFalse(os.path.isfile(temp_config_file))
with mock.patch.object(configuration, 'AIRFLOW_CONFIG', temp_config_file):
cli.initdb(self.parser.parse_args(['initdb']))
self.assertTrue(os.path.isfile(temp_config_file))
os.unlink(temp_config_file)

def test_cli_default_config(self):
temp_config_file = tempfile.mktemp()
self.assertFalse(os.path.isfile(temp_config_file))
with mock.patch.object(cli.conf, 'AIRFLOW_CONFIG', temp_config_file):
cli.default_config(self.parser.parse_args(['default_config',
'-f', temp_config_file]))
self.assertTrue(os.path.isfile(temp_config_file))

with self.assertRaises(AirflowException):
cli.default_config(self.parser.parse_args(['default_config',
'-f', temp_config_file]))
self.assertTrue(os.path.isfile(temp_config_file))

cli.default_config(self.parser.parse_args(['default_config',
'-f', temp_config_file,
'--overwrite']))
self.assertTrue(os.path.isfile(temp_config_file))

os.unlink(temp_config_file)

def test_cli_connections_list(self):
with mock.patch('sys.stdout',
Expand Down

0 comments on commit 7de2eff

Please sign in to comment.