Skip to content

Commit

Permalink
Refactor: SKIPPED shouldn't be logged again as SUCCESS. (#14822)
Browse files Browse the repository at this point in the history
* `SKIPPED` shouldn't be logged again as `SUCCESS`.

* `_safe_date` duplicates `_date_or_empty`.

* Borrowed the `getattr`-with-default approach from `_safe_date`.
  • Loading branch information
suiting-young authored Jun 10, 2021
1 parent f2315bf commit dbeec89
Showing 1 changed file with 20 additions and 45 deletions.
65 changes: 20 additions & 45 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1087,12 +1087,23 @@ def check_and_change_state_before_execution( # pylint: disable=too-many-argumen
self.log.info("Executing %s on %s", self.task, self.execution_date)
return True

def _date_or_empty(self, attr):
    """Return the date stored under ``attr`` formatted for log output.

    :param attr: name of the attribute to read from this object
    :return: the value formatted with ``%Y%m%dT%H%M%S``, or ``''`` when the
        attribute does not exist or holds a falsy value such as ``None``
    """
    # One lookup with a default covers both the "missing" and "unset" cases.
    date = getattr(self, attr, None)
    if not date:
        return ''
    return date.strftime('%Y%m%dT%H%M%S')
def _date_or_empty(self, attr: str) -> str:
    """Format the date stored under ``attr`` for use in log messages.

    :param attr: name of the attribute to read from this object
    :return: the value formatted with ``%Y%m%dT%H%M%S``, or an empty string
        when the attribute is missing or its value is falsy (e.g. ``None``)
    """
    # ``result`` is None when the attribute is absent, so it is not always a date.
    result = getattr(self, attr, None)
    return result.strftime('%Y%m%dT%H%M%S') if result else ''

def _log_state(self, lead_msg: str = '') -> None:
    """Log the current state along with the identifying fields and dates.

    The message has the form ``<lead_msg>Marking task as <STATE>. dag_id=...``
    where ``<STATE>`` is ``self.state`` upper-cased.

    :param lead_msg: text placed directly before ``Marking task as``; no
        separator is inserted, so include any trailing space in the value
    """
    # Values are passed as arguments rather than pre-formatted into the string.
    self.log.info(
        '%sMarking task as %s.'
        ' dag_id=%s, task_id=%s,'
        ' execution_date=%s, start_date=%s, end_date=%s',
        lead_msg,
        self.state.upper(),
        self.dag_id,
        self.task_id,
        self._date_or_empty('execution_date'),
        self._date_or_empty('start_date'),
        self._date_or_empty('end_date'),
    )

@provide_session
@Sentry.enrich_errors
Expand Down Expand Up @@ -1147,15 +1158,6 @@ def _run_raw_task(
self.log.info(e)
self.refresh_from_db(lock_for_update=True)
self.state = State.SKIPPED
self.log.info(
'Marking task as SKIPPED. '
'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
self.dag_id,
self.task_id,
self._date_or_empty('execution_date'),
self._date_or_empty('start_date'),
self._date_or_empty('end_date'),
)
except AirflowRescheduleException as reschedule_exception:
self.refresh_from_db()
self._handle_reschedule(actual_start_date, reschedule_exception, test_mode)
Expand All @@ -1181,17 +1183,9 @@ def _run_raw_task(
finally:
Stats.incr(f'ti.finish.{task.dag_id}.{task.task_id}.{self.state}')

# Recording SUCCESS
# Recording SKIPPED or SUCCESS
self.end_date = timezone.utcnow()
self.log.info(
'Marking task as SUCCESS. '
'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
self.dag_id,
self.task_id,
self._date_or_empty('execution_date'),
self._date_or_empty('start_date'),
self._date_or_empty('end_date'),
)
self._log_state()
self.set_duration()
if not test_mode:
session.add(Log(self.state, self))
Expand Down Expand Up @@ -1458,25 +1452,12 @@ def handle_failure(

if force_fail or not self.is_eligible_to_retry():
self.state = State.FAILED
if force_fail:
log_message = "Immediate failure requested. Marking task as FAILED."
else:
log_message = "Marking task as FAILED."
email_for_state = task.email_on_failure
else:
self.state = State.UP_FOR_RETRY
log_message = "Marking task as UP_FOR_RETRY."
email_for_state = task.email_on_retry

self.log.info(
'%s dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
log_message,
self.dag_id,
self.task_id,
self._safe_date('execution_date', '%Y%m%dT%H%M%S'),
self._safe_date('start_date', '%Y%m%dT%H%M%S'),
self._safe_date('end_date', '%Y%m%dT%H%M%S'),
)
self._log_state('Immediate failure requested. ' if force_fail else '')
if email_for_state and task.email:
try:
self.email_alert(error)
Expand All @@ -1502,12 +1483,6 @@ def is_eligible_to_retry(self):
"""Is task instance is eligible for retry"""
return self.task.retries and self.try_number <= self.max_tries

def _safe_date(self, date_attr, fmt):
    """Return the date stored under ``date_attr`` rendered with ``fmt``.

    :param date_attr: name of the attribute to read from this object
    :param fmt: format string handed to the value's ``strftime``
    :return: the formatted value, or ``''`` when the attribute is missing
        or is ``None``
    """
    result = getattr(self, date_attr, None)
    # Only None is treated as "no date"; any other value is formatted.
    if result is None:
        return ''
    return result.strftime(fmt)

@provide_session
def get_template_context(self, session=None) -> Context: # pylint: disable=too-many-locals
"""Return TI Context"""
Expand Down

0 comments on commit dbeec89

Please sign in to comment.