pytorch

Форк
0
/
cli_function_profiler.py 
302 строки · 10.8 Кб
1
# mypy: disallow-untyped-defs
2

3
import functools
4
import logging
5
import os
6
import re
7
import subprocess
8
import time
9
from threading import Lock
10
from typing import Any, List, Optional, Sequence
11

12

13
logger = logging.getLogger("strobelight_function_profiler")
14

15
console_handler = logging.StreamHandler()
16
formatter = logging.Formatter(
17
    "%(name)s, line %(lineno)d, %(asctime)s, %(levelname)s: %(message)s"
18
)
19
console_handler.setFormatter(formatter)
20

21
logger.addHandler(console_handler)
22
logger.setLevel(logging.INFO)
23
logger.propagate = False
24

25

26
class StrobelightCLIProfilerError(Exception):
27
    """
28
    Raised when an error happens during strobelight profiling
29
    """
30

31

32
def _pid_namespace_link(pid: Optional[int] = None) -> str:
33
    """Returns the link to the process's namespace, example: pid:[4026531836]"""
34
    PID_NAMESPACE_PATH = "/proc/{}/ns/pid"
35
    pid = pid or os.getpid()
36
    return os.readlink(PID_NAMESPACE_PATH.format(pid))
37

38

39
def _pid_namespace(pid: Optional[int] = None) -> int:
40
    """Returns the process's namespace id"""
41
    pid = pid or os.getpid()
42
    link = _pid_namespace_link(pid)
43
    return int(link[link.find("[") + 1 : -1])
44

45

46
def _command_to_string(command: Sequence[str]) -> str:
47
    return " ".join(command)
48

49

50
class StrobelightCLIFunctionProfiler:
51
    """
52
    Note: this is a meta only tool.
53

54
    StrobelightCLIFunctionProfiler can be used to profile a python function and
55
    generate a strobelight link with the results. It works on meta servers but
56
    does not requries an fbcode target.
57
    When stop_at_error is false(default), error during profiling does not prevent
58
    the work function from running.
59

60
    Check function_profiler_example.py for an example.
61
    """
62

63
    # This lock is used to make sure only one thread is running the profiler at any point.
64
    _lock = Lock()
65

66
    def __init__(
67
        self,
68
        *,
69
        stop_at_error: bool = False,
70
        max_profile_duration_sec: int = 60 * 10,
71
        sample_each: float = 1e7,  # sample each sample_each cycles.
72
        run_user_name: str = "pytorch-strobelight-ondemand",
73
        timeout_wait_for_running_sec: int = 60,
74
        timeout_wait_for_finished_sec: int = 60,
75
        recorded_env_variables: Optional[List[str]] = None,
76
        sample_tags: Optional[List[str]] = None,
77
        stack_max_len: int = 127,
78
        async_stack_max_len: int = 127,
79
    ):
80
        self.stop_at_error = stop_at_error
81
        self.max_profile_duration_sec = max_profile_duration_sec
82
        self.sample_each = sample_each
83
        self.run_user_name = run_user_name
84
        self.timeout_wait_for_running_sec = timeout_wait_for_running_sec
85
        self.timeout_wait_for_finished_sec = timeout_wait_for_finished_sec
86
        # Results of the most recent run.
87
        # Tracks the strobelight run id of the most recent run
88
        self.current_run_id: Optional[int] = None
89
        self.sample_tags = sample_tags
90

91
    def _run_async(self) -> None:
92
        processId = os.getpid()
93
        namespace = _pid_namespace(processId)
94
        command = [
95
            "strobeclient",
96
            "run",
97
            "--profiler",
98
            "pyperf",
99
            "--event",
100
            "cycles",
101
            "--async",
102
            "--sample-interval",
103
            f"{int(self.sample_each)}",
104
            "--duration-ms",
105
            f"{int(self.max_profile_duration_sec * 1000)}",
106
            "--pid",
107
            f"{namespace}:{processId}",
108
        ]
109

110
        if self.sample_tags:
111
            command.append("--sample-tags")
112
            command.append(",".join(self.sample_tags))
113

114
        logger.debug("running command: %s", _command_to_string(command))
115
        result = subprocess.run(command, capture_output=True)
116
        output = result.stderr.decode("utf-8")
117
        logger.debug("output:\n{%s}", output)
118

119
        if result.returncode != 0:
120
            raise StrobelightCLIProfilerError(
121
                f"failed to start strobelight profiling, error in run_async:{output}"
122
            )
123

124
        if match := re.search(r"INFO Run Id: (-?\d+)", output):
125
            self.current_run_id = int(match.group(1))
126
            return
127

128
        raise StrobelightCLIProfilerError(
129
            f"failed to start strobelight profiling, unexpected result {output}"
130
        )
131

132
    def _wait_for_running(self, counter: int = 0) -> None:
133
        if counter > 20:
134
            raise StrobelightCLIProfilerError(
135
                "wait_for_running called more than 20 times"
136
            )
137

138
        command = ["strobeclient", "getRunStatus", "--run-id", f"{self.current_run_id}"]
139
        logger.debug("running command: %s", _command_to_string(command))
140
        result = subprocess.run(command, capture_output=True)
141
        output = result.stderr.decode("utf-8")
142
        logger.debug("output:\n{%s}", output)
143

144
        if result.returncode != 0:
145
            raise StrobelightCLIProfilerError(
146
                f"failed to start strobelight profiling, error in wait_for_running:{output}"
147
            )
148

149
        if match := re.search("Profile run status: (.*)", output):
150
            current_status = match.group(1)
151
            if current_status == "RUNNING":
152
                return
153
            elif current_status == "PREPARING":
154
                time.sleep(10)
