Sig countdown: run the post-countdown confirm as a Celery task + auto-reconnect the room WS — TDD

The flaky tray→thumbnail→hex animation after the 12s countdown.

Root cause: the confirm ran in a threading.Timer thread inside the web process,
and _fire broadcast polarity_room_done / pick_sky_available via
async_to_sync(group_send) from an ephemeral per-call event loop. With the Redis
channel layer that publish is unreliable across loops (the production analog of
the "broadcast must originate in daphne" test trap), so the live events reached
the client only sporadically — the server-side state (sig assignment,
SKY_SELECT) still committed, which is why a refresh always showed the concluded
hex but the animation usually didn't play.

Fix (chosen: migrate to Celery — the path the tasks.py docstring already called
for): _fire becomes the @shared_task confirm_polarity_room, enqueued by
schedule_polarity_confirm via apply_async(countdown=seconds). The worker is a
stable long-lived process whose channel-layer singleton is never shared with a
serving loop, so its group_send reaches daphne reliably; it also survives
web-worker restarts. No task revocation needed — cancellation/supersession ride
the existing cache token guard (cancel just deletes the token; a stale queued
task no-ops). Dropped threading.Timer + the _timers registry.

Test settings get CELERY_BROKER_URL='memory://' so apply_async queues without a
live Redis and without running the task (no worker) — mirrors the old timer that
was scheduled but never fired inside a sub-12s test. NOT eager: eager would
ignore the countdown and assign significators synchronously during the ready
POST. test_tasks rewritten: confirm_polarity_room called directly (task body),
schedule asserts the enqueue + countdown + fresh-token supersession; the
broadcast itself stays IT-uncoverable under InMemory (known channels limit).

Also: room.js now auto-reconnects the room WebSocket with capped exponential
backoff (1s→30s, reset on open, halted on beforeunload). A dropped socket (proxy
idle-timeout, blip, server restart) previously stayed dead until a manual
refresh, silently losing every live event — an independent reliability gap that
compounded the "sporadic" feel.

602 epic ITs + 18 task UTs green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Disco DeDisco
2026-06-05 14:52:21 -04:00
parent f3f509a59a
commit a6ce20761b
4 changed files with 120 additions and 86 deletions

View File

@@ -117,17 +117,39 @@
// multi-seat owner never receives the other polarity's countdown.
const seatParam = new URLSearchParams(window.location.search).get('seat');
const wsSeat = seatParam ? `?seat=${encodeURIComponent(seatParam)}` : '';
const ws = new WebSocket(`${wsScheme}://${window.location.host}/ws/room/${roomId}/${wsSeat}`);
const wsUrl = `${wsScheme}://${window.location.host}/ws/room/${roomId}/${wsSeat}`;
// Auto-reconnect with capped exponential backoff. A dropped socket (proxy
// idle-timeout, brief network blip, server restart) otherwise stayed dead
// until a manual refresh, silently losing live events — most painfully the
// 12s sig countdown's polarity_room_done / pick_sky_available, so the
// tray→hex animation never played. Backoff resets on a clean open; we stop
// retrying once the page is unloading so we don't reconnect during nav.
let backoff = 1000;
const BACKOFF_MAX = 30000;
let unloading = false;
window.addEventListener('beforeunload', function () { unloading = true; });
function connect() {
const ws = new WebSocket(wsUrl);
window._roomSocket = ws; // exposed for sig-select.js hover broadcast
ws.onopen = function () { backoff = 1000; };
ws.onmessage = function (event) {
const data = JSON.parse(event.data);
window.dispatchEvent(new CustomEvent('room:' + data.type, { detail: data }));
};
ws.onclose = function (event) {
if (unloading) return;
if (!event.wasClean) {
console.warn('Room WebSocket closed unexpectedly');
console.warn('Room WebSocket closed unexpectedly — reconnecting in ' + backoff + 'ms');
}
setTimeout(connect, backoff);
backoff = Math.min(backoff * 2, BACKOFF_MAX);
};
}
connect();
}());

View File

