pytorch
302 строки · 10.8 Кб
1# mypy: disallow-untyped-defs
2
3import functools4import logging5import os6import re7import subprocess8import time9from threading import Lock10from typing import Any, List, Optional, Sequence11
12
# Dedicated logger for the profiler. It gets its own stream handler and does
# not propagate, so its records are formatted here and never reach the root
# logger's handlers.
logger = logging.getLogger("strobelight_function_profiler")
logger.setLevel(logging.INFO)
logger.propagate = False

formatter = logging.Formatter(
    "%(name)s, line %(lineno)d, %(asctime)s, %(levelname)s: %(message)s"
)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
25
class StrobelightCLIProfilerError(Exception):
    """Error raised when a step of strobelight profiling fails."""
30
31
32def _pid_namespace_link(pid: Optional[int] = None) -> str:33"""Returns the link to the process's namespace, example: pid:[4026531836]"""34PID_NAMESPACE_PATH = "/proc/{}/ns/pid"35pid = pid or os.getpid()36return os.readlink(PID_NAMESPACE_PATH.format(pid))37
38
def _pid_namespace(pid: Optional[int] = None) -> int:
    """Return the numeric id of the process's pid namespace.

    The id is the integer between the first ``[`` and the final character of
    the link returned by ``_pid_namespace_link`` (e.g. ``pid:[4026531836]``).

    Args:
        pid: Process to inspect. A falsy value (``None`` or ``0``) selects
            the current process.
    """
    target_pid = pid if pid else os.getpid()
    ns_link = _pid_namespace_link(target_pid)
    id_start = ns_link.find("[") + 1
    # Drop the trailing character (the closing bracket) before converting.
    return int(ns_link[id_start:-1])
