Skip to content

Commit

Permalink
Integrate Upstream Engine Changes. Closes #157 (#160)
Browse files Browse the repository at this point in the history
* WIP- Cluster state changes

* Integrate Upstream Changes. Closes #157

* Wait for 10 seconds

* OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES

* Fix init_services
  • Loading branch information
umesh-timalsina authored Sep 19, 2023
1 parent 6ffed59 commit c0ea459
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 18 deletions.
1 change: 1 addition & 0 deletions .github/workflows/CI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ jobs:
run: |
brew install portaudio
echo "MANUAL_OS_SET=Darwin" >> $GITHUB_ENV
echo "OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES" >> $GITHUB_ENV
- uses: actions/checkout@v3
name: Checkout ChimeraPy-Engine (Bleeding Edge)
Expand Down
5 changes: 2 additions & 3 deletions chimerapy/orchestrator/init_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ def get(name):

def teardown():
"""Teardown the services."""
manager = available_services.get("cluster_manager")
manager: ClusterManager = available_services.get("cluster_manager")
if manager and not manager.has_shutdown():
manager.shutdown()

for worker in available_services.get("workers", []):
if not worker.has_shutdown:
worker.shutdown()
worker.shutdown()
49 changes: 42 additions & 7 deletions chimerapy/orchestrator/models/cluster_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,50 @@
)


class RegisteredMethod(BaseModel):
name: str
style: Literal["concurrent", "blocking", "reset"] = "concurrent"
params: Dict[str, str] = Field(default_factory=dict)


class NodeDiagnostics(BaseModel):
timestamp: str
latency: float
payload_size: float
memory_usage: float
cpu_usage: float
num_of_steps: int


class NodeState(BaseModel):
id: str
name: str = ""
init: bool = False
connected: bool = False
ready: bool = False
finished: bool = False
port: int = 0
fsm: Literal[
"NULL",
"INITIALIZED",
"CONNECTED",
"READY",
"PREVIEWING",
"RECORDING",
"STOPPED",
"SAVED",
"SHUTDOWN",
]
registered_methods: Dict[str, RegisteredMethod] = Field(
default_factory=dict
)
logdir: Optional[str] = None
diagnostics: NodeDiagnostics

@classmethod
def from_cp_node_state(cls, node_state: _NodeState):
node_state_dict = node_state.to_dict()
node_state_dict["logdir"] = (
str(node_state_dict["logdir"])
if node_state_dict["logdir"] is not None
else None
)
return cls(**node_state.to_dict())

model_config: ClassVar[ConfigDict] = ConfigDict(frozen=True, extra="forbid")
Expand All @@ -36,10 +69,12 @@ class WorkerState(BaseModel):
port: int = 0
ip: str = ""
nodes: Dict[str, NodeState] = Field(default_factory=dict)
tempfolder: str = ""

@classmethod
def from_cp_worker_state(cls, worker_state: _WorkerState):
state_dict = worker_state.to_dict()
state_dict["tempfolder"] = str(state_dict["tempfolder"])
return cls(
**state_dict,
)
Expand All @@ -54,16 +89,16 @@ class ClusterState(BaseModel):
workers: Dict[str, WorkerState] = Field(default_factory=dict)

logs_subscription_port: Optional[int] = None
running: bool = False
collecting: bool = False
collection_status: Optional[Literal["PASS", "FAIL"]] = None
log_sink_enabled: bool = False
zeroconf_discovery: bool = False
logdir: str = None

@classmethod
def from_cp_manager_state(
cls, state: ManagerState, zeroconf_discovery: bool
):
state_dict = state.to_dict()
state_dict["logdir"] = str(state_dict["logdir"])
return cls(**state_dict, zeroconf_discovery=zeroconf_discovery)

model_config: ClassVar[ConfigDict] = ConfigDict(frozen=True, extra="forbid")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ async def update_network_status(self) -> None:

def is_zeroconf_discovery_enabled(self) -> bool:
"""Check if zeroconf discovery is enabled."""
return self._manager.services.zeroconf.enabled
return self._manager.zeroconf_service.enabled

