"""End-to-end integration test for hooks → bus → correlation → response.""" import asyncio import pytest from aiohttp.test_utils import TestClient, TestServer from nanobot.hooks.server import HooksServer from nanobot.bus.queue import MessageBus from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.config.schema import HooksConfig from nanobot.channels.hook import HookChannel @pytest.fixture def bus(): return MessageBus() @pytest.fixture def config(): return HooksConfig( enabled=True, tokens={"gitea": "gitea-secret", "ha": "ha-secret"}, timeout_seconds=5, ) @pytest.fixture def server(bus, config): return HooksServer(host="127.0.0.1", port=0, config=config, bus=bus) async def fake_agent_loop(bus: MessageBus): """Simulate agent loop: consume inbound, process, publish outbound.""" msg = await asyncio.wait_for(bus.consume_inbound(), timeout=3.0) response_content = f"Processed: {msg.content}" await bus.publish_outbound(OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=response_content, metadata=msg.metadata or {}, )) async def fake_dispatch_loop(bus: MessageBus, hook_channel: HookChannel): """Simulate outbound dispatcher: consume outbound, resolve correlation, dispatch.""" msg = await asyncio.wait_for(bus.consume_outbound(), timeout=3.0) bus.resolve_correlation(msg) if msg.channel == "hook": await hook_channel.send(msg) @pytest.mark.asyncio async def test_full_hook_flow_default_channel(server, bus): """Hook with default channel: message goes through bus, response returned to HTTP caller.""" hook_channel = HookChannel(bus) client = TestClient(TestServer(server._app)) async with client: async def do_request(): return await client.post( "/hooks", json={"message": "deploy started"}, headers={"Authorization": "Bearer gitea-secret"}, ) # Run request + fake agent + fake dispatcher concurrently request_task = asyncio.create_task(do_request()) agent_task = asyncio.create_task(fake_agent_loop(bus)) dispatch_task = asyncio.create_task(fake_dispatch_loop(bus, hook_channel)) resp = await asyncio.wait_for(request_task, timeout=5.0) await agent_task await dispatch_task assert resp.status == 200 data = await resp.json() assert data["ok"] is True assert "deploy started" in data["response"] @pytest.mark.asyncio async def test_full_hook_flow_telegram_channel(server, bus): """Hook targeting telegram: uses telegram session, response still returned to HTTP caller.""" hook_channel = HookChannel(bus) client = TestClient(TestServer(server._app)) async with client: async def do_request(): return await client.post( "/hooks", json={"message": "doorbell rang", "channel": "telegram", "chat_id": "239824268"}, headers={"Authorization": "Bearer ha-secret"}, ) request_task = asyncio.create_task(do_request()) agent_task = asyncio.create_task(fake_agent_loop(bus)) dispatch_task = asyncio.create_task(fake_dispatch_loop(bus, hook_channel)) resp = await asyncio.wait_for(request_task, timeout=5.0) await agent_task await dispatch_task assert resp.status == 200 data = await resp.json() assert data["ok"] is True assert "doorbell rang" in data["response"] @pytest.mark.asyncio async def test_named_token_identification(server, bus): """Different tokens should produce different hook_source in metadata.""" client = TestClient(TestServer(server._app)) async with client: asyncio.create_task(client.post( "/hooks", json={"message": "from gitea"}, headers={"Authorization": "Bearer gitea-secret"}, )) msg1 = await asyncio.wait_for(bus.consume_inbound(), timeout=2.0) assert msg1.metadata["hook_source"] == "gitea" assert msg1.chat_id == "gitea" asyncio.create_task(client.post( "/hooks", json={"message": "from ha"}, headers={"Authorization": "Bearer ha-secret"}, )) msg2 = await asyncio.wait_for(bus.consume_inbound(), timeout=2.0) assert msg2.metadata["hook_source"] == "ha" assert msg2.chat_id == "ha"