CelestialSurveyor

Форк
0
248 строк · 9.8 Кб
1
import astropy.io.fits
2
import numpy as np
3
import traceback
4

5
from astropy.coordinates import SkyCoord
6
from datetime import datetime
7
from decimal import Decimal
8
from functools import partial
9
from logging.handlers import QueueHandler
10
from multiprocessing import Queue, cpu_count, Pool, Manager
11
from threading import Event
12
from typing import Optional
13
from xisf import XISF
14

15
from backend.progress_bar import AbstractProgressBar
16
from backend.data_classes import SolveData, SiteLocation, Header
17
from logger.logger import get_logger
18
from backend.consuming_functions.measure_execution_time import measure_execution_time
19

20

21
# Module-level logger object; the functions in this file write through its ``log`` attribute.
logger = get_logger()
22

23

24
def __get_datetime_from_str(date_str: str) -> datetime:
    """
    Convert a string date representation to a datetime object.

    Two layouts are accepted: "%Y-%m-%dT%H:%M:%S.%f" and "%Y-%m-%dT%H:%M:%S".
    The ``%f`` directive accepts at most six fractional digits, so a longer, purely numeric
    fraction is truncated to microsecond precision instead of being rejected.

    Args:
        date_str (str): The string containing the date, e.g. "2024-01-31T21:15:02.123456".

    Returns:
        datetime: A naive datetime object representing the parsed date from the input string.

    Raises:
        ValueError: If the string matches none of the supported layouts.
    """
    fractional_format = "%Y-%m-%dT%H:%M:%S.%f"
    try:
        return datetime.strptime(date_str, fractional_format)
    except ValueError:
        pass

    # Sub-microsecond digits cannot be represented by datetime; drop them and parse again.
    head, dot, fraction = date_str.partition(".")
    if dot and len(fraction) > 6 and fraction.isdigit():
        return datetime.strptime(f"{head}.{fraction[:6]}", fractional_format)

    # Last resort: a timestamp without any fractional part.
    return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S")
39

40

41
def load_headers_worker(filenames: list[str], progress_queue: Optional[Queue] = None,
                        stop_queue: Optional[Queue] = None, log_queue: Optional[Queue] = None) -> list[Header]:
    """
    Worker function to load header information from XISF and FIT(S) files.

    Files are processed in the given order. Before each file the stop queue is checked; if it
    contains anything, the worker stops early and returns the headers collected so far.

    Args:
        filenames (List[str]): List of filenames to load headers from within this worker.
        progress_queue (Optional[Queue], optional): Queue for reporting progress. One item is put
            per successfully loaded file. Defaults to None.
        stop_queue (Optional[Queue], optional): Queue to stop the loading process. On failure the
            worker puts "ERROR" into it. Defaults to None.
        log_queue (Optional[Queue], optional): Queue for logging. When None, no queue handler is
            installed. Defaults to None.

    Returns:
        List[Header]: A list of Header objects containing the loaded header information from the files.

    Raises:
        ValueError: If a filename has an extension other than .xisf, .fit or .fits.
        Exception: Any error raised while reading a file is logged and re-raised.
    """
    from queue import Full  # local import: only needed for the non-blocking error signal below

    # Install the forwarding handler only when there is a queue to forward to.
    handler = QueueHandler(log_queue) if log_queue is not None else None
    if handler is not None:
        logger.log.addHandler(handler)
    logger.log.debug(f"Load worker started with {len(filenames)} filenames")

    try:
        headers = []
        for filename in filenames:
            if stop_queue is not None and not stop_queue.empty():
                logger.log.debug("Load headers worker detected stop event. Stopping.")
                break
            lowered = filename.lower()
            if lowered.endswith(".xisf"):
                headers.append(load_header_xisf(filename))
            elif lowered.endswith((".fit", ".fits")):
                headers.append(load_header_fits(filename))
            else:
                raise ValueError("File type not supported. Supported file types: .xisf, .fit, .fits")
            if progress_queue is not None:
                progress_queue.put(True)
        return headers
    except Exception:
        logger.log.error(f"Load headers worker failed due to the following error:\n{traceback.format_exc()}")
        if stop_queue is not None:
            try:
                # Never block here: the queue may be bounded and already hold a stop signal,
                # in which case the stop request is in place and waiting would hang the worker.
                stop_queue.put_nowait("ERROR")
            except Full:
                pass
        raise
    finally:
        # The process may run this function again; a leftover handler would duplicate every record.
        if handler is not None:
            logger.log.removeHandler(handler)
