2
"""Example of aiohttp.web.Application.on_startup signal handler"""
10
# Application key under which start_background_tasks() stores the Redis
# listener task and from which cleanup_background_tasks() reads it back.
redis_listener = web.AppKey("redis_listener", asyncio.Task[None])
11
# Application key for the list of WebSocket responses: websocket_handler()
# appends/removes entries, listen_to_redis() and on_shutdown() iterate it.
websockets = web.AppKey("websockets", List[web.WebSocketResponse])
14
async def websocket_handler(request: web.Request) -> web.StreamResponse:
    """Serve one WebSocket client and keep it registered while connected.

    The prepared socket is appended to ``request.app[websockets]`` so other
    parts of the application can reach it, and is removed again when the
    handler exits for any reason.

    Args:
        request: The incoming HTTP request to upgrade to a WebSocket.

    Returns:
        The WebSocket response that handled the connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    request.app[websockets].append(ws)
    try:
        # Keep the connection open until either side closes it; incoming
        # messages are drained (throttled to one per second) and ignored.
        async for _msg in ws:
            await asyncio.sleep(1)
    finally:
        # Deregister even on cancellation or error so the shared list never
        # holds a socket whose handler has already finished.
        request.app[websockets].remove(ws)
    return ws
27
async def on_shutdown(app: web.Application) -> None:
    """Close every registered WebSocket when the application shuts down.

    Args:
        app: The application whose ``websockets`` entry lists open sockets.
    """
    # Iterate over a snapshot: awaiting close() yields to the event loop, and
    # websocket_handler() removes its socket from the live list when it
    # finishes, which would otherwise make this loop skip entries.
    for ws in list(app[websockets]):
        # Codes below 1000 are not valid WebSocket close codes (RFC 6455
        # section 7.4.2); 1001 is the standard "going away" status.
        await ws.close(code=1001, message=b"Server shutdown")
32
async def listen_to_redis(app: web.Application) -> None:
    """Forward messages from the Redis ``news`` channel to all WebSockets.

    Connects to Redis on ``localhost:6379``, subscribes to ``news`` and sends
    every received message to each socket in ``app[websockets]``.  The
    coroutine is meant to run as a background task and to be stopped by
    cancellation.

    Args:
        app: The application holding the registered WebSocket responses.
    """
    sub = await aioredis.Redis(host="localhost", port=6379)
    ch, *_ = await sub.subscribe("news")
    try:
        async for msg in ch.iter(encoding="utf-8"):
            # Snapshot the list: send_str() yields to the event loop, during
            # which handlers may add or remove sockets.
            for ws in list(app[websockets]):
                await ws.send_str(f"{ch.name}: {msg}")
            print(f"message in {ch.name}: {msg}")
    except asyncio.CancelledError:
        # Cancellation is the normal way to stop this task; swallow it so
        # that awaiting the task completes without raising.
        pass
    finally:
        # Release the subscription and the connection on every exit path,
        # not only on cancellation.
        print("Cancel Redis listener: close connection...")
        await sub.unsubscribe(ch.name)
        await sub.quit()
        print("Redis connection closed.")
50
async def start_background_tasks(app: web.Application) -> None:
    """Launch the Redis listener as a task and remember it on the app.

    Args:
        app: The application that owns the background task.
    """
    listener_task = asyncio.create_task(listen_to_redis(app))
    app[redis_listener] = listener_task
54
async def cleanup_background_tasks(app: web.Application) -> None:
    """Cancel the Redis listener task and wait until it has finished.

    Args:
        app: The application whose ``redis_listener`` entry holds the task.
    """
    print("cleanup background tasks...")
    listener = app[redis_listener]
    listener.cancel()
    try:
        await listener
    except asyncio.CancelledError:
        # Expected: the task re-raises the cancellation if it is interrupted
        # outside its own handler (e.g. while still connecting to Redis).
        # It must not abort the rest of the application cleanup.
        pass
60
def init() -> web.Application:
    """Build the example application.

    Creates the application, installs the empty WebSocket registry, adds the
    ``/news`` route and hooks up the startup, cleanup and shutdown signal
    handlers.

    Returns:
        The configured application, ready to be run.
    """
    app = web.Application()
    # The registry must exist before the first request: websocket_handler()
    # and on_shutdown() index app[websockets] unconditionally.
    sockets: List[web.WebSocketResponse] = []
    app[websockets] = sockets
    app.router.add_get("/news", websocket_handler)
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    app.on_shutdown.append(on_shutdown)
    return app