Flaky dask backfill test in quarantine #32778

Closed
potiuk opened this issue Jul 22, 2023 · 6 comments · Fixed by #32991
Labels
affected_version:main_branch Issues Reported for main branch area:backfill Specifically for backfill related area:core area:Scheduler including HA (high availability) scheduler kind:meta High-level information important to the community priority:high High priority bug that should be patched quickly but does not require immediate new release Quarantine Issues that are occasionally failing and are quarantined
potiuk commented Jul 22, 2023

We have recently started to observe a very flaky test:

tests/executors/test_dask_executor.py::TestDaskExecutor::test_backfill_integration

In particular, the Python 3.8 + Postgres 11 combination seems to trigger it easily, but not always.

Example of failure here:

https://github.com/apache/airflow/actions/runs/5632434844/job/15260418883?pr=32776

Example errors:

E       psycopg2.errors.DeadlockDetected: deadlock detected
E       DETAIL:  Process 604 waits for ShareLock on transaction 7154; blocked by process 690.
E       Process 690 waits for ShareLock on transaction 7152; blocked by process 604.
E       HINT:  See server log for query details.
E       CONTEXT:  while updating tuple (2,204) in relation "dag_run"

Details:

self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0x7fd29cc25880>
cursor = <cursor object at 0x7fd29c8589a0; closed: -1>
statement = 'UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s'
parameters = {'dag_run_id': 23, 'last_scheduling_decision': None, 'updated_at': datetime.datetime(2023, 7, 22, 19, 58, 26, 211427, tzinfo=Timezone('UTC'))}
context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0x7fd27524c9a0>
airflow/jobs/backfill_job_runner.py:914: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
airflow/utils/session.py:74: in wrapper
    return func(*args, **kwargs)
airflow/jobs/backfill_job_runner.py:802: in _execute_dagruns
    processed_dag_run_dates = self._process_backfill_task_instances(
airflow/jobs/backfill_job_runner.py:645: in _process_backfill_task_instances
    session.commit()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1454: in commit
    self._transaction.commit(_to_root=self.future)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:832: in commit
    self._prepare_impl()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:811: in _prepare_impl
    self.session.flush()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3449: in flush
    self._flush(objects)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3589: in _flush
    transaction.rollback(_capture_exception=True)
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:70: in __exit__
    compat.raise_(
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_
    raise exception
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3549: in _flush
    flush_context.execute()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:456: in execute
    rec.execute(self)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:630: in execute
    util.preloaded.orm_persistence.save_obj(
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:237: in save_obj
    _emit_update_statements(
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:1001: in _emit_update_statements
    c = connection._execute_20(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1710: in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py:334: in _execute_on_connection
    return connection._execute_clauseelement(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1577: in _execute_clauseelement
    ret = self._execute_context(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1953: in _execute_context
    self._handle_dbapi_exception(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:2134: in _handle_dbapi_exception
    util.raise_(
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_
    raise exception
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1910: in _execute_context
    self.dialect.do_execute(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

Eventually failing with:

E                       sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.DeadlockDetected) deadlock detected
E                       DETAIL:  Process 604 waits for ShareLock on transaction 7154; blocked by process 690.
E                       Process 690 waits for ShareLock on transaction 7152; blocked by process 604.
E                       HINT:  See server log for query details.
E                       CONTEXT:  while updating tuple (2,204) in relation "dag_run"
E                       
E                       [SQL: UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s]
E                       [parameters: {'last_scheduling_decision': None, 'updated_at': datetime.datetime(2023, 7, 22, 19, 58, 26, 211427, tzinfo=Timezone('UTC')), 'dag_run_id': 23}]
E                       (Background on this error at: https://sqlalche.me/e/14/e3q8) (Background on this error at: https://sqlalche.me/e/14/7s2a)

It would be great to track this down.
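The traceback shows the session left in a "pending rollback" state after the deadlock. A common client-side mitigation for transient deadlocks (a sketch only, under the assumption that the deadlock is transient; it is not a fix for the root cause, and `DeadlockDetected`/`commit_with_retry` here are stand-ins, not Airflow code) is to roll back and retry the commit:

```python
import time


class DeadlockDetected(Exception):
    """Stand-in for psycopg2.errors.DeadlockDetected; real code would catch
    sqlalchemy.exc.OperationalError and inspect its .orig attribute."""


def commit_with_retry(commit, rollback, max_retries=3, backoff=0.1):
    """Roll back and retry a commit that failed with a transient deadlock.

    `commit` and `rollback` are callables (e.g. session.commit and
    session.rollback). Returns the attempt number that succeeded.
    """
    for attempt in range(1, max_retries + 1):
        try:
            commit()
            return attempt
        except DeadlockDetected:
            # The session is now in "pending rollback" state (the 7s2a error
            # above): rollback() is mandatory before emitting any further SQL.
            rollback()
            if attempt == max_retries:
                raise
            time.sleep(backoff * attempt)  # linear backoff before retrying
```

Postgres aborts only one of the two deadlocked transactions, so retrying the loser after the winner commits usually succeeds.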

Committer

  • I acknowledge that I am a maintainer/committer of the Apache Airflow project.
@potiuk potiuk added the kind:meta High-level information important to the community label Jul 22, 2023
potiuk added a commit to potiuk/airflow that referenced this issue Jul 23, 2023
The test has been recently failing with a deadlock (see apache#32778) and
needs a thorough look if we want to find the root cause/remedy.

In the meantime it looks like a niche case connected with the Dask
Executor that is rather obscure, and we have no expertise in diagnosing
and solving problems with it, so it might take a long time until the
problem is diagnosed (and maybe we even decide not to care about it and
let the Dask community take a look and either fix or ignore it).

We aim to have a very low number of those quarantined tests
(currently we have 1, and we have not run it for a while as it
was a MySQL test run on Postgres), but we now have the opportunity
to also improve the quarantined tests framework.

This test will be run together with the other (1) quarantined test, and:

* they will not be run in our regular tests
* they are run sequentially, not in parallel with all other tests
* they are run for all 4 backends, but only for the default
  versions of those backends
* failure of the quarantined tests will not cause failure of the
  whole job or prevent constraints from being generated and updated
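The selection described in the bullets above can be done with standard pytest marker expressions (a sketch of the mechanism only; the actual wiring lives in Airflow's CI scripts and may differ):

```shell
# Regular runs: deselect everything marked quarantined
pytest -m "not quarantined" tests/

# Dedicated quarantine job: only quarantined tests, sequentially
# (-p no:xdist disables parallel workers); never fail the whole job
pytest -m quarantined -p no:xdist tests/ || true
```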
potiuk commented Jul 23, 2023

More information: after separating it out into a separate job that runs sequentially, it seems to fail much more often, even in isolation:

https://github.com/apache/airflow/actions/runs/5635480217/job/15266574775?pr=32780#step:5:538

__________________ TestDaskExecutor.test_backfill_integration __________________
  
  self = <sqlalchemy.future.engine.Connection object at 0x7f99d4144100>
  dialect = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0x7f9a2d526130>
  constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2'>>
  statement = 'UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s'
  parameters = {'dag_run_id': 1, 'last_scheduling_decision': None, 'updated_at': datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, tzinfo=Timezone('UTC'))}
  execution_options = immutabledict({'autocommit': True, 'compiled_cache': {(<sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 obj...ed_at'), False, False), <sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 object at 0x7f99d41ae3a0>, 208]}})
  args = (<sqlalchemy.dialects.postgresql.psycopg2.PGCompiler_psycopg2 object at 0x7f99d41a5fa0>, [{'dag_run_id': 1, 'last_sche...n': None}], <sqlalchemy.sql.dml.Update object at 0x7f99d58688e0>, [BindParameter('dag_run_id', None, type_=Integer())])
  kw = {'cache_hit': symbol('CACHE_HIT')}
  branched = <sqlalchemy.future.engine.Connection object at 0x7f99d4144100>
  yp = None
  conn = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f99d4159a60>
  context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0x7f99d41a8c70>
  cursor = <cursor object at 0x7f99d6b97c70; closed: -1>, evt_handled = False
  
      def _execute_context(
          self,
          dialect,
          constructor,
          statement,
          parameters,
          execution_options,
          *args,
          **kw
      ):
          """Create an :class:`.ExecutionContext` and execute, returning
          a :class:`_engine.CursorResult`."""
      
          branched = self
          if self.__branch_from:
              # if this is a "branched" connection, do everything in terms
              # of the "root" connection, *except* for .close(), which is
              # the only feature that branching provides
              self = self.__branch_from
      
          if execution_options:
              yp = execution_options.get("yield_per", None)
              if yp:
                  execution_options = execution_options.union(
                      {"stream_results": True, "max_row_buffer": yp}
                  )
      
          try:
              conn = self._dbapi_connection
              if conn is None:
                  conn = self._revalidate_connection()
      
              context = constructor(
                  dialect, self, conn, execution_options, *args, **kw
              )
          except (exc.PendingRollbackError, exc.ResourceClosedError):
              raise
          except BaseException as e:
              self._handle_dbapi_exception(
                  e, util.text_type(statement), parameters, None, None
              )
      
          if (
              self._transaction
              and not self._transaction.is_active
              or (
                  self._nested_transaction
                  and not self._nested_transaction.is_active
              )
          ):
              self._invalid_transaction()
      
          elif self._trans_context_manager:
              TransactionalContext._trans_ctx_check(self)
      
          if self._is_future and self._transaction is None:
              self._autobegin()
      
          context.pre_exec()
      
          if dialect.use_setinputsizes:
              context._set_input_sizes()
      
          cursor, statement, parameters = (
              context.cursor,
              context.statement,
              context.parameters,
          )
      
          if not context.executemany:
              parameters = parameters[0]
      
          if self._has_events or self.engine._has_events:
              for fn in self.dispatch.before_cursor_execute:
                  statement, parameters = fn(
                      self,
                      cursor,
                      statement,
                      parameters,
                      context,
                      context.executemany,
                  )
      
          if self._echo:
      
              self._log_info(statement)
      
              stats = context._get_cache_stats()
      
              if not self.engine.hide_parameters:
                  self._log_info(
                      "[%s] %r",
                      stats,
                      sql_util._repr_params(
                          parameters, batches=10, ismulti=context.executemany
                      ),
                  )
              else:
                  self._log_info(
                      "[%s] [SQL parameters hidden due to hide_parameters=True]"
                      % (stats,)
                  )
      
          evt_handled = False
          try:
              if context.executemany:
                  if self.dialect._has_events:
                      for fn in self.dialect.dispatch.do_executemany:
                          if fn(cursor, statement, parameters, context):
                              evt_handled = True
                              break
                  if not evt_handled:
                      self.dialect.do_executemany(
                          cursor, statement, parameters, context
                      )
              elif not parameters and context.no_parameters:
                  if self.dialect._has_events:
                      for fn in self.dialect.dispatch.do_execute_no_params:
                          if fn(cursor, statement, context):
                              evt_handled = True
                              break
                  if not evt_handled:
                      self.dialect.do_execute_no_params(
                          cursor, statement, context
                      )
              else:
                  if self.dialect._has_events:
                      for fn in self.dialect.dispatch.do_execute:
                          if fn(cursor, statement, parameters, context):
                              evt_handled = True
                              break
                  if not evt_handled:
  >                   self.dialect.do_execute(
                          cursor, statement, parameters, context
                      )
  
  /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1910: 
  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
  
  self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0x7f9a2d526130>
  cursor = <cursor object at 0x7f99d6b97c70; closed: -1>
  statement = 'UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s'
  parameters = {'dag_run_id': 1, 'last_scheduling_decision': None, 'updated_at': datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, tzinfo=Timezone('UTC'))}
  context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0x7f99d41a8c70>
  
      def do_execute(self, cursor, statement, parameters, context=None):
  >       cursor.execute(statement, parameters)
  E       psycopg2.errors.DeadlockDetected: deadlock detected
  E       DETAIL:  Process 206 waits for ShareLock on transaction 2211; blocked by process 265.
  E       Process 265 waits for ShareLock on transaction 2210; blocked by process 206.
  E       HINT:  See server log for query details.
  E       CONTEXT:  while updating tuple (0,215) in relation "dag_run"
  
  /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:736: DeadlockDetected
  
  The above exception was the direct cause of the following exception:
  
  self = <airflow.jobs.backfill_job_runner.BackfillJobRunner object at 0x7f99d608fcd0>
  session = <sqlalchemy.orm.session.Session object at 0x7f99fa1165b0>
  
      @provide_session
      def _execute(self, session: Session = NEW_SESSION) -> None:
          """
          Initialize all required components of a dag for a specified date range and execute the tasks.
      
          :meta private:
          """
          ti_status = BackfillJobRunner._DagRunTaskStatus()
      
          start_date = self.bf_start_date
      
          # Get DagRun schedule between the start/end dates, which will turn into dag runs.
          dagrun_start_date = timezone.coerce_datetime(start_date)
          if self.bf_end_date is None:
              dagrun_end_date = pendulum.now(timezone.utc)
          else:
              dagrun_end_date = pendulum.instance(self.bf_end_date)
          dagrun_infos = list(self.dag.iter_dagrun_infos_between(dagrun_start_date, dagrun_end_date))
          if self.run_backwards:
              tasks_that_depend_on_past = [t.task_id for t in self.dag.task_dict.values() if t.depends_on_past]
              if tasks_that_depend_on_past:
                  raise AirflowException(
                      f"You cannot backfill backwards because one or more "
                      f'tasks depend_on_past: {",".join(tasks_that_depend_on_past)}'
                  )
              dagrun_infos = dagrun_infos[::-1]
      
          if not dagrun_infos:
              if not self.run_at_least_once:
                  self.log.info("No run dates were found for the given dates and dag interval.")
                  return
              dagrun_infos = [DagRunInfo.interval(dagrun_start_date, dagrun_end_date)]
      
          dag_with_subdags_ids = [d.dag_id for d in self._get_dag_with_subdags()]
          running_dagruns = DagRun.find(
              dag_id=dag_with_subdags_ids,
              execution_start_date=self.bf_start_date,
              execution_end_date=self.bf_end_date,
              no_backfills=True,
              state=DagRunState.RUNNING,
          )
      
          if running_dagruns:
              for run in running_dagruns:
                  self.log.error(
                      "Backfill cannot be created for DagRun %s in %s, as there's already %s in a RUNNING "
                      "state.",
                      run.run_id,
                      run.execution_date.strftime("%Y-%m-%dT%H:%M:%S"),
                      run.run_type,
                  )
              self.log.error(
                  "Changing DagRun into BACKFILL would cause scheduler to lose track of executing "
                  "tasks. Not changing DagRun type into BACKFILL, and trying insert another DagRun into "
                  "database would cause database constraint violation for dag_id + execution_date "
                  "combination. Please adjust backfill dates or wait for this DagRun to finish.",
              )
              return
          # picklin'
          pickle_id = None
      
          executor_class, _ = ExecutorLoader.import_default_executor_cls()
      
          if not self.donot_pickle and executor_class.supports_pickling:
              pickle = DagPickle(self.dag)
              session.add(pickle)
              session.commit()
              pickle_id = pickle.id
      
          executor = self.job.executor
          executor.job_id = self.job.id
          executor.start()
      
          ti_status.total_runs = len(dagrun_infos)  # total dag runs in backfill
      
          try:
              remaining_dates = ti_status.total_runs
              while remaining_dates > 0:
                  dagrun_infos_to_process = [
                      dagrun_info
                      for dagrun_info in dagrun_infos
                      if dagrun_info.logical_date not in ti_status.executed_dag_run_dates
                  ]
  >               self._execute_dagruns(
                      dagrun_infos=dagrun_infos_to_process,
                      ti_status=ti_status,
                      executor=executor,
                      pickle_id=pickle_id,
                      start_date=start_date,
                      session=session,
                  )
  
  airflow/jobs/backfill_job_runner.py:914: 
  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
  airflow/utils/session.py:74: in wrapper
      return func(*args, **kwargs)
  airflow/jobs/backfill_job_runner.py:802: in _execute_dagruns
      processed_dag_run_dates = self._process_backfill_task_instances(
  airflow/jobs/backfill_job_runner.py:645: in _process_backfill_task_instances
      session.commit()
  /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1454: in commit
      self._transaction.commit(_to_root=self.future)
  /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:832: in commit
      self._prepare_impl()
  /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:811: in _prepare_impl
      self.session.flush()
  /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3449: in flush
      self._flush(objects)
  /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3589: in _flush
      transaction.rollback(_capture_exception=True)
  /usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:70: in __exit__
      compat.raise_(
  /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_
      raise exception
  /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:3549: in _flush
      flush_context.execute()
  /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:456: in execute
      rec.execute(self)
  /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:630: in execute
      util.preloaded.orm_persistence.save_obj(
  /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:237: in save_obj
      _emit_update_statements(
  /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:1001: in _emit_update_statements
      c = connection._execute_20(
  /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1710: in _execute_20
      return meth(self, args_10style, kwargs_10style, execution_options)
  /usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py:334: in _execute_on_connection
      return connection._execute_clauseelement(
  /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1577: in _execute_clauseelement
      ret = self._execute_context(
  /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1953: in _execute_context
      self._handle_dbapi_exception(
  /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:2134: in _handle_dbapi_exception
      util.raise_(
  /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:211: in raise_
      raise exception
  /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1910: in _execute_context
      self.dialect.do_execute(
  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
  
  self = <sqlalchemy.dialects.postgresql.psycopg2.PGDialect_psycopg2 object at 0x7f9a2d526130>
  cursor = <cursor object at 0x7f99d6b97c70; closed: -1>
  statement = 'UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s'
  parameters = {'dag_run_id': 1, 'last_scheduling_decision': None, 'updated_at': datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, tzinfo=Timezone('UTC'))}
  context = <sqlalchemy.dialects.postgresql.psycopg2.PGExecutionContext_psycopg2 object at 0x7f99d41a8c70>
  
      def do_execute(self, cursor, statement, parameters, context=None):
  >       cursor.execute(statement, parameters)
  E       sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
  E       DETAIL:  Process 206 waits for ShareLock on transaction 2211; blocked by process 265.
  E       Process 265 waits for ShareLock on transaction 2210; blocked by process 206.
  E       HINT:  See server log for query details.
  E       CONTEXT:  while updating tuple (0,215) in relation "dag_run"
  E       
  E       [SQL: UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s]
  E       [parameters: {'last_scheduling_decision': None, 'updated_at': datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, tzinfo=Timezone('UTC')), 'dag_run_id': 1}]
  E       (Background on this error at: https://sqlalche.me/e/14/e3q8)
  
  /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:736: OperationalError
  
  During handling of the above exception, another exception occurred:
  
  self = <tests.executors.test_dask_executor.TestDaskExecutor object at 0x7f99fe7af340>
  
      @pytest.mark.quarantined
      @pytest.mark.execution_timeout(180)
      def test_backfill_integration(self):
          """
          Test that DaskExecutor can be used to backfill example dags
          """
          dag = self.dagbag.get_dag("example_bash_operator")
      
          job = Job(
              executor=DaskExecutor(cluster_address=self.cluster.scheduler_address),
          )
          job_runner = BackfillJobRunner(
              job=job,
              dag=dag,
              start_date=DEFAULT_DATE,
              end_date=DEFAULT_DATE,
              ignore_first_depends_on_past=True,
          )
  >       run_job(job=job, execute_callable=job_runner._execute)
  
  tests/executors/test_dask_executor.py:125: 
  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
  airflow/utils/session.py:77: in wrapper
      return func(*args, session=session, **kwargs)
  airflow/jobs/job.py:280: in run_job
      return execute_job(job, execute_callable=execute_callable)
  airflow/jobs/job.py:309: in execute_job
      ret = execute_callable()
  airflow/utils/session.py:77: in wrapper
      return func(*args, session=session, **kwargs)
  airflow/jobs/backfill_job_runner.py:943: in _execute
      session.commit()
  /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:1454: in commit
      self._transaction.commit(_to_root=self.future)
  /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:830: in commit
      self._assert_active(prepared_ok=True)
  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
  
  self = <sqlalchemy.orm.session.SessionTransaction object at 0x7f99d4154a30>
  prepared_ok = True, rollback_ok = False, deactive_ok = False
  closed_msg = 'This transaction is closed'
  
      def _assert_active(
          self,
          prepared_ok=False,
          rollback_ok=False,
          deactive_ok=False,
          closed_msg="This transaction is closed",
      ):
          if self._state is COMMITTED:
              raise sa_exc.InvalidRequestError(
                  "This session is in 'committed' state; no further "
                  "SQL can be emitted within this transaction."
              )
          elif self._state is PREPARED:
              if not prepared_ok:
                  raise sa_exc.InvalidRequestError(
                      "This session is in 'prepared' state; no further "
                      "SQL can be emitted within this transaction."
                  )
          elif self._state is DEACTIVE:
              if not deactive_ok and not rollback_ok:
                  if self._rollback_exception:
  >                   raise sa_exc.PendingRollbackError(
                          "This Session's transaction has been rolled back "
                          "due to a previous exception during flush."
                          " To begin a new transaction with this Session, "
                          "first issue Session.rollback()."
                          " Original exception was: %s"
                          % self._rollback_exception,
                          code="7s2a",
  E                       sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.DeadlockDetected) deadlock detected
  E                       DETAIL:  Process 206 waits for ShareLock on transaction 2211; blocked by process 265.
  E                       Process 265 waits for ShareLock on transaction 2210; blocked by process 206.
  E                       HINT:  See server log for query details.
  E                       CONTEXT:  while updating tuple (0,215) in relation "dag_run"
  E                       
  E                       [SQL: UPDATE dag_run SET last_scheduling_decision=%(last_scheduling_decision)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s]
  E                       [parameters: {'last_scheduling_decision': None, 'updated_at': datetime.datetime(2023, 7, 23, 8, 57, 50, 684579, tzinfo=Timezone('UTC')), 'dag_run_id': 1}]
  E                       (Background on this error at: https://sqlalche.me/e/14/e3q8) (Background on this error at: https://sqlalche.me/e/14/7s2a)
  
  /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:604: PendingRollbackError

potiuk commented Jul 23, 2023

cc: @ashb @uranusjr @hussein-awala @dstandish @o-nikolas @vandonr-amz @vincbeck - I have reason to believe this has been caused by some of the recent changes in setup/teardown or some recent optimisations in the core of scheduling (this is a bit of a wild guess of course, but I think it only started to appear relatively recently, within the last week or so).

I seriously doubt it has anything to do with Dask itself; it is merely triggered by Dask because its distributed nature can run tasks in a much more parallel and distributed fashion.

It occurs exclusively on Postgres, and I think I ONLY saw it with Postgres 11. It very much looks like a REAL problem to diagnose and solve before we hit 2.7.0: if we hit it this often in tests with a single backfill, then in production it might happen very, very quickly.

As written in #32780 I will be less available in the coming week, so it would be great if someone could take a look. In the meantime, merging #32780 should mitigate the failures we have in main and isolate that test until we fix it.

@potiuk potiuk added this to the Airflow 2.7.0 milestone Jul 23, 2023
@potiuk potiuk added priority:high High priority bug that should be patched quickly but does not require immediate new release area:Scheduler including HA (high availability) scheduler Quarantine Issues that are occasionally failing and are quarantined area:core area:backfill Specifically for backfill related affected_version:main_branch Issues Reported for main branch labels Jul 23, 2023
potiuk commented Jul 23, 2023

The fact that this deadlock happens on Postgres indicates that it is a "real" issue. Unlike MySQL, Postgres is rather good at detecting real deadlocks and does a great job of avoiding them, so this is most likely a genuine one. I will also modify the code of our CI build to always upload the logs from containers after the quarantined job. That might help with diagnosing the issue, as we will see more details from the Postgres server on where the deadlock happened.
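To get those details into the server log, stock PostgreSQL settings can be enabled in the test database (these are standard Postgres parameters, not Airflow-specific; a sketch of one way to surface lock information):

```sql
-- Log every lock wait longer than deadlock_timeout, with the statements involved.
ALTER SYSTEM SET log_lock_waits = on;
ALTER SYSTEM SET deadlock_timeout = '500ms';
SELECT pg_reload_conf();

-- While a hang is in progress, show who is blocking whom (Postgres 9.6+):
SELECT blocked.pid   AS blocked_pid,
       blocking.pid  AS blocking_pid,
       blocked.query AS blocked_query
FROM pg_stat_activity AS blocked
JOIN pg_stat_activity AS blocking
  ON blocking.pid = ANY(pg_blocking_pids(blocked.pid));
```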

potiuk added a commit that referenced this issue Jul 23, 2023
The test has recently been failing with a deadlock (see #32778) and
needs a thorough look if we want to find the root cause/remedy.

In the meantime it looks like a niche case connected with the Dask
Executor that is rather obscure and that we have little expertise
in diagnosing and solving, so until the problem is diagnosed it
might be a long time (and maybe we even decide not to care about
it and let the Dask community take a look and either fix or
ignore it).

We aim to keep the number of quarantined tests very low
(currently we have one, and we have not run it for a while, as it
was a MySQL test run on Postgres), but we now have the opportunity
to also improve the quarantined tests framework.

This test will be run together with other (1) quarantined test and:

* they will not be run in our regular tests
* they are run sequentially not in parallel with all other tests
* they are run for all 4 backends but only for the default
  versions of those backends
* failure of the quarantined tests will not cause failure of the
  whole job or limit constraints from being generated and updated
potiuk added a commit that referenced this issue Jul 24, 2023
…#32775)

* Quarantine test_backfill_integration in dask executor

The test has recently been failing with a deadlock (see #32778) and
needs a thorough look if we want to find the root cause/remedy.

In the meantime it looks like a niche case connected with the Dask
Executor that is rather obscure and that we have little expertise
in diagnosing and solving, so until the problem is diagnosed it
might be a long time (and maybe we even decide not to care about
it and let the Dask community take a look and either fix or
ignore it).

We aim to keep the number of quarantined tests very low
(currently we have one, and we have not run it for a while, as it
was a MySQL test run on Postgres), but we now have the opportunity
to also improve the quarantined tests framework.

This test will be run together with other (1) quarantined test and:

* they will not be run in our regular tests
* they are run sequentially not in parallel with all other tests
* they are run for all 4 backends but only for the default
  versions of those backends
* failure of the quarantined tests will not cause failure of the
  whole job or limit constraints from being generated and updated

* Add pre-Airflow-2-7 hardcoded defaults for config for older providers

During thorough testing and review of moving configuration to providers
I realised that there was a case that was not handled properly. In some
cases providers and DAGs could rely on certain default values being
available, but when we moved them from core and used an older version
of a provider, those defaults were not available:

* they were removed as defaults in core
* the old providers did not have a "config" section to contribute the
  defaults

This would be a breaking change, and old providers (Celery, K8s) could
fail - as happened in some tests.

This PR implements a nice solution to that, also allowing us to remove
some manual fallbacks in the Celery and Kubernetes executor code.

The solution is to add a hard-coded "pre-2.7" configuration which
contains only the "provider" pre-2.7 hard-coded defaults, and to make
it a fallback used when the values are neither set nor contributed
as defaults by the providers.

We do not have to maintain those - the defaults are effectively
"frozen" at the values available just before 2.7. A nice side
effect is that we can remove a number of fallbacks, because this
hard-coded configuration becomes the fallback automatically.

That entirely solves the case where you want to install older
providers on 2.7 where config.yml does not contain those provider
values.

* Update airflow/configuration.py

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

---------

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
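The layered fallback described in the commit message above can be sketched as follows. This is a hedged illustration only, not Airflow's actual configuration API: the names `PRE_AIRFLOW_2_7_DEFAULTS` and `build_lookup`, and the concrete default value, are all made up for the example.

```python
# Illustrative sketch of layered config resolution: explicitly set values win,
# then defaults contributed by installed providers, then a frozen map of
# pre-2.7 hard-coded defaults that never needs maintenance.
from collections import ChainMap

# Frozen at the values available just before 2.7 (value here is illustrative).
PRE_AIRFLOW_2_7_DEFAULTS = {
    ("celery", "worker_concurrency"): "16",
}

def build_lookup(set_values, provider_defaults):
    """Resolve options left to right: set values, provider defaults, frozen defaults."""
    return ChainMap(set_values, provider_defaults, PRE_AIRFLOW_2_7_DEFAULTS)

# Older provider installed on 2.7: nothing set, no contributed default,
# so the frozen pre-2.7 fallback applies automatically.
lookup = build_lookup({}, {})
print(lookup[("celery", "worker_concurrency")])  # -> 16
```

An explicitly set value or a provider-contributed default would shadow the frozen entry, which is why the manual fallbacks in the executor code become unnecessary.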
@vandonr-amz
Contributor

I have very little free work time this week; I can try taking a closer look next week if there is no movement by then.

@potiuk potiuk changed the title Flaky dask backfill test Flaky dask backfill test in quarantine Jul 31, 2023
potiuk added a commit to potiuk/airflow that referenced this issue Aug 8, 2023
The dask_executor backfill tests have recently started to fail more
often with a backfill exception. The likely cause is that execution
is now parallelised better, which makes it much easier to trigger
deadlocks from contention between dag_run state updates and task
state updates.

While this PR does not fix the underlying issue, it catches the
operational error raised when a deadlock occurs during the backfill
and rolls back the operation.

This **should** be safe: backfill has a built-in mechanism to loop and
retry failed tasks, and the test passed multiple times, completing the
backfill after this fix was applied. It was not easy to reproduce
locally, but it failed roughly once every 20-30 runs. When extra
logging was added, it was always connected to an OperationalError
raised (and caught) right after _per_task_process. The same exception
was observed a few times after the rollback was added, and despite it
the backfill job retried and completed the process successfully every
time. We also keep the logs with the exceptions and add reassuring
messages that make it clear that, if the backfill completes, the
exceptions can be ignored, as the updates will be retried by the
backfill job.

Fixes: apache#32778
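The catch-rollback-retry approach described in this commit message can be sketched as follows. This is a hedged illustration, not the actual Airflow code: `DeadlockError` stands in for the operational error Postgres deadlock detection surfaces, and the session and per-task callable are invented stand-ins.

```python
# Sketch: on a deadlock-style error during a per-task state update, roll the
# session back and retry, mirroring how the backfill loop re-attempts updates.
class DeadlockError(Exception):
    """Stand-in for an operational error such as a detected deadlock."""

def run_with_deadlock_retry(session, per_task_process, max_attempts=5):
    """Attempt a per-task update; roll back and retry on deadlock-style errors."""
    for attempt in range(1, max_attempts + 1):
        try:
            per_task_process(session)
            session.commit()
            return attempt  # number of attempts it took to succeed
        except DeadlockError as err:
            # The aborted transaction must be rolled back before the session
            # is usable again; the error is logged but not fatal, since we retry.
            session.rollback()
            print(f"attempt {attempt} hit {err!r}; rolling back and retrying")
    raise RuntimeError("update still deadlocking after retries")

class FakeSession:
    """Minimal stand-in for a database session, for demonstration only."""
    def commit(self):
        pass
    def rollback(self):
        pass

# Simulate a task update that deadlocks once, then succeeds on retry.
failures = iter([DeadlockError("deadlock detected")])
def flaky_update(session):
    err = next(failures, None)
    if err is not None:
        raise err

print(run_with_deadlock_retry(FakeSession(), flaky_update))  # -> 2
```

The key point is that the retry happens at the level of the whole per-task operation, after a rollback, so a transiently deadlocked update is simply attempted again rather than failing the backfill.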
potiuk added a commit that referenced this issue Aug 9, 2023
The dask_executor backfill tests have recently started to fail more
often with a backfill exception. The likely cause is that execution
is now parallelised better, which makes it much easier to trigger
deadlocks from contention between dag_run state updates and task
state updates.

While this PR does not fix the underlying issue, it catches the
operational error raised when a deadlock occurs during the backfill
and rolls back the operation.

This **should** be safe: backfill has a built-in mechanism to loop and
retry failed tasks, and the test passed multiple times, completing the
backfill after this fix was applied. It was not easy to reproduce
locally, but it failed roughly once every 20-30 runs. When extra
logging was added, it was always connected to an OperationalError
raised (and caught) right after _per_task_process. The same exception
was observed a few times after the rollback was added, and despite it
the backfill job retried and completed the process successfully every
time. We also keep the logs with the exceptions and add reassuring
messages that make it clear that, if the backfill completes, the
exceptions can be ignored, as the updates will be retried by the
backfill job.

Fixes: #32778
ephraimbuddy pushed a commit that referenced this issue Aug 9, 2023
The dask_executor backfill tests have recently started to fail more
often with a backfill exception. The likely cause is that execution
is now parallelised better, which makes it much easier to trigger
deadlocks from contention between dag_run state updates and task
state updates.

While this PR does not fix the underlying issue, it catches the
operational error raised when a deadlock occurs during the backfill
and rolls back the operation.

This **should** be safe: backfill has a built-in mechanism to loop and
retry failed tasks, and the test passed multiple times, completing the
backfill after this fix was applied. It was not easy to reproduce
locally, but it failed roughly once every 20-30 runs. When extra
logging was added, it was always connected to an OperationalError
raised (and caught) right after _per_task_process. The same exception
was observed a few times after the rollback was added, and despite it
the backfill job retried and completed the process successfully every
time. We also keep the logs with the exceptions and add reassuring
messages that make it clear that, if the backfill completes, the
exceptions can be ignored, as the updates will be retried by the
backfill job.

Fixes: #32778
(cherry picked from commit f616ee8)
@potiuk potiuk reopened this Nov 15, 2023
@potiuk
Member Author

potiuk commented Nov 15, 2023

The test is still flaky - reopening it and marking it as quarantined for now.

@potiuk
Member Author

potiuk commented Dec 17, 2023

Since we removed the Dask executor provider, this can be closed.

@potiuk potiuk closed this as completed Dec 17, 2023