CelestialSurveyor
229 строк · 8.4 Кб
1import astropy.io.fits2import cv23import numpy as np4import traceback5
6from xisf import XISF7from multiprocessing import Queue, cpu_count, Pool, Manager8from typing import Optional9from functools import partial10
11from backend.progress_bar import AbstractProgressBar12from logger.logger import get_logger13from backend.data_classes import SharedMemoryParams14from threading import Event15from backend.consuming_functions.measure_execution_time import measure_execution_time16from logging.handlers import QueueHandler17
18
19PIXEL_TYPE = np.float3220logger = get_logger()21
22
23def debayer(img_data: np.ndarray) -> np.ndarray:24"""25Perform debayering on the input image data.
26
27Args:
28img_data: Image data to be debayered.
29
30Returns:
31np.ndarray: Debayered image data.
32"""
33res = np.array(cv2.cvtColor(img_data, cv2.COLOR_BayerBG2GRAY))34res.reshape(img_data.shape[0], img_data.shape[1], 1)35return res36
37
38def to_gray(img_data: np.ndarray) -> np.ndarray:39"""40Convert the input image data to grayscale.
41
42Args:
43img_data (np.ndarray): The input image data to be converted to grayscale.
44
45Returns:
46np.ndarray: The grayscale image data.
47"""
48return np.array(cv2.cvtColor(img_data, cv2.COLOR_BGR2GRAY))49
50
51def load_image_fits(filename: str, to_debayer: bool = False) -> np.ndarray:52"""53Load image data from a FIT(S) file and optionally debayer it.
54
55Args:
56filename (str): The path to the FIT(S) file.
57to_debayer (bool): Whether to perform debayering.
58
59Returns:
60np.ndarray: numpy array containing the image data converted to grayscale.
61"""
62with astropy.io.fits.open(filename) as hdul:63img_data = np.array(hdul[0].data)64if len(img_data.shape) == 2:65img_data = img_data.reshape(img_data.shape[0], img_data.shape[1], 1)66if img_data.shape[0] in [1, 3]:67img_data = np.swapaxes(img_data, 0, 2)68if to_debayer and img_data.shape[2] == 1:69img_data = np.array(debayer(img_data))70img_data = img_data.reshape(img_data.shape[0], img_data.shape[1], 1)71img_data = np.array(img_data)72if img_data.shape[2] == 3:73img_data = np.array(to_gray(img_data))74# Normalize75img_data = img_data.astype('float32')76img_data /= 256 * 256 - 177img_data = img_data.astype(PIXEL_TYPE)78img_data.shape = *img_data.shape[:2],79return img_data80
81
82def load_image_xisf(filename: str, to_debayer: bool = False) -> np.ndarray:83"""84Load image data from a XISF file and optionally debayer it.
85
86Args:
87filename (str): The path to the FITS file.
88to_debayer (bool): Whether to perform debayering.
89
90Returns:
91np.ndarray: numpy array containing the image data converted to grayscale.
92"""
93_ = to_debayer94xisf = XISF(filename)95img_data = xisf.read_image(0)96img_data = np.array(img_data)97if len(img_data.shape) == 2:98img_data.shape = *img_data.shape, 199if img_data.shape[0] in [1, 3]:100img_data = np.swapaxes(img_data, 0, 2)101if img_data.shape[2] == 3:102img_data = np.array(to_gray(img_data))103img_data = img_data.astype(PIXEL_TYPE)104if len(img_data.shape) == 2:105img_data.shape = *img_data.shape[:2],106return img_data107
108
109def load_image(file_path: str, to_debayer: bool = False) -> np.ndarray:110"""111Load an image file and optionally perform debayering.
112
113Args:
114file_path (str): The path to the image file.
115to_debayer (bool): Whether to perform debayering.
116
117Returns:
118np.ndarray: numpy array containing the image data converted to grayscale.
119"""
120if file_path.lower().endswith(".fits") or file_path.lower().endswith(".fit"):121return load_image_fits(file_path, to_debayer)122elif file_path.lower().endswith(".xisf"):123return load_image_xisf(file_path, to_debayer)124else:125raise ValueError(f"Unsupported file format: {file_path}")126
127
128def load_worker(indexes: list[int], file_list: list[str], shm_params: SharedMemoryParams, progress_queue: Queue,129to_debayer: bool = False, stop_queue: Optional[Queue] = None, log_queue: Optional[Queue] = None130) -> None:131"""132Load images into shared memory based on the provided indexes and file list.
133
134Args:
135indexes (list[int]): List of indexes specifying which images to load.
136file_list (list[str]): List of file paths for the images.
137shm_params (SharedMemoryParams): Shared memory parameters for loading images.
138progress_queue (Queue): Queue for reporting progress.
139to_debayer (bool, optional): Whether to perform debayering. Defaults to False.
140stop_queue (Queue, optional): Queue for stopping the loading process. Defaults to None.
141log_queue (Queue, optional): Queue for logging messages. Defaults to None.
142
143Returns:
144None
145"""
146handler = QueueHandler(log_queue)147logger.log.addHandler(handler)148logger.log.debug(f"Load worker started with {len(indexes)} images")149logger.log.debug(f"Shared memory parameters: {shm_params}")150try:151imgs = np.memmap(shm_params.shm_name, dtype=PIXEL_TYPE, mode='r+', shape=shm_params.shm_shape)152for img_idx in indexes:153if stop_queue and not stop_queue.empty():154logger.log.debug("Load images worker detected stop event. Stopping.")155break156img_data = load_image(file_list[img_idx], to_debayer)157imgs[img_idx] = img_data158progress_queue.put(img_idx)159
160except Exception:161logger.log.error(f"Load worker failed due to the following error:\n{traceback.format_exc()}")162stop_queue.put("ERROR")163raise164
165
166@measure_execution_time
167def load_images(file_list: list[str], shm_params: SharedMemoryParams, to_debayer: bool = False,168progress_bar: Optional[AbstractProgressBar] = None, stop_event: Optional[Event] = None):169"""170Load images from the provided file list into shared memory using multiple workers.
171
172Args:
173file_list (list[str]): List of file paths for the images to load.
174shm_params (SharedMemoryParams): Shared memory parameters for loading images.
175to_debayer (bool, optional): Whether to perform debayering. Defaults to False.
176progress_bar (Optional[AbstractProgressBar], optional): Progress bar for tracking the loading progress.
177Defaults to None.
178stop_event (Optional[Event], optional): Event to signal stopping the loading process. Defaults to None.
179
180Returns:
181None
182"""
183available_cpus = cpu_count() - 1184used_cpus = min(available_cpus, len(file_list))185logger.log.debug(f"Number of CPUs to be used for loading images: {used_cpus}")186with (Pool(processes=used_cpus) as pool):187m = Manager()188progress_queue = m.Queue()189log_queue = m.Queue()190logger.start_process_listener(log_queue)191stop_queue = m.Queue()192logger.log.debug(f"Starting loading images with {used_cpus} workers")193results = pool.map_async(194partial(195load_worker,196file_list=file_list,197shm_params=shm_params,198to_debayer=to_debayer,199progress_queue=progress_queue,200stop_queue=stop_queue,201log_queue=log_queue202),203np.array_split(np.arange(len(file_list)), used_cpus))204if progress_bar is not None:205progress_bar.set_total(len(file_list))206for _ in range(len(file_list)):207if stop_event is not None and stop_event.is_set():208stop_queue.put(True)209logger.log.debug("Stop event triggered")210break211
212got_result = False213while not got_result:214if not progress_queue.empty():215progress_queue.get()216logger.log.debug("Got a result from the progress queue")217got_result = True218if not stop_queue.empty():219logger.log.debug("Detected error from workers. Stopping.")220break221if not stop_queue.empty():222break223progress_bar.update()224progress_bar.complete()225results.get()226pool.close()227pool.join()228logger.log.debug(f"Load images pool stopped.")229logger.stop_process_listener()230