CelestialSurveyor
248 строк · 9.8 Кб
import traceback
from datetime import datetime
from decimal import Decimal
from functools import partial
from logging.handlers import QueueHandler
from multiprocessing import Queue, cpu_count, Pool, Manager
from queue import Empty, Full
from threading import Event
from typing import Optional

import astropy.io.fits
import numpy as np
from astropy.coordinates import SkyCoord
from xisf import XISF

from backend.consuming_functions.measure_execution_time import measure_execution_time
from backend.data_classes import SolveData, SiteLocation, Header
from backend.progress_bar import AbstractProgressBar
from logger.logger import get_logger
20
# Module-level logger object shared by the functions below; they log through
# ``logger.log`` and use its start/stop process-listener helpers.
logger = get_logger()
23
def __get_datetime_from_str(date_str: str) -> datetime:
    """
    Parse an ISO-like timestamp string into a datetime object.

    The layout with fractional seconds is tried first; a string that does not
    match it is parsed again using the layout without the fractional part.

    Args:
        date_str (str): Timestamp formatted as "%Y-%m-%dT%H:%M:%S.%f" or "%Y-%m-%dT%H:%M:%S".

    Returns:
        datetime: The parsed timestamp.

    Raises:
        ValueError: If the string matches neither layout.
    """
    with_fraction = "%Y-%m-%dT%H:%M:%S.%f"
    without_fraction = "%Y-%m-%dT%H:%M:%S"
    try:
        return datetime.strptime(date_str, with_fraction)
    except ValueError:
        # No usable fractional seconds: fall back to whole-second precision.
        return datetime.strptime(date_str, without_fraction)
40
def load_headers_worker(filenames: list[str], progress_queue: Optional[Queue] = None,
                        stop_queue: Optional[Queue] = None, log_queue: Optional[Queue] = None) -> list[Header]:
    """
    Worker function to load header information from XISF and FIT(S) files.

    Files are processed in order. Before each file the stop queue is polled; a
    non-empty stop queue makes the worker return the headers collected so far.

    Args:
        filenames (List[str]): List of filenames to load headers from within this worker.
        progress_queue (Optional[Queue], optional): Queue that receives one item per loaded file.
            Defaults to None.
        stop_queue (Optional[Queue], optional): Queue used as a stop flag. Any item in it stops the
            worker; on failure the worker puts "ERROR" into it. Defaults to None.
        log_queue (Optional[Queue], optional): Queue that log records are forwarded to.
            Defaults to None (no forwarding handler is installed).

    Returns:
        List[Header]: A list of Header objects containing the loaded header information from the files.

    Raises:
        ValueError: If a filename has an extension other than .xisf, .fit or .fits.
        Exception: Any error raised while loading a header is logged and re-raised.
    """
    # Only forward log records when a queue was actually provided; a
    # QueueHandler built around None fails on every emitted record.
    handler = None
    if log_queue is not None:
        handler = QueueHandler(log_queue)
        logger.log.addHandler(handler)
    logger.log.debug(f"Load worker started with {len(filenames)} filenames")

    try:
        headers = []
        for filename in filenames:
            if stop_queue is not None and not stop_queue.empty():
                logger.log.debug("Load headers worker detected stop event. Stopping.")
                break
            lowered = filename.lower()
            if lowered.endswith(".xisf"):
                headers.append(load_header_xisf(filename))
            elif lowered.endswith((".fit", ".fits")):
                headers.append(load_header_fits(filename))
            else:
                raise ValueError("File type not supported. Supported file types: .xisf, .fit, .fits")
            if progress_queue is not None:
                progress_queue.put(True)
        return headers
    except Exception:
        logger.log.error(f"Load headers worker failed due to the following error:\n{traceback.format_exc()}")
        if stop_queue is not None:
            try:
                # Non-blocking: a bounded stop queue may already hold a stop
                # signal, and a blocking put would hang the worker forever.
                stop_queue.put_nowait("ERROR")
            except Full:
                pass
        raise
    finally:
        # Detach the handler so a worker process that receives another chunk
        # does not emit every record twice.
        if handler is not None:
            logger.log.removeHandler(handler)
