
Commit

…16692-show-cron-schedule-description-in-ui
pateash committed Oct 15, 2021
2 parents bf849b5 + 6032159 · commit 7ff533d
Showing 5 changed files with 65 additions and 55 deletions.
8 changes: 4 additions & 4 deletions CHANGELOG.txt
@@ -272,7 +272,7 @@ Doc only changes
 - Docs: Fix url for ``Elasticsearch`` (#16275)
 - Small improvements for README.md files (#16244)
 - Fix docs for ``dag_concurrency`` (#16177)
-- Check syntatic correctness for code-snippets (#16005)
+- Check syntactic correctness for code-snippets (#16005)
 - Improvements for Docker Image docs (#14843)
 - Add proper link for wheel packages in docs. (#15999)
 - Add Docs for ``default_pool`` slots (#15997)
@@ -295,7 +295,7 @@ Misc/Internal
 - Update ``boto3`` to ``<1.19`` (#18389)
 - Improve coverage for ``airflow.security.kerberos module`` (#18258)
 - Fix kinesis test (#18337)
-- Fix provider test acessing importlib-resources (#18228)
+- Fix provider test accessing importlib-resources (#18228)
 - Silence warnings in tests from using SubDagOperator (#18275)
 - Fix usage of ``range(len())`` to ``enumerate`` (#18174)
 - Test coverage on the autocomplete view (#15943)
@@ -2175,7 +2175,7 @@ Bug fixes
 - [AIRFLOW-4763] Allow list in DockerOperator.command (#5408)
 - [AIRFLOW-5260] Allow empty uri arguments in connection strings (#5855)
 - [AIRFLOW-5257] Fix ElasticSearch log handler errors when attempting to close logs (#5863)
-- [AIRFLOW-1772] Google Updated Sensor doesnt work with CRON expressions (#5730)
+- [AIRFLOW-1772] Google Updated Sensor doesn't work with CRON expressions (#5730)
 - [AIRFLOW-5085] When you run kubernetes git-sync test from TAG, it fails (#5699)
 - [AIRFLOW-5258] ElasticSearch log handler, has 2 times of hours (%H and %I) in _clean_execution_dat (#5864)
 - [AIRFLOW-5348] Escape Label in deprecated chart view when set via JS (#5952)
@@ -3263,7 +3263,7 @@ Bug fixes
 - [AIRFLOW-3072] Only admin can view logs in RBAC UI
 - [AIRFLOW-3079] Improve initdb to support MSSQL Server
 - [AIRFLOW-3089] Google auth doesn't work under http
-- [AIRFLOW-3099] Errors raised when some blocs are missing in airflow.cfg
+- [AIRFLOW-3099] Errors raised when some blocks are missing in airflow.cfg
 - [AIRFLOW-3109] Default user permission should contain 'can_clear'
 - [AIRFLOW-3111] Confusing comments and instructions for log templates in UPDATING.md and default_airflow.cfg
 - [AIRFLOW-3124] Broken webserver debug mode (RBAC)
66 changes: 29 additions & 37 deletions airflow/serialization/serialized_objects.py
@@ -409,6 +409,32 @@ def _value_is_hardcoded_default(cls, attrname: str, value: Any, instance: Any) -
             return True
         return False
 
+    @classmethod
+    def _serialize_params_dict(cls, params: ParamsDict):
+        """Serialize Params dict for a DAG/Task"""
+        serialized_params = {}
+        for k, v in params.items():
+            # TODO: As of now, we would allow serialization of params which are of type Param only
+            if f'{v.__module__}.{v.__class__.__name__}' == 'airflow.models.param.Param':
+                serialized_params[k] = v.dump()
+            else:
+                raise ValueError('Params to a DAG or a Task can be only of type airflow.models.param.Param')
+        return serialized_params
+
+    @classmethod
+    def _deserialize_params_dict(cls, encoded_params: Dict) -> ParamsDict:
+        """Deserialize a DAGs Params dict"""
+        op_params = {}
+        for k, v in encoded_params.items():
+            if isinstance(v, dict) and "__class" in v:
+                param_class = import_string(v['__class'])
+                op_params[k] = param_class(**v)
+            else:
+                # Old style params, upgrade it
+                op_params[k] = Param(v)
+
+        return ParamsDict(op_params)
+
 
 class DependencyDetector:
     """Detects dependencies between DAGs."""
@@ -584,7 +610,7 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
             elif k == "deps":
                 v = cls._deserialize_deps(v)
             elif k == "params":
-                v = cls._deserialize_operator_params(v)
+                v = cls._deserialize_params_dict(v)
             elif k in cls._decorated_fields or k not in op.get_serialized_fields():
                 v = cls._deserialize(v)
             # else use v as it is
@@ -721,17 +747,6 @@ def _serialize_operator_extra_links(cls, operator_extra_links: Iterable[BaseOper
 
         return serialize_operator_extra_links
 
-    @classmethod
-    def _deserialize_operator_params(cls, encoded_op_params: Dict) -> Dict[str, Param]:
-        """Deserialize Params dict of a operator"""
-        op_params = {}
-        for k, v in encoded_op_params.items():
-            param_class = import_string(v['__class'])
-            del v['__class']
-            op_params[k] = param_class(**v)
-
-        return ParamsDict(op_params)
-
     @classmethod
     def _serialize_operator_params(cls, op_params: ParamsDict):
         """Serialize Params dict of a operator"""
@@ -802,7 +817,7 @@ def serialize_dag(cls, dag: DAG) -> dict:
 
         # Edge info in the JSON exactly matches our internal structure
         serialize_dag["edge_info"] = dag.edge_info
-        serialize_dag["params"] = cls._serialize_dag_params(dag.params)
+        serialize_dag["params"] = cls._serialize_params_dict(dag.params)
 
         # has_on_*_callback are only stored if the value is True, as the default is False
         if dag.has_on_success_callback:
@@ -843,7 +858,7 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) -> 'SerializedDAG':
             elif k in cls._decorated_fields:
                 v = cls._deserialize(v)
             elif k == "params":
-                v = cls._deserialize_dag_params(v)
+                v = cls._deserialize_params_dict(v)
             # else use v as it is
 
             setattr(dag, k, v)
@@ -915,29 +930,6 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
             raise ValueError(f"Unsure how to deserialize version {ver!r}")
         return cls.deserialize_dag(serialized_obj['dag'])
 
-    @classmethod
-    def _serialize_dag_params(cls, dag_params: ParamsDict):
-        """Serialize Params dict for a DAG"""
-        serialized_params = {}
-        for k, v in dag_params.items():
-            # TODO: As of now, we would allow serialization of params which are of type Param only
-            if f'{v.__module__}.{v.__class__.__name__}' == 'airflow.models.param.Param':
-                serialized_params[k] = v.dump()
-            else:
-                raise ValueError('Params to a DAG can be only of type airflow.models.param.Param')
-        return serialized_params
-
-    @classmethod
-    def _deserialize_dag_params(cls, encoded_dag_params: Dict) -> ParamsDict:
-        """Deserialize a DAGs Params dict"""
-        op_params = {}
-        for k, v in encoded_dag_params.items():
-            param_class = import_string(v['__class'])
-            del v['__class']
-            op_params[k] = param_class(**v)
-
-        return ParamsDict(op_params)
-
 
 class SerializedTaskGroup(TaskGroup, BaseSerialization):
     """A JSON serializable representation of TaskGroup."""
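
The net effect of this refactor: three near-identical helpers (_serialize_dag_params, _deserialize_dag_params and _deserialize_operator_params) collapse into one shared pair on the base serialization class, and deserialization now upgrades old-style plain values by wrapping them in Param. Below is a minimal, self-contained sketch of that behaviour; the Param class and the two functions are simplified stand-ins for airflow.models.param and the methods above, not the real Airflow code.

from typing import Any, Dict


class Param:
    """Stand-in for airflow.models.param.Param: holds a value and dumps itself
    with a '__class' marker so it can be reconstructed later."""

    def __init__(self, default: Any = None, **kwargs: Any) -> None:
        self.default = default
        self.schema = kwargs

    def dump(self) -> Dict[str, Any]:
        return {"__class": f"{self.__module__}.{self.__class__.__name__}", "default": self.default}


def serialize_params_dict(params: Dict[str, Param]) -> Dict[str, Any]:
    # Mirrors _serialize_params_dict: only Param instances may be serialized.
    serialized = {}
    for k, v in params.items():
        if isinstance(v, Param):
            serialized[k] = v.dump()
        else:
            raise ValueError("Params to a DAG or a Task can be only of type Param")
    return serialized


def deserialize_params_dict(encoded: Dict[str, Any]) -> Dict[str, Param]:
    # Mirrors _deserialize_params_dict: dicts carrying '__class' are rebuilt
    # into Param objects; plain values are old-style params, upgraded by
    # wrapping them in Param.
    out = {}
    for k, v in encoded.items():
        if isinstance(v, dict) and "__class" in v:
            out[k] = Param(default=v.get("default"))
        else:
            out[k] = Param(v)
    return out


encoded = serialize_params_dict({"retries": Param(3)})
print(encoded)  # e.g. {'retries': {'__class': '__main__.Param', 'default': 3}}

upgraded = deserialize_params_dict({"retries": 3})  # old-style plain value
print(upgraded["retries"].default)  # 3
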
12 changes: 6 additions & 6 deletions chart/values.schema.json
@@ -1225,17 +1225,17 @@
         "timeoutSeconds": {
           "description": "Scheduler Liveness probe timeout seconds.",
           "type": "integer",
-          "default": 5
+          "default": 10
         },
         "failureThreshold": {
           "description": "Scheduler Liveness probe failure threshold.",
           "type": "integer",
-          "default": 10
+          "default": 5
         },
         "periodSeconds": {
           "description": "Scheduler Liveness probe period seconds.",
           "type": "integer",
-          "default": 30
+          "default": 60
         }
       }
     },
@@ -1483,17 +1483,17 @@
         "timeoutSeconds": {
           "description": "Triggerer Liveness probe timeout seconds.",
           "type": "integer",
-          "default": 5
+          "default": 10
         },
         "failureThreshold": {
           "description": "Triggerer Liveness probe failure threshold.",
           "type": "integer",
-          "default": 10
+          "default": 5
        },
         "periodSeconds": {
           "description": "Triggerer Liveness probe period seconds.",
           "type": "integer",
-          "default": 30
+          "default": 60
         }
       }
     },
16 changes: 8 additions & 8 deletions chart/values.yaml
@@ -505,13 +505,13 @@ workers:
 
 # Airflow scheduler settings
 scheduler:
-  # If the scheduler stops heartbeating for 5 minutes (10*30s) kill the
+  # If the scheduler stops heartbeating for 5 minutes (5*60s) kill the
   # scheduler and let Kubernetes restart it
   livenessProbe:
     initialDelaySeconds: 10
-    timeoutSeconds: 5
-    failureThreshold: 10
-    periodSeconds: 30
+    timeoutSeconds: 10
+    failureThreshold: 5
+    periodSeconds: 60
   # Airflow 2.0 allows users to run multiple schedulers,
   # However this feature is only recommended for MySQL 8+ and Postgres
   replicas: 1
@@ -787,13 +787,13 @@ triggerer:
       maxSurge: "100%"
       maxUnavailable: "50%"
 
-  # If the triggerer stops heartbeating for 5 minutes (10*30s) kill the
+  # If the triggerer stops heartbeating for 5 minutes (5*60s) kill the
   # triggerer and let Kubernetes restart it
   livenessProbe:
     initialDelaySeconds: 10
-    timeoutSeconds: 5
-    failureThreshold: 10
-    periodSeconds: 30
+    timeoutSeconds: 10
+    failureThreshold: 5
+    periodSeconds: 60
 
 # Create ServiceAccount
 serviceAccount:
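
The schema defaults and these chart values move together, and the updated comment's arithmetic holds: Kubernetes only restarts the container after failureThreshold consecutive probe failures spaced periodSeconds apart, so the five-minute detection window is unchanged while probes run half as often, each with a more generous timeout. A quick sanity check of that arithmetic in Python (initialDelaySeconds and per-probe timeouts are ignored in this rough estimate):

# Rough window before Kubernetes restarts an unhealthy scheduler/triggerer:
# failureThreshold consecutive failed probes, periodSeconds apart.
old = {"timeoutSeconds": 5, "failureThreshold": 10, "periodSeconds": 30}
new = {"timeoutSeconds": 10, "failureThreshold": 5, "periodSeconds": 60}

for name, probe in (("old", old), ("new", new)):
    window = probe["failureThreshold"] * probe["periodSeconds"]
    print(f"{name}: {window}s")  # old: 300s, new: 300s -- five minutes either way
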
18 changes: 18 additions & 0 deletions tests/serialization/test_dag_serialization.py
@@ -1432,6 +1432,24 @@ def test_serialized_objects_are_sorted(self, object_to_serialized, expected_outp
             serialized_obj = serialized_obj["__var"]
         assert serialized_obj == expected_output
 
+    def test_params_upgrade(self):
+        serialized = {
+            "__version": 1,
+            "dag": {
+                "_dag_id": "simple_dag",
+                "fileloc": __file__,
+                "tasks": [],
+                "timezone": "UTC",
+                "params": {"none": None, "str": "str", "dict": {"a": "b"}},
+            },
+        }
+        SerializedDAG.validate_schema(serialized)
+        dag = SerializedDAG.from_dict(serialized)
+
+        assert dag.params["none"] is None
+        assert isinstance(dict.__getitem__(dag.params, "none"), Param)
+        assert dag.params["str"] == "str"
+
 
 def test_kubernetes_optional():
     """Serialisation / deserialisation continues to work without kubernetes installed"""
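
The pair of assertions on the "none" key works because ParamsDict stores Param objects but resolves them on normal item access: plain indexing returns the wrapped value, while dict.__getitem__ bypasses the override and reaches the raw Param. A simplified stand-in illustrating that lookup behaviour (not the real airflow.models.param.ParamsDict):

from typing import Any


class Param:
    """Stand-in: just records the wrapped default value."""

    def __init__(self, default: Any = None) -> None:
        self.value = default


class ParamsDict(dict):
    """Stand-in: stores Param objects, but resolves them on item access."""

    def __getitem__(self, key: str) -> Any:
        return dict.__getitem__(self, key).value


params = ParamsDict({"none": Param(None), "str": Param("str")})
assert params["none"] is None                               # resolved value
assert isinstance(dict.__getitem__(params, "none"), Param)  # raw Param wrapper
assert params["str"] == "str"
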
