TelegramWalletPay
243 строки · 7.4 Кб
1import asyncio
2import ssl
3import warnings
4from contextlib import asynccontextmanager
5from decimal import Decimal
6from http import HTTPStatus
7from typing import (
8Any,
9AsyncIterator,
10Dict,
11Literal,
12Mapping,
13Optional,
14Type,
15TypeVar,
16Union,
17)
18
19import certifi
20from aiohttp import ClientResponse, ClientSession, TCPConnector
21from pydantic import BaseModel
22
23from telegram_wallet_pay.enums import Currency
24from telegram_wallet_pay.errors import (
25InvalidAPIKeyError,
26InvalidRequestError,
27NotFountError,
28RequestLimitReachedError,
29TelegramWalletPayError,
30UnexpectedError,
31)
32from telegram_wallet_pay.schemas import (
33CreateOrderRequest,
34CreateOrderResponse,
35GetOrderPreviewResponse,
36GetOrderReconciliationListResponse,
37MoneyAmount,
38OrderAmountResponse,
39)
40
41T = TypeVar("T", bound=BaseModel)
42
43AUTH_HEADER = "Wpay-Store-Api-Key"
44DEFAULT_API_HOST = "https://pay.wallet.tg"
45
46EXCEPTIONS_MAPPING: Dict[Union[HTTPStatus, int], Type[TelegramWalletPayError]] = {
47HTTPStatus.BAD_REQUEST: InvalidRequestError,
48HTTPStatus.UNAUTHORIZED: InvalidAPIKeyError,
49HTTPStatus.NOT_FOUND: NotFountError,
50HTTPStatus.TOO_MANY_REQUESTS: RequestLimitReachedError,
51HTTPStatus.INTERNAL_SERVER_ERROR: UnexpectedError,
52}
53
54
55class TelegramWalletPay:
56"""Telegram Wallet API client."""
57
58def __init__(self, token: str, api_host: str = DEFAULT_API_HOST) -> None:
59if not token or not isinstance(token, str):
60msg = f"String token should be provided. You passed: {token}"
61raise RuntimeError(msg)
62
63self._base_url = api_host
64self._session: Optional[ClientSession] = None
65self._headers = {AUTH_HEADER: token}
66
67async def create_order( # noqa: PLR0913
68self,
69*,
70amount: Union[str, Decimal, float],
71currency_code: Literal[
72Currency.TON,
73Currency.NOT,
74Currency.BTC,
75Currency.USDT,
76Currency.EUR,
77Currency.USD,
78Currency.RUB,
79],
80description: str,
81external_id: str,
82timeout_seconds: int,
83customer_telegram_user_id: int,
84auto_conversion_currency: Optional[
85Literal[
86Currency.TON,
87Currency.NOT,
88Currency.BTC,
89Currency.USDT,
90]
91] = None,
92return_url: Optional[str] = None,
93fail_return_url: Optional[str] = None,
94custom_data: Optional[str] = None,
95) -> CreateOrderResponse:
96"""Create an order.
97
98Docs:
99https://docs.wallet.tg/pay/#tag/Order/operation/create
100"""
101create_order_request = CreateOrderRequest(
102amount=MoneyAmount(
103amount=str(amount),
104currency_code=currency_code,
105),
106auto_conversion_currency=auto_conversion_currency,
107description=description,
108return_url=return_url,
109fail_return_url=fail_return_url,
110custom_data=custom_data,
111external_id=external_id,
112timeout_seconds=timeout_seconds,
113customer_telegram_user_id=customer_telegram_user_id,
114)
115
116async with self._make_request(
117method="POST",
118url="/wpay/store-api/v1/order",
119json=create_order_request.model_dump(by_alias=True),
120) as response:
121return await self._prepare_result(response, CreateOrderResponse)
122
123async def get_preview(self, order_id: str) -> GetOrderPreviewResponse:
124"""Retrieve the order information.
125
126Deprecated! Use method `.get_order_preview()` instead.
127"""
128warnings.warn(
129"Method `.get_preview()` is deprecated and will be removed in v1.0.0\n"
130"Use method `.get_order_preview()` instead.",
131category=DeprecationWarning,
132stacklevel=2,
133)
134return await self.get_order_preview(order_id)
135
136async def get_order_preview(self, order_id: str) -> GetOrderPreviewResponse:
137"""Retrieve the order information.
138
139Docs:
140https://docs.wallet.tg/pay/#tag/Order/operation/getPreview
141"""
142async with self._make_request(
143method="GET",
144url="/wpay/store-api/v1/order/preview",
145params={"id": order_id},
146) as response:
147return await self._prepare_result(response, GetOrderPreviewResponse)
148
149async def get_order_list(
150self,
151*,
152offset: int,
153count: int,
154) -> GetOrderReconciliationListResponse:
155"""Get list of store orders.
156
157Items sorted by creation time in ascending order.
158
159Docs:
160https://docs.wallet.tg/pay/#tag/Order-Reconciliation/operation/getOrderList
161"""
162query_params: Dict[str, Any] = {
163"offset": offset,
164"count": count,
165}
166
167async with self._make_request(
168method="GET",
169url="/wpay/store-api/v1/reconciliation/order-list",
170params=query_params,
171) as response:
172return await self._prepare_result(
173response,
174GetOrderReconciliationListResponse,
175)
176
177async def get_order_amount(self) -> OrderAmountResponse:
178"""Get total count of all created orders in the Store.
179
180Including all - paid and unpaid.
181
182Docs:
183https://docs.wallet.tg/pay/#tag/Order-Reconciliation/operation/getOrderAmount
184"""
185async with self._make_request(
186method="GET",
187url="/wpay/store-api/v1/reconciliation/order-amount",
188) as response:
189return await self._prepare_result(response, OrderAmountResponse)
190
191async def close(self) -> None:
192"""Graceful session close."""
193if not self._session:
194return
195
196await self._session.close()
197
198# Wait 250 ms for the underlying SSL connections to close
199# https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
200await asyncio.sleep(0.25)
201
202async def _get_session(self) -> ClientSession:
203"""Get aiohttp session with cache."""
204if self._session is None or self._session.closed:
205ssl_context = ssl.create_default_context(cafile=certifi.where())
206connector = TCPConnector(ssl=ssl_context)
207self._session = ClientSession(
208base_url=self._base_url,
209connector=connector,
210headers=self._headers,
211)
212
213return self._session
214
215@asynccontextmanager
216async def _make_request(
217self,
218method: str,
219url: str,
220params: Optional[Mapping[str, str]] = None,
221json: Optional[Mapping[str, str]] = None,
222) -> AsyncIterator[ClientResponse]:
223"""Make request with cached session."""
224session = await self._get_session()
225async with session.request(
226method=method,
227url=url,
228params=params,
229json=json,
230) as response:
231yield response
232
233@staticmethod
234async def _prepare_result(response: ClientResponse, schema: Type[T]) -> T:
235"""Prepare response result or raise an exception."""
236status = response.status
237body = await response.text()
238
239if status == HTTPStatus.OK:
240return schema.model_validate_json(body)
241
242exc_type = EXCEPTIONS_MAPPING.get(status, TelegramWalletPayError)
243raise exc_type(body)
244