78

79

80
@measure_execution_time
def load_headers(filenames: list[str], progress_bar: Optional[AbstractProgressBar] = None,
                 stop_event: Optional[Event] = None) -> list[Header]:
    """
    Load header information from XISF and FIT(S) files using multiple workers.

    The filenames are split into one chunk per worker process (at most 4 processes). While the
    workers run, this function consumes their progress notifications, updates the progress bar
    (if any) and forwards a stop request from ``stop_event`` to the workers.

    Args:
        filenames (List[str]): List of filenames to load headers from.
        progress_bar (Optional[AbstractProgressBar], optional): Progress bar for tracking loading progress.
            Defaults to None.
        stop_event (Optional[Event], optional): Event to stop the loading process. Defaults to None.

    Returns:
        List[Header]: A list of Header objects containing the loaded header information from the files.
            Empty when no filenames are given; possibly partial when the stop event was set.

    Raises:
        Exception: An exception raised inside a worker is re-raised when the results are collected.
    """
    from queue import Empty, Full  # local import: exceptions raised by the manager queue proxies

    if len(filenames) == 0:
        # Pool() rejects a process count of 0, so return before creating any process.
        logger.log.debug("No files provided. Nothing to load.")
        return []

    poll_interval = 0.1  # seconds to wait for a progress item before re-checking the stop conditions
    available_cpu = min(4, cpu_count(), len(filenames))
    logger.log.debug(f"Number of CPUs to be used for loading headers: {available_cpu}")
    with Manager() as m, Pool(available_cpu) as pool:
        progress_queue = m.Queue()
        log_queue = m.Queue()
        stop_queue = m.Queue(maxsize=1)
        logger.start_process_listener(log_queue)
        try:
            logger.log.debug(f"Starting loading headers with {available_cpu} workers")
            # tolist() turns the numpy string scalars back into plain str objects for the workers.
            chunks = [chunk.tolist() for chunk in np.array_split(filenames, available_cpu)]
            results = pool.map_async(
                partial(load_headers_worker, progress_queue=progress_queue, stop_queue=stop_queue,
                        log_queue=log_queue),
                chunks)

            if progress_bar is not None:
                progress_bar.set_total(len(filenames))

            remaining = len(filenames)
            while remaining > 0:
                if stop_event is not None and stop_event.is_set():
                    try:
                        stop_queue.put_nowait(True)
                    except Full:
                        # A worker already placed its own signal; the stop request is in effect.
                        pass
                    logger.log.debug("Stop event triggered")
                    break
                if not stop_queue.empty():
                    logger.log.debug("Detected error from workers. Stopping.")
                    break
                try:
                    progress_queue.get(timeout=poll_interval)
                except Empty:
                    # All workers returned and nothing is pending: no further progress will arrive.
                    if results.ready() and progress_queue.empty():
                        break
                    continue
                logger.log.debug("Got a result from the progress queue")
                remaining -= 1
                if progress_bar is not None:
                    progress_bar.update()

            if progress_bar is not None:
                progress_bar.complete()

            # get() waits for every worker and re-raises the first worker exception, if any.
            headers = [header for chunk_headers in results.get() for header in chunk_headers]
            pool.close()
            pool.join()
            logger.log.debug("Load headers pool stopped.")
        finally:
            # Stop the listener on the error path as well, before the manager queues go away.
            logger.stop_process_listener()
    return headers
137

138

