
fix(ingestion): isolate dependency requirements of airflow hooks #2977

Merged · 1 commit · Jul 28, 2021
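This PR stops datahub_provider/hooks/datahub.py from importing the DataHub emitter implementations at module import time. The emitter imports move under typing.TYPE_CHECKING, the affected annotations become string forward references, and each make_emitter imports its emitter module lazily at call time, so the emitter dependencies are only needed once an emitter is actually constructed. A minimal sketch of the pattern, with a hypothetical heavy_pkg standing in for the datahub emitter modules:

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Seen only by type checkers (e.g. mypy); never executed at runtime.
        from heavy_pkg.client import Client


    class ExampleHook:
        def make_client(self) -> "Client":  # string annotation: no runtime import needed
            # Deferred import: heavy_pkg must be installed only when this runs.
            import heavy_pkg.client

            return heavy_pkg.client.Client()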
31 changes: 20 additions & 11 deletions metadata-ingestion/src/datahub_provider/hooks/datahub.py
@@ -1,4 +1,4 @@
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
 
 from airflow.exceptions import AirflowException
 
@@ -11,11 +11,14 @@

     AIRFLOW_1 = True
 
-from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
-from datahub.emitter.rest_emitter import DatahubRestEmitter
-from datahub.ingestion.sink.datahub_kafka import KafkaSinkConfig
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
 
+if TYPE_CHECKING:
+    from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
+    from datahub.emitter.rest_emitter import DatahubRestEmitter
+    from datahub.ingestion.sink.datahub_kafka import KafkaSinkConfig
+
+
 _default_hook_args = []
 if AIRFLOW_1:
     _default_hook_args = [None]
@@ -64,8 +67,10 @@ def _get_config(self) -> Tuple[str, Optional[str]]:
             raise AirflowException("host parameter is required")
         return (host, conn.password)
 
-    def make_emitter(self) -> DatahubRestEmitter:
-        return DatahubRestEmitter(*self._get_config())
+    def make_emitter(self) -> "DatahubRestEmitter":
+        import datahub.emitter.rest_emitter
+
+        return datahub.emitter.rest_emitter.DatahubRestEmitter(*self._get_config())
 
     def emit_mces(self, mces: List[MetadataChangeEvent]) -> None:
         emitter = self.make_emitter()
@@ -110,7 +115,9 @@ def get_ui_field_behaviour() -> Dict:
             },
         }
 
-    def _get_config(self) -> KafkaSinkConfig:
+    def _get_config(self) -> "KafkaSinkConfig":
+        import datahub.ingestion.sink.datahub_kafka
+
         conn = self.get_connection(self.datahub_kafka_conn_id)
         obj = conn.extra_dejson
         obj.setdefault("connection", {})
@@ -120,12 +127,14 @@ def _get_config(self) -> KafkaSinkConfig:
                     "Kafka broker specified twice (present in host and extra)"
                 )
             obj["connection"]["bootstrap"] = conn.host
-        config = KafkaSinkConfig.parse_obj(obj)
+        config = datahub.ingestion.sink.datahub_kafka.KafkaSinkConfig.parse_obj(obj)
         return config
 
-    def make_emitter(self) -> DatahubKafkaEmitter:
+    def make_emitter(self) -> "DatahubKafkaEmitter":
+        import datahub.emitter.kafka_emitter
+
         sink_config = self._get_config()
-        return DatahubKafkaEmitter(sink_config)
+        return datahub.emitter.kafka_emitter.DatahubKafkaEmitter(sink_config)
 
     def emit_mces(self, mces: List[MetadataChangeEvent]) -> None:
         emitter = self.make_emitter()
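Since KafkaSinkConfig is no longer bound at module scope, _get_config now resolves it through the freshly imported module before calling parse_obj (a pydantic-style constructor). Roughly, what that amounts to, with an illustrative broker address (the connection/bootstrap keys follow the diff above):

    import datahub.ingestion.sink.datahub_kafka

    obj = {"connection": {"bootstrap": "localhost:9092"}}  # illustrative values
    config = datahub.ingestion.sink.datahub_kafka.KafkaSinkConfig.parse_obj(obj)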
@@ -176,7 +185,7 @@ def get_underlying_hook(self) -> Union[DatahubRestHook, DatahubKafkaHook]:
                 f"DataHub cannot handle conn_type {conn.conn_type} in {conn}"
             )
 
-    def make_emitter(self) -> Union[DatahubRestEmitter, DatahubKafkaEmitter]:
+    def make_emitter(self) -> Union["DatahubRestEmitter", "DatahubKafkaEmitter"]:
         return self.get_underlying_hook().make_emitter()
 
     def emit_mces(self, mces: List[MetadataChangeEvent]) -> None:
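With these changes, importing the hooks module stays cheap; the emitter modules load only when make_emitter actually runs. A hypothetical usage sketch (the connection id is an assumption, and mce stands for a MetadataChangeEvent built elsewhere):

    hook = DatahubRestHook("datahub_rest_default")  # hypothetical conn id
    emitter = hook.make_emitter()  # datahub.emitter.rest_emitter is imported here
    emitter.emit_mce(mce)  # send one MetadataChangeEvent, as emit_mces does above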
4 changes: 2 additions & 2 deletions metadata-ingestion/tests/unit/test_airflow.py
@@ -91,7 +91,7 @@ def patch_airflow_connection(conn: Connection) -> Iterator[Connection]:
         yield conn
 
 
-@mock.patch("datahub_provider.hooks.datahub.DatahubRestEmitter", autospec=True)
+@mock.patch("datahub.emitter.rest_emitter.DatahubRestEmitter", autospec=True)
 def test_datahub_rest_hook(mock_emitter):
     with patch_airflow_connection(datahub_rest_connection_config) as config:
         hook = DatahubRestHook(config.conn_id)
@@ -102,7 +102,7 @@ def test_datahub_rest_hook(mock_emitter):
         instance.emit_mce.assert_called_with(lineage_mce)
 
 
-@mock.patch("datahub_provider.hooks.datahub.DatahubKafkaEmitter", autospec=True)
+@mock.patch("datahub.emitter.kafka_emitter.DatahubKafkaEmitter", autospec=True)
 def test_datahub_kafka_hook(mock_emitter):
     with patch_airflow_connection(datahub_kafka_connection_config) as config:
         hook = DatahubKafkaHook(config.conn_id)
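The test changes follow directly from the hook changes: the hooks module no longer re-binds DatahubRestEmitter or DatahubKafkaEmitter at import time, so mock.patch must target the names in their defining modules, which is where the lazy imports look them up. A sketch of the rule, reusing the fixture names from the test above (the final assertion is illustrative):

    from unittest import mock

    with mock.patch(
        "datahub.emitter.rest_emitter.DatahubRestEmitter", autospec=True
    ) as emitter_cls:
        DatahubRestHook(config.conn_id).make_emitter()
        emitter_cls.assert_called_once()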