From a6ce20761b731737771f569f631112d0a2632d32 Mon Sep 17 00:00:00 2001 From: Disco DeDisco Date: Fri, 5 Jun 2026 14:52:21 -0400 Subject: [PATCH] =?UTF-8?q?Sig=20countdown:=20run=20the=20post-countdown?= =?UTF-8?q?=20confirm=20as=20a=20Celery=20task=20+=20auto-reconnect=20the?= =?UTF-8?q?=20room=20WS=20=E2=80=94=20TDD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The flaky tray→thumbnail→hex animation after the 12s countdown. Root cause: the confirm ran in a threading.Timer thread inside the web process, and _fire broadcast polarity_room_done / pick_sky_available via async_to_sync(group_send) from an ephemeral per-call event loop. With the Redis channel layer that publish is unreliable across loops (the production analog of the "broadcast must originate in daphne" test trap), so the live events reached the client only sporadically — the server-side state (sig assignment, SKY_SELECT) still committed, which is why a refresh always showed the concluded hex but the animation usually didn't play. Fix (chosen: migrate to Celery — the path the tasks.py docstring already called for): _fire becomes the @shared_task confirm_polarity_room, enqueued by schedule_polarity_confirm via apply_async(countdown=seconds). The worker is a stable long-lived process whose channel-layer singleton is never shared with a serving loop, so its group_send reaches daphne reliably; it also survives web-worker restarts. No task revocation needed — cancellation/supersession ride the existing cache token guard (cancel just deletes the token; a stale queued task no-ops). Dropped threading.Timer + the _timers registry. Test settings get CELERY_BROKER_URL='memory://' so apply_async queues without a live Redis and without running the task (no worker) — mirrors the old timer that was scheduled but never fired inside a sub-12s test. NOT eager: eager would ignore the countdown and assign significators synchronously during the ready POST. test_tasks rewritten: confirm_polarity_room called directly (task body), schedule asserts the enqueue + countdown + fresh-token supersession; the broadcast itself stays IT-uncoverable under InMemory (known channels limit). Also: room.js now auto-reconnects the room WebSocket with capped exponential backoff (1s→30s, reset on open, halted on beforeunload). A dropped socket (proxy idle-timeout, blip, server restart) previously stayed dead until a manual refresh, silently losing every live event — an independent reliability gap that compounded the "sporadic" feel. 602 epic ITs + 18 task UTs green. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/apps/epic/static/apps/epic/room.js | 44 +++++++++---- src/apps/epic/tasks.py | 66 ++++++++++--------- src/apps/epic/tests/unit/test_tasks.py | 90 +++++++++++++------------- src/core/settings.py | 6 ++ 4 files changed, 120 insertions(+), 86 deletions(-) diff --git a/src/apps/epic/static/apps/epic/room.js b/src/apps/epic/static/apps/epic/room.js index 8955552..2ba9857 100644 --- a/src/apps/epic/static/apps/epic/room.js +++ b/src/apps/epic/static/apps/epic/room.js @@ -117,17 +117,39 @@ // multi-seat owner never receives the other polarity's countdown. const seatParam = new URLSearchParams(window.location.search).get('seat'); const wsSeat = seatParam ? `?seat=${encodeURIComponent(seatParam)}` : ''; - const ws = new WebSocket(`${wsScheme}://${window.location.host}/ws/room/${roomId}/${wsSeat}`); - window._roomSocket = ws; // exposed for sig-select.js hover broadcast + const wsUrl = `${wsScheme}://${window.location.host}/ws/room/${roomId}/${wsSeat}`; - ws.onmessage = function (event) { - const data = JSON.parse(event.data); - window.dispatchEvent(new CustomEvent('room:' + data.type, { detail: data })); - }; + // Auto-reconnect with capped exponential backoff. A dropped socket (proxy + // idle-timeout, brief network blip, server restart) otherwise stayed dead + // until a manual refresh, silently losing live events — most painfully the + // 12s sig countdown's polarity_room_done / pick_sky_available, so the + // tray→hex animation never played. Backoff resets on a clean open; we stop + // retrying once the page is unloading so we don't reconnect during nav. + let backoff = 1000; + const BACKOFF_MAX = 30000; + let unloading = false; + window.addEventListener('beforeunload', function () { unloading = true; }); - ws.onclose = function (event) { - if (!event.wasClean) { - console.warn('Room WebSocket closed unexpectedly'); - } - }; + function connect() { + const ws = new WebSocket(wsUrl); + window._roomSocket = ws; // exposed for sig-select.js hover broadcast + + ws.onopen = function () { backoff = 1000; }; + + ws.onmessage = function (event) { + const data = JSON.parse(event.data); + window.dispatchEvent(new CustomEvent('room:' + data.type, { detail: data })); + }; + + ws.onclose = function (event) { + if (unloading) return; + if (!event.wasClean) { + console.warn('Room WebSocket closed unexpectedly — reconnecting in ' + backoff + 'ms'); + } + setTimeout(connect, backoff); + backoff = Math.min(backoff * 2, BACKOFF_MAX); + }; + } + + connect(); }()); diff --git a/src/apps/epic/tasks.py b/src/apps/epic/tasks.py index a66a295..52bae4f 100644 --- a/src/apps/epic/tasks.py +++ b/src/apps/epic/tasks.py @@ -1,24 +1,35 @@ """ Countdown scheduler for the polarity-room SAVE SIG gate. -Uses threading.Timer so no separate Celery worker is needed in development. -Single-process only — swap for a Celery task if production uses multiple -web workers (gunicorn -w N with N > 1). +The 12s confirm runs as a **Celery task** (`apply_async` with a `countdown`) so +the post-countdown significator assignment + the `polarity_room_done` / +`pick_sky_available` broadcasts originate in the long-lived Celery worker +process. The old `threading.Timer` approach broadcast via +`async_to_sync(group_send)` from an ephemeral per-call event loop inside the web +process, which is unreliable with the Redis channel layer — the broadcast +reached the consumer only sporadically (the production analog of the +"broadcast must originate in daphne" test trap), so the tray→hex animation +fired intermittently while the server-side state still committed. The worker is +a stable process whose channel-layer singleton is never shared with a serving +loop, so its `group_send` reaches daphne reliably. + +Cancellation / supersession needs no task revocation: the cache token guard +makes a stale queued task a no-op. `schedule_polarity_confirm` writes a fresh +token (superseding any prior queued task), and `cancel_polarity_confirm` just +deletes it — either way a task that fires later finds a missing/replaced token +and returns without assigning. """ -import threading import time import uuid from asgiref.sync import async_to_sync +from celery import shared_task from channels.layers import get_channel_layer from django.core.cache import cache _LEVITY_ROLES = {'PC', 'NC', 'SC'} _GRAVITY_ROLES = {'BC', 'EC', 'AC'} -# In-process registry of pending timers: "{room_id}_{polarity}" → Timer -_timers = {} - def _cache_key(room_id, polarity): return f'sig_countdown_{room_id}_{polarity}' @@ -28,13 +39,15 @@ def _group_send(room_id, msg): async_to_sync(get_channel_layer().group_send)(f'room_{room_id}', msg) -def _fire(room_id, polarity, token): - """Callback run by threading.Timer after the countdown expires.""" - # Token guard: if cancelled or superseded, cache entry will differ. The - # entry is now a {token, deadline} dict (was a bare token string before the - # restore-on-load sprint) — read either shape defensively so a stale plain - # string left by an older deploy doesn't crash the timer callback. +@shared_task +def confirm_polarity_room(room_id, polarity, token): + """Assign significators + broadcast room-done once a polarity's countdown + elapses. Enqueued by `schedule_polarity_confirm` with a `countdown`; run by + the Celery worker. The cache token guard makes a cancelled or superseded + run a no-op (its token no longer matches what's in the cache).""" entry = cache.get(_cache_key(room_id, polarity)) + # The entry is a {token, deadline} dict; tolerate a bare-string token from + # an older deploy so a stale value can't crash the task. stored_token = entry.get('token') if isinstance(entry, dict) else entry if stored_token != token: return @@ -77,35 +90,28 @@ def _fire(room_id, polarity, token): _group_send(room_id, {'type': 'pick_sky_available'}) cache.delete(_cache_key(room_id, polarity)) - _timers.pop(f'{room_id}_{polarity}', None) def schedule_polarity_confirm(room_id, polarity, seconds): - """Schedule a polarity confirm `seconds` seconds from now. Cancels any prior timer.""" - cancel_polarity_confirm(room_id, polarity) - + """Schedule a polarity confirm `seconds` from now. A fresh token supersedes + any prior queued task (which then no-ops on the token guard).""" token = str(uuid.uuid4()) - # Store the absolute deadline alongside the token so a gamer loading a - # fresh seat view mid-countdown can derive the seconds left (the flashing - # numeral) instead of falling back to a static WAIT NVM. See - # countdown_remaining() + sig-select.js's restore-on-load. + # Store the absolute deadline alongside the token so a gamer loading a fresh + # seat view mid-countdown can derive the seconds left (the flashing numeral) + # instead of a static WAIT NVM. See countdown_remaining() + sig-select.js. cache.set( _cache_key(room_id, polarity), {'token': token, 'deadline': time.time() + seconds}, timeout=int(seconds) + 60, ) - - timer = threading.Timer(seconds, _fire, args=[str(room_id), polarity, token]) - timer.daemon = True - timer.start() - _timers[f'{room_id}_{polarity}'] = timer + confirm_polarity_room.apply_async( + args=[str(room_id), polarity, token], countdown=int(seconds), + ) def cancel_polarity_confirm(room_id, polarity): - """Cancel any pending confirm for this room + polarity.""" - timer = _timers.pop(f'{room_id}_{polarity}', None) - if timer: - timer.cancel() + """Cancel any pending confirm for this room + polarity. Deleting the token + is enough — the already-queued task finds no matching token and no-ops.""" cache.delete(_cache_key(room_id, polarity)) diff --git a/src/apps/epic/tests/unit/test_tasks.py b/src/apps/epic/tests/unit/test_tasks.py index 444f6a5..f5f3882 100644 --- a/src/apps/epic/tests/unit/test_tasks.py +++ b/src/apps/epic/tests/unit/test_tasks.py @@ -19,27 +19,22 @@ class CancelPolarityConfirmTest(TestCase): self.user = User.objects.create(email="owner@tasks.io") self.room = Room.objects.create(name="R", owner=self.user) - def test_cancel_with_no_timer_is_a_noop(self): + def test_cancel_with_no_entry_is_a_noop(self): cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) def test_cancel_clears_cache_entry(self): + # Deleting the token is the whole cancel: a queued confirm task that + # fires later finds no matching token and no-ops. from django.core.cache import cache key = _cache_key(str(self.room.id), SigReservation.LEVITY) cache.set(key, "sometoken", timeout=60) cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) self.assertIsNone(cache.get(key)) - @patch("apps.epic.tasks._timers") - def test_cancel_calls_timer_cancel_when_present(self, mock_timers): - mock_timer = MagicMock() - key = f"{self.room.id}_levity" - mock_timers.pop.return_value = mock_timer - cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) - mock_timer.cancel.assert_called_once() - class FireFunctionTest(TestCase): - """Tests for the _fire() callback executed by threading.Timer.""" + """Tests for the confirm_polarity_room() Celery task body (called directly, + synchronously, here — the worker runs it via apply_async in production).""" def setUp(self): self.owner = User.objects.create(email="owner@fire.io") @@ -70,23 +65,23 @@ class FireFunctionTest(TestCase): @patch("apps.epic.tasks._group_send") def test_fire_does_nothing_if_token_mismatch(self, mock_send): - from apps.epic.tasks import _fire + from apps.epic.tasks import confirm_polarity_room self._set_token() - _fire(str(self.room.id), SigReservation.LEVITY, "wrong-token") + confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, "wrong-token") mock_send.assert_not_called() @patch("apps.epic.tasks._group_send") def test_fire_does_nothing_if_room_not_sig_select(self, mock_send): - from apps.epic.tasks import _fire + from apps.epic.tasks import confirm_polarity_room token = self._set_token() self.room.table_status = Room.ROLE_SELECT self.room.save() - _fire(str(self.room.id), SigReservation.LEVITY, token) + confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) mock_send.assert_not_called() @patch("apps.epic.tasks._group_send") def test_fire_does_nothing_if_fewer_than_3_ready(self, mock_send): - from apps.epic.tasks import _fire + from apps.epic.tasks import confirm_polarity_room token = self._set_token() cards = list(TarotCard.objects.all()[:2]) seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC"])) @@ -95,12 +90,12 @@ class FireFunctionTest(TestCase): room=self.room, gamer=seat.gamer, card=cards[i], polarity=SigReservation.LEVITY, seat=seat, ready=True, ) - _fire(str(self.room.id), SigReservation.LEVITY, token) + confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) mock_send.assert_not_called() @patch("apps.epic.tasks._group_send") def test_fire_assigns_significators_and_broadcasts_when_all_ready(self, mock_send): - from apps.epic.tasks import _fire + from apps.epic.tasks import confirm_polarity_room token = self._set_token() cards = list(TarotCard.objects.all()[:3]) seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"])) @@ -109,7 +104,7 @@ class FireFunctionTest(TestCase): room=self.room, gamer=seat.gamer, card=cards[i], polarity=SigReservation.LEVITY, seat=seat, ready=True, ) - _fire(str(self.room.id), SigReservation.LEVITY, token) + confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) self.assertTrue(mock_send.called) levity_seats = TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"]) for i, seat in enumerate(levity_seats): @@ -117,29 +112,29 @@ class FireFunctionTest(TestCase): self.assertEqual(seat.significator, cards[i]) def test_fire_does_nothing_for_nonexistent_room(self): - from apps.epic.tasks import _fire + from apps.epic.tasks import confirm_polarity_room from django.core.cache import cache fake_id = "00000000-0000-0000-0000-000000000000" token = "known-token" cache.set(_cache_key(fake_id, SigReservation.LEVITY), token, 60) - _fire(fake_id, SigReservation.LEVITY, token) + confirm_polarity_room(fake_id, SigReservation.LEVITY, token) @patch("apps.epic.tasks._group_send") def test_fire_does_nothing_if_all_sigs_already_assigned(self, mock_send): - from apps.epic.tasks import _fire + from apps.epic.tasks import confirm_polarity_room token = self._set_token() cards = list(TarotCard.objects.all()[:3]) seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"])) for i, seat in enumerate(seats): seat.significator = cards[i] seat.save(update_fields=["significator"]) - _fire(str(self.room.id), SigReservation.LEVITY, token) + confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) mock_send.assert_not_called() @patch("apps.epic.tasks._group_send") def test_fire_broadcasts_pick_sky_when_all_polarity_sigs_assigned(self, mock_send): """When both levity AND gravity seats all have significators, fire() triggers SKY_SELECT.""" - from apps.epic.tasks import _fire + from apps.epic.tasks import confirm_polarity_room token = self._set_token() cards = list(TarotCard.objects.all()[:6]) # Give gravity seats significators so the all-assigned check passes @@ -155,7 +150,7 @@ class FireFunctionTest(TestCase): room=self.room, gamer=seat.gamer, card=levity_cards[i], polarity=SigReservation.LEVITY, seat=seat, ready=True, ) - _fire(str(self.room.id), SigReservation.LEVITY, token) + confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) call_types = [c.args[1]["type"] for c in mock_send.call_args_list] self.assertIn("polarity_room_done", call_types) self.assertIn("pick_sky_available", call_types) @@ -170,27 +165,30 @@ class SchedulePolarityConfirmTest(TestCase): def test_schedule_sets_cache_token(self): from django.core.cache import cache - schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) - token = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY)) - self.assertIsNotNone(token) + with patch("apps.epic.tasks.confirm_polarity_room.apply_async"): + schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) + entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY)) + self.assertIsNotNone(entry) - def test_schedule_registers_timer(self): - from apps.epic.tasks import _timers - key = f"{self.room.id}_{SigReservation.LEVITY}" - schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) - self.assertIn(key, _timers) - _timers[key].cancel() # clean up + def test_schedule_enqueues_confirm_task_with_countdown(self): + # The deferred confirm is a Celery task fired `seconds` from now (the + # worker process broadcasts reliably, unlike the old timer thread). + with patch("apps.epic.tasks.confirm_polarity_room.apply_async") as mock_async: + schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) + mock_async.assert_called_once() + self.assertEqual(mock_async.call_args.kwargs["countdown"], 12) - def test_schedule_cancels_prior_timer_before_scheduling(self): - from apps.epic.tasks import _timers - schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) - key = f"{self.room.id}_{SigReservation.LEVITY}" - first_timer = _timers.get(key) - schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) - second_timer = _timers.get(key) - self.assertIsNotNone(second_timer) - self.assertIsNot(first_timer, second_timer) - second_timer.cancel() + def test_schedule_supersedes_prior_via_fresh_token(self): + # No task revocation: a new schedule just writes a fresh token, so the + # previously-queued task no-ops on the token guard when it fires. + from django.core.cache import cache + key = _cache_key(str(self.room.id), SigReservation.LEVITY) + with patch("apps.epic.tasks.confirm_polarity_room.apply_async"): + schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) + first = cache.get(key)["token"] + schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) + second = cache.get(key)["token"] + self.assertNotEqual(first, second) class CountdownRemainingTest(TestCase): @@ -206,7 +204,8 @@ class CountdownRemainingTest(TestCase): def tearDown(self): cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) - def test_schedule_stores_deadline_alongside_token(self): + @patch("apps.epic.tasks.confirm_polarity_room.apply_async") + def test_schedule_stores_deadline_alongside_token(self, _async): from django.core.cache import cache schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY)) @@ -214,7 +213,8 @@ class CountdownRemainingTest(TestCase): self.assertIn("token", entry) self.assertIn("deadline", entry) - def test_countdown_remaining_returns_seconds_left(self): + @patch("apps.epic.tasks.confirm_polarity_room.apply_async") + def test_countdown_remaining_returns_seconds_left(self, _async): from apps.epic.tasks import countdown_remaining schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) remaining = countdown_remaining(str(self.room.id), SigReservation.LEVITY) diff --git a/src/core/settings.py b/src/core/settings.py index d837853..a21287f 100644 --- a/src/core/settings.py +++ b/src/core/settings.py @@ -250,3 +250,9 @@ if 'test' in sys.argv: 'BACKEND': 'channels.layers.InMemoryChannelLayer', } } + # In-process Kombu broker so Celery `apply_async` (the polarity-confirm + # countdown) queues without a live Redis + without running the task (no + # worker consumes it) — mirrors the old threading.Timer that was scheduled + # but never fired inside a sub-12s test. NOT eager: eager would ignore the + # countdown and assign significators synchronously during the ready POST. + CELERY_BROKER_URL = 'memory://'