ray-llm
43 строки · 1.3 Кб
1import concurrent.futures.thread
2from functools import wraps
3from typing import Collection
4
5from opentelemetry import trace
6from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
7from opentelemetry.trace import (
8get_current_span,
9)
10
11from rayllm.backend.observability.tracing.context import use_context
12
13
class _InstrumentedThreadPoolExecutorWorkItem(concurrent.futures.thread._WorkItem):
    """Work item that runs its callable under the span current at creation.

    The span returned by ``get_current_span()`` is read when the work item is
    constructed (presumably in the thread that submits the task -- confirm
    against the executor implementation) and is re-attached around the
    callable when it is eventually invoked.
    """

    def __init__(self, future, fn, args, kwargs):
        """Build the work item around a context-propagating version of *fn*.

        Args:
            future: Forwarded unchanged to the base work item.
            fn: Callable to execute; it is wrapped before being forwarded.
            args: Positional arguments forwarded unchanged.
            kwargs: Keyword arguments forwarded unchanged.
        """
        # Read the active span now, at construction time, not at run time.
        submitting_span = get_current_span()
        traced_fn = self.wrap_with_context(fn, parent_span=submitting_span)
        super().__init__(future, traced_fn, args, kwargs)

    @staticmethod
    def wrap_with_context(fn, parent_span: trace.Span):
        """Return *fn* wrapped so each call runs with *parent_span* in context.

        Args:
            fn: Callable to wrap; its metadata is copied onto the wrapper.
            parent_span: Span to place into the context for every call.

        Returns:
            A callable accepting the same arguments as *fn* and returning
            whatever *fn* returns.
        """

        @wraps(fn)
        def wrapper(*call_args, **call_kwargs):
            # The context is derived on every call, inside the executing
            # thread, so it is based on whatever context is current there.
            parent_context = trace.set_span_in_context(parent_span)
            with use_context(parent_context):
                result = fn(*call_args, **call_kwargs)
            return result

        return wrapper
31
32
class ThreadPoolExecutorInstrumentor(BaseInstrumentor):
    """Instrumentor that swaps ``concurrent.futures.thread._WorkItem``.

    Instrumenting replaces the module-level ``_WorkItem`` attribute with
    ``_InstrumentedThreadPoolExecutorWorkItem``; uninstrumenting restores the
    class that was in place when this class body was evaluated.
    """

    # Snapshot of the work-item class taken when this class is defined, kept
    # so that `_uninstrument` can put it back.
    original_workitem = concurrent.futures.thread._WorkItem

    def instrumentation_dependencies(self) -> Collection[str]:
        """Return the required packages; this instrumentor declares none."""
        return tuple()

    def _instrument(self, *args, **kwargs):
        """Install the instrumented work item; all arguments are ignored."""
        thread_module = concurrent.futures.thread
        thread_module._WorkItem = _InstrumentedThreadPoolExecutorWorkItem

    def _uninstrument(self, **kwargs):
        """Restore the saved work-item class; all arguments are ignored."""
        thread_module = concurrent.futures.thread
        thread_module._WorkItem = self.original_workitem
44