45
46def _command_to_string(command: Sequence[str]) -> str:47return " ".join(command)48
49
class StrobelightCLIFunctionProfiler:
    """
    Note: this is a meta only tool.

    StrobelightCLIFunctionProfiler can be used to profile a python function and
    generate a strobelight link with the results. It works on meta servers but
    does not require an fbcode target.
    When stop_at_error is false (default), an error during profiling does not
    prevent the work function from running.

    Check function_profiler_example.py for an example.
    """

    # This lock is used to make sure only one thread is running the profiler at
    # any point. It is a class attribute, so it is shared by every instance.
    _lock = Lock()

    def __init__(
        self,
        *,
        stop_at_error: bool = False,
        max_profile_duration_sec: int = 60 * 10,
        sample_each: float = 1e7,  # sample each sample_each cycles.
        run_user_name: str = "pytorch-strobelight-ondemand",
        timeout_wait_for_running_sec: int = 60,
        timeout_wait_for_finished_sec: int = 60,
        recorded_env_variables: Optional[List[str]] = None,
        sample_tags: Optional[List[str]] = None,
        stack_max_len: int = 127,
        async_stack_max_len: int = 127,
    ) -> None:
        """Store the profiling configuration.

        Args:
            stop_at_error: When true, ``profile`` raises
                ``StrobelightCLIProfilerError`` if profiling cannot start or
                another profile is already running; when false it logs a
                warning and just runs the work function.
            max_profile_duration_sec: Passed to strobeclient as
                ``--duration-ms`` (converted to milliseconds).
            sample_each: Passed to strobeclient as ``--sample-interval``
                (truncated to an int).
            run_user_name: Stored on the instance; not read by any method of
                this class.
            timeout_wait_for_running_sec: Stored on the instance; not read by
                any method of this class.
            timeout_wait_for_finished_sec: Stored on the instance; not read by
                any method of this class.
            recorded_env_variables: Accepted but neither stored nor used by
                this class.
            sample_tags: Optional tags, passed comma-joined as
                ``--sample-tags`` when non-empty.
            stack_max_len: Accepted but neither stored nor used by this class.
            async_stack_max_len: Accepted but neither stored nor used by this
                class.
        """
        self.stop_at_error = stop_at_error
        self.max_profile_duration_sec = max_profile_duration_sec
        self.sample_each = sample_each
        self.run_user_name = run_user_name
        self.timeout_wait_for_running_sec = timeout_wait_for_running_sec
        self.timeout_wait_for_finished_sec = timeout_wait_for_finished_sec
        # Results of the most recent run.
        # Tracks the strobelight run id of the most recent run
        self.current_run_id: Optional[int] = None
        self.sample_tags = sample_tags

    def _run_command(self, command: Sequence[str], failure_prefix: str) -> str:
        """Run ``command`` and return its stderr decoded as UTF-8.

        The command and its output are logged at debug level.

        Raises:
            StrobelightCLIProfilerError: if the command exits with a non-zero
                return code; the message is ``failure_prefix`` followed by the
                captured stderr.
        """
        logger.debug("running command: %s", _command_to_string(command))
        result = subprocess.run(command, capture_output=True)
        # The text parsed by the callers is read from stderr, not stdout.
        output = result.stderr.decode("utf-8")
        logger.debug("output:\n{%s}", output)

        if result.returncode != 0:
            raise StrobelightCLIProfilerError(f"{failure_prefix}{output}")
        return output

    def _run_async(self) -> None:
        """Start an asynchronous strobelight run and record its run id.

        Raises:
            StrobelightCLIProfilerError: if strobeclient fails or its output
                does not contain a ``Run Id``.
        """
        process_id = os.getpid()
        namespace = _pid_namespace(process_id)
        command = [
            "strobeclient",
            "run",
            "--profiler",
            "pyperf",
            "--event",
            "cycles",
            "--async",
            "--sample-interval",
            f"{int(self.sample_each)}",
            "--duration-ms",
            f"{int(self.max_profile_duration_sec * 1000)}",
            "--pid",
            f"{namespace}:{process_id}",
        ]

        if self.sample_tags:
            command.append("--sample-tags")
            command.append(",".join(self.sample_tags))

        output = self._run_command(
            command, "failed to start strobelight profiling, error in run_async:"
        )

        if match := re.search(r"INFO Run Id: (-?\d+)", output):
            self.current_run_id = int(match.group(1))
            return

        raise StrobelightCLIProfilerError(
            f"failed to start strobelight profiling, unexpected result {output}"
        )

    def _wait_for_running(self, counter: int = 0) -> None:
        """Poll the run status until it is ``RUNNING``.

        While the status is ``PREPARING`` the method sleeps 10 seconds and
        polls again.

        Args:
            counter: Number of polls already performed.

        Raises:
            StrobelightCLIProfilerError: once ``counter`` exceeds 20, if
                strobeclient fails, or if the status is missing or is neither
                ``RUNNING`` nor ``PREPARING``.
        """
        command = ["strobeclient", "getRunStatus", "--run-id", f"{self.current_run_id}"]
        while True:
            if counter > 20:
                raise StrobelightCLIProfilerError(
                    "wait_for_running called more than 20 times"
                )

            output = self._run_command(
                command,
                "failed to start strobelight profiling, error in wait_for_running:",
            )

            match = re.search("Profile run status: (.*)", output)
            if match is None:
                raise StrobelightCLIProfilerError(f"unexpected output\n: {output} ")

            current_status = match.group(1)
            if current_status == "RUNNING":
                return
            if current_status != "PREPARING":
                raise StrobelightCLIProfilerError(f"unexpected {current_status} phase")

            time.sleep(10)
            counter += 1

    def _stop_run(self) -> None:
        """Ask strobeclient to stop the current run.

        Raises:
            StrobelightCLIProfilerError: if strobeclient fails or does not
                report ``Success!``.
        """
        command = ["strobeclient", "stopRun", "--run-id", str(self.current_run_id)]
        output = self._run_command(
            command, "failed to stop strobelight profiling, return code is not 0 :"
        )

        match = re.search("INFO ::1:(.*)", output)
        if match is None:
            raise StrobelightCLIProfilerError(f"unexpected output\n: {output} ")

        current_status = match.group(1)
        if "Success!" not in current_status:
            raise StrobelightCLIProfilerError(
                f"failed to stop strobelight profiling, got {current_status} result"
            )

    def _get_results(self) -> None:
        """Wait for the run to finish processing and log its result lines.

        While the status line reports ``PROCESSING`` the method sleeps 10
        seconds and polls again (there is no upper bound on the number of
        polls). The matching summary lines of the final output are logged at
        info level.

        Raises:
            StrobelightCLIProfilerError: if strobeclient fails or the status
                line reports anything other than processing or success.
        """
        command = ["strobeclient", "getRunStatus", "--run-id", str(self.current_run_id)]
        while True:
            output = self._run_command(
                command, "failed to extract profiling results, return code is not 0 : "
            )

            if match := re.search("INFO ::1:(.*)", output):
                current_status = match.group(1)
                if "Profile run status: PROCESSING" in current_status:
                    time.sleep(10)
                    continue
                if "Profile run finished with SUCCESS" not in current_status:
                    raise StrobelightCLIProfilerError(
                        f"failed to extract profiling results, unexpected response {output}"
                    )
            # No status line at all is tolerated: fall through and log
            # whatever result lines the output contains.
            break

        for item in re.findall(
            r"(Total samples(.*)|GraphProfiler(.*)|Icicle view \(python stack\)(.*))",
            output,
        ):
            logger.info(item[0])

    def _stop_strobelight_no_throw(
        self,
        collect_results: bool,
    ) -> None:
        """Stop the current run and optionally log its results.

        Any ``Exception`` raised along the way is logged as a warning and
        swallowed, so stopping never masks the work function's outcome.
        """
        try:
            # call stop run
            self._stop_run()
            logger.info("strobelight profiling stopped")

            logger.debug("collection stopped")

            if not collect_results:
                return

            self._get_results()
        except Exception:
            logger.warning("error during stop_strobelight", exc_info=True)

    # Return true if strobelight started and is running. Never throw.
    def _start_strobelight(self) -> bool:
        """Start a run and wait until it is running.

        Returns:
            True when the run is running; False on any ``Exception`` (which is
            logged). If the run was started but never reached the running
            state, it is stopped before returning.
        """
        strobelight_started = False
        try:
            self._run_async()
            strobelight_started = True
            logger.info("strobelight run id is: %s", self.current_run_id)
            self._wait_for_running()
            logger.info("strobelight profiling running")
            return True

        except Exception:
            logger.warning("error during start_strobelight:", exc_info=True)
            if strobelight_started:
                self._stop_strobelight_no_throw(collect_results=False)
            return False

    def profile(self, work_function: Any, *args: Any, **kwargs: Any) -> Any:
        """Run ``work_function(*args, **kwargs)`` under strobelight profiling.

        Only one profile may run at a time across all instances. If another
        one is in progress, or profiling fails to start, the work function is
        still executed (unprofiled) unless ``stop_at_error`` is set.

        Returns:
            Whatever ``work_function`` returns.

        Raises:
            StrobelightCLIProfilerError: only when ``stop_at_error`` is true
                and profiling is already running elsewhere or fails to start.
            Exception: anything raised by ``work_function`` is re-raised.
        """
        # Non-blocking acquire: a busy lock means another profile is running.
        if not StrobelightCLIFunctionProfiler._lock.acquire(False):
            if self.stop_at_error:
                raise StrobelightCLIProfilerError("concurrent runs not supported")

            logger.warning("concurrent runs not supported")
            return work_function(*args, **kwargs)

        # The lock is held from here on; ``finally`` guarantees it is released
        # on every exit path, including exceptions from the work function.
        try:
            # Reset only while holding the lock so a concurrent caller cannot
            # wipe the run id of a profile that is still in flight.
            self.current_run_id = None

            if not self._start_strobelight():
                if self.stop_at_error:
                    raise StrobelightCLIProfilerError(
                        "failed to start strobelight profiling"
                    )
                return work_function(*args, **kwargs)

            logger.debug("collection started")
            try:
                result = work_function(*args, **kwargs)
            except Exception:
                logger.warning("work function throw exception", exc_info=True)
                self._stop_strobelight_no_throw(collect_results=False)
                raise
            self._stop_strobelight_no_throw(collect_results=True)
            return result
        finally:
            StrobelightCLIFunctionProfiler._lock.release()
