@kaxil kaxil commented Aug 16, 2025

🎯 Problem Statement

The Task SDK separation in Airflow 3.1 requires decoupling serialization and deserialization code to eliminate server-side dependencies on client SDK implementations:

  1. Task SDK Dependencies: airflow-core deserialization currently depends on Task SDK's BaseOperator for default values and field lists
  2. Architectural Coupling: Server components import and use Task SDK classes during deserialization, violating client/server separation
  3. Independent Deployment Blocker: Tight coupling prevents independent deployment and upgrade of server vs client components

🚀 Solution Overview

This PR decouples (to a great extent) serialization and deserialization code by removing Task SDK dependencies from airflow-core:

  • Remove dynamic SDK calls: Replace get_serialized_fields() calls with hardcoded class methods
  • Eliminate import dependencies: Remove OPERATOR_DEFAULTS and other Task SDK imports from server-side code
  • Schema-driven defaults: Use schema.json and client_defaults instead of Task SDK classes for default resolution
  • Independent deployment: Enable server/client components to be deployed and upgraded separately

📊 Benchmark

As part of this change, I optimised how defaults are stored and when a field is stored at all: anything that matches a default or is null is now dropped. The change with the biggest impact is that entire callback functions are no longer stored as strings; a boolean now indicates whether a callback was set.

The bigger the DAG (more tasks, especially with callbacks), the greater the savings.

Measured against the actual pre-optimization code:

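| Scenario | Tasks | Before | After | Saved | Reduction |
|----------|-------|--------|-------|-------|-----------|
| Basic DAGs | 10 | 7.5 KB | 5.7 KB | 1.8 KB | 24.0% |
| Basic DAGs | 50 | 33.7 KB | 24.5 KB | 9.2 KB | 27.3% |
| Basic DAGs | 100 | 66.4 KB | 48.0 KB | 18.4 KB | 27.7% |
| Production DAGs (3 callbacks/task) | 10 | 15.5 KB | 6.6 KB | 8.9 KB | 57.4% |
| Production DAGs (3 callbacks/task) | 50 | 73.8 KB | 28.9 KB | 44.9 KB | 60.8% |
| Production DAGs (3 callbacks/task) | 100 | 146.7 KB | 56.9 KB | 89.8 KB | 61.2% |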
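
For anyone who wants to sanity-check the Saved and Reduction columns, they follow directly from the Before and After sizes. A minimal sketch (the `reduction` helper is only for illustration and is not part of the PR):

```python
def reduction(before_kb: float, after_kb: float) -> tuple[float, float]:
    """Return (saved KB, percentage reduction), both rounded to one decimal."""
    saved = before_kb - after_kb
    return round(saved, 1), round(saved / before_kb * 100, 1)


# (before, after) sizes in KB, copied from the benchmark numbers above.
rows = [
    (7.5, 5.7), (33.7, 24.5), (66.4, 48.0),      # basic DAGs: 10, 50, 100 tasks
    (15.5, 6.6), (73.8, 28.9), (146.7, 56.9),    # production DAGs: 10, 50, 100 tasks
]
for before_kb, after_kb in rows:
    saved_kb, pct = reduction(before_kb, after_kb)
    print(f"{before_kb} KB -> {after_kb} KB: saved {saved_kb} KB ({pct}%)")
```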

🔥 Callback Optimization Analysis (100 tasks with 3 callbacks each):

| Storage Method | Before | After | Saved | Reduction |
|----------------|--------|-------|-------|-----------|
| Callback representation | Lists of function code | Boolean flags | - | - |
| Per-task callback overhead | 822 bytes | 91 bytes | 731 bytes | 89% |
| Total callback overhead | 80.3 KB | 8.9 KB | 71.4 KB | 89% |
| Per-task total size | 1,502 bytes | 583 bytes | 919 bytes | 61% |

🎯 Key Optimization Impact:

  • Callback transformation: 89% reduction in callback storage overhead (function code → boolean flags)
  • Production scaling: 57% → 61% reduction as DAG size increases (10 → 100 tasks)
  • Per-task efficiency: 919 bytes saved per task (1,502 → 583 bytes) for callback DAGs
  • Consistent baseline: 24-28% reduction even for basic DAGs without callbacks
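
To make the callback transformation concrete, here is a rough sketch of the idea: callback source code is dropped and only a `has_on_*_callback` flag is kept. The `compact_callbacks` helper and the shape of the task dict are assumptions for illustration; the real serializer code is structured differently.

```python
# Hypothetical helper and field list, for illustration only.
CALLBACK_FIELDS = ("on_success_callback", "on_failure_callback", "on_retry_callback")


def compact_callbacks(task: dict) -> dict:
    """Return a copy of ``task`` where callback bodies are replaced by boolean flags."""
    compact = {key: value for key, value in task.items() if key not in CALLBACK_FIELDS}
    for name in CALLBACK_FIELDS:
        # Only record a flag when a callback was actually set; unset ones add nothing.
        if task.get(name):
            compact[f"has_{name}"] = True
    return compact


before = {
    "task_id": "task_000",
    "on_success_callback": ["def success_callback(context):\n    ..."],
    "on_failure_callback": ["def failure_callback(context):\n    ..."],
}
after = compact_callbacks(before)
print(after)
```

The flag costs a few bytes per callback regardless of how long the callback body is, which is why the saving grows with the size of the callback code.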

🏗️ Architecture Changes

Task Default Resolution

Implements hierarchical defaults during deserialization:

  1. Schema defaults (from schema.json) - lowest priority
  2. client_defaults.tasks - SDK-specific overrides
  3. partial_kwargs - MappedOperator values
  4. Explicit task values - highest priority
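
The precedence order above can be sketched as a layered dictionary merge, where later layers win. This is a simplified illustration, not the actual deserialization code: `resolve_task_fields` is a hypothetical name and the field values are made up.

```python
def resolve_task_fields(
    schema_defaults: dict,
    client_defaults: dict,
    partial_kwargs: dict,
    explicit: dict,
) -> dict:
    """Merge the four layers from lowest to highest priority."""
    resolved: dict = {}
    for layer in (schema_defaults, client_defaults, partial_kwargs, explicit):
        resolved.update(layer)  # a later layer overrides any earlier value
    return resolved


resolved = resolve_task_fields(
    schema_defaults={"retries": 0, "pool": "default_pool", "owner": "airflow"},
    client_defaults={"retries": 2},
    partial_kwargs={"pool": "mapped_pool"},
    explicit={"retries": 5},
)
print(resolved)
```

Here `retries` comes from the explicit task value, `pool` from `partial_kwargs`, and `owner` falls through to the schema default.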

Serialization Exclusion

Fields matching client_defaults are automatically excluded from task serialization, reducing redundancy while maintaining full information.
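
As a rough illustration of why no information is lost, the sketch below drops fields equal to their client default on write and merges the defaults back on read. The helper names and the flat dict layout are assumptions; the real implementation lives in the serializer's exclusion logic.

```python
def strip_defaults(task_fields: dict, client_defaults: dict) -> dict:
    """Keep only the fields whose value differs from the client default."""
    return {
        key: value
        for key, value in task_fields.items()
        if key not in client_defaults or client_defaults[key] != value
    }


def restore_defaults(stored: dict, client_defaults: dict) -> dict:
    """Rebuild the full field set: defaults first, stored values on top."""
    return {**client_defaults, **stored}


client_defaults = {"retries": 2, "owner": "airflow"}  # made-up values
task = {"task_id": "task_000", "retries": 2, "owner": "data_team"}

stored = strip_defaults(task, client_defaults)
print(stored)  # "retries" is omitted because it matches the default
print(restore_defaults(stored, client_defaults) == task)
```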

FYI: following the Task Execution API pattern, I aim to publish a versioned schema contract soon, either directly on the Airflow website or in the versioned docs.

I am thinking about a URL like: https://airflow.apache.org/schemas/dag-serialization/v2.json

🚦 Migration Path

For Users

  • No action required - changes are completely transparent
  • Existing DAGs continue working unchanged
  • New DAGs automatically benefit from optimizations

Appendix (for my own tracking)

TODOs (some might be done in a future PR):

  • Add defaults to schema.json
  • Exclude defaults in schema from Serialized JSON
  • Change on_*_callback on tasks to use has_on_*_callback
  • Remove unmap method from scheduler-side #54816
  • Implement client_defaults generation in serialization (Task SDK side)
  • Verify whether the change is backwards compatible. If not, update the serialization version to v3 and add backwards compatibility for v2. Update: it is a backwards-compatible change
  • Add tests to ensure the defaults in the schema match the server-side classes. Or, better, add a prek/pre-commit hook to autogenerate the schema defaults from the server-side classes
  • Evaluate the alternative of storing the list of attributes needed for serialization and deserialization in schema.json

Future Work:

  • Move the S10n code over for DAG & Task Group classes to Server-side
  • Include schema.json in the calver OpenAPI spec for Execution API and/or in airflow versioned docs
  • Move Serialization code to Task SDK
  • Remove ui_color & ui_fgcolor

Other points

  • If we move serialization to the Task SDK and keep deserialization on the server side, how do we handle the following:
    • XCom deserialization -- it currently uses the airflow.serialization module
    • ExtendedJSON - TypeDecorator used in serialization of the following:
      • DagRun.context_carrier
      • TaskInstance.next_kwargs

Benchmark script:

#!/usr/bin/env python3
"""
Script to measure real serialization scaling for different DAG sizes.
Uses actual JSON examination for accurate before/after comparison.
"""

import json
from datetime import datetime, timedelta

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.serialization.serialized_objects import SerializedDAG

def success_callback(context):
    """Example success callback function."""
    print(f"Task {context['task_instance'].task_id} succeeded!")
    return "success"

def failure_callback(context):
    """Example failure callback function."""
    print(f"Task {context['task_instance'].task_id} failed!")
    # Send notification to Slack
    import requests
    requests.post("https://hooks.slack.com/webhook", json={
        "text": f"❌ Task failed: {context['task_instance'].task_id}"
    })
    return "failure_handled"

def retry_callback(context):
    """Example retry callback function."""
    print(f"Task {context['task_instance'].task_id} will retry!")
    return "retry_scheduled"

def create_test_dag(num_tasks: int, with_callbacks: bool = True) -> DAG:
    """Create a test DAG with specified number of tasks."""
    dag = DAG(
        dag_id=f"scaling_test_dag_{num_tasks}",
        start_date=datetime(2024, 1, 1),
        schedule="@daily",
        default_args={
            "owner": "test_user",
            "retries": 2,
            "retry_delay": timedelta(minutes=5),
            "email": "test@example.com",
            "email_on_failure": True,
            "email_on_retry": True,
        }
    )
    
    with dag:
        for i in range(num_tasks):
            task_kwargs = {
                "task_id": f"task_{i:03d}",
                "bash_command": f"echo 'Processing item {i}'",
                "email_on_failure": True,
                "email_on_retry": True,
            }
            
            if with_callbacks:
                task_kwargs.update({
                    "on_success_callback": success_callback,
                    "on_failure_callback": failure_callback,
                    "on_retry_callback": retry_callback,
                })
            
            BashOperator(**task_kwargs)
    
    return dag

def measure_dag_serialization(num_tasks: int, with_callbacks: bool = True) -> dict:
    """Measure serialization for a DAG with specified parameters."""
    dag = create_test_dag(num_tasks, with_callbacks)
    
    try:
        serialized = SerializedDAG.to_dict(dag)
    except Exception as e:
        return {"error": str(e)}
    
    # Convert to compact JSON
    json_compact = json.dumps(serialized, separators=(',', ':'))
    total_size = len(json_compact.encode('utf-8'))
    
    # Analyze callback fields if callbacks are enabled
    callback_info = {}
    if with_callbacks and "dag" in serialized and "tasks" in serialized["dag"]:
        tasks = serialized["dag"]["tasks"]
        if tasks:
            first_task = tasks[0]["__var"]
            
            # Find callback fields
            callback_fields = [k for k in first_task if "callback" in k.lower()]
            
            # Calculate callback overhead
            task_json = json.dumps(first_task, separators=(',', ':'))
            total_task_size = len(task_json.encode('utf-8'))
            
            task_without_callbacks = {k: v for k, v in first_task.items() if "callback" not in k.lower()}
            no_callback_json = json.dumps(task_without_callbacks, separators=(',', ':'))
            no_callback_size = len(no_callback_json.encode('utf-8'))
            
            callback_overhead_per_task = total_task_size - no_callback_size
            total_callback_overhead = callback_overhead_per_task * len(tasks)
            
            callback_info = {
                "callback_fields": callback_fields,
                "callback_overhead_per_task": callback_overhead_per_task,
                "total_callback_overhead": total_callback_overhead,
                "task_size": total_task_size,
                "task_size_without_callbacks": no_callback_size
            }
    
    return {
        "num_tasks": num_tasks,
        "with_callbacks": with_callbacks,
        "total_size": total_size,
        "size_kb": total_size / 1024,
        "per_task_bytes": total_size / num_tasks,
        "callback_info": callback_info
    }

def run_scaling_measurements():
    """Run measurements for different DAG sizes."""
    print("🔍 Measuring Real Serialization Scaling")
    print("=" * 60)
    
    task_counts = [10, 25, 50, 100]
    
    # Test with callbacks
    print("\n📊 **Production DAGs (With Callbacks)**")
    print("| Tasks | Size | Per Task | Callback Overhead |")
    print("|-------|------|----------|-------------------|")
    
    callback_results = []
    for num_tasks in task_counts:
        result = measure_dag_serialization(num_tasks, with_callbacks=True)
        
        if "error" in result:
            print(f"| {num_tasks} | ERROR: {result['error']} |")
            continue
            
        callback_results.append(result)
        
        callback_overhead_kb = result["callback_info"]["total_callback_overhead"] / 1024 if result["callback_info"] else 0
        
        print(f"| {num_tasks} | {result['size_kb']:.1f} KB | {result['per_task_bytes']:.0f} bytes | {callback_overhead_kb:.1f} KB |")
    
    # Test without callbacks
    print("\n📊 **Basic DAGs (No Callbacks)**")
    print("| Tasks | Size | Per Task |")
    print("|-------|------|----------|")
    
    basic_results = []
    for num_tasks in task_counts:
        result = measure_dag_serialization(num_tasks, with_callbacks=False)
        
        if "error" in result:
            print(f"| {num_tasks} | ERROR: {result['error']} |")
            continue
            
        basic_results.append(result)
        print(f"| {num_tasks} | {result['size_kb']:.1f} KB | {result['per_task_bytes']:.0f} bytes |")
    
    # Show callback field details for largest DAG
    if callback_results:
        largest = callback_results[-1]
        if largest["callback_info"]:
            print(f"\n🔍 **Callback Analysis ({largest['num_tasks']} tasks):**")
            ci = largest["callback_info"]
            print(f"   Callback fields: {ci['callback_fields']}")
            print(f"   Per-task callback overhead: {ci['callback_overhead_per_task']} bytes")
            print(f"   Total callback overhead: {ci['total_callback_overhead']:,} bytes ({ci['total_callback_overhead']/1024:.1f} KB)")
            print(f"   Task size with callbacks: {ci['task_size']} bytes")
            print(f"   Task size without callbacks: {ci['task_size_without_callbacks']} bytes")
    
    # Generate CSV for easy import
    print("\n📋 **CSV Format (for PR description):**")
    print("# callbacks")
    for result in callback_results:
        print(f"{result['num_tasks']},{result['size_kb']:.1f},{result['per_task_bytes']:.0f},True")
    
    print("# basic")  
    for result in basic_results:
        print(f"{result['num_tasks']},{result['size_kb']:.1f},{result['per_task_bytes']:.0f},False")
    
    return callback_results, basic_results

def main():
    """Main function."""
    run_scaling_measurements()

if __name__ == "__main__":
    main()

@kaxil kaxil force-pushed the serialization/op-defaults branch from 8baf823 to 0334138 Compare August 20, 2025 14:58
@kaxil kaxil force-pushed the serialization/op-defaults branch 5 times, most recently from 8248e4a to c126079 Compare August 23, 2025 22:52
@kaxil kaxil mentioned this pull request Aug 26, 2025
4 tasks
@kaxil kaxil force-pushed the serialization/op-defaults branch 2 times, most recently from 89a6807 to c0be635 Compare August 27, 2025 07:29
@kaxil kaxil force-pushed the serialization/op-defaults branch 5 times, most recently from 6351823 to 3674170 Compare August 28, 2025 19:42
@kaxil kaxil force-pushed the serialization/op-defaults branch 2 times, most recently from 7850b64 to 93c2642 Compare August 28, 2025 22:58
@kaxil kaxil changed the title [DO NOT REVIEW] Remove Task SDK dependencies from airflow-core deserialization Decouple Serialization and Deserialization Code for Operators Aug 28, 2025
@kaxil kaxil changed the title Decouple Serialization and Deserialization Code for Operators Decouple Serialization and Deserialization Code for tasks Aug 28, 2025
@kaxil kaxil force-pushed the serialization/op-defaults branch from 93c2642 to 3bfc590 Compare August 29, 2025 00:39
@kaxil kaxil added the full tests needed We need to run full set of tests for this PR to merge label Aug 29, 2025
@kaxil kaxil marked this pull request as ready for review August 29, 2025 01:25
@kaxil kaxil requested review from amoghrajesh and ashb as code owners August 29, 2025 01:25
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 3, 2025
Remove Task SDK dependencies from airflow-core deserialization by establishing
a schema-based contract between client and server components. This
change enables independent deployment and upgrades while laying the foundation
for multi-language SDK support.

Key Decoupling Achievements:
- Replace dynamic get_serialized_fields() calls with hardcoded class methods
- Add schema-driven default resolution with get_operator_defaults_from_schema()
- Remove OPERATOR_DEFAULTS import dependency from airflow-core
- Implement SerializedBaseOperator class attributes for all operator defaults
- Update _is_excluded() logic to use schema defaults for efficient serialization

Serialization Optimizations:
- Unified partial_kwargs optimization supporting both encoded/non-encoded formats
- Intelligent default exclusion reducing storage redundancy
- MappedOperator.operator_class memory optimization (~90-95% reduction)
- Comprehensive client_defaults system with hierarchical resolution

Compatibility & Performance:
- Significant size reduction for typical DAGs with mapped operators
- Minimal overhead for client_defaults section (excellent efficiency)
- All existing serialized DAGs continue to work unchanged

Technical Implementation:
- Add generate_client_defaults() with LRU caching for optimal performance
- Implement _deserialize_partial_kwargs() supporting dual formats
- Centralized field deserialization eliminating code duplication
- Consolidated preprocessing logic in _preprocess_encoded_operator()
- Callback field preprocessing for backward compatibility

Testing & Validation:
- Added TestMappedOperatorSerializationAndClientDefaults with 9 comprehensive tests
- Parameterized tests for multiple serialization formats
- End-to-end validation of serialization/deserialization workflows
- Backward compatibility validation for callback field migration

This decoupling enables independent deployment/upgrades and provides the
foundation for multi-language SDK ecosystem alongside the Task Execution API.

Part of apache#45428
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 3, 2025
…#55849)

This change reduces serialized DAG size by automatically excluding fields
that match their schema default values, similar to how operator serialization
works. Fields like `catchup=False`, `max_active_runs=16`, and `fail_fast=False`
are no longer stored when they have default values.

Follow-up of apache#54569
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 4, 2025
Remove Task SDK dependencies from airflow-core deserialization by establishing
a schema-based contract between client and server components. This
change enables independent deployment and upgrades while laying the foundation
for multi-language SDK support.

Key Decoupling Achievements:
- Replace dynamic get_serialized_fields() calls with hardcoded class methods
- Add schema-driven default resolution with get_operator_defaults_from_schema()
- Remove OPERATOR_DEFAULTS import dependency from airflow-core
- Implement SerializedBaseOperator class attributes for all operator defaults
- Update _is_excluded() logic to use schema defaults for efficient serialization

Serialization Optimizations:
- Unified partial_kwargs optimization supporting both encoded/non-encoded formats
- Intelligent default exclusion reducing storage redundancy
- MappedOperator.operator_class memory optimization (~90-95% reduction)
- Comprehensive client_defaults system with hierarchical resolution

Compatibility & Performance:
- Significant size reduction for typical DAGs with mapped operators
- Minimal overhead for client_defaults section (excellent efficiency)
- All existing serialized DAGs continue to work unchanged

Technical Implementation:
- Add generate_client_defaults() with LRU caching for optimal performance
- Implement _deserialize_partial_kwargs() supporting dual formats
- Centralized field deserialization eliminating code duplication
- Consolidated preprocessing logic in _preprocess_encoded_operator()
- Callback field preprocessing for backward compatibility

Testing & Validation:
- Added TestMappedOperatorSerializationAndClientDefaults with 9 comprehensive tests
- Parameterized tests for multiple serialization formats
- End-to-end validation of serialization/deserialization workflows
- Backward compatibility validation for callback field migration

This decoupling enables independent deployment/upgrades and provides the
foundation for multi-language SDK ecosystem alongside the Task Execution API.

Part of apache#45428
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 4, 2025
…#55849)

This change reduces serialized DAG size by automatically excluding fields
that match their schema default values, similar to how operator serialization
works. Fields like `catchup=False`, `max_active_runs=16`, and `fail_fast=False`
are no longer stored when they have default values.

Follow-up of apache#54569
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 5, 2025
Remove Task SDK dependencies from airflow-core deserialization by establishing
a schema-based contract between client and server components. This
change enables independent deployment and upgrades while laying the foundation
for multi-language SDK support.

Key Decoupling Achievements:
- Replace dynamic get_serialized_fields() calls with hardcoded class methods
- Add schema-driven default resolution with get_operator_defaults_from_schema()
- Remove OPERATOR_DEFAULTS import dependency from airflow-core
- Implement SerializedBaseOperator class attributes for all operator defaults
- Update _is_excluded() logic to use schema defaults for efficient serialization

Serialization Optimizations:
- Unified partial_kwargs optimization supporting both encoded/non-encoded formats
- Intelligent default exclusion reducing storage redundancy
- MappedOperator.operator_class memory optimization (~90-95% reduction)
- Comprehensive client_defaults system with hierarchical resolution

Compatibility & Performance:
- Significant size reduction for typical DAGs with mapped operators
- Minimal overhead for client_defaults section (excellent efficiency)
- All existing serialized DAGs continue to work unchanged

Technical Implementation:
- Add generate_client_defaults() with LRU caching for optimal performance
- Implement _deserialize_partial_kwargs() supporting dual formats
- Centralized field deserialization eliminating code duplication
- Consolidated preprocessing logic in _preprocess_encoded_operator()
- Callback field preprocessing for backward compatibility

Testing & Validation:
- Added TestMappedOperatorSerializationAndClientDefaults with 9 comprehensive tests
- Parameterized tests for multiple serialization formats
- End-to-end validation of serialization/deserialization workflows
- Backward compatibility validation for callback field migration

This decoupling enables independent deployment/upgrades and provides the
foundation for multi-language SDK ecosystem alongside the Task Execution API.

Part of apache#45428
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 5, 2025
…#55849)

This change reduces serialized DAG size by automatically excluding fields
that match their schema default values, similar to how operator serialization
works. Fields like `catchup=False`, `max_active_runs=16`, and `fail_fast=False`
are no longer stored when they have default values.

Follow-up of apache#54569
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 5, 2025
Remove Task SDK dependencies from airflow-core deserialization by establishing
a schema-based contract between client and server components. This
change enables independent deployment and upgrades while laying the foundation
for multi-language SDK support.

Key Decoupling Achievements:
- Replace dynamic get_serialized_fields() calls with hardcoded class methods
- Add schema-driven default resolution with get_operator_defaults_from_schema()
- Remove OPERATOR_DEFAULTS import dependency from airflow-core
- Implement SerializedBaseOperator class attributes for all operator defaults
- Update _is_excluded() logic to use schema defaults for efficient serialization

Serialization Optimizations:
- Unified partial_kwargs optimization supporting both encoded/non-encoded formats
- Intelligent default exclusion reducing storage redundancy
- MappedOperator.operator_class memory optimization (~90-95% reduction)
- Comprehensive client_defaults system with hierarchical resolution

Compatibility & Performance:
- Significant size reduction for typical DAGs with mapped operators
- Minimal overhead for client_defaults section (excellent efficiency)
- All existing serialized DAGs continue to work unchanged

Technical Implementation:
- Add generate_client_defaults() with LRU caching for optimal performance
- Implement _deserialize_partial_kwargs() supporting dual formats
- Centralized field deserialization eliminating code duplication
- Consolidated preprocessing logic in _preprocess_encoded_operator()
- Callback field preprocessing for backward compatibility

Testing & Validation:
- Added TestMappedOperatorSerializationAndClientDefaults with 9 comprehensive tests
- Parameterized tests for multiple serialization formats
- End-to-end validation of serialization/deserialization workflows
- Backward compatibility validation for callback field migration

This decoupling enables independent deployment/upgrades and provides the
foundation for multi-language SDK ecosystem alongside the Task Execution API.

Part of apache#45428
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 5, 2025
…#55849)

This change reduces serialized DAG size by automatically excluding fields
that match their schema default values, similar to how operator serialization
works. Fields like `catchup=False`, `max_active_runs=16`, and `fail_fast=False`
are no longer stored when they have default values.

Follow-up of apache#54569
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 7, 2025
Remove Task SDK dependencies from airflow-core deserialization by establishing
a schema-based contract between client and server components. This
change enables independent deployment and upgrades while laying the foundation
for multi-language SDK support.

Key Decoupling Achievements:
- Replace dynamic get_serialized_fields() calls with hardcoded class methods
- Add schema-driven default resolution with get_operator_defaults_from_schema()
- Remove OPERATOR_DEFAULTS import dependency from airflow-core
- Implement SerializedBaseOperator class attributes for all operator defaults
- Update _is_excluded() logic to use schema defaults for efficient serialization

Serialization Optimizations:
- Unified partial_kwargs optimization supporting both encoded/non-encoded formats
- Intelligent default exclusion reducing storage redundancy
- MappedOperator.operator_class memory optimization (~90-95% reduction)
- Comprehensive client_defaults system with hierarchical resolution

Compatibility & Performance:
- Significant size reduction for typical DAGs with mapped operators
- Minimal overhead for client_defaults section (excellent efficiency)
- All existing serialized DAGs continue to work unchanged

Technical Implementation:
- Add generate_client_defaults() with LRU caching for optimal performance
- Implement _deserialize_partial_kwargs() supporting dual formats
- Centralized field deserialization eliminating code duplication
- Consolidated preprocessing logic in _preprocess_encoded_operator()
- Callback field preprocessing for backward compatibility

Testing & Validation:
- Added TestMappedOperatorSerializationAndClientDefaults with 9 comprehensive tests
- Parameterized tests for multiple serialization formats
- End-to-end validation of serialization/deserialization workflows
- Backward compatibility validation for callback field migration

This decoupling enables independent deployment/upgrades and provides the
foundation for multi-language SDK ecosystem alongside the Task Execution API.

Part of apache#45428
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 7, 2025
…#55849)

This change reduces serialized DAG size by automatically excluding fields
that match their schema default values, similar to how operator serialization
works. Fields like `catchup=False`, `max_active_runs=16`, and `fail_fast=False`
are no longer stored when they have default values.

Follow-up of apache#54569
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 8, 2025
Remove Task SDK dependencies from airflow-core deserialization by establishing
a schema-based contract between client and server components. This
change enables independent deployment and upgrades while laying the foundation
for multi-language SDK support.

Key Decoupling Achievements:
- Replace dynamic get_serialized_fields() calls with hardcoded class methods
- Add schema-driven default resolution with get_operator_defaults_from_schema()
- Remove OPERATOR_DEFAULTS import dependency from airflow-core
- Implement SerializedBaseOperator class attributes for all operator defaults
- Update _is_excluded() logic to use schema defaults for efficient serialization

Serialization Optimizations:
- Unified partial_kwargs optimization supporting both encoded/non-encoded formats
- Intelligent default exclusion reducing storage redundancy
- MappedOperator.operator_class memory optimization (~90-95% reduction)
- Comprehensive client_defaults system with hierarchical resolution

Compatibility & Performance:
- Significant size reduction for typical DAGs with mapped operators
- Minimal overhead for client_defaults section (excellent efficiency)
- All existing serialized DAGs continue to work unchanged

Technical Implementation:
- Add generate_client_defaults() with LRU caching for optimal performance
- Implement _deserialize_partial_kwargs() supporting dual formats
- Centralized field deserialization eliminating code duplication
- Consolidated preprocessing logic in _preprocess_encoded_operator()
- Callback field preprocessing for backward compatibility

Testing & Validation:
- Added TestMappedOperatorSerializationAndClientDefaults with 9 comprehensive tests
- Parameterized tests for multiple serialization formats
- End-to-end validation of serialization/deserialization workflows
- Backward compatibility validation for callback field migration

This decoupling enables independent deployment/upgrades and provides the
foundation for multi-language SDK ecosystem alongside the Task Execution API.

Part of apache#45428
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 8, 2025
…#55849)

This change reduces serialized DAG size by automatically excluding fields
that match their schema default values, similar to how operator serialization
works. Fields like `catchup=False`, `max_active_runs=16`, and `fail_fast=False`
are no longer stored when they have default values.

Follow-up of apache#54569
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 9, 2025
Remove Task SDK dependencies from airflow-core deserialization by establishing
a schema-based contract between client and server components. This
change enables independent deployment and upgrades while laying the foundation
for multi-language SDK support.

Key Decoupling Achievements:
- Replace dynamic get_serialized_fields() calls with hardcoded class methods
- Add schema-driven default resolution with get_operator_defaults_from_schema()
- Remove OPERATOR_DEFAULTS import dependency from airflow-core
- Implement SerializedBaseOperator class attributes for all operator defaults
- Update _is_excluded() logic to use schema defaults for efficient serialization

Serialization Optimizations:
- Unified partial_kwargs optimization supporting both encoded/non-encoded formats
- Intelligent default exclusion reducing storage redundancy
- MappedOperator.operator_class memory optimization (~90-95% reduction)
- Comprehensive client_defaults system with hierarchical resolution

Compatibility & Performance:
- Significant size reduction for typical DAGs with mapped operators
- Minimal overhead for client_defaults section (excellent efficiency)
- All existing serialized DAGs continue to work unchanged

Technical Implementation:
- Add generate_client_defaults() with LRU caching for optimal performance
- Implement _deserialize_partial_kwargs() supporting dual formats
- Centralized field deserialization eliminating code duplication
- Consolidated preprocessing logic in _preprocess_encoded_operator()
- Callback field preprocessing for backward compatibility

Testing & Validation:
- Added TestMappedOperatorSerializationAndClientDefaults with 9 comprehensive tests
- Parameterized tests for multiple serialization formats
- End-to-end validation of serialization/deserialization workflows
- Backward compatibility validation for callback field migration

This decoupling enables independent deployment/upgrades and provides the
foundation for multi-language SDK ecosystem alongside the Task Execution API.

Part of apache#45428
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 9, 2025
…#55849)

This change reduces serialized DAG size by automatically excluding fields
that match their schema default values, similar to how operator serialization
works. Fields like `catchup=False`, `max_active_runs=16`, and `fail_fast=False`
are no longer stored when they have default values.

Follow-up of apache#54569
kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this pull request Oct 24, 2025
This change reduces serialized DAG size by automatically excluding fields
that match their schema default values, similar to how operator serialization
works. Fields like `catchup=False`, `max_active_runs=16`, and `fail_fast=False`
are no longer stored when they have default values.

Follow-up of apache/airflow#54569

(cherry picked from commit a582464766d984f34b07d5ac848de2057b43d0ae)

GitOrigin-RevId: 67468ef2c5bb40ace29449778b000d71d19e461a
Labels

area:serialization, area:task-sdk, full tests needed (We need to run full set of tests for this PR to merge)

2 participants