ray-llm

Форк
0
147 строк · 4.9 Кб
1
import asyncio
2
import logging
3
import time
4

5
from fastapi import FastAPI
6
from ray.util import metrics
7
from starlette.middleware.base import RequestResponseEndpoint
8
from starlette.requests import Request
9
from starlette.responses import Response
10

11
from rayllm.backend.observability.event_loop_monitoring import (
12
    _LATENCY_HISTOGRAM_BOUNDARIES,
13
    setup_event_loop_monitoring,
14
)
15
from rayllm.backend.observability.tracing.fastapi import (
16
    FastAPIInstrumentor,
17
    _get_route_details,
18
)
19

20
logger = logging.getLogger(__name__)
21

22

23
def configure_telemetry(app: FastAPI, name: str):
24
    """Configures common telemetry hooks for FastAPI applications"""
25

26
    # Would prefer to patch all of FastAPI, but i don't have enough control over the
27
    # startup sequence of processes to make this work right now.
28
    FastAPIInstrumentor().instrument_app(app)
29

30
    app.state.name = name
31

32
    register_event_loop_telemetry(app)
33
    register_http_metrics_middleware(app)
34

35

36
def register_event_loop_telemetry(app: FastAPI):
37
    @app.on_event("startup")
38
    async def add_fastapi_event_loop_monitoring():
39
        # Store the task handle to prevent it from being garbage collected
40
        app.state.fastapi_event_loop_schedule_latency_metrics = metrics.Histogram(
41
            "anyscale_fastapi_event_loop_schedule_latency",
42
            description="Latency of getting yielded control on the FastAPI event loop in seconds",
43
            boundaries=_LATENCY_HISTOGRAM_BOUNDARIES,
44
            tag_keys=("api_server",),
45
        )
46
        app.state.fastapi_event_loop_monitoring_iterations = metrics.Counter(
47
            "anyscale_fastapi_event_loop_monitoring_iterations",
48
            description="Number of times the FastAPI event loop has iterated to get anyscale_fastapi_event_loop_schedule_latency.",
49
            tag_keys=("api_server",),
50
        )
51
        app.state.fastapi_event_loop_monitoring_tasks = metrics.Gauge(
52
            "anyscale_fastapi_event_loop_monitoring_tasks",
53
            description="Number of outsanding tasks on the FastAPI event loop.",
54
            tag_keys=("api_server",),
55
        )
56

57
        tags = {"api_server": _get_app_name(app)}
58

59
        app.state.fastapi_event_loop_schedule_latency_metrics_task = (
60
            setup_event_loop_monitoring(
61
                asyncio.get_running_loop(),
62
                app.state.fastapi_event_loop_schedule_latency_metrics,
63
                app.state.fastapi_event_loop_monitoring_iterations,
64
                app.state.fastapi_event_loop_monitoring_tasks,
65
                tags,
66
            )
67
        )
68

69

70
def register_http_metrics_middleware(app: FastAPI):
71
    @app.on_event("startup")
72
    def setup_http_metrics() -> None:
73
        app.state.http_requests_metrics = metrics.Counter(
74
            "anyscale_http_requests",
75
            description=(
76
                "Total number of HTTP requests by status code, handler and method."
77
            ),
78
            tag_keys=("code", "handler", "method", "api_server"),
79
        )
80
        app.state.http_requests_latency_metrics = metrics.Histogram(
81
            "anyscale_http_request_duration_seconds",
82
            description=("Duration in seconds of HTTP requests."),
83
            boundaries=[
84
                0.01,
85
                0.05,
86
                0.1,
87
                0.25,
88
                0.5,
89
                0.75,
90
                1,
91
                1.5,
92
                2,
93
                5,
94
                10,
95
                30,
96
                60,
97
                120,
98
                300,
99
            ],
100
            tag_keys=("handler", "method", "api_server"),
101
        )
102

103
    @app.middleware("http")
104
    async def add_http_requests_metrics(
105
        request: Request, call_next: RequestResponseEndpoint
106
    ) -> Response:
107
        now = time.monotonic()
108
        resp = await call_next(request)
109
        duration = time.monotonic() - now
110

111
        handler = _get_route_details(request.scope) or "unknown"
112
        method = request.get("method", "UNKNOWN")
113

114
        if hasattr(app.state, "http_requests_metrics"):
115
            app.state.http_requests_metrics.inc(
116
                # increment count by 1
117
                1,
118
                {
119
                    "code": str(resp.status_code),
120
                    "handler": handler,
121
                    "api_server": _get_app_name(app),
122
                    "method": method,
123
                },
124
            )
125
        else:
126
            logger.debug("HTTP requests telemetry not initialized, skipping")
127

128
        if hasattr(app.state, "http_requests_latency_metrics"):
129
            app.state.http_requests_latency_metrics.observe(
130
                duration,
131
                {
132
                    "handler": handler,
133
                    "api_server": _get_app_name(app),
134
                    "method": method,
135
                },
136
            )
137
        else:
138
            logger.debug("HTTP requests telemetry not initialized, skipping")
139

140
        return resp
141

142

143
def _get_app_name(app: FastAPI) -> str:
144
    if hasattr(app.state, "name"):
145
        return app.state.name
146

147
    return "unknown"
148

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.