283
# A function decorator that wraps profile. If no profiler is provided, one is
# created from the given keyword arguments. A function can be annotated as:
# @strobelight()
# @strobelight(profiler = StrobelightCLIFunctionProfiler(stop_at_error=True,..))
# @strobelight(stop_at_error=True,...)
def strobelight(
    profiler: Optional[StrobelightCLIFunctionProfiler] = None, **kwargs: Any
) -> Any:
    """Build a decorator that runs the decorated function via ``profiler.profile``.

    Args:
        profiler: Profiler to use. When falsy, a new
            ``StrobelightCLIFunctionProfiler(**kwargs)`` is created once, at
            decoration time, and shared by every call of the decorated
            function.
        **kwargs: Constructor arguments for the default profiler; they are
            not used when ``profiler`` is supplied.

    Returns:
        A decorator; the function it produces forwards its arguments to the
        wrapped function through ``profile`` and returns its result.
    """
    active_profiler = profiler if profiler else StrobelightCLIFunctionProfiler(**kwargs)

    def strobelight_inner(work_function: Any) -> Any:
        @functools.wraps(work_function)
        def wrapper_function(*call_args: Any, **call_kwargs: Any) -> Any:
            return active_profiler.profile(work_function, *call_args, **call_kwargs)

        return wrapper_function

    return strobelight_inner