Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions .conda/scripts/post-link.bat

This file was deleted.

9 changes: 0 additions & 9 deletions .conda/scripts/post-link.sh

This file was deleted.

26 changes: 0 additions & 26 deletions .cursorignore

This file was deleted.

19 changes: 19 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
version: 2
updates:
# GitHub Actions
- package-ecosystem: "github-actions"
directory: "/"
target-branch: "dev"
schedule:
interval: "weekly"
day: "monday"
time: "06:00"
open-pull-requests-limit: 5
reviewers:
- "msgcenterpy-team"
labels:
- "dependencies"
- "github-actions"
commit-message:
prefix: "ci"
include: "scope"
52 changes: 52 additions & 0 deletions .github/workflows/ci-check.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
name: CI Check

on:
pull_request:
branches: [main, dev]

jobs:
registry-check:
runs-on: ubuntu-latest

defaults:
run:
shell: bash -l {0}

steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Setup Miniconda
uses: conda-incubator/setup-miniconda@v3
with:
miniconda-version: 'latest'
channels: conda-forge,robostack-staging,uni-lab,defaults
channel-priority: strict
activate-environment: check-env
auto-activate-base: false
auto-update-conda: false
show-channel-urls: true

- name: Install minimal ROS dependencies
run: |
conda install ros-humble-ros-core ros-humble-std-msgs ros-humble-geometry-msgs ros-humble-control-msgs -c robostack-staging -c conda-forge

- name: Install unilabos-msgs and project
run: |
conda install ros-humble-unilabos-msgs -c uni-lab -c robostack-staging -c conda-forge
pip install -e .

- name: Run check mode (complete_registry)
run: |
python -m unilabos --check_mode --skip_env_check

- name: Check for uncommitted changes
run: |
if ! git diff --exit-code; then
echo "::error::检测到文件变化!请先在本地运行 'python -m unilabos --complete_registry' 并提交变更"
echo "变化的文件:"
git diff --name-only
exit 1
fi
echo "检查通过:无文件变化"
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ temp/
output/
unilabos_data/
pyrightconfig.json
.cursorignore
## Python

