1
"""Adapter for https://github.com/charliermarsh/ruff."""
3
from __future__ import annotations
6
import concurrent.futures
15
from typing import Any, BinaryIO
18
IS_WINDOWS: bool = os.name == "nt"
21
def eprint(*args: Any, **kwargs: Any) -> None:
    """Print to stderr, flushing immediately.

    Args:
        *args: Objects to print; forwarded positionally to ``print``.
        **kwargs: Extra keyword arguments forwarded to ``print`` (for
            example ``sep`` or ``end``). ``file`` and ``flush`` are already
            supplied here, so passing either one raises ``TypeError``.
    """
    print(*args, file=sys.stderr, flush=True, **kwargs)
26
class LintSeverity(str, enum.Enum):
27
"""Severity of a lint message."""
35
@dataclasses.dataclass(frozen=True)
37
"""A lint message defined by https://docs.rs/lintrunner/latest/lintrunner/lint_message/struct.LintMessage.html."""
43
severity: LintSeverity
46
replacement: str | None
47
description: str | None
49
def asdict(self) -> dict[str, Any]:
    """Return this dataclass instance's fields as a plain dictionary.

    Delegates to ``dataclasses.asdict``, so nested dataclasses, lists and
    dicts are converted recursively.
    """
    return dataclasses.asdict(self)
52
def display(self) -> None:
    """Print to stdout for lintrunner to consume.

    The result of ``self.asdict()`` is serialized with ``json.dumps`` and
    written as one line; stdout is flushed right away.
    """
    print(json.dumps(self.asdict()), flush=True)
57
def as_posix(name: str) -> str:
    """Return ``name`` with backslashes turned into forward slashes on Windows.

    When the module-level ``IS_WINDOWS`` flag is false, ``name`` is returned
    unchanged.
    """
    return name.replace("\\", "/") if IS_WINDOWS else name
