ray-llm
133 строки · 3.7 Кб
1import asyncio
2import logging
3import time
4from typing import Dict
5
6from ray.util import metrics
7
logger = logging.getLogger(__name__)

# Seconds the monitoring coroutine sleeps between consecutive samples.
_METRICS_LOOP_INTERVAL = 5

# Bucket upper bounds, in seconds, for the scheduling-latency histogram.
# Grouped by magnitude for readability; the list must stay strictly increasing.
_LATENCY_HISTOGRAM_BOUNDARIES = [
    # Sub-second.
    0.05, 0.1, 0.15, 0.20, 0.25, 0.5, 0.75,
    # Seconds.
    1.0, 1.5, 2.0, 3.0, 5.0,
    # Tens of seconds.
    10.0, 15.0, 20.0, 30.0, 45.0, 60.0, 90.0,
    # Minutes.
    120.0, 150.0, 180.0, 300.0, 600.0,
]
37
38
def setup_event_loop_monitoring(
    loop: asyncio.AbstractEventLoop,
    scheduling_latency: metrics.Histogram,
    iterations: metrics.Counter,
    tasks: metrics.Gauge,
    tags: Dict[str, str],
) -> asyncio.Task:
    """Start a background task that periodically reports event-loop health metrics.

    Args:
        loop: Event loop to monitor. The monitoring task is scheduled on this
            loop and keeps iterating for as long as ``loop.is_running()``.
        scheduling_latency: Histogram fed with how much later than requested
            the monitoring coroutine regained control after each sleep.
        iterations: Counter incremented once per monitoring iteration.
        tasks: Gauge set to the number of tasks observed each iteration.
        tags: Metric tags attached to every recorded value.

    Returns:
        The task running the monitoring coroutine. Callers must keep a
        reference to it: the event loop only holds weak references to tasks.
    """
    # Schedule on the loop we were handed rather than on whatever loop happens
    # to be running (which is what ``asyncio.create_task`` does), so the
    # ``loop`` argument is honored and the task runs on the loop it monitors.
    return loop.create_task(
        _run_monitoring_loop(
            loop,
            schedule_latency=scheduling_latency,
            iterations=iterations,
            task_gauge=tasks,
            tags=tags,
        )
    )
55
56
async def _run_monitoring_loop(
    loop: asyncio.AbstractEventLoop,
    schedule_latency: metrics.Histogram,
    iterations: metrics.Counter,
    task_gauge: metrics.Gauge,
    tags: Dict[str, str],
) -> None:
    """Sample event-loop health once every ``_METRICS_LOOP_INTERVAL`` seconds.

    Each pass increments ``iterations``, sets ``task_gauge`` to the number of
    tasks returned by ``asyncio.all_tasks()``, and sleeps for the interval.
    It then records on ``schedule_latency`` how far past the requested
    wake-up time control was actually handed back. Returns once ``loop`` is
    no longer running.
    """
    while loop.is_running():
        iterations.inc(1, tags)
        task_gauge.set(len(asyncio.all_tasks()), tags)

        slept_at = time.monotonic()
        await asyncio.sleep(_METRICS_LOOP_INTERVAL)
        overshoot = (time.monotonic() - slept_at) - _METRICS_LOOP_INTERVAL

        # Ray histograms have historically been finicky with non-positive
        # values (https://github.com/ray-project/ray/issues/26698). The
        # overshoot should never be negative; clamp at zero to be safe.
        schedule_latency.observe(max(0, overshoot), tags)
78
79
def _event_loop_available() -> bool:
    """Return whether the caller is executing inside a running asyncio event loop.

    ``asyncio.get_running_loop`` raises ``RuntimeError`` when no loop is
    running in the current thread; that case is reported as ``False``.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # Most likely the actor is being run outside of Ray, e.g. in a unit test.
        return False
    else:
        return True
88
89
class InstrumentAsyncioEventLoop:
    """Mixin that starts an asyncio task to monitor the health of the event loop.

    This is meant to be added to any Actor class that has an asyncio
    reconciler. If an event loop is running when ``__init__`` executes, the
    mixin does two things:

    * It creates an iteration counter, an outstanding-task gauge and a
      scheduling-latency histogram, each tagged with ``actor`` set to the
      concrete class name.
    * It starts a background task that reports them.

    Attributes:
        _event_loop_metrics_task: Handle of the monitoring task, or ``None``
            when no event loop was running at construction time.
    """

    def __init__(self, *args, **kwargs):
        # This calls other parent class __init__ methods in multiple
        # inheritance situations.
        super().__init__(*args, **kwargs)

        # Always define the attribute so callers can inspect or cancel the
        # task without first checking whether monitoring was started.
        self._event_loop_metrics_task = None

        if not _event_loop_available():
            # Likely the actor is being run outside of Ray, e.g. in a unit test.
            logger.info("No event loop running. Skipping event loop metrics.")
            return

        tag_keys = ("actor",)

        iterations = metrics.Counter(
            "anyscale_event_loop_monitoring_iterations",
            description="Number of times the monitoring loop has iterated for this actor.",
            tag_keys=tag_keys,
        )
        tasks = metrics.Gauge(
            "anyscale_event_loop_tasks",
            description="Number of outstanding tasks on the event loop.",
            tag_keys=tag_keys,
        )
        scheduling_latency = metrics.Histogram(
            "anyscale_event_loop_schedule_latency",
            description="Latency of getting yielded control on the event loop in seconds",
            boundaries=_LATENCY_HISTOGRAM_BOUNDARIES,
            tag_keys=tag_keys,
        )

        tags = {"actor": self.__class__.__name__}

        # Store the task handle: the event loop keeps only weak references to
        # tasks, so an unreferenced task could be garbage collected.
        self._event_loop_metrics_task = setup_event_loop_monitoring(
            asyncio.get_running_loop(),
            scheduling_latency=scheduling_latency,
            iterations=iterations,
            tasks=tasks,
            tags=tags,
        )
134