Revert the Celery countdown migration — it broke local dev (no worker) — back to threading.Timer
a6ce207moved the 12s polarity confirm to a Celery task (apply_async countdown). That requires a running Celery worker to execute it — but local dev runs only uvicorn (the dev-server skill starts no worker; the original tasks.py docstring chose threading.Timer precisely "so no separate Celery worker is needed in development"). So locally the confirm was queued and never ran: the countdown hit 0, no significators saved, and a refresh stayed in SIG_SELECT (no skip to the table hex). A regression in the core flow. Restore tasks.py + test_tasks.py to thefaaa4ecthreading.Timer version (still in-process, with the {token, deadline} cache + countdown_remaining restore-on- load intact) and drop the now-unneeded CELERY_BROKER_URL='memory://' test override. Kept froma6ce207: the room.js WebSocket auto-reconnect — that is the actual fix for the dropped-socket delivery bug (the SigSelectSpec bisection proved the client restarts the numeral fine on re-received events; the failure was delivery, which a dead socket with no reconnect explains). Celery was a misdiagnosis of an in-process broadcast that works fine for a single-process dev/staging server. 23 task UTs + CARTE sig ITs green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,35 +1,24 @@
|
||||
"""
|
||||
Countdown scheduler for the polarity-room SAVE SIG gate.
|
||||
|
||||
The 12s confirm runs as a **Celery task** (`apply_async` with a `countdown`) so
|
||||
the post-countdown significator assignment + the `polarity_room_done` /
|
||||
`pick_sky_available` broadcasts originate in the long-lived Celery worker
|
||||
process. The old `threading.Timer` approach broadcast via
|
||||
`async_to_sync(group_send)` from an ephemeral per-call event loop inside the web
|
||||
process, which is unreliable with the Redis channel layer — the broadcast
|
||||
reached the consumer only sporadically (the production analog of the
|
||||
"broadcast must originate in daphne" test trap), so the tray→hex animation
|
||||
fired intermittently while the server-side state still committed. The worker is
|
||||
a stable process whose channel-layer singleton is never shared with a serving
|
||||
loop, so its `group_send` reaches daphne reliably.
|
||||
|
||||
Cancellation / supersession needs no task revocation: the cache token guard
|
||||
makes a stale queued task a no-op. `schedule_polarity_confirm` writes a fresh
|
||||
token (superseding any prior queued task), and `cancel_polarity_confirm` just
|
||||
deletes it — either way a task that fires later finds a missing/replaced token
|
||||
and returns without assigning.
|
||||
Uses threading.Timer so no separate Celery worker is needed in development.
|
||||
Single-process only — swap for a Celery task if production uses multiple
|
||||
web workers (gunicorn -w N with N > 1).
|
||||
"""
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from asgiref.sync import async_to_sync
|
||||
from celery import shared_task
|
||||
from channels.layers import get_channel_layer
|
||||
from django.core.cache import cache
|
||||
|
||||
_LEVITY_ROLES = {'PC', 'NC', 'SC'}
|
||||
_GRAVITY_ROLES = {'BC', 'EC', 'AC'}
|
||||
|
||||
# In-process registry of pending timers: "{room_id}_{polarity}" → Timer
|
||||
_timers = {}
|
||||
|
||||
|
||||
def _cache_key(room_id, polarity):
|
||||
return f'sig_countdown_{room_id}_{polarity}'
|
||||
@@ -39,15 +28,13 @@ def _group_send(room_id, msg):
|
||||
async_to_sync(get_channel_layer().group_send)(f'room_{room_id}', msg)
|
||||
|
||||
|
||||
@shared_task
|
||||
def confirm_polarity_room(room_id, polarity, token):
|
||||
"""Assign significators + broadcast room-done once a polarity's countdown
|
||||
elapses. Enqueued by `schedule_polarity_confirm` with a `countdown`; run by
|
||||
the Celery worker. The cache token guard makes a cancelled or superseded
|
||||
run a no-op (its token no longer matches what's in the cache)."""
|
||||
def _fire(room_id, polarity, token):
|
||||
"""Callback run by threading.Timer after the countdown expires."""
|
||||
# Token guard: if cancelled or superseded, cache entry will differ. The
|
||||
# entry is now a {token, deadline} dict (was a bare token string before the
|
||||
# restore-on-load sprint) — read either shape defensively so a stale plain
|
||||
# string left by an older deploy doesn't crash the timer callback.
|
||||
entry = cache.get(_cache_key(room_id, polarity))
|
||||
# The entry is a {token, deadline} dict; tolerate a bare-string token from
|
||||
# an older deploy so a stale value can't crash the task.
|
||||
stored_token = entry.get('token') if isinstance(entry, dict) else entry
|
||||
if stored_token != token:
|
||||
return
|
||||
@@ -90,28 +77,35 @@ def confirm_polarity_room(room_id, polarity, token):
|
||||
_group_send(room_id, {'type': 'pick_sky_available'})
|
||||
|
||||
cache.delete(_cache_key(room_id, polarity))
|
||||
_timers.pop(f'{room_id}_{polarity}', None)
|
||||
|
||||
|
||||
def schedule_polarity_confirm(room_id, polarity, seconds):
|
||||
"""Schedule a polarity confirm `seconds` from now. A fresh token supersedes
|
||||
any prior queued task (which then no-ops on the token guard)."""
|
||||
"""Schedule a polarity confirm `seconds` seconds from now. Cancels any prior timer."""
|
||||
cancel_polarity_confirm(room_id, polarity)
|
||||
|
||||
token = str(uuid.uuid4())
|
||||
# Store the absolute deadline alongside the token so a gamer loading a fresh
|
||||
# seat view mid-countdown can derive the seconds left (the flashing numeral)
|
||||
# instead of a static WAIT NVM. See countdown_remaining() + sig-select.js.
|
||||
# Store the absolute deadline alongside the token so a gamer loading a
|
||||
# fresh seat view mid-countdown can derive the seconds left (the flashing
|
||||
# numeral) instead of falling back to a static WAIT NVM. See
|
||||
# countdown_remaining() + sig-select.js's restore-on-load.
|
||||
cache.set(
|
||||
_cache_key(room_id, polarity),
|
||||
{'token': token, 'deadline': time.time() + seconds},
|
||||
timeout=int(seconds) + 60,
|
||||
)
|
||||
confirm_polarity_room.apply_async(
|
||||
args=[str(room_id), polarity, token], countdown=int(seconds),
|
||||
)
|
||||
|
||||
timer = threading.Timer(seconds, _fire, args=[str(room_id), polarity, token])
|
||||
timer.daemon = True
|
||||
timer.start()
|
||||
_timers[f'{room_id}_{polarity}'] = timer
|
||||
|
||||
|
||||
def cancel_polarity_confirm(room_id, polarity):
|
||||
"""Cancel any pending confirm for this room + polarity. Deleting the token
|
||||
is enough — the already-queued task finds no matching token and no-ops."""
|
||||
"""Cancel any pending confirm for this room + polarity."""
|
||||
timer = _timers.pop(f'{room_id}_{polarity}', None)
|
||||
if timer:
|
||||
timer.cancel()
|
||||
cache.delete(_cache_key(room_id, polarity))
|
||||
|
||||
|
||||
|
||||
@@ -19,22 +19,27 @@ class CancelPolarityConfirmTest(TestCase):
|
||||
self.user = User.objects.create(email="owner@tasks.io")
|
||||
self.room = Room.objects.create(name="R", owner=self.user)
|
||||
|
||||
def test_cancel_with_no_entry_is_a_noop(self):
|
||||
def test_cancel_with_no_timer_is_a_noop(self):
|
||||
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
|
||||
|
||||
def test_cancel_clears_cache_entry(self):
|
||||
# Deleting the token is the whole cancel: a queued confirm task that
|
||||
# fires later finds no matching token and no-ops.
|
||||
from django.core.cache import cache
|
||||
key = _cache_key(str(self.room.id), SigReservation.LEVITY)
|
||||
cache.set(key, "sometoken", timeout=60)
|
||||
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
|
||||
self.assertIsNone(cache.get(key))
|
||||
|
||||
@patch("apps.epic.tasks._timers")
|
||||
def test_cancel_calls_timer_cancel_when_present(self, mock_timers):
|
||||
mock_timer = MagicMock()
|
||||
key = f"{self.room.id}_levity"
|
||||
mock_timers.pop.return_value = mock_timer
|
||||
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
|
||||
mock_timer.cancel.assert_called_once()
|
||||
|
||||
|
||||
class FireFunctionTest(TestCase):
|
||||
"""Tests for the confirm_polarity_room() Celery task body (called directly,
|
||||
synchronously, here — the worker runs it via apply_async in production)."""
|
||||
"""Tests for the _fire() callback executed by threading.Timer."""
|
||||
|
||||
def setUp(self):
|
||||
self.owner = User.objects.create(email="owner@fire.io")
|
||||
@@ -65,23 +70,23 @@ class FireFunctionTest(TestCase):
|
||||
|
||||
@patch("apps.epic.tasks._group_send")
|
||||
def test_fire_does_nothing_if_token_mismatch(self, mock_send):
|
||||
from apps.epic.tasks import confirm_polarity_room
|
||||
from apps.epic.tasks import _fire
|
||||
self._set_token()
|
||||
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, "wrong-token")
|
||||
_fire(str(self.room.id), SigReservation.LEVITY, "wrong-token")
|
||||
mock_send.assert_not_called()
|
||||
|
||||
@patch("apps.epic.tasks._group_send")
|
||||
def test_fire_does_nothing_if_room_not_sig_select(self, mock_send):
|
||||
from apps.epic.tasks import confirm_polarity_room
|
||||
from apps.epic.tasks import _fire
|
||||
token = self._set_token()
|
||||
self.room.table_status = Room.ROLE_SELECT
|
||||
self.room.save()
|
||||
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
|
||||
_fire(str(self.room.id), SigReservation.LEVITY, token)
|
||||
mock_send.assert_not_called()
|
||||
|
||||
@patch("apps.epic.tasks._group_send")
|
||||
def test_fire_does_nothing_if_fewer_than_3_ready(self, mock_send):
|
||||
from apps.epic.tasks import confirm_polarity_room
|
||||
from apps.epic.tasks import _fire
|
||||
token = self._set_token()
|
||||
cards = list(TarotCard.objects.all()[:2])
|
||||
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC"]))
|
||||
@@ -90,12 +95,12 @@ class FireFunctionTest(TestCase):
|
||||
room=self.room, gamer=seat.gamer, card=cards[i],
|
||||
polarity=SigReservation.LEVITY, seat=seat, ready=True,
|
||||
)
|
||||
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
|
||||
_fire(str(self.room.id), SigReservation.LEVITY, token)
|
||||
mock_send.assert_not_called()
|
||||
|
||||
@patch("apps.epic.tasks._group_send")
|
||||
def test_fire_assigns_significators_and_broadcasts_when_all_ready(self, mock_send):
|
||||
from apps.epic.tasks import confirm_polarity_room
|
||||
from apps.epic.tasks import _fire
|
||||
token = self._set_token()
|
||||
cards = list(TarotCard.objects.all()[:3])
|
||||
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"]))
|
||||
@@ -104,7 +109,7 @@ class FireFunctionTest(TestCase):
|
||||
room=self.room, gamer=seat.gamer, card=cards[i],
|
||||
polarity=SigReservation.LEVITY, seat=seat, ready=True,
|
||||
)
|
||||
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
|
||||
_fire(str(self.room.id), SigReservation.LEVITY, token)
|
||||
self.assertTrue(mock_send.called)
|
||||
levity_seats = TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"])
|
||||
for i, seat in enumerate(levity_seats):
|
||||
@@ -112,29 +117,29 @@ class FireFunctionTest(TestCase):
|
||||
self.assertEqual(seat.significator, cards[i])
|
||||
|
||||
def test_fire_does_nothing_for_nonexistent_room(self):
|
||||
from apps.epic.tasks import confirm_polarity_room
|
||||
from apps.epic.tasks import _fire
|
||||
from django.core.cache import cache
|
||||
fake_id = "00000000-0000-0000-0000-000000000000"
|
||||
token = "known-token"
|
||||
cache.set(_cache_key(fake_id, SigReservation.LEVITY), token, 60)
|
||||
confirm_polarity_room(fake_id, SigReservation.LEVITY, token)
|
||||
_fire(fake_id, SigReservation.LEVITY, token)
|
||||
|
||||
@patch("apps.epic.tasks._group_send")
|
||||
def test_fire_does_nothing_if_all_sigs_already_assigned(self, mock_send):
|
||||
from apps.epic.tasks import confirm_polarity_room
|
||||
from apps.epic.tasks import _fire
|
||||
token = self._set_token()
|
||||
cards = list(TarotCard.objects.all()[:3])
|
||||
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"]))
|
||||
for i, seat in enumerate(seats):
|
||||
seat.significator = cards[i]
|
||||
seat.save(update_fields=["significator"])
|
||||
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
|
||||
_fire(str(self.room.id), SigReservation.LEVITY, token)
|
||||
mock_send.assert_not_called()
|
||||
|
||||
@patch("apps.epic.tasks._group_send")
|
||||
def test_fire_broadcasts_pick_sky_when_all_polarity_sigs_assigned(self, mock_send):
|
||||
"""When both levity AND gravity seats all have significators, fire() triggers SKY_SELECT."""
|
||||
from apps.epic.tasks import confirm_polarity_room
|
||||
from apps.epic.tasks import _fire
|
||||
token = self._set_token()
|
||||
cards = list(TarotCard.objects.all()[:6])
|
||||
# Give gravity seats significators so the all-assigned check passes
|
||||
@@ -150,7 +155,7 @@ class FireFunctionTest(TestCase):
|
||||
room=self.room, gamer=seat.gamer, card=levity_cards[i],
|
||||
polarity=SigReservation.LEVITY, seat=seat, ready=True,
|
||||
)
|
||||
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
|
||||
_fire(str(self.room.id), SigReservation.LEVITY, token)
|
||||
call_types = [c.args[1]["type"] for c in mock_send.call_args_list]
|
||||
self.assertIn("polarity_room_done", call_types)
|
||||
self.assertIn("pick_sky_available", call_types)
|
||||
@@ -165,30 +170,27 @@ class SchedulePolarityConfirmTest(TestCase):
|
||||
|
||||
def test_schedule_sets_cache_token(self):
|
||||
from django.core.cache import cache
|
||||
with patch("apps.epic.tasks.confirm_polarity_room.apply_async"):
|
||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
|
||||
entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY))
|
||||
self.assertIsNotNone(entry)
|
||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
|
||||
token = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY))
|
||||
self.assertIsNotNone(token)
|
||||
|
||||
def test_schedule_enqueues_confirm_task_with_countdown(self):
|
||||
# The deferred confirm is a Celery task fired `seconds` from now (the
|
||||
# worker process broadcasts reliably, unlike the old timer thread).
|
||||
with patch("apps.epic.tasks.confirm_polarity_room.apply_async") as mock_async:
|
||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
|
||||
mock_async.assert_called_once()
|
||||
self.assertEqual(mock_async.call_args.kwargs["countdown"], 12)
|
||||
def test_schedule_registers_timer(self):
|
||||
from apps.epic.tasks import _timers
|
||||
key = f"{self.room.id}_{SigReservation.LEVITY}"
|
||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
|
||||
self.assertIn(key, _timers)
|
||||
_timers[key].cancel() # clean up
|
||||
|
||||
def test_schedule_supersedes_prior_via_fresh_token(self):
|
||||
# No task revocation: a new schedule just writes a fresh token, so the
|
||||
# previously-queued task no-ops on the token guard when it fires.
|
||||
from django.core.cache import cache
|
||||
key = _cache_key(str(self.room.id), SigReservation.LEVITY)
|
||||
with patch("apps.epic.tasks.confirm_polarity_room.apply_async"):
|
||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
|
||||
first = cache.get(key)["token"]
|
||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
|
||||
second = cache.get(key)["token"]
|
||||
self.assertNotEqual(first, second)
|
||||
def test_schedule_cancels_prior_timer_before_scheduling(self):
|
||||
from apps.epic.tasks import _timers
|
||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
|
||||
key = f"{self.room.id}_{SigReservation.LEVITY}"
|
||||
first_timer = _timers.get(key)
|
||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
|
||||
second_timer = _timers.get(key)
|
||||
self.assertIsNotNone(second_timer)
|
||||
self.assertIsNot(first_timer, second_timer)
|
||||
second_timer.cancel()
|
||||
|
||||
|
||||
class CountdownRemainingTest(TestCase):
|
||||
@@ -204,8 +206,7 @@ class CountdownRemainingTest(TestCase):
|
||||
def tearDown(self):
|
||||
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
|
||||
|
||||
@patch("apps.epic.tasks.confirm_polarity_room.apply_async")
|
||||
def test_schedule_stores_deadline_alongside_token(self, _async):
|
||||
def test_schedule_stores_deadline_alongside_token(self):
|
||||
from django.core.cache import cache
|
||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
|
||||
entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY))
|
||||
@@ -213,8 +214,7 @@ class CountdownRemainingTest(TestCase):
|
||||
self.assertIn("token", entry)
|
||||
self.assertIn("deadline", entry)
|
||||
|
||||
@patch("apps.epic.tasks.confirm_polarity_room.apply_async")
|
||||
def test_countdown_remaining_returns_seconds_left(self, _async):
|
||||
def test_countdown_remaining_returns_seconds_left(self):
|
||||
from apps.epic.tasks import countdown_remaining
|
||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
|
||||
remaining = countdown_remaining(str(self.room.id), SigReservation.LEVITY)
|
||||
|
||||
@@ -250,9 +250,3 @@ if 'test' in sys.argv:
|
||||
'BACKEND': 'channels.layers.InMemoryChannelLayer',
|
||||
}
|
||||
}
|
||||
# In-process Kombu broker so Celery `apply_async` (the polarity-confirm
|
||||
# countdown) queues without a live Redis + without running the task (no
|
||||
# worker consumes it) — mirrors the old threading.Timer that was scheduled
|
||||
# but never fired inside a sub-12s test. NOT eager: eager would ignore the
|
||||
# countdown and assign significators synchronously during the ready POST.
|
||||
CELERY_BROKER_URL = 'memory://'
|
||||
|
||||
Reference in New Issue
Block a user