pytorch

Форк
0
/
ruff_linter.py 
462 строки · 12.7 Кб
1
"""Adapter for https://github.com/charliermarsh/ruff."""
2

3
from __future__ import annotations
4

5
import argparse
6
import concurrent.futures
7
import dataclasses
8
import enum
9
import json
10
import logging
11
import os
12
import subprocess
13
import sys
14
import time
15
from typing import Any, BinaryIO
16

17
# Value placed in the `code` field of every LintMessage this adapter emits.
LINTER_CODE = "RUFF"
18
# True when os.name is "nt"; as_posix() only rewrites backslashes in that case.
IS_WINDOWS: bool = os.name == "nt"
19

20

21
def eprint(*args: Any, **kwargs: Any) -> None:
    """Write the given values to standard error and flush immediately.

    Positional and keyword arguments are forwarded to ``print``; ``file`` and
    ``flush`` are supplied here, so callers must not pass them.
    """
    stream = sys.stderr
    print(*args, file=stream, flush=True, **kwargs)
24

25

26
class LintSeverity(str, enum.Enum):
    """Severity levels that can be attached to a lint message.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase value and serializes to JSON as that plain string.
    """

    ERROR = "error"
    WARNING = "warning"
    ADVICE = "advice"
    DISABLED = "disabled"
33

34

35
@dataclasses.dataclass(frozen=True)
class LintMessage:
    """One lint result in the shape lintrunner consumes.

    The field layout follows
    https://docs.rs/lintrunner/latest/lintrunner/lint_message/struct.LintMessage.html.
    Instances are immutable (``frozen=True``).
    """

    path: str | None
    line: int | None
    char: int | None
    code: str
    severity: LintSeverity
    name: str
    original: str | None
    replacement: str | None
    description: str | None

    def asdict(self) -> dict[str, Any]:
        """Return the message's fields as a plain dictionary."""
        return dataclasses.asdict(self)

    def display(self) -> None:
        """Emit the message as one JSON line on stdout, flushed immediately."""
        payload = json.dumps(self.asdict())
        print(payload, flush=True)
55

56

57
def as_posix(name: str) -> str:
    """Return ``name`` with backslashes turned into forward slashes on Windows.

    When ``IS_WINDOWS`` is false the string is returned unchanged.
    """
    if not IS_WINDOWS:
        return name
    return name.replace("\\", "/")
59

60

61
def _run_command(
62
    args: list[str],
63
    *,
64
    timeout: int | None,
65
    stdin: BinaryIO | None,
66
    input: bytes | None,
67
    check: bool,
68
    cwd: os.PathLike[Any] | None,
69
) -> subprocess.CompletedProcess[bytes]:
70
    logging.debug("$ %s", " ".join(args))
71
    start_time = time.monotonic()
72
    try:
73
        if input is not None:
74
            return subprocess.run(
75
                args,
76
                capture_output=True,
77
                shell=False,
78
                input=input,
79
                timeout=timeout,
80
                check=check,
81
                cwd=cwd,
82
            )
83

84
        return subprocess.run(
85
            args,
86
            stdin=stdin,
87
            capture_output=True,
88
            shell=False,
89
            timeout=timeout,
90
            check=check,
91
            cwd=cwd,
92
        )
93
    finally:
94
        end_time = time.monotonic()
95
        logging.debug("took %dms", (end_time - start_time) * 1000)
96

97

98
def run_command(
    args: list[str],
    *,
    retries: int = 0,
    timeout: int | None = None,
    stdin: BinaryIO | None = None,
    input: bytes | None = None,
    check: bool = False,
    cwd: os.PathLike[Any] | None = None,
) -> subprocess.CompletedProcess[bytes]:
    """Run ``args`` via ``_run_command``, retrying when the command times out.

    Args:
        args: Command line to execute.
        retries: How many extra attempts to make after a timeout. Zero or a
            negative value means the first timeout is raised immediately.
        timeout: Seconds allowed per attempt, or ``None`` for no limit.
        stdin: Binary stream forwarded as the child's stdin.
        input: Bytes forwarded as the child's input.
        check: Forwarded to ``_run_command`` as ``check``.
        cwd: Working directory for the child process.

    Returns:
        The completed process of the first attempt that does not time out.

    Raises:
        subprocess.TimeoutExpired: When every attempt timed out.
        Any other exception from ``_run_command`` propagates unchanged; only
        timeouts are retried.
    """
    remaining_retries = retries
    while True:
        try:
            return _run_command(
                args, timeout=timeout, stdin=stdin, input=input, check=check, cwd=cwd
            )
        except subprocess.TimeoutExpired as err:
            # `<= 0` rather than `== 0`: a negative `retries` would never hit
            # zero while counting down and would retry forever.
            if remaining_retries <= 0:
                raise
            remaining_retries -= 1
            logging.warning(
                "(%s/%s) Retrying because command failed with: %r",
                retries - remaining_retries,
                retries,
                err,
            )
            # Brief pause before the next attempt.
            time.sleep(1)
