Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TSC support to Tron #1003

Merged
merged 3 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions requirements-minimal.txt
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change really shouldn't be necessary (check-requirements is fine internally without it), but I couldn't figure out what's going on.

(In any case, hopefully we can de-pickle things in the near future and actually delete all the unused Mesos code.)

Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
addict # not sure why check-requirements is not picking this up from task_processing[mesos_executor]
argcomplete
boto3
bsddb3
cryptography
dataclasses
ecdsa>=0.13.3
http-parser # not sure why check-requirements is not picking this up from task_processing[mesos_executor]
humanize
ipdb
ipython
Expand All @@ -15,6 +17,7 @@ psutil
py-bcrypt
pyasn1
pyformance
pymesos # not sure why check-requirements is not picking this up from task_processing[mesos_executor]
pysensu-yelp
PyStaticConfiguration
pytimeparse
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ setuptools==65.5.1
six==1.15.0
sshpubkeys==3.1.0
stack-data==0.6.2
task-processing==1.2.0
task-processing==1.3.0
traitlets==5.0.0
Twisted==22.10.0
typing-extensions==4.5.0
Expand Down
1 change: 1 addition & 0 deletions tests/core/actionrun_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1900,6 +1900,7 @@ def test_recover(self, mock_cluster_repo, mock_filehandler, mock_k8s_action_run)
task_id=last_attempt.kubernetes_task_id,
node_selectors=mock_k8s_action_run.command_config.node_selectors,
node_affinities=mock_k8s_action_run.command_config.node_affinities,
topology_spread_constraints=mock_k8s_action_run.command_config.topology_spread_constraints,
pod_labels=mock_k8s_action_run.command_config.labels,
pod_annotations=mock_k8s_action_run.command_config.annotations,
service_account_name=mock_k8s_action_run.command_config.service_account_name,
Expand Down
6 changes: 6 additions & 0 deletions tests/kubernetes_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ def test_create_task_disabled():
cap_drop=[],
node_selectors={"yelp.com/pool": "default"},
node_affinities=[],
topology_spread_constraints=[],
pod_labels={},
pod_annotations={},
service_account_name=None,
Expand Down Expand Up @@ -504,6 +505,7 @@ def test_create_task(mock_kubernetes_cluster):
cap_drop=[],
node_selectors={"yelp.com/pool": "default"},
node_affinities=[],
topology_spread_constraints=[],
pod_labels={},
pod_annotations={},
service_account_name=None,
Expand Down Expand Up @@ -535,6 +537,7 @@ def test_create_task_with_task_id(mock_kubernetes_cluster):
cap_drop=[],
node_selectors={"yelp.com/pool": "default"},
node_affinities=[],
topology_spread_constraints=[],
pod_labels={},
pod_annotations={},
service_account_name=None,
Expand Down Expand Up @@ -569,6 +572,7 @@ def test_create_task_with_invalid_task_id(mock_kubernetes_cluster):
cap_drop=[],
node_selectors={"yelp.com/pool": "default"},
node_affinities=[],
topology_spread_constraints=[],
pod_labels={},
pod_annotations={},
service_account_name=None,
Expand Down Expand Up @@ -616,6 +620,7 @@ def test_create_task_with_config(mock_kubernetes_cluster):
"cap_drop": ["KILL", "CHOWN"],
"node_selectors": {"yelp.com/pool": "default"},
"node_affinities": [],
"topology_spread_constraints": [],
"labels": {},
"annotations": {},
"service_account_name": None,
Expand All @@ -641,6 +646,7 @@ def test_create_task_with_config(mock_kubernetes_cluster):
cap_drop=["KILL", "CHOWN"],
node_selectors={"yelp.com/pool": "default"},
node_affinities=[],
topology_spread_constraints=[],
pod_labels={},
pod_annotations={},
service_account_name=None,
Expand Down
44 changes: 44 additions & 0 deletions tron/config/config_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
from tron.config.schema import ConfigSecretVolumeItem
from tron.config.schema import ConfigSSHOptions
from tron.config.schema import ConfigState
from tron.config.schema import ConfigTopologySpreadConstraints
from tron.config.schema import ConfigVolume
from tron.config.schema import MASTER_NAMESPACE
from tron.config.schema import NamedTronConfig
Expand Down Expand Up @@ -394,6 +395,41 @@ class ValidateNodeAffinity(Validator):
valid_node_affinity = ValidateNodeAffinity()


