Files
python-tdd/src/apps/epic/tests/integrated/test_consumers.py
Disco DeDisco 758c9c5377 COVERAGE: patch 91% → 96%+ — 603 tests, tasks.py at 100%
New/extended tests across billboard, dashboard, drama, epic, gameboard,
and lyric to cover previously untested branches: dev_login view, scroll
position endpoints, sky preview error paths, drama to_prose/to_activity
branches, consumer broadcast handlers, tarot deck draw/shuffle, astrology
model __str__, character model, sig reserve/ready/confirm views, natus
preview/save views, and the full tasks.py countdown scheduler.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 23:23:28 -04:00

321 lines
13 KiB
Python

from channels.db import database_sync_to_async
from channels.testing.websocket import WebsocketCommunicator
from channels.layers import get_channel_layer
from django.test import Client, SimpleTestCase, TransactionTestCase, override_settings, tag
from apps.epic.models import Room, TableSeat
from apps.lyric.models import User
from core.asgi import application
# CHANNEL_LAYERS override for the consumer tests in this module: routes the
# "default" layer through Channels' in-memory backend.
TEST_CHANNEL_LAYERS = {
    "default": {"BACKEND": "channels.layers.InMemoryChannelLayer"},
}
@override_settings(CHANNEL_LAYERS=TEST_CHANNEL_LAYERS)
class RoomConsumerTest(SimpleTestCase):
    """Connection and group-broadcast tests for the room WebSocket route.

    Every broadcast test opens a socket on a fixed room id, publishes one
    event to the matching ``room_<id>`` channel-layer group, and checks the
    JSON frame that arrives on the socket.
    """

    _ROOM_ID = "00000000-0000-0000-0000-000000000001"
    _ROOM_PATH = f"/ws/room/{_ROOM_ID}/"
    _ROOM_GROUP = f"room_{_ROOM_ID}"

    async def _receive_broadcast(self, event):
        """Send ``event`` to the room group and return the frame the socket gets.

        Args:
            event: Channel-layer message dict; its ``"type"`` selects the
                consumer handler.

        Returns:
            The decoded JSON payload received on the WebSocket.
        """
        communicator = WebsocketCommunicator(application, self._ROOM_PATH)
        connected, _ = await communicator.connect()
        try:
            self.assertTrue(connected)
            await get_channel_layer().group_send(self._ROOM_GROUP, event)
            return await communicator.receive_json_from()
        finally:
            # Always disconnect, even when the receive or an assertion fails,
            # so a failing test does not leave the socket open.
            await communicator.disconnect()

    async def test_can_connect_and_disconnect(self):
        communicator = WebsocketCommunicator(application, self._ROOM_PATH)
        connected, _ = await communicator.connect()
        try:
            self.assertTrue(connected)
        finally:
            await communicator.disconnect()

    async def test_receives_role_select_start_broadcast(self):
        response = await self._receive_broadcast(
            {"type": "role_select_start", "slot_order": [1, 2, 3, 4, 5, 6]}
        )
        self.assertEqual(response["type"], "role_select_start")
        self.assertEqual(response["slot_order"], [1, 2, 3, 4, 5, 6])

    async def test_receives_turn_changed_broadcast(self):
        response = await self._receive_broadcast(
            {"type": "turn_changed", "active_slot": 2}
        )
        self.assertEqual(response["type"], "turn_changed")
        self.assertEqual(response["active_slot"], 2)

    async def test_receives_all_roles_filled_broadcast(self):
        response = await self._receive_broadcast({"type": "all_roles_filled"})
        self.assertEqual(response["type"], "all_roles_filled")

    async def test_receives_sig_select_started_broadcast(self):
        response = await self._receive_broadcast({"type": "sig_select_started"})
        self.assertEqual(response["type"], "sig_select_started")

    async def test_receives_gate_update_broadcast(self):
        response = await self._receive_broadcast(
            {"type": "gate_update", "gate_state": "some_state"}
        )
        self.assertEqual(response["type"], "gate_update")
        self.assertEqual(response["gate_state"], "some_state")
