Sig countdown: run the post-countdown confirm as a Celery task + auto-reconnect the room WS — TDD
The flaky tray→thumbnail→hex animation after the 12s countdown. Root cause: the confirm ran in a threading.Timer thread inside the web process, and _fire broadcast polarity_room_done / pick_sky_available via async_to_sync(group_send) from an ephemeral per-call event loop. With the Redis channel layer that publish is unreliable across loops (the production analog of the "broadcast must originate in daphne" test trap), so the live events reached the client only sporadically — the server-side state (sig assignment, SKY_SELECT) still committed, which is why a refresh always showed the concluded hex but the animation usually didn't play. Fix (chosen: migrate to Celery — the path the tasks.py docstring already called for): _fire becomes the @shared_task confirm_polarity_room, enqueued by schedule_polarity_confirm via apply_async(countdown=seconds). The worker is a stable long-lived process whose channel-layer singleton is never shared with a serving loop, so its group_send reaches daphne reliably; it also survives web-worker restarts. No task revocation needed — cancellation/supersession ride the existing cache token guard (cancel just deletes the token; a stale queued task no-ops). Dropped threading.Timer + the _timers registry. Test settings get CELERY_BROKER_URL='memory://' so apply_async queues without a live Redis and without running the task (no worker) — mirrors the old timer that was scheduled but never fired inside a sub-12s test. NOT eager: eager would ignore the countdown and assign significators synchronously during the ready POST. test_tasks rewritten: confirm_polarity_room called directly (task body), schedule asserts the enqueue + countdown + fresh-token supersession; the broadcast itself stays IT-uncoverable under InMemory (known channels limit). Also: room.js now auto-reconnects the room WebSocket with capped exponential backoff (1s→30s, reset on open, halted on beforeunload). A dropped socket (proxy idle-timeout, blip, server restart) previously stayed dead until a manual refresh, silently losing every live event — an independent reliability gap that compounded the "sporadic" feel. 602 epic ITs + 18 task UTs green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -117,17 +117,39 @@
|
|||||||
// multi-seat owner never receives the other polarity's countdown.
|
// multi-seat owner never receives the other polarity's countdown.
|
||||||
const seatParam = new URLSearchParams(window.location.search).get('seat');
|
const seatParam = new URLSearchParams(window.location.search).get('seat');
|
||||||
const wsSeat = seatParam ? `?seat=${encodeURIComponent(seatParam)}` : '';
|
const wsSeat = seatParam ? `?seat=${encodeURIComponent(seatParam)}` : '';
|
||||||
const ws = new WebSocket(`${wsScheme}://${window.location.host}/ws/room/${roomId}/${wsSeat}`);
|
const wsUrl = `${wsScheme}://${window.location.host}/ws/room/${roomId}/${wsSeat}`;
|
||||||
window._roomSocket = ws; // exposed for sig-select.js hover broadcast
|
|
||||||
|
|
||||||
ws.onmessage = function (event) {
|
// Auto-reconnect with capped exponential backoff. A dropped socket (proxy
|
||||||
const data = JSON.parse(event.data);
|
// idle-timeout, brief network blip, server restart) otherwise stayed dead
|
||||||
window.dispatchEvent(new CustomEvent('room:' + data.type, { detail: data }));
|
// until a manual refresh, silently losing live events — most painfully the
|
||||||
};
|
// 12s sig countdown's polarity_room_done / pick_sky_available, so the
|
||||||
|
// tray→hex animation never played. Backoff resets on a clean open; we stop
|
||||||
|
// retrying once the page is unloading so we don't reconnect during nav.
|
||||||
|
let backoff = 1000;
|
||||||
|
const BACKOFF_MAX = 30000;
|
||||||
|
let unloading = false;
|
||||||
|
window.addEventListener('beforeunload', function () { unloading = true; });
|
||||||
|
|
||||||
ws.onclose = function (event) {
|
function connect() {
|
||||||
if (!event.wasClean) {
|
const ws = new WebSocket(wsUrl);
|
||||||
console.warn('Room WebSocket closed unexpectedly');
|
window._roomSocket = ws; // exposed for sig-select.js hover broadcast
|
||||||
}
|
|
||||||
};
|
ws.onopen = function () { backoff = 1000; };
|
||||||
|
|
||||||
|
ws.onmessage = function (event) {
|
||||||
|
const data = JSON.parse(event.data);
|
||||||
|
window.dispatchEvent(new CustomEvent('room:' + data.type, { detail: data }));
|
||||||
|
};
|
||||||
|
|
||||||
|
ws.onclose = function (event) {
|
||||||
|
if (unloading) return;
|
||||||
|
if (!event.wasClean) {
|
||||||
|
console.warn('Room WebSocket closed unexpectedly — reconnecting in ' + backoff + 'ms');
|
||||||
|
}
|
||||||
|
setTimeout(connect, backoff);
|
||||||
|
backoff = Math.min(backoff * 2, BACKOFF_MAX);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
connect();
|
||||||
}());
|
}());
|
||||||
|
|||||||
@@ -1,24 +1,35 @@
|
|||||||
"""
|
"""
|
||||||
Countdown scheduler for the polarity-room SAVE SIG gate.
|
Countdown scheduler for the polarity-room SAVE SIG gate.
|
||||||
|
|
||||||
Uses threading.Timer so no separate Celery worker is needed in development.
|
The 12s confirm runs as a **Celery task** (`apply_async` with a `countdown`) so
|
||||||
Single-process only — swap for a Celery task if production uses multiple
|
the post-countdown significator assignment + the `polarity_room_done` /
|
||||||
web workers (gunicorn -w N with N > 1).
|
`pick_sky_available` broadcasts originate in the long-lived Celery worker
|
||||||
|
process. The old `threading.Timer` approach broadcast via
|
||||||
|
`async_to_sync(group_send)` from an ephemeral per-call event loop inside the web
|
||||||
|
process, which is unreliable with the Redis channel layer — the broadcast
|
||||||
|
reached the consumer only sporadically (the production analog of the
|
||||||
|
"broadcast must originate in daphne" test trap), so the tray→hex animation
|
||||||
|
fired intermittently while the server-side state still committed. The worker is
|
||||||
|
a stable process whose channel-layer singleton is never shared with a serving
|
||||||
|
loop, so its `group_send` reaches daphne reliably.
|
||||||
|
|
||||||
|
Cancellation / supersession needs no task revocation: the cache token guard
|
||||||
|
makes a stale queued task a no-op. `schedule_polarity_confirm` writes a fresh
|
||||||
|
token (superseding any prior queued task), and `cancel_polarity_confirm` just
|
||||||
|
deletes it — either way a task that fires later finds a missing/replaced token
|
||||||
|
and returns without assigning.
|
||||||
"""
|
"""
|
||||||
import threading
|
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from asgiref.sync import async_to_sync
|
from asgiref.sync import async_to_sync
|
||||||
|
from celery import shared_task
|
||||||
from channels.layers import get_channel_layer
|
from channels.layers import get_channel_layer
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
|
|
||||||
_LEVITY_ROLES = {'PC', 'NC', 'SC'}
|
_LEVITY_ROLES = {'PC', 'NC', 'SC'}
|
||||||
_GRAVITY_ROLES = {'BC', 'EC', 'AC'}
|
_GRAVITY_ROLES = {'BC', 'EC', 'AC'}
|
||||||
|
|
||||||
# In-process registry of pending timers: "{room_id}_{polarity}" → Timer
|
|
||||||
_timers = {}
|
|
||||||
|
|
||||||
|
|
||||||
def _cache_key(room_id, polarity):
|
def _cache_key(room_id, polarity):
|
||||||
return f'sig_countdown_{room_id}_{polarity}'
|
return f'sig_countdown_{room_id}_{polarity}'
|
||||||
@@ -28,13 +39,15 @@ def _group_send(room_id, msg):
|
|||||||
async_to_sync(get_channel_layer().group_send)(f'room_{room_id}', msg)
|
async_to_sync(get_channel_layer().group_send)(f'room_{room_id}', msg)
|
||||||
|
|
||||||
|
|
||||||
def _fire(room_id, polarity, token):
|
@shared_task
|
||||||
"""Callback run by threading.Timer after the countdown expires."""
|
def confirm_polarity_room(room_id, polarity, token):
|
||||||
# Token guard: if cancelled or superseded, cache entry will differ. The
|
"""Assign significators + broadcast room-done once a polarity's countdown
|
||||||
# entry is now a {token, deadline} dict (was a bare token string before the
|
elapses. Enqueued by `schedule_polarity_confirm` with a `countdown`; run by
|
||||||
# restore-on-load sprint) — read either shape defensively so a stale plain
|
the Celery worker. The cache token guard makes a cancelled or superseded
|
||||||
# string left by an older deploy doesn't crash the timer callback.
|
run a no-op (its token no longer matches what's in the cache)."""
|
||||||
entry = cache.get(_cache_key(room_id, polarity))
|
entry = cache.get(_cache_key(room_id, polarity))
|
||||||
|
# The entry is a {token, deadline} dict; tolerate a bare-string token from
|
||||||
|
# an older deploy so a stale value can't crash the task.
|
||||||
stored_token = entry.get('token') if isinstance(entry, dict) else entry
|
stored_token = entry.get('token') if isinstance(entry, dict) else entry
|
||||||
if stored_token != token:
|
if stored_token != token:
|
||||||
return
|
return
|
||||||
@@ -77,35 +90,28 @@ def _fire(room_id, polarity, token):
|
|||||||
_group_send(room_id, {'type': 'pick_sky_available'})
|
_group_send(room_id, {'type': 'pick_sky_available'})
|
||||||
|
|
||||||
cache.delete(_cache_key(room_id, polarity))
|
cache.delete(_cache_key(room_id, polarity))
|
||||||
_timers.pop(f'{room_id}_{polarity}', None)
|
|
||||||
|
|
||||||
|
|
||||||
def schedule_polarity_confirm(room_id, polarity, seconds):
|
def schedule_polarity_confirm(room_id, polarity, seconds):
|
||||||
"""Schedule a polarity confirm `seconds` seconds from now. Cancels any prior timer."""
|
"""Schedule a polarity confirm `seconds` from now. A fresh token supersedes
|
||||||
cancel_polarity_confirm(room_id, polarity)
|
any prior queued task (which then no-ops on the token guard)."""
|
||||||
|
|
||||||
token = str(uuid.uuid4())
|
token = str(uuid.uuid4())
|
||||||
# Store the absolute deadline alongside the token so a gamer loading a
|
# Store the absolute deadline alongside the token so a gamer loading a fresh
|
||||||
# fresh seat view mid-countdown can derive the seconds left (the flashing
|
# seat view mid-countdown can derive the seconds left (the flashing numeral)
|
||||||
# numeral) instead of falling back to a static WAIT NVM. See
|
# instead of a static WAIT NVM. See countdown_remaining() + sig-select.js.
|
||||||
# countdown_remaining() + sig-select.js's restore-on-load.
|
|
||||||
cache.set(
|
cache.set(
|
||||||
_cache_key(room_id, polarity),
|
_cache_key(room_id, polarity),
|
||||||
{'token': token, 'deadline': time.time() + seconds},
|
{'token': token, 'deadline': time.time() + seconds},
|
||||||
timeout=int(seconds) + 60,
|
timeout=int(seconds) + 60,
|
||||||
)
|
)
|
||||||
|
confirm_polarity_room.apply_async(
|
||||||
timer = threading.Timer(seconds, _fire, args=[str(room_id), polarity, token])
|
args=[str(room_id), polarity, token], countdown=int(seconds),
|
||||||
timer.daemon = True
|
)
|
||||||
timer.start()
|
|
||||||
_timers[f'{room_id}_{polarity}'] = timer
|
|
||||||
|
|
||||||
|
|
||||||
def cancel_polarity_confirm(room_id, polarity):
|
def cancel_polarity_confirm(room_id, polarity):
|
||||||
"""Cancel any pending confirm for this room + polarity."""
|
"""Cancel any pending confirm for this room + polarity. Deleting the token
|
||||||
timer = _timers.pop(f'{room_id}_{polarity}', None)
|
is enough — the already-queued task finds no matching token and no-ops."""
|
||||||
if timer:
|
|
||||||
timer.cancel()
|
|
||||||
cache.delete(_cache_key(room_id, polarity))
|
cache.delete(_cache_key(room_id, polarity))
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -19,27 +19,22 @@ class CancelPolarityConfirmTest(TestCase):
|
|||||||
self.user = User.objects.create(email="owner@tasks.io")
|
self.user = User.objects.create(email="owner@tasks.io")
|
||||||
self.room = Room.objects.create(name="R", owner=self.user)
|
self.room = Room.objects.create(name="R", owner=self.user)
|
||||||
|
|
||||||
def test_cancel_with_no_timer_is_a_noop(self):
|
def test_cancel_with_no_entry_is_a_noop(self):
|
||||||
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
|
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
|
||||||
|
|
||||||
def test_cancel_clears_cache_entry(self):
|
def test_cancel_clears_cache_entry(self):
|
||||||
|
# Deleting the token is the whole cancel: a queued confirm task that
|
||||||
|
# fires later finds no matching token and no-ops.
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
key = _cache_key(str(self.room.id), SigReservation.LEVITY)
|
key = _cache_key(str(self.room.id), SigReservation.LEVITY)
|
||||||
cache.set(key, "sometoken", timeout=60)
|
cache.set(key, "sometoken", timeout=60)
|
||||||
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
|
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
|
||||||
self.assertIsNone(cache.get(key))
|
self.assertIsNone(cache.get(key))
|
||||||
|
|
||||||
@patch("apps.epic.tasks._timers")
|
|
||||||
def test_cancel_calls_timer_cancel_when_present(self, mock_timers):
|
|
||||||
mock_timer = MagicMock()
|
|
||||||
key = f"{self.room.id}_levity"
|
|
||||||
mock_timers.pop.return_value = mock_timer
|
|
||||||
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
|
|
||||||
mock_timer.cancel.assert_called_once()
|
|
||||||
|
|
||||||
|
|
||||||
class FireFunctionTest(TestCase):
|
class FireFunctionTest(TestCase):
|
||||||
"""Tests for the _fire() callback executed by threading.Timer."""
|
"""Tests for the confirm_polarity_room() Celery task body (called directly,
|
||||||
|
synchronously, here — the worker runs it via apply_async in production)."""
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.owner = User.objects.create(email="owner@fire.io")
|
self.owner = User.objects.create(email="owner@fire.io")
|
||||||
@@ -70,23 +65,23 @@ class FireFunctionTest(TestCase):
|
|||||||
|
|
||||||
@patch("apps.epic.tasks._group_send")
|
@patch("apps.epic.tasks._group_send")
|
||||||
def test_fire_does_nothing_if_token_mismatch(self, mock_send):
|
def test_fire_does_nothing_if_token_mismatch(self, mock_send):
|
||||||
from apps.epic.tasks import _fire
|
from apps.epic.tasks import confirm_polarity_room
|
||||||
self._set_token()
|
self._set_token()
|
||||||
_fire(str(self.room.id), SigReservation.LEVITY, "wrong-token")
|
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, "wrong-token")
|
||||||
mock_send.assert_not_called()
|
mock_send.assert_not_called()
|
||||||
|
|
||||||
@patch("apps.epic.tasks._group_send")
|
@patch("apps.epic.tasks._group_send")
|
||||||
def test_fire_does_nothing_if_room_not_sig_select(self, mock_send):
|
def test_fire_does_nothing_if_room_not_sig_select(self, mock_send):
|
||||||
from apps.epic.tasks import _fire
|
from apps.epic.tasks import confirm_polarity_room
|
||||||
token = self._set_token()
|
token = self._set_token()
|
||||||
self.room.table_status = Room.ROLE_SELECT
|
self.room.table_status = Room.ROLE_SELECT
|
||||||
self.room.save()
|
self.room.save()
|
||||||
_fire(str(self.room.id), SigReservation.LEVITY, token)
|
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
|
||||||
mock_send.assert_not_called()
|
mock_send.assert_not_called()
|
||||||
|
|
||||||
@patch("apps.epic.tasks._group_send")
|
@patch("apps.epic.tasks._group_send")
|
||||||
def test_fire_does_nothing_if_fewer_than_3_ready(self, mock_send):
|
def test_fire_does_nothing_if_fewer_than_3_ready(self, mock_send):
|
||||||
from apps.epic.tasks import _fire
|
from apps.epic.tasks import confirm_polarity_room
|
||||||
token = self._set_token()
|
token = self._set_token()
|
||||||
cards = list(TarotCard.objects.all()[:2])
|
cards = list(TarotCard.objects.all()[:2])
|
||||||
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC"]))
|
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC"]))
|
||||||
@@ -95,12 +90,12 @@ class FireFunctionTest(TestCase):
|
|||||||
room=self.room, gamer=seat.gamer, card=cards[i],
|
room=self.room, gamer=seat.gamer, card=cards[i],
|
||||||
polarity=SigReservation.LEVITY, seat=seat, ready=True,
|
polarity=SigReservation.LEVITY, seat=seat, ready=True,
|
||||||
)
|
)
|
||||||
_fire(str(self.room.id), SigReservation.LEVITY, token)
|
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
|
||||||
mock_send.assert_not_called()
|
mock_send.assert_not_called()
|
||||||
|
|
||||||
@patch("apps.epic.tasks._group_send")
|
@patch("apps.epic.tasks._group_send")
|
||||||
def test_fire_assigns_significators_and_broadcasts_when_all_ready(self, mock_send):
|
def test_fire_assigns_significators_and_broadcasts_when_all_ready(self, mock_send):
|
||||||
from apps.epic.tasks import _fire
|
from apps.epic.tasks import confirm_polarity_room
|
||||||
token = self._set_token()
|
token = self._set_token()
|
||||||
cards = list(TarotCard.objects.all()[:3])
|
cards = list(TarotCard.objects.all()[:3])
|
||||||
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"]))
|
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"]))
|
||||||
@@ -109,7 +104,7 @@ class FireFunctionTest(TestCase):
|
|||||||
room=self.room, gamer=seat.gamer, card=cards[i],
|
room=self.room, gamer=seat.gamer, card=cards[i],
|
||||||
polarity=SigReservation.LEVITY, seat=seat, ready=True,
|
polarity=SigReservation.LEVITY, seat=seat, ready=True,
|
||||||
)
|
)
|
||||||
_fire(str(self.room.id), SigReservation.LEVITY, token)
|
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
|
||||||
self.assertTrue(mock_send.called)
|
self.assertTrue(mock_send.called)
|
||||||
levity_seats = TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"])
|
levity_seats = TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"])
|
||||||
for i, seat in enumerate(levity_seats):
|
for i, seat in enumerate(levity_seats):
|
||||||
@@ -117,29 +112,29 @@ class FireFunctionTest(TestCase):
|
|||||||
self.assertEqual(seat.significator, cards[i])
|
self.assertEqual(seat.significator, cards[i])
|
||||||
|
|
||||||
def test_fire_does_nothing_for_nonexistent_room(self):
|
def test_fire_does_nothing_for_nonexistent_room(self):
|
||||||
from apps.epic.tasks import _fire
|
from apps.epic.tasks import confirm_polarity_room
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
fake_id = "00000000-0000-0000-0000-000000000000"
|
fake_id = "00000000-0000-0000-0000-000000000000"
|
||||||
token = "known-token"
|
token = "known-token"
|
||||||
cache.set(_cache_key(fake_id, SigReservation.LEVITY), token, 60)
|
cache.set(_cache_key(fake_id, SigReservation.LEVITY), token, 60)
|
||||||
_fire(fake_id, SigReservation.LEVITY, token)
|
confirm_polarity_room(fake_id, SigReservation.LEVITY, token)
|
||||||
|
|
||||||
@patch("apps.epic.tasks._group_send")
|
@patch("apps.epic.tasks._group_send")
|
||||||
def test_fire_does_nothing_if_all_sigs_already_assigned(self, mock_send):
|
def test_fire_does_nothing_if_all_sigs_already_assigned(self, mock_send):
|
||||||
from apps.epic.tasks import _fire
|
from apps.epic.tasks import confirm_polarity_room
|
||||||
token = self._set_token()
|
token = self._set_token()
|
||||||
cards = list(TarotCard.objects.all()[:3])
|
cards = list(TarotCard.objects.all()[:3])
|
||||||
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"]))
|
seats = list(TableSeat.objects.filter(room=self.room, role__in=["PC", "NC", "SC"]))
|
||||||
for i, seat in enumerate(seats):
|
for i, seat in enumerate(seats):
|
||||||
seat.significator = cards[i]
|
seat.significator = cards[i]
|
||||||
seat.save(update_fields=["significator"])
|
seat.save(update_fields=["significator"])
|
||||||
_fire(str(self.room.id), SigReservation.LEVITY, token)
|
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
|
||||||
mock_send.assert_not_called()
|
mock_send.assert_not_called()
|
||||||
|
|
||||||
@patch("apps.epic.tasks._group_send")
|
@patch("apps.epic.tasks._group_send")
|
||||||
def test_fire_broadcasts_pick_sky_when_all_polarity_sigs_assigned(self, mock_send):
|
def test_fire_broadcasts_pick_sky_when_all_polarity_sigs_assigned(self, mock_send):
|
||||||
"""When both levity AND gravity seats all have significators, fire() triggers SKY_SELECT."""
|
"""When both levity AND gravity seats all have significators, fire() triggers SKY_SELECT."""
|
||||||
from apps.epic.tasks import _fire
|
from apps.epic.tasks import confirm_polarity_room
|
||||||
token = self._set_token()
|
token = self._set_token()
|
||||||
cards = list(TarotCard.objects.all()[:6])
|
cards = list(TarotCard.objects.all()[:6])
|
||||||
# Give gravity seats significators so the all-assigned check passes
|
# Give gravity seats significators so the all-assigned check passes
|
||||||
@@ -155,7 +150,7 @@ class FireFunctionTest(TestCase):
|
|||||||
room=self.room, gamer=seat.gamer, card=levity_cards[i],
|
room=self.room, gamer=seat.gamer, card=levity_cards[i],
|
||||||
polarity=SigReservation.LEVITY, seat=seat, ready=True,
|
polarity=SigReservation.LEVITY, seat=seat, ready=True,
|
||||||
)
|
)
|
||||||
_fire(str(self.room.id), SigReservation.LEVITY, token)
|
confirm_polarity_room(str(self.room.id), SigReservation.LEVITY, token)
|
||||||
call_types = [c.args[1]["type"] for c in mock_send.call_args_list]
|
call_types = [c.args[1]["type"] for c in mock_send.call_args_list]
|
||||||
self.assertIn("polarity_room_done", call_types)
|
self.assertIn("polarity_room_done", call_types)
|
||||||
self.assertIn("pick_sky_available", call_types)
|
self.assertIn("pick_sky_available", call_types)
|
||||||
@@ -170,27 +165,30 @@ class SchedulePolarityConfirmTest(TestCase):
|
|||||||
|
|
||||||
def test_schedule_sets_cache_token(self):
|
def test_schedule_sets_cache_token(self):
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
|
with patch("apps.epic.tasks.confirm_polarity_room.apply_async"):
|
||||||
token = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY))
|
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
|
||||||
self.assertIsNotNone(token)
|
entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY))
|
||||||
|
self.assertIsNotNone(entry)
|
||||||
|
|
||||||
def test_schedule_registers_timer(self):
|
def test_schedule_enqueues_confirm_task_with_countdown(self):
|
||||||
from apps.epic.tasks import _timers
|
# The deferred confirm is a Celery task fired `seconds` from now (the
|
||||||
key = f"{self.room.id}_{SigReservation.LEVITY}"
|
# worker process broadcasts reliably, unlike the old timer thread).
|
||||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
|
with patch("apps.epic.tasks.confirm_polarity_room.apply_async") as mock_async:
|
||||||
self.assertIn(key, _timers)
|
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
|
||||||
_timers[key].cancel() # clean up
|
mock_async.assert_called_once()
|
||||||
|
self.assertEqual(mock_async.call_args.kwargs["countdown"], 12)
|
||||||
|
|
||||||
def test_schedule_cancels_prior_timer_before_scheduling(self):
|
def test_schedule_supersedes_prior_via_fresh_token(self):
|
||||||
from apps.epic.tasks import _timers
|
# No task revocation: a new schedule just writes a fresh token, so the
|
||||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
|
# previously-queued task no-ops on the token guard when it fires.
|
||||||
key = f"{self.room.id}_{SigReservation.LEVITY}"
|
from django.core.cache import cache
|
||||||
first_timer = _timers.get(key)
|
key = _cache_key(str(self.room.id), SigReservation.LEVITY)
|
||||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 60)
|
with patch("apps.epic.tasks.confirm_polarity_room.apply_async"):
|
||||||
second_timer = _timers.get(key)
|
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
|
||||||
self.assertIsNotNone(second_timer)
|
first = cache.get(key)["token"]
|
||||||
self.assertIsNot(first_timer, second_timer)
|
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
|
||||||
second_timer.cancel()
|
second = cache.get(key)["token"]
|
||||||
|
self.assertNotEqual(first, second)
|
||||||
|
|
||||||
|
|
||||||
class CountdownRemainingTest(TestCase):
|
class CountdownRemainingTest(TestCase):
|
||||||
@@ -206,7 +204,8 @@ class CountdownRemainingTest(TestCase):
|
|||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
|
cancel_polarity_confirm(str(self.room.id), SigReservation.LEVITY)
|
||||||
|
|
||||||
def test_schedule_stores_deadline_alongside_token(self):
|
@patch("apps.epic.tasks.confirm_polarity_room.apply_async")
|
||||||
|
def test_schedule_stores_deadline_alongside_token(self, _async):
|
||||||
from django.core.cache import cache
|
from django.core.cache import cache
|
||||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
|
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
|
||||||
entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY))
|
entry = cache.get(_cache_key(str(self.room.id), SigReservation.LEVITY))
|
||||||
@@ -214,7 +213,8 @@ class CountdownRemainingTest(TestCase):
|
|||||||
self.assertIn("token", entry)
|
self.assertIn("token", entry)
|
||||||
self.assertIn("deadline", entry)
|
self.assertIn("deadline", entry)
|
||||||
|
|
||||||
def test_countdown_remaining_returns_seconds_left(self):
|
@patch("apps.epic.tasks.confirm_polarity_room.apply_async")
|
||||||
|
def test_countdown_remaining_returns_seconds_left(self, _async):
|
||||||
from apps.epic.tasks import countdown_remaining
|
from apps.epic.tasks import countdown_remaining
|
||||||
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
|
schedule_polarity_confirm(str(self.room.id), SigReservation.LEVITY, 12)
|
||||||
remaining = countdown_remaining(str(self.room.id), SigReservation.LEVITY)
|
remaining = countdown_remaining(str(self.room.id), SigReservation.LEVITY)
|
||||||
|
|||||||
@@ -250,3 +250,9 @@ if 'test' in sys.argv:
|
|||||||
'BACKEND': 'channels.layers.InMemoryChannelLayer',
|
'BACKEND': 'channels.layers.InMemoryChannelLayer',
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
# In-process Kombu broker so Celery `apply_async` (the polarity-confirm
|
||||||
|
# countdown) queues without a live Redis + without running the task (no
|
||||||
|
# worker consumes it) — mirrors the old threading.Timer that was scheduled
|
||||||
|
# but never fired inside a sub-12s test. NOT eager: eager would ignore the
|
||||||
|
# countdown and assign significators synchronously during the ready POST.
|
||||||
|
CELERY_BROKER_URL = 'memory://'
|
||||||
|
|||||||
Reference in New Issue
Block a user