lavkach3

Форк
0
227 строк · 8.0 Кб
1
from __future__ import annotations
2

3
import asyncio
4
import logging
5
import os
6
import time
7
from collections.abc import Callable
8
from typing import Any
9

10
import psutil
11
from fastapi import FastAPI
12
from starlette.middleware.base import RequestResponseEndpoint
13
from starlette.requests import Request
14
from starlette.responses import Response
15
from starlette.routing import Match, Mount
16
from starlette.types import Scope
17

18
from core.db_config import config
19

20
# Attribute name on `request.state` under which the timing middleware stores the
# active timer, and from which `record_timing` reads it back.
TIMER_ATTRIBUTE = "__fastapi_restful_timer__"
21

22

23
def add_timing_middleware(
    app: FastAPI, record: Callable[[str], None] | None = None, prefix: str = "", exclude: str | None = None
) -> None:
    """
    Register an "http" middleware on `app` that reports timing for every request.

    Each request is wrapped in a `_TimingStats` context manager, so one timing message is
    passed to `record` when the request has been handled. The timer is also stored on
    `request.state` so that `record_timing` can emit intermediate measurements.

    app:
        The FastAPI application to instrument
    record:
        Callable receiving each timing message, e.g. `logger.info` of a `logging.Logger`
    prefix:
        Passed to `_MetricNamer` and used when generating the metric name for a route
    exclude:
        Optional substring; requests whose generated metric name contains it are not reported
    """
    name_for_scope = _MetricNamer(prefix=prefix, app=app)

    async def timing_middleware(request: Request, call_next: RequestResponseEndpoint) -> Response:
        stats = _TimingStats(name_for_scope(request.scope), record=record, exclude=exclude)
        with stats as timer:
            # Expose the running timer to handlers for the duration of the request.
            setattr(request.state, TIMER_ATTRIBUTE, timer)
            # Leaving the `with` block on return triggers the final emit.
            return await call_next(request)

    app.middleware("http")(timing_middleware)
48

49

50
def record_timing(request: Request, note: str | None = None) -> None:
    """
    Emit the time elapsed so far for the request that is currently being handled.

    Useful for narrowing down which part of a request handler is slow: call it at the
    points of interest, optionally with a `note` that is appended to the message.

    The timer is looked up on `request.state`, so the request must come from an app that
    was instrumented with `add_timing_middleware`.

    Raises:
        ValueError: if no timer is stored on the request, or the stored object is not
            a `_TimingStats` instance.
    """
    timer = getattr(request.state, TIMER_ATTRIBUTE, None)
    if timer is None:
        raise ValueError("No timer present on request")
    if not isinstance(timer, _TimingStats):
        raise ValueError("Timer should be of an instance of TimingStats")
    timer.emit(note)
66

67

68
class _TimingStats:
69
    """
70
    This class tracks and records endpoint timing data.
71

72
    Should be used as a context manager; on exit, timing stats will be emitted.
73

74
    name:
75
        The name to include with the recorded timing data
76
    record:
77
        The callable to call on generated messages. Defaults to `print`, but typically
78
        something like `logger.info` for a `logging.Logger` instance would be preferable.
79
    exclude:
80
        An optional string; if it is not None and occurs inside `name`, no stats will be emitted
81
    """
82

83
    def __init__(
84
        self, name: str | None = None, record: Callable[[str], None] | None = None, exclude: str | None = None
85
    ) -> None:
86
        self.name = name
87
        self.record = record or print
88

89
        self.process: psutil.Process = psutil.Process(os.getpid())
90
        self.start_time: float = 0
91
        self.start_cpu_time: float = 0
92
        self.end_cpu_time: float = 0
93
        self.end_time: float = 0
94
        self.silent: bool = False
95

96
        if self.name is not None and exclude is not None and (exclude in self.name):
97
            self.silent = True
98

99
    def start(self) -> None:
100
        self.start_time = time.time()
101
        self.start_cpu_time = self._get_cpu_time()
102

103
    def take_split(self) -> None:
104
        self.end_time = time.time()
105
        self.end_cpu_time = self._get_cpu_time()
106

107
    @property
108
    def time(self) -> float:
109
        return self.end_time - self.start_time
110

111
    @property
112
    def cpu_time(self) -> float:
113
        return self.end_cpu_time - self.start_cpu_time
114

115
    def __enter__(self) -> _TimingStats:
116
        self.start()
117
        return self
118

119
    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
120
        self.emit()
121

122
    def emit(self, note: str | None = None) -> None:
123
        """
124
        Emit timing information, optionally including a specified note
125
        """
126
        if not self.silent:
127
            self.take_split()
128
            cpu_ms = 1000 * self.cpu_time
129
            wall_ms = 1000 * self.time
130
            message = f"TIMING: Wall: {wall_ms:6.1f}ms | CPU: {cpu_ms:6.1f}ms | {self.name}"
