lavkach3
1import asyncio2from contextvars import ContextVar3from uuid import uuid44
5from taskiq import SimpleRetryMiddleware, TaskiqMiddleware6from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend, PubSubBroker7
8from core.helpers.broker.initializator import init9
10class TaskSession(TaskiqMiddleware):11"""Middleware to add retries."""12
13def __init__(self,) -> None:14...15
16def pre_execute(17self,18message: "TaskiqMessage",19) -> "Union[TaskiqMessage, Coroutine[Any, Any, TaskiqMessage]]":20return message21
22
23
24redis_async_result = RedisAsyncResultBackend(25redis_url="rediss://default:AVNS_w6X_JVOCj6vbTjwIowO@redis-do-user-15109425-0.c.db.ondigitalocean.com:25061",26result_ex_time=60, # Сколько хранить результаты в секундах27ssl_cert_reqs=None,28socket_timeout=36029)
30
31# Or you can use PubSubBroker if you need broadcasting
32list_brocker = ListQueueBroker(33url="rediss://default:AVNS_w6X_JVOCj6vbTjwIowO@redis-do-user-15109425-0.c.db.ondigitalocean.com:25061",34ssl_cert_reqs=None,35queue_name='model',36socket_timeout=36037).with_result_backend(redis_async_result).with_middlewares(SimpleRetryMiddleware(default_retry_count=3)).with_middlewares(TaskSession())38init(list_brocker, 'core.env:Env')39#
40# async def main():
41# async with broker:
42# await broker.send_task("core.tasks:task", args=(1, 2), kwargs={"a": 3, "b": 4})
43#
44# if __name__ == "__main__":
45# asyncio.run(main())
46
47
48