ray-llm

Форк
0
/
event_loop_monitoring.py 
133 строки · 3.7 Кб
1
import asyncio
2
import logging
3
import time
4
from typing import Dict
5

6
from ray.util import metrics
7

8
logger = logging.getLogger(__name__)

# Seconds the monitoring coroutine sleeps between consecutive iterations.
_METRICS_LOOP_INTERVAL = 5

# Bucket upper bounds, in seconds, for the scheduling-latency histogram:
# fine-grained below one second, progressively coarser up to 600 s (10 min).
_LATENCY_HISTOGRAM_BOUNDARIES = [
    0.05, 0.1, 0.15, 0.20, 0.25, 0.5, 0.75,
    1.0, 1.5, 2.0, 3.0, 5.0,
    10.0, 15.0, 20.0, 30.0, 45.0,
    60.0, 90.0, 120.0, 150.0, 180.0,
    300.0, 600.0,
]
37

38

39
def setup_event_loop_monitoring(
    loop: asyncio.AbstractEventLoop,
    scheduling_latency: metrics.Histogram,
    iterations: metrics.Counter,
    tasks: metrics.Gauge,
    tags: Dict[str, str],
) -> asyncio.Task:
    """Start a background task that periodically records event-loop metrics.

    Args:
        loop: The event loop to monitor. The monitoring task is scheduled on
            this loop (not merely on whichever loop is currently running), so
            the recorded latency and task count describe ``loop`` itself.
        scheduling_latency: Histogram that receives, each iteration, how many
            seconds later than requested the monitoring task was resumed.
        iterations: Counter incremented once per monitoring iteration.
        tasks: Gauge set to the number of outstanding asyncio tasks each
            iteration.
        tags: Tag values attached to every recorded metric.

    Returns:
        The monitoring task. Callers must keep a reference to it: asyncio
        holds only weak references to tasks, so an unreferenced task can be
        garbage collected mid-execution.
    """
    # Use the loop we were given rather than asyncio.create_task(), which
    # always targets the currently running loop and ignores ``loop``.
    return loop.create_task(
        _run_monitoring_loop(
            loop,
            schedule_latency=scheduling_latency,
            iterations=iterations,
            task_gauge=tasks,
            tags=tags,
        )
    )
55

56

57
async def _run_monitoring_loop(
    loop: asyncio.AbstractEventLoop,
    schedule_latency: metrics.Histogram,
    iterations: metrics.Counter,
    task_gauge: metrics.Gauge,
    tags: Dict[str, str],
) -> None:
    """Record event-loop health metrics every ``_METRICS_LOOP_INTERVAL`` seconds.

    Each iteration does three things:

    * increments ``iterations``;
    * sets ``task_gauge`` to the number of not-yet-finished tasks on ``loop``;
    * sleeps for the interval, then observes in ``schedule_latency`` how much
      longer than the interval the sleep actually took.

    That overshoot approximates how long the loop was too busy to resume this
    coroutine.

    Runs while ``loop.is_running()`` is true, or until the task is cancelled.

    Args:
        loop: The event loop being monitored.
        schedule_latency: Histogram receiving the per-iteration overshoot in
            seconds.
        iterations: Counter of completed-or-started monitoring iterations.
        task_gauge: Gauge holding the current task count of ``loop``.
        tags: Tag values attached to every recorded metric.
    """
    while loop.is_running():
        iterations.inc(1, tags)
        # Count tasks on the monitored loop explicitly instead of relying on
        # asyncio.all_tasks() defaulting to the currently running loop.
        task_gauge.set(len(asyncio.all_tasks(loop)), tags)

        yield_time = time.monotonic()
        await asyncio.sleep(_METRICS_LOOP_INTERVAL)
        elapsed_time = time.monotonic() - yield_time

        # Historically, Ray's histogram implementation has been extremely
        # finicky with non-positive values
        # (https://github.com/ray-project/ray/issues/26698). Technically the
        # difference shouldn't be negative; clamp at 0 just to be safe.
        latency = max(0, elapsed_time - _METRICS_LOOP_INTERVAL)
        schedule_latency.observe(latency, tags)
78

79

80
def _event_loop_available() -> bool:
    """Tell whether the caller is executing inside a running asyncio event loop.

    Returns:
        True if ``asyncio.get_running_loop()`` succeeds in the current thread,
        False if it raises ``RuntimeError`` (no loop is running).
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop: most likely the actor is being constructed outside
        # of Ray, for example in a unit test.
        return False
    else:
        return True
88

89

90
class InstrumentAsyncioEventLoop:
    """Mixin that starts an asyncio task to monitor the health of the event loop.

    Meant to be added to any actor class that has an asyncio reconciler. If an
    asyncio event loop is running when ``__init__`` executes, it creates three
    Ray metrics tagged with ``actor=<concrete class name>`` and starts a
    background task that records them:

    * ``anyscale_event_loop_monitoring_iterations`` (Counter): number of
      monitoring-loop iterations.
    * ``anyscale_event_loop_tasks`` (Gauge): outstanding tasks on the loop.
    * ``anyscale_event_loop_schedule_latency`` (Histogram): latency, in
      seconds, of getting control back on the event loop.

    The background task is stored on ``self._event_loop_metrics_task``. It is
    ``None`` when no event loop was running, for example when the actor is
    constructed outside of Ray in a unit test.
    """

    def __init__(self, *args, **kwargs):
        # Forward all arguments so the other base classes' __init__ methods
        # still run when this mixin is combined via multiple inheritance.
        super().__init__(*args, **kwargs)

        # Define the attribute unconditionally so later code can inspect or
        # cancel the task without a hasattr() check.
        self._event_loop_metrics_task = None

        if not _event_loop_available():
            logger.info("No event loop running. Skipping event loop metrics.")
            return

        tag_keys = ("actor",)

        iterations = metrics.Counter(
            "anyscale_event_loop_monitoring_iterations",
            description="Number of times the monitoring loop has iterated for this actor.",
            tag_keys=tag_keys,
        )
        tasks = metrics.Gauge(
            "anyscale_event_loop_tasks",
            description="Number of outstanding tasks on the event loop.",
            tag_keys=tag_keys,
        )
        scheduling_latency = metrics.Histogram(
            "anyscale_event_loop_schedule_latency",
            description="Latency of getting yielded control on the event loop in seconds",
            boundaries=_LATENCY_HISTOGRAM_BOUNDARIES,
            tag_keys=tag_keys,
        )

        tags = {"actor": self.__class__.__name__}

        # Keep a reference to the task: asyncio holds only weak references to
        # tasks, so an unreferenced task could be garbage collected mid-run.
        self._event_loop_metrics_task = setup_event_loop_monitoring(
            asyncio.get_running_loop(),
            scheduling_latency=scheduling_latency,
            iterations=iterations,
            tasks=tasks,
            tags=tags,
        )
134

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.