Description
Apache Airflow Provider(s)
microsoft-azure
Versions of Apache Airflow Providers
apache-airflow-providers-microsoft-azure==12.3.1
Apache Airflow version
apache/airflow:3.0.1-python3.12
Operating System
PRETTY_NAME="Debian GNU/Linux 12 (bookworm)"
Deployment
Other Docker-based deployment
Deployment details
I used this Dockerfile to deploy Airflow to Azure Container Instances:
FROM apache/airflow:3.0.1-python3.12
USER root
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        build-essential \
        git \
    && apt-get autoremove -yqq --purge \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*
USER airflow
ENV AIRFLOW_VERSION=3.0.1
ENV PYTHON_VERSION=3.12
RUN curl -sSL "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt" \
    -o /tmp/constraints.txt
RUN pip install --no-cache-dir \
    apache-airflow-providers-microsoft-azure \
    apache-airflow-providers-common-sql \
    apache-airflow-providers-postgres \
    apache-airflow-providers-git \
    psycopg2-binary \
    azure-identity \
    azure-storage-file-datalake \
    --constraint /tmp/constraints.txt
What happened
def upload_file(
    self,
    file_system_name: FileSystemProperties | str,
    file_name: str,
    file_path: str,
    overwrite: bool = False,
    **kwargs: Any,
) -> None:
    file_client = self.create_file(file_system_name, file_name)
    with open(file_path, "rb") as data:
        file_client.upload_data(data, overwrite=overwrite, kwargs=kwargs)
This method of the AzureDataLakeStorageV2Hook class passes kwargs incorrectly, which causes the error below when trying to upload a file:
ERROR - Task failed with exception: source="task"
TypeError: Session.request() got an unexpected keyword argument 'kwargs'
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 838 in run
What you think should happen instead
file_client.upload_data(data, overwrite=overwrite, kwargs=kwargs)
should be
file_client.upload_data(data, overwrite=overwrite, **kwargs)
The call does not unpack the dictionary; it passes the entire kwargs dict as a single keyword argument named kwargs, which eventually reaches Session.request() and raises the TypeError above.
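To illustrate the difference outside Airflow, here is a minimal stand-in (the function name mirrors the SDK's upload_data but is just a local example; the timeout key is an arbitrary placeholder):

```python
# Minimal demonstration of the bug: passing kwargs=kwargs forwards the whole
# dict as one keyword argument named "kwargs" instead of unpacking it.
def upload_data(data, overwrite=False, **kwargs):
    # Return the keyword arguments the callee actually received, for inspection.
    return kwargs

extra = {"timeout": 30}

# Buggy call: the callee sees a single argument named "kwargs" holding the dict.
buggy = upload_data(b"payload", overwrite=True, kwargs=extra)
# buggy == {'kwargs': {'timeout': 30}}

# Fixed call: the dict is unpacked, so the callee sees timeout=30 directly.
fixed = upload_data(b"payload", overwrite=True, **extra)
# fixed == {'timeout': 30}
```

With the buggy form, a downstream function with an explicit signature (like Session.request()) rejects the unexpected keyword argument named kwargs.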
How to reproduce
@task
def upload_to_adls_not_working():
    hook = AzureDataLakeStorageV2Hook(adls_conn_id='DLAKEGEN2')
    file_content = json.dumps({"message": "Hello, World!"})
    with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as tmp:
        tmp.write(file_content.encode("utf-8"))
        tmp_path = tmp.name
    hook.upload_file(
        file_system_name='xxxxxxx',
        file_name="upload_to_adls_not_working.json",
        file_path=tmp_path,
        overwrite=True,
    )
Anything else
The same bug also exists in upload_file_to_directory() and create_directory().
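Until a fix is released, one possible interim workaround is to subclass the hook and override upload_file with the corrected unpacking. The sketch below uses a stand-in base class and a fake file client so the fixed logic can be exercised without an Azure connection; in a real DAG you would subclass the actual AzureDataLakeStorageV2Hook instead (and apply the same change to the other affected methods):

```python
class _FakeFileClient:
    """Mimics DataLakeFileClient.upload_data just enough to record what it received."""

    def __init__(self):
        self.received = None

    def upload_data(self, data, overwrite=False, **kwargs):
        # Record the keyword arguments as the real SDK client would see them.
        self.received = {"overwrite": overwrite, **kwargs}


class PatchedHook:
    """Stand-in for a subclass of AzureDataLakeStorageV2Hook with the fix applied."""

    def __init__(self):
        self.client = _FakeFileClient()

    def create_file(self, file_system_name, file_name):
        return self.client

    def upload_file(self, file_system_name, file_name, file_path,
                    overwrite=False, **kwargs):
        file_client = self.create_file(file_system_name, file_name)
        with open(file_path, "rb") as data:
            # The fix: unpack kwargs with ** instead of passing kwargs=kwargs.
            file_client.upload_data(data, overwrite=overwrite, **kwargs)
```

With this override, any extra keyword arguments (e.g. a timeout) reach the SDK as individual keywords rather than as one dict named kwargs.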
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct