diff --git a/src/apps/epic/static/apps/epic/room.js b/src/apps/epic/static/apps/epic/room.js index 8955552..2ba9857 100644 --- a/src/apps/epic/static/apps/epic/room.js +++ b/src/apps/epic/static/apps/epic/room.js @@ -117,17 +117,39 @@ // multi-seat owner never receives the other polarity's countdown. const seatParam = new URLSearchParams(window.location.search).get('seat'); const wsSeat = seatParam ? `?seat=${encodeURIComponent(seatParam)}` : ''; - const ws = new WebSocket(`${wsScheme}://${window.location.host}/ws/room/${roomId}/${wsSeat}`); - window._roomSocket = ws; // exposed for sig-select.js hover broadcast + const wsUrl = `${wsScheme}://${window.location.host}/ws/room/${roomId}/${wsSeat}`; - ws.onmessage = function (event) { - const data = JSON.parse(event.data); - window.dispatchEvent(new CustomEvent('room:' + data.type, { detail: data })); - }; + // Auto-reconnect with capped exponential backoff. A dropped socket (proxy + // idle-timeout, brief network blip, server restart) otherwise stayed dead + // until a manual refresh, silently losing live events — most painfully the + // 12s sig countdown's polarity_room_done / pick_sky_available, so the + // tray→hex animation never played. Backoff resets on a clean open; we stop + // retrying once the page is unloading so we don't reconnect during nav. + let backoff = 1000; + const BACKOFF_MAX = 30000; + let unloading = false; + window.addEventListener('beforeunload', function () { unloading = true; }); - ws.onclose = function (event) { - if (!event.wasClean) { - console.warn('Room WebSocket closed unexpectedly'); - } - }; + function connect() { + const ws = new WebSocket(wsUrl); + window._roomSocket = ws; // exposed for sig-select.js hover broadcast + + ws.onopen = function () { backoff = 1000; }; + + ws.onmessage = function (event) { + const data = JSON.parse(event.data); + window.dispatchEvent(new CustomEvent('room:' + data.type, { detail: data })); + }; + + ws.onclose = function (event) { + if (unloading) return; + if (!event.wasClean) { + console.warn('Room WebSocket closed unexpectedly — reconnecting in ' + backoff + 'ms'); + } + setTimeout(connect, backoff); + backoff = Math.min(backoff * 2, BACKOFF_MAX); + }; + } + + connect(); }()); diff --git a/src/apps/epic/tasks.py b/src/apps/epic/tasks.py index a66a295..52bae4f 100644 --- a/src/apps/epic/tasks.py +++ b/src/apps/epic/tasks.py @@ -1,24 +1,35 @@ """ Countdown scheduler for the polarity-room SAVE SIG gate. -Uses threading.Timer so no separate Celery worker is needed in development. -Single-process only — swap for a Celery task if production uses multiple -web workers (gunicorn -w N with N > 1). +The 12s confirm runs as a **Celery task** (`apply_async` with a `countdown`) so +the post-countdown significator assignment + the `polarity_room_done` / +`pick_sky_available` broadcasts originate in the long-lived Celery worker +process. The old `threading.Timer` approach broadcast via +`async_to_sync(group_send)` from an ephemeral per-call event loop inside the web +process, which is unreliable with the Redis channel layer — the broadcast +reached the consumer only sporadically (the production analog of the +"broadcast must originate in daphne" test trap), so the tray→hex animation +fired intermittently while the server-side state still committed. The worker is +a stable process whose channel-layer singleton is never shared with a serving +loop, so its `group_send` reaches daphne reliably. + +Cancellation / supersession needs no task revocation: the cache token guard +makes a stale queued task a no-op. `schedule_polarity_confirm` writes a fresh +token (superseding any prior queued task), and `cancel_polarity_confirm` just +deletes it — either way a task that fires later finds a missing/replaced token +and returns without assigning. """ -import threading import time import uuid from asgiref.sync import async_to_sync +from celery import shared_task from channels.layers import get_channel_layer from django.core.cache import cache _LEVITY_ROLES = {'PC', 'NC', 'SC'} _GRAVITY_ROLES = {'BC', 'EC', 'AC'} -# In-process registry of pending timers: "{room_id}_{polarity}" → Timer -_timers = {} - def _cache_key(room_id, polarity): return f'sig_countdown_{room_id}_{polarity}' @@ -28,13 +39,15 @@ def _group_send(room_id, msg): async_to_sync(get_channel_layer().group_send)(f'room_{room_id}', msg) -def _fire(room_id, polarity, token): - """Callback run by threading.Timer after the countdown expires.""" - # Token guard: if cancelled or superseded, cache entry will differ. The - # entry is now a {token, deadline} dict (was a bare token string before the - # restore-on-load sprint) — read either shape defensively so a stale plain - # string left by an older deploy doesn't crash the timer callback. +@shared_task +def confirm_polarity_room(room_id, polarity, token): + """Assign significators + broadcast room-done once a polarity's countdown + elapses. Enqueued by `schedule_polarity_confirm` with a `countdown`; run by + the Celery worker. The cache token guard makes a cancelled or superseded + run a no-op (its token no longer matches what's in the cache).""" entry = cache.get(_cache_key(room_id, polarity)) + # The entry is a {token, deadline} dict; tolerate a bare-string token from + # an older deploy so a stale value can't crash the task. stored_token = entry.get('token') if isinstance(entry, dict) else entry if stored_token != token: return @@ -77,35 +90,28 @@ def _fire(room_id, polarity, token): _group_send(room_id, {'type': 'pick_sky_available'}) cache.delete(_cache_key(room_id, polarity)) - _timers.pop(f'{room_id}_{polarity}', None) def schedule_polarity_confirm(room_id, polarity, seconds): - """Schedule a polarity confirm `seconds` seconds from now. Cancels any prior timer.""" - cancel_polarity_confirm(room_id, polarity) - + """Schedule a polarity confirm `seconds` from now. A fresh token supersedes + any prior queued task (which then no-ops on the token guard).""" token = str(uuid.uuid4()) - # Store the absolute deadline alongside the token so a gamer loading a - # fresh seat view mid-countdown can derive the seconds left (the flashing - # numeral) instead of falling back to a static WAIT NVM. See - # countdown_remaining() + sig-select.js's restore-on-load. + # Store the absolute deadline alongside the token so a gamer loading a fresh + # seat view mid-countdown can derive the seconds left (the flashing numeral) + # instead of a static WAIT NVM. See countdown_remaining() + sig-select.js. cache.set( _cache_key(room_id, polarity), {'token': token, 'deadline': time.time() + seconds}, timeout=int(seconds) + 60, ) - - timer = threading.Timer(seconds, _fire, args=[str(room_id), polarity, token]) - timer.daemon = True - timer.start() - _timers[f'{room_id}_{polarity}'] = timer + confirm_polarity_room.apply_async( + args=[str(room_id), polarity, token], countdown=int(seconds), + ) def cancel_polarity_confirm(room_id, polarity): - """Cancel any pending confirm for this room + polarity.""" - timer = _timers.pop(f'{room_id}_{polarity}', None) - if timer: - timer.cancel() + """Cancel any pending confirm for this room + polarity. Deleting the token + is enough — the already-queued task finds no matching token and no-ops.""" cache.delete(_cache_key(room_id, polarity)) diff --git a/src/apps/epic/tests/unit/test_tasks.py b/src/apps/epic/tests/unit/test_tasks.py index 444f6a5..f5f3882 100644 --- a/src/apps/epic/tests/unit/test_tasks.py +++ b/src/apps/epic/tests/unit/test_tasks.py @@ -19,27 +19,22 @@ class CancelPolarityConfirmTest(TestCase): self.user = User.objects.create(email="owner@tasks.io") self.room = Room.objects.create(name="R", owner=self.user) - def test_cancel_with_no_timer_is_a_noop(self): + def test_cancel_with_no_entry_is_a_noop(self): cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) def test_cancel_clears_cache_entry(self): + # Deleting the token is the whole cancel: a queued confirm task that + # fires later finds no matching token and no-ops. from django.core.cache import cache key = _cache_key(str(self.room.id), SigReservation.LEVITY) cache.set(key, "sometoken", timeout=60) cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) self.assertIsNone(cache.get(key)) - @patch("apps.epic.tasks._timers") - def test_cancel_calls_timer_cancel_when_present(self, mock_timers): - mock_timer = MagicMock() - key = f"{self.room.id}_levity" - mock_timers.pop.return_value = mock_timer - cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) - mock_timer.cancel.assert_called_once() - class FireFunctionTest(TestCase): - """Tests for the _fire() callback executed by threading.Timer.""" + """Tests for the confirm_polarity_room() Celery task body (called directly, + synchronously, here — the worker runs it via apply_async in production).""" def setUp(self): self.owner = User.objects.create(email="owner@fire.io") @@ -70,23 +65,23 @@ class FireFunctionTest(TestCase): @patch("apps.epic.tasks._group_send") def test_fire_does_nothing_if_token_mismatch(self, mock_send): - from apps.epic.tasks import _fire + from apps.epic.tasks import confirm_polarity_room self._set_token() - _fire(str(self.room.id), SigReservation.LEVITY, "wrong-token") + confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, "wrong-token") mock_send.assert_not_called() @patch("apps.epic.tasks._group_send") def test_fire_does_nothing_if_room_not_sig_select(self, mock_send): - from apps.epic.tasks import _fire + from apps.epic.tasks import confirm_polarity_room token = self._set_token() self.room.table_status = Room.ROLE_SELECT self.room.save() - _fire(str(self.room.id), SigReservation.LEVITY, token) + confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) mock_send.assert_not_called() @patch("apps.epic.tasks._group_send") def test_fire_does_nothing_if_fewer_than_3_ready(self, mock_send): - from apps.epic.tasks import _fire + from apps.epic.tasks import confirm_polarity_room token = self._set_token() cards = list(TarotCard.objects.all()[:2]) seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC"])) @@ -95,12 +90,12 @@ class FireFunctionTest(TestCase): room=self.room, gamer=seat.gamer, card=cards[i], polarity=SigReservation.LEVITY, seat=seat, ready=True, ) - _fire(str(self.room.id), SigReservation.LEVITY, token) + confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) mock_send.assert_not_called() @patch("apps.epic.tasks._group_send") def test_fire_assigns_significators_and_broadcasts_when_all_ready(self, mock_send): - from apps.epic.tasks import _fire + from apps.epic.tasks import confirm_polarity_room token = self._set_token() cards = list(TarotCard.objects.all()[:3]) seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"])) @@ -109,7 +104,7 @@ class FireFunctionTest(TestCase): room=self.room, gamer=seat.gamer, card=cards[i], polarity=SigReservation.LEVITY, seat=seat, ready=True, ) - _fire(str(self.room.id), SigReservation.LEVITY, token) + confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) self.assertTrue(mock_send.called) levity_seats = TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"]) for i, seat in enumerate(levity_seats): @@ -117,29 +112,29 @@ class FireFunctionTest(TestCase): self.assertEqual(seat.significator, cards[i]) def test_fire_does_nothing_for_nonexistent_room(self): - from apps.epic.tasks import _fire + from apps.epic.tasks import confirm_polarity_room from django.core.cache import cache fake_id = "00000000-0000-0000-0000-000000000000" token = "known-token" cache.set(_cache_key(fake_id, SigReservation.LEVITY), token, 60) - _fire(fake_id, SigReservation.LEVITY, token) + confirm_polarity_room(fake_id, SigReservation.LEVITY, token) @patch("apps.epic.tasks._group_send") def test_fire_does_nothing_if_all_sigs_already_assigned(self, mock_send): - from apps.epic.tasks import _fire + from apps.epic.tasks import confirm_polarity_room token = self._set_token() cards = list(TarotCard.objects.all()[:3]) seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"])) for i, seat in enumerate(seats): seat.significator = cards[i] seat.save(update_fields=["significator"]) - _fire(str(self.room.id), SigReservation.LEVITY, token) + confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) mock_send.assert_not_called() @patch("apps.epic.tasks._group_send") def test_fire_broadcasts_pick_sky_when_all_polarity_sigs_assigned(self, mock_send): """When both levity AND gravity seats all have significators, fire() triggers SKY_SELECT.""" - from apps.epic.tasks import _fire + from apps.epic.tasks import confirm_polarity_room token = self._set_token() cards = list(TarotCard.objects.all()[:6]) # Give gravity seats significators so the all-assigned check passes @@ -155,7 +150,7 @@ class FireFunctionTest(TestCase): room=self.room, gamer=seat.gamer, card=levity_cards[i], polarity=SigReservation.LEVITY, seat=seat, ready=True, ) - _fire(str(self.room.id), SigReservation.LEVITY, token) + confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) call_types = [c.args[1]["type"] for c in mock_send.call_args_list] self.assertIn("polarity_room_done", call_types) self.assertIn("pick_sky_available", call_types) @@ -170,27 +165,30 @@ class SchedulePolarityConfirmTest(TestCase): def test_schedule_sets_cache_token(self): from django.core.cache import cache - schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) - token = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY)) - self.assertIsNotNone(token) + with patch("apps.epic.tasks.confirm_polarity_room.apply_async"): + schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) + entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY)) + self.assertIsNotNone(entry) - def test_schedule_registers_timer(self): - from apps.epic.tasks import _timers - key = f"{self.room.id}_{SigReservation.LEVITY}" - schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) - self.assertIn(key, _timers) - _timers[key].cancel() # clean up + def test_schedule_enqueues_confirm_task_with_countdown(self): + # The deferred confirm is a Celery task fired `seconds` from now (the + # worker process broadcasts reliably, unlike the old timer thread). + with patch("apps.epic.tasks.confirm_polarity_room.apply_async") as mock_async: + schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) + mock_async.assert_called_once() + self.assertEqual(mock_async.call_args.kwargs["countdown"], 12) - def test_schedule_cancels_prior_timer_before_scheduling(self): - from apps.epic.tasks import _timers - schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) - key = f"{self.room.id}_{SigReservation.LEVITY}" - first_timer = _timers.get(key) - schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) - second_timer = _timers.get(key) - self.assertIsNotNone(second_timer) - self.assertIsNot(first_timer, second_timer) - second_timer.cancel() + def test_schedule_supersedes_prior_via_fresh_token(self): + # No task revocation: a new schedule just writes a fresh token, so the + # previously-queued task no-ops on the token guard when it fires. + from django.core.cache import cache + key = _cache_key(str(self.room.id), SigReservation.LEVITY) + with patch("apps.epic.tasks.confirm_polarity_room.apply_async"): + schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) + first = cache.get(key)["token"] + schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) + second = cache.get(key)["token"] + self.assertNotEqual(first, second) class CountdownRemainingTest(TestCase): @@ -206,7 +204,8 @@ class CountdownRemainingTest(TestCase): def tearDown(self): cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) - def test_schedule_stores_deadline_alongside_token(self): + @patch("apps.epic.tasks.confirm_polarity_room.apply_async") + def test_schedule_stores_deadline_alongside_token(self, _async): from django.core.cache import cache schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY)) @@ -214,7 +213,8 @@ class CountdownRemainingTest(TestCase): self.assertIn("token", entry) self.assertIn("deadline", entry) - def test_countdown_remaining_returns_seconds_left(self): + @patch("apps.epic.tasks.confirm_polarity_room.apply_async") + def test_countdown_remaining_returns_seconds_left(self, _async): from apps.epic.tasks import countdown_remaining schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) remaining = countdown_remaining(str(self.room.id), SigReservation.LEVITY) diff --git a/src/core/settings.py b/src/core/settings.py index d837853..a21287f 100644 --- a/src/core/settings.py +++ b/src/core/settings.py @@ -250,3 +250,9 @@ if 'test' in sys.argv: 'BACKEND': 'channels.layers.InMemoryChannelLayer', } } + # In-process Kombu broker so Celery `apply_async` (the polarity-confirm + # countdown) queues without a live Redis + without running the task (no + # worker consumes it) — mirrors the old threading.Timer that was scheduled + # but never fired inside a sub-12s test. NOT eager: eager would ignore the + # countdown and assign significators synchronously during the ready POST. + CELERY_BROKER_URL = 'memory://'