79
@measure_execution_time
def load_headers(filenames: list[str], progress_bar: Optional[AbstractProgressBar] = None,
                 stop_event: Optional[Event] = None) -> list[Header]:
    """
    Load header information from XISF and FIT(S) files using multiple workers.

    The filenames are split into contiguous chunks, one per worker process (at
    most 4). While the workers run, this function consumes their progress
    notifications, updates the progress bar (if any) and forwards a stop request
    to the workers when ``stop_event`` is set.

    Args:
        filenames (List[str]): List of filenames to load headers from.
        progress_bar (Optional[AbstractProgressBar], optional): Progress bar for tracking loading progress.
            Defaults to None.
        stop_event (Optional[Event], optional): Event to stop the loading process. Defaults to None.

    Returns:
        List[Header]: A list of Header objects containing the loaded header information from the files.
        An empty list is returned when ``filenames`` is empty. If loading was stopped, only the
        headers loaded before the stop are returned.

    Raises:
        Exception: Any exception raised inside a worker is re-raised here by ``results.get()``.
    """
    total = len(filenames)
    if total == 0:
        # A pool of zero processes (and a zero-way split) would raise.
        logger.log.debug("No files provided. Nothing to load.")
        return []

    poll_timeout = 0.1  # seconds to wait for progress before re-checking stop conditions
    available_cpu = min(4, cpu_count(), total)
    logger.log.debug(f"Number of CPUs to be used for loading headers: {available_cpu}")
    with Pool(available_cpu) as pool:
        m = Manager()
        progress_queue = m.Queue()
        log_queue = m.Queue()
        stop_queue = m.Queue(maxsize=1)
        logger.start_process_listener(log_queue)
        try:
            logger.log.debug(f"Starting loading headers with {available_cpu} workers")
            results = pool.map_async(
                partial(load_headers_worker, progress_queue=progress_queue, stop_queue=stop_queue,
                        log_queue=log_queue),
                np.array_split(filenames, available_cpu))

            if progress_bar is not None:
                progress_bar.set_total(total)
            loaded = 0
            while loaded < total:
                if stop_event is not None and stop_event.is_set():
                    try:
                        # Non-blocking: a failed worker may already have filled the queue.
                        stop_queue.put_nowait(True)
                    except Full:
                        pass
                    logger.log.debug("Stop event triggered")
                    break
                if not stop_queue.empty():
                    logger.log.debug("Detected error from workers. Stopping.")
                    break
                try:
                    # Block briefly instead of spinning on .empty().
                    progress_queue.get(timeout=poll_timeout)
                except Empty:
                    continue
                logger.log.debug("Got a result from the progress queue")
                loaded += 1
                if progress_bar is not None:
                    progress_bar.update()
            if progress_bar is not None:
                progress_bar.complete()

            # Re-raises any worker exception; otherwise one list of headers per chunk.
            res = results.get()
            headers = [header for chunk in res for header in chunk]
            pool.close()
            pool.join()
            logger.log.debug("Load headers pool stopped.")
        finally:
            # Always stop the listener, including when a worker failed.
            logger.stop_process_listener()
    return headers
