feat(integration/airbyte): Airbyte source ingestion integration #11
base: master
Conversation
Initial review comments
@config_class(AirbyteConfig)
@support_status(SupportStatus.CERTIFIED)
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
class AirbyteSource(Source):
Enable stateful ingestion.
Done
class AirbyteSource(Source):
    """
    This plugin extracts airbyte workspace, connections, sources, destinations and jobs.
    This plugin is in beta and has only been tested on PostgreSQL.
Could you please add a description of when to provide api_key vs username/password?
Added that to the source config description.
@platform_name("Airbyte")
@config_class(AirbyteSourceConfig)
@support_status(SupportStatus.CERTIFIED)
Change it to SupportStatus.INCUBATING
Done
class AirbyteSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
    cloud_deploy: bool = pydantic.Field(
        default=False,
        description="Whether to fetch metadata from Airbyte Cloud or Airbyte OSS. For Airbyte Cloud provide api_key and for Airbyte OSS provide username/password",
Enable to fetch metadata from Airbyte Cloud. If it is enabled, provide api_key in the recipe; username and password are required for Airbyte OSS only.
Done
class AirbyteSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
    cloud_deploy: bool = pydantic.Field(
Add a method to validate that if cloud_deploy is true then api_key is present; check the powerbi config class.
Done
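The requested check can be sketched as a standalone function. This is a hypothetical illustration of the validation logic (not the actual pydantic validator added in the PR); the field names mirror the config discussed above.

```python
def validate_airbyte_auth(cloud_deploy: bool, api_key=None, username=None, password=None) -> None:
    """Illustrative sketch: raise when auth fields don't match the deployment type.

    cloud_deploy=True requires api_key (Airbyte Cloud);
    cloud_deploy=False requires username and password (Airbyte OSS).
    """
    if cloud_deploy:
        if not api_key:
            raise ValueError("api_key is required when cloud_deploy is true")
    else:
        if not (username and password):
            raise ValueError("username and password are required for Airbyte OSS")
```

In the real config class this would live in a pydantic validator so a bad recipe fails at parse time rather than mid-ingestion.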
config = AirbyteSourceConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_workspace_workunit(
Why does this method just pass?
Removed the pass and implemented the method.
class OssAPIResolver(DataResolverBase):
    BASE_URL = "http://localhost:8000/api/v1"
Take this from the config and keep this value as the default.
Done
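The suggestion above can be sketched as a config field whose default is the previously hard-coded constant. The class and field names here are illustrative stand-ins, not the PR's actual config class.

```python
from dataclasses import dataclass

# Old hard-coded OSS endpoint becomes the default value of a config field.
DEFAULT_OSS_API_URL = "http://localhost:8000/api/v1"


@dataclass
class AirbyteConnectionSettings:  # hypothetical stand-in for the real config class
    api_url: str = DEFAULT_OSS_API_URL
```

Users running Airbyte OSS on a non-default host can then override `api_url` in the recipe while everyone else keeps the default.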
# If any entity does not support the aspect 'status', skip that entity when adding the status aspect.
# Example: dataProcessInstance doesn't support the status aspect.
# If not skipped, it gives error: java.lang.RuntimeException: Unknown aspect status for entity dataProcessInstance
skip_urns.add(urn)
We can use continue instead of collecting urns in the skip_urns set.
At source_helpers.py:100 we yield all workunits that need to be ingested, hence we cannot have a continue statement at source_helpers.py:98.
We could add the continue logic after source_helpers.py:102, but there we would need logic to extract the entity type from the string urn.
I think the current changes are small and simple, so let's keep them; otherwise let me know and we can modify it.
Let's discuss this on a call.
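The trade-off being debated can be sketched as follows: collecting skip urns up front avoids re-deriving the entity type at yield time, at the cost of an extra set. The urn parsing here is a simplified illustration, not the helper's actual code.

```python
# Entity types known not to support the 'status' aspect (from the comment above).
ENTITIES_WITHOUT_STATUS = {"dataProcessInstance"}


def entity_type_of(urn: str) -> str:
    # Urns follow the shape urn:li:<entityType>:(...), so the type is the
    # third colon-separated component.
    return urn.split(":")[2]


def urns_to_skip(urns):
    """Collect urns whose entity type cannot carry a status aspect."""
    return {u for u in urns if entity_type_of(u) in ENTITIES_WITHOUT_STATUS}
```

The `continue` alternative would inline `entity_type_of` at the yield site instead of building the set, which is the extra string-parsing logic the reply refers to.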
    self.config.connector_platform_details[connector_type]
)
else:
    connector_platform_detail = PlatformDetail()
Why this else branch and an empty PlatformDetail?
An empty PlatformDetail will take the default platform instance and env.
I will remove the else branch and initialize connector_platform_detail before the if condition.
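The refactor described in the reply amounts to a default-then-override pattern. This is a simplified sketch; `PlatformDetail` here is a minimal stand-in for the real class.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class PlatformDetail:  # simplified stand-in for the real class
    platform_instance: Optional[str] = None
    env: str = "PROD"


def resolve_platform_detail(
    details_by_type: Dict[str, PlatformDetail], connector_type: str
) -> PlatformDetail:
    # Initialize with defaults before the if condition, so no else branch is needed.
    connector_platform_detail = PlatformDetail()
    if connector_type in details_by_type:
        connector_platform_detail = details_by_type[connector_type]
    return connector_platform_detail
```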
return DatasetUrn.create_from_ids(
    platform_id=supported_data_platform[connector_type],
    table_name=connector.name,
Is this correct as per your concept mapping?
Yes, correct. We decided to map source/destination as input/output datasets of the DataJob entity.
I forgot to add it to the concept mapping table, but it is already mentioned in the Approach paragraph.
logger.info(f"Processing workspace id: {workspace.workspace_id}")
yield from self.get_workspace_workunit(workspace)

def get_report(self) -> SourceReport:
Check powerbi and add a test connection method.
I already added test connection methods for both Cloud and OSS in airbyte/rest_api_wrapper/data_resolver.py, and they are used at airbyte_api.py:33.
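A test-connection helper along these lines typically issues one cheap authenticated request per deployment type. This is a hedged sketch only: the `/health` path, the Cloud endpoint, and the header shape are assumptions for illustration, not the PR's actual implementation in data_resolver.py.

```python
from typing import Dict, Optional, Tuple


def build_health_check(
    cloud_deploy: bool,
    api_key: Optional[str] = None,
    base_url: str = "http://localhost:8000/api/v1",
) -> Tuple[str, Dict[str, str]]:
    """Return (url, headers) for a connectivity probe; endpoints are illustrative."""
    if cloud_deploy:
        # Hypothetical Cloud endpoint; Cloud authenticates with a bearer api_key.
        return ("https://api.airbyte.com/v1/health", {"Authorization": f"Bearer {api_key}"})
    # OSS: probe the configured base URL; basic auth would be added by the caller.
    return (f"{base_url}/health", {})
```

The caller would issue the request (e.g. with `requests.get`) and treat a 200 as a successful connection test.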
# If any entity does not support the aspect 'status', skip that entity when adding the status aspect.
# Example: dataProcessInstance doesn't support the status aspect.
# If not skipped, it gives error: java.lang.RuntimeException: Unknown aspect status for entity dataProcessInstance
skip_urns.add(urn)
Let's discuss this on a call.
)
if response.status_code == 401:
    raise ConfigurationError(
        "Please check if provided api key is correct or not."
Configured credentials don't have required permissions.
if response.status_code == 401:
    raise ConfigurationError(
        "Please check if provided api key is correct or not."
    )
@shubhamjagtap639 Please document the minimum permissions a user or API key needs to fetch metadata.
There is no permission concept at the API level.
## Configuration Notes
1. Airbyte source is available for both Airbyte Cloud and Airbyte Open Source (OSS) users.
2. For Airbyte Cloud user need to provide api_key in recipe to ingest metadata and for Airbyte OSS username and password.
3. Refer Walkthrough demo [here](https://www.loom.com/share/7997a7c67cd642cc8d1c72ef0dfcc4bc) to create a api_key from [Developer Portal](https://portal.airbyte.com/) in case you are using Airbyte Cloud.
For Airbyte Cloud, refer to the demo here to create an api_key.
Done
@@ -0,0 +1,16 @@
## Configuration Notes
1. Airbyte source is available for both Airbyte Cloud and Airbyte Open Source (OSS) users.
2. For Airbyte Cloud user need to provide api_key in recipe to ingest metadata and for Airbyte OSS username and password.
Airbyte Cloud users need to provide api_key in the recipe for authentication, and Airbyte OSS users need to provide username and password.
Done
|--------------------------|-----------------------|
| `Workspace` | `DataFlow` |
| `Connection` | `DataJob` |
| `Sourc` | `Dataset` |
Source
Done
@@ -377,6 +377,7 @@ def get_long_description():
    "powerbi-report-server": powerbi_report_server,
    "vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.1"},
    "unity-catalog": databricks | sqllineage_lib,
    "airbyte": {"requests"},
requests is a common library; check whether it is already present in this setup.py.
No. If you look at the dbt-cloud and pulsar sources, they are defined in the same way.
ok
def _get_jobs_from_response(self, response: List[Dict]) -> List[Job]:
    jobs: List[Job] = [
        Job(
What is the purpose of these if/else branches? Let's discuss this on a call.
Airbyte Cloud and OSS responses contain some of the same metadata under different keys.
Instead of defining separate functions in both CloudApiResolver and OssApiResolver, I did this and defined one function in the abstract base class.
Please define two separate methods; the right one will be called as per the instance.
Done
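The agreed refactor can be sketched like this: the abstract base declares `_get_jobs_from_response`, and each resolver subclass parses its own deployment's response shape, so the if/else disappears. The response key names and the `Job` fields here are illustrative assumptions, not the actual Airbyte API schema.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Job:  # simplified stand-in for the real Job model
    job_id: str
    status: str


class DataResolverBase(ABC):
    @abstractmethod
    def _get_jobs_from_response(self, response: List[Dict]) -> List[Job]:
        """Each deployment type parses its own response shape."""


class CloudApiResolver(DataResolverBase):
    def _get_jobs_from_response(self, response: List[Dict]) -> List[Job]:
        # Hypothetical Cloud shape: flat records with camelCase keys.
        return [Job(job_id=str(r["jobId"]), status=r["status"]) for r in response]


class OssApiResolver(DataResolverBase):
    def _get_jobs_from_response(self, response: List[Dict]) -> List[Job]:
        # Hypothetical OSS shape: records nested under a "job" key.
        return [Job(job_id=str(r["job"]["id"]), status=r["job"]["status"]) for r in response]
```

Since the caller holds a `DataResolverBase`, the correct parser is dispatched per instance with no branching in shared code.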
| `Destination` | `Dataset` |
| `Connection Job History` | `DataProcessInstance` |

Source and destination gets mapped with Dataset as an Input and Output of Connection.
Source and destination are mapped to Dataset as an Input and Output of Connection.
@@ -377,6 +377,7 @@ def get_long_description():
    "powerbi-report-server": powerbi_report_server,
    "vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.1"},
    "unity-catalog": databricks | sqllineage_lib,
    "airbyte": {"requests"},
ok
def _get_jobs_from_response(self, response: List[Dict]) -> List[Job]:
    jobs: List[Job] = [
        Job(
Please define two separate methods; the right one will be called as per the instance.
from tests.test_helpers import mce_helpers


def enable_logging():
Remove this; it's not needed.
Done