Merge pull request 'fix: update SubagentManager spawn() to match test expectations' (#21) from fix/subagent-manager-tests into main
Build Nanobot OAuth / build (push) Successful in 48s
Build Nanobot OAuth / cleanup (push) Successful in 1s

This commit was merged in pull request #21.
This commit is contained in:
2026-03-05 10:31:54 +01:00
2 changed files with 144 additions and 5 deletions
+5 -5
View File
@@ -73,7 +73,7 @@ class SubagentManager:
origin_metadata: Optional metadata to propagate to announcement (e.g. suppress_output).
Returns:
Status message indicating the subagent was started.
Task ID of the spawned subagent.
"""
task_id = str(uuid.uuid4())[:8]
display_label = label or task[:30] + ("..." if len(task) > 30 else "")
@@ -83,18 +83,18 @@ class SubagentManager:
"chat_id": origin_chat_id,
"metadata": origin_metadata or {},
}
# Create background task
bg_task = asyncio.create_task(
self._run_subagent(task_id, task, display_label, origin, model=model)
)
self._running_tasks[task_id] = bg_task
# Cleanup when done
bg_task.add_done_callback(lambda _: self._running_tasks.pop(task_id, None))
logger.info(f"Spawned subagent [{task_id}]: {display_label}")
return f"Subagent [{display_label}] started. Task ID: {task_id}"
return task_id
async def _run_subagent(
self,
+139
View File
@@ -0,0 +1,139 @@
# tests/test_subagent_wait.py
"""Tests for wait_for_subagents with top-level and child subagents."""
import pytest
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
from nanobot.agent.subagent import SubagentManager
from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMProvider, LLMResponse
@pytest.mark.asyncio
async def test_wait_for_top_level_subagent():
"""Test that wait_for works for top-level subagents spawned from telegram."""
bus = MessageBus()
provider = MagicMock(spec=LLMProvider)
provider.chat = AsyncMock(return_value=LLMResponse(
content="Task completed",
tool_calls=[]
))
provider.get_default_model = MagicMock(return_value="test-model")
provider.thinking_budget = 0
workspace = Path("/tmp/test-subagent")
workspace.mkdir(exist_ok=True)
manager = SubagentManager(
bus=bus,
provider=provider,
workspace=workspace
)
# Spawn a top-level subagent (origin channel = "telegram")
task_id = await manager.spawn(
task="Test task",
label="test",
model=None,
origin_channel="telegram",
origin_chat_id="12345"
)
# Wait for it to complete
result = await manager.wait_for([task_id])
# Should find the result (not "No result found")
assert "No result found" not in result
assert task_id in result
assert "Task completed" in result or "completed" in result.lower()
@pytest.mark.asyncio
async def test_wait_for_child_subagent():
"""Test that wait_for works for child subagents (orchestrator pattern)."""
bus = MessageBus()
provider = MagicMock(spec=LLMProvider)
provider.chat = AsyncMock(return_value=LLMResponse(
content="Child task completed",
tool_calls=[]
))
provider.get_default_model = MagicMock(return_value="test-model")
provider.thinking_budget = 0
workspace = Path("/tmp/test-subagent")
workspace.mkdir(exist_ok=True)
manager = SubagentManager(
bus=bus,
provider=provider,
workspace=workspace
)
# Spawn a child subagent (origin channel = "subagent")
task_id = await manager.spawn(
task="Child test task",
label="test-child",
model=None,
origin_channel="subagent",
origin_chat_id="parent-id"
)
# Wait for it to complete
result = await manager.wait_for([task_id])
# Should find the result (not "No result found")
assert "No result found" not in result
assert task_id in result
assert "Child task completed" in result or "completed" in result.lower()
@pytest.mark.asyncio
async def test_wait_for_multiple_subagents():
"""Test waiting for multiple subagents of different types."""
bus = MessageBus()
call_count = 0
async def chat_response(*args, **kwargs):
nonlocal call_count
call_count += 1
return LLMResponse(content=f"Task {call_count} completed", tool_calls=[])
provider = MagicMock(spec=LLMProvider)
provider.chat = AsyncMock(side_effect=chat_response)
provider.get_default_model = MagicMock(return_value="test-model")
provider.thinking_budget = 0
workspace = Path("/tmp/test-subagent")
workspace.mkdir(exist_ok=True)
manager = SubagentManager(
bus=bus,
provider=provider,
workspace=workspace
)
# Spawn one top-level and one child subagent
task_id_1 = await manager.spawn(
task="Top-level task",
label="test-top",
model=None,
origin_channel="telegram",
origin_chat_id="12345"
)
task_id_2 = await manager.spawn(
task="Child task",
label="test-child",
model=None,
origin_channel="subagent",
origin_chat_id="parent"
)
# Wait for both
result = await manager.wait_for([task_id_1, task_id_2])
# Should find both results
assert "No result found" not in result
assert task_id_1 in result
assert task_id_2 in result
assert "Task 1 completed" in result or "completed" in result.lower()
assert "Task 2 completed" in result or "completed" in result.lower()