155
                self._wait_for_running(counter + 1)
156
                return
157
            else:
158
                raise StrobelightCLIProfilerError(f"unexpected {current_status} phase")
159

160
        raise StrobelightCLIProfilerError(f"unexpected output\n: {output} ")
161

162
    def _stop_run(self) -> None:
163
        command = ["strobeclient", "stopRun", "--run-id", str(self.current_run_id)]
164
        logger.debug("running command: %s", _command_to_string(command))
165
        result = subprocess.run(command, capture_output=True)
166
        output = result.stderr.decode("utf-8")
167
        logger.debug("output:\n{%s}", output)
168

169
        if result.returncode != 0:
170
            raise StrobelightCLIProfilerError(
171
                f"failed to stop strobelight profiling, return code is not 0 :{output}"
172
            )
173

174
        if match := re.search("INFO ::1:(.*)", output):
175
            current_status = match.group(1)
176
            if current_status.__contains__("Success!"):
177
                return
178
            else:
179
                raise StrobelightCLIProfilerError(
180
                    f"failed to stop strobelight profiling, got {current_status} result"
181
                )
182

183
        raise StrobelightCLIProfilerError(f"unexpected output\n: {output} ")
184

185
    def _get_results(self) -> None:
186
        command = ["strobeclient", "getRunStatus", "--run-id", str(self.current_run_id)]
187
        logger.debug("running command: %s", _command_to_string(command))
188
        result = subprocess.run(command, capture_output=True)
189
        output = result.stderr.decode("utf-8")
190
        logger.debug("output:\n{%s}", output)
191

192
        if result.returncode != 0:
193
            raise StrobelightCLIProfilerError(
194
                f"failed to extract profiling results, return code is not 0 : {output}"
195
            )
196

197
        if match := re.search("INFO ::1:(.*)", output):
198
            current_status = match.group(1)
199
            if current_status.__contains__("Profile run status: PROCESSING"):
200
                time.sleep(10)
201
                self._get_results()
202
                return
203
            elif not current_status.__contains__("Profile run finished with SUCCESS"):
204
                raise StrobelightCLIProfilerError(
205
                    f"failed to extract profiling results, unexpected response {output}"
206
                )
207

208
        for item in re.findall(
209
            r"(Total samples(.*)|GraphProfiler(.*)|Icicle view \(python stack\)(.*))",
210
            output,
211
        ):
212
            logger.info(item[0])
213

214
    def _stop_strobelight_no_throw(
215
        self,
216
        collect_results: bool,
217
    ) -> None:
218
        try:
219
            # call stop run
220
            self._stop_run()
221
            logger.info("strobelight profiling stopped")
222

223
            logger.debug("collection stopped")
224

225
            if not collect_results:
226
                return
227

228
            self._get_results()
229
        except Exception as error:
230
            logger.warning("error during stop_strobelight", exc_info=True)
231

232
    # Return true if strobelight started and is running. Never throw.
233
    def _start_strobelight(self) -> bool:
234
        strobelight_started = False
235
        try:
236
            self._run_async()
237
            strobelight_started = True
238
            logger.info("strobelight run id is: %s", self.current_run_id)
239
            self._wait_for_running()
240
            logger.info("strobelight profiling running")
241
            return True
242

243
        except Exception as error:
244
            logger.warning("error during start_strobelight:", exc_info=True)
245
            if strobelight_started:
246
                self._stop_strobelight_no_throw(collect_results=False)
247
            return False
248

249
    def profile(self, work_function: Any, *args: Any, **kwargs: Any) -> Any:
250
        self.current_run_id = None
251

252
        if locked := StrobelightCLIFunctionProfiler._lock.acquire(False):
253
            if not locked:
254
                if self.stop_at_error:
255
                    raise StrobelightCLIProfilerError("concurrent runs not supported")
256

257
                logger.warning("concurrent runs not supported")
258
                return work_function(*args, **kwargs)
259

260
            started = self._start_strobelight()
261
            if not started:
262
                if self.stop_at_error:
263
                    StrobelightCLIFunctionProfiler._lock.release()
264
                    raise StrobelightCLIProfilerError(
265
                        "failed to start strobelight profiling"
266
                    )
267
                result = work_function(*args, **kwargs)
268
                StrobelightCLIFunctionProfiler._lock.release()
269
                return result
270

271
            try:
272
                logger.debug("collection started")
273
                result = work_function(*args, **kwargs)
274
                self._stop_strobelight_no_throw(collect_results=True)
275
                StrobelightCLIFunctionProfiler._lock.release()
276
                return result
277
            except Exception as error:
278
                logger.warning("work function throw exception", exc_info=True)
279
                self._stop_strobelight_no_throw(collect_results=False)
280
                StrobelightCLIFunctionProfiler._lock.release()
281
                raise error
282

283

284
# A function decorator that wraps profile, if no profiler is provided one with
285
# default args is created. A function can be annotated as:
286
# @strobelight()
287
# @strobelight(profiler = StrobelightFunctionProfiler(stop_at_error=True,..))
288
# @strobelight(stop_at_error=True,...)
289
def strobelight(
290
    profiler: Optional[StrobelightCLIFunctionProfiler] = None, **kwargs: Any
291
) -> Any:
292
    if not profiler:
293
        profiler = StrobelightCLIFunctionProfiler(**kwargs)
294

295
    def strobelight_inner(work_function: Any) -> Any:
296
        @functools.wraps(work_function)
297
        def wrapper_function(*args: Any, **kwargs: Any) -> Any:
298
            return profiler.profile(work_function, *args, **kwargs)
299

300
        return wrapper_function
301

302
    return strobelight_inner
303

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.