2
from typing import Awaitable, Callable
4
from fastapi import FastAPI, Request
5
from starlette.requests import HTTPConnection
6
from taskiq import AsyncBroker, TaskiqEvents, TaskiqState
7
from taskiq.cli.utils import import_object
10
from core.service_config import config
12
def startup_event_generator(
    broker: AsyncBroker,
    app_path: str,
) -> Callable[[TaskiqState], Awaitable[None]]:
    """
    Generate startup event.

    This function takes FastAPI application path and builds
    a handler that, on broker's startup, imports the application
    and registers it in the broker's dependency context.

    :param broker: current broker.
    :param app_path: fastapi application path.
    :returns: startup handler.
    """

    async def startup(state: TaskiqState) -> None:
        # Only worker processes need the application; any other
        # process leaves the handler immediately.
        if not broker.is_worker_process:
            return

        # Resolve the object behind the import path and hand it to
        # populate_dependency_context together with the broker.
        app = import_object(app_path)
        populate_dependency_context(broker, app)

        # NOTE(review): `state` is not written here, while the shutdown
        # handler in this module reads `state.fastapi_app` and
        # `state.lf_ctx` - confirm where those attributes get assigned.

    return startup
37
def shutdown_event_generator(
    broker: AsyncBroker,
) -> Callable[[TaskiqState], Awaitable[None]]:
    """
    Generate shutdown event.

    This function builds a handler that, on broker's shutdown,
    runs the shutdown of the FastAPI application stored in the
    state and exits the context kept in ``state.lf_ctx``.

    :param broker: current broker.
    :return: shutdown event handler.
    """

    async def shutdown(state: TaskiqState) -> None:
        # Only worker processes have anything to tear down.
        if not broker.is_worker_process:
            return

        # Router shutdown goes first, then the stored context is exited
        # with "no exception" arguments.
        # NOTE(review): both attributes must already be present on
        # `state`; the startup handler visible in this module does not
        # assign them - confirm they are set before shutdown runs.
        await state.fastapi_app.router.shutdown()
        await state.lf_ctx.__aexit__(None, None, None)

    return shutdown
59
def init(broker: AsyncBroker, app_path: str) -> None:
    """
    Add taskiq startup events.

    This is the main function to integrate FastAPI
    with taskiq.

    It registers two handlers on the broker: the one built by
    ``startup_event_generator`` for the worker startup event and
    the one built by ``shutdown_event_generator`` for the worker
    shutdown event.

    :param broker: current broker to use.
    :param app_path: path to fastapi application.
    """
    broker.add_event_handler(
        TaskiqEvents.WORKER_STARTUP,
        startup_event_generator(broker, app_path),
    )

    broker.add_event_handler(
        TaskiqEvents.WORKER_SHUTDOWN,
        shutdown_event_generator(broker),
    )
84
def populate_dependency_context(broker: AsyncBroker, app: FastAPI) -> None:
86
Create a dummy Request for Taskiq.
90
'http_version' : '1.1',
93
'headers' : [(b'Authorization', f'{config.INTERCO_TOKEN}'),],
102
request = Request(scope=scope)
104
broker.add_dependency_context(