Sig countdown: run the post-countdown confirm as a Celery task + auto-reconnect the room WS — TDD

The flaky tray→thumbnail→hex animation after the 12s countdown.

Root cause: the confirm ran in a threading.Timer thread inside the web process,
and _fire broadcast polarity_room_done / pick_sky_available via
async_to_sync(group_send) from an ephemeral per-call event loop. With the Redis
channel layer that publish is unreliable across loops (the production analog of
the "broadcast must originate in daphne" test trap), so the live events reached
the client only sporadically — the server-side state (sig assignment,
SKY_SELECT) still committed, which is why a refresh always showed the concluded
hex but the animation usually didn't play.

Fix (chosen: migrate to Celery — the path the tasks.py docstring already called
for): _fire becomes the @shared_task confirm_polarity_room, enqueued by
schedule_polarity_confirm via apply_async(countdown=seconds). The worker is a
stable long-lived process whose channel-layer singleton is never shared with a
serving loop, so its group_send reaches daphne reliably; it also survives
web-worker restarts. No task revocation needed — cancellation/supersession ride
the existing cache token guard (cancel just deletes the token; a stale queued
task no-ops). Dropped threading.Timer + the _timers registry.

Test settings get CELERY_BROKER_URL='memory://' so apply_async queues without a
live Redis and without running the task (no worker) — mirrors the old timer that
was scheduled but never fired inside a sub-12s test. NOT eager: eager would
ignore the countdown and assign significators synchronously during the ready
POST. test_tasks rewritten: confirm_polarity_room called directly (task body),
schedule asserts the enqueue + countdown + fresh-token supersession; the
broadcast itself stays IT-uncoverable under InMemory (known channels limit).

Also: room.js now auto-reconnects the room WebSocket with capped exponential
backoff (1s→30s, reset on open, halted on beforeunload). A dropped socket (proxy
idle-timeout, blip, server restart) previously stayed dead until a manual
refresh, silently losing every live event — an independent reliability gap that
compounded the "sporadic" feel.

602 epic ITs + 18 task UTs green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Disco DeDisco
2026-06-05 14:52:21 -04:00
parent f3f509a59a
commit a6ce20761b
4 changed files with 120 additions and 86 deletions

View File

@@ -117,17 +117,39 @@
// multi-seat owner never receives the other polarity's countdown. // multi-seat owner never receives the other polarity's countdown.
const seatParam = new URLSearchParams(window.location.search).get('seat'); const seatParam = new URLSearchParams(window.location.search).get('seat');
const wsSeat = seatParam ? `?seat=${encodeURIComponent(seatParam)}` : ''; const wsSeat = seatParam ? `?seat=${encodeURIComponent(seatParam)}` : '';
const ws = new WebSocket(`${wsScheme}://${window.location.host}/ws/room/${roomId}/${wsSeat}`); const wsUrl = `${wsScheme}://${window.location.host}/ws/room/${roomId}/${wsSeat}`;
// Auto-reconnect with capped exponential backoff. A dropped socket (proxy
// idle-timeout, brief network blip, server restart) otherwise stayed dead
// until a manual refresh, silently losing live events — most painfully the
// 12s sig countdown's polarity_room_done / pick_sky_available, so the
// tray→hex animation never played. Backoff resets on a clean open; we stop
// retrying once the page is unloading so we don't reconnect during nav.
let backoff = 1000;
const BACKOFF_MAX = 30000;
let unloading = false;
window.addEventListener('beforeunload', function () { unloading = true; });
function connect() {
const ws = new WebSocket(wsUrl);
window._roomSocket = ws; // exposed for sig-select.js hover broadcast window._roomSocket = ws; // exposed for sig-select.js hover broadcast
ws.onopen = function () { backoff = 1000; };
ws.onmessage = function (event) { ws.onmessage = function (event) {
const data = JSON.parse(event.data); const data = JSON.parse(event.data);
window.dispatchEvent(new CustomEvent('room:' + data.type, { detail: data })); window.dispatchEvent(new CustomEvent('room:' + data.type, { detail: data }));
}; };
ws.onclose = function (event) { ws.onclose = function (event) {
if (unloading) return;
if (!event.wasClean) { if (!event.wasClean) {
console.warn('Room WebSocket closed unexpectedly'); console.warn('Room WebSocket closed unexpectedly — reconnecting in ' + backoff + 'ms');
} }
setTimeout(connect, backoff);
backoff = Math.min(backoff * 2, BACKOFF_MAX);
}; };
}
connect();
}()); }());