138
def load_header_xisf(filename: str) -> Header:
    """
    Load header information from an XISF file.

    The FITS keywords of the first image in the file are used. DATE-OBS and
    EXPTIME are required. RA, DEC and the pixel scale are optional: when any of
    them is unavailable the returned Header carries no plate solve data. The
    pixel scale is read from SCALE or, if absent, computed from XPIXSZ and
    FOCALLEN. SITELAT and SITELONG default to 0.

    Args:
        filename (str): The path to the XISF file.

    Returns:
        Header: A Header object containing the extracted information from the XISF file.

    Raises:
        KeyError: If DATE-OBS or EXPTIME is missing from the FITS keywords.
        ValueError: If DATE-OBS does not match a supported timestamp layout.
    """
    xisf = XISF(filename)
    # Metadata of the first image in the XISF file
    img_meta = xisf.get_images_metadata()[0]
    keywords = img_meta["FITSKeywords"]

    def keyword_value(key):
        """Return the value of the first entry for ``key``, or None when the keyword is absent."""
        entries = keywords.get(key)
        if not entries:
            return None
        return entries[0]['value']

    def optional_decimal(key):
        """Return the keyword value as Decimal, or None when the keyword is absent."""
        raw = keyword_value(key)
        return None if raw is None else Decimal(raw)

    timestamp = __get_datetime_from_str(keywords["DATE-OBS"][0]['value'])
    exposure = Decimal(keywords["EXPTIME"][0]['value'])

    ra = optional_decimal("RA")
    dec = optional_decimal("DEC")

    pixel_scale = optional_decimal('SCALE')
    if pixel_scale is None:
        focal_len = optional_decimal('FOCALLEN')
        pixel_size = optional_decimal('XPIXSZ')
        # A zero focal length cannot yield a pixel scale; treat it as unknown.
        if focal_len is not None and pixel_size is not None and focal_len != 0:
            # Built from a string so the constant is exactly 206.265.
            pixel_scale = (pixel_size / focal_len) * Decimal("206.265")

    # RA and DEC are compared against None explicitly because 0 is a valid
    # coordinate; a zero or missing pixel scale still means "no solve data".
    if ra is not None and dec is not None and pixel_scale:
        plate_solve_data = SolveData(SkyCoord(ra, dec, unit=["deg", "deg"]), pixel_scale)
    else:
        plate_solve_data = None

    # Site coordinates default to 0 when not present
    lat = keyword_value("SITELAT")
    lat = Decimal(lat if lat is not None else 0)
    long = keyword_value("SITELONG")
    long = Decimal(long if long is not None else 0)
    site_location = SiteLocation(lat=lat, long=long)

    return Header(filename, exposure, timestamp, site_location, plate_solve_data)
193
def _fits_value_to_decimal(value) -> Decimal:
    """Convert a FITS header value to Decimal, routing floats through ``str`` to avoid binary noise."""
    if isinstance(value, float):
        # Decimal(0.3) stores the float's full binary expansion; Decimal("0.3") does not.
        return Decimal(str(value))
    return Decimal(value)


def load_header_fits(filename: str) -> Header:
    """
    Load header information from a FITS file.

    The header of the first HDU is used. EXPTIME, DATE-OBS, RA and DEC are
    required. The pixel scale is read from SCALE or, if absent, computed from
    XPIXSZ and FOCALLEN. SITELAT and SITELONG default to 0.

    Args:
        filename (str): The path to the FITS file.

    Returns:
        Header: A Header object containing the extracted information from the FITS file.

    Raises:
        KeyError: If EXPTIME, DATE-OBS, RA or DEC is missing from the header.
        ValueError: If DATE-OBS does not match a supported timestamp layout, or if the pixel scale
            is neither present nor computable from the header.
    """
    with astropy.io.fits.open(filename) as hdul:
        # Header of the first HDU
        fits_header = hdul[0].header

        exposure = _fits_value_to_decimal(fits_header['EXPTIME'])
        timestamp = __get_datetime_from_str(fits_header['DATE-OBS'])
        ra = _fits_value_to_decimal(fits_header['RA'])
        dec = _fits_value_to_decimal(fits_header['DEC'])

        # Pixel scale: taken directly from the header, otherwise derived from
        # pixel size and focal length.
        pixel_scale = fits_header.get('SCALE')
        if pixel_scale is not None:
            pixel_scale = _fits_value_to_decimal(pixel_scale)
        else:
            focal_len = fits_header.get('FOCALLEN')
            pixel_size = fits_header.get('XPIXSZ')
            if focal_len is None or pixel_size is None:
                raise ValueError("Pixel scale information is not present in FITS header")
            focal_len = _fits_value_to_decimal(focal_len)
            if focal_len == 0:
                # Report a clear error instead of a Decimal division failure.
                raise ValueError("Pixel scale cannot be calculated: FOCALLEN is zero in FITS header")
            # Built from a string so the constant is exactly 206.265.
            pixel_scale = (_fits_value_to_decimal(pixel_size) / focal_len) * Decimal("206.265")

        lat = _fits_value_to_decimal(fits_header.get("SITELAT", 0))
        long = _fits_value_to_decimal(fits_header.get("SITELONG", 0))

    plate_solve_data = SolveData(SkyCoord(ra, dec, unit=["deg", "deg"]), pixel_scale)
    site_location = SiteLocation(lat=lat, long=long)
    return Header(filename, exposure, timestamp, site_location, plate_solve_data)