125

126

127
def add_default_options(parser: argparse.ArgumentParser) -> None:
    """Register the options shared by linter adapters on ``parser``.

    Adds ``--retries``, ``--verbose`` and the positional ``filenames``.
    Call this after all other ``add_argument`` calls.
    """
    # (names, keyword settings) for each argument, in registration order.
    option_table: tuple[tuple[tuple[str, ...], dict[str, Any]], ...] = (
        (
            ("--retries",),
            {
                "type": int,
                "default": 3,
                "help": "number of times to retry if the linter times out.",
            },
        ),
        (
            ("--verbose",),
            {"action": "store_true", "help": "verbose logging"},
        ),
        (
            ("filenames",),
            {"nargs": "+", "help": "paths to lint"},
        ),
    )
    for names, settings in option_table:
        parser.add_argument(*names, **settings)
148

149

150
def explain_rule(code: str) -> str:
    """Return a short explanation of the ruff rule ``code``.

    Runs ``ruff rule --output-format=json <code>`` and formats the ``linter``
    and ``summary`` fields of the JSON reply as ``"\\n<linter>: <summary>"``.

    Raises:
        subprocess.CalledProcessError: When ruff exits with a non-zero status.
    """
    proc = run_command(
        # Invoke ruff through the current interpreter, as check_files and
        # check_file_for_fixes do, so the same ruff installation is used
        # instead of whichever `ruff` happens to be first on PATH.
        [sys.executable, "-m", "ruff", "rule", "--output-format=json", code],
        check=True,
    )
    rule = json.loads(str(proc.stdout, "utf-8").strip())
    return f"\n{rule['linter']}: {rule['summary']}"
157

158

159
def get_issue_severity(code: str) -> LintSeverity:
    """Pick the default severity for a ruff rule code from its prefix.

    Advice prefixes are checked first, then error prefixes; every other code
    is reported as a warning.
    """
    # Codes reported as advice:
    #   "B901": `return x` inside a generator
    #   "B902": Invalid first argument to a method
    #   "B903": __slots__ efficiency
    #   "B950": Line too long
    #   "C4": Flake8 Comprehensions
    #   "C9": Cyclomatic complexity
    #   "E2": PEP8 horizontal whitespace "errors"
    #   "E3": PEP8 blank line "errors"
    #   "E5": PEP8 line length "errors"
    #   "T400": type checking Notes
    #   "T49": internal type checker errors or unmatched messages
    advice_prefixes = (
        "B9",
        "C4",
        "C9",
        "E2",
        "E3",
        "E5",
        "T400",
        "T49",
        "PLC",
        "PLR",
    )
    if code.startswith(advice_prefixes):
        return LintSeverity.ADVICE

    # Codes reported as errors:
    #   "F821": Undefined name
    #   "E999": syntax error
    error_prefixes = ("F821", "E999", "PLE")
    if code.startswith(error_prefixes):
        return LintSeverity.ERROR

    # Everything else is a warning:
    #   "F": PyFlakes Error
    #   "B": flake8-bugbear Error
    #   "E": PEP8 "Error"
    #   "W": PEP8 Warning
    #   possibly other plugins...
    return LintSeverity.WARNING
199

200

201
def format_lint_message(
    message: str, code: str, rules: dict[str, str], show_disable: bool
) -> str:
    """Build the description text shown for one ruff diagnostic.

    The result is ``message`` followed by, in order: the explanation looked
    up in ``rules`` for ``code`` (only when ``rules`` is non-empty; a missing
    entry contributes an empty string), a pointer to the ruff rule docs, and
    - when ``show_disable`` is true - a hint for suppressing the rule.
    """
    pieces = [message]
    if rules:
        explanation = rules.get(code) or ""
        pieces.append(f".\n{explanation}")
    pieces.append(".\nSee https://beta.ruff.rs/docs/rules/")
    if show_disable:
        pieces.append(f".\n\nTo disable, use `  # noqa: {code}`")
    return "".join(pieces)
210

211

