Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-833] Add an airflow default_config sub-command #2052

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ def get_dag(args):
return dagbag.dags[args.dag_id]


def default_config(args):
if os.path.isfile(args.filename):
if args.overwrite:
os.unlink(args.filename)
else:
raise AirflowException('{} already exists; pass --overwrite to '
'overwrite it'.format(args.filename))
if not conf.load_config_file(args.filename):
conf.save_config_file(args.filename, conf.DEFAULT_CONFIG)


def backfill(args, dag=None):
logging.basicConfig(
level=settings.LOGGING_LEVEL,
Expand Down Expand Up @@ -1146,6 +1157,14 @@ class CLIFactory(object):
'log_file': Arg(
("-l", "--log-file"), "Location of the log file"),

# default_config
'filename': Arg(
("-f", "--filename"),
"File to write the default config to",
default=conf.AIRFLOW_CONFIG),
'overwrite': Arg(
("-o", "--overwrite"),
"Overwrite file if it exists", "store_true"),
# backfill
'mark_success': Arg(
("-m", "--mark_success"),
Expand Down Expand Up @@ -1425,6 +1444,10 @@ class CLIFactory(object):
}
subparsers = (
{
'func': default_config,
'help': "Write the default config file",
'args': ('filename', 'overwrite'),
}, {
'func': backfill,
'help': "Run subsections of a DAG for a specified date range",
'args': (
Expand Down
125 changes: 50 additions & 75 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,20 +521,12 @@ class AirflowConfigParser(ConfigParser):

def __init__(self, *args, **kwargs):
ConfigParser.__init__(self, *args, **kwargs)
self.read_string(parameterized_config(DEFAULT_CONFIG))
self.is_validated = False

def read_string(self, string, source='<string>'):
"""
Read configuration from a string.

A backwards-compatible version of the ConfigParser.read_string()
method that was introduced in Python 3.
"""
# Python 3 added read_string() method
def read_parameterized_string(self, string, source='<string>'):
string = parameterized_config(string)
if six.PY3:
ConfigParser.read_string(self, string, source=source)
# Python 2 requires StringIO buffer
else:
import StringIO
self.readfp(StringIO.StringIO(string))
Expand Down Expand Up @@ -688,18 +680,12 @@ def as_dict(self, display_source=False, display_sensitive=False):

return cfg

def load_test_config(self):
"""
Load the unit test configuration.

Note: this is not reversible.
"""
# override any custom settings with defaults
self.read_string(parameterized_config(DEFAULT_CONFIG))
# then read test config
self.read_string(parameterized_config(TEST_CONFIG))
# then read any "custom" test settings
self.read(TEST_CONFIG_FILE)
def load_config_file(self, config_file):
if os.path.isfile(config_file):
self.read(config_file)
return True
else:
return False


def mkdir_p(path):
Expand All @@ -715,42 +701,34 @@ def mkdir_p(path):
# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "~/airflow/airflow.cfg" respectively as defaults.

if 'AIRFLOW_HOME' not in os.environ:
AIRFLOW_HOME = expand_env_var('~/airflow')
else:
if 'AIRFLOW_HOME' in os.environ:
AIRFLOW_HOME = expand_env_var(os.environ['AIRFLOW_HOME'])
else:
AIRFLOW_HOME = expand_env_var('~/airflow')

mkdir_p(AIRFLOW_HOME)

if 'AIRFLOW_CONFIG' not in os.environ:
if os.path.isfile(expand_env_var('~/airflow.cfg')):
AIRFLOW_CONFIG = expand_env_var('~/airflow.cfg')
else:
AIRFLOW_CONFIG = AIRFLOW_HOME + '/airflow.cfg'
else:
if 'AIRFLOW_CONFIG' in os.environ:
AIRFLOW_CONFIG = expand_env_var(os.environ['AIRFLOW_CONFIG'])
elif os.path.isfile(expand_env_var('~/airflow.cfg')):
AIRFLOW_CONFIG = expand_env_var('~/airflow.cfg')
else:
AIRFLOW_CONFIG = os.path.join(AIRFLOW_HOME, 'airflow.cfg')

# Set up dags folder for unit tests
# this directory won't exist if users install via pip
_TEST_DAGS_FOLDER = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
'tests',
'dags')
if os.path.exists(_TEST_DAGS_FOLDER):
TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
_ROOT_FOLDER = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
TEST_DAGS_FOLDER = os.path.join(_ROOT_FOLDER, 'tests', 'dags')
if not os.path.exists(TEST_DAGS_FOLDER):
TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, 'dags')

# Set up plugins folder for unit tests
_TEST_PLUGINS_FOLDER = os.path.join(
os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
'tests',
'plugins')
if os.path.exists(_TEST_PLUGINS_FOLDER):
TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
TEST_PLUGINS_FOLDER = os.path.join(_ROOT_FOLDER, 'tests', 'plugins')
if not os.path.exists(TEST_PLUGINS_FOLDER):
TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, 'plugins')

TEST_CONFIG_FILE = os.path.join(AIRFLOW_HOME, 'unittests.cfg')


def parameterized_config(template):
"""
Expand All @@ -762,38 +740,10 @@ def parameterized_config(template):
all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()}
return template.format(**all_vars)

TEST_CONFIG_FILE = AIRFLOW_HOME + '/unittests.cfg'
if not os.path.isfile(TEST_CONFIG_FILE):
logging.info("Creating new airflow config file for unit tests in: " +
TEST_CONFIG_FILE)
with open(TEST_CONFIG_FILE, 'w') as f:
f.write(parameterized_config(TEST_CONFIG))

if not os.path.isfile(AIRFLOW_CONFIG):
# These configuration options are used to generate a default configuration
# when it is missing. The right way to change your configuration is to
# alter your configuration file, not this code.
logging.info("Creating new airflow config file in: " + AIRFLOW_CONFIG)
with open(AIRFLOW_CONFIG, 'w') as f:
f.write(parameterized_config(DEFAULT_CONFIG))

logging.info("Reading the config from " + AIRFLOW_CONFIG)


conf = AirflowConfigParser()
conf.read(AIRFLOW_CONFIG)


def load_test_config():
"""
Load the unit test configuration.

Note: this is not reversible.
"""
conf.load_test_config()

if conf.getboolean('core', 'unit_test_mode'):
load_test_config()
conf.read_parameterized_string(DEFAULT_CONFIG)
conf.load_config_file(AIRFLOW_CONFIG)


def get(section, key, **kwargs):
Expand Down Expand Up @@ -828,3 +778,28 @@ def as_dict(display_source=False, display_sensitive=False):

def set(section, option, value): # noqa
return conf.set(section, option, value)


def load_config_file(config_file):
return conf.load_config_file(config_file)


def save_config_file(config_file, config_string):
logging.info("Creating new airflow config file in: %s", config_file)
with open(config_file, 'w') as f:
f.write(parameterized_config(config_string))


def load_test_config():
"""
Load the unit test configuration.

Note: this is not reversible.
"""
conf.read_parameterized_string(DEFAULT_CONFIG)
conf.read_parameterized_string(TEST_CONFIG)
if not conf.load_config_file(TEST_CONFIG_FILE):
save_config_file(TEST_CONFIG_FILE, TEST_CONFIG)

if conf.getboolean('core', 'unit_test_mode'):
load_test_config()
5 changes: 5 additions & 0 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from sqlalchemy import event, exc
from sqlalchemy.pool import Pool

from airflow import configuration as conf
from airflow import settings


Expand Down Expand Up @@ -278,6 +279,10 @@ def initdb():
session.add(chart)
session.commit()

# save the default airflow.cfg if it doesn't exist
if not os.path.isfile(conf.AIRFLOW_CONFIG):
conf.save_config_file(conf.AIRFLOW_CONFIG, conf.DEFAULT_CONFIG)


def upgradedb():
logging.info("Creating tables")
Expand Down
8 changes: 6 additions & 2 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2615,8 +2615,12 @@ def conf(self):
title = "Airflow Configuration"
subtitle = conf.AIRFLOW_CONFIG
if conf.getboolean("webserver", "expose_config"):
with open(conf.AIRFLOW_CONFIG, 'r') as f:
config = f.read()
if os.path.isfile(conf.AIRFLOW_CONFIG):
with open(conf.AIRFLOW_CONFIG, 'r') as f:
config = f.read()
else:
config = conf.parameterized_config(conf.DEFAULT_CONFIG)
subtitle = None
table = [(section, key, value, source)
for section, parameters in conf.as_dict(True, True).items()
for key, (value, source) in parameters.items()]
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ building a production-grade environment requires a bit more work!
Setting Configuration Options
'''''''''''''''''''''''''''''

The first time you run Airflow, it will create a file called ``airflow.cfg`` in
The first time you run ``airflow initdb``, it will create a file called ``airflow.cfg`` in
your ``$AIRFLOW_HOME`` directory (``~/airflow`` by default). This file contains Airflow's configuration and you
can edit it to change any of the settings. You can also set options with environment variables by using this format:
``$AIRFLOW__{SECTION}__{KEY}`` (note the double underscores).
Expand Down
28 changes: 26 additions & 2 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
from airflow.operators import sensors
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.sqlite_hook import SqliteHook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.bin import cli
from airflow.www import app as application
from airflow.settings import Session
Expand Down Expand Up @@ -1057,7 +1056,32 @@ def test_cli_list_tasks(self):
cli.list_tasks(args)

def test_cli_initdb(self):
cli.initdb(self.parser.parse_args(['initdb']))
temp_config_file = tempfile.mktemp()
self.assertFalse(os.path.isfile(temp_config_file))
with mock.patch.object(configuration, 'AIRFLOW_CONFIG', temp_config_file):
cli.initdb(self.parser.parse_args(['initdb']))
self.assertTrue(os.path.isfile(temp_config_file))
os.unlink(temp_config_file)

def test_cli_default_config(self):
temp_config_file = tempfile.mktemp()
self.assertFalse(os.path.isfile(temp_config_file))
with mock.patch.object(cli.conf, 'AIRFLOW_CONFIG', temp_config_file):
cli.default_config(self.parser.parse_args(['default_config',
'-f', temp_config_file]))
self.assertTrue(os.path.isfile(temp_config_file))

with self.assertRaises(AirflowException):
cli.default_config(self.parser.parse_args(['default_config',
'-f', temp_config_file]))
self.assertTrue(os.path.isfile(temp_config_file))

cli.default_config(self.parser.parse_args(['default_config',
'-f', temp_config_file,
'--overwrite']))
self.assertTrue(os.path.isfile(temp_config_file))

os.unlink(temp_config_file)

def test_cli_connections_list(self):
with mock.patch('sys.stdout',
Expand Down