From 680fe19f537578e34e7b46eaabd5331c4367ce20 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 30 Jan 2024 15:05:05 -0800 Subject: [PATCH] fix: tweak datahub-kafka requirement (#110) --- datahub-actions/setup.py | 2 +- .../src/datahub_actions/pipeline/pipeline.py | 8 ++++++-- .../datahub_actions/pipeline/pipeline_util.py | 8 +++++--- .../plugin/action/utils/term_resolver.py | 12 +++++------ .../datahub_actions/utils/name_resolver.py | 20 +++++++++---------- 5 files changed, 28 insertions(+), 22 deletions(-) diff --git a/datahub-actions/setup.py b/datahub-actions/setup.py index 6fffa70c..3d606fc9 100644 --- a/datahub-actions/setup.py +++ b/datahub-actions/setup.py @@ -33,7 +33,7 @@ def get_long_description(): acryl_datahub_min_version = os.environ.get("ACRYL_DATAHUB_MIN_VERSION") or "0.12.1.2" base_requirements = { - f"acryl-datahub[kafka]>={acryl_datahub_min_version}", + f"acryl-datahub[datahub-kafka]>={acryl_datahub_min_version}", # Compatibility. "typing_extensions>=3.7.4; python_version < '3.8'", "mypy_extensions>=0.4.3", diff --git a/datahub-actions/src/datahub_actions/pipeline/pipeline.py b/datahub-actions/src/datahub_actions/pipeline/pipeline.py index 3639a144..8c81e72b 100644 --- a/datahub-actions/src/datahub_actions/pipeline/pipeline.py +++ b/datahub-actions/src/datahub_actions/pipeline/pipeline.py @@ -82,9 +82,13 @@ class Pipeline: _stats: PipelineStats = PipelineStats() # Options - _retry_count: int = DEFAULT_RETRY_COUNT # Number of times a single event should be retried in case of processing error. + _retry_count: int = ( + DEFAULT_RETRY_COUNT # Number of times a single event should be retried in case of processing error. + ) _failure_mode: FailureMode = DEFAULT_FAILURE_MODE - _failed_events_dir: str = DEFAULT_FAILED_EVENTS_DIR # The top-level path where failed events will be logged. + _failed_events_dir: str = ( + DEFAULT_FAILED_EVENTS_DIR # The top-level path where failed events will be logged. + ) def __init__( self, diff --git a/datahub-actions/src/datahub_actions/pipeline/pipeline_util.py b/datahub-actions/src/datahub_actions/pipeline/pipeline_util.py index fbebe263..6a430b37 100644 --- a/datahub-actions/src/datahub_actions/pipeline/pipeline_util.py +++ b/datahub-actions/src/datahub_actions/pipeline/pipeline_util.py @@ -45,9 +45,11 @@ def create_action_context( ) -> PipelineContext: return PipelineContext( pipeline_name, - AcrylDataHubGraph(DataHubGraph(datahub_config)) - if datahub_config is not None - else None, + ( + AcrylDataHubGraph(DataHubGraph(datahub_config)) + if datahub_config is not None + else None + ), ) diff --git a/datahub-actions/src/datahub_actions/plugin/action/utils/term_resolver.py b/datahub-actions/src/datahub_actions/plugin/action/utils/term_resolver.py index cd0b9930..fef68131 100644 --- a/datahub-actions/src/datahub_actions/plugin/action/utils/term_resolver.py +++ b/datahub-actions/src/datahub_actions/plugin/action/utils/term_resolver.py @@ -41,9 +41,9 @@ def __init__( f"Following terms need server-side resolution {terms_needing_resolution} but a DataHub server wasn't provided. Either use fully qualified glossary term ids (e.g. urn:li:glossaryTerm:ec428203-ce86-4db3-985d-5a8ee6df32ba) or provide a datahub_api config in your recipe." ) for term_identifier in terms_needing_resolution: - self.glossary_term_registry[ - term_identifier - ] = self._resolve_term_id_to_urn(term_identifier) + self.glossary_term_registry[term_identifier] = ( + self._resolve_term_id_to_urn(term_identifier) + ) nodes_needing_resolution = [ d for d in glossary_entities @@ -54,9 +54,9 @@ def __init__( f"Following term groups (glossary nodes) need server-side resolution {nodes_needing_resolution} but a DataHub server wasn't provided. Either use fully qualified glossary term ids (e.g. urn:li:glossaryTerm:ec428203-ce86-4db3-985d-5a8ee6df32ba) or provide a datahub_api config in your recipe." ) for node_identifier in nodes_needing_resolution: - self.glossary_node_registry[ - node_identifier - ] = self._resolve_node_id_to_urn(node_identifier) + self.glossary_node_registry[node_identifier] = ( + self._resolve_node_id_to_urn(node_identifier) + ) def _resolve_term_id_to_urn(self, term_identifier: str) -> Optional[str]: assert self.graph diff --git a/datahub-actions/src/datahub_actions/utils/name_resolver.py b/datahub-actions/src/datahub_actions/utils/name_resolver.py index ba6ff9b5..4ce40f38 100644 --- a/datahub-actions/src/datahub_actions/utils/name_resolver.py +++ b/datahub-actions/src/datahub_actions/utils/name_resolver.py @@ -145,10 +145,10 @@ def get_entity_name( self, entity_urn: Urn, datahub_graph: Optional[DataHubGraph] ) -> str: if datahub_graph: - container_props: Optional[ - ContainerPropertiesClass - ] = datahub_graph.get_aspect( - entity_urn=str(entity_urn), aspect_type=ContainerPropertiesClass + container_props: Optional[ContainerPropertiesClass] = ( + datahub_graph.get_aspect( + entity_urn=str(entity_urn), aspect_type=ContainerPropertiesClass + ) ) if container_props and container_props.name: return container_props.name @@ -200,9 +200,9 @@ def get_entity_name( if user_properties and user_properties.displayName: entity_name = user_properties.displayName - editable_properties: Optional[ - CorpUserEditableInfoClass - ] = datahub_graph.get_aspect(str(entity_urn), CorpUserEditableInfoClass) + editable_properties: Optional[CorpUserEditableInfoClass] = ( + datahub_graph.get_aspect(str(entity_urn), CorpUserEditableInfoClass) + ) if editable_properties and editable_properties.displayName: entity_name = editable_properties.displayName @@ -229,9 +229,9 @@ def get_entity_name( self, entity_urn: Urn, datahub_graph: Optional[DataHubGraph] ) -> str: if datahub_graph: - dashboard_properties: Optional[ - DashboardInfoClass - ] = datahub_graph.get_aspect(str(entity_urn), DashboardInfoClass) + dashboard_properties: Optional[DashboardInfoClass] = ( + datahub_graph.get_aspect(str(entity_urn), DashboardInfoClass) + ) if dashboard_properties and dashboard_properties.title: return dashboard_properties.title