212
def check_files(
    filenames: list[str],
    severities: dict[str, LintSeverity],
    *,
    config: str | None,
    retries: int,
    timeout: int,
    explain: bool,
    show_disable: bool,
) -> list[LintMessage]:
    """Run ruff over ``filenames`` and convert its JSON output to lint messages.

    Args:
        filenames: Paths handed to ruff.
        severities: Per-code severity overrides; codes not listed fall back
            to ``get_issue_severity``.
        config: Optional configuration file passed as ``--config=``.
        retries: Extra attempts allowed when ruff times out.
        timeout: Seconds allowed per ruff attempt.
        explain: When true, append the output of ``explain_rule`` for each
            reported code to the description.
        show_disable: When true, append a ``# noqa`` hint to the description.

    Returns:
        One ``LintMessage`` per ruff diagnostic, or a single ``command-failed``
        error message when ruff could not be run, exited non-zero, or timed
        out on every attempt.
    """
    command = [
        sys.executable,
        "-m",
        "ruff",
        "--exit-zero",
        "--quiet",
        "--output-format=json",
        *([f"--config={config}"] if config else []),
        *filenames,
    ]
    try:
        proc = run_command(command, retries=retries, timeout=timeout, check=True)
    except (
        OSError,
        subprocess.CalledProcessError,
        # Raised by run_command once the retries are used up; report it like
        # any other failure instead of letting the adapter crash.
        subprocess.TimeoutExpired,
    ) as err:
        if isinstance(err, subprocess.CalledProcessError):
            # errors="replace" keeps the failure report from raising on
            # output that is not valid UTF-8.
            stderr_text = err.stderr.decode("utf-8", errors="replace").strip()
            stdout_text = err.stdout.decode("utf-8", errors="replace").strip()
            description = (
                f"COMMAND (exit code {err.returncode})\n"
                f"{' '.join(as_posix(x) for x in err.cmd)}\n\n"
                f"STDERR\n{stderr_text or '(empty)'}\n\n"
                f"STDOUT\n{stdout_text or '(empty)'}"
            )
        else:
            description = f"Failed due to {err.__class__.__name__}:\n{err}"
        return [
            LintMessage(
                path=None,
                line=None,
                char=None,
                code=LINTER_CODE,
                severity=LintSeverity.ERROR,
                name="command-failed",
                original=None,
                replacement=None,
                description=description,
            )
        ]

    stdout = str(proc.stdout, "utf-8").strip()
    # Empty output is treated as "no diagnostics" rather than handed to
    # json.loads, which rejects an empty string.
    vulnerabilities = json.loads(stdout) if stdout else []

    if explain:
        # Look each distinct code up once, not once per diagnostic.
        all_codes = {v["code"] for v in vulnerabilities}
        rules = {code: explain_rule(code) for code in all_codes}
    else:
        rules = {}

    return [
        LintMessage(
            path=vuln["filename"],
            name=vuln["code"],
            description=(
                format_lint_message(
                    vuln["message"],
                    vuln["code"],
                    rules,
                    show_disable,
                )
            ),
            line=int(vuln["location"]["row"]),
            char=int(vuln["location"]["column"]),
            code=LINTER_CODE,
            # An explicit override wins over the prefix-based default.
            severity=severities.get(vuln["code"], get_issue_severity(vuln["code"])),
            original=None,
            replacement=None,
        )
        for vuln in vulnerabilities
    ]
292

293

294
def check_file_for_fixes(
    filename: str,
    *,
    config: str | None,
    retries: int,
    timeout: int,
) -> list[LintMessage]:
    """Ask ruff for automatic fixes to ``filename`` and report them as a patch.

    The file's bytes are piped to ``ruff --fix-only`` and ruff's stdout is
    taken as the fixed content.

    Args:
        filename: File to read and to pass as ``--stdin-filename``.
        config: Optional configuration file passed as ``--config=``.
        retries: Extra attempts allowed when ruff times out.
        timeout: Seconds allowed per ruff attempt.

    Returns:
        An empty list when ruff's output equals the file content; a single
        ``format`` warning carrying the original and replacement text when it
        differs; or a single ``command-failed`` error when the file could not
        be read or ruff failed or timed out on every attempt.
    """
    try:
        with open(filename, "rb") as f:
            original = f.read()
        # Pass the bytes with `input=` rather than an open file handle as
        # stdin: a handle's offset is left at EOF after the first attempt, so
        # a retry after a timeout would feed ruff empty input and the result
        # would look like a fix that empties the file. It also guarantees
        # ruff sees exactly the bytes compared against below.
        proc_fix = run_command(
            [
                sys.executable,
                "-m",
                "ruff",
                "--fix-only",
                "--exit-zero",
                *([f"--config={config}"] if config else []),
                "--stdin-filename",
                filename,
                "-",
            ],
            input=original,
            retries=retries,
            timeout=timeout,
            check=True,
        )
    except (
        OSError,
        subprocess.CalledProcessError,
        # Raised by run_command once the retries are used up; report it like
        # any other failure.
        subprocess.TimeoutExpired,
    ) as err:
        if isinstance(err, subprocess.CalledProcessError):
            # errors="replace" keeps the failure report from raising on
            # output that is not valid UTF-8.
            stderr_text = err.stderr.decode("utf-8", errors="replace").strip()
            stdout_text = err.stdout.decode("utf-8", errors="replace").strip()
            description = (
                f"COMMAND (exit code {err.returncode})\n"
                f"{' '.join(as_posix(x) for x in err.cmd)}\n\n"
                f"STDERR\n{stderr_text or '(empty)'}\n\n"
                f"STDOUT\n{stdout_text or '(empty)'}"
            )
        else:
            description = f"Failed due to {err.__class__.__name__}:\n{err}"
        return [
            LintMessage(
                path=None,
                line=None,
                char=None,
                code=LINTER_CODE,
                severity=LintSeverity.ERROR,
                name="command-failed",
                original=None,
                replacement=None,
                description=description,
            )
        ]

    replacement = proc_fix.stdout
    if original == replacement:
        # Nothing to fix.
        return []

    return [
        LintMessage(
            path=filename,
            name="format",
            description="Run `lintrunner -a` to apply this patch.",
            line=None,
            char=None,
            code=LINTER_CODE,
            severity=LintSeverity.WARNING,
            original=original.decode("utf-8"),
            replacement=replacement.decode("utf-8"),
        )
    ]
