aiohttp

Форк
0
/
background_tasks.py 
71 строка · 2.1 Кб
1
#!/usr/bin/env python3
2
"""Example of aiohttp.web.Application.on_startup signal handler"""
3
import asyncio
4
from typing import List
5

6
import aioredis
7

8
from aiohttp import web
9

10
redis_listener = web.AppKey("redis_listener", asyncio.Task[None])
11
websockets = web.AppKey("websockets", List[web.WebSocketResponse])
12

13

14
async def websocket_handler(request: web.Request) -> web.StreamResponse:
15
    ws = web.WebSocketResponse()
16
    await ws.prepare(request)
17
    request.app[websockets].append(ws)
18
    try:
19
        async for msg in ws:
20
            print(msg)
21
            await asyncio.sleep(1)
22
    finally:
23
        request.app[websockets].remove(ws)
24
    return ws
25

26

27
async def on_shutdown(app: web.Application) -> None:
28
    for ws in app[websockets]:
29
        await ws.close(code=999, message=b"Server shutdown")
30

31

32
async def listen_to_redis(app: web.Application) -> None:
33
    sub = await aioredis.Redis(host="localhost", port=6379)
34
    ch, *_ = await sub.subscribe("news")
35
    try:
36
        async for msg in ch.iter(encoding="utf-8"):
37
            # Forward message to all connected websockets:
38
            for ws in app[websockets]:
39
                await ws.send_str(f"{ch.name}: {msg}")
40
            print(f"message in {ch.name}: {msg}")
41
    except asyncio.CancelledError:
42
        pass
43
    finally:
44
        print("Cancel Redis listener: close connection...")
45
        await sub.unsubscribe(ch.name)
46
        await sub.quit()
47
        print("Redis connection closed.")
48

49

50
async def start_background_tasks(app: web.Application) -> None:
51
    app[redis_listener] = asyncio.create_task(listen_to_redis(app))
52

53

54
async def cleanup_background_tasks(app: web.Application) -> None:
55
    print("cleanup background tasks...")
56
    app[redis_listener].cancel()
57
    await app[redis_listener]
58

59

60
def init() -> web.Application:
61
    app = web.Application()
62
    l: List[web.WebSocketResponse] = []
63
    app[websockets] = l
64
    app.router.add_get("/news", websocket_handler)
65
    app.on_startup.append(start_background_tasks)
66
    app.on_cleanup.append(cleanup_background_tasks)
67
    app.on_shutdown.append(on_shutdown)
68
    return app
69

70

71
web.run_app(init())
72

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.