131
            if note is not None:
132
                message += f" ({note})"
133
            self.record(message)
134

135
    def _get_cpu_time(self) -> float:
136
        """
137
        Generates the cpu time to report. Adds the user and system time, following the implementation from timing-asgi
138
        """
139
        resources = self.process.cpu_times()
140
        # add up user time and system time
141
        return resources[0] + resources[1]
142

143

144
class _MetricNamer:
145
    """
146
    This class generates the route "name" used when logging timing records.
147

148
    If the route has `endpoint` and `name` attributes, the endpoint's module and route's name will be used
149
    (along with an optional prefix that can be used, e.g., to distinguish between multiple mounted ASGI apps).
150

151
    By default, in FastAPI the route name is the `__name__` of the route's function (or type if it is a callable class
152
    instance).
153

154
    For example, with prefix == "custom", a function defined in the module `app.crud` with name `read_item`
155
    would get name `custom.app.crud.read_item`. If the empty string were used as the prefix, the result would be
156
    just "app.crud.read_item".
157

158
    For starlette.routing.Mount instances, the name of the type of `route.app` is used in a slightly different format.
159

160
    For other routes missing either an endpoint or name, the raw route path is included in the generated name.
161
    """
162

163
    def __init__(self, prefix: str, app: FastAPI):
164
        if prefix:
165
            prefix += "."
166
        self.prefix = prefix
167
        self.app = app
168

169
    def __call__(self, scope: Scope) -> str:
170
        """
171
        Generates the actual name to use when logging timing metrics for a specified ASGI Scope
172
        """
173
        route = None
174
        for r in self.app.router.routes:
175
            if r.matches(scope)[0] == Match.FULL:
176
                route = r
177
                break
178
        if hasattr(route, "endpoint") and hasattr(route, "name"):
179
            name = f"{self.prefix}{route.endpoint.__module__}.{route.name}"  # type: ignore
180
        elif isinstance(route, Mount):
181
            name = f"{type(route.app).__name__}<{route.name!r}>"
182
        else:
183
            name = str(f"<Path: {scope['path']}>")
184
        return name
185

186
def timed(fn: Callable[..., Any]) -> Callable[..., Any]:
    """
    Decorator that logs when a function starts and how long it took to finish.

    Timing is only enabled when `config.ENV` is 'local' or 'dev'; the check happens once,
    at decoration time. In any other environment the function is returned unchanged.
    (Previously the decorator returned None in that case, which replaced every decorated
    function with None.)

    Both regular and coroutine functions are supported. The "Finished" message is logged
    only when the call returns normally; exceptions propagate without it.

    :param fn: Function (sync or async) to decorate
    :return: The timing wrapper in 'local'/'dev', otherwise `fn` itself

    Example:
        @timed
        async def load_items() -> list[int]:
            ...
    """
    # Imported locally so this block does not depend on the module's import section.
    from functools import wraps

    if config.ENV not in ('local', 'dev'):
        # Timing disabled: hand back the original callable untouched.
        return fn

    if asyncio.iscoroutinefunction(fn):

        @wraps(fn)
        async def wrapped_fn_async(*args: Any, **kwargs: Any) -> Any:
            start = time.time()
            logging.info("Running: %s", fn.__name__)
            ret = await fn(*args, **kwargs)
            logging.info("Finished: %s in %s", fn.__name__, get_duration_str(start))
            return ret

        return wrapped_fn_async

    @wraps(fn)
    def wrapped_fn(*args: Any, **kwargs: Any) -> Any:
        start = time.time()
        logging.info("Running: %s", fn.__name__)
        ret = fn(*args, **kwargs)
        logging.info("Finished: %s in %s", fn.__name__, get_duration_str(start))
        return ret

    return wrapped_fn
214

215

216
def get_duration_str(start: float) -> str:
    """
    Return a human readable string for the time elapsed since `start`.

    `start` is a timestamp as returned by `time.time()`. The unit is chosen by magnitude:
    seconds with three decimals (and thousands separators) from one second upwards, then
    whole milliseconds, whole microseconds and finally whole nanoseconds. Durations below
    one microsecond, including zero or negative ones, are reported in nanoseconds.

    :param start: Start timestamp in seconds, on the `time.time()` clock
    :return: Formatted duration such as '1.250s', '50ms', '12us' or '400ns'
    """
    duration = time.time() - start
    # Bounds are inclusive so that exactly 1 s reads "1.000s" rather than "1000ms".
    if duration >= 1:
        return f'{duration:,.3f}s'
    if duration >= 1e-3:
        return f'{round(duration * 1e3)}ms'
    if duration >= 1e-6:
        return f'{round(duration * 1e6)}us'
    # Rounded like the other sub-second units; avoids output such as "953.67431640625ns".
    return f'{round(duration * 1e9)}ns'
228

229

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.