CelestialSurveyor

Форк
0
108 строк · 4.5 Кб
1
import numpy as np
2
import traceback
3

4
from auto_stretch.stretch import Stretch
5
from functools import partial
6
from logging.handlers import QueueHandler
7
from multiprocessing import Queue, cpu_count, Pool, Manager
8
from threading import Event
9
from typing import Optional
10

11
from backend.progress_bar import AbstractProgressBar
12
from logger.logger import get_logger
13
from backend.data_classes import SharedMemoryParams
14
from backend.consuming_functions.measure_execution_time import measure_execution_time
15

16

17
logger = get_logger()
18

19

20
@measure_execution_time
21
def stretch_images(shm_params: SharedMemoryParams, progress_bar: Optional[AbstractProgressBar] = None,
22
                   stop_event: Optional[Event] = None) -> None:
23
    """
24
    Stretch images stored in shared memory in parallel using multiprocessing.
25

26
    Args:
27
        shm_params (SharedMemoryParams): Shared memory parameters for the images.
28
        progress_bar (Optional[AbstractProgressBar]): Progress bar to track the stretching progress.
29
        stop_event (Optional[Event]): Event to stop the stretching process.
30

31
    Returns:
32
        None
33
    """
34
    available_cpus = cpu_count() - 1
35
    frames_num = shm_params.shm_shape[0]
36
    used_cpus = min(available_cpus, frames_num)
37
    logger.log.debug(f"Number of CPUs to be used for loading images: {used_cpus}")
38
    with Pool(processes=used_cpus) as pool:
39
        m = Manager()
40
        progress_queue = m.Queue()
41
        stop_queue = m.Queue(maxsize=1)
42
        log_queue = m.Queue()
43
        logger.start_process_listener(log_queue)
44
        logger.log.debug(f"Starting stretching images with {used_cpus} workers")
45
        results = pool.map_async(
46
            partial(stretch_worker, shm_params=shm_params, progress_queue=progress_queue, stop_queue=stop_queue,
47
                    log_queue=log_queue),
48
            np.array_split(np.arange(frames_num), used_cpus))
49
        if progress_bar is not None:
50
            progress_bar.set_total(frames_num)
51
            for _ in range(frames_num):
52
                if stop_event is not None and stop_event.is_set():
53
                    stop_queue.put(True)
54
                    logger.log.debug("Stop event triggered")
55
                    break
56
                got_result = False
57
                while not got_result:
58
                    if not progress_queue.empty():
59
                        progress_queue.get()
60
                        got_result = True
61
                        logger.log.debug("Got a result from the progress queue")
62
                    if not stop_queue.empty():
63
                        logger.log.debug("Detected error from workers. Stopping.")
64
                        break
65
                if not stop_queue.empty():
66
                    break
67
                progress_bar.update()
68
            progress_bar.complete()
69
        results.get()
70
        pool.close()
71
        pool.join()
72
        logger.log.debug(f"Plate solve pool stopped.")
73
        logger.stop_process_listener()
74

75

76
def stretch_worker(img_indexes: list[int], shm_params: SharedMemoryParams, progress_queue: Queue,
77
                   stop_queue: Optional[Queue] = None, log_queue: Optional[Queue] = None) -> None:
78
    """
79
    Worker function to stretch images with the provided indexes in shared memory.
80

81
    Args:
82
        img_indexes (list[int]): List of image indexes to stretch.
83
        shm_params (SharedMemoryParams): Shared memory parameters for images.
84
        progress_queue (Queue): Queue for reporting progress.
85
        stop_queue (Optional[Queue], optional): Queue for stopping the worker process. Defaults to None.
86
        log_queue (Optional[Queue], optional): Queue for logging messages. Defaults to None.
87

88
    Returns:
89
        None
90
    """
91
    handler = QueueHandler(log_queue)
92
    logger.log.addHandler(handler)
93
    logger.log.debug(f"Load worker started with {len(img_indexes)} images")
94
    logger.log.debug(f"Shared memory parameters: {shm_params}")
95
    try:
96
        imgs = np.memmap(shm_params.shm_name, dtype=shm_params.shm_dtype, mode='r+', shape=shm_params.shm_shape)
97
        for img_idx in img_indexes:
98
            if stop_queue is not None and not stop_queue.empty():
99
                logger.log.debug("Plate solve worker detected stop event. Stopping.")
100
                break
101
            img = imgs[img_idx, shm_params.y_slice, shm_params.x_slice]
102
            imgs[img_idx, shm_params.y_slice, shm_params.x_slice] = Stretch().stretch(img)
103
            progress_queue.put(img_idx)
104
            imgs.flush()
105
    except Exception:
106
        logger.log.error(f"Stretch worker failed due to the following error:\n{traceback.format_exc()}")
107
        stop_queue.put("ERROR")
108
        raise
109

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.