139
def load_header_xisf(filename: str) -> Header:
    """
    Load header information from an XISF file.

    Only the metadata of the first image in the file is used. DATE-OBS and EXPTIME are required.
    RA, DEC and the pixel scale are optional: if any of them is unavailable, the returned Header
    carries no plate solve data. SITELAT and SITELONG default to 0.

    Args:
        filename (str): The path to the XISF file.

    Returns:
        Header: A Header object containing the extracted information from the XISF file.

    Raises:
        KeyError: If DATE-OBS or EXPTIME is missing from the FITS keywords.
        ValueError: If DATE-OBS is not in a supported date format.
    """
    xisf = XISF(filename)
    # Metadata of the first image in the XISF file
    img_meta = xisf.get_images_metadata()[0]
    keywords = img_meta["FITSKeywords"]

    def keyword_as_decimal(name: str) -> Optional[Decimal]:
        """Return the first value stored for keyword *name* as a Decimal, or None if it is absent."""
        entries = keywords.get(name)
        return None if entries is None else Decimal(entries[0]['value'])

    timestamp = __get_datetime_from_str(keywords["DATE-OBS"][0]['value'])
    exposure = Decimal(keywords["EXPTIME"][0]['value'])

    ra = keyword_as_decimal("RA")
    dec = keyword_as_decimal("DEC")

    # Prefer an explicit pixel scale; otherwise derive it from pixel size and focal length.
    pixel_scale = keyword_as_decimal('SCALE')
    if pixel_scale is None:
        focal_len = keyword_as_decimal('FOCALLEN')
        pixel_size = keyword_as_decimal('XPIXSZ')
        if focal_len is not None and pixel_size is not None:
            # The constant is built from a string so it is exactly 206.265, not a float approximation.
            pixel_scale = (pixel_size / focal_len) * Decimal("206.265")

    # RA and DEC are compared against None because 0 is a valid coordinate but a falsy Decimal.
    # The pixel scale keeps its truthiness test: a missing or zero scale is treated as unusable.
    if ra is not None and dec is not None and pixel_scale:
        plate_solve_data = SolveData(SkyCoord(ra, dec, unit=["deg", "deg"]), pixel_scale)
    else:
        plate_solve_data = None

    # Site coordinates default to 0 when the keywords are not present.
    lat = keyword_as_decimal("SITELAT")
    long = keyword_as_decimal("SITELONG")
    site_location = SiteLocation(
        lat=lat if lat is not None else Decimal(0),
        long=long if long is not None else Decimal(0),
    )

    return Header(filename, exposure, timestamp, site_location, plate_solve_data)
192

193

194
def load_header_fits(filename: str) -> Header:
    """
    Load header information from a FITS file.

    Only the header of the first HDU is read. EXPTIME, DATE-OBS, RA and DEC are required, as is
    either SCALE or both FOCALLEN and XPIXSZ. SITELAT and SITELONG default to 0.

    Args:
        filename (str): The path to the FITS file.

    Returns:
        Header: A Header object containing the extracted information from the FITS file.

    Raises:
        KeyError: If EXPTIME, DATE-OBS, RA or DEC is missing from the header.
        ValueError: If the pixel scale can neither be read nor derived, or if DATE-OBS is not in a
            supported date format.
    """
    def as_decimal(value) -> Decimal:
        """Build a Decimal from the value's text form so floats do not carry their binary expansion."""
        return Decimal(str(value))

    with astropy.io.fits.open(filename) as hdul:
        # Header of the first HDU
        fits_header = hdul[0].header

        exposure = as_decimal(fits_header['EXPTIME'])
        timestamp = __get_datetime_from_str(fits_header['DATE-OBS'])
        ra = as_decimal(fits_header['RA'])
        dec = as_decimal(fits_header['DEC'])

        # Prefer an explicit pixel scale; otherwise derive it from pixel size and focal length.
        pixel_scale = fits_header.get('SCALE')
        if pixel_scale is not None:
            pixel_scale = as_decimal(pixel_scale)
        else:
            focal_len = fits_header.get('FOCALLEN')
            pixel_size = fits_header.get('XPIXSZ')
            if focal_len is None or pixel_size is None:
                raise ValueError("Pixel scale information is not present in FITS header")
            # The constant is built from a string so it is exactly 206.265, not a float approximation.
            pixel_scale = (as_decimal(pixel_size) / as_decimal(focal_len)) * Decimal("206.265")

        plate_solve_data = SolveData(SkyCoord(ra, dec, unit=["deg", "deg"]), pixel_scale)

        # Site coordinates default to 0 when the keywords are not present.
        site_location = SiteLocation(
            lat=as_decimal(fits_header.get("SITELAT", 0)),
            long=as_decimal(fits_header.get("SITELONG", 0)),
        )

        return Header(filename, exposure, timestamp, site_location, plate_solve_data)
249

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.