CelestialSurveyor
184 строки · 7.9 Кб
1import numpy as np2import traceback3import twirl4
5from astropy import units as u6from astropy.wcs import WCS7from astropy.coordinates import SkyCoord8from astroquery.gaia import Gaia9from functools import partial10from logging.handlers import QueueHandler11from multiprocessing import Queue, cpu_count, Pool, Manager12from threading import Event13from typing import Optional14
15from backend.progress_bar import AbstractProgressBar16from backend.data_classes import Header17from logger.logger import get_logger18from backend.data_classes import SharedMemoryParams19from backend.consuming_functions.measure_execution_time import measure_execution_time20
21
22logger = get_logger()23
24
25def plate_solve_image(image: np.ndarray, header: Header,26sky_coord: Optional[np.ndarray] = None) -> tuple[WCS, np.ndarray]:27"""28Plate solves the image basing on the provided GAIA data. If this data is not available - it will be requested.
29
30Args:
31image (np.ndarray): The image to be plate solved.
32header (Header): The header information related to the image.
33sky_coord (Optional[np.ndarray]): Source information for the current FOV got from GAIA. Defaults to None.
34
35Returns:
36tuple[WCS, np.ndarray]: The plate solved WCS and sky coordinates.
37"""
38header_data = header.solve_data39pixel = header_data.pixel_scale * u.arcsec # known pixel scale40img = np.copy(image)41img = np.reshape(img, (img.shape[0], img.shape[1]))42shape = img.shape43fov = np.min(shape[:2]) * pixel.to(u.deg)44if sky_coord is None:45# sky_coord = twirl.gaia_radecs(header_data.sky_coord, fov)[0:200]46sky_coord = get_sources_from_gaia(header_data.sky_coord, fov)[0:200]47sky_coord = twirl.geometry.sparsify(sky_coord, 0.1)48sky_coord = sky_coord[:25]49top_left_corner = (slice(None, img.shape[0] // 2), slice(None, img.shape[1] // 2), (0, 0))50bottom_left_corner = (slice(img.shape[0] // 2, None), slice(None, img.shape[1] // 2), (img.shape[0]//2, 0))51top_right_corner = (slice(None, img.shape[0] // 2), slice(img.shape[1] // 2, None), (0, img.shape[1]//2))52bottom_right_corner = (slice(img.shape[0] // 2, None), slice(img.shape[1] // 2, None),53(img.shape[0]//2, img.shape[1]//2))54corners = [top_left_corner, bottom_left_corner, top_right_corner, bottom_right_corner]55all_corner_peaks = []56for y_slice, x_slice, (y_offset, x_offset) in corners:57peak_cos = twirl.find_peaks(img[y_slice, x_slice], threshold=1)[0:200]58corner_peaks = []59for x, y in peak_cos:60y += y_offset61x += x_offset62dist_from_center_x = x - shape[1] // 263dist_from_center_y = y - shape[0] // 264if np.sqrt(dist_from_center_x**2 + dist_from_center_y**2) < min(shape) // 2:65corner_peaks.append([x, y])66all_corner_peaks.extend(corner_peaks[:8])67all_corner_peaks = np.array(all_corner_peaks)68wcs = twirl.compute_wcs(all_corner_peaks, sky_coord, asterism=4)69return wcs, sky_coord70
71
72@measure_execution_time
73def plate_solve(shm_params: SharedMemoryParams, headers: list[Header],74progress_bar: Optional[AbstractProgressBar] = None,75stop_event: Optional[Event] = None) -> list[WCS]:76"""77Plate solving images stored in shared memory.
78
79Args:
80shm_params (SharedMemoryParams): Shared memory parameters for accessing image data.
81headers (list[Header]): List of image headers.
82progress_bar (Optional[AbstractProgressBar]): Progress bar for tracking plate solving progress.
83stop_event (Optional[Event]): Event for stopping plate solving process.
84
85Returns:
86list[WCS]: List of plate solved WCS coordinates.
87"""
88logger.log.info("Plate solving...")89imgs = np.memmap(shm_params.shm_name, dtype=shm_params.shm_dtype, mode='r+', shape=shm_params.shm_shape)90# get reference stars from GAIA for the first image's FOV91_, reference_stars = plate_solve_image(imgs[0], headers[0])92available_cpus = cpu_count() - 193frames_num = shm_params.shm_shape[0]94used_cpus = min(available_cpus, frames_num)95logger.log.debug(f"Number of CPUs to be used for loading images: {used_cpus}")96m = Manager()97progress_queue = m.Queue()98log_queue = m.Queue()99logger.start_process_listener(log_queue)100stop_queue = m.Queue(maxsize=1)101with Pool(processes=used_cpus) as pool:102logger.log.debug(f"Starting loading images with {used_cpus} workers")103results = pool.map_async(104partial(plate_solve_worker, shm_params=shm_params, progress_queue=progress_queue,105reference_stars=reference_stars, header=headers[0], stop_queue=stop_queue, log_queue=log_queue),106np.array_split(np.arange(frames_num), used_cpus))107if progress_bar is not None:108progress_bar.set_total(frames_num)109for _ in range(frames_num):110if stop_event is not None and stop_event.is_set():111stop_queue.put(True)112logger.log.debug("Stop event triggered")113break114got_result = False115while not got_result:116if not progress_queue.empty():117progress_queue.get()118logger.log.debug("Got a result from the progress queue")119got_result = True120if not stop_queue.empty():121logger.log.debug("Detected error from workers. Stopping.")122break123if not stop_queue.empty():124break125progress_bar.update()126progress_bar.complete()127res = results.get()128pool.close()129pool.join()130logger.log.debug(f"Plate solve pool stopped.")131logger.stop_process_listener()132new_res = []133for item in res:134new_res.extend(item)135new_res.sort(key=lambda x: x[0])136return [item[1] for item in new_res]137
138
139def plate_solve_worker(img_indexes: list[int], header: Header, shm_params: SharedMemoryParams,140reference_stars: np.ndarray, progress_queue: Queue, stop_queue: Optional[Queue] = None,141log_queue: Optional[Queue] = None) -> list[tuple[int, WCS]]:142handler = QueueHandler(log_queue)143logger.log.addHandler(handler)144logger.log.debug(f"Load worker started with {len(img_indexes)} images")145logger.log.debug(f"Shared memory parameters: {shm_params}")146try:147imgs = np.memmap(shm_params.shm_name, dtype=shm_params.shm_dtype, mode='r', shape=shm_params.shm_shape)148res = []149for img_idx in img_indexes:150if stop_queue is not None and not stop_queue.empty():151logger.log.debug("Plate solve worker detected stop event. Stopping.")152break153img = imgs[img_idx]154wcs, _ = plate_solve_image(img, header, reference_stars)155progress_queue.put(img_idx)156res.append((img_idx, wcs))157except Exception:158logger.log.error(f"Plate solve worker failed due to the following error:\n{traceback.format_exc()}")159stop_queue.put("ERROR")160raise161return res162
163
164@measure_execution_time
165def get_sources_from_gaia(center: SkyCoord, fov, limit: int = 10000, mag_limit: float = 10) -> np.ndarray:166ra = center.ra.deg167dec = center.dec.deg168if fov.ndim == 1:169ra_fov, dec_fov = fov.to(u.deg).value170else:171ra_fov = dec_fov = fov.to(u.deg).value172radius = np.min([ra_fov, dec_fov]) / 2173
174fields = "ra, dec"175job = Gaia.launch_job(176f"select top {limit} {fields} from gaiadr2.gaia_source where "177"1=CONTAINS("178f"POINT('ICRS', {ra}, {dec}), "179f"CIRCLE('ICRS',ra, dec, {radius})) "180f"and phot_g_mean_mag < {mag_limit} "181"order by phot_g_mean_mag"182)183table = job.get_results()184return np.array([table["ra"].value.data, table["dec"].value.data]).T185