Skip to content

Commit

Permalink
Misc bug fixes for incorporating upstream changes. Closes #158 (#159)
Browse files Browse the repository at this point in the history
* Misc bug fixes for incorporating upstream changes. Closes #158

- [x] incorporate `worker.id` while creating local workers.
- [x] start manager before the `record` stage.

* Fix test cases
  • Loading branch information
umesh-timalsina authored Aug 17, 2023
1 parent 16c5962 commit 6ffed59
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
17 changes: 13 additions & 4 deletions chimerapy/orchestrator/cli/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@
import time
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from pathlib import Path
from typing import Iterable

from chimerapy.engine import Manager
from chimerapy.engine.config import set
from chimerapy.orchestrator.models.pipeline_config import (
ChimeraPyPipelineConfig,
)
from chimerapy.orchestrator.orchestrator_config import OrchestratorConfig


def orchestrate(config: ChimeraPyPipelineConfig):
manager, pipeline, mappings, remote_workers = config.pipeline_graph()

print("Waiting for remote workers to connect...")
def _wait_for_workers(manager: Manager, remote_workers: Iterable[str]):
while True:
if all(
[
Expand All @@ -25,6 +24,13 @@ def orchestrate(config: ChimeraPyPipelineConfig):
print("All remote workers connected!")
break


def orchestrate(config: ChimeraPyPipelineConfig):
manager, pipeline, mappings, remote_workers = config.pipeline_graph()

# Wait until workers connect
_wait_for_workers(manager, remote_workers)

# Commit the graph
manager.commit_graph(graph=pipeline, mapping=mappings).result(
timeout=config.timeouts.commit_timeout
Expand All @@ -39,6 +45,9 @@ def orchestrate(config: ChimeraPyPipelineConfig):
if q.lower() == "y":
break

if config.mode == "record":
manager.start().result(timeout=config.timeouts.preview_timeout)

manager.record().result(timeout=config.timeouts.record_timeout)

# Wait until user stops
Expand Down
2 changes: 1 addition & 1 deletion chimerapy/orchestrator/models/pipeline_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def pipeline_graph(
remote_workers = set()
for wc in self.workers.instances:
if not wc.remote:
wo = cpe.Worker(name=wc.name, port=0)
wo = cpe.Worker(name=wc.name, id=wc.id, port=0)
workers[wo.name] = wo
else:
remote_workers.add(wc.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async def test_pipeline_operations(self, cluster_manager, pipeline_test):

# Preview pipeline
preview_result = await cluster_manager.preview_pipeline()
await asyncio.sleep(2) # 2 Seconds to preview
await asyncio.sleep(2) # 5 Seconds to preview
assert preview_result.ok().is_some()
assert cluster_manager.current_state.name == "PREVIEWING"

Expand All @@ -134,7 +134,7 @@ async def test_pipeline_operations(self, cluster_manager, pipeline_test):

# Stop and Back to preview
stop_result = await cluster_manager.stop_pipeline()
await asyncio.sleep(2) # 2 Seconds to stop
await asyncio.sleep(5) # 5 Seconds to stop
assert stop_result.ok().is_some()
assert cluster_manager.current_state.name == "STOPPED"

Expand All @@ -144,13 +144,13 @@ async def test_pipeline_operations(self, cluster_manager, pipeline_test):
assert cluster_manager.current_state.name == "COLLECTED"

preview_result = await cluster_manager.preview_pipeline()
await asyncio.sleep(2) # 2 Seconds to preview
await asyncio.sleep(5) # 5 Seconds to preview
assert preview_result.ok().is_some()
assert cluster_manager.current_state.name == "PREVIEWING"

# Reset pipeline
reset_result = await cluster_manager.reset_pipeline()
await asyncio.sleep(10) # 10 Seconds to reset
await asyncio.sleep(20) # 20 Seconds to reset
assert reset_result.ok().is_some()
assert cluster_manager.current_state.name == "INITIALIZED"
assert cluster_manager._active_pipeline is None
Expand Down

0 comments on commit 6ffed59

Please sign in to comment.