def _valid_when_unsatisfiable(value: str, config_context: ConfigContext) -> str:
    """Validate the ``when_unsatisfiable`` field of a topology spread constraint.

    Args:
        value: The configured value; must be exactly ``"DoNotSchedule"`` or
            ``"ScheduleAnyway"``.
        config_context: Accepted so the signature matches the other field
            validators; it is not consulted here.

    Returns:
        ``value`` unchanged when it is one of the accepted strings.

    Raises:
        ConfigError: If ``value`` is anything else, including non-string and
            unhashable values such as lists or dicts.
    """
    # A tuple (rather than a set) keeps the membership test from raising
    # TypeError on unhashable input and gives the error message a stable order.
    valid_values = ("DoNotSchedule", "ScheduleAnyway")
    if not isinstance(value, str) or value not in valid_values:
        raise ConfigError(f"Got {value} as a when_unsatisfiable value, expected one of {valid_values}")

    return value


def _valid_topology_spread_label_selector(value: Dict[str, str], config_context: ConfigContext) -> Dict[str, str]:
    """Validate the ``label_selector`` field of a topology spread constraint.

    Args:
        value: The configured selector; must be a non-empty dict whose keys
            and values are all strings.
        config_context: Accepted so the signature matches the other field
            validators; it is not consulted here.

    Returns:
        ``value`` unchanged when it passes every check.

    Raises:
        ConfigError: If ``value`` is empty/missing, is not a dict, or contains
            a non-string key or value.
    """
    if not value:
        raise ConfigError("TopologySpreadConstraints must have a label_selector")

    # Without this check a truthy non-dict value (e.g. a list or a bare string)
    # would fail below on ``.values()`` with AttributeError, not a ConfigError.
    if not isinstance(value, dict):
        raise ConfigError("TopologySpreadConstraints label_selector must be a dict")

    # XXX: we probably also want to enforce k8s limits for label lengths and whatnot
    if not all(isinstance(k, str) for k in value):
        raise ConfigError("TopologySpreadConstraints label_selector keys must be strings")

    if not all(isinstance(s, str) for s in value.values()):
        raise ConfigError("TopologySpreadConstraints label_selector values must be strings")

    return value


class ValidateTopologySpreadConstraints(Validator):
    """Validate one topology spread constraint entry."""

    # Config object type associated with this validator.
    config_class = ConfigTopologySpreadConstraints

    # Per-field validation callables, keyed by the field name in the config.
    validators = {
        "max_skew": valid_int,
        "when_unsatisfiable": _valid_when_unsatisfiable,
        "topology_key": valid_string,
        "label_selector": _valid_topology_spread_label_selector,
    }


# Module-level instance, used as the element validator for the
# ``topology_spread_constraints`` list on actions and cleanup actions.
valid_topology_spread_constraints = ValidateTopologySpreadConstraints()


class ValidateSSHOptions(Validator):
"""Validate SSH options."""

Expand Down Expand Up @@ -564,6 +600,7 @@ class ValidateAction(Validator):
"trigger_timeout": None,
"node_selectors": None,
"node_affinities": None,
"topology_spread_constraints": None,
"labels": None,
"annotations": None,
"service_account_name": None,
Expand Down Expand Up @@ -605,6 +642,9 @@ class ValidateAction(Validator):
"trigger_timeout": config_utils.valid_time_delta,
"node_selectors:": valid_dict,
"node_affinities": build_list_of_type_validator(valid_node_affinity, allow_empty=True),
"topology_spread_constraints": build_list_of_type_validator(
valid_topology_spread_constraints, allow_empty=True
),
"labels:": valid_dict,
"annotations": valid_dict,
"service_account_name": valid_string,
Expand Down Expand Up @@ -655,6 +695,7 @@ class ValidateCleanupAction(Validator):
"trigger_timeout": None,
"node_selectors": None,
"node_affinities": None,
"topology_spread_constraints": None,
"labels": None,
"annotations": None,
"service_account_name": None,
Expand Down Expand Up @@ -691,6 +732,9 @@ class ValidateCleanupAction(Validator):
"trigger_timeout": config_utils.valid_time_delta,
"node_selectors:": valid_dict,
"node_affinities": build_list_of_type_validator(valid_node_affinity, allow_empty=True),
"topology_spread_constraints": build_list_of_type_validator(
valid_topology_spread_constraints, allow_empty=True
),
"labels": valid_dict,
"annotations": valid_dict,
"service_account_name": valid_string,
Expand Down
8 changes: 8 additions & 0 deletions tron/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def config_object_factory(name, required=None, optional=None):
"trigger_timeout", # datetime.deltatime or None
"node_selectors", # Dict of str, str
"node_affinities", # List of ConfigNodeAffinity
"topology_spread_constraints", # List of ConfigTopologySpreadConstraints
"labels", # Dict of str, str
"annotations", # Dict of str, str
"service_account_name", # str
Expand Down Expand Up @@ -222,6 +223,7 @@ def config_object_factory(name, required=None, optional=None):
"trigger_timeout", # datetime.deltatime or None
"node_selectors", # Dict of str, str
"node_affinities", # List of ConfigNodeAffinity
"topology_spread_constraints", # List of ConfigTopologySpreadConstraints
"labels", # Dict of str, str
"annotations", # Dict of str, str
"service_account_name", # str
Expand Down Expand Up @@ -306,6 +308,12 @@ def _asdict(self) -> dict:
optional=[],
)