65
stdin: BinaryIO | None,
68
cwd: os.PathLike[Any] | None,
69
) -> subprocess.CompletedProcess[bytes]:
70
logging.debug("$ %s", " ".join(args))
71
start_time = time.monotonic()
74
return subprocess.run(
84
return subprocess.run(
94
end_time = time.monotonic()
95
logging.debug("took %dms", (end_time - start_time) * 1000)
102
timeout: int | None = None,
103
stdin: BinaryIO | None = None,
104
input: bytes | None = None,
106
cwd: os.PathLike[Any] | None = None,
107
) -> subprocess.CompletedProcess[bytes]:
108
remaining_retries = retries
112
args, timeout=timeout, stdin=stdin, input=input, check=check, cwd=cwd
114
except subprocess.TimeoutExpired as err:
115
if remaining_retries == 0:
117
remaining_retries -= 1
119
"(%s/%s) Retrying because command failed with: %r",
120
retries - remaining_retries,
127
def add_default_options(parser: argparse.ArgumentParser) -> None:
128
"""Add default options to a parser.
130
This should be called last in the chain of add_argument calls.
136
help="number of times to retry if the linter times out.",
141
help="verbose logging",
146
help="paths to lint",
150
def explain_rule(code: str) -> str:
152
["ruff", "rule", "--output-format=json", code],
155
rule = json.loads(str(proc.stdout, "utf-8").strip())
156
return f"\n{rule['linter']}: {rule['summary']}"
159
def get_issue_severity(code: str) -> LintSeverity:
160
# "B901": `return x` inside a generator
161
# "B902": Invalid first argument to a method
162
# "B903": __slots__ efficiency
163
# "B950": Line too long
164
# "C4": Flake8 Comprehensions
165
# "C9": Cyclomatic complexity
166
# "E2": PEP8 horizontal whitespace "errors"
167
# "E3": PEP8 blank line "errors"
168
# "E5": PEP8 line length "errors"
169
# "T400": type checking Notes
170
# "T49": internal type checker errors or unmatched messages
186
return LintSeverity.ADVICE
188
# "F821": Undefined name
189
# "E999": syntax error
190
if any(code.startswith(x) for x in ("F821", "E999", "PLE")):
191
return LintSeverity.ERROR
193
# "F": PyFlakes Error
194
# "B": flake8-bugbear Error
197
# possibly other plugins...
198
return LintSeverity.WARNING
201
def format_lint_message(
202
message: str, code: str, rules: dict[str, str], show_disable: bool
205
message += f".\n{rules.get(code) or ''}"
206
message += ".\nSee https://beta.ruff.rs/docs/rules/"
208
message += f".\n\nTo disable, use ` # noqa: {code}`"
213
filenames: list[str],
214
severities: dict[str, LintSeverity],
221
) -> list[LintMessage]:
230
"--output-format=json",
231
*([f"--config={config}"] if config else []),
238
except (OSError, subprocess.CalledProcessError) as err:
245
severity=LintSeverity.ERROR,
246
name="command-failed",
250
f"Failed due to {err.__class__.__name__}:\n{err}"
251
if not isinstance(err, subprocess.CalledProcessError)
253
f"COMMAND (exit code {err.returncode})\n"
254
f"{' '.join(as_posix(x) for x in err.cmd)}\n\n"
255
f"STDERR\n{err.stderr.decode('utf-8').strip() or '(empty)'}\n\n"
256
f"STDOUT\n{err.stdout.decode('utf-8').strip() or '(empty)'}"
262
stdout = str(proc.stdout, "utf-8").strip()
263
vulnerabilities = json.loads(stdout)
266
all_codes = {v["code"] for v in vulnerabilities}
267
rules = {code: explain_rule(code) for code in all_codes}
273
path=vuln["filename"],
283
line=int(vuln["location"]["row"]),
284
char=int(vuln["location"]["column"]),
286
severity=severities.get(vuln["code"], get_issue_severity(vuln["code"])),
290
for vuln in vulnerabilities
294
def check_file_for_fixes(
300
) -> list[LintMessage]:
302
with open(filename, "rb") as f:
304
with open(filename, "rb") as f:
305
proc_fix = run_command(
312
*([f"--config={config}"] if config else []),
322
except (OSError, subprocess.CalledProcessError) as err:
329
severity=LintSeverity.ERROR,
330
name="command-failed",
334
f"Failed due to {err.__class__.__name__}:\n{err}"
335
if not isinstance(err, subprocess.CalledProcessError)
337
f"COMMAND (exit code {err.returncode})\n"
338
f"{' '.join(as_posix(x) for x in err.cmd)}\n\n"
339
f"STDERR\n{err.stderr.decode('utf-8').strip() or '(empty)'}\n\n"
340
f"STDOUT\n{err.stdout.decode('utf-8').strip() or '(empty)'}"
346
replacement = proc_fix.stdout
347
if original == replacement:
354
description="Run `lintrunner -a` to apply this patch.",
358
severity=LintSeverity.WARNING,
359
original=original.decode("utf-8"),
360
replacement=replacement.decode("utf-8"),
366
parser = argparse.ArgumentParser(
367
description=f"Ruff linter. Linter code: {LINTER_CODE}. Use with RUFF-FIX to auto-fix issues.",
368
fromfile_prefix_chars="@",
373
help="Path to the `pyproject.toml` or `ruff.toml` file to use for configuration",
378
help="Explain a rule",
383
help="Show how to disable a lint message",
389
help="Seconds to wait for ruff",
394
help="map code to severity (e.g. `F401:advice`). This option can be used multiple times.",
399
help="Do not suggest fixes",
401
add_default_options(parser)
402
args = parser.parse_args()
405
format="<%(threadName)s:%(levelname)s> %(message)s",
409
if len(args.filenames) < 1000
414
severities: dict[str, LintSeverity] = {}
416
for severity in args.severity:
417
parts = severity.split(":", 1)
418
assert len(parts) == 2, f"invalid severity `{severity}`"
419
severities[parts[0]] = LintSeverity(parts[1])
421
lint_messages = check_files(
423
severities=severities,
425
retries=args.retries,
426
timeout=args.timeout,
427
explain=args.explain,
428
show_disable=args.show_disable,
430
for lint_message in lint_messages:
431
lint_message.display()
433
if args.no_fix or not lint_messages:
434
# If we're not fixing, we can exit early
437
files_with_lints = {lint.path for lint in lint_messages if lint.path is not None}
438
with concurrent.futures.ThreadPoolExecutor(
439
max_workers=os.cpu_count(),
440
thread_name_prefix="Thread",
444
check_file_for_fixes,
447
retries=args.retries,
448
timeout=args.timeout,
450
for path in files_with_lints
452
for future in concurrent.futures.as_completed(futures):
454
for lint_message in future.result():
455
lint_message.display()
456
except Exception: # Catch all exceptions for lintrunner
457
logging.critical('Failed at "%s".', futures[future])
461
if __name__ == "__main__":