lavkach3

Форк
0
/
initializator.py 
108 строк · 2.8 Кб
1
import uuid
2
from typing import Awaitable, Callable
3

4
from fastapi import FastAPI, Request
5
from starlette.requests import HTTPConnection
6
from taskiq import AsyncBroker, TaskiqEvents, TaskiqState
7
from taskiq.cli.utils import import_object
8

9
#from core.fastapi.schemas import CurrentUser
10
from core.service_config import config
11

12
def startup_event_generator(
13
    broker: AsyncBroker,
14
    app_path: str,
15
) -> Callable[[TaskiqState], Awaitable[None]]:
16
    """
17
    Generate shutdown event.
18

19
    This function takes FastAPI application path
20
    and runs startup event on broker's startup.
21

22
    :param broker: current broker.
23
    :param app_path: fastapi application path.
24
    :returns: startup handler.
25
    """
26

27
    async def startup(state: TaskiqState) -> None:
28
        if not broker.is_worker_process:
29
            return
30
        env = import_object(app_path)
31
        state.env = env
32
        populate_dependency_context(broker, env)
33

34
    return startup
35

36

37
def shutdown_event_generator(
38
    broker: AsyncBroker,
39
) -> Callable[[TaskiqState], Awaitable[None]]:
40
    """
41
    Generate shutdown event.
42

43
    This function takes FastAPI application
44
    and runs shutdown event on broker's shutdown.
45

46
    :param broker: current broker.
47
    :return: shutdown event handler.
48
    """
49

50
    async def shutdown(state: TaskiqState) -> None:
51
        if not broker.is_worker_process:
52
            return
53
        await state.fastapi_app.router.shutdown()
54
        await state.lf_ctx.__aexit__(None, None, None)
55

56
    return shutdown
57

58

59
def init(broker: AsyncBroker, app_path: str) -> None:
60
    """
61
    Add taskiq startup events.
62

63
    This is the main function to integrate FastAPI
64
    with taskiq.
65

66
    It creates startup events for broker. So
67
    in worker processes all fastapi
68
    startup events will run.
69

70
    :param broker: current broker to use.
71
    :param app_path: path to fastapi application.
72
    """
73
    broker.add_event_handler(
74
        TaskiqEvents.WORKER_STARTUP,
75
        startup_event_generator(broker, app_path),
76
    )
77

78
    broker.add_event_handler(
79
        TaskiqEvents.WORKER_SHUTDOWN,
80
        shutdown_event_generator(broker),
81
    )
82

83

84
def populate_dependency_context(broker: AsyncBroker, app: FastAPI) -> None:
85
    """
86
    Создает пустышку Request для Taskiq.
87
    """
88
    scope = {
89
    'type' : 'http',
90
    'http_version' : '1.1',
91
    'scheme' : 'http',
92
    'root_path' : '',
93
    'headers' : [(b'Authorization', f'{config.INTERCO_TOKEN}'),],
94
    'state' : {},
95
    'method' : 'GET',
96
    'path' : '/',
97
    'raw_path' : b'/',
98
    'query_string' : b'',
99
    'app' : app,
100
    #'user': CurrentUser(id=uuid.uuid4(), is_admin=True)
101
    }
102
    request = Request(scope=scope)
103
    #scope['env'] = app.extra['env']['cls'](app.extra['env']['domains'], request, broker)
104
    broker.add_dependency_context(
105
        {
106
            'request': request,
107
        },
108
    )
109

110

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.