View File

@@ -1,24 +1,35 @@
""" """
Countdown scheduler for the polarity-room SAVE SIG gate. Countdown scheduler for the polarity-room SAVE SIG gate.
Uses threading.Timer so no separate Celery worker is needed in development. The 12s confirm runs as a **Celery task** (`apply_async` with a `countdown`) so
Single-process only — swap for a Celery task if production uses multiple the post-countdown significator assignment + the `polarity_room_done` /
web workers (gunicorn -w N with N > 1). `pick_sky_available` broadcasts originate in the long-lived Celery worker
process. The old `threading.Timer` approach broadcast via
`async_to_sync(group_send)` from an ephemeral per-call event loop inside the web
process, which is unreliable with the Redis channel layer — the broadcast
reached the consumer only sporadically (the production analog of the
"broadcast must originate in daphne" test trap), so the tray→hex animation
fired intermittently while the server-side state still committed. The worker is
a stable process whose channel-layer singleton is never shared with a serving
loop, so its `group_send` reaches daphne reliably.
Cancellation / supersession needs no task revocation: the cache token guard
makes a stale queued task a no-op. `schedule_polarity_confirm` writes a fresh
token (superseding any prior queued task), and `cancel_polarity_confirm` just
deletes it — either way a task that fires later finds a missing/replaced token
and returns without assigning.
""" """
import threading
import time import time
import uuid import uuid
from asgiref.sync import async_to_sync from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer from channels.layers import get_channel_layer
from django.core.cache import cache from django.core.cache import cache
_LEVITY_ROLES = {'PC', 'NC', 'SC'} _LEVITY_ROLES = {'PC', 'NC', 'SC'}
_GRAVITY_ROLES = {'BC', 'EC', 'AC'} _GRAVITY_ROLES = {'BC', 'EC', 'AC'}
# In-process registry of pending timers: "{room_id}_{polarity}" → Timer
_timers = {}
def _cache_key(room_id, polarity): def _cache_key(room_id, polarity):
return f'sig_countdown_{room_id}_{polarity}' return f'sig_countdown_{room_id}_{polarity}'
@@ -28,13 +39,15 @@ def _group_send(room_id, msg):
async_to_sync(get_channel_layer().group_send)(f'room_{room_id}', msg) async_to_sync(get_channel_layer().group_send)(f'room_{room_id}', msg)
def _fire(room_id, polarity, token): @shared_task
"""Callback run by threading.Timer after the countdown expires.""" def confirm_polarity_room(room_id, polarity, token):
# Token guard: if cancelled or superseded, cache entry will differ. The """Assign significators + broadcast room-done once a polarity's countdown
# entry is now a {token, deadline} dict (was a bare token string before the elapses. Enqueued by `schedule_polarity_confirm` with a `countdown`; run by
# restore-on-load sprint) — read either shape defensively so a stale plain the Celery worker. The cache token guard makes a cancelled or superseded
# string left by an older deploy doesn't crash the timer callback. run a no-op (its token no longer matches what's in the cache)."""
entry = cache.get(_cache_key(room_id, polarity)) entry = cache.get(_cache_key(room_id, polarity))
# The entry is a {token, deadline} dict; tolerate a bare-string token from
# an older deploy so a stale value can't crash the task.
stored_token = entry.get('token') if isinstance(entry, dict) else entry stored_token = entry.get('token') if isinstance(entry, dict) else entry
if stored_token != token: if stored_token != token:
return return
@@ -77,35 +90,28 @@ def _fire(room_id, polarity, token):
_group_send(room_id, {'type': 'pick_sky_available'}) _group_send(room_id, {'type': 'pick_sky_available'})
cache.delete(_cache_key(room_id, polarity)) cache.delete(_cache_key(room_id, polarity))
_timers.pop(f'{room_id}_{polarity}', None)
def schedule_polarity_confirm(room_id, polarity, seconds): def schedule_polarity_confirm(room_id, polarity, seconds):
"""Schedule a polarity confirm `seconds` seconds from now. Cancels any prior timer.""" """Schedule a polarity confirm `seconds` from now. A fresh token supersedes
cancel_polarity_confirm(room_id, polarity) any prior queued task (which then no-ops on the token guard)."""
token = str(uuid.uuid4()) token = str(uuid.uuid4())
# Store the absolute deadline alongside the token so a gamer loading a # Store the absolute deadline alongside the token so a gamer loading a fresh
# fresh seat view mid-countdown can derive the seconds left (the flashing # seat view mid-countdown can derive the seconds left (the flashing numeral)
# numeral) instead of falling back to a static WAIT NVM. See # instead of a static WAIT NVM. See countdown_remaining() + sig-select.js.
# countdown_remaining() + sig-select.js's restore-on-load.
cache.set( cache.set(
_cache_key(room_id, polarity), _cache_key(room_id, polarity),
{'token': token, 'deadline': time.time() + seconds}, {'token': token, 'deadline': time.time() + seconds},
timeout=int(seconds) + 60, timeout=int(seconds) + 60,
) )
confirm_polarity_room.apply_async(
timer = threading.Timer(seconds, _fire, args=[str(room_id), polarity, token]) args=[str(room_id), polarity, token], countdown=int(seconds),
timer.daemon = True )
timer.start()
_timers[f'{room_id}_{polarity}'] = timer
def cancel_polarity_confirm(room_id, polarity): def cancel_polarity_confirm(room_id, polarity):
"""Cancel any pending confirm for this room + polarity.""" """Cancel any pending confirm for this room + polarity. Deleting the token
timer = _timers.pop(f'{room_id}_{polarity}', None) is enough — the already-queued task finds no matching token and no-ops."""
if timer:
timer.cancel()
cache.delete(_cache_key(room_id, polarity)) cache.delete(_cache_key(room_id, polarity))

