Skip to content

Commit

Permalink
Use PyUpgrade to use Python 3.6 features (#11447)
Browse files Browse the repository at this point in the history
Use features like `f-strings` instead of format across the code-base.
More details: https://github.com/asottile/pyupgrade
  • Loading branch information
kaxil authored Nov 3, 2020
1 parent 8000ab7 commit 8c42cf1
Show file tree
Hide file tree
Showing 323 changed files with 1,046 additions and 1,099 deletions.
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ repos:
- id: fix-encoding-pragma
args:
- --remove
- repo: https://github.com/asottile/pyupgrade
rev: v2.7.3
hooks:
- id: pyupgrade
args: ["--py36-plus"]
- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.6.0
hooks:
Expand Down
4 changes: 2 additions & 2 deletions BREEZE.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2005,8 +2005,8 @@ This is the current syntax for `./breeze <./breeze>`_:
helm-lint incorrect-use-of-LoggingMixin insert-license isort language-matters
lint-dockerfile lint-openapi mermaid mixed-line-ending mypy mypy-helm
no-relative-imports pre-commit-descriptions provide-create-sessions pydevd
pydocstyle pylint pylint-tests python-no-log-warn restrict-start_date rst-backticks
setup-order setup-installation shellcheck sort-in-the-wild stylelint
pydocstyle pylint pylint-tests python-no-log-warn pyupgrade restrict-start_date
rst-backticks setup-order setup-installation shellcheck sort-in-the-wild stylelint
trailing-whitespace update-breeze-file update-extras update-local-yml-file
update-setup-cfg-file yamllint
Expand Down
2 changes: 2 additions & 0 deletions STATIC_CODE_CHECKS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ require Breeze Docker images to be installed locally:
----------------------------------- ---------------------------------------------------------------- ------------
``fix-encoding-pragma`` Removes encoding header from python files.
----------------------------------- ---------------------------------------------------------------- ------------
``pyupgrade`` Runs PyUpgrade
----------------------------------- ---------------------------------------------------------------- ------------
``flake8`` Runs flake8. *
----------------------------------- ---------------------------------------------------------------- ------------
``forbid-tabs`` Fails if tabs are used in the project.
Expand Down
2 changes: 1 addition & 1 deletion airflow/api/auth/backend/kerberos_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def init_app(app):

service = 'airflow'

_KERBEROS_SERVICE.service_name = "{}@{}".format(service, hostname)
_KERBEROS_SERVICE.service_name = f"{service}@{hostname}"

if 'KRB5_KTNAME' not in os.environ:
os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab')
Expand Down
8 changes: 4 additions & 4 deletions airflow/api/client/json_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def _request(self, url, method='GET', json=None):
return resp.json()

def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
endpoint = '/api/experimental/dags/{}/dag_runs'.format(dag_id)
endpoint = f'/api/experimental/dags/{dag_id}/dag_runs'
url = urljoin(self._api_base_url, endpoint)
data = self._request(url, method='POST',
json={
Expand All @@ -54,13 +54,13 @@ def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
return data['message']

def delete_dag(self, dag_id):
endpoint = '/api/experimental/dags/{}/delete_dag'.format(dag_id)
endpoint = f'/api/experimental/dags/{dag_id}/delete_dag'
url = urljoin(self._api_base_url, endpoint)
data = self._request(url, method='DELETE')
return data['message']

def get_pool(self, name):
endpoint = '/api/experimental/pools/{}'.format(name)
endpoint = f'/api/experimental/pools/{name}'
url = urljoin(self._api_base_url, endpoint)
pool = self._request(url)
return pool['pool'], pool['slots'], pool['description']
Expand All @@ -83,7 +83,7 @@ def create_pool(self, name, slots, description):
return pool['pool'], pool['slots'], pool['description']

def delete_pool(self, name):
endpoint = '/api/experimental/pools/{}'.format(name)
endpoint = f'/api/experimental/pools/{name}'
url = urljoin(self._api_base_url, endpoint)
pool = self._request(url, method='DELETE')
return pool['pool'], pool['slots'], pool['description']
Expand Down
4 changes: 2 additions & 2 deletions airflow/api/client/local_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
run_id=run_id,
conf=conf,
execution_date=execution_date)
return "Created {}".format(dag_run)
return f"Created {dag_run}"

def delete_dag(self, dag_id):
count = delete_dag.delete_dag(dag_id)
return "Removed {} record(s)".format(count)
return f"Removed {count} record(s)"

def get_pool(self, name):
the_pool = pool.get_pool(name=name)
Expand Down
6 changes: 3 additions & 3 deletions airflow/api/common/experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ def check_and_get_dag(dag_id: str, task_id: Optional[str] = None) -> DagModel:
"""Checks that DAG exists and in case it is specified that Task exist"""
dag_model = DagModel.get_current(dag_id)
if dag_model is None:
raise DagNotFound("Dag id {} not found in DagModel".format(dag_id))
raise DagNotFound(f"Dag id {dag_id} not found in DagModel")

dagbag = DagBag(
dag_folder=dag_model.fileloc,
read_dags_from_db=True
)
dag = dagbag.get_dag(dag_id)
if not dag:
error_message = "Dag id {} not found".format(dag_id)
error_message = f"Dag id {dag_id} not found"
raise DagNotFound(error_message)
if task_id and not dag.has_task(task_id):
error_message = 'Task {} not found in dag {}'.format(task_id, dag_id)
error_message = f'Task {task_id} not found in dag {dag_id}'
raise TaskNotFound(error_message)
return dag

Expand Down
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
log.info("Deleting DAG: %s", dag_id)
dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
if dag is None:
raise DagNotFound("Dag id {} not found".format(dag_id))
raise DagNotFound(f"Dag id {dag_id} not found")

# Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
# There may be a lag, so explicitly removes serialized DAG here.
Expand Down
6 changes: 3 additions & 3 deletions airflow/api/common/experimental/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ def set_state(
return []

if not timezone.is_localized(execution_date):
raise ValueError("Received non-localized date {}".format(execution_date))
raise ValueError(f"Received non-localized date {execution_date}")

task_dags = {task.dag for task in tasks}
if len(task_dags) > 1:
raise ValueError("Received tasks from multiple DAGs: {}".format(task_dags))
raise ValueError(f"Received tasks from multiple DAGs: {task_dags}")
dag = next(iter(task_dags))
if dag is None:
raise ValueError("Received tasks with no DAG")
Expand Down Expand Up @@ -247,7 +247,7 @@ def get_execution_dates(dag, execution_date, future, past):
"""Returns dates of DAG execution"""
latest_execution_date = dag.get_latest_execution_date()
if latest_execution_date is None:
raise ValueError("Received non-localized date {}".format(execution_date))
raise ValueError(f"Received non-localized date {execution_date}")
# determine date range of dag runs and tasks to consider
end_date = latest_execution_date if future else execution_date
if 'start_date' in dag.default_args:
Expand Down
6 changes: 3 additions & 3 deletions airflow/api/common/experimental/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _trigger_dag(
dag = dag_bag.get_dag(dag_id) # prefetch dag if it is stored serialized

if dag_id not in dag_bag.dags:
raise DagNotFound("Dag id {} not found".format(dag_id))
raise DagNotFound(f"Dag id {dag_id} not found")

execution_date = execution_date if execution_date else timezone.utcnow()

Expand All @@ -62,7 +62,7 @@ def _trigger_dag(
min_dag_start_date = dag.default_args["start_date"]
if min_dag_start_date and execution_date < min_dag_start_date:
raise ValueError(
"The execution_date [{0}] should be >= start_date [{1}] from DAG's default_args".format(
"The execution_date [{}] should be >= start_date [{}] from DAG's default_args".format(
execution_date.isoformat(),
min_dag_start_date.isoformat()))

Expand Down Expand Up @@ -112,7 +112,7 @@ def trigger_dag(
"""
dag_model = DagModel.get_current(dag_id)
if dag_model is None:
raise DagNotFound("Dag id {} not found in DagModel".format(dag_id))
raise DagNotFound(f"Dag id {dag_id} not found in DagModel")

dagbag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
triggers = _trigger_dag(
Expand Down
6 changes: 3 additions & 3 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def post_clear_task_instances(dag_id: str, session=None):

dag = current_app.dag_bag.get_dag(dag_id)
if not dag:
error_message = "Dag id {} not found".format(dag_id)
error_message = f"Dag id {dag_id} not found"
raise NotFound(error_message)
reset_dag_runs = data.pop('reset_dag_runs')
task_instances = dag.clear(get_tis=True, **data)
Expand Down Expand Up @@ -287,7 +287,7 @@ def post_set_task_instances_state(dag_id, session):
except ValidationError as err:
raise BadRequest(detail=str(err.messages))

error_message = "Dag ID {} not found".format(dag_id)
error_message = f"Dag ID {dag_id} not found"
try:
dag = current_app.dag_bag.get_dag(dag_id)
if not dag:
Expand All @@ -300,7 +300,7 @@ def post_set_task_instances_state(dag_id, session):
task = dag.task_dict.get(task_id)

if not task:
error_message = "Task ID {} not found".format(task_id)
error_message = f"Task ID {task_id} not found"
raise NotFound(error_message)

tis = set_state(
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/schemas/common_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def get_obj_type(self, obj):
elif isinstance(obj, CronExpression):
return "CronExpression"
else:
raise Exception("Unknown object type: {}".format(obj.__class__.__name__))
raise Exception(f"Unknown object type: {obj.__class__.__name__}")


class ColorField(fields.String):
Expand Down
2 changes: 1 addition & 1 deletion airflow/cli/commands/connection_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def connections_add(args):
missing_args.append('conn-uri or conn-type')
if missing_args:
msg = ('The following args are required to add a connection:' +
' {missing!r}'.format(missing=missing_args))
f' {missing_args!r}')
raise SystemExit(msg)
if invalid_args:
msg = ('The following args are not compatible with the ' +
Expand Down
14 changes: 7 additions & 7 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ def dag_backfill(args, dag=None):
run_conf = json.loads(args.conf)

if args.dry_run:
print("Dry run of DAG {0} on {1}".format(args.dag_id,
args.start_date))
print("Dry run of DAG {} on {}".format(args.dag_id,
args.start_date))
for task in dag.tasks:
print("Task {0}".format(task.task_id))
print(f"Task {task.task_id}")
ti = TaskInstance(task, args.start_date)
ti.dry_run()
else:
Expand Down Expand Up @@ -239,7 +239,7 @@ def _display_dot_via_imgcat(dot: Dot):
def _save_dot_to_file(dot: Dot, filename: str):
filename_without_ext, _, ext = filename.rpartition('.')
dot.render(filename=filename_without_ext, format=ext, cleanup=True)
print("File {} saved".format(filename))
print(f"File {filename} saved")


@cli_utils.action_logging
Expand Down Expand Up @@ -319,7 +319,7 @@ def dag_list_jobs(args, dag=None):
dagbag = DagBag()

if args.dag_id not in dagbag.dags:
error_message = "Dag id {} not found".format(args.dag_id)
error_message = f"Dag id {args.dag_id} not found"
raise AirflowException(error_message)
queries.append(BaseJob.dag_id == args.dag_id)

Expand Down Expand Up @@ -350,7 +350,7 @@ def dag_list_dag_runs(args, dag=None):
dagbag = DagBag()

if args.dag_id is not None and args.dag_id not in dagbag.dags:
error_message = "Dag id {} not found".format(args.dag_id)
error_message = f"Dag id {args.dag_id} not found"
raise AirflowException(error_message)

state = args.state.lower() if args.state else None
Expand All @@ -363,7 +363,7 @@ def dag_list_dag_runs(args, dag=None):
)

if not dag_runs:
print('No dag runs for {dag_id}'.format(dag_id=args.dag_id))
print(f'No dag runs for {args.dag_id}')
return

dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
Expand Down
2 changes: 1 addition & 1 deletion airflow/cli/commands/pool_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def pool_import_helper(filepath):
"""Helps import pools from the json file"""
api_client = get_current_api_client()

with open(filepath, 'r') as poolfile:
with open(filepath) as poolfile:
data = poolfile.read()
try: # pylint: disable=too-many-nested-blocks
pools_json = json.loads(data)
Expand Down
4 changes: 2 additions & 2 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def task_run(args, dag=None):
"""Runs a single task instance"""
# Load custom airflow config
if args.cfg_path:
with open(args.cfg_path, 'r') as conf_file:
with open(args.cfg_path) as conf_file:
conf_dict = json.load(conf_file)

if os.path.exists(args.cfg_path):
Expand Down Expand Up @@ -238,7 +238,7 @@ def task_failed_deps(args):
if failed_deps:
print("Task instance dependencies not met:")
for dep in failed_deps:
print("{}: {}".format(dep.dep_name, dep.reason))
print(f"{dep.dep_name}: {dep.reason}")
else:
print("Task instance dependencies are all met.")

Expand Down
18 changes: 9 additions & 9 deletions airflow/cli/commands/user_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def users_create(args):
role = appbuilder.sm.find_role(args.role)
if not role:
valid_roles = appbuilder.sm.get_all_roles()
raise SystemExit('{} is not a valid role. Valid roles are: {}'.format(args.role, valid_roles))
raise SystemExit(f'{args.role} is not a valid role. Valid roles are: {valid_roles}')

if args.use_random_password:
password = ''.join(random.choice(string.printable) for _ in range(16))
Expand All @@ -61,12 +61,12 @@ def users_create(args):
raise SystemExit('Passwords did not match!')

if appbuilder.sm.find_user(args.username):
print('{} already exist in the db'.format(args.username))
print(f'{args.username} already exist in the db')
return
user = appbuilder.sm.add_user(args.username, args.firstname, args.lastname,
args.email, role, password)
if user:
print('{} user {} created.'.format(args.role, args.username))
print(f'{args.role} user {args.username} created.')
else:
raise SystemExit('Failed to create user.')

Expand All @@ -80,10 +80,10 @@ def users_delete(args):
user = next(u for u in appbuilder.sm.get_all_users()
if u.username == args.username)
except StopIteration:
raise SystemExit('{} is not a valid user.'.format(args.username))
raise SystemExit(f'{args.username} is not a valid user.')

if appbuilder.sm.del_register_user(user):
print('User {} deleted.'.format(args.username))
print(f'User {args.username} deleted.')
else:
raise SystemExit('Failed to delete user.')

Expand All @@ -108,7 +108,7 @@ def users_manage_role(args, remove=False):
role = appbuilder.sm.find_role(args.role)
if not role:
valid_roles = appbuilder.sm.get_all_roles()
raise SystemExit('{} is not a valid role. Valid roles are: {}'.format(args.role, valid_roles))
raise SystemExit(f'{args.role} is not a valid role. Valid roles are: {valid_roles}')

if remove:
if role in user.roles:
Expand Down Expand Up @@ -167,10 +167,10 @@ def users_import(args):

users_list = None # pylint: disable=redefined-outer-name
try:
with open(json_file, 'r') as file:
with open(json_file) as file:
users_list = json.loads(file.read())
except ValueError as e:
print("File '{}' is not valid JSON. Error: {}".format(json_file, e))
print(f"File '{json_file}' is not valid JSON. Error: {e}")
sys.exit(1)

users_created, users_updated = _import_users(users_list)
Expand All @@ -194,7 +194,7 @@ def _import_users(users_list): # pylint: disable=redefined-outer-name
role = appbuilder.sm.find_role(rolename)
if not role:
valid_roles = appbuilder.sm.get_all_roles()
print("Error: '{}' is not a valid role. Valid roles are: {}".format(rolename, valid_roles))
print(f"Error: '{rolename}' is not a valid role. Valid roles are: {valid_roles}")
sys.exit(1)
else:
roles.append(role)
Expand Down
4 changes: 2 additions & 2 deletions airflow/cli/commands/variable_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def variables_export(args):

def _import_helper(filepath):
"""Helps import variables from the file"""
with open(filepath, 'r') as varfile:
with open(filepath) as varfile:
data = varfile.read()

try:
Expand All @@ -101,7 +101,7 @@ def _import_helper(filepath):
suc_count += 1
print("{} of {} variables successfully updated.".format(suc_count, len(var_json)))
if fail_count:
print("{} variable(s) failed to be updated.".format(fail_count))
print(f"{fail_count} variable(s) failed to be updated.")


def _variable_export_helper(filepath):
Expand Down
4 changes: 2 additions & 2 deletions airflow/cli/commands/webserver_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def _wait_until_true(self, fn, timeout: int = 0) -> None:
while not fn():
if 0 < timeout <= time.time() - start_time:
raise AirflowWebServerTimeout(
"No response from gunicorn master within {0} seconds".format(timeout)
f"No response from gunicorn master within {timeout} seconds"
)
sleep(0.1)

Expand Down Expand Up @@ -328,7 +328,7 @@ def webserver(args):

if args.debug:
print(
"Starting the web server on port {0} and host {1}.".format(
"Starting the web server on port {} and host {}.".format(
args.port, args.hostname))
app = create_app(testing=conf.getboolean('core', 'unit_test_mode'))
app.run(debug=True, use_reloader=not app.config['TESTING'],
Expand Down
Loading

0 comments on commit 8c42cf1

Please sign in to comment.