# Config object describing a single topology spread constraint.
# All four fields are required; there are no optional fields.
ConfigTopologySpreadConstraints = config_object_factory(
name="ConfigTopologySpreadConstraints",
required=["max_skew", "label_selector", "topology_key", "when_unsatisfiable"],
optional=[],
)

ConfigParameter = config_object_factory(
name="ConfigParameter",
required=[
Expand Down
3 changes: 3 additions & 0 deletions tron/core/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from tron.config.schema import ConfigNodeAffinity
from tron.config.schema import ConfigProjectedSAVolume
from tron.config.schema import ConfigSecretVolume
from tron.config.schema import ConfigTopologySpreadConstraints

log = logging.getLogger(__name__)

Expand All @@ -39,6 +40,7 @@ class ActionCommandConfig:
extra_volumes: set = field(default_factory=set)
node_selectors: dict = field(default_factory=dict)
node_affinities: List[ConfigNodeAffinity] = field(default_factory=list)
topology_spread_constraints: List[ConfigTopologySpreadConstraints] = field(default_factory=list)
labels: dict = field(default_factory=dict)
annotations: dict = field(default_factory=dict)
service_account_name: Optional[str] = None
Expand Down Expand Up @@ -98,6 +100,7 @@ def from_config(cls, config: ConfigAction) -> "Action":
cap_drop=config.cap_drop or [],
node_selectors=config.node_selectors or {},
node_affinities=config.node_affinities or [],
topology_spread_constraints=config.topology_spread_constraints or [],
labels=config.labels or {},
annotations=config.annotations or {},
service_account_name=config.service_account_name or None,
Expand Down
2 changes: 2 additions & 0 deletions tron/core/actionrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,7 @@ def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]:
cap_drop=attempt.command_config.cap_drop,
node_selectors=attempt.command_config.node_selectors,
node_affinities=attempt.command_config.node_affinities,
topology_spread_constraints=attempt.command_config.topology_spread_constraints,
pod_labels=build_labels(run_id=self.id, original_labels=attempt.command_config.labels),
pod_annotations=attempt.command_config.annotations,
service_account_name=attempt.command_config.service_account_name,
Expand Down Expand Up @@ -1253,6 +1254,7 @@ def recover(self) -> Optional[KubernetesTask]:
task_id=last_attempt.kubernetes_task_id,
node_selectors=last_attempt.command_config.node_selectors,
node_affinities=last_attempt.command_config.node_affinities,
topology_spread_constraints=last_attempt.command_config.topology_spread_constraints,
pod_labels=build_labels(run_id=self.id, original_labels=last_attempt.command_config.labels),
pod_annotations=last_attempt.command_config.annotations,
service_account_name=last_attempt.command_config.service_account_name,
Expand Down
3 changes: 3 additions & 0 deletions tron/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from tron.config.schema import ConfigProjectedSAVolume
from tron.config.schema import ConfigSecretSource
from tron.config.schema import ConfigSecretVolume
from tron.config.schema import ConfigTopologySpreadConstraints
from tron.config.schema import ConfigVolume
from tron.serialize.filehandler import OutputStreamSerializer
from tron.utils import exitcode
Expand Down Expand Up @@ -490,6 +491,7 @@ def create_task(
cap_drop: Collection[str],
node_selectors: Dict[str, str],
node_affinities: List[ConfigNodeAffinity],
topology_spread_constraints: List[ConfigTopologySpreadConstraints],
pod_labels: Dict[str, str],
pod_annotations: Dict[str, str],
service_account_name: Optional[str],
Expand Down Expand Up @@ -529,6 +531,7 @@ def create_task(
],
node_selectors=node_selectors,
node_affinities=[affinity._asdict() for affinity in node_affinities],
topology_spread_constraints=[tsc._asdict() for tsc in topology_spread_constraints],
labels=pod_labels,
annotations=pod_annotations,
service_account_name=service_account_name,
Expand Down
Loading