diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py b/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py index a80c304fdef2a..b4cb7e086a719 100644 --- a/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py +++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/consume.py @@ -133,6 +133,9 @@ def execute(self, context) -> Any: if isinstance(self.apply_function_batch, str): self.apply_function_batch = import_string(self.apply_function_batch) + if self.apply_function is not None and not callable(self.apply_function): + raise TypeError(f"apply_function is not a callable, got {type(self.apply_function)} instead.") + if self.apply_function: apply_callable = partial( self.apply_function, # type: ignore @@ -140,6 +143,11 @@ def execute(self, context) -> Any: **self.apply_function_kwargs, ) + if self.apply_function_batch is not None and not callable(self.apply_function_batch): + raise TypeError( + f"apply_function_batch is not a callable, got {type(self.apply_function_batch)} instead." + ) + if self.apply_function_batch: apply_callable = partial( self.apply_function_batch, # type: ignore diff --git a/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/produce.py b/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/produce.py index da460c3a87d2b..df7e0dc186ef8 100644 --- a/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/produce.py +++ b/providers/apache/kafka/src/airflow/providers/apache/kafka/operators/produce.py @@ -116,6 +116,11 @@ def execute(self, context) -> None: if isinstance(self.producer_function, str): self.producer_function = import_string(self.producer_function) + if self.producer_function is not None and not callable(self.producer_function): + raise TypeError( + f"producer_function is not a callable, got {type(self.producer_function)} instead." + ) + producer_callable = partial( self.producer_function, # type: ignore *self.producer_function_args,