diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 63e3b026047a3..85576c386c608 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -24,4 +24,3 @@ Please accept this PR. I understand that it will not be reviewed until I have ch 4. Subject uses the imperative mood ("add", not "adding") 5. Body wraps at 72 characters 6. Body explains "what" and "why", not "how" - diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py index 5bc7f76aaa095..3e99edc9239ef 100644 --- a/airflow/api/client/local_client.py +++ b/airflow/api/client/local_client.py @@ -1,4 +1,4 @@ -# -*- coding: utf-8 -*- + # -*- coding: utf-8 -*- # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/airflow/contrib/auth/backends/astronomer_auth.py b/airflow/contrib/auth/backends/astronomer_auth.py new file mode 100644 index 0000000000000..b0af533e5891f --- /dev/null +++ b/airflow/contrib/auth/backends/astronomer_auth.py @@ -0,0 +1,230 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import unicode_literals + +from sys import version_info + +import flask_login +from flask_login import login_required, current_user, logout_user +from flask import flash +from wtforms import ( + Form, PasswordField, StringField) +from wtforms.validators import InputRequired + +from flask import url_for, redirect +from flask_bcrypt import generate_password_hash, check_password_hash + +from sqlalchemy import ( + Column, String, DateTime) +from sqlalchemy.ext.hybrid import hybrid_property + +from airflow import settings +from airflow import models +from airflow import configuration + +import os +import logging +import requests + +import json + +login_manager = flask_login.LoginManager() +login_manager.login_view = 'airflow.login' # Calls login() below +login_manager.login_message = None + +LOG = logging.getLogger(__name__) +PY3 = version_info[0] == 3 + + +class AuthenticationError(Exception): + pass + + +''' +class User(Base): + __tablename__ = "users" + + id = Column(Integer, primary_key=True) + username = Column(String(ID_LEN), unique=True) + email = Column(String(500)) + superuser = False + + def __repr__(self): + return self.username + + def get_id(self): + return str(self.id) + + def is_superuser(self): + return self.superuser +''' + +class AstronomerUser(models.User): + def __init__(self, user): + self.user = user + + @staticmethod + def authenticate(username, password, org_id): + hostname = configuration.get("astronomer-api", "hostname") + port = configuration.get("astronomer-api", "port") + protocol = configuration.get("astronomer-api", "protocol") + + base = '{}://{}:{}'.format(protocol, hostname, port) + endpoint = requests.compat.urljoin(base, 'v1') + + + try: + request_data = { + 'query': ''' + mutation Login($username: String!, $password: String!, $orgId: String) { + createToken(username: $username, password: 
$password, orgId: $orgId) { + success, + message, + token + } + } + ''', + 'variables': {'username': username, 'password': password, 'orgId': org_id} + } + + headers = {'Accept': 'application/json', + 'Content-Type': 'application/json'} + + post_data = json.dumps(request_data).encode("UTF-8") + + response = requests.post(endpoint, data=post_data, headers=headers) + + data = response.json() + + data = data['data']['createToken'] + + if data and 'success' in data and not data['success']: + raise Exception(data['message']) + + if not data or 'success' not in data: + return False + + return True + except requests.exceptions.RequestException as e: + LOG.info("Problem communicating with API: %s", str(e)) + return False + + def is_active(self): + '''Required by flask_login''' + return True + + def is_authenticated(self): + '''Required by flask_login''' + return True + + def is_anonymous(self): + '''Required by flask_login''' + return False + + def get_id(self): + '''Returns the current user id as required by flask_login''' + return self.user.get_id() + + def data_profiling(self): + '''Provides access to data profiling tools''' + return True + + def is_superuser(self): + '''Access all the things''' + return True + + +@login_manager.user_loader +def load_user(userid): + LOG.debug("Loading user %s", userid) + if not userid or userid == 'None': + return None + + session = settings.Session() + user = session.query(models.User).filter(models.User.id == int(userid)).first() + session.expunge_all() + session.commit() + session.close() + return AstronomerUser(user) + + +def login(self, request): + if current_user.is_authenticated(): + flash("You are already logged in") + return redirect(url_for('admin.index')) + + username = None + password = None + + form = LoginForm(request.form) + + if request.method == 'POST' and form.validate(): + username = request.form.get("username") + password = request.form.get("password") + + if not username or not password: + return self.render('airflow/login.html', + title="Airflow - Login", + form=form) + + try: + + session = settings.Session() + + if 'ASTRONOMER_ORG_ID' not in os.environ: + raise AuthenticationError("Unknown organization, server not configured correctly") + astro_org_id = os.environ['ASTRONOMER_ORG_ID'] + + if not AstronomerUser.authenticate(username, password, astro_org_id): + session.close() + raise AuthenticationError("Incorrect login details") + + LOG.info("User %s successfully authenticated", username) + + user = session.query(models.User).filter( + models.User.username == username).first() + + if not user: + user = models.User( + username=username, + is_superuser=False) + + # Add to database immediately, then query for the full object + session.merge(user) + session.commit() + user = session.query(models.User).filter( + models.User.username == username).first() + + session.merge(user) + session.commit() + flask_login.login_user(AstronomerUser(user)) + session.commit() + session.close() + + return redirect(request.args.get("next") or url_for("admin.index")) + except AuthenticationError as e: + flash(str(e)) + return self.render('airflow/login.html', + title="Airflow - Login", + form=form) + except Exception as e: + flash("Authentication Error: %s" % str(e)) + return self.render('airflow/login.html', + title="Airflow - Login", + form=form) + +class LoginForm(Form): + username = StringField('Username', [InputRequired()]) + password = PasswordField('Password', [InputRequired()]) diff --git a/airflow/contrib/operators/ssh_execute_operator.py 
b/airflow/contrib/operators/ssh_execute_operator.py new file mode 100644 index 0000000000000..dd9e197c2f99e --- /dev/null +++ b/airflow/contrib/operators/ssh_execute_operator.py @@ -0,0 +1,159 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from builtins import bytes +import logging +import subprocess +from subprocess import STDOUT + +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.exceptions import AirflowException + + +class SSHTempFileContent(object): + """This class provides functionality that creates a tempfile + with given content at a remote host. + Use like:: + + with SSHTempFileContent(ssh_hook, content) as tempfile: + ... + + In this case, a temporary file ``tempfile`` + with content ``content`` is created where ``ssh_hook`` designates. + + Note that this isn't safe because other processes + at the remote host can read and write that tempfile. + + :param ssh_hook: A SSHHook that indicates a remote host + where you want to create the tempfile + :type ssh_hook: SSHHook + :param content: Initial content of the temporary file + :type content: string + :param prefix: The prefix string you want to use for the temporary file + :type prefix: string + """ + + def __init__(self, ssh_hook, content, prefix="tmp"): + self._ssh_hook = ssh_hook + self._content = content + self._prefix = prefix + + def __enter__(self): + ssh_hook = self._ssh_hook + string = self._content + prefix = self._prefix + + pmktemp = ssh_hook.Popen(["-q", + "mktemp", "-t", prefix + "_XXXXXX"], + stdout=subprocess.PIPE, + stderr=STDOUT) + tempfile = pmktemp.communicate()[0].rstrip() + pmktemp.wait() + if pmktemp.returncode: + raise AirflowException("Failed to create remote temp file") + + ptee = ssh_hook.Popen(["-q", "tee", tempfile], + stdin=subprocess.PIPE, + # discard stdout + stderr=STDOUT) + ptee.stdin.write(bytes(string, 'utf_8')) + ptee.stdin.close() + ptee.wait() + if ptee.returncode: + raise AirflowException("Failed to write to remote temp file") + + self._tempfile = tempfile + return tempfile + + def __exit__(self, type, value, traceback): + sp = self._ssh_hook.Popen(["-q", "rm", "-f", "--", self._tempfile]) + sp.communicate() + sp.wait() + if sp.returncode: + raise AirflowException("Failed to remove remote temp file") + return False + + +class SSHExecuteOperator(BaseOperator): + """ + Execute a Bash script, command or set of commands at a remote host. + + :param ssh_hook: A SSHHook that indicates the remote host + you want to run the script + :type ssh_hook: SSHHook + :param bash_command: The command, set of commands or reference to a + bash script (must be '.sh') to be executed. + :type bash_command: string + :param env: If env is not None, it must be a mapping that defines the + environment variables for the new process; these are used instead + of inheriting the current process environment, which is the default + behavior. 
+ :type env: dict + """ + + template_fields = ("bash_command", "env",) + template_ext = (".sh", ".bash",) + + @apply_defaults + def __init__(self, + ssh_hook, + bash_command, + xcom_push=False, + env=None, + *args, **kwargs): + super(SSHExecuteOperator, self).__init__(*args, **kwargs) + self.bash_command = bash_command + self.env = env + self.hook = ssh_hook + self.xcom_push = xcom_push + + def execute(self, context): + bash_command = self.bash_command + hook = self.hook + host = hook._host_ref() + + with SSHTempFileContent(self.hook, + self.bash_command, + self.task_id) as remote_file_path: + logging.info("Temporary script " + "location : {0}:{1}".format(host, remote_file_path)) + logging.info("Running command: " + bash_command) + if self.env is not None: + logging.info("env: " + str(self.env)) + + sp = hook.Popen( + ['-q', 'bash', remote_file_path], + stdout=subprocess.PIPE, stderr=STDOUT, + env=self.env) + + self.sp = sp + + logging.info("Output:") + line = '' + for line in iter(sp.stdout.readline, b''): + line = line.decode('utf_8').strip() + logging.info(line) + sp.wait() + logging.info("Command exited with " + "return code {0}".format(sp.returncode)) + if sp.returncode: + raise AirflowException("Bash command failed") + if self.xcom_push: + return line + + def on_kill(self): + # TODO: Cleanup remote tempfile + # TODO: kill `mktemp` or `tee` too when they are alive. + logging.info('Sending SIGTERM signal to bash subprocess') + self.sp.terminate() diff --git a/airflow/www/csrf.py b/airflow/www/csrf.py new file mode 100644 index 0000000000000..8213441c97068 --- /dev/null +++ b/airflow/www/csrf.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from flask_wtf.csrf import CSRFProtect +csrf = CSRFProtect() diff --git a/airflow/www/static/bootstrap-theme.css b/airflow/www/static/bootstrap-theme.css index 734f940feed61..4e19ce0fcef11 100644 --- a/airflow/www/static/bootstrap-theme.css +++ b/airflow/www/static/bootstrap-theme.css @@ -9,8 +9,60 @@ * BootSwatchr built and provided by @DrewStrickland */ /*! 
normalize.css v3.0.2 | MIT License | git.io/normalize */ + +@font-face{ + font-family:"Sofia Light"; + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/0eaf6264-15ee-4251-826e-9c0a59e8395f.eot"); + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/0eaf6264-15ee-4251-826e-9c0a59e8395f.eot?#iefix") format("eot"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/8d11b8a0-868c-4a53-8b26-9f0ccbd58247.woff2") format("woff2"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/9b976f15-23b7-45cd-ad4b-59c95b2889f9.woff") format("woff"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/864da3de-5a97-4ce6-a22b-22932ed4dfe0.ttf") format("truetype"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/d5898683-1b2a-4b1b-8058-0c8db30e40cf.svg#d5898683-1b2a-4b1b-8058-0c8db30e40cf") format("svg"); +} +@font-face{ + font-family:"Sofia Light Italic"; + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/9b4837d6-e74a-44ef-a6c4-6b35a90a3702.eot"); + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/9b4837d6-e74a-44ef-a6c4-6b35a90a3702.eot?#iefix") format("eot"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/9be1eedf-69e4-4b70-ba74-3b23121fde9c.woff2") format("woff2"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/461e0a9e-689d-4609-bd34-f2316740e199.woff") format("woff"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/ab9ca33b-f66e-4aaf-8370-a2cd16af4072.ttf") format("truetype"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/d7142d94-d827-4eb5-b479-359ac2ebc205.svg#d7142d94-d827-4eb5-b479-359ac2ebc205") format("svg"); +} +@font-face{ + font-family:"Sofia Regular"; + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/941243bb-e692-46f8-91b9-6e621e5c1ff8.eot"); + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/941243bb-e692-46f8-91b9-6e621e5c1ff8.eot?#iefix") format("eot"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/8b31e0d7-5f53-45c2-b318-064f0b532543.woff2") format("woff2"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/476fe015-ab32-40a2-8fed-0a5af9a3bba4.woff") format("woff"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/d8bcfa37-4f93-49cf-8dea-c29e894af58e.ttf") format("truetype"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/28fc42bb-8761-4e6a-8c62-65d4a41c9e33.svg#28fc42bb-8761-4e6a-8c62-65d4a41c9e33") format("svg"); +} +@font-face{ + font-family:"Sofia Regular Italic"; + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/addba733-2fd3-4151-adf4-d0b563dc6517.eot"); + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/addba733-2fd3-4151-adf4-d0b563dc6517.eot?#iefix") format("eot"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/87cd0dd6-3e8b-46f1-b677-ab7644ea015e.woff2") format("woff2"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/90682d0e-e5df-4a2e-a2ff-6899b8ae8e4b.woff") format("woff"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/d1a6f359-b96c-43fd-87af-85f9762a4ecf.ttf") format("truetype"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/6cc18d90-baae-43d6-9b7e-e778cb8c630f.svg#6cc18d90-baae-43d6-9b7e-e778cb8c630f") format("svg"); +} +@font-face{ + font-family:"Sofia Medium"; + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/27070da8-f8ed-4002-8324-4f0dd80f8fa4.eot"); + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/27070da8-f8ed-4002-8324-4f0dd80f8fa4.eot?#iefix") format("eot"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/fe0b48dd-7bb4-4787-913d-eaf613373f35.woff2") format("woff2"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/d493595c-97d2-4edc-bc9a-c09ad13b4913.woff") 
format("woff"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/449bb77f-1bc7-4956-a8f1-3d4ae9243a59.ttf") format("truetype"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/1332b505-18f0-4aa5-ba55-9be49eba8fb5.svg#1332b505-18f0-4aa5-ba55-9be49eba8fb5") format("svg"); +} +@font-face{ + font-family:"Sofia Medium Italic"; + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/877f3b67-be6a-461d-a5a6-809160a59cd8.eot"); + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/877f3b67-be6a-461d-a5a6-809160a59cd8.eot?#iefix") format("eot"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/9da6ce88-4609-46ec-b6a3-9e8a69f8d053.woff2") format("woff2"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/44cb396c-a0d9-43f0-a13f-bad6bd335656.woff") format("woff"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/aeb7ae09-586c-4808-bada-216d01ed5ec7.ttf") format("truetype"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/f6759ecb-5f45-4a67-a026-db1db3e2c8d0.svg#f6759ecb-5f45-4a67-a026-db1db3e2c8d0") format("svg"); +} +@font-face{ + font-family:"Sofia Semi Bold"; + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/e488d96e-1d10-41aa-a2a0-8cb7cc0a54a8.eot"); + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/e488d96e-1d10-41aa-a2a0-8cb7cc0a54a8.eot?#iefix") format("eot"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/20755c49-19b5-4792-aaf3-27f6b048149a.woff2") format("woff2"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/7cdc9309-dc21-43e2-a348-c6e3853fe8a9.woff") format("woff"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/fe0fe0a1-32b6-4795-80da-fb446ac9d5ea.ttf") format("truetype"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/d7a947d1-755b-4774-a917-036acd235435.svg#d7a947d1-755b-4774-a917-036acd235435") format("svg"); +} +@font-face{ + font-family:"Sofia Semi Bold Italic"; + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/6158ec61-bf1b-4bd7-9dda-df9045c9c986.eot"); + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/6158ec61-bf1b-4bd7-9dda-df9045c9c986.eot?#iefix") format("eot"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/e03d67b8-21d1-471a-a052-5450498e2851.woff2") format("woff2"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/7e254ac6-68c1-4462-b775-f4064eea9be4.woff") format("woff"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/3e17b546-0789-4b1e-b570-6590b98e9700.ttf") format("truetype"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/002ad5e0-12e8-474c-95bd-1e7ae9cea0d4.svg#002ad5e0-12e8-474c-95bd-1e7ae9cea0d4") format("svg"); +} +@font-face{ + font-family:"Sofia Bold"; + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/c2440dd8-b25c-49ba-bc04-8a638a943324.eot"); + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/c2440dd8-b25c-49ba-bc04-8a638a943324.eot?#iefix") format("eot"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/80042803-0630-405f-b2d2-af97b4e50b46.woff2") format("woff2"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/ec41d681-0313-4b04-88c0-820aebb51f4e.woff") format("woff"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/81434fc3-e426-4a4c-987d-f25c098198c4.ttf") format("truetype"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/59fd3475-fba9-4c77-94f9-959fd3925e06.svg#59fd3475-fba9-4c77-94f9-959fd3925e06") format("svg"); +} +@font-face{ + font-family:"Sofia Bold Italic"; + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/0b2112bc-4fb7-4a37-8c55-1e5848431681.eot"); + src:url("https://cdn.astronomer.io/app/fonts/sofia-pro/0b2112bc-4fb7-4a37-8c55-1e5848431681.eot?#iefix") 
format("eot"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/27f146af-7709-44d9-8b45-d09866bec59f.woff2") format("woff2"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/5f085bad-f484-4825-8686-d347df989cf2.woff") format("woff"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/a1133ba8-2dbc-4877-b3bc-b9a3a44652ba.ttf") format("truetype"),url("https://cdn.astronomer.io/app/fonts/sofia-pro/e37199b8-7efe-4da4-bb3c-9a415c519b57.svg#e37199b8-7efe-4da4-bb3c-9a415c519b57") format("svg"); +} + html { - font-family: sans-serif; + font-family: "Sofia Regular"; -ms-text-size-adjust: 100%; -webkit-text-size-adjust: 100%; } @@ -294,7 +346,7 @@ textarea { line-height: inherit; } a { - color: #0091a1; + color: #342f54; text-decoration: none; } a:hover, @@ -532,10 +584,10 @@ mark, color: #777777; } .text-primary { - color: #005c66; + color: #302c43; } a.text-primary:hover { - color: #002e33; + color: #342f54; } .text-success { color: #3c763d; @@ -1501,7 +1553,7 @@ th { border-bottom-width: 2px; } .table-striped > tbody > tr:nth-child(odd) { - background-color: #f9f9f9; + background-color: #f4f2ec; } .table-hover > tbody > tr:hover { background-color: #f5f5f5; @@ -2247,7 +2299,7 @@ fieldset[disabled] .btn-default.active { } .btn-primary { color: #ffffff; - background-color: #005c66; + background-color: #302c43; border-color: #00454c; } .btn-primary:hover, @@ -3430,7 +3482,7 @@ tbody.collapse.in { color: #ffffff; text-decoration: none; outline: 0; - background-color: #005c66; + background-color: #342f54; } .dropdown-menu > .disabled > a, .dropdown-menu > .disabled > a:hover, @@ -4137,6 +4189,7 @@ select[multiple].input-group-sm > .input-group-btn > .btn { } .navbar-brand > img { display: block; + margin-right: 10px; } @media (min-width: 768px) { .navbar > .container .navbar-brand, @@ -4440,8 +4493,8 @@ fieldset[disabled] .navbar-default .btn-link:focus { color: #cccccc; } .navbar-inverse { - background-color: #007a87; - border-color: #004c54; + background-color: #302c43; + border-color: #302c43; background-image: none; } .navbar-inverse .navbar-nav>.active>a, .navbar-inverse .navbar-nav>.open>a { @@ -4451,7 +4504,7 @@ fieldset[disabled] .navbar-default .btn-link:focus { background-image: none; } .navbar-inverse .navbar-brand { - color: #dddddd; + color: #f4f2ec; } .navbar-inverse .navbar-brand:hover, .navbar-inverse .navbar-brand:focus { @@ -4459,21 +4512,21 @@ fieldset[disabled] .navbar-default .btn-link:focus { background-color: transparent; } .navbar-inverse .navbar-text { - color: #ffb400; + color: #f4f2ec; } .navbar-inverse .navbar-nav > li > a { color: #dddddd; } .navbar-inverse .navbar-nav > li > a:hover, .navbar-inverse .navbar-nav > li > a:focus { - color: #ffffff; + color: #fefcf9; background-color: transparent; } .navbar-inverse .navbar-nav > .active > a, .navbar-inverse .navbar-nav > .active > a:hover, .navbar-inverse .navbar-nav > .active > a:focus { - color: #ffffff; - background-color: #004c54; + color: #f4f2ec; + background-color: #24232A; } .navbar-inverse .navbar-nav > .disabled > a, .navbar-inverse .navbar-nav > .disabled > a:hover, @@ -4489,7 +4542,7 @@ fieldset[disabled] .navbar-default .btn-link:focus { background-color: #333333; } .navbar-inverse .navbar-toggle .icon-bar { - background-color: #ffffff; + background-color: #342f54; } .navbar-inverse .navbar-collapse, .navbar-inverse .navbar-form { @@ -4498,15 +4551,15 @@ fieldset[disabled] .navbar-default .btn-link:focus { .navbar-inverse .navbar-nav > .open > a, .navbar-inverse .navbar-nav > .open > a:hover, .navbar-inverse 
.navbar-nav > .open > a:focus { - background-color: #004c54; + background-color: #24232A; color: #ffffff; } @media (max-width: 767px) { .navbar-inverse .navbar-nav .open .dropdown-menu > .dropdown-header { - border-color: #004c54; + border-color: #24232A; } .navbar-inverse .navbar-nav .open .dropdown-menu .divider { - background-color: #004c54; + background-color: #24232A; } .navbar-inverse .navbar-nav .open .dropdown-menu > li > a { color: #dddddd; @@ -4520,7 +4573,7 @@ fieldset[disabled] .navbar-default .btn-link:focus { .navbar-inverse .navbar-nav .open .dropdown-menu > .active > a:hover, .navbar-inverse .navbar-nav .open .dropdown-menu > .active > a:focus { color: #ffffff; - background-color: #004c54; + background-color: #24232A; } .navbar-inverse .navbar-nav .open .dropdown-menu > .disabled > a, .navbar-inverse .navbar-nav .open .dropdown-menu > .disabled > a:hover, @@ -4614,8 +4667,8 @@ fieldset[disabled] .navbar-inverse .btn-link:focus { .pagination > .active > span:focus { z-index: 2; color: #ffffff; - background-color: #005c66; - border-color: #005c66; + background-color: #302c43; + border-color: #302c43; cursor: default; } .pagination > .disabled > span, diff --git a/airflow/www/static/jquery.dataTables.css b/airflow/www/static/jquery.dataTables.css index 6a2e88d942260..f72cb54c2f89a 100644 --- a/airflow/www/static/jquery.dataTables.css +++ b/airflow/www/static/jquery.dataTables.css @@ -103,10 +103,10 @@ table.dataTable.cell-border tbody tr:first-child td { border-top: none; } table.dataTable.stripe tbody tr.odd, table.dataTable.display tbody tr.odd { - background-color: #f9f9f9; + background-color: #f4f2ec; } table.dataTable.stripe tbody tr.odd.selected, table.dataTable.display tbody tr.odd.selected { - background-color: #abb9d3; + background-color: #f4f2ec; } table.dataTable.hover tbody tr:hover, table.dataTable.hover tbody tr.odd:hover, @@ -127,7 +127,7 @@ table.dataTable.order-column tbody tr > .sorting_2, table.dataTable.order-column tbody tr > .sorting_3, table.dataTable.display tbody tr > .sorting_1, table.dataTable.display tbody tr > .sorting_2, table.dataTable.display tbody tr > .sorting_3 { - background-color: #f9f9f9; + background-color: #f4f2ec; } table.dataTable.order-column tbody tr.selected > .sorting_1, table.dataTable.order-column tbody tr.selected > .sorting_2, diff --git a/airflow/www/templates/admin/master.html b/airflow/www/templates/admin/master.html index 49660a66201ce..d2b1602963f3e 100644 --- a/airflow/www/templates/admin/master.html +++ b/airflow/www/templates/admin/master.html @@ -107,4 +107,3 @@ {% endblock %} - diff --git a/tests/api/client/local_client.py b/tests/api/client/local_client.py new file mode 100644 index 0000000000000..a36b71f01faf7 --- /dev/null +++ b/tests/api/client/local_client.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
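+# +# Note for reviewers: mock_datetime_now (defined below) patches datetime.datetime with a subclass whose now()/utcnow() return a fixed instant, so the run_id that trigger_dag derives from the current time stays deterministic across these assertions.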
+ +import json +import unittest +import datetime + +from mock import patch + +from airflow import AirflowException +from airflow import models + +from airflow.api.client.local_client import Client +from airflow.utils.state import State + +EXECDATE = datetime.datetime.now() +EXECDATE_NOFRACTIONS = EXECDATE.replace(microsecond=0) +EXECDATE_ISO = EXECDATE_NOFRACTIONS.isoformat() + +real_datetime_class = datetime.datetime + + +def mock_datetime_now(target, dt): + class DatetimeSubclassMeta(type): + @classmethod + def __instancecheck__(mcs, obj): + return isinstance(obj, real_datetime_class) + + class BaseMockedDatetime(real_datetime_class): + @classmethod + def now(cls, tz=None): + return target.replace(tzinfo=tz) + + @classmethod + def utcnow(cls): + return target + + # Python2 & Python3 compatible metaclass + MockedDatetime = DatetimeSubclassMeta('datetime', (BaseMockedDatetime,), {}) + + return patch.object(dt, 'datetime', MockedDatetime) + + +class TestLocalClient(unittest.TestCase): + def setUp(self): + self.client = Client(api_base_url=None, auth=None) + + @patch.object(models.DAG, 'create_dagrun') + def test_trigger_dag(self, mock): + client = self.client + + # non existent + with self.assertRaises(AirflowException): + client.trigger_dag(dag_id="blablabla") + + import airflow.api.common.experimental.trigger_dag + with mock_datetime_now(EXECDATE, airflow.api.common.experimental.trigger_dag.datetime): + # no execution date, execution date should be set automatically + client.trigger_dag(dag_id="test_start_date_scheduling") + mock.assert_called_once_with(run_id="manual__{0}".format(EXECDATE_ISO), + execution_date=EXECDATE_NOFRACTIONS, + state=State.RUNNING, + conf=None, + external_trigger=True) + mock.reset_mock() + + # execution date with microseconds cutoff + client.trigger_dag(dag_id="test_start_date_scheduling", execution_date=EXECDATE) + mock.assert_called_once_with(run_id="manual__{0}".format(EXECDATE_ISO), + execution_date=EXECDATE_NOFRACTIONS, + state=State.RUNNING, + conf=None, + external_trigger=True) + mock.reset_mock() + + # run id + run_id = "my_run_id" + client.trigger_dag(dag_id="test_start_date_scheduling", run_id=run_id) + mock.assert_called_once_with(run_id=run_id, + execution_date=EXECDATE_NOFRACTIONS, + state=State.RUNNING, + conf=None, + external_trigger=True) + mock.reset_mock() + + # test conf + conf = '{"name": "John"}' + client.trigger_dag(dag_id="test_start_date_scheduling", conf=conf) + mock.assert_called_once_with(run_id="manual__{0}".format(EXECDATE_ISO), + execution_date=EXECDATE_NOFRACTIONS, + state=State.RUNNING, + conf=json.loads(conf), + external_trigger=True) + mock.reset_mock() + + # this is a unit test only, cannot verify existing dag run diff --git a/tests/api/common/mark_tasks.py b/tests/api/common/mark_tasks.py new file mode 100644 index 0000000000000..e01f3adc30b8d --- /dev/null +++ b/tests/api/common/mark_tasks.py @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
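+# +# Test strategy: snapshot_state() records every TaskInstance before each call, set_state() is invoked once, and verify_state() asserts that exactly the targeted task/execution-date pairs changed while every other instance kept its snapshotted state.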
+# + +import unittest + +from airflow import models +from airflow.api.common.experimental.mark_tasks import set_state, _create_dagruns +from airflow.settings import Session +from airflow.utils.dates import days_ago +from airflow.utils.state import State + + +DEV_NULL = "/dev/null" + + +class TestMarkTasks(unittest.TestCase): + def setUp(self): + self.dagbag = models.DagBag(include_examples=True) + self.dag1 = self.dagbag.dags['test_example_bash_operator'] + self.dag2 = self.dagbag.dags['example_subdag_operator'] + + self.execution_dates = [days_ago(2), days_ago(1)] + + drs = _create_dagruns(self.dag1, self.execution_dates, + state=State.RUNNING, + run_id_template="scheduled__{}") + for dr in drs: + dr.dag = self.dag1 + dr.verify_integrity() + + drs = _create_dagruns(self.dag2, + [self.dag2.default_args['start_date']], + state=State.RUNNING, + run_id_template="scheduled__{}") + + for dr in drs: + dr.dag = self.dag2 + dr.verify_integrity() + + self.session = Session() + + def snapshot_state(self, dag, execution_dates): + TI = models.TaskInstance + tis = self.session.query(TI).filter( + TI.dag_id == dag.dag_id, + TI.execution_date.in_(execution_dates) + ).all() + + self.session.expunge_all() + + return tis + + def verify_state(self, dag, task_ids, execution_dates, state, old_tis): + TI = models.TaskInstance + + tis = self.session.query(TI).filter( + TI.dag_id == dag.dag_id, + TI.execution_date.in_(execution_dates) + ).all() + + self.assertTrue(len(tis) > 0) + + for ti in tis: + if ti.task_id in task_ids and ti.execution_date in execution_dates: + self.assertEqual(ti.state, state) + else: + for old_ti in old_tis: + if (old_ti.task_id == ti.task_id + and old_ti.execution_date == ti.execution_date): + self.assertEqual(ti.state, old_ti.state) + + def test_mark_tasks_now(self): + # set one task to success but do not commit + snapshot = self.snapshot_state(self.dag1, self.execution_dates) + task = self.dag1.get_task("runme_1") + altered = set_state(task=task, execution_date=self.execution_dates[0], + upstream=False, downstream=False, future=False, + past=False, state=State.SUCCESS, commit=False) + self.assertEqual(len(altered), 1) + self.verify_state(self.dag1, [task.task_id], [self.execution_dates[0]], + None, snapshot) + + # set one and only one task to success + altered = set_state(task=task, execution_date=self.execution_dates[0], + upstream=False, downstream=False, future=False, + past=False, state=State.SUCCESS, commit=True) + self.assertEqual(len(altered), 1) + self.verify_state(self.dag1, [task.task_id], [self.execution_dates[0]], + State.SUCCESS, snapshot) + + # set no tasks + altered = set_state(task=task, execution_date=self.execution_dates[0], + upstream=False, downstream=False, future=False, + past=False, state=State.SUCCESS, commit=True) + self.assertEqual(len(altered), 0) + self.verify_state(self.dag1, [task.task_id], [self.execution_dates[0]], + State.SUCCESS, snapshot) + + # set task to other than success + altered = set_state(task=task, execution_date=self.execution_dates[0], + upstream=False, downstream=False, future=False, + past=False, state=State.FAILED, commit=True) + self.assertEqual(len(altered), 1) + self.verify_state(self.dag1, [task.task_id], [self.execution_dates[0]], + State.FAILED, snapshot) + + # don't alter other tasks + snapshot = self.snapshot_state(self.dag1, self.execution_dates) + task = self.dag1.get_task("runme_0") + altered = set_state(task=task, execution_date=self.execution_dates[0], + upstream=False, downstream=False, future=False, + past=False, 
state=State.SUCCESS, commit=True) + self.assertEqual(len(altered), 1) + self.verify_state(self.dag1, [task.task_id], [self.execution_dates[0]], + State.SUCCESS, snapshot) + + def test_mark_downstream(self): + # test downstream + snapshot = self.snapshot_state(self.dag1, self.execution_dates) + task = self.dag1.get_task("runme_1") + relatives = task.get_flat_relatives(upstream=False) + task_ids = [t.task_id for t in relatives] + task_ids.append(task.task_id) + + altered = set_state(task=task, execution_date=self.execution_dates[0], + upstream=False, downstream=True, future=False, + past=False, state=State.SUCCESS, commit=True) + self.assertEqual(len(altered), 3) + self.verify_state(self.dag1, task_ids, [self.execution_dates[0]], + State.SUCCESS, snapshot) + + def test_mark_upstream(self): + # test upstream + snapshot = self.snapshot_state(self.dag1, self.execution_dates) + task = self.dag1.get_task("run_after_loop") + relatives = task.get_flat_relatives(upstream=True) + task_ids = [t.task_id for t in relatives] + task_ids.append(task.task_id) + + altered = set_state(task=task, execution_date=self.execution_dates[0], + upstream=True, downstream=False, future=False, + past=False, state=State.SUCCESS, commit=True) + self.assertEqual(len(altered), 4) + self.verify_state(self.dag1, task_ids, [self.execution_dates[0]], + State.SUCCESS, snapshot) + + def test_mark_tasks_future(self): + # set one task to success towards the end of the scheduled dag runs + snapshot = self.snapshot_state(self.dag1, self.execution_dates) + task = self.dag1.get_task("runme_1") + altered = set_state(task=task, execution_date=self.execution_dates[0], + upstream=False, downstream=False, future=True, + past=False, state=State.SUCCESS, commit=True) + self.assertEqual(len(altered), 2) + self.verify_state(self.dag1, [task.task_id], self.execution_dates, + State.SUCCESS, snapshot) + + def test_mark_tasks_past(self): + # set one task to success and propagate back to earlier scheduled dag runs + snapshot = self.snapshot_state(self.dag1, self.execution_dates) + task = self.dag1.get_task("runme_1") + altered = set_state(task=task, execution_date=self.execution_dates[1], + upstream=False, downstream=False, future=False, + past=True, state=State.SUCCESS, commit=True) + self.assertEqual(len(altered), 2) + self.verify_state(self.dag1, [task.task_id], self.execution_dates, + State.SUCCESS, snapshot) + + def test_mark_tasks_subdag(self): + # set a task inside the subdag to success and propagate downstream + task = self.dag2.get_task("section-1") + relatives = task.get_flat_relatives(upstream=False) + task_ids = [t.task_id for t in relatives] + task_ids.append(task.task_id) + + altered = set_state(task=task, execution_date=self.execution_dates[0], + upstream=False, downstream=True, future=False, + past=False, state=State.SUCCESS, commit=True) + self.assertEqual(len(altered), 14) + + # cannot use snapshot here as that would require drilling down the + # sub dag tree, essentially recreating the same code as in the + # tested logic. 
+ self.verify_state(self.dag2, task_ids, [self.execution_dates[0]], + State.SUCCESS, []) + + def tearDown(self): + self.dag1.clear() + self.dag2.clear() + + # just to make sure we are fully cleaned up + self.session.query(models.DagRun).delete() + self.session.query(models.TaskInstance).delete() + self.session.commit() + + self.session.close() + +if __name__ == '__main__': + unittest.main() diff --git a/tests/contrib/hooks/spark_submit_hook.py b/tests/contrib/hooks/spark_submit_hook.py new file mode 100644 index 0000000000000..c156a3fdbaf3d --- /dev/null +++ b/tests/contrib/hooks/spark_submit_hook.py @@ -0,0 +1,201 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import os +import unittest + +from airflow import configuration, models +from airflow.utils import db +from airflow.exceptions import AirflowException +from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook + + +class TestSparkSubmitHook(unittest.TestCase): + _spark_job_file = 'test_application.py' + _config = { + 'conf': { + 'parquet.compression': 'SNAPPY' + }, + 'conn_id': 'default_spark', + 'files': 'hive-site.xml', + 'py_files': 'sample_library.py', + 'jars': 'parquet.jar', + 'total_executor_cores': 4, + 'executor_cores': 4, + 'executor_memory': '22g', + 'keytab': 'privileged_user.keytab', + 'principal': 'user/spark@airflow.org', + 'name': 'spark-job', + 'num_executors': 10, + 'verbose': True, + 'driver_memory': '3g', + 'java_class': 'com.foo.bar.AppMain', + 'application_args': [ + '-f foo', + '--bar bar', + 'baz' + ] + } + + def setUp(self): + configuration.load_test_config() + db.merge_conn( + models.Connection( + conn_id='spark_yarn_cluster', conn_type='spark', + host='yarn://yarn-master', extra='{"queue": "root.etl", "deploy-mode": "cluster"}') + ) + db.merge_conn( + models.Connection( + conn_id='spark_default_mesos', conn_type='spark', + host='mesos://host', port=5050) + ) + + db.merge_conn( + models.Connection( + conn_id='spark_home_set', conn_type='spark', + host='yarn://yarn-master', + extra='{"spark-home": "/opt/myspark"}') + ) + + db.merge_conn( + models.Connection( + conn_id='spark_home_not_set', conn_type='spark', + host='yarn://yarn-master') + ) + + def test_build_command(self): + hook = SparkSubmitHook(**self._config) + + # The subprocess requires an array but we build the cmd by joining on a space + cmd = ' '.join(hook._build_command(self._spark_job_file)) + + # Check if the URL gets built properly and everything exists. 
+ assert self._spark_job_file in cmd + + # Check all the parameters + assert "--files {}".format(self._config['files']) in cmd + assert "--py-files {}".format(self._config['py_files']) in cmd + assert "--jars {}".format(self._config['jars']) in cmd + assert "--total-executor-cores {}".format(self._config['total_executor_cores']) in cmd + assert "--executor-cores {}".format(self._config['executor_cores']) in cmd + assert "--executor-memory {}".format(self._config['executor_memory']) in cmd + assert "--keytab {}".format(self._config['keytab']) in cmd + assert "--principal {}".format(self._config['principal']) in cmd + assert "--name {}".format(self._config['name']) in cmd + assert "--num-executors {}".format(self._config['num_executors']) in cmd + assert "--class {}".format(self._config['java_class']) in cmd + assert "--driver-memory {}".format(self._config['driver_memory']) in cmd + + # Check if all config settings are there + for k in self._config['conf']: + assert "--conf {0}={1}".format(k, self._config['conf'][k]) in cmd + + # Check the application arguments are there + for a in self._config['application_args']: + assert a in cmd + + # Check if application arguments are after the application + application_idx = cmd.find(self._spark_job_file) + for a in self._config['application_args']: + assert cmd.find(a) > application_idx + + if self._config['verbose']: + assert "--verbose" in cmd + + def test_submit(self): + hook = SparkSubmitHook() + + # We don't have spark-submit available, and this is hard to mock, so just accept + # an exception for now. + with self.assertRaises(AirflowException): + hook.submit(self._spark_job_file) + + def test_resolve_connection(self): + # Default to the standard yarn connection because conn_id does not exist + hook = SparkSubmitHook(conn_id='') + self.assertEqual(hook._resolve_connection(), ('yarn', None, None, None)) + assert "--master yarn" in ' '.join(hook._build_command(self._spark_job_file)) + + # Default to the standard yarn connection + hook = SparkSubmitHook(conn_id='spark_default') + self.assertEqual( + hook._resolve_connection(), + ('yarn', 'root.default', None, None) + ) + cmd = ' '.join(hook._build_command(self._spark_job_file)) + assert "--master yarn" in cmd + assert "--queue root.default" in cmd + + # Connect to a mesos master + hook = SparkSubmitHook(conn_id='spark_default_mesos') + self.assertEqual( + hook._resolve_connection(), + ('mesos://host:5050', None, None, None) + ) + + cmd = ' '.join(hook._build_command(self._spark_job_file)) + assert "--master mesos://host:5050" in cmd + + # Set specific queue and deploy mode + hook = SparkSubmitHook(conn_id='spark_yarn_cluster') + self.assertEqual( + hook._resolve_connection(), + ('yarn://yarn-master', 'root.etl', 'cluster', None) + ) + + cmd = ' '.join(hook._build_command(self._spark_job_file)) + assert "--master yarn://yarn-master" in cmd + assert "--queue root.etl" in cmd + assert "--deploy-mode cluster" in cmd + + # Set the spark home + hook = SparkSubmitHook(conn_id='spark_home_set') + self.assertEqual( + hook._resolve_connection(), + ('yarn://yarn-master', None, None, '/opt/myspark') + ) + + cmd = ' '.join(hook._build_command(self._spark_job_file)) + assert cmd.startswith('/opt/myspark/bin/spark-submit') + + # Spark home not set + hook = SparkSubmitHook(conn_id='spark_home_not_set') + self.assertEqual( + hook._resolve_connection(), + ('yarn://yarn-master', None, None, None) + ) + + cmd = ' '.join(hook._build_command(self._spark_job_file)) + assert cmd.startswith('spark-submit') + + def 
test_process_log(self): + # Must select yarn connection + hook = SparkSubmitHook(conn_id='spark_yarn_cluster') + + log_lines = [ + 'SPARK_MAJOR_VERSION is set to 2, using Spark2', + 'WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable', + 'WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.', + 'INFO Client: Requesting a new application from cluster with 10 NodeManagers', + 'INFO Client: Submitting application application_1486558679801_1820 to ResourceManager' + ] + + hook._process_log(log_lines) + + assert hook._yarn_application_id == 'application_1486558679801_1820' + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/contrib/operators/spark_submit_operator.py b/tests/contrib/operators/spark_submit_operator.py new file mode 100644 index 0000000000000..531235f93ef61 --- /dev/null +++ b/tests/contrib/operators/spark_submit_operator.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest +import datetime + +from airflow import DAG, configuration +from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator + +DEFAULT_DATE = datetime.datetime(2017, 1, 1) + + +class TestSparkSubmitOperator(unittest.TestCase): + _config = { + 'conf': { + 'parquet.compression': 'SNAPPY' + }, + 'files': 'hive-site.xml', + 'py_files': 'sample_library.py', + 'jars': 'parquet.jar', + 'total_executor_cores': 4, + 'executor_cores': 4, + 'executor_memory': '22g', + 'keytab': 'privileged_user.keytab', + 'principal': 'user/spark@airflow.org', + 'name': 'spark-job', + 'num_executors': 10, + 'verbose': True, + 'application': 'test_application.py', + 'driver_memory': '3g', + 'java_class': 'com.foo.bar.AppMain', + 'application_args': [ + '-f foo', + '--bar bar' + ] + } + + def setUp(self): + configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + } + self.dag = DAG('test_dag_id', default_args=args) + + def test_execute(self, conn_id='spark_default'): + operator = SparkSubmitOperator( + task_id='spark_submit_job', + dag=self.dag, + **self._config + ) + + self.assertEqual(conn_id, operator._conn_id) + + self.assertEqual(self._config['application'], operator._application) + self.assertEqual(self._config['conf'], operator._conf) + self.assertEqual(self._config['files'], operator._files) + self.assertEqual(self._config['py_files'], operator._py_files) + self.assertEqual(self._config['jars'], operator._jars) + self.assertEqual(self._config['total_executor_cores'], operator._total_executor_cores) + self.assertEqual(self._config['executor_cores'], operator._executor_cores) + self.assertEqual(self._config['executor_memory'], operator._executor_memory) + self.assertEqual(self._config['keytab'], operator._keytab) + self.assertEqual(self._config['principal'], operator._principal) + self.assertEqual(self._config['name'], operator._name) + 
self.assertEqual(self._config['num_executors'], operator._num_executors) + self.assertEqual(self._config['verbose'], operator._verbose) + self.assertEqual(self._config['java_class'], operator._java_class) + self.assertEqual(self._config['driver_memory'], operator._driver_memory) + self.assertEqual(self._config['application_args'], operator._application_args) + + + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/utils/compression.py b/tests/utils/compression.py new file mode 100644 index 0000000000000..f8e0ebbb2a7d5 --- /dev/null +++ b/tests/utils/compression.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow.utils import compression +import unittest +from tempfile import NamedTemporaryFile, mkdtemp +import bz2 +import gzip +import shutil +import logging +import errno +import filecmp + + +class Compression(unittest.TestCase): + + def setUp(self): + self.fn = {} + try: + header = "Sno\tSome,Text \n".encode() + line1 = "1\tAirflow Test\n".encode() + line2 = "2\tCompressionUtil\n".encode() + self.tmp_dir = mkdtemp(prefix='test_utils_compression_') + # create sample txt, gz and bz2 files + with NamedTemporaryFile(mode='wb+', + dir=self.tmp_dir, + delete=False) as f_txt: + self._set_fn(f_txt.name, '.txt') + f_txt.writelines([header, line1, line2]) + fn_gz = self._get_fn('.txt') + ".gz" + with gzip.GzipFile(filename=fn_gz, + mode="wb") as f_gz: + self._set_fn(fn_gz, '.gz') + f_gz.writelines([header, line1, line2]) + fn_bz2 = self._get_fn('.txt') + '.bz2' + with bz2.BZ2File(filename=fn_bz2, + mode="wb") as f_bz2: + self._set_fn(fn_bz2, '.bz2') + f_bz2.writelines([header, line1, line2]) + # Base Exception so it catches Keyboard Interrupt + except BaseException as e: + logging.error(e) + self.tearDown() + + def tearDown(self): + try: + shutil.rmtree(self.tmp_dir) + except OSError as e: + # ENOENT - no such file or directory + if e.errno != errno.ENOENT: + raise e + + # Helper method to create a dictionary of file names and + # file extension + def _set_fn(self, fn, ext): + self.fn[ext] = fn + + # Helper method to fetch a file of a + # certain extension + def _get_fn(self, ext): + return self.fn[ext] + + def test_uncompress_file(self): + # Testing txt file type + self.assertRaisesRegexp(NotImplementedError, + "^Received .txt format. 
Only gz and bz2.*", + compression.uncompress_file, + **{'input_file_name': None, + 'file_extension': '.txt', + 'dest_dir': None + }) + # Testing gz file type + fn_txt = self._get_fn('.txt') + fn_gz = self._get_fn('.gz') + txt_gz = compression.uncompress_file(fn_gz, '.gz', self.tmp_dir) + self.assertTrue(filecmp.cmp(txt_gz, fn_txt, shallow=False), + msg="Uncompressed file doesn't match original") + # Testing bz2 file type + fn_bz2 = self._get_fn('.bz2') + txt_bz2 = compression.uncompress_file(fn_bz2, '.bz2', self.tmp_dir) + self.assertTrue(filecmp.cmp(txt_bz2, fn_txt, shallow=False), + msg="Uncompressed file doesn't match original") + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/utils/dates.py b/tests/utils/dates.py new file mode 100644 index 0000000000000..dc0c87ecad6e3 --- /dev/null +++ b/tests/utils/dates.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime, timedelta +import unittest + +from airflow.utils import dates + + +class Dates(unittest.TestCase): + + def test_days_ago(self): + today = datetime.today() + today_midnight = datetime.fromordinal(today.date().toordinal()) + + self.assertTrue(dates.days_ago(0) == today_midnight) + + self.assertTrue( + dates.days_ago(100) == today_midnight + timedelta(days=-100)) + + self.assertTrue( + dates.days_ago(0, hour=3) == today_midnight + timedelta(hours=3)) + self.assertTrue( + dates.days_ago(0, minute=3) + == today_midnight + timedelta(minutes=3)) + self.assertTrue( + dates.days_ago(0, second=3) + == today_midnight + timedelta(seconds=3)) + self.assertTrue( + dates.days_ago(0, microsecond=3) + == today_midnight + timedelta(microseconds=3)) + + +if __name__ == '__main__': + unittest.main()
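Reviewer note (not part of the patch): a minimal sketch of how the new pieces would be exercised end to end. Everything below is illustrative; the DAG id, start date, and the 'ssh_default' connection id are assumptions, not something this diff ships.

    # Hypothetical DAG wiring for the new SSHExecuteOperator. Assumes the
    # existing contrib SSHHook and an SSH connection named 'ssh_default'
    # already registered in the metadata DB.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.hooks.ssh_hook import SSHHook
    from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator

    dag = DAG('ssh_example', start_date=datetime(2017, 1, 1))

    run_script = SSHExecuteOperator(
        task_id='run_remote_script',
        ssh_hook=SSHHook(conn_id='ssh_default'),  # assumed connection id
        # bash_command is templated; a *.sh/*.bash file path works here too.
        bash_command='echo "hello from $(hostname)"',
        dag=dag)

The astronomer_auth backend is enabled like any contrib auth backend: in airflow.cfg set [webserver] authenticate = True and auth_backend = airflow.contrib.auth.backends.astronomer_auth, add an [astronomer-api] section supplying the protocol, hostname and port keys the backend reads, and export ASTRONOMER_ORG_ID in the webserver environment; login() raises AuthenticationError when it is missing.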