diff --git a/src/apps/epic/tasks.py b/src/apps/epic/tasks.py index 52bae4f..a66a295 100644 --- a/src/apps/epic/tasks.py +++ b/src/apps/epic/tasks.py @@ -1,35 +1,24 @@ """ Countdown scheduler for the polarity-room SAVE SIG gate. -The 12s confirm runs as a **Celery task** (`apply_async` with a `countdown`) so -the post-countdown significator assignment + the `polarity_room_done` / -`pick_sky_available` broadcasts originate in the long-lived Celery worker -process. The old `threading.Timer` approach broadcast via -`async_to_sync(group_send)` from an ephemeral per-call event loop inside the web -process, which is unreliable with the Redis channel layer — the broadcast -reached the consumer only sporadically (the production analog of the -"broadcast must originate in daphne" test trap), so the tray→hex animation -fired intermittently while the server-side state still committed. The worker is -a stable process whose channel-layer singleton is never shared with a serving -loop, so its `group_send` reaches daphne reliably. - -Cancellation / supersession needs no task revocation: the cache token guard -makes a stale queued task a no-op. `schedule_polarity_confirm` writes a fresh -token (superseding any prior queued task), and `cancel_polarity_confirm` just -deletes it — either way a task that fires later finds a missing/replaced token -and returns without assigning. +Uses threading.Timer so no separate Celery worker is needed in development. +Single-process only — swap for a Celery task if production uses multiple +web workers (gunicorn -w N with N > 1). """ +import threading import time import uuid from asgiref.sync import async_to_sync -from celery import shared_task from channels.layers import get_channel_layer from django.core.cache import cache _LEVITY_ROLES = {'PC', 'NC', 'SC'} _GRAVITY_ROLES = {'BC', 'EC', 'AC'} +# In-process registry of pending timers: "{room_id}_{polarity}" → Timer +_timers = {} + def _cache_key(room_id, polarity): return f'sig_countdown_{room_id}_{polarity}' @@ -39,15 +28,13 @@ def _group_send(room_id, msg): async_to_sync(get_channel_layer().group_send)(f'room_{room_id}', msg) -@shared_task -def confirm_polarity_room(room_id, polarity, token): - """Assign significators + broadcast room-done once a polarity's countdown - elapses. Enqueued by `schedule_polarity_confirm` with a `countdown`; run by - the Celery worker. The cache token guard makes a cancelled or superseded - run a no-op (its token no longer matches what's in the cache).""" +def _fire(room_id, polarity, token): + """Callback run by threading.Timer after the countdown expires.""" + # Token guard: if cancelled or superseded, cache entry will differ. The + # entry is now a {token, deadline} dict (was a bare token string before the + # restore-on-load sprint) — read either shape defensively so a stale plain + # string left by an older deploy doesn't crash the timer callback. entry = cache.get(_cache_key(room_id, polarity)) - # The entry is a {token, deadline} dict; tolerate a bare-string token from - # an older deploy so a stale value can't crash the task. stored_token = entry.get('token') if isinstance(entry, dict) else entry if stored_token != token: return @@ -90,28 +77,35 @@ def confirm_polarity_room(room_id, polarity, token): _group_send(room_id, {'type': 'pick_sky_available'}) cache.delete(_cache_key(room_id, polarity)) + _timers.pop(f'{room_id}_{polarity}', None) def schedule_polarity_confirm(room_id, polarity, seconds): - """Schedule a polarity confirm `seconds` from now. A fresh token supersedes - any prior queued task (which then no-ops on the token guard).""" + """Schedule a polarity confirm `seconds` seconds from now. Cancels any prior timer.""" + cancel_polarity_confirm(room_id, polarity) + token = str(uuid.uuid4()) - # Store the absolute deadline alongside the token so a gamer loading a fresh - # seat view mid-countdown can derive the seconds left (the flashing numeral) - # instead of a static WAIT NVM. See countdown_remaining() + sig-select.js. + # Store the absolute deadline alongside the token so a gamer loading a + # fresh seat view mid-countdown can derive the seconds left (the flashing + # numeral) instead of falling back to a static WAIT NVM. See + # countdown_remaining() + sig-select.js's restore-on-load. cache.set( _cache_key(room_id, polarity), {'token': token, 'deadline': time.time() + seconds}, timeout=int(seconds) + 60, ) - confirm_polarity_room.apply_async( - args=[str(room_id), polarity, token], countdown=int(seconds), - ) + + timer = threading.Timer(seconds, _fire, args=[str(room_id), polarity, token]) + timer.daemon = True + timer.start() + _timers[f'{room_id}_{polarity}'] = timer def cancel_polarity_confirm(room_id, polarity): - """Cancel any pending confirm for this room + polarity. Deleting the token - is enough — the already-queued task finds no matching token and no-ops.""" + """Cancel any pending confirm for this room + polarity.""" + timer = _timers.pop(f'{room_id}_{polarity}', None) + if timer: + timer.cancel() cache.delete(_cache_key(room_id, polarity)) diff --git a/src/apps/epic/tests/unit/test_tasks.py b/src/apps/epic/tests/unit/test_tasks.py index f5f3882..444f6a5 100644 --- a/src/apps/epic/tests/unit/test_tasks.py +++ b/src/apps/epic/tests/unit/test_tasks.py @@ -19,22 +19,27 @@ class CancelPolarityConfirmTest(TestCase): self.user = User.objects.create(email="owner@tasks.io") self.room = Room.objects.create(name="R", owner=self.user) - def test_cancel_with_no_entry_is_a_noop(self): + def test_cancel_with_no_timer_is_a_noop(self): cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) def test_cancel_clears_cache_entry(self): - # Deleting the token is the whole cancel: a queued confirm task that - # fires later finds no matching token and no-ops. from django.core.cache import cache key = _cache_key(str(self.room.id), SigReservation.LEVITY) cache.set(key, "sometoken", timeout=60) cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) self.assertIsNone(cache.get(key)) + @patch("apps.epic.tasks._timers") + def test_cancel_calls_timer_cancel_when_present(self, mock_timers): + mock_timer = MagicMock() + key = f"{self.room.id}_levity" + mock_timers.pop.return_value = mock_timer + cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) + mock_timer.cancel.assert_called_once() + class FireFunctionTest(TestCase): - """Tests for the confirm_polarity_room() Celery task body (called directly, - synchronously, here — the worker runs it via apply_async in production).""" + """Tests for the _fire() callback executed by threading.Timer.""" def setUp(self): self.owner = User.objects.create(email="owner@fire.io") @@ -65,23 +70,23 @@ class FireFunctionTest(TestCase): @patch("apps.epic.tasks._group_send") def test_fire_does_nothing_if_token_mismatch(self, mock_send): - from apps.epic.tasks import confirm_polarity_room + from apps.epic.tasks import _fire self._set_token() - confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, "wrong-token") + _fire(str(self.room.id), SigReservation.LEVITY, "wrong-token") mock_send.assert_not_called() @patch("apps.epic.tasks._group_send") def test_fire_does_nothing_if_room_not_sig_select(self, mock_send): - from apps.epic.tasks import confirm_polarity_room + from apps.epic.tasks import _fire token = self._set_token() self.room.table_status = Room.ROLE_SELECT self.room.save() - confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) + _fire(str(self.room.id), SigReservation.LEVITY, token) mock_send.assert_not_called() @patch("apps.epic.tasks._group_send") def test_fire_does_nothing_if_fewer_than_3_ready(self, mock_send): - from apps.epic.tasks import confirm_polarity_room + from apps.epic.tasks import _fire token = self._set_token() cards = list(TarotCard.objects.all()[:2]) seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC"])) @@ -90,12 +95,12 @@ class FireFunctionTest(TestCase): room=self.room, gamer=seat.gamer, card=cards[i], polarity=SigReservation.LEVITY, seat=seat, ready=True, ) - confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) + _fire(str(self.room.id), SigReservation.LEVITY, token) mock_send.assert_not_called() @patch("apps.epic.tasks._group_send") def test_fire_assigns_significators_and_broadcasts_when_all_ready(self, mock_send): - from apps.epic.tasks import confirm_polarity_room + from apps.epic.tasks import _fire token = self._set_token() cards = list(TarotCard.objects.all()[:3]) seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"])) @@ -104,7 +109,7 @@ class FireFunctionTest(TestCase): room=self.room, gamer=seat.gamer, card=cards[i], polarity=SigReservation.LEVITY, seat=seat, ready=True, ) - confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) + _fire(str(self.room.id), SigReservation.LEVITY, token) self.assertTrue(mock_send.called) levity_seats = TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"]) for i, seat in enumerate(levity_seats): @@ -112,29 +117,29 @@ class FireFunctionTest(TestCase): self.assertEqual(seat.significator, cards[i]) def test_fire_does_nothing_for_nonexistent_room(self): - from apps.epic.tasks import confirm_polarity_room + from apps.epic.tasks import _fire from django.core.cache import cache fake_id = "00000000-0000-0000-0000-000000000000" token = "known-token" cache.set(_cache_key(fake_id, SigReservation.LEVITY), token, 60) - confirm_polarity_room(fake_id, SigReservation.LEVITY, token) + _fire(fake_id, SigReservation.LEVITY, token) @patch("apps.epic.tasks._group_send") def test_fire_does_nothing_if_all_sigs_already_assigned(self, mock_send): - from apps.epic.tasks import confirm_polarity_room + from apps.epic.tasks import _fire token = self._set_token() cards = list(TarotCard.objects.all()[:3]) seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"])) for i, seat in enumerate(seats): seat.significator = cards[i] seat.save(update_fields=["significator"]) - confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) + _fire(str(self.room.id), SigReservation.LEVITY, token) mock_send.assert_not_called() @patch("apps.epic.tasks._group_send") def test_fire_broadcasts_pick_sky_when_all_polarity_sigs_assigned(self, mock_send): """When both levity AND gravity seats all have significators, fire() triggers SKY_SELECT.""" - from apps.epic.tasks import confirm_polarity_room + from apps.epic.tasks import _fire token = self._set_token() cards = list(TarotCard.objects.all()[:6]) # Give gravity seats significators so the all-assigned check passes @@ -150,7 +155,7 @@ class FireFunctionTest(TestCase): room=self.room, gamer=seat.gamer, card=levity_cards[i], polarity=SigReservation.LEVITY, seat=seat, ready=True, ) - confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token) + _fire(str(self.room.id), SigReservation.LEVITY, token) call_types = [c.args[1]["type"] for c in mock_send.call_args_list] self.assertIn("polarity_room_done", call_types) self.assertIn("pick_sky_available", call_types) @@ -165,30 +170,27 @@ class SchedulePolarityConfirmTest(TestCase): def test_schedule_sets_cache_token(self): from django.core.cache import cache - with patch("apps.epic.tasks.confirm_polarity_room.apply_async"): - schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) - entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY)) - self.assertIsNotNone(entry) + schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) + token = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY)) + self.assertIsNotNone(token) - def test_schedule_enqueues_confirm_task_with_countdown(self): - # The deferred confirm is a Celery task fired `seconds` from now (the - # worker process broadcasts reliably, unlike the old timer thread). - with patch("apps.epic.tasks.confirm_polarity_room.apply_async") as mock_async: - schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) - mock_async.assert_called_once() - self.assertEqual(mock_async.call_args.kwargs["countdown"], 12) + def test_schedule_registers_timer(self): + from apps.epic.tasks import _timers + key = f"{self.room.id}_{SigReservation.LEVITY}" + schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) + self.assertIn(key, _timers) + _timers[key].cancel() # clean up - def test_schedule_supersedes_prior_via_fresh_token(self): - # No task revocation: a new schedule just writes a fresh token, so the - # previously-queued task no-ops on the token guard when it fires. - from django.core.cache import cache - key = _cache_key(str(self.room.id), SigReservation.LEVITY) - with patch("apps.epic.tasks.confirm_polarity_room.apply_async"): - schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) - first = cache.get(key)["token"] - schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) - second = cache.get(key)["token"] - self.assertNotEqual(first, second) + def test_schedule_cancels_prior_timer_before_scheduling(self): + from apps.epic.tasks import _timers + schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) + key = f"{self.room.id}_{SigReservation.LEVITY}" + first_timer = _timers.get(key) + schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60) + second_timer = _timers.get(key) + self.assertIsNotNone(second_timer) + self.assertIsNot(first_timer, second_timer) + second_timer.cancel() class CountdownRemainingTest(TestCase): @@ -204,8 +206,7 @@ class CountdownRemainingTest(TestCase): def tearDown(self): cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY) - @patch("apps.epic.tasks.confirm_polarity_room.apply_async") - def test_schedule_stores_deadline_alongside_token(self, _async): + def test_schedule_stores_deadline_alongside_token(self): from django.core.cache import cache schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY)) @@ -213,8 +214,7 @@ class CountdownRemainingTest(TestCase): self.assertIn("token", entry) self.assertIn("deadline", entry) - @patch("apps.epic.tasks.confirm_polarity_room.apply_async") - def test_countdown_remaining_returns_seconds_left(self, _async): + def test_countdown_remaining_returns_seconds_left(self): from apps.epic.tasks import countdown_remaining schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12) remaining = countdown_remaining(str(self.room.id), SigReservation.LEVITY) diff --git a/src/core/settings.py b/src/core/settings.py index a21287f..d837853 100644 --- a/src/core/settings.py +++ b/src/core/settings.py @@ -250,9 +250,3 @@ if 'test' in sys.argv: 'BACKEND': 'channels.layers.InMemoryChannelLayer', } } - # In-process Kombu broker so Celery `apply_async` (the polarity-confirm - # countdown) queues without a live Redis + without running the task (no - # worker consumes it) — mirrors the old threading.Timer that was scheduled - # but never fired inside a sub-12s test. NOT eager: eager would ignore the - # countdown and assign significators synchronously during the ready POST. - CELERY_BROKER_URL = 'memory://'