Description
Apache Airflow version
3.0.4
If "Other Airflow 2 version" selected, which one?
No response
What happened?
I am using @task.virtualenv, and I want to log the same way I do in a plain @task, e.g.:

import logging

logging.info("This is a simple log")

But when I do the same inside @task.virtualenv, nothing shows up in the task log apart from the venv setup output:

logging.info(f"Output directory: {output_dir}")
logging.info(f"Temporary directory: {temp_dir}")

[2025-08-20, 07:53:38] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-08-20, 07:53:38] INFO - Filling up the DagBag from /home/zifengma/Airflow/dags/example/dag_example.py: source="airflow.models.dagbag.DagBag"
[2025-08-20, 07:53:38] INFO - Python virtual environment will be cached in venvs/example/venv-705d7621: source="airflow.task.operators.airflow.providers.standard.decorators.python_virtualenv._PythonVirtualenvDecoratedOperator"
[2025-08-20, 07:53:38] INFO - Reusing cached Python virtual environment in venvs/example/venv-705d7621: source="airflow.task.operators.airflow.providers.standard.decorators.python_virtualenv._PythonVirtualenvDecoratedOperator"
[2025-08-20, 07:53:38] INFO - Use 'pickle' as serializer.: source="airflow.task.operators.airflow.providers.standard.decorators.python_virtualenv._PythonVirtualenvDecoratedOperator"
[2025-08-20, 07:53:38] INFO - Executing cmd: venvs/example/venv-705d7621/bin/python /tmp/venv-call4g9iyonr/script.py /tmp/venv-call4g9iyonr/script.in /tmp/venv-call4g9iyonr/script.out /tmp/venv-call4g9iyonr/string_args.txt /tmp/venv-call4g9iyonr/termination.log /tmp/venv-call4g9iyonr/airflow_context.json: source="airflow.utils.process_utils"
[2025-08-20, 07:53:38] INFO - Output:: source="airflow.utils.process_utils"
[2025-08-20, 07:53:38] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.decorators.python_virtualenv._PythonVirtualenvDecoratedOperator"
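My guess is that the function runs in a separate subprocess inside the cached venv where no logging handlers are configured, so the root logger stays at its default WARNING level and logging.info() records are silently dropped. A minimal workaround sketch (my assumption, not verified against the Airflow internals): configure logging explicitly inside the virtualenv function so records go to stdout, which the parent process captures and prints under the Output:: line.

# Workaround sketch (assumed, not an official fix): set up a logging handler inside
# the virtualenv function, since the subprocess does not inherit Airflow's logging config.
@task.virtualenv(python_version="3.9", system_site_packages=True)
def task_two_with_logging(config_path: str):
    import logging
    import sys

    # Without this, the root logger in the subprocess has no handlers and defaults
    # to WARNING, so logging.info(...) produces no visible output.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    logging.info(f"Reading config from {config_path}")  # shows up under "Output::" in the task log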
What you think should happen instead?
No response
How to reproduce
Here's the example code you can use to reproduce it.
/utils/example.py
# Example: @task
@task
def task_one(config_path: str, logical_date=None, params=None, ti=None):
    import pandas as pd  # Import inside task per best practices
    import json
    import csv
    from pathlib import Path
    import logging

    try:
        with open(config_path) as f:
            config = json.load(f)
        task_id = ti.task_id if ti else "unknown_task"
        temp_dir = Path(config.get("temp_dir"))
        raw_data = config.get("raw_data", 0)
        processed_data = raw_data * 2
        temp_dir.mkdir(parents=True, exist_ok=True)
        csv_file = temp_dir / "processed_data.csv"
        with open(csv_file, mode="w", newline="") as f:
            writer = csv.DictWriter(
                f, fieldnames=["logical_date", "task_id", "processed_data"]
            )
            writer.writeheader()
            writer.writerow(
                {
                    "logical_date": str(logical_date),
                    "task_id": task_id,
                    "processed_data": processed_data,
                }
            )
        logging.info(f"Processed data saved to {csv_file}")
    except Exception as e:
        logging.error(f"Error in @task: {e}")
    return
# Example: @task.virtualenv
@task.virtualenv(
    python_version="3.9",
    requirements="requirements_example.txt",
    system_site_packages=True,
    venv_cache_path="venvs/example",
    inherit_env=True,
)
def task_two(config_path: str, logical_date=None, params=None):
    import json
    import pendulum
    import os
    import csv
    from pathlib import Path
    import logging

    try:
        with open(config_path) as f:
            config = json.load(f)
        output_dir = Path(config.get("output_dir"))
        temp_dir = Path(config.get("temp_dir"))
        output_dir.mkdir(parents=True, exist_ok=True)
        logging.info(f"Output directory: {output_dir}")
        logging.info(f"Temporary directory: {temp_dir}")
        processed_csv = temp_dir / "processed_data.csv"
        if not processed_csv.exists():
            raise FileNotFoundError(f"Processed data file not found: {processed_csv}")
        with open(processed_csv, mode="r", newline="") as f:
            reader = csv.DictReader(f)
            processed_rows = [row for row in reader]
        logging.info(f"Read {len(processed_rows)} rows from processed data")
        output_file = output_dir / f"output_{logical_date}.csv"
        with open(output_file, mode="w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=processed_rows[0].keys())
            writer.writeheader()
            for row in processed_rows:
                writer.writerow(row)
        logging.info(f"Output data saved to {output_file}")
    except Exception as e:
        logging.error(f"Error in @task.virtualenv: {e}")
    return

/dags/example/dag_example.py
from airflow.sdk import dag, Param


@dag(
    dag_id="example",
    schedule=None,
    params={
        "Greeting": Param("Hello!", type="string")
    }
    # https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html#params
)
def example():
    from utils.example import task_one, task_two

    config_path = "configs/config_example.json"
    task_one = task_one(config_path=config_path)
    task_two = task_two(config_path=config_path)
    task_one >> task_two


example()
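configs/config_example.json just needs the keys the tasks read above (temp_dir, raw_data, output_dir); for example, something like this (hypothetical placeholder values, not my actual config):

{
    "temp_dir": "/tmp/example/temp",
    "output_dir": "/tmp/example/output",
    "raw_data": 21
}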
Operating System
NAME="Oracle Linux Server" VERSION="9.6" ID="ol" ID_LIKE="fedora" VARIANT="Server" VARIANT_ID="server" VERSION_ID="9.6" PLATFORM_ID="platform:el9" PRETTY_NAME="Oracle Linux Server 9.6" ANSI_COLOR="0;31" CPE_NAME="cpe:/o:oracle:linux:9:6:server" HOME_URL="https://linux.oracle.com/" BUG_REPORT_URL="https://github.com/oracle/oracle-linux" ORACLE_BUGZILLA_PRODUCT="Oracle Linux 9" ORACLE_BUGZILLA_PRODUCT_VERSION=9.6 ORACLE_SUPPORT_PRODUCT="Oracle Linux" ORACLE_SUPPORT_PRODUCT_VERSION=9.6
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
uv add "apache-airflow[celery]==3.0.4" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.4/constraints-3.9.txt"
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct