diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml
index 86d8ba1..1651076 100644
--- a/.github/workflows/CI.yaml
+++ b/.github/workflows/CI.yaml
@@ -48,6 +48,7 @@ jobs:
         run: |
           brew install portaudio
           echo "MANUAL_OS_SET=Darwin" >> $GITHUB_ENV
+          echo "OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES" >> $GITHUB_ENV
 
       - uses: actions/checkout@v3
         name: Checkout ChimeraPy-Engine (Bleeding Edge)
diff --git a/chimerapy/orchestrator/init_services.py b/chimerapy/orchestrator/init_services.py
index 662f70b..b29f5b0 100644
--- a/chimerapy/orchestrator/init_services.py
+++ b/chimerapy/orchestrator/init_services.py
@@ -42,10 +42,9 @@ def get(name):
 
 def teardown():
     """Teardown the services."""
-    manager = available_services.get("cluster_manager")
+    manager: ClusterManager = available_services.get("cluster_manager")
     if manager and not manager.has_shutdown():
         manager.shutdown()
 
     for worker in available_services.get("workers", []):
-        if not worker.has_shutdown:
-            worker.shutdown()
+        worker.shutdown()
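Note on the init_services.py change above: the removed guard tested `worker.has_shutdown` without calling it, and a bound method object is always truthy, so the `worker.shutdown()` branch could never execute. Calling `shutdown()` unconditionally fixes that. A minimal illustration of the pitfall (this `Worker` class is a stand-in, not the project's actual worker):

    class Worker:
        def has_shutdown(self) -> bool:
            return False

    w = Worker()
    if not w.has_shutdown:    # bound method is truthy, so this is always False
        print("unreachable")
    if not w.has_shutdown():  # the call the old code intended
        print("runs when the worker has not shut down")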
diff --git a/chimerapy/orchestrator/models/cluster_models.py b/chimerapy/orchestrator/models/cluster_models.py
index d644958..4a36ab8 100644
--- a/chimerapy/orchestrator/models/cluster_models.py
+++ b/chimerapy/orchestrator/models/cluster_models.py
@@ -14,17 +14,50 @@
 )
 
 
+class RegisteredMethod(BaseModel):
+    name: str
+    style: Literal["concurrent", "blocking", "reset"] = "concurrent"
+    params: Dict[str, str] = Field(default_factory=dict)
+
+
+class NodeDiagnostics(BaseModel):
+    timestamp: str
+    latency: float
+    payload_size: float
+    memory_usage: float
+    cpu_usage: float
+    num_of_steps: int
+
+
 class NodeState(BaseModel):
     id: str
     name: str = ""
-    init: bool = False
-    connected: bool = False
-    ready: bool = False
-    finished: bool = False
     port: int = 0
+    fsm: Literal[
+        "NULL",
+        "INITIALIZED",
+        "CONNECTED",
+        "READY",
+        "PREVIEWING",
+        "RECORDING",
+        "STOPPED",
+        "SAVED",
+        "SHUTDOWN",
+    ]
+    registered_methods: Dict[str, RegisteredMethod] = Field(
+        default_factory=dict
+    )
+    logdir: Optional[str] = None
+    diagnostics: NodeDiagnostics
 
     @classmethod
     def from_cp_node_state(cls, node_state: _NodeState):
-        return cls(**node_state.to_dict())
+        node_state_dict = node_state.to_dict()
+        node_state_dict["logdir"] = (
+            str(node_state_dict["logdir"])
+            if node_state_dict["logdir"] is not None
+            else None
+        )
+        return cls(**node_state_dict)
 
     model_config: ClassVar[ConfigDict] = ConfigDict(frozen=True, extra="forbid")
@@ -36,10 +36,12 @@ class WorkerState(BaseModel):
     port: int = 0
     ip: str = ""
     nodes: Dict[str, NodeState] = Field(default_factory=dict)
+    tempfolder: str = ""
 
     @classmethod
     def from_cp_worker_state(cls, worker_state: _WorkerState):
         state_dict = worker_state.to_dict()
+        state_dict["tempfolder"] = str(state_dict["tempfolder"])
         return cls(
             **state_dict,
         )
@@ -54,16 +89,16 @@ class ClusterState(BaseModel):
     workers: Dict[str, WorkerState] = Field(default_factory=dict)
     logs_subscription_port: Optional[int] = None
 
-    running: bool = False
-    collecting: bool = False
-    collection_status: Optional[Literal["PASS", "FAIL"]] = None
+    log_sink_enabled: bool = False
     zeroconf_discovery: bool = False
+    logdir: Optional[str] = None
 
     @classmethod
     def from_cp_manager_state(
         cls, state: ManagerState, zeroconf_discovery: bool
     ):
         state_dict = state.to_dict()
+        state_dict["logdir"] = str(state_dict["logdir"])
         return cls(**state_dict, zeroconf_discovery=zeroconf_discovery)
 
     model_config: ClassVar[ConfigDict] = ConfigDict(frozen=True, extra="forbid")
diff --git a/chimerapy/orchestrator/services/cluster_service/cluster_manager.py b/chimerapy/orchestrator/services/cluster_service/cluster_manager.py
index 8b477a7..8d30e55 100644
--- a/chimerapy/orchestrator/services/cluster_service/cluster_manager.py
+++ b/chimerapy/orchestrator/services/cluster_service/cluster_manager.py
@@ -137,7 +137,7 @@ async def update_network_status(self) -> None:
 
     def is_zeroconf_discovery_enabled(self) -> bool:
         """Check if zeroconf discovery is enabled."""
-        return self._manager.services.zeroconf.enabled
+        return self._manager.zeroconf_service.enabled
 
     async def instantiate_pipeline(
         self, pipeline_id
diff --git a/chimerapy/orchestrator/services/cluster_service/updates_broadcaster.py b/chimerapy/orchestrator/services/cluster_service/updates_broadcaster.py
index e2e51da..1f1ba94 100644
--- a/chimerapy/orchestrator/services/cluster_service/updates_broadcaster.py
+++ b/chimerapy/orchestrator/services/cluster_service/updates_broadcaster.py
@@ -110,7 +110,7 @@ async def add_client(
         await self.updater.add_client(q)
 
         if message is not None:
-            await q.put(message.dict())
+            await q.put(message.model_dump(mode="json"))
 
     async def remove_client(self, q: asyncio.Queue) -> None:
         """Remove a client queue from the broadcaster."""
@@ -139,7 +139,7 @@ async def broadcast_updates(self) -> None:
             else:
                 msg = None
             if msg is not None:
-                msg_dict = msg.dict()
+                msg_dict = msg.model_dump(mode="json")
                 await self.updater.put_update(msg_dict)
             if msg and msg.signal is UpdateMessageType.SHUTDOWN:
                 break
@@ -165,7 +165,7 @@ async def put_update(self, msg: Dict[str, Any]) -> None:
             UpdateMessageType.NETWORK_UPDATE,
             self.zeroconf_enabled,
         )
-        await self.updater.put_update(update_msg.dict())
+        await self.updater.put_update(update_msg.model_dump(mode="json"))
 
     @staticmethod
     def is_cluster_update_message(msg: Dict[str, Any]) -> bool:
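Note on the updates_broadcaster.py changes above: `.dict()` is the pydantic v1 API; v2 renames it to `.model_dump()`, and `mode="json"` additionally coerces values that are not natively JSON-serializable (enums, datetimes, nested models) into plain JSON-compatible types before the dict is queued. A minimal sketch, where `UpdateMessage` and `Signal` are illustrative stand-ins, not the project's actual models:

    from enum import Enum
    from pydantic import BaseModel

    class Signal(str, Enum):
        NETWORK_UPDATE = "NETWORK_UPDATE"

    class UpdateMessage(BaseModel):
        signal: Signal
        data: dict = {}

    msg = UpdateMessage(signal=Signal.NETWORK_UPDATE)
    payload = msg.model_dump(mode="json")  # {'signal': 'NETWORK_UPDATE', 'data': {}}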
id="manager1", + logdir=Path("/tmp24"), + ip="192.168.2.0", + port=55000, + log_sink_enabled=True, + logs_subscription_port=55001, + workers={ + "w1": _WorkerState( + id="w1", + name="worker1", + port=55002, + ip="192.168.2.1", + nodes={ + "n1": _NodeState( + id="n1", + name="node1", + port=55004, + fsm="NULL", + registered_methods={ + "m1": _RegisteredMethod( + name="func1", + style="concurrent", + params={ + "p1": "int", + }, + ) + }, + ) + }, + tempfolder=Path("/tmp24/w1"), + ), + "w2": _WorkerState( + id="w2", + name="worker2", + port=55003, + ip="192.168.2.2", + nodes={ + "n2": _NodeState( + id="n2", + name="node2", + port=55005, + fsm="INITIALIZED", + registered_methods={ + "m2": _RegisteredMethod( + name="func2", + style="concurrent", + params={ + "p2": "float", + }, + ) + }, + ) + }, + tempfolder=Path("/tmp24/w2"), + ), + }, + ) + + def test_m_empty(self, m_empty): + manager_state = ClusterState.from_cp_manager_state( + m_empty, zeroconf_discovery=False + ) + assert manager_state.id == "manager1" + assert manager_state.logdir == str(Path.cwd()) + assert manager_state.log_sink_enabled is False + assert manager_state.logs_subscription_port is None + assert manager_state.ip == "0.0.0.0" + assert manager_state.port == 0 + assert manager_state.workers == {} + + def test_m_populated(self, m_populated): + manager_state = ClusterState.from_cp_manager_state( + m_populated, zeroconf_discovery=True + ) + assert manager_state.id == "manager1" + assert manager_state.logdir == "/tmp24" + assert manager_state.log_sink_enabled is True + assert manager_state.logs_subscription_port == 55001 + assert manager_state.ip == "192.168.2.0" + assert manager_state.port == 55000 + assert manager_state.workers == {} + assert manager_state.zeroconf_discovery is True + + def test_m_w_empty(self, m_w_empty): + manager_state = ClusterState.from_cp_manager_state( + m_w_empty, zeroconf_discovery=True + ) + assert len(manager_state.workers) == 2 + w1 = manager_state.workers["w1"] + assert w1.id == "w1" + assert w1.name == "worker1" + assert w1.port == 55002 + assert w1.ip == "192.168.2.1" + assert w1.tempfolder == "/tmp24/w1" + assert w1.nodes == {} + assert isinstance(w1, WorkerState) + + w2 = manager_state.workers["w2"] + assert w2.id == "w2" + assert w2.name == "worker2" + assert w2.port == 55003 + assert w2.ip == "192.168.2.2" + assert w2.tempfolder == "/tmp24/w2" + assert w2.nodes == {} + assert isinstance(w2, WorkerState) + + def test_m_w_populated(self, m_w_populated): + manager_state = ClusterState.from_cp_manager_state( + m_w_populated, zeroconf_discovery=True + ) + assert len(manager_state.workers) == 2 + w1 = manager_state.workers["w1"] + + n1 = w1.nodes["n1"] + assert isinstance(n1, NodeState) + assert n1.id == "n1" + assert n1.name == "node1" + assert n1.port == 55004 + assert n1.fsm == "NULL" + + assert n1.registered_methods == { + "m1": RegisteredMethod( + name="func1", + style="concurrent", + params={ + "p1": "int", + }, + ) + } + + w2 = manager_state.workers["w2"] + n2 = w2.nodes["n2"] + assert isinstance(n2, NodeState) + assert n2.id == "n2" + assert n2.name == "node2" + assert n2.port == 55005 + assert n2.fsm == "INITIALIZED" + + assert n2.registered_methods == { + "m2": RegisteredMethod( + name="func2", + style="concurrent", + params={ + "p2": "float", + }, + ) + } diff --git a/chimerapy/orchestrator/tests/models/test_pipeline_config.py b/chimerapy/orchestrator/tests/models/test_pipeline_config.py index 46ce84e..ede5d64 100644 --- a/chimerapy/orchestrator/tests/models/test_pipeline_config.py +++ 
diff --git a/chimerapy/orchestrator/tests/models/test_pipeline_config.py b/chimerapy/orchestrator/tests/models/test_pipeline_config.py
index 46ce84e..ede5d64 100644
--- a/chimerapy/orchestrator/tests/models/test_pipeline_config.py
+++ b/chimerapy/orchestrator/tests/models/test_pipeline_config.py
@@ -17,7 +17,7 @@ def dummy_pipeline_config(self) -> ChimeraPyPipelineConfig:
 
     def test_pipeline_config(self, dummy_pipeline_config):
         assert dummy_pipeline_config.name == "Pipeline"
-        assert dummy_pipeline_config.description == "A pipeline"
+        assert dummy_pipeline_config.description == ""
         assert dummy_pipeline_config.runtime == 2000
 
     def test_worker_config(self, dummy_pipeline_config):
diff --git a/chimerapy/orchestrator/tests/services/cluster_service/test_cluster_manager.py b/chimerapy/orchestrator/tests/services/cluster_service/test_cluster_manager.py
index c469bb9..1aacd95 100644
--- a/chimerapy/orchestrator/tests/services/cluster_service/test_cluster_manager.py
+++ b/chimerapy/orchestrator/tests/services/cluster_service/test_cluster_manager.py
@@ -67,11 +67,15 @@ def test_get_network(self, cluster_manager):
         assert cluster_manager.get_network().map(
             lambda n: n.to_dict()
         ).unwrap() == {
-            "id": "Manager",
+            "id": cluster_manager._manager.state.id,  # pylint: disable=protected-access
+            "workers": {},
             "ip": get_ip_address(),
             "port": cluster_manager._manager.port,  # pylint: disable=protected-access
-            "workers": {},
             "logs_subscription_port": None,
+            "log_sink_enabled": True,
+            "logdir": str(
+                cluster_manager._manager.logdir
+            ),  # pylint: disable=protected-access
         }
 
     @pytest.mark.timeout(30)
@@ -134,7 +138,7 @@ async def test_pipeline_operations(self, cluster_manager, pipeline_test):
 
         # Stop and Back to preview
         stop_result = await cluster_manager.stop_pipeline()
-        await asyncio.sleep(5)  # 5 Seconds to stop
+        await asyncio.sleep(10)  # 10 Seconds to stop
         assert stop_result.ok().is_some()
         assert cluster_manager.current_state.name == "STOPPED"
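Note on the test_get_network expectation: the engine manager holds `logdir` as a `pathlib.Path`, while the orchestrator state models and their serialized dicts carry plain strings, hence the `str(...)` wrapper in the expected mapping. The same Path-to-str coercion appears in each `from_cp_*_state` converter above; a minimal sketch of the pattern (the dict literal is illustrative):

    from pathlib import Path

    state_dict = {"logdir": Path("/tmp24"), "tempfolder": Path("/tmp24/w1")}
    for key in ("logdir", "tempfolder"):
        state_dict[key] = str(state_dict[key])  # JSON- and pydantic-str friendly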