streamlit

Форк
0
/
exception_capturing_thread.py 
116 строк · 4.0 Кб
1
# Copyright (c) Streamlit Inc. (2018-2022) Snowflake Inc. (2022-2024)
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
import threading
16
from typing import Any, Callable, Optional
17

18
from streamlit.runtime.forward_msg_queue import ForwardMsgQueue
19
from streamlit.runtime.memory_uploaded_file_manager import MemoryUploadedFileManager
20
from streamlit.runtime.scriptrunner import ScriptRunContext, add_script_run_ctx
21
from streamlit.runtime.state import SafeSessionState, SessionState
22

23

24
def call_on_threads(
25
    func: Callable[[int], Any],
26
    num_threads: int,
27
    timeout: Optional[float] = 0.25,
28
    attach_script_run_ctx: bool = True,
29
) -> None:
30
    """Call a function on multiple threads simultaneously and assert that no
31
    thread raises an unhandled exception.
32

33
    The function must take single `int` param, which will be the index of
34
    the thread it's being called on.
35

36
    Note that a passing multi-threaded test does not generally guarantee that
37
    the tested code is thread safe! Because threading issues tend to be
38
    non-deterministic, a flaky test that fails only occasionally is a good
39
    indicator of an underlying issue.
40

41
    Parameters
42
    ----------
43
    func
44
        The function to call on each thread.
45
    num_threads
46
        The number of threads to create.
47
    timeout
48
        If the thread runs for longer than this amount of time, raise an
49
        Exception.
50
    attach_script_run_ctx
51
        If True, attach a mock ScriptRunContext to each thread before
52
        starting.
53
    """
54
    threads = [
55
        ExceptionCapturingThread(name=f"Thread {ii}", target=func, args=[ii])
56
        for ii in range(num_threads)
57
    ]
58

59
    if attach_script_run_ctx:
60
        for ii in range(num_threads):
61
            ctx = ScriptRunContext(
62
                session_id=f"Thread{ii}_Session",
63
                _enqueue=ForwardMsgQueue().enqueue,
64
                query_string="",
65
                session_state=SafeSessionState(SessionState(), lambda: None),
66
                uploaded_file_mgr=MemoryUploadedFileManager("/mock/upload"),
67
                main_script_path="",
68
                page_script_hash="",
69
                user_info={"email": "test@test.com"},
70
            )
71
            thread = threads[ii]
72
            add_script_run_ctx(thread, ctx)
73

74
    for thread in threads:
75
        thread.start()
76

77
    for thread in threads:
78
        thread.join(timeout=timeout)
79
        thread.assert_no_unhandled_exception()
80

81

82
class ExceptionCapturingThread(threading.Thread):
83
    """Thread subclass that captures unhandled exceptions."""
84

85
    def __init__(
86
        self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None
87
    ):
88
        super().__init__(
89
            group=group,
90
            target=target,
91
            name=name,
92
            args=args,
93
            kwargs=kwargs,
94
            daemon=daemon,
95
        )
96
        self._unhandled_exception: Optional[BaseException] = None
97

98
    @property
99
    def unhandled_exception(self) -> Optional[BaseException]:
100
        """The unhandled exception raised by the thread's target, if it raised one."""
101
        return self._unhandled_exception
102

103
    def assert_no_unhandled_exception(self) -> None:
104
        """If the thread target raised an unhandled exception, re-raise it.
105
        Otherwise no-op.
106
        """
107
        if self._unhandled_exception is not None:
108
            raise RuntimeError(
109
                f"Unhandled exception in thread '{self.name}'"
110
            ) from self._unhandled_exception
111

112
    def run(self) -> None:
113
        try:
114
            super().run()
115
        except Exception as e:
116
            self._unhandled_exception = e
117

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.