363

364

365
def main() -> None:
    """Parse command-line options, print ruff diagnostics, then suggest fixes.

    Every message returned by ``check_files`` is printed first. Unless
    ``--no-fix`` is given or nothing was reported, each file that produced a
    message is then passed to ``check_file_for_fixes`` on a thread pool and
    the resulting messages are printed as they complete.

    An invalid ``--severity`` value ends the program through
    ``parser.error`` (usage message, exit status 2).
    """
    parser = argparse.ArgumentParser(
        description=f"Ruff linter. Linter code: {LINTER_CODE}. Use with RUFF-FIX to auto-fix issues.",
        fromfile_prefix_chars="@",
    )
    parser.add_argument(
        "--config",
        default=None,
        help="Path to the `pyproject.toml` or `ruff.toml` file to use for configuration",
    )
    parser.add_argument(
        "--explain",
        action="store_true",
        help="Explain a rule",
    )
    parser.add_argument(
        "--show-disable",
        action="store_true",
        help="Show how to disable a lint message",
    )
    parser.add_argument(
        "--timeout",
        default=90,
        type=int,
        help="Seconds to wait for ruff",
    )
    parser.add_argument(
        "--severity",
        action="append",
        help="map code to severity (e.g. `F401:advice`). This option can be used multiple times.",
    )
    parser.add_argument(
        "--no-fix",
        action="store_true",
        help="Do not suggest fixes",
    )
    add_default_options(parser)
    args = parser.parse_args()

    if args.verbose:
        log_level = logging.NOTSET
    elif len(args.filenames) < 1000:
        log_level = logging.DEBUG
    else:
        # Large runs log less.
        log_level = logging.INFO
    logging.basicConfig(
        format="<%(threadName)s:%(levelname)s> %(message)s",
        level=log_level,
        stream=sys.stderr,
    )

    severities: dict[str, LintSeverity] = {}
    for mapping in args.severity or ():
        # Validate explicitly instead of with `assert`, which is stripped
        # when Python runs with -O.
        rule_code, separator, level = mapping.partition(":")
        if not separator:
            parser.error(f"invalid severity `{mapping}`")
        try:
            severities[rule_code] = LintSeverity(level)
        except ValueError:
            # Unknown severity name: report it as a usage error rather than
            # a traceback.
            parser.error(f"invalid severity `{mapping}`")

    lint_messages = check_files(
        args.filenames,
        severities=severities,
        config=args.config,
        retries=args.retries,
        timeout=args.timeout,
        explain=args.explain,
        show_disable=args.show_disable,
    )
    for lint_message in lint_messages:
        lint_message.display()

    if args.no_fix or not lint_messages:
        # If we're not fixing, we can exit early
        return

    # Only files that actually produced a diagnostic are checked for fixes.
    files_with_lints = {lint.path for lint in lint_messages if lint.path is not None}
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=os.cpu_count(),
        thread_name_prefix="Thread",
    ) as executor:
        futures = {
            executor.submit(
                check_file_for_fixes,
                path,
                config=args.config,
                retries=args.retries,
                timeout=args.timeout,
            ): path
            for path in files_with_lints
        }
        for future in concurrent.futures.as_completed(futures):
            try:
                for lint_message in future.result():
                    lint_message.display()
            except Exception:  # Catch all exceptions for lintrunner
                # Name the file being processed before re-raising.
                logging.critical('Failed at "%s".', futures[future])
                raise
459

460

461
# Run the linter only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
463

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.