@tag('channels')
@override_settings(CHANNEL_LAYERS=TEST_CHANNEL_LAYERS)
class CursorMoveConsumerTest(TransactionTestCase):
    """Cursor moves are broadcast only within the same polarity group
    (levity: PC/NC/SC — gravity: BC/EC/AC)."""

    async def _seat_pair(self, partner_email, partner_role):
        """Create a room with a PC player in slot 1 and a partner in slot 2.

        Args:
            partner_email: Email for the second user.
            partner_role: Role code stored on the second user's seat.

        Returns:
            A ``(room, pc_user, partner_user)`` tuple.
        """
        create_user = database_sync_to_async(User.objects.create)
        create_seat = database_sync_to_async(TableSeat.objects.create)
        pc_user = await create_user(email="pc@test.io")
        partner = await create_user(email=partner_email)
        room = await database_sync_to_async(Room.objects.create)(name="T", owner=pc_user)
        await create_seat(room=room, gamer=pc_user, slot_number=1, role="PC")
        await create_seat(room=room, gamer=partner, slot_number=2, role=partner_role)
        return room, pc_user, partner

    async def _make_communicator(self, user, room):
        """Open a WebSocket on ``room`` authenticated as ``user``.

        Logs the user in through Django's test ``Client`` and forwards the
        resulting session key as a ``sessionid`` cookie on the handshake.
        Fails the test if the connection is not accepted.
        """
        client = Client()
        await database_sync_to_async(client.force_login)(user)
        session_key = await database_sync_to_async(lambda: client.session.session_key)()
        comm = WebsocketCommunicator(
            application,
            f"/ws/room/{room.id}/",
            headers=[(b"cookie", f"sessionid={session_key}".encode())],
        )
        connected, _ = await comm.connect()
        if not connected:
            # Tear the rejected socket down before failing the test.
            await comm.disconnect()
        self.assertTrue(connected)
        return comm

    async def test_levity_cursor_received_by_fellow_levity_player(self):
        room, pc_user, nc_user = await self._seat_pair("nc@test.io", "NC")
        pc_comm = await self._make_communicator(pc_user, room)
        try:
            nc_comm = await self._make_communicator(nc_user, room)
            try:
                await pc_comm.send_json_to({"type": "cursor_move", "x": 0.5, "y": 0.3})
                msg = await nc_comm.receive_json_from(timeout=2)
                self.assertEqual(msg["type"], "cursor_move")
                self.assertAlmostEqual(msg["x"], 0.5)
            finally:
                await nc_comm.disconnect()
        finally:
            # Runs even if the partner socket or an assertion failed.
            await pc_comm.disconnect()

    async def test_levity_cursor_not_received_by_gravity_player(self):
        room, pc_user, bc_user = await self._seat_pair("bc@test.io", "BC")
        pc_comm = await self._make_communicator(pc_user, room)
        try:
            bc_comm = await self._make_communicator(bc_user, room)
            try:
                await pc_comm.send_json_to({"type": "cursor_move", "x": 0.5, "y": 0.3})
                self.assertTrue(await bc_comm.receive_nothing(timeout=1))
            finally:
                await bc_comm.disconnect()
        finally:
            # Runs even if the partner socket or an assertion failed.
            await pc_comm.disconnect()
@override_settings(CHANNEL_LAYERS=TEST_CHANNEL_LAYERS)
class MissingConsumerHandlersTest(SimpleTestCase):
    """Covers the simple pass-through handlers not exercised by other tests."""

    _ROOM_ID = "00000000-0000-0000-0000-000000000002"
    _ROOM_PATH = f"/ws/room/{_ROOM_ID}/"
    _ROOM_GROUP = f"room_{_ROOM_ID}"

    async def _send_and_receive(self, room_path, group_name, msg):
        """Connect to ``room_path``, publish ``msg`` to ``group_name``, return the reply.

        Args:
            room_path: WebSocket URL path to connect to.
            group_name: Channel-layer group the message is sent to.
            msg: Channel-layer message dict.

        Returns:
            The decoded JSON payload received on the WebSocket.
        """
        communicator = WebsocketCommunicator(application, room_path)
        connected, _ = await communicator.connect()
        try:
            self.assertTrue(connected)
            await get_channel_layer().group_send(group_name, msg)
            return await communicator.receive_json_from()
        finally:
            # Disconnect on every path so a failed receive does not leave
            # the socket open.
            await communicator.disconnect()

    async def test_receives_sig_selected_broadcast(self):
        response = await self._send_and_receive(
            self._ROOM_PATH,
            self._ROOM_GROUP,
            {"type": "sig_selected", "card_id": "abc"},
        )
        self.assertEqual(response["type"], "sig_selected")

    async def test_receives_countdown_start_broadcast(self):
        response = await self._send_and_receive(
            self._ROOM_PATH,
            self._ROOM_GROUP,
            {"type": "countdown_start", "polarity": "levity", "seconds": 12},
        )
        self.assertEqual(response["type"], "countdown_start")

    async def test_receives_countdown_cancel_broadcast(self):
        response = await self._send_and_receive(
            self._ROOM_PATH,
            self._ROOM_GROUP,
            {"type": "countdown_cancel", "polarity": "levity", "seconds_remaining": 7},
        )
        self.assertEqual(response["type"], "countdown_cancel")

    async def test_receives_polarity_room_done_broadcast(self):
        response = await self._send_and_receive(
            self._ROOM_PATH,
            self._ROOM_GROUP,
            {"type": "polarity_room_done", "polarity": "levity"},
        )
        self.assertEqual(response["type"], "polarity_room_done")

    async def test_receives_pick_sky_available_broadcast(self):
        response = await self._send_and_receive(
            self._ROOM_PATH,
            self._ROOM_GROUP,
            {"type": "pick_sky_available"},
        )
        self.assertEqual(response["type"], "pick_sky_available")
