[AIRFLOW-3892] Create Redis pub sub sensor #4712
Left some review comments here.
from mock import patch | ||
from mock import call | ||
|
||
from unittest.mock import MagicMock |
@mans2singh FYI, CI failed due to from unittest.mock import MagicMock. Maybe you could do
from unittest.mock import MagicMock | |
from mock import MagicMock |
and then optimize the imports.
Corrected.
:param context: the context object | ||
:type context: dict | ||
:return: ``True`` if message (with type 'message') is available or ``False`` if not | ||
:rtype: bool |
Maybe we could remove rtype
:rtype: bool |
Removed from doc.
ui_color = '#f0eee4' | ||
|
||
@apply_defaults | ||
def __init__(self, channels, hook, *args, **kwargs): |
I think we should use redis_conn_id instead of hook. We already have redis_hook in airflow.contrib.hooks.redis_hook that we could use.
def __init__(self, channels, hook, *args, **kwargs): | |
def __init__(self, channels, redis_conn_id, *args, **kwargs): |
Then we could create the connection using
self.hook = RedisHook(redis_conn_id=self.redis_conn_id)
Hi @zhongjiajie - I was trying to decouple the responsibility of creating the hook from the sensor that uses it. This allows the hook to be injected, so mock hooks can be used for unit testing.
Let me know your thoughts.
@mans2singh I think we should take a conn_id string rather than a hook as the parameter. Airflow provides a way to create a hook from a conn_id string.
@zhongjiajie -
I have seen the pattern of passing a connection ID and having the sensor/operator create the hook and use it.
I was trying to use the dependency injection pattern to make it easy to inject a mock hook and test the sensor.
Is there any disadvantage to using this pattern?
Thanks for your advice.
@mans2singh The disadvantage is that we have to write code to create the hook in the DAG file, which I think hurts cohesion.
If we pass a conn_id string, the user can just pass that string to set up the sensor. For unit tests, I think we could do what the other sensors do.
Changed to use redis_conn_id as recommended.
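For readers following this thread, the point that a conn_id-based constructor can still be unit tested without injecting a hook can be sketched roughly as below. This is only an illustration, not the merged code: FakeRedisHook and PubSubSensorSketch are hypothetical stand-ins so the snippet runs without Airflow or Redis installed, and subscribing in the constructor is an assumption of the sketch.

```python
from unittest.mock import patch


class FakeRedisHook:
    """Hypothetical stand-in for RedisHook; assumes the real hook takes
    redis_conn_id and exposes get_conn(), as quoted in this thread."""

    def __init__(self, redis_conn_id):
        self.redis_conn_id = redis_conn_id

    def get_conn(self):
        # There is no Redis server in this sketch, so fail loudly if unpatched.
        raise RuntimeError("no Redis server available in this sketch")


class PubSubSensorSketch:
    """Simplified sensor that receives a connection ID and builds the hook itself."""

    def __init__(self, channels, redis_conn_id):
        self.channels = channels
        self.redis_conn_id = redis_conn_id
        self.pubsub = FakeRedisHook(redis_conn_id=redis_conn_id).get_conn().pubsub()
        # Assumption: the sensor subscribes to its channels right away.
        self.pubsub.subscribe(self.channels)


def build_sensor_with_mocked_conn():
    """Build the sensor while get_conn is patched on the hook class."""
    with patch.object(FakeRedisHook, "get_conn") as mock_get_conn:
        sensor = PubSubSensorSketch(channels=["test"], redis_conn_id="redis_default")
    return sensor, mock_get_conn
```

Patching get_conn at class level, in the same spirit as the @patch decorator used in the test file under review, keeps the DAG-facing signature down to a plain string while the test still controls what the sensor talks to.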
configuration.load_test_config() | ||
|
||
self.log = logging.getLogger() | ||
self.log.setLevel(logging.DEBUG) |
I think we could do the unit test without logging
self.log.setLevel(logging.DEBUG) |
Removed.
@zhongjiajie - |
Codecov Report
@@ Coverage Diff @@
## master #4712 +/- ##
==========================================
+ Coverage 74.73% 74.74% +0.01%
==========================================
Files 449 450 +1
Lines 28961 28982 +21
==========================================
+ Hits 21643 21664 +21
Misses 7318 7318
Continue to review full report at Codecov.
|
Please find my two cents
super(RedisPubSubSensor, self).__init__(*args, **kwargs) | ||
self.channels = channels | ||
self.redis_conn_id = redis_conn_id | ||
self.pubsub = RedisHook(redis_conn_id=self.redis_conn_id).get_conn().pubsub() |
Should we move L47 and L48 into the poke function, before message = self.pubsub.get_message()? Let's discuss.
I placed these lines in the __init__ function so that they are executed once, and poke, which can be called multiple times, can just check for the message.
@mans2singh OK, I get your point. Maybe we should ask the maintainers for advice about it. You can ask for their review now.
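One possible middle ground between the two positions above is to create the pub/sub object lazily on the first poke and reuse it afterwards, so the setup still runs only once but not at construction time. The sketch below is an assumption-laden illustration, not what the PR does: LazyPubSubSensorSketch and its connect argument (a zero-argument callable returning a Redis-like client) are hypothetical names introduced only to keep the example self-contained.

```python
class LazyPubSubSensorSketch:
    """Hypothetical sensor that defers connecting until the first poke."""

    def __init__(self, channels, connect):
        self.channels = channels
        # Hypothetical: a zero-argument callable that returns a Redis-like client.
        self._connect = connect
        self._pubsub = None

    def _get_pubsub(self):
        # Connect and subscribe on first use, then reuse the same object.
        if self._pubsub is None:
            self._pubsub = self._connect().pubsub()
            self._pubsub.subscribe(*self.channels)
        return self._pubsub

    def poke(self, context):
        # True only for a published message, not for subscribe confirmations.
        message = self._get_pubsub().get_message()
        return bool(message) and message.get("type") == "message"
```

The trade-off is that constructing the sensor no longer touches Redis, which matters if the constructor runs whenever the DAG file is parsed, at the cost of a little extra state in the sensor.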
self.log.info('RedisPubSubSensor checking for message on channels: %s', self.channels) | ||
|
||
message = self.pubsub.get_message() | ||
self.log.info('Message %s from channel %s', message, self.channels) |
L66 checks whether message is None, but the message is already logged here. Will this log "Message None from channel %s" or not? L68 also logs the message and channel again. Is L63 necessary?
There is some redundancy in the logs, but it makes it easier to review them and see the data/channel when a message is received. I can remove it if required.
@mans2singh Maybe we should discuss with maintainers. You could ask their advice while we finish this review.
@zhongjiajie - I've removed the log as per your recommendation. Thanks for your review.
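To make the logging question concrete, here is a minimal sketch of a poke-style check that logs only when a real message arrives, so "Message None" never appears in the logs. It assumes that get_message() returns either None or a dict with a "type" key (the docstring quoted earlier mentions type 'message'); poke_once and FakePubSub are hypothetical names used for illustration, not code from the PR.

```python
import logging

log = logging.getLogger(__name__)


def poke_once(pubsub, channels):
    """Return True only when a published message is available."""
    message = pubsub.get_message()
    if message and message.get("type") == "message":
        # Log once, and only when there is something worth reviewing.
        log.info("Message %s from channel %s", message, channels)
        return True
    return False


class FakePubSub:
    """Hypothetical in-memory stand-in for a pub/sub object."""

    def __init__(self, messages):
        self._messages = list(messages)

    def get_message(self):
        # Hand out queued messages in order, then None when empty.
        return self._messages.pop(0) if self._messages else None
```

With a fake that first yields a subscribe confirmation and then a published message, the first call returns False without logging and the second returns True with a single log line.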
from airflow.utils import timezone | ||
from mock import patch | ||
from mock import call | ||
from mock import MagicMock |
Should we use a single line?
from mock import MagicMock | |
from mock import MagicMock, patch, call |
Done.
LGTM, but we still have an open discussion in #4712 (comment)
@feng-tao - Can you please advise who can review this pull request and give me feedback?
|
||
class TestRedisPubSubSensor(unittest.TestCase): | ||
|
||
@patch('airflow.contrib.hooks.redis_hook.RedisHook.get_conn') |
The CI actually runs a Redis instance using docker-compose. Maybe you can do end-to-end testing instead of mocking. This makes it easier when you want to test against newer versions of Redis. What do you think?
@Fokko -
I see that the redis key sensor uses an integration test and will try your recommendation.
Thanks.
We have a celery queue sensor (https://github.com/apache/airflow/blob/master/airflow/contrib/sensors/celery_queue_sensor.py) which could be used to wait for a celery queue backed by Redis. Could you share more about your sensor's use case?
Hi @feng-tao, the use case I have is to drive an EMR workflow and its configuration based on events. In my workflow, the time at which data is available and the data size both vary. Using a static cluster config results in over- or under-provisioning of the EMR cluster. Scheduling the workflow late in the day, when I am sure the data is available, leads to higher processing latency. By using the information in the event, I can trigger the workflow with a cluster configuration based on the contents of the event. Airflow already has a Redis hook and a key sensor. This sensor adds event-based triggering/configuration capabilities to it. The celery_queue_sensor, from my understanding, is for task scheduling, but let me know if I have misunderstood its functionality. Please let me know your thoughts.
Please let me know if you have advice for me. I can also create a simple example dag to print the events to demonstrate its usage. Thanks Mans |
@mans2singh Sorry for the late review. Overall LGTM. We recently upgraded the redis client (#4834). Could you rebase your PR on master and do a sanity test to see if it still works as expected?
Use redis_conn_id, removed logging from test, corrected unittest import, removed rtype from poke doc
Hi @feng-tao - I've rebased the code, and the unit/integration tests for the redis sensor are passing locally. Can you please let me know how to resolve this and whether anything else is required? Thanks for your advice and help. Mans
@mans2singh The CI is currently flaky. If you cannot find a detailed error, maybe you could resubmit and restart the CI tests.
@zhongjiajie - |
@mans2singh , I think we are fine. But could we remove that example dag as I don't think we need that given the sensor is not complicated? Thanks. Once you remove, I think we are good to go. |
I've removed the example per your recommendation. Please let me know if there is anything else required. Thanks for your comments/advice. Mans |
thanks @mans2singh , merged |
* [AIRFLOW-3892] Create Redis pub sub sensor
* [AIRFLOW-3892] - Updated based on review comments (#4712) Use redis_conn_id, removed logging from test, corrected unittest import, removed rtype from poke doc
* [AIRFLOW-3892] Combined import based on review comments
* [AIRFLOW-3892] Removed extra log based on review comments
* [AIRFLOW-3892] - Added integration tests based on review comments
* [AIRFLOW-3892] - Added example dag to kick off the build
* [AIRFLOW-3892] - Removed example dag based on review comments
Make sure you have checked all steps below.
Jira
Description
Tests
tests/contrib/sensors/test_redis_pub_sub_sensor.py
Commits
Documentation
Code Quality
flake8