View File

@@ -19,27 +19,22 @@ class CancelPolarityConfirmTest(TestCase):
self.user = User.objects.create(email="owner@tasks.io") self.user = User.objects.create(email="owner@tasks.io")
self.room = Room.objects.create(name="R", owner=self.user) self.room = Room.objects.create(name="R", owner=self.user)
def test_cancel_with_no_timer_is_a_noop(self): def test_cancel_with_no_entry_is_a_noop(self):
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
def test_cancel_clears_cache_entry(self): def test_cancel_clears_cache_entry(self):
# Deleting the token is the whole cancel: a queued confirm task that
# fires later finds no matching token and no-ops.
from django.core.cache import cache from django.core.cache import cache
key = _cache_key(str(self.room.id), SigReservation.LEVITY) key = _cache_key(str(self.room.id), SigReservation.LEVITY)
cache.set(key, "sometoken", timeout=60) cache.set(key, "sometoken", timeout=60)
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
self.assertIsNone(cache.get(key)) self.assertIsNone(cache.get(key))
@patch("apps.epic.tasks._timers")
def test_cancel_calls_timer_cancel_when_present(self, mock_timers):
mock_timer = MagicMock()
key = f"{self.room.id}_levity"
mock_timers.pop.return_value = mock_timer
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
mock_timer.cancel.assert_called_once()
class FireFunctionTest(TestCase): class FireFunctionTest(TestCase):
"""Tests for the _fire() callback executed by threading.Timer.""" """Tests for the confirm_polarity_room() Celery task body (called directly,
synchronously, here — the worker runs it via apply_async in production)."""
def setUp(self): def setUp(self):
self.owner = User.objects.create(email="owner@fire.io") self.owner = User.objects.create(email="owner@fire.io")
@@ -70,23 +65,23 @@ class FireFunctionTest(TestCase):
@patch("apps.epic.tasks._group_send") @patch("apps.epic.tasks._group_send")
def test_fire_does_nothing_if_token_mismatch(self, mock_send): def test_fire_does_nothing_if_token_mismatch(self, mock_send):
from apps.epic.tasks import _fire from apps.epic.tasks import confirm_polarity_room
self._set_token() self._set_token()
_fire(str(self.room.id), SigReservation.LEVITY, "wrong-token") confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, "wrong-token")
mock_send.assert_not_called() mock_send.assert_not_called()
@patch("apps.epic.tasks._group_send") @patch("apps.epic.tasks._group_send")
def test_fire_does_nothing_if_room_not_sig_select(self, mock_send): def test_fire_does_nothing_if_room_not_sig_select(self, mock_send):
from apps.epic.tasks import _fire from apps.epic.tasks import confirm_polarity_room
token = self._set_token() token = self._set_token()
self.room.table_status = Room.ROLE_SELECT self.room.table_status = Room.ROLE_SELECT
self.room.save() self.room.save()
_fire(str(self.room.id), SigReservation.LEVITY, token) confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
mock_send.assert_not_called() mock_send.assert_not_called()
@patch("apps.epic.tasks._group_send") @patch("apps.epic.tasks._group_send")
def test_fire_does_nothing_if_fewer_than_3_ready(self, mock_send): def test_fire_does_nothing_if_fewer_than_3_ready(self, mock_send):
from apps.epic.tasks import _fire from apps.epic.tasks import confirm_polarity_room
token = self._set_token() token = self._set_token()
cards = list(TarotCard.objects.all()[:2]) cards = list(TarotCard.objects.all()[:2])
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC"])) seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC"]))
@@ -95,12 +90,12 @@ class FireFunctionTest(TestCase):
room=self.room, gamer=seat.gamer, card=cards[i], room=self.room, gamer=seat.gamer, card=cards[i],
polarity=SigReservation.LEVITY, seat=seat, ready=True, polarity=SigReservation.LEVITY, seat=seat, ready=True,
) )
_fire(str(self.room.id), SigReservation.LEVITY, token) confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
mock_send.assert_not_called() mock_send.assert_not_called()
@patch("apps.epic.tasks._group_send") @patch("apps.epic.tasks._group_send")
def test_fire_assigns_significators_and_broadcasts_when_all_ready(self, mock_send): def test_fire_assigns_significators_and_broadcasts_when_all_ready(self, mock_send):
from apps.epic.tasks import _fire from apps.epic.tasks import confirm_polarity_room
token = self._set_token() token = self._set_token()
cards = list(TarotCard.objects.all()[:3]) cards = list(TarotCard.objects.all()[:3])
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"])) seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"]))
@@ -109,7 +104,7 @@ class FireFunctionTest(TestCase):
room=self.room, gamer=seat.gamer, card=cards[i], room=self.room, gamer=seat.gamer, card=cards[i],
polarity=SigReservation.LEVITY, seat=seat, ready=True, polarity=SigReservation.LEVITY, seat=seat, ready=True,
) )
_fire(str(self.room.id), SigReservation.LEVITY, token) confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
self.assertTrue(mock_send.called) self.assertTrue(mock_send.called)
levity_seats = TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"]) levity_seats = TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"])
for i, seat in enumerate(levity_seats): for i, seat in enumerate(levity_seats):
@@ -117,29 +112,29 @@ class FireFunctionTest(TestCase):
self.assertEqual(seat.significator, cards[i]) self.assertEqual(seat.significator, cards[i])
def test_fire_does_nothing_for_nonexistent_room(self): def test_fire_does_nothing_for_nonexistent_room(self):
from apps.epic.tasks import _fire from apps.epic.tasks import confirm_polarity_room
from django.core.cache import cache from django.core.cache import cache
fake_id = "00000000-0000-0000-0000-000000000000" fake_id = "00000000-0000-0000-0000-000000000000"
token = "known-token" token = "known-token"
cache.set(_cache_key(fake_id, SigReservation.LEVITY), token, 60) cache.set(_cache_key(fake_id, SigReservation.LEVITY), token, 60)
_fire(fake_id, SigReservation.LEVITY, token) confirm_polarity_room(fake_id, SigReservation.LEVITY, token)
@patch("apps.epic.tasks._group_send") @patch("apps.epic.tasks._group_send")
def test_fire_does_nothing_if_all_sigs_already_assigned(self, mock_send): def test_fire_does_nothing_if_all_sigs_already_assigned(self, mock_send):
from apps.epic.tasks import _fire from apps.epic.tasks import confirm_polarity_room
token = self._set_token() token = self._set_token()
cards = list(TarotCard.objects.all()[:3]) cards = list(TarotCard.objects.all()[:3])
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"])) seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"]))
for i, seat in enumerate(seats): for i, seat in enumerate(seats):
seat.significator = cards[i] seat.significator = cards[i]
seat.save(update_fields=["significator"]) seat.save(update_fields=["significator"])
_fire(str(self.room.id), SigReservation.LEVITY, token) confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
mock_send.assert_not_called() mock_send.assert_not_called()
@patch("apps.epic.tasks._group_send") @patch("apps.epic.tasks._group_send")
def test_fire_broadcasts_pick_sky_when_all_polarity_sigs_assigned(self, mock_send): def test_fire_broadcasts_pick_sky_when_all_polarity_sigs_assigned(self, mock_send):
"""When both levity AND gravity seats all have significators, fire() triggers SKY_SELECT.""" """When both levity AND gravity seats all have significators, fire() triggers SKY_SELECT."""
from apps.epic.tasks import _fire from apps.epic.tasks import confirm_polarity_room
token = self._set_token() token = self._set_token()
cards = list(TarotCard.objects.all()[:6]) cards = list(TarotCard.objects.all()[:6])
# Give gravity seats significators so the all-assigned check passes # Give gravity seats significators so the all-assigned check passes
@@ -155,7 +150,7 @@ class FireFunctionTest(TestCase):
room=self.room, gamer=seat.gamer, card=levity_cards[i], room=self.room, gamer=seat.gamer, card=levity_cards[i],
polarity=SigReservation.LEVITY, seat=seat, ready=True, polarity=SigReservation.LEVITY, seat=seat, ready=True,
) )
_fire(str(self.room.id), SigReservation.LEVITY, token) confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
call_types = [c.args[1]["type"] for c in mock_send.call_args_list] call_types = [c.args[1]["type"] for c in mock_send.call_args_list]
self.assertIn("polarity_room_done", call_types) self.assertIn("polarity_room_done", call_types)
self.assertIn("pick_sky_available", call_types) self.assertIn("pick_sky_available", call_types)
@@ -170,27 +165,30 @@ class SchedulePolarityConfirmTest(TestCase):
def test_schedule_sets_cache_token(self): def test_schedule_sets_cache_token(self):
from django.core.cache import cache from django.core.cache import cache
with patch("apps.epic.tasks.confirm_polarity_room.apply_async"):
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
token = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY)) entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY))
self.assertIsNotNone(token) self.assertIsNotNone(entry)
def test_schedule_registers_timer(self): def test_schedule_enqueues_confirm_task_with_countdown(self):
from apps.epic.tasks import _timers # The deferred confirm is a Celery task fired `seconds` from now (the
key = f"{self.room.id}_{SigReservation.LEVITY}" # worker process broadcasts reliably, unlike the old timer thread).
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) with patch("apps.epic.tasks.confirm_polarity_room.apply_async") as mock_async:
self.assertIn(key, _timers) schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
_timers[key].cancel() # clean up mock_async.assert_called_once()
self.assertEqual(mock_async.call_args.kwargs["countdown"], 12)
def test_schedule_cancels_prior_timer_before_scheduling(self): def test_schedule_supersedes_prior_via_fresh_token(self):
from apps.epic.tasks import _timers # No task revocation: a new schedule just writes a fresh token, so the
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) # previously-queued task no-ops on the token guard when it fires.
key = f"{self.room.id}_{SigReservation.LEVITY}" from django.core.cache import cache
first_timer = _timers.get(key) key = _cache_key(str(self.room.id), SigReservation.LEVITY)
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) with patch("apps.epic.tasks.confirm_polarity_room.apply_async"):
second_timer = _timers.get(key) schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
self.assertIsNotNone(second_timer) first = cache.get(key)["token"]
self.assertIsNot(first_timer, second_timer) schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
second_timer.cancel() second = cache.get(key)["token"]
self.assertNotEqual(first, second)
class CountdownRemainingTest(TestCase): class CountdownRemainingTest(TestCase):
@@ -206,7 +204,8 @@ class CountdownRemainingTest(TestCase):
def tearDown(self): def tearDown(self):
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
def test_schedule_stores_deadline_alongside_token(self): @patch("apps.epic.tasks.confirm_polarity_room.apply_async")
def test_schedule_stores_deadline_alongside_token(self, _async):
from django.core.cache import cache from django.core.cache import cache
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY)) entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY))
@@ -214,7 +213,8 @@ class CountdownRemainingTest(TestCase):
self.assertIn("token", entry) self.assertIn("token", entry)
self.assertIn("deadline", entry) self.assertIn("deadline", entry)
def test_countdown_remaining_returns_seconds_left(self): @patch("apps.epic.tasks.confirm_polarity_room.apply_async")
def test_countdown_remaining_returns_seconds_left(self, _async):
from apps.epic.tasks import countdown_remaining from apps.epic.tasks import countdown_remaining
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
remaining = countdown_remaining(str(self.room.id), SigReservation.LEVITY) remaining = countdown_remaining(str(self.room.id), SigReservation.LEVITY)

View File

@@ -250,3 +250,9 @@ if 'test' in sys.argv:
'BACKEND': 'channels.layers.InMemoryChannelLayer', 'BACKEND': 'channels.layers.InMemoryChannelLayer',
} }
} }
# In-process Kombu broker so Celery `apply_async` (the polarity-confirm
# countdown) queues without a live Redis + without running the task (no
# worker consumes it) — mirrors the old threading.Timer that was scheduled
# but never fired inside a sub-12s test. NOT eager: eager would ignore the
# countdown and assign significators synchronously during the ready POST.
CELERY_BROKER_URL = 'memory://'