Description
Apache Airflow Provider(s)
cohere
Versions of Apache Airflow Providers
apache-airflow-providers-cohere==1.4.2 and above.
Apache Airflow version
2.10.5
Operating System
Ubuntu 22.04.3 LTS
Deployment
Docker-Compose
Deployment details
Airflow is deployed by following the official Docker Compose guide for each version:
(2.10.5) https://airflow.apache.org/docs/apache-airflow/2.10.5/howto/docker-compose/index.html
(3.0.0) https://airflow.apache.org/docs/apache-airflow/3.0.0/howto/docker-compose/index.html
What happened
I tried to test the provider by following the tutorial https://www.astronomer.io/docs/learn/airflow-cohere/.
Run on Airflow 2.10.5
When the DAG is run with provider version 1.4.2 or later, I get the following error:
```
ImportError: cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings was not found in allow list for deserialization imports. To allow it, add it to allowed_deserialization_classes in the configuration
[2025-05-21T03:31:59.690+0000] {local_task_job_runner.py:266} INFO - Task exited with return code 1
[2025-05-21T03:31:59.703+0000] {taskinstance.py:3901} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2025-05-21T03:31:59.705+0000] {local_task_job_runner.py:245} INFO - ::endgroup::
```
Below is the code snippet of the task that causes the error.
```python
get_embeddings = CohereEmbeddingOperator.partial(
    task_id="get_embeddings",
    conn_id=COHERE_CONN_ID,
).expand(input_text=recipes_list)
```

Run on Airflow 3.0.0
When the DAG is run with provider version 1.4.2 or later, I get the following error:
```
TypeError: cannot serialize object of type <class 'cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings'>
```
Below is the code snippet of the task that causes the error.
```python
chain(
    get_embeddings,
    plot_embeddings(
        get_embeddings.output,
        text_labels=countries_list,
        file_name=IMAGE_PATH,
    ),
)
```

On Airflow 2, deserialization of `EmbedByTypeResponseEmbeddings` fails. On Airflow 3, serialization of the same type fails.
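To illustrate the Airflow 2 failure, the sketch below models the deserialization allow-list as glob matching of the fully qualified class name against configured patterns. This is a simplified model and an assumption, not Airflow's actual implementation: `DEFAULT_PATTERNS`, `is_allowed`, and `check_deserialization` are hypothetical names, and `airflow.*` is assumed here to be the only default pattern.

```python
from fnmatch import fnmatchcase

# Simplified model of a deserialization allow-list (assumption: glob
# patterns matched against the fully qualified class name). Not Airflow's code.
DEFAULT_PATTERNS = ["airflow.*"]  # assumed default, for illustration only

COHERE_CLASS = (
    "cohere.types.embed_by_type_response_embeddings.EmbedByTypeResponseEmbeddings"
)


def qualified_name(cls: type) -> str:
    """Return the dotted 'module.ClassName' string used for matching."""
    return f"{cls.__module__}.{cls.__qualname__}"


def is_allowed(classname: str, patterns: list[str]) -> bool:
    """Return True if the class name matches any allowed glob pattern."""
    return any(fnmatchcase(classname, pattern) for pattern in patterns)


def check_deserialization(classname: str, patterns: list[str]) -> None:
    """Raise ImportError, mirroring the logged message, if the class is not allowed."""
    if not is_allowed(classname, patterns):
        raise ImportError(
            f"{classname} was not found in allow list for deserialization imports."
        )


if __name__ == "__main__":
    print(is_allowed(COHERE_CLASS, DEFAULT_PATTERNS))                 # False
    print(is_allowed(COHERE_CLASS, DEFAULT_PATTERNS + ["cohere.*"]))  # True
```

In this model, adding a pattern that covers the `cohere` package makes the check pass, which is what the error message suggests doing through `allowed_deserialization_classes`. Because the allow-list applies to deserialization, it presumably would not help with the Airflow 3 error, which is raised at serialization time.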
What changed in this provider in 1.4.2
Before 1.4.2, the `execute` method returned `list[list[float]]`:
```python
def execute(self, context: Context) -> list[list[float]]:
```
Since 1.4.2, the `execute` method returns `EmbedByTypeResponseEmbeddings`. Below is the link to the code.
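A possible workaround, until the provider handles this itself, is to convert the result back to the pre-1.4.2 shape before it reaches XCom, for example in a subclass of `CohereEmbeddingOperator` whose `execute` calls `super().execute(context)` and converts the return value. The helper below is a hypothetical sketch: it assumes the 1.4.2+ object exposes its float embeddings on an attribute named `float_` (this should be checked against the installed `cohere` SDK), and it uses a `SimpleNamespace` stand-in so it runs without `cohere` installed.

```python
from types import SimpleNamespace
from typing import Any


def to_float_lists(embeddings: Any) -> list[list[float]]:
    """Return embeddings as plain list[list[float]], the pre-1.4.2 shape.

    Assumption: a 1.4.2+ result object exposes its float embeddings on an
    attribute named ``float_``. A plain list (pre-1.4.2 shape) is also accepted.
    """
    if isinstance(embeddings, list):
        vectors = embeddings  # already the pre-1.4.2 shape
    else:
        vectors = getattr(embeddings, "float_", None)  # assumed attribute name
        if vectors is None:
            raise TypeError(
                f"no float embeddings found on {type(embeddings).__name__}"
            )
    # Copy into fresh lists of built-in floats, which serialize cleanly.
    return [[float(x) for x in row] for row in vectors]


if __name__ == "__main__":
    # SimpleNamespace stands in for the cohere response object.
    stand_in = SimpleNamespace(float_=[[0.1, 0.2], [0.3, 0.4]])
    print(to_float_lists(stand_in))  # [[0.1, 0.2], [0.3, 0.4]]
```

Converting inside the task keeps the XCom payload a built-in type, so downstream tasks such as `plot_embeddings` would receive the same data shape as before 1.4.2.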
The issue was initially discussed in #50599 (comment).
What you think should happen instead
If embeddings are considered appropriate to pass through XComs (although passing large data through XComs is generally not recommended), serialization and deserialization should work for the new return type.
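As a rough analogy for why a plain `list[list[float]]` passes through XComs while an arbitrary response object does not, the sketch below uses Python's standard `json` module as a stand-in serializer. It is not Airflow's XCom serializer, and `EmbeddingsStandIn`, `can_serialize`, and `round_trip` are hypothetical names.

```python
import json


class EmbeddingsStandIn:
    """Hypothetical stand-in for a third-party response object."""

    def __init__(self, vectors: list[list[float]]) -> None:
        self.vectors = vectors


def can_serialize(value: object) -> bool:
    """Return True if json.dumps accepts the value (stand-in for an XCom push)."""
    try:
        json.dumps(value)
    except TypeError:
        return False
    return True


def round_trip(value: object) -> object:
    """Serialize to JSON text and back (rough model of an XCom push and pull)."""
    return json.loads(json.dumps(value))


if __name__ == "__main__":
    vectors = [[0.1, 0.2], [0.3, 0.4]]
    print(round_trip(vectors) == vectors)             # True
    print(can_serialize(EmbeddingsStandIn(vectors)))  # False
```

Under this analogy, making the updated object work would mean either teaching the serializer about the new type or converting it to plain lists before it is stored.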
How to reproduce
Create a Dockerfile and a requirements.txt file to define the build and dependencies.
Dockerfile
```dockerfile
FROM apache/airflow:3.0.0
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
USER ${AIRFLOW_UID}
```

requirements.txt
```
apache-airflow-providers-cohere==1.4.2
matplotlib==3.8.1
seaborn==0.13.0
scikit-learn==1.3.2
pandas
numpy==1.26.2
adjustText==0.8
```
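To confirm that the built image actually contains an affected provider version, a small check like the following can be run with `python` inside the Airflow container. `parse_release`, `is_affected`, and `installed_provider_version` are hypothetical helpers; the 1.4.2 threshold comes from this report, and the version parsing is deliberately simplified (any suffix such as `rc1` is ignored).

```python
import re
from importlib import metadata
from typing import Optional

AFFECTED_FROM = (1, 4, 2)  # first provider version reported as affected


def parse_release(version: str) -> tuple[int, ...]:
    """Parse the leading numeric release, e.g. '1.4.2rc1' -> (1, 4, 2).

    Simplification: any suffix after the numeric part is ignored.
    """
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def is_affected(version: str) -> bool:
    """Return True for provider versions at or above 1.4.2."""
    return parse_release(version) >= AFFECTED_FROM


def installed_provider_version() -> Optional[str]:
    """Return the installed cohere provider version, or None if it is absent."""
    try:
        return metadata.version("apache-airflow-providers-cohere")
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_provider_version()
    if version is None:
        print("apache-airflow-providers-cohere is not installed")
    else:
        print(version, "affected" if is_affected(version) else "not affected")
```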
Follow the official guide to deploy Airflow locally with Docker Compose:
(2.10.5) https://airflow.apache.org/docs/apache-airflow/2.10.5/howto/docker-compose/index.html
(3.0.0) https://airflow.apache.org/docs/apache-airflow/3.0.0/howto/docker-compose/index.html
Copy the DAG code from the tutorial and run it in Airflow:
https://www.astronomer.io/docs/learn/airflow-cohere/
Because I only used the DAG code, I had to configure docker-compose.yml to create and mount the /include folder.
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct