-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CDK: Emit control message on config mutation #19428
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alafanechere could you show an example of how would this be used in the context of an HTTPStream and a single-use oauth authenticator?
@sherifnada I achieved a working example for BingAds in 733b89a (will revert afterward) A $ python main.py read --catalog integration_tests/configured_catalog.json --config secrets/config.json
{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceBingAds"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Fetching access token ..."}}
{"type": "CONNECTOR_CONFIG", "emitted_at": 1668613492681.509, "connectorConfig": {"config": {"tenant_id": "common", "refresh_token": "<OBFUSCATED-FOR-DEMO>", "client_id": "<OBFUSCATED-FOR-DEMO>", "developer_token": "<OBFUSCATED-FOR-DEMO>", "reports_start_date": "2020-02-25", "hourly_reports": false, "daily_reports": false, "weekly_reports": true, "monthly_reports": true}}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: ad_groups "}}
... Records are coming in afterward |
I think there's one more envelope layer on that message, and it should be something like:
|
I think I forgot to wrap this around an AirbyteMessage right? |
That's probably it! |
f5c3d0c
to
8fd52f9
Compare
@sherifnada I tried to go in the direction you suggested by implementing a specific |
I tried to use |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice! looking really good. Some questions
airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py
Show resolved
Hide resolved
airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py
Outdated
Show resolved
Hide resolved
ValueError: Raised if the defined getters are not returning a value. | ||
""" | ||
for field_name, getter in [ | ||
("client_id", self.get_client_id), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we care about retaining and validating client_id and client_secret in this class instead of just passing up the concern to the super?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As client_id
, client_secret
, and refresh_token
are mandatory arguments of the parent class, and the current constructor signature of the new class only takes a connector_config
dict, I thought it would be safer to fail early if these fields' values could not be parsed from the config.
airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py
Outdated
Show resolved
Hide resolved
token_refresh_endpoint="https://id.getharvest.com/api/v2/oauth2/token", | ||
client_id=credentials.get("client_id"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why were these removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The client_id
parameter does not exist in my SingleUseRefreshTokenOauth2Authenticator
implementation. The client_id
is retrieved from the configuration with dpath
try: | ||
response_json = self._get_refresh_access_token_response() | ||
return ( | ||
response_json[self.get_access_token_name()], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would recommend using dpath
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed but for implementation consistency with parent class and abstract class, I'd prefer to do this in a separate PR in which we can replace the access_token_name
, expires_in_name
, and refresh_token_name
by dpaths that can be used when parsing these responses. Wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense!
@sherifnada I used |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good stuff! requesting a couple small changes but no extra review needed from my side. Thank you for making this awesome change.
What
Closes #17910
Quoting @davinchia in #3990:
A new protocol message:
AirbyteControlMessage.ConnectorConfig
was introduced to enable the connector to share a configuration update with the platform.This PR is an attempt to update the CDK to enable automatic emission of
AirbyteControlMessage.ConnectorConfig
on mutation of aconfig
dict
keys, and nested keys.How
config
kwarg toOAuth2Authenticator.__init__
. Observe thisconfig
by creating anObservedDict
assigned toself.connector_config
ObservedDict
get created with aConfigObserver
object.ObservedDict
callConfigObserver.update
on__set_item__
calls (this is the function called on key assignment: e.gconfig["refresh_token"] = "new_refresh_token"
.ConfigObserver.update
emits anAirbyteControlMessage.ConnectorConfig
with the updated config.N.B.
I chose the observer pattern because I thought this could be a silver-bullet-like approach to enable additional future use cases around
AirbyteControlMessage.ConnectorConfig
message.The platform is not currently processing these new messages, so emitting will
AirbyteControlMessage.ConnectorConfig
will currently have no effect/Recommended reading order
airbyte/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py
Line 31 in 12db1b7
airbyte/airbyte-cdk/python/airbyte_cdk/config_observation.py
Line 14 in 12db1b7
airbyte/airbyte-cdk/python/unit_tests/sources/streams/http/requests_native_auth/test_requests_native_auth.py
Line 178 in 12db1b7
airbyte/airbyte-cdk/python/unit_tests/test_config_observation.py
Line 12 in 12db1b7
🚨 User Impact 🚨
To benefit from config observation, the connector developer can implement a custom authenticator inheriting from OAuth2Authenticator and create any token-refresh logic in this authenticator. The functions implementing this logic will have to mutate
self.connector_config
to emit a control message.