# Copyright (c) Streamlit Inc. (2018-2022) Snowflake Inc. (2022-2024)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Global pytest fixtures for e2e tests.
This file is automatically run by pytest before tests are executed.
"""
from __future__ import annotations

import hashlib
import os
import re
import shlex
import shutil
import socket
import subprocess
import sys
import time
from io import BytesIO
from pathlib import Path
from random import randint
from tempfile import TemporaryFile
from types import ModuleType
from typing import Any, Dict, Generator, List, Literal, Protocol, Tuple
from urllib import parse

import pytest
import requests
from PIL import Image
from playwright.sync_api import ElementHandle, Locator, Page
from pytest import FixtureRequest

def reorder_early_fixtures(metafunc: pytest.Metafunc):
    """Put fixtures with `pytest.mark.early` first during execution

    This allows patching configuration before the application is initialized

    Copied from: https://github.com/pytest-dev/pytest/issues/1216#issuecomment-456109892
    """
    for fixturedef in metafunc._arg2fixturedefs.values():
        fixturedef = fixturedef[0]
        for mark in getattr(fixturedef.func, "pytestmark", []):
            if mark.name == "early":
                order = metafunc.fixturenames
                order.insert(0, order.pop(order.index(fixturedef.argname)))
                break


def pytest_generate_tests(metafunc: pytest.Metafunc):
    reorder_early_fixtures(metafunc)
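
# A minimal usage sketch (hypothetical, not part of the original file): a test
# module can mark a fixture with `pytest.mark.early` so it is moved to the
# front of the fixture execution order, e.g. to patch the environment before
# the autouse `app_server` fixture starts the app. The mark must sit below
# `pytest.fixture` so it attaches to the underlying function:
#
#     @pytest.fixture(scope="module")
#     @pytest.mark.early
#     def configure_env():
#         os.environ["EXAMPLE_FLAG"] = "true"  # hypothetical config patch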


class AsyncSubprocess:
    """A context manager that wraps subprocess.Popen to capture output safely."""

    def __init__(self, args, cwd=None, env=None):
        self.args = args
        self.cwd = cwd
        self.env = env or {}
        self._proc = None
        self._stdout_file = None

    def terminate(self):
        """Terminate the process and return its stdout/stderr as a string."""
        if self._proc is not None:
            self._proc.terminate()
            self._proc.wait()
            self._proc = None

        # Read the stdout file and close it
        stdout = None
        if self._stdout_file is not None:
            self._stdout_file.seek(0)
            stdout = self._stdout_file.read()
            self._stdout_file.close()
            self._stdout_file = None

        return stdout

    def __enter__(self):
        self.start()
        return self

    def start(self):
        # Start the process and capture its stdout/stderr output to a temp
        # file. We do this instead of using subprocess.PIPE (which causes the
        # Popen object to capture the output to its own internal buffer),
        # because large amounts of output can cause it to deadlock.
        self._stdout_file = TemporaryFile("w+")
        print(f"Running: {shlex.join(self.args)}")
        self._proc = subprocess.Popen(
            self.args,
            cwd=self.cwd,
            stdout=self._stdout_file,
            stderr=subprocess.STDOUT,
            text=True,
            env={**os.environ.copy(), **self.env},
        )

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._proc is not None:
            self._proc.terminate()
            self._proc = None
        if self._stdout_file is not None:
            self._stdout_file.close()
            self._stdout_file = None
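
# A usage sketch (hypothetical command, not part of the original file):
#
#     with AsyncSubprocess([sys.executable, "--version"], cwd=".") as proc:
#         time.sleep(1)  # give the process a moment to produce output
#         output = proc.terminate()  # returns the captured stdout/stderr
#         print(output)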


def resolve_test_to_script(test_module: ModuleType) -> str:
    """Resolve the test module to the corresponding test script filename."""
    assert test_module.__file__ is not None
    return test_module.__file__.replace("_test.py", ".py")


def hash_to_range(
    text: str,
    min: int = 10000,
    max: int = 65535,
) -> int:
    sha256_hash = hashlib.sha256(text.encode("utf-8")).hexdigest()
    return min + (int(sha256_hash, 16) % (max - min + 1))
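
# For example (a sketch, not part of the original file): the hash is
# deterministic, so the same xdist worker ID always maps to the same port
# in [min, max]:
#
#     assert hash_to_range("gw0") == hash_to_range("gw0")
#     assert 10000 <= hash_to_range("gw0") <= 65535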


def is_port_available(port: int, host: str) -> bool:
    """Check if a port is available on the given host."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        return sock.connect_ex((host, port)) != 0


def find_available_port(
    min_port: int = 10000,
    max_port: int = 65535,
    max_tries: int = 50,
    host: str = "localhost",
) -> int:
    """Find an available port on the given host."""
    for _ in range(max_tries):
        selected_port = randint(min_port, max_port)
        if is_port_available(selected_port, host):
            return selected_port
    raise RuntimeError("Unable to find an available port.")


def is_app_server_running(port: int, host: str = "localhost") -> bool:
    """Check if the app server is running."""
    try:
        return (
            requests.get(f"http://{host}:{port}/_stcore/health", timeout=1).text == "ok"
        )
    except Exception:
        return False


def wait_for_app_server_to_start(port: int, timeout: int = 5) -> bool:
    """Wait for the app server to start.

    Parameters
    ----------
    port : int
        The port on which the app server is running.

    timeout : int
        The number of minutes to wait for the app server to start.

    Returns
    -------
    bool
        True if the app server is started, False otherwise.
    """

    print(f"Waiting for app to start... {port}")
    start_time = time.time()
    while not is_app_server_running(port):
        time.sleep(3)
        if time.time() - start_time > 60 * timeout:
            return False
    return True


@pytest.fixture(scope="module")
def app_port(worker_id: str) -> int:
    """Fixture that returns an available port on localhost."""
    if worker_id and worker_id != "master":
        # This is run with xdist; we try to get a port by hashing the worker ID
        port = hash_to_range(worker_id)
        if is_port_available(port, "localhost"):
            return port
    # Find a random available port:
    return find_available_port()


@pytest.fixture(scope="module", autouse=True)
def app_server(
    app_port: int, request: FixtureRequest
) -> Generator[AsyncSubprocess, None, None]:
    """Fixture that starts and stops the Streamlit app server."""
    streamlit_proc = AsyncSubprocess(
        [
            "streamlit",
            "run",
            resolve_test_to_script(request.module),
            "--server.headless",
            "true",
            "--global.developmentMode",
            "false",
            "--server.port",
            str(app_port),
            "--browser.gatherUsageStats",
            "false",
            "--server.fileWatcherType",
            "none",
        ],
        cwd=".",
    )
    streamlit_proc.start()
    if not wait_for_app_server_to_start(app_port):
        streamlit_stdout = streamlit_proc.terminate()
        print(streamlit_stdout)
        raise RuntimeError("Unable to start Streamlit app")
    yield streamlit_proc
    streamlit_stdout = streamlit_proc.terminate()
    print(streamlit_stdout)


@pytest.fixture(scope="function")
def app(page: Page, app_port: int) -> Page:
    """Fixture that opens the app."""
    page.goto(f"http://localhost:{app_port}/")
    wait_for_app_loaded(page)
    return page


@pytest.fixture(scope="function")
def app_with_query_params(
    page: Page, app_port: int, request: FixtureRequest
) -> Tuple[Page, Dict]:
    """Fixture that opens the app with additional query parameters.

    The query parameters are passed as a dictionary in the 'param' key of the request.
    """
    query_params = request.param
    query_string = parse.urlencode(query_params, doseq=True)
    url = f"http://localhost:{app_port}/?{query_string}"
    page.goto(url)
    wait_for_app_loaded(page)

    return page, query_params
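
# A usage sketch (hypothetical test, not part of the original file): the query
# parameters are supplied via indirect parametrization, so `request.param`
# inside the fixture receives the dict below.
#
#     @pytest.mark.parametrize(
#         "app_with_query_params", [{"first_key": "value"}], indirect=True
#     )
#     def test_query_params(app_with_query_params):
#         page, params = app_with_query_params
#         ...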


@pytest.fixture(scope="session")
def browser_type_launch_args(browser_type_launch_args: Dict, browser_name: str):
    """Fixture that adds the fake device and ui args to the browser type launch args."""
    # The browser context fixture in pytest-playwright is defined in session scope, and
    # depends on the browser_type_launch_args fixture. This means that we can't
    # redefine the browser_type_launch_args fixture in a more narrow scope,
    # e.g. function or module scope.
    # https://github.com/microsoft/playwright-pytest/blob/ef99541352b307411dbc15c627e50f95de30cc71/pytest_playwright/pytest_playwright.py#L128

    # We need to extend browser launch args to support fake video stream for
    # st.camera_input test.
    # https://github.com/microsoft/playwright/issues/4532#issuecomment-1491761713

    if browser_name == "chromium":
        browser_type_launch_args = {
            **browser_type_launch_args,
            "args": [
                "--use-fake-device-for-media-stream",
                "--use-fake-ui-for-media-stream",
            ],
        }

    elif browser_name == "firefox":
        browser_type_launch_args = {
            **browser_type_launch_args,
            "firefox_user_prefs": {
                "media.navigator.streams.fake": True,
                "media.navigator.permission.disabled": True,
                "permissions.default.microphone": 1,
                "permissions.default.camera": 1,
            },
        }
    return browser_type_launch_args


@pytest.fixture(scope="function", params=["light_theme", "dark_theme"])
def app_theme(request) -> str:
    """Fixture that returns the theme name."""
    return str(request.param)


@pytest.fixture(scope="function")
def themed_app(page: Page, app_port: int, app_theme: str) -> Page:
    """Fixture that opens the app with the given theme."""
    page.goto(f"http://localhost:{app_port}/?embed_options={app_theme}")
    wait_for_app_loaded(page)
    return page
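
# A usage sketch (hypothetical test, not part of the original file): tests
# that take `themed_app` run twice, once per value of the parametrized
# `app_theme` fixture (light_theme and dark_theme):
#
#     def test_renders_in_both_themes(themed_app: Page):
#         themed_app.get_by_test_id("stApp").wait_for(state="visible")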
308

309

310
class ImageCompareFunction(Protocol):
311
    def __call__(
312
        self,
313
        element: ElementHandle | Locator | Page,
314
        *,
315
        image_threshold: float = 0.002,
316
        pixel_threshold: float = 0.05,
317
        name: str | None = None,
318
        fail_fast: bool = False,
319
    ) -> None:
320
        """Compare a screenshot with screenshot from a past run.
321

322
        Parameters
323
        ----------
324
        element : ElementHandle or Locator
325
            The element to take a screenshot of.
326
        image_threshold : float, optional
327
            The allowed percentage of different pixels in the image.
328
        pixel_threshold : float, optional
329
            The allowed percentage of difference for a single pixel.
330
        name : str | None, optional
331
            The name of the screenshot without an extension. If not provided, the name
332
            of the test function will be used.
333
        fail_fast : bool, optional
334
            If True, the comparison will stop at the first pixel mismatch.
335
        """
336

337

338
@pytest.fixture(scope="session")
339
def output_folder(pytestconfig: Any) -> Path:
340
    """Fixture that returns the directory that is used for all test failures information.
341

342
    This includes:
343
    - snapshot-tests-failures: This directory contains all the snapshots that did not
344
    match with the snapshots from past runs. The folder structure is based on the folder
345
    structure used in the main snapshots folder.
346
    - snapshot-updates: This directory contains all the snapshots that got updated in
347
    the current run based on folder structure used in the main snapshots folder.
348
    """
349
    return Path(pytestconfig.getoption("--output")).resolve()
350

351

352
@pytest.fixture(scope="function")
353
def assert_snapshot(
354
    request: FixtureRequest, output_folder: Path
355
) -> Generator[ImageCompareFunction, None, None]:
356
    """Fixture that compares a screenshot with screenshot from a past run."""
357
    root_path = Path(os.getcwd()).resolve()
358
    platform = str(sys.platform)
359
    module_name = request.module.__name__.split(".")[-1]
360
    test_function_name = request.node.originalname
361

362
    snapshot_dir: Path = root_path / "__snapshots__" / platform / module_name
363

364
    module_snapshot_failures_dir: Path = (
365
        output_folder / "snapshot-tests-failures" / platform / module_name
366
    )
367
    module_snapshot_updates_dir: Path = (
368
        output_folder / "snapshot-updates" / platform / module_name
369
    )
370

371
    snapshot_file_suffix = ""
372
    # Extract the parameter ids if they exist
373
    match = re.search(r"\[(.*?)\]", request.node.name)
374
    if match:
375
        snapshot_file_suffix = f"[{match.group(1)}]"
376

377
    snapshot_default_file_name: str = test_function_name + snapshot_file_suffix
378

379
    test_failure_messages: List[str] = []
380

381
    def compare(
382
        element: ElementHandle | Locator | Page,
383
        *,
384
        image_threshold: float = 0.002,
385
        pixel_threshold: float = 0.05,
386
        name: str | None = None,
387
        fail_fast: bool = False,
388
        file_type: Literal["png", "jpg"] = "png",
389
    ) -> None:
390
        """Compare a screenshot with screenshot from a past run.
391

392
        Parameters
393
        ----------
394
        element : ElementHandle or Locator
395
            The element to take a screenshot of.
396
        image_threshold : float, optional
397
            The allowed percentage of different pixels in the image.
398
        pixel_threshold : float, optional
399
            The allowed percentage of difference for a single pixel to be considered
400
            different.
401
        name : str | None, optional
402
            The name of the screenshot without an extension. If not provided, the name
403
            of the test function will be used.
404
        fail_fast : bool, optional
405
            If True, the comparison will stop at the first pixel mismatch.
406
        file_type: "png" or "jpg"
407
            The file type of the screenshot. Defaults to "png".
408
        """
409
        nonlocal test_failure_messages
410
        nonlocal snapshot_default_file_name
411
        nonlocal module_snapshot_updates_dir
412
        nonlocal module_snapshot_failures_dir
413
        nonlocal snapshot_file_suffix
414

415
        if file_type == "jpg":
416
            file_extension = ".jpg"
417
            img_bytes = element.screenshot(
418
                type="jpeg", quality=90, animations="disabled"
419
            )
420

421
        else:
422
            file_extension = ".png"
423
            img_bytes = element.screenshot(type="png", animations="disabled")
424

425
        snapshot_file_name: str = snapshot_default_file_name
426
        if name:
427
            snapshot_file_name = name + snapshot_file_suffix
428

429
        snapshot_file_path: Path = (
430
            snapshot_dir / f"{snapshot_file_name}{file_extension}"
431
        )
432

433
        snapshot_updates_file_path: Path = (
434
            module_snapshot_updates_dir / f"{snapshot_file_name}{file_extension}"
435
        )
436

437
        snapshot_file_path.parent.mkdir(parents=True, exist_ok=True)
438

439
        test_failures_dir = module_snapshot_failures_dir / snapshot_file_name
440
        if test_failures_dir.exists():
441
            # Remove the past runs failure dir for this specific screenshot
442
            shutil.rmtree(test_failures_dir)
443

444
        if not snapshot_file_path.exists():
445
            snapshot_file_path.write_bytes(img_bytes)
446
            # Update this in updates folder:
447
            snapshot_updates_file_path.parent.mkdir(parents=True, exist_ok=True)
448
            snapshot_updates_file_path.write_bytes(img_bytes)
449
            # For missing snapshots, we don't want to directly fail in order to generate
450
            # all missing snapshots in one run.
451
            test_failure_messages.append(f"Missing snapshot for {snapshot_file_name}")
452
            return
453

454
        from pixelmatch.contrib.PIL import pixelmatch
455

456
        # Compare the new screenshot with the screenshot from past runs:
457
        img_a = Image.open(BytesIO(img_bytes))
458
        img_b = Image.open(snapshot_file_path)
459
        img_diff = Image.new("RGBA", img_a.size)
460
        try:
461
            mismatch = pixelmatch(
462
                img_a,
463
                img_b,
464
                img_diff,
465
                threshold=pixel_threshold,
466
                fail_fast=fail_fast,
467
                alpha=0,
468
            )
469
        except ValueError as ex:
470
            # ValueError is thrown when the images have different sizes
471
            # Update this in updates folder:
472
            snapshot_updates_file_path.parent.mkdir(parents=True, exist_ok=True)
473
            snapshot_updates_file_path.write_bytes(img_bytes)
474
            pytest.fail(f"Snapshot matching for {snapshot_file_name} failed: {ex}")
475
        max_diff_pixels = int(image_threshold * img_a.size[0] * img_a.size[1])
476

477
        if mismatch < max_diff_pixels:
478
            return
479

480
        # Update this in updates folder:
481
        snapshot_updates_file_path.parent.mkdir(parents=True, exist_ok=True)
482
        snapshot_updates_file_path.write_bytes(img_bytes)
483

484
        # Create new failures folder for this test:
485
        test_failures_dir.mkdir(parents=True, exist_ok=True)
486
        img_diff.save(f"{test_failures_dir}/diff_{snapshot_file_name}{file_extension}")
487
        img_a.save(f"{test_failures_dir}/actual_{snapshot_file_name}{file_extension}")
488
        img_b.save(f"{test_failures_dir}/expected_{snapshot_file_name}{file_extension}")
489

490
        pytest.fail(
491
            f"Snapshot mismatch for {snapshot_file_name} ({mismatch} pixels difference)"
492
        )
493

494
    yield compare
495

496
    if test_failure_messages:
497
        pytest.fail("Missing snapshots: \n" + "\n".join(test_failure_messages))


# Public utility functions:


def wait_for_app_run(page: Page, wait_delay: int = 100):
    """Wait for the given page to finish running."""
    page.wait_for_selector(
        "[data-testid='stStatusWidget']", timeout=20000, state="detached"
    )

    if wait_delay > 0:
        # Give the app a little more time to render everything
        page.wait_for_timeout(wait_delay)


def wait_for_app_loaded(page: Page, embedded: bool = False):
    """Wait for the app to fully load."""
    # Wait for the app view container to appear:
    page.wait_for_selector(
        "[data-testid='stAppViewContainer']", timeout=30000, state="attached"
    )

    # Wait for the main menu to appear:
    if not embedded:
        page.wait_for_selector(
            "[data-testid='stMainMenu']", timeout=20000, state="attached"
        )

    wait_for_app_run(page)


def rerun_app(page: Page):
    """Triggers an app rerun and waits for the run to be finished."""
    # Click somewhere to clear the focus from elements:
    page.get_by_test_id("stApp").click(position={"x": 0, "y": 0})
    # Press "r" to rerun the app:
    page.keyboard.press("r")
    wait_for_app_run(page)


def wait_until(page: Page, fn: callable, timeout: int = 5000, interval: int = 100):
    """Run a test function in a loop until it evaluates to True
    or times out.

    For example:
    >>> wait_until(page, lambda: x.values() == ['x'])

    Parameters
    ----------
    page : playwright.sync_api.Page
        Playwright page
    fn : callable
        Callback
    timeout : int, optional
        Total timeout in milliseconds, by default 5000
    interval : int, optional
        Waiting interval, by default 100

    Adapted from panel.
    """
    # Hide this function traceback from the pytest output if the test fails
    __tracebackhide__ = True

    start = time.time()

    def timed_out():
        elapsed = time.time() - start
        elapsed_ms = elapsed * 1000
        return elapsed_ms > timeout

    timeout_msg = f"wait_until timed out in {timeout} milliseconds"

    while True:
        try:
            result = fn()
        except AssertionError as e:
            if timed_out():
                raise TimeoutError(timeout_msg) from e
        else:
            if result not in (None, True, False):
                raise ValueError(
                    "`wait_until` callback must return None, True or "
                    f"False, returned {result!r}"
                )
            # Stop if result is True or None
            # None is returned when the function has an assert
            if result is None or result:
                return
            if timed_out():
                raise TimeoutError(timeout_msg)
        page.wait_for_timeout(interval)