async def instantiate_pipeline(
self, pipeline_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async def add_client(
await self.updater.add_client(q)

if message is not None:
await q.put(message.dict())
await q.put(message.model_dump(mode="json"))

async def remove_client(self, q: asyncio.Queue) -> None:
"""Remove a client queue from the broadcaster."""
Expand Down Expand Up @@ -139,7 +139,7 @@ async def broadcast_updates(self) -> None:
else:
msg = None
if msg is not None:
msg_dict = msg.dict()
msg_dict = msg.model_dump(mode="json")
await self.updater.put_update(msg_dict)
if msg and msg.signal is UpdateMessageType.SHUTDOWN:
break
Expand All @@ -165,7 +165,7 @@ async def put_update(self, msg: Dict[str, Any]) -> None:
UpdateMessageType.NETWORK_UPDATE,
self.zeroconf_enabled,
)
await self.updater.put_update(update_msg.dict())
await self.updater.put_update(update_msg.model_dump(mode="json"))

@staticmethod
def is_cluster_update_message(msg: Dict[str, Any]) -> bool:
Expand Down
223 changes: 223 additions & 0 deletions chimerapy/orchestrator/tests/models/test_cluster_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
from pathlib import Path

import pytest

from chimerapy.engine.states import (
ManagerState as _ManagerState,
)
from chimerapy.engine.states import (
NodeState as _NodeState,
)
from chimerapy.engine.states import (
RegisteredMethod as _RegisteredMethod,
)
from chimerapy.engine.states import (
WorkerState as _WorkerState,
)
from chimerapy.orchestrator.models.cluster_models import (
ClusterState,
NodeState,
RegisteredMethod,
WorkerState,
)
from chimerapy.orchestrator.tests.base_test import BaseTest


class TestClusterModels(BaseTest):
@pytest.fixture(scope="class")
def m_empty(self):
return _ManagerState(
id="manager1",
)

@pytest.fixture(scope="class")
def m_populated(self):
return _ManagerState(
id="manager1",
logdir=Path("/tmp24"),
ip="192.168.2.0",
port=55000,
workers={},
log_sink_enabled=True,
logs_subscription_port=55001,
)

@pytest.fixture(scope="class")
def m_w_empty(self):
return _ManagerState(
id="manager1",
logdir=Path("/tmp24"),
ip="192.168.2.0",
port=55000,
log_sink_enabled=True,
logs_subscription_port=55001,
workers={
"w1": _WorkerState(
id="w1",
name="worker1",
port=55002,
ip="192.168.2.1",
nodes={},
tempfolder=Path("/tmp24/w1"),
),
"w2": _WorkerState(
id="w2",
name="worker2",
port=55003,
ip="192.168.2.2",
nodes={},
tempfolder=Path("/tmp24/w2"),
),
},
)

@pytest.fixture(scope="class")
def m_w_populated(self):
return _ManagerState(
id="manager1",
logdir=Path("/tmp24"),
ip="192.168.2.0",
port=55000,
log_sink_enabled=True,
logs_subscription_port=55001,
workers={
"w1": _WorkerState(
id="w1",
name="worker1",
port=55002,
ip="192.168.2.1",
nodes={
"n1": _NodeState(
id="n1",
name="node1",
port=55004,
fsm="NULL",
registered_methods={
"m1": _RegisteredMethod(
name="func1",
style="concurrent",
params={
"p1": "int",
},
)
},
)
},
tempfolder=Path("/tmp24/w1"),
),
"w2": _WorkerState(
id="w2",
name="worker2",
port=55003,
ip="192.168.2.2",
nodes={
"n2": _NodeState(
id="n2",
name="node2",
port=55005,
fsm="INITIALIZED",
registered_methods={
"m2": _RegisteredMethod(
name="func2",
style="concurrent",
params={
"p2": "float",
},
)
},
)
},
tempfolder=Path("/tmp24/w2"),
),
},
)

def test_m_empty(self, m_empty):
manager_state = ClusterState.from_cp_manager_state(
m_empty, zeroconf_discovery=False
)
assert manager_state.id == "manager1"
assert manager_state.logdir == str(Path.cwd())
assert manager_state.log_sink_enabled is False
assert manager_state.logs_subscription_port is None
assert manager_state.ip == "0.0.0.0"
assert manager_state.port == 0
assert manager_state.workers == {}

def test_m_populated(self, m_populated):
manager_state = ClusterState.from_cp_manager_state(
m_populated, zeroconf_discovery=True
)
assert manager_state.id == "manager1"
assert manager_state.logdir == "/tmp24"
assert manager_state.log_sink_enabled is True
assert manager_state.logs_subscription_port == 55001
assert manager_state.ip == "192.168.2.0"
assert manager_state.port == 55000
assert manager_state.workers == {}
assert manager_state.zeroconf_discovery is True

def test_m_w_empty(self, m_w_empty):
manager_state = ClusterState.from_cp_manager_state(
m_w_empty, zeroconf_discovery=True
)
assert len(manager_state.workers) == 2
w1 = manager_state.workers["w1"]
assert w1.id == "w1"
assert w1.name == "worker1"
assert w1.port == 55002
assert w1.ip == "192.168.2.1"
assert w1.tempfolder == "/tmp24/w1"
assert w1.nodes == {}
assert isinstance(w1, WorkerState)

w2 = manager_state.workers["w2"]
assert w2.id == "w2"
assert w2.name == "worker2"
assert w2.port == 55003
assert w2.ip == "192.168.2.2"
assert w2.tempfolder == "/tmp24/w2"
assert w2.nodes == {}
assert isinstance(w2, WorkerState)

def test_m_w_populated(self, m_w_populated):
manager_state = ClusterState.from_cp_manager_state(
m_w_populated, zeroconf_discovery=True
)
assert len(manager_state.workers) == 2
w1 = manager_state.workers["w1"]

n1 = w1.nodes["n1"]
assert isinstance(n1, NodeState)
assert n1.id == "n1"
assert n1.name == "node1"
assert n1.port == 55004
assert n1.fsm == "NULL"

assert n1.registered_methods == {
"m1": RegisteredMethod(
name="func1",
style="concurrent",
params={
"p1": "int",
},
)
}

w2 = manager_state.workers["w2"]
n2 = w2.nodes["n2"]
assert isinstance(n2, NodeState)
assert n2.id == "n2"
assert n2.name == "node2"
assert n2.port == 55005
assert n2.fsm == "INITIALIZED"

assert n2.registered_methods == {
"m2": RegisteredMethod(
name="func2",
style="concurrent",
params={
"p2": "float",
},
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def dummy_pipeline_config(self) -> ChimeraPyPipelineConfig:

def test_pipeline_config(self, dummy_pipeline_config):
assert dummy_pipeline_config.name == "Pipeline"
assert dummy_pipeline_config.description == "A pipeline"
assert dummy_pipeline_config.description == ""
assert dummy_pipeline_config.runtime == 2000

def test_worker_config(self, dummy_pipeline_config):
Expand Down
Loading

0 comments on commit c0ea459

Please sign in to comment.