Skip to content

Conversation

@ipaddicting
Copy link
Contributor

@ipaddicting ipaddicting commented Nov 27, 2025

This PR is trying to fix the issue of serialization on redis message for AwaitMessageTrigger: Object of type 'bytes' is not JSON serializable.

FYI, error logs without this fix as following:

triggerer  | 2025-11-27T02:21:31.725627Z [info     ] 1 triggers currently running   [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:735
triggerer  | 2025-11-27T02:21:31.725707Z [info     ] 0 watchers currently running   [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:735
triggerer  | 2025-11-27T02:21:32.468039Z [info     ] Trigger fired event            [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:735 name='ID 3' result='TriggerEvent<{\'type\': \'message\', \'pattern\': None, \'channel\': b\'hug_alarm_events\', \'data\': b\'{"alarm_id":1234, "status":"Open", "timestamp":"2025-11-25 16:01:57"}\'}>'
triggerer  | 2025-11-27T02:21:32.468319Z [info     ] trigger completed              [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:735 name='ID 3'
triggerer  | 2025-11-27T02:21:32.746994Z [error    ] Exception when executing TriggerRunnerSupervisor.run [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:173
triggerer  | Traceback (most recent call last):
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context
triggerer  | context = constructor(
triggerer  | ^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 1078, in _init_compiled
triggerer  | processors[key](compiled_params[key])
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", line 1668, in process
triggerer  | return impl_processor(process_param(value, dialect))
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", line 2669, in process
triggerer  | serialized = json_serializer(value)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps
triggerer  | return _default_encoder.encode(obj)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode
triggerer  | chunks = self.iterencode(o, _one_shot=True)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode
triggerer  | return _iterencode(o, 0)
triggerer  | ^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in default
triggerer  | raise TypeError(f'Object of type {o.__class__.__name__} '
triggerer  | TypeError: Object of type bytes is not JSON serializable
triggerer  | The above exception was the direct cause of the following exception:
triggerer  | Traceback (most recent call last):
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 170, in _execute
triggerer  | self.trigger_runner.run()
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 531, in run
triggerer  | self.handle_events()
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
triggerer  | return func(*args, **kwargs)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in handle_events
triggerer  | Trigger.submit_event(trigger_id=trigger_id, event=event)
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
triggerer  | return func(*args, session=session, **kwargs)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py", line 260, in submit_event
triggerer  | AssetManager.register_asset_change(
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/assets/manager.py", line 161, in register_asset_change
triggerer  | session.flush()  # Ensure the event is written earlier than DDRQ entries below.
triggerer  | ^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
triggerer  | self._flush(objects)
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
triggerer  | with util.safe_reraise():
triggerer  | ^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
triggerer  | compat.raise_(
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
triggerer  | raise exception
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
triggerer  | flush_context.execute()
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
triggerer  | rec.execute(self)
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
triggerer  | util.preloaded.orm_persistence.save_obj(
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
triggerer  | _emit_insert_statements(
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1238, in _emit_insert_statements
triggerer  | result = connection._execute_20(
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
triggerer  | return meth(self, args_10style, kwargs_10style, execution_options)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
triggerer  | return connection._execute_clauseelement(
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
triggerer  | ret = self._execute_context(
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1816, in _execute_context
triggerer  | self._handle_dbapi_exception(
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
triggerer  | util.raise_(
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
triggerer  | raise exception
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context
triggerer  | context = constructor(
triggerer  | ^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 1078, in _init_compiled
triggerer  | processors[key](compiled_params[key])
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", line 1668, in process
triggerer  | return impl_processor(process_param(value, dialect))
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", line 2669, in process
triggerer  | serialized = json_serializer(value)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps
triggerer  | return _default_encoder.encode(obj)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode
triggerer  | chunks = self.iterencode(o, _one_shot=True)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode
triggerer  | return _iterencode(o, 0)
triggerer  | ^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in default
triggerer  | raise TypeError(f'Object of type {o.__class__.__name__} '
triggerer  | sqlalchemy.exc.StatementError: (builtins.TypeError) Object of type bytes is not JSON serializable
triggerer  | [SQL: INSERT INTO asset_event (asset_id, extra, source_task_id, source_dag_id, source_run_id, timestamp) VALUES (%s, %s, %s, %s, %s, %s)]
triggerer  | [parameters: [{'extra': {'from_trigger': True, 'payload': {'type': 'message', 'pattern': None, 'channel': b'hug_alarm_events', 'data': b'{"alarm_id":1234, "status":"Open", "timestamp":"2025-11-25 16:01:57"}'}}, 'asset_id': 3, 'source_task_id': None, 'source_run_id': None, 'source_dag_id': None}]]
triggerer  | 2025-11-27T02:21:32.757560Z [info     ] Waiting for triggers to clean up [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:176
triggerer  | 2025-11-27T02:21:32.761020Z [info     ] Process exited                 [supervisor] exit_code=-2 loc=supervisor.py:709 pid=8484 signal_sent=SIGINT
triggerer  | 2025-11-27T02:21:32.761116Z [info     ] Exited trigger loop            [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:181
triggerer  | Traceback (most recent call last):
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context
triggerer  | context = constructor(
triggerer  | ^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 1078, in _init_compiled
triggerer  | processors[key](compiled_params[key])
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", line 1668, in process
triggerer  | return impl_processor(process_param(value, dialect))
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", line 2669, in process
triggerer  | serialized = json_serializer(value)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps
triggerer  | return _default_encoder.encode(obj)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode
triggerer  | chunks = self.iterencode(o, _one_shot=True)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode
triggerer  | return _iterencode(o, 0)
triggerer  | ^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in default
triggerer  | raise TypeError(f'Object of type {o.__class__.__name__} '
triggerer  | TypeError: Object of type bytes is not JSON serializable
triggerer  | The above exception was the direct cause of the following exception:
triggerer  | Traceback (most recent call last):
triggerer  | File "/home/airflow/.local/bin/airflow", line 7, in <module>
triggerer  | sys.exit(main())
triggerer  | ^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
triggerer  | args.func(args)
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
triggerer  | return func(*args, **kwargs)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in wrapper
triggerer  | return f(*args, **kwargs)
triggerer  | ^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
triggerer  | return func(*args, **kwargs)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 69, in triggerer
triggerer  | run_command_with_daemon_option(
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
triggerer  | callback()
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 72, in <lambda>
triggerer  | callback=lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate),
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 55, in triggerer_run
triggerer  | run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
triggerer  | return func(*args, session=session, **kwargs)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 368, in run_job
triggerer  | return execute_job(job, execute_callable=execute_callable)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 397, in execute_job
triggerer  | ret = execute_callable()
triggerer  | ^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 170, in _execute
triggerer  | self.trigger_runner.run()
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 531, in run
triggerer  | self.handle_events()
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
triggerer  | return func(*args, **kwargs)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in handle_events
triggerer  | Trigger.submit_event(trigger_id=trigger_id, event=event)
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
triggerer  | return func(*args, session=session, **kwargs)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py", line 260, in submit_event
triggerer  | AssetManager.register_asset_change(
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/assets/manager.py", line 161, in register_asset_change
triggerer  | session.flush()  # Ensure the event is written earlier than DDRQ entries below.
triggerer  | ^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
triggerer  | self._flush(objects)
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
triggerer  | with util.safe_reraise():
triggerer  | ^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
triggerer  | compat.raise_(
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
triggerer  | raise exception
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
triggerer  | flush_context.execute()
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
triggerer  | rec.execute(self)
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
triggerer  | util.preloaded.orm_persistence.save_obj(
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
triggerer  | _emit_insert_statements(
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1238, in _emit_insert_statements
triggerer  | result = connection._execute_20(
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
triggerer  | return meth(self, args_10style, kwargs_10style, execution_options)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
triggerer  | return connection._execute_clauseelement(
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
triggerer  | ret = self._execute_context(
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1816, in _execute_context
triggerer  | self._handle_dbapi_exception(
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
triggerer  | util.raise_(
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
triggerer  | raise exception
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context
triggerer  | context = constructor(
triggerer  | ^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 1078, in _init_compiled
triggerer  | processors[key](compiled_params[key])
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", line 1668, in process
triggerer  | return impl_processor(process_param(value, dialect))
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", line 2669, in process
triggerer  | serialized = json_serializer(value)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps
triggerer  | return _default_encoder.encode(obj)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode
triggerer  | chunks = self.iterencode(o, _one_shot=True)
triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode
triggerer  | return _iterencode(o, 0)
triggerer  | ^^^^^^^^^^^^^^^^^
triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in default
triggerer  | raise TypeError(f'Object of type {o.__class__.__name__} '
triggerer  | sqlalchemy.exc.StatementError: (builtins.TypeError) Object of type bytes is not JSON serializable
triggerer  | [SQL: INSERT INTO asset_event (asset_id, extra, source_task_id, source_dag_id, source_run_id, timestamp) VALUES (%s, %s, %s, %s, %s, %s)]
triggerer  | [parameters: [{'extra': {'from_trigger': True, 'payload': {'type': 'message', 'pattern': None, 'channel': b'hug_alarm_events', 'data': b'{"alarm_id":1234, "status":"Open", "timestamp":"2025-11-25 16:01:57"}'}}, 'asset_id': 3, 'source_task_id': None, 'source_run_id': None, 'source_dag_id': None}]]

Basically, the TriggerEvent couldn't be persisted due to the JSON serialiaztion error, and the triggerer instance would crash after that.

Unfortunately, all tests for AwaitMessageTrigger were mocked...

@boring-cyborg
Copy link

boring-cyborg bot commented Nov 27, 2025

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

Copy link
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Thanks for the fix!

It would be nice to update the corresponding unit tests as well in providers/redis/tests/unit/redis/triggers/test_redis_await_message.py, thanks!

@ipaddicting
Copy link
Contributor Author

ipaddicting commented Dec 2, 2025

Nice! Thanks for the fix!

It would be nice to update the corresponding unit tests as well in providers/redis/tests/unit/redis/triggers/test_redis_await_message.py, thanks!

Just added a simple test that copied from the existing one. And I had a little trouble setting up a local development with dev container so that no local tests could be executed...

@jason810496 jason810496 merged commit 1c53e08 into apache:main Dec 2, 2025
79 checks passed
@boring-cyborg
Copy link

boring-cyborg bot commented Dec 2, 2025

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants