ray-llm
147 строк · 4.9 Кб
1import asyncio
2import logging
3import time
4
5from fastapi import FastAPI
6from ray.util import metrics
7from starlette.middleware.base import RequestResponseEndpoint
8from starlette.requests import Request
9from starlette.responses import Response
10
11from rayllm.backend.observability.event_loop_monitoring import (
12_LATENCY_HISTOGRAM_BOUNDARIES,
13setup_event_loop_monitoring,
14)
15from rayllm.backend.observability.tracing.fastapi import (
16FastAPIInstrumentor,
17_get_route_details,
18)
19
20logger = logging.getLogger(__name__)
21
22
def configure_telemetry(app: FastAPI, name: str):
    """Attach the shared telemetry hooks to a FastAPI application.

    Instruments ``app`` with ``FastAPIInstrumentor``, stores ``name`` on
    ``app.state.name`` (the value ``_get_app_name`` reads back when metrics
    are tagged), and then registers the event-loop monitoring hook and the
    HTTP metrics middleware.

    Args:
        app: The FastAPI application to instrument.
        name: Label stored on ``app.state.name``.
    """
    # Instrumentation is applied to this one app instead of patching FastAPI
    # globally: there is not enough control over the startup sequence of
    # processes for a global patch to work right now.
    instrumentor = FastAPIInstrumentor()
    instrumentor.instrument_app(app)

    app.state.name = name

    # Registration order is kept stable: event-loop telemetry first, then the
    # HTTP metrics middleware.
    for register in (
        register_event_loop_telemetry,
        register_http_metrics_middleware,
    ):
        register(app)
34
35
def register_event_loop_telemetry(app: FastAPI):
    """Register a startup hook that monitors the FastAPI event loop.

    When the application starts, three Ray metrics (a latency histogram, an
    iteration counter and an outstanding-task gauge) are created and stored
    on ``app.state``. They are then passed to ``setup_event_loop_monitoring``
    together with the running event loop and an ``api_server`` tag holding
    the application name.

    Args:
        app: The FastAPI application whose event loop is monitored.
    """
    metric_tag_keys = ("api_server",)

    @app.on_event("startup")
    async def add_fastapi_event_loop_monitoring():
        state = app.state

        latency_histogram = metrics.Histogram(
            "anyscale_fastapi_event_loop_schedule_latency",
            description="Latency of getting yielded control on the FastAPI event loop in seconds",
            boundaries=_LATENCY_HISTOGRAM_BOUNDARIES,
            tag_keys=metric_tag_keys,
        )
        state.fastapi_event_loop_schedule_latency_metrics = latency_histogram

        iteration_counter = metrics.Counter(
            "anyscale_fastapi_event_loop_monitoring_iterations",
            description="Number of times the FastAPI event loop has iterated to get anyscale_fastapi_event_loop_schedule_latency.",
            tag_keys=metric_tag_keys,
        )
        state.fastapi_event_loop_monitoring_iterations = iteration_counter

        task_gauge = metrics.Gauge(
            "anyscale_fastapi_event_loop_monitoring_tasks",
            description="Number of outsanding tasks on the FastAPI event loop.",
            tag_keys=metric_tag_keys,
        )
        state.fastapi_event_loop_monitoring_tasks = task_gauge

        tags = {"api_server": _get_app_name(app)}

        # Keep whatever setup_event_loop_monitoring returns (a task handle,
        # per the original comment) on app.state so that it is not garbage
        # collected while the application is running.
        monitoring_handle = setup_event_loop_monitoring(
            asyncio.get_running_loop(),
            latency_histogram,
            iteration_counter,
            task_gauge,
            tags,
        )
        state.fastapi_event_loop_schedule_latency_metrics_task = monitoring_handle
68
69
def register_http_metrics_middleware(app: FastAPI):
    """Register HTTP request count and latency metrics for ``app``.

    Two hooks are installed on the application:

    * a startup handler that creates the ``anyscale_http_requests`` counter
      and the ``anyscale_http_request_duration_seconds`` histogram and stores
      them on ``app.state``;
    * an HTTP middleware that times every request and reports it to those
      metrics, tagged with handler, method and API server name (the counter
      additionally carries the response status code).

    If the metrics have not been created yet, the middleware logs a debug
    message and skips the update instead of failing the request.

    Requests whose downstream handling raises are recorded with status code
    ``"500"`` before the exception is re-raised, so failing requests are not
    silently missing from the metrics.

    Args:
        app: The FastAPI application to instrument.
    """

    @app.on_event("startup")
    def setup_http_metrics() -> None:
        app.state.http_requests_metrics = metrics.Counter(
            "anyscale_http_requests",
            description=(
                "Total number of HTTP requests by status code, handler and method."
            ),
            tag_keys=("code", "handler", "method", "api_server"),
        )
        app.state.http_requests_latency_metrics = metrics.Histogram(
            "anyscale_http_request_duration_seconds",
            description=("Duration in seconds of HTTP requests."),
            boundaries=[
                0.01,
                0.05,
                0.1,
                0.25,
                0.5,
                0.75,
                1,
                1.5,
                2,
                5,
                10,
                30,
                60,
                120,
                300,
            ],
            tag_keys=("handler", "method", "api_server"),
        )

    def _record_http_metrics(
        request: Request, status_code: int, duration: float
    ) -> None:
        """Report one finished request to whichever HTTP metrics exist."""
        # The route is resolved only after the request has been processed,
        # matching the point at which the lookup has always happened here.
        handler = _get_route_details(request.scope) or "unknown"
        method = request.get("method", "UNKNOWN")
        shared_tags = {
            "handler": handler,
            "api_server": _get_app_name(app),
            "method": method,
        }

        requests_counter = getattr(app.state, "http_requests_metrics", None)
        if requests_counter is not None:
            # Increment the request count by 1.
            requests_counter.inc(1, {"code": str(status_code), **shared_tags})
        else:
            logger.debug("HTTP requests telemetry not initialized, skipping")

        latency_histogram = getattr(
            app.state, "http_requests_latency_metrics", None
        )
        if latency_histogram is not None:
            latency_histogram.observe(duration, shared_tags)
        else:
            logger.debug("HTTP requests telemetry not initialized, skipping")

    @app.middleware("http")
    async def add_http_requests_metrics(
        request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        started_at = time.monotonic()
        try:
            resp = await call_next(request)
        except Exception:
            # An unhandled error surfaces here as an exception rather than a
            # response. Count it as a 500 (the status the outer error
            # handling is expected to send) and let it propagate unchanged.
            _record_http_metrics(request, 500, time.monotonic() - started_at)
            raise

        _record_http_metrics(
            request, resp.status_code, time.monotonic() - started_at
        )
        return resp
141
142
def _get_app_name(app: FastAPI) -> str:
    """Return the name stored on ``app.state.name``.

    Falls back to ``"unknown"`` when no ``name`` attribute has been set on
    the application state.
    """
    return getattr(app.state, "name", "unknown")
148