# Byte-compiled / optimized / DLL files
Expand Down
24 changes: 20 additions & 4 deletions unilabos/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ def parse_args():
default=False,
help="Complete registry information",
)
parser.add_argument(
"--check_mode",
action="store_true",
default=False,
help="Run in check mode for CI: validates registry imports and ensures no file changes",
)
parser.add_argument(
"--no_update_feedback",
action="store_true",
Expand Down Expand Up @@ -314,6 +320,12 @@ def main():
BasicConfig.machine_name = machine_name
BasicConfig.vis_2d_enable = args_dict["2d_vis"]

# Check mode 处理
check_mode = args_dict.get("check_mode", False)
BasicConfig.check_mode = check_mode
if check_mode:
print_status("Check mode 启用,将进行 complete_registry 检查", "info")

from unilabos.resources.graphio import (
read_node_link_json,
read_graphml,
Expand All @@ -331,10 +343,14 @@ def main():
# 显示启动横幅
print_unilab_banner(args_dict)

# 注册表
lab_registry = build_registry(
args_dict["registry_path"], args_dict.get("complete_registry", False), BasicConfig.upload_registry
)
# 注册表 - check_mode 时强制启用 complete_registry
complete_registry = args_dict.get("complete_registry", False) or check_mode
lab_registry = build_registry(args_dict["registry_path"], complete_registry, BasicConfig.upload_registry)

# Check mode: complete_registry 完成后直接退出,git diff 检测由 CI workflow 执行
if check_mode:
print_status("Check mode: complete_registry 完成,退出", "info")
os._exit(0)

if BasicConfig.upload_registry:
# 设备注册到服务端 - 需要 ak 和 sk
Expand Down
4 changes: 2 additions & 2 deletions unilabos/app/web/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ def store_result(
feedback=feedback or {},
timestamp=time.time(),
)
logger.debug(f"[JobResultStore] Stored result for job {job_id[:8]}, status={status}")
logger.trace(f"[JobResultStore] Stored result for job {job_id[:8]}, status={status}")

def get_and_remove(self, job_id: str) -> Optional[JobResult]:
"""获取并删除任务结果"""
with self._results_lock:
result = self._results.pop(job_id, None)
if result:
logger.debug(f"[JobResultStore] Retrieved and removed result for job {job_id[:8]}")
logger.trace(f"[JobResultStore] Retrieved and removed result for job {job_id[:8]}")
return result

def get_result(self, job_id: str) -> Optional[JobResult]:
Expand Down
31 changes: 16 additions & 15 deletions unilabos/app/ws_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def add_queue_request(self, job_info: JobInfo) -> bool:
job_info.set_ready_timeout(10) # 设置10秒超时
self.active_jobs[device_key] = job_info
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.info(f"[DeviceActionManager] Job {job_log} can start immediately for {device_key}")
logger.trace(f"[DeviceActionManager] Job {job_log} can start immediately for {device_key}")
return True

def start_job(self, job_id: str) -> bool:
Expand Down Expand Up @@ -210,8 +210,9 @@ def end_job(self, job_id: str) -> Optional[JobInfo]:
job_info.update_timestamp()
# 从all_jobs中移除已结束的job
del self.all_jobs[job_id]
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.info(f"[DeviceActionManager] Job {job_log} ended for {device_key}")
# job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
# logger.debug(f"[DeviceActionManager] Job {job_log} ended for {device_key}")
pass
else:
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.warning(f"[DeviceActionManager] Job {job_log} was not active for {device_key}")
Expand All @@ -227,7 +228,7 @@ def end_job(self, job_id: str) -> Optional[JobInfo]:
next_job_log = format_job_log(
next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name
)
logger.info(f"[DeviceActionManager] Next job {next_job_log} can start for {device_key}")
logger.trace(f"[DeviceActionManager] Next job {next_job_log} can start for {device_key}")
return next_job

return None
Expand Down Expand Up @@ -268,7 +269,7 @@ def cancel_job(self, job_id: str) -> bool:
# 从all_jobs中移除
del self.all_jobs[job_id]
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.info(f"[DeviceActionManager] Active job {job_log} cancelled for {device_key}")
logger.trace(f"[DeviceActionManager] Active job {job_log} cancelled for {device_key}")

# 启动下一个任务
if device_key in self.device_queues and self.device_queues[device_key]:
Expand All @@ -281,7 +282,7 @@ def cancel_job(self, job_id: str) -> bool:
next_job_log = format_job_log(
next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name
)
logger.info(f"[DeviceActionManager] Next job {next_job_log} can start after cancel")
logger.trace(f"[DeviceActionManager] Next job {next_job_log} can start after cancel")
return True

# 如果是排队中的任务
Expand All @@ -295,7 +296,7 @@ def cancel_job(self, job_id: str) -> bool:
job_log = format_job_log(
job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name
)
logger.info(f"[DeviceActionManager] Queued job {job_log} cancelled for {device_key}")
logger.trace(f"[DeviceActionManager] Queued job {job_log} cancelled for {device_key}")
return True

job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
Expand Down Expand Up @@ -565,7 +566,7 @@ async def _send_handler(self):

async def _process_message(self, message_type: str, message_data: Dict[str, Any]):
"""处理收到的消息"""
logger.debug(f"[MessageProcessor] Processing message: {message_type}")
logger.trace(f"[MessageProcessor] Processing message: {message_type}")

try:
if message_type == "pong":
Expand Down Expand Up @@ -637,13 +638,13 @@ async def _handle_query_action_state(self, data: Dict[str, Any]):
await self._send_action_state_response(
device_id, action_name, task_id, job_id, "query_action_status", True, 0
)
logger.info(f"[MessageProcessor] Job {job_log} can start immediately")
logger.trace(f"[MessageProcessor] Job {job_log} can start immediately")
else:
# 需要排队
await self._send_action_state_response(
device_id, action_name, task_id, job_id, "query_action_status", False, 10
)
logger.info(f"[MessageProcessor] Job {job_log} queued")
logger.trace(f"[MessageProcessor] Job {job_log} queued")

# 通知QueueProcessor有新的队列更新
if self.queue_processor:
Expand Down Expand Up @@ -1128,7 +1129,7 @@ def _send_busy_status(self):
success = self.message_processor.send_message(message)
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
if success:
logger.debug(f"[QueueProcessor] Sent busy/need_more for queued job {job_log}")
logger.trace(f"[QueueProcessor] Sent busy/need_more for queued job {job_log}")
else:
logger.warning(f"[QueueProcessor] Failed to send busy status for job {job_log}")

Expand All @@ -1151,7 +1152,7 @@ def handle_job_completed(self, job_id: str, status: str) -> None:
job_info.action_name,
)

logger.info(f"[QueueProcessor] Job {job_log} completed with status: {status}")
logger.trace(f"[QueueProcessor] Job {job_log} completed with status: {status}")

# 结束任务,获取下一个可执行的任务
next_job = self.device_manager.end_job(job_id)
Expand All @@ -1171,8 +1172,8 @@ def handle_job_completed(self, job_id: str, status: str) -> None:
},
}
self.message_processor.send_message(message)
next_job_log = format_job_log(next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name)
logger.info(f"[QueueProcessor] Notified next job {next_job_log} can start")
# next_job_log = format_job_log(next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name)
# logger.debug(f"[QueueProcessor] Notified next job {next_job_log} can start")

# 立即触发下一轮状态检查
self.notify_queue_update()
Expand Down Expand Up @@ -1314,7 +1315,7 @@ def publish_job_status(
except (KeyError, AttributeError):
logger.warning(f"[WebSocketClient] Failed to remove job {item.job_id} from HostNode status")

logger.info(f"[WebSocketClient] Intercepting final status for job_id: {item.job_id} - {status}")
# logger.debug(f"[WebSocketClient] Intercepting final status for job_id: {item.job_id} - {status}")

# 通知队列处理器job完成(包括timeout的job)
self.queue_processor.handle_job_completed(item.job_id, status)
Expand Down
1 change: 1 addition & 0 deletions unilabos/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class BasicConfig:
startup_json_path = None # 填写绝对路径
disable_browser = False # 禁止浏览器自动打开
port = 8002 # 本地HTTP服务
check_mode = False # CI 检查模式,用于验证 registry 导入和文件一致性
# 'TRACE', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
log_level: Literal["TRACE", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG"

Expand Down
Loading
Loading