@@ -1,24 +1,35 @@
"""
Countdown scheduler for the polarity-room SAVE SIG gate.
Uses threading.Timer so no separate Celery worker is needed in development.
Single-process only — swap for a Celery task if production uses multiple
web workers (gunicorn -w N with N > 1).
The 12s confirm runs as a **Celery task** (`apply_async` with a `countdown`) so
the post-countdown significator assignment + the `polarity_room_done` /
`pick_sky_available` broadcasts originate in the long-lived Celery worker
process. The old `threading.Timer` approach broadcast via
`async_to_sync(group_send)` from an ephemeral per-call event loop inside the web
process, which is unreliable with the Redis channel layer — the broadcast
reached the consumer only sporadically (the production analog of the
"broadcast must originate in daphne" test trap), so the tray→hex animation
fired intermittently while the server-side state still committed. The worker is
a stable process whose channel-layer singleton is never shared with a serving
loop, so its `group_send` reaches daphne reliably.
Cancellation / supersession needs no task revocation: the cache token guard
makes a stale queued task a no-op. `schedule_polarity_confirm` writes a fresh
token (superseding any prior queued task), and `cancel_polarity_confirm` just
deletes it — either way a task that fires later finds a missing/replaced token
and returns without assigning.
"""
import threading
import time
import uuid
from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer
from django.core.cache import cache
_LEVITY_ROLES = {'PC', 'NC', 'SC'}
_GRAVITY_ROLES = {'BC', 'EC', 'AC'}
# In-process registry of pending timers: "{room_id}_{polarity}" → Timer
_timers = {}
def _cache_key(room_id, polarity):
return f'sig_countdown_{room_id}_{polarity}'
@@ -28,13 +39,15 @@ def _group_send(room_id, msg):
async_to_sync(get_channel_layer().group_send)(f'room_{room_id}', msg)
def _fire(room_id, polarity, token):
"""Callback run by threading.Timer after the countdown expires."""
# Token guard: if cancelled or superseded, cache entry will differ. The
# entry is now a {token, deadline} dict (was a bare token string before the
# restore-on-load sprint) — read either shape defensively so a stale plain
# string left by an older deploy doesn't crash the timer callback.
@shared_task
def confirm_polarity_room(room_id, polarity, token):
"""Assign significators + broadcast room-done once a polarity's countdown
elapses. Enqueued by `schedule_polarity_confirm` with a `countdown`; run by
the Celery worker. The cache token guard makes a cancelled or superseded
run a no-op (its token no longer matches what's in the cache)."""
entry = cache.get(_cache_key(room_id, polarity))
# The entry is a {token, deadline} dict; tolerate a bare-string token from
# an older deploy so a stale value can't crash the task.
stored_token = entry.get('token') if isinstance(entry, dict) else entry
if stored_token != token:
return
@@ -77,35 +90,28 @@ def _fire(room_id, polarity, token):
_group_send(room_id, {'type': 'pick_sky_available'})
cache.delete(_cache_key(room_id, polarity))
_timers.pop(f'{room_id}_{polarity}', None)
def schedule_polarity_confirm(room_id, polarity, seconds):
"""Schedule a polarity confirm `seconds` seconds from now. Cancels any prior timer."""
cancel_polarity_confirm(room_id, polarity)
"""Schedule a polarity confirm `seconds` from now. A fresh token supersedes
any prior queued task (which then no-ops on the token guard)."""
token = str(uuid.uuid4())
# Store the absolute deadline alongside the token so a gamer loading a
# fresh seat view mid-countdown can derive the seconds left (the flashing
# numeral) instead of falling back to a static WAIT NVM. See
# countdown_remaining() + sig-select.js's restore-on-load.
# Store the absolute deadline alongside the token so a gamer loading a fresh
# seat view mid-countdown can derive the seconds left (the flashing numeral)
# instead of a static WAIT NVM. See countdown_remaining() + sig-select.js.
cache.set(
_cache_key(room_id, polarity),
{'token': token, 'deadline': time.time() + seconds},
timeout=int(seconds) + 60,
)
timer = threading.Timer(seconds, _fire, args=[str(room_id), polarity, token])
timer.daemon = True
timer.start()
_timers[f'{room_id}_{polarity}'] = timer
confirm_polarity_room.apply_async(
args=[str(room_id), polarity, token], countdown=int(seconds),
)
def cancel_polarity_confirm(room_id, polarity):
"""Cancel any pending confirm for this room + polarity."""
timer = _timers.pop(f'{room_id}_{polarity}', None)
if timer:
timer.cancel()
"""Cancel any pending confirm for this room + polarity. Deleting the token
is enough — the already-queued task finds no matching token and no-ops."""
cache.delete(_cache_key(room_id, polarity))

View File