@tag('channels')
@override_settings(CHANNEL_LAYERS=TEST_CHANNEL_LAYERS)
class SigHoverConsumerTest(TransactionTestCase):
    """sig_hover messages sent by a client are forwarded within the polarity group only."""

    async def _seat_pair(self, partner_email, partner_role):
        """Create a room with a PC player in slot 1 and a partner in slot 2.

        Args:
            partner_email: Email for the second user.
            partner_role: Role code stored on the second user's seat.

        Returns:
            A ``(room, pc_user, partner_user)`` tuple.
        """
        create_user = database_sync_to_async(User.objects.create)
        create_seat = database_sync_to_async(TableSeat.objects.create)
        pc_user = await create_user(email="pc@test.io")
        partner = await create_user(email=partner_email)
        room = await database_sync_to_async(Room.objects.create)(name="T", owner=pc_user)
        await create_seat(room=room, gamer=pc_user, slot_number=1, role="PC")
        await create_seat(room=room, gamer=partner, slot_number=2, role=partner_role)
        return room, pc_user, partner

    async def _make_communicator(self, user, room):
        """Open a WebSocket on ``room`` authenticated as ``user``.

        Logs the user in through Django's test ``Client`` and forwards the
        resulting session key as a ``sessionid`` cookie on the handshake.
        Fails the test if the connection is not accepted.
        """
        client = Client()
        await database_sync_to_async(client.force_login)(user)
        session_key = await database_sync_to_async(lambda: client.session.session_key)()
        comm = WebsocketCommunicator(
            application,
            f"/ws/room/{room.id}/",
            headers=[(b"cookie", f"sessionid={session_key}".encode())],
        )
        connected, _ = await comm.connect()
        if not connected:
            # Tear the rejected socket down before failing the test.
            await comm.disconnect()
        self.assertTrue(connected)
        return comm

    async def test_sig_hover_forwarded_to_polarity_group(self):
        room, pc_user, nc_user = await self._seat_pair("nc@test.io", "NC")
        pc_comm = await self._make_communicator(pc_user, room)
        try:
            nc_comm = await self._make_communicator(nc_user, room)
            try:
                await pc_comm.send_json_to({
                    "type": "sig_hover", "card_id": "abc-123", "role": "PC", "active": True
                })
                msg = await nc_comm.receive_json_from(timeout=2)
                self.assertEqual(msg["type"], "sig_hover")
                self.assertEqual(msg["card_id"], "abc-123")
                self.assertEqual(msg["role"], "PC")
                self.assertTrue(msg["active"])
            finally:
                await nc_comm.disconnect()
        finally:
            # Runs even if the partner socket or an assertion failed.
            await pc_comm.disconnect()

    async def test_sig_hover_not_forwarded_to_other_polarity(self):
        room, pc_user, bc_user = await self._seat_pair("bc@test.io", "BC")
        pc_comm = await self._make_communicator(pc_user, room)
        try:
            bc_comm = await self._make_communicator(bc_user, room)
            try:
                await pc_comm.send_json_to({
                    "type": "sig_hover", "card_id": "abc-123", "role": "PC", "active": True
                })
                self.assertTrue(await bc_comm.receive_nothing(timeout=1))
            finally:
                await bc_comm.disconnect()
        finally:
            # Runs even if the partner socket or an assertion failed.
            await pc_comm.disconnect()

    async def test_sig_reserved_broadcast_received_by_polarity_group(self):
        # Only the NC player connects; the event is injected straight into the
        # levity cursor group instead of being sent by a client.
        room, _pc_user, nc_user = await self._seat_pair("nc@test.io", "NC")
        nc_comm = await self._make_communicator(nc_user, room)
        try:
            await get_channel_layer().group_send(
                f"cursors_{room.id}_levity",
                {"type": "sig_reserved", "card_id": "card-xyz", "role": "PC", "reserved": True},
            )
            msg = await nc_comm.receive_json_from(timeout=2)
            self.assertEqual(msg["type"], "sig_reserved")
            self.assertEqual(msg["card_id"], "card-xyz")
            self.assertTrue(msg["reserved"])
        finally:
            await nc_comm.disconnect()