@@ -19,27 +19,22 @@ class CancelPolarityConfirmTest(TestCase):
self.user = User.objects.create(email="owner@tasks.io")
self.room = Room.objects.create(name="R", owner=self.user)
def test_cancel_with_no_timer_is_a_noop(self):
def test_cancel_with_no_entry_is_a_noop(self):
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
def test_cancel_clears_cache_entry(self):
# Deleting the token is the whole cancel: a queued confirm task that
# fires later finds no matching token and no-ops.
from django.core.cache import cache
key = _cache_key(str(self.room.id), SigReservation.LEVITY)
cache.set(key, "sometoken", timeout=60)
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
self.assertIsNone(cache.get(key))
@patch("apps.epic.tasks._timers")
def test_cancel_calls_timer_cancel_when_present(self, mock_timers):
mock_timer = MagicMock()
key = f"{self.room.id}_levity"
mock_timers.pop.return_value = mock_timer
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
mock_timer.cancel.assert_called_once()
class FireFunctionTest(TestCase):
"""Tests for the _fire() callback executed by threading.Timer."""
"""Tests for the confirm_polarity_room() Celery task body (called directly,
synchronously, here — the worker runs it via apply_async in production)."""
def setUp(self):
self.owner = User.objects.create(email="owner@fire.io")
@@ -70,23 +65,23 @@ class FireFunctionTest(TestCase):
@patch("apps.epic.tasks._group_send")
def test_fire_does_nothing_if_token_mismatch(self, mock_send):
from apps.epic.tasks import _fire
from apps.epic.tasks import confirm_polarity_room
self._set_token()
_fire(str(self.room.id), SigReservation.LEVITY, "wrong-token")
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, "wrong-token")
mock_send.assert_not_called()
@patch("apps.epic.tasks._group_send")
def test_fire_does_nothing_if_room_not_sig_select(self, mock_send):
from apps.epic.tasks import _fire
from apps.epic.tasks import confirm_polarity_room
token = self._set_token()
self.room.table_status = Room.ROLE_SELECT
self.room.save()
_fire(str(self.room.id), SigReservation.LEVITY, token)
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
mock_send.assert_not_called()
@patch("apps.epic.tasks._group_send")
def test_fire_does_nothing_if_fewer_than_3_ready(self, mock_send):
from apps.epic.tasks import _fire
from apps.epic.tasks import confirm_polarity_room
token = self._set_token()
cards = list(TarotCard.objects.all()[:2])
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC"]))
@@ -95,12 +90,12 @@ class FireFunctionTest(TestCase):
room=self.room, gamer=seat.gamer, card=cards[i],
polarity=SigReservation.LEVITY, seat=seat, ready=True,
)
_fire(str(self.room.id), SigReservation.LEVITY, token)
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
mock_send.assert_not_called()
@patch("apps.epic.tasks._group_send")
def test_fire_assigns_significators_and_broadcasts_when_all_ready(self, mock_send):
from apps.epic.tasks import _fire
from apps.epic.tasks import confirm_polarity_room
token = self._set_token()
cards = list(TarotCard.objects.all()[:3])
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"]))
@@ -109,7 +104,7 @@ class FireFunctionTest(TestCase):
room=self.room, gamer=seat.gamer, card=cards[i],
polarity=SigReservation.LEVITY, seat=seat, ready=True,
)
_fire(str(self.room.id), SigReservation.LEVITY, token)
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
self.assertTrue(mock_send.called)
levity_seats = TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"])
for i, seat in enumerate(levity_seats):
@@ -117,29 +112,29 @@ class FireFunctionTest(TestCase):
self.assertEqual(seat.significator, cards[i])
def test_fire_does_nothing_for_nonexistent_room(self):
from apps.epic.tasks import _fire
from apps.epic.tasks import confirm_polarity_room
from django.core.cache import cache
fake_id = "00000000-0000-0000-0000-000000000000"
token = "known-token"
cache.set(_cache_key(fake_id, SigReservation.LEVITY), token, 60)
_fire(fake_id, SigReservation.LEVITY, token)
confirm_polarity_room(fake_id, SigReservation.LEVITY, token)
@patch("apps.epic.tasks._group_send")
def test_fire_does_nothing_if_all_sigs_already_assigned(self, mock_send):
from apps.epic.tasks import _fire
from apps.epic.tasks import confirm_polarity_room
token = self._set_token()
cards = list(TarotCard.objects.all()[:3])
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"]))
for i, seat in enumerate(seats):
seat.significator = cards[i]
seat.save(update_fields=["significator"])
_fire(str(self.room.id), SigReservation.LEVITY, token)
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
mock_send.assert_not_called()
@patch("apps.epic.tasks._group_send")
def test_fire_broadcasts_pick_sky_when_all_polarity_sigs_assigned(self, mock_send):
"""When both levity AND gravity seats all have significators, fire() triggers SKY_SELECT."""
from apps.epic.tasks import _fire
from apps.epic.tasks import confirm_polarity_room
token = self._set_token()
cards = list(TarotCard.objects.all()[:6])
# Give gravity seats significators so the all-assigned check passes
@@ -155,7 +150,7 @@ class FireFunctionTest(TestCase):
room=self.room, gamer=seat.gamer, card=levity_cards[i],
polarity=SigReservation.LEVITY, seat=seat, ready=True,
)
_fire(str(self.room.id), SigReservation.LEVITY, token)
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
call_types = [c.args[1]["type"] for c in mock_send.call_args_list]
self.assertIn("polarity_room_done", call_types)
self.assertIn("pick_sky_available", call_types)
@@ -170,27 +165,30 @@ class SchedulePolarityConfirmTest(TestCase):
def test_schedule_sets_cache_token(self):
from django.core.cache import cache
with patch("apps.epic.tasks.confirm_polarity_room.apply_async"):
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
token = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY))
self.assertIsNotNone(token)
entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY))
self.assertIsNotNone(entry)
def test_schedule_registers_timer(self):
from apps.epic.tasks import _timers
key = f"{self.room.id}_{SigReservation.LEVITY}"
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
self.assertIn(key, _timers)
_timers[key].cancel() # clean up
def test_schedule_enqueues_confirm_task_with_countdown(self):
# The deferred confirm is a Celery task fired `seconds` from now (the
# worker process broadcasts reliably, unlike the old timer thread).
with patch("apps.epic.tasks.confirm_polarity_room.apply_async") as mock_async:
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
mock_async.assert_called_once()
self.assertEqual(mock_async.call_args.kwargs["countdown"], 12)
def test_schedule_cancels_prior_timer_before_scheduling(self):
from apps.epic.tasks import _timers
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
key = f"{self.room.id}_{SigReservation.LEVITY}"
first_timer = _timers.get(key)
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
second_timer = _timers.get(key)
self.assertIsNotNone(second_timer)
self.assertIsNot(first_timer, second_timer)
second_timer.cancel()
def test_schedule_supersedes_prior_via_fresh_token(self):
# No task revocation: a new schedule just writes a fresh token, so the
# previously-queued task no-ops on the token guard when it fires.
from django.core.cache import cache
key = _cache_key(str(self.room.id), SigReservation.LEVITY)
with patch("apps.epic.tasks.confirm_polarity_room.apply_async"):
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
first = cache.get(key)["token"]
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
second = cache.get(key)["token"]
self.assertNotEqual(first, second)
class CountdownRemainingTest(TestCase):
@@ -206,7 +204,8 @@ class CountdownRemainingTest(TestCase):
def tearDown(self):
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
def test_schedule_stores_deadline_alongside_token(self):
@patch("apps.epic.tasks.confirm_polarity_room.apply_async")
def test_schedule_stores_deadline_alongside_token(self, _async):
from django.core.cache import cache
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY))
@@ -214,7 +213,8 @@ class CountdownRemainingTest(TestCase):
self.assertIn("token", entry)
self.assertIn("deadline", entry)
def test_countdown_remaining_returns_seconds_left(self):
@patch("apps.epic.tasks.confirm_polarity_room.apply_async")
def test_countdown_remaining_returns_seconds_left(self, _async):
from apps.epic.tasks import countdown_remaining
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
remaining = countdown_remaining(str(self.room.id), SigReservation.LEVITY)

View File

@@ -250,3 +250,9 @@ if 'test' in sys.argv:
'BACKEND': 'channels.layers.InMemoryChannelLayer',
}
}
# In-process Kombu broker so Celery `apply_async` (the polarity-confirm
# countdown) queues without a live Redis + without running the task (no
# worker consumes it) — mirrors the old threading.Timer that was scheduled
# but never fired inside a sub-12s test. NOT eager: eager would ignore the
# countdown and assign significators synchronously during the ready POST.
CELERY_BROKER_URL = 'memory://'