CelestialSurveyor
840 строк · 33.2 Кб
1import json
2import numpy as np
3import os
4import requests
5import sys
6import uuid
7
8from astropy.wcs import WCS
9from astropy.coordinates import Angle
10from threading import Event
11from typing import Optional, Union, Tuple, Generator
12
13from backend.consuming_functions.load_headers import load_headers
14from backend.consuming_functions.load_images import load_images, PIXEL_TYPE, load_image
15from backend.consuming_functions.plate_solve_images import plate_solve, plate_solve_image
16from backend.consuming_functions.align_images import align_images_wcs
17from backend.consuming_functions.stretch_images import stretch_images
18from backend.data_classes import SharedMemoryParams
19from backend.progress_bar import AbstractProgressBar
20from backend.data_classes import Header
21from backend.known_object import KnownObject
22from logger.logger import get_logger
23
24
25logger = get_logger()
26
27
28CHUNK_SIZE = 64
29
30
31class SourceDataV2:
32"""
33Class to manage image data.
34"""
35def __init__(self, to_debayer: bool = False) -> None:
36self.headers = []
37self.original_frames = None
38self.shm = None
39if not os.path.exists(self.tmp_folder):
40os.mkdir(self.tmp_folder)
41else:
42# only one sourcedata instance to be loaded at the same time
43for item in os.listdir(self.tmp_folder):
44if item.endswith(".np"):
45os.remove(os.path.join(self.tmp_folder, item))
46
47self.shm_name = self.__create_shm_name('images')
48self.shm_params = None
49self.footprint_map = None
50self.to_debayer = to_debayer
51self.y_borders: slice = slice(None, None)
52self.x_borders: slice = slice(None, None)
53self.__usage_map = None
54self.__chunk_len: int = 0
55self.__wcs: Optional[WCS] = None
56self.__cropped = False
57self.__shared = True
58self.__images = None
59self.__stop_event = Event()
60self.__used_images = None
61self.__usage_map_changed = True
62self.__original_frames = None
63
64def __create_shm_name(self, postfix: str = '') -> str:
65"""
66Creates name for the shared memory file.
67
68Parameters:
69- postfix (str): Optional postfix to append to the shared memory file name.
70
71Returns:
72- str: The generated shared memory file name.
73"""
74shm_name = os.path.join(self.tmp_folder, f"tmp_{uuid.uuid4().hex}_{postfix}.np")
75return shm_name
76
77def __clear_tmp_folder(self):
78"""
79Clears temporary folder by removing all files with '.np' extension (shared memory files).
80"""
81for item in os.listdir(self.tmp_folder):
82if item.endswith(".np"):
83os.remove(os.path.join(self.tmp_folder, item))
84
85def __reset_shm(self):
86"""
87Resets the shared memory by clearing the temporary folder and creating a new shared memory file.
88Required in UI mode when user wants to add more images or stops loading data.
89"""
90self.original_frames = None
91self.__clear_tmp_folder()
92self.shm_name = self.__create_shm_name('images')
93
94def raise_stop_event(self):
95"""
96Raise the stop event to let the child processes to stop.
97"""
98self.__stop_event.set()
99
100def clear_stop_event(self):
101"""
102Raise the stop event to let the child processes that reloading may be done.
103"""
104self.__stop_event.clear()
105
106@property
107def tmp_folder(self) -> str:
108"""
109Get the path to the temporary folder where shared memory files are stored.
110
111Returns:
112str: The path to the temporary folder.
113"""
114return os.path.join(sys.path[1], "tmp")
115
116@property
117def stop_event(self) -> Event:
118return self.__stop_event
119
120@staticmethod
121def filter_file_list(file_list: list[str]) -> list[str]:
122"""
123Filter the file list to include only files with extensions .xisf, .fit, or .fits.
124
125Args:
126file_list (list[str]): List of file paths to filter.
127
128Returns:
129list[str]: Filtered list of file paths.
130"""
131return [item for item in file_list if item.lower().endswith(".xisf") or item.lower().endswith(".fit")
132or item.lower().endswith(".fits")]
133
134def extend_headers(self, file_list: list[str], progress_bar: Optional[AbstractProgressBar] = None) -> None:
135"""
136Extends the headers with information loaded from the given file list.
137
138Args:
139file_list (list[str]): List of file paths to load headers from.
140progress_bar (Optional[AbstractProgressBar]): An optional progress bar to show the loading progress.
141
142Returns:
143None
144"""
145file_list = self.filter_file_list(file_list)
146self.headers.extend(load_headers(file_list, progress_bar, stop_event=self.stop_event))
147self.headers.sort(key=lambda header: header.timestamp)
148
149def set_headers(self, headers: list[Header]) -> None:
150"""
151Set the headers.
152
153Args:
154headers (list[Header]): List of headers to set.
155
156Returns:
157None
158"""
159self.headers = headers
160self.headers.sort(key=lambda header: header.timestamp)
161self.__reset_shm()
162
163@property
164def shape(self) -> tuple:
165return self.images.shape
166
167@property
168def origional_shape(self) -> tuple:
169return self.original_frames.shape
170
171@property
172def usage_map(self) -> np.ndarray:
173if self.__usage_map is None:
174self.__usage_map = np.ones((len(self.__images), ), dtype=bool)
175return self.__usage_map
176
177@usage_map.setter
178def usage_map(self, usage_map):
179self.__usage_map = usage_map
180self.__usage_map_changed = True
181
182@property
183def images(self):
184if self.__shared:
185usage_map = self.__usage_map if self.__usage_map is not None else np.ones((len(self.headers), ), dtype=bool)
186return self.original_frames[
187usage_map, self.y_borders, self.x_borders] if self.original_frames is not None else None
188else:
189if self.__usage_map_changed:
190self.__used_images = self.__images[self.usage_map]
191self.__usage_map_changed = False
192return self.__used_images
193
194def images_from_buffer(self) -> None:
195"""
196Copy images from the shared memory file to RAM, update headers based on usage map,
197and reset shared memory properties. Needs to be done after image loading, calibration and alignment to speed up
198processing.
199"""
200self.__images = np.copy(self.images)
201self.headers = [header for idx, header in enumerate(self.headers) if self.usage_map[idx]]
202self.__usage_map_changed = True
203self.usage_map = np.ones((len(self.__images), ), dtype=bool)
204name = self.shm_name
205self.original_frames._mmap.close()
206del self.original_frames
207self.original_frames = None
208os.remove(name)
209self.__original_frames = None
210self.__shared = False
211
212@property
213def max_image(self) -> np.ndarray:
214return np.amax(self.images, axis=0)
215
216@property
217def wcs(self) -> WCS:
218if self.__wcs is None and self.__cropped is True:
219self.__wcs, _ = self.plate_solve()
220return self.__wcs
221
222@wcs.setter
223def wcs(self, value):
224self.__wcs = value
225
226def load_images(self, progress_bar: Optional[AbstractProgressBar] = None) -> None:
227"""
228Load images from the file list specified in the headers.
229
230Parameters:
231- progress_bar (Optional[AbstractProgressBar]): A progress bar to show the loading progress.
232
233Returns:
234- None
235"""
236logger.log.info("Loading images...")
237file_list = [header.file_name for header in self.headers]
238img = load_image(file_list[0])
239shape = (len(file_list), *img.shape)
240self.shm_params = SharedMemoryParams(
241shm_name=self.shm_name, shm_shape=shape, shm_size=img.nbytes * len(file_list), shm_dtype=img.dtype)
242self.original_frames = np.memmap(self.shm_name, dtype=PIXEL_TYPE, mode='w+', shape=shape)
243self.__shared = True
244load_images(
245file_list, self.shm_params, to_debayer=self.to_debayer, progress_bar=progress_bar,
246stop_event=self.stop_event)
247
248@staticmethod
249def calculate_raw_crop(footprint: np.ndarray) -> tuple[tuple[int, int], tuple[int, int]]:
250"""
251Calculate the crop coordinates based on the footprint array.
252Raw crop means that the lines and columns which contain only zeros will be cut.
253
254Parameters:
255- footprint (np.ndarray): The input array representing the footprint.
256
257Returns:
258- Tuple[Tuple[int, int], Tuple[int, int]]: A tuple containing the crop coordinates for y-axis and x-axis.
259
260"""
261y_top = x_left = 0
262y_bottom, x_right = footprint.shape[:2]
263for num, line in enumerate(footprint):
264if not np.all(line):
265y_top = num
266break
267for num, line in enumerate(footprint[::-1]):
268if not np.all(line):
269y_bottom -= num
270break
271
272for num, line in enumerate(footprint.T):
273if not np.all(line):
274x_left = num
275break
276
277for num, line in enumerate(footprint.T[::-1]):
278if not np.all(line):
279x_right -= num
280break
281
282return (y_top, y_bottom), (x_left, x_right)
283
284@staticmethod
285def crop_image(imgs: np.ndarray,
286y_borders: Union[slice, Tuple[int, int]],
287x_borders: Union[slice, Tuple[int, int]],
288usage_mask: Optional[np.ndarray] = None) -> np.ndarray:
289"""
290Crop the images based on the provided y and x borders.
291
292Args:
293imgs (np.ndarray): The input image array.
294y_borders (Union[slice, Tuple[int, int]]): The borders for the y-axis.
295x_borders (Union[slice, Tuple[int, int]]): The borders for the x-axis.
296usage_mask (Optional[np.ndarray]): Optional usage mask for cropping.
297
298Returns:
299np.ndarray: The cropped image array.
300"""
301if isinstance(y_borders, slice):
302pass
303elif isinstance(y_borders, tuple) and len(y_borders) == 2:
304y_borders = slice(*y_borders)
305else:
306raise ValueError("y_borders must be a tuple of length 2 or a slice")
307if isinstance(x_borders, slice):
308pass
309elif isinstance(x_borders, tuple) and len(x_borders) == 2:
310x_borders = slice(*x_borders)
311else:
312raise ValueError("x_borders must be a tuple of length 2 or a slice")
313if usage_mask:
314return imgs[usage_mask, y_borders, x_borders]
315else:
316return imgs[:, y_borders, x_borders]
317
318@staticmethod
319def __get_num_of_corner_zeros(line: np.ndarray) -> int:
320"""
321Count the number of zeros in the footprint line.
322
323Args:
324line (np.ndarray): The input line from the footprint.
325
326Returns:
327int: The number of zeros in the line.
328"""
329# True means zero in footprint
330return np.count_nonzero(line)
331
332@classmethod
333def __fine_crop_border(cls, footprint: np.ndarray, direction: int, transpon: bool = True) -> Tuple[np.array, int]:
334"""
335This method calculates the fine crop border based on the direction and whether to transpose the footprint.
336The goal is to leave areas where the image is without zeros after alignment.
337
338Args:
339footprint (np.ndarray): The input footprint array.
340direction (int): The direction to calculate the border.
341transpon (bool, optional): Whether to transpose the footprint. Defaults to True.
342
343Returns:
344Tuple[np.array, int]: The cropped footprint and the calculated border.
345"""
346if transpon:
347footprint = footprint.T
348x = 0
349line: np.ndarray
350for num, line in enumerate(footprint[::direction]):
351if cls.__get_num_of_corner_zeros(line) <= cls.__get_num_of_corner_zeros(
352footprint[::direction][num + 1]):
353x = num
354break
355if direction == -1:
356result_tmp = footprint[: (x + 1) * direction]
357x = footprint.shape[0] - x
358else:
359result_tmp = footprint[x:]
360return result_tmp.T if transpon else result_tmp, x
361
362@classmethod
363def calculate_crop(cls, footprint: np.ndarray) -> Tuple[Tuple[int, int], Tuple[int, int]]:
364"""
365Calculate the crop coordinates based on the footprint array. The goal is to calculate rectangle without zero
366areas after alignment.
367
368Parameters:
369- footprint (np.ndarray): The input array representing the footprint.
370
371Returns:
372- Tuple[Tuple[int, int], Tuple[int, int]]: A tuple containing the crop coordinates for y-axis and x-axis.
373"""
374
375y_pre_crop, x_pre_crop = cls.calculate_raw_crop(footprint)
376pre_cropped = footprint[slice(*y_pre_crop), slice(*x_pre_crop)]
377y_top_zeros = cls.__get_num_of_corner_zeros(pre_cropped[0])
378y_bottom_zeros = cls.__get_num_of_corner_zeros(pre_cropped[-1])
379x_left_zeros = cls.__get_num_of_corner_zeros(pre_cropped.T[0])
380x_right_zeros = cls.__get_num_of_corner_zeros(pre_cropped.T[-1])
381zeros = y_top_zeros, y_bottom_zeros, x_left_zeros, x_right_zeros
382trim_args = (1, False), (-1, False), (1, True), (-1, True)
383args_order = (item[1] for item in sorted(zip(zeros, trim_args), key=lambda x: x[0], reverse=True))
384border_map = {item: value for item, value in zip(trim_args, ["y_top", "y_bottom", "x_left", "x_right"])}
385result = {}
386cropped = pre_cropped
387for pair in args_order:
388boarder_name = border_map[pair]
389cropped, x = cls.__fine_crop_border(cropped, *pair)
390result.update({boarder_name: x})
391
392y_top = result["y_top"] + y_pre_crop[0]
393y_bottom = result["y_bottom"] + y_pre_crop[0]
394x_left = result["x_left"] + x_pre_crop[0]
395x_right = result["x_right"] + x_pre_crop[0]
396crop = (y_top, y_bottom), (x_left, x_right)
397return crop
398
399def align_images_wcs(self, progress_bar: Optional[AbstractProgressBar] = None) -> None:
400"""
401Align images with World Coordinate System (WCS).
402
403Args:
404progress_bar (Optional[AbstractProgressBar]): Progress bar object.
405
406Returns:
407None
408"""
409logger.log.info("Aligning images...")
410success_map, self.footprint_map = align_images_wcs(
411self.shm_params,
412[header.wcs for header in self.headers],
413progress_bar=progress_bar,
414stop_event=self.stop_event)
415self.__usage_map = success_map
416
417def crop_images(self) -> None:
418"""
419Crop the images based on the footprint map and update borders.
420Keeps common non-zero area on all the images after alignment.
421"""
422logger.log.info("Cropping images...")
423x_borders, y_borders = [], []
424if self.footprint_map is None:
425footprint_map = self.original_frames[self.__usage_map] == 0
426footprint_map = footprint_map[0]
427if len(footprint_map.shape) == 4:
428footprint_map = np.reshape(footprint_map, footprint_map.shape[:-1])
429else:
430footprint_map = self.footprint_map[np.array(self.__usage_map, dtype=bool)]
431for item in footprint_map:
432if self.stop_event.is_set():
433return
434y_border, x_border = self.calculate_crop(item)
435
436y_borders.append(y_border)
437x_borders.append(x_border)
438y_borders = np.array(y_borders)
439x_borders = np.array(x_borders)
440self.y_borders = slice(int(np.max(y_borders[:, 0])), int(np.min(y_borders[:, 1])))
441self.x_borders = slice(int(np.max(x_borders[:, 0])), int(np.min(x_borders[:, 1])))
442self.__cropped = True
443self.footprint_map = None
444# Plate solve after cropping
445self.wcs, _ = self.plate_solve(0)
446
447def make_master_dark(self, filenames: list[str], progress_bar: Optional[AbstractProgressBar] = None) -> np.ndarray:
448"""
449Create a master dark frame from a list of dark frame filenames.
450
451Args:
452filenames (list[str]): List of dark frame filenames.
453progress_bar (Optional[AbstractProgressBar], optional): Progress bar instance. Defaults to None.
454
455Returns:
456np.ndarray: Master dark frame.
457"""
458shape = (len(filenames), *self.origional_shape[1:])
459size = self.original_frames.itemsize
460shm_name = self.__create_shm_name('darks')
461for value in shape:
462size *= value
463shm_params = SharedMemoryParams(
464shm_name=shm_name, shm_shape=shape, shm_size=size, shm_dtype=PIXEL_TYPE)
465darks = np.memmap(shm_params.shm_name, dtype=PIXEL_TYPE, mode='w+', shape=shape)
466load_images(filenames, shm_params, progress_bar=progress_bar, to_debayer=self.to_debayer)
467master_dark = np.average(darks, axis=0)
468darks._mmap.close()
469del darks
470os.remove(shm_name)
471return master_dark
472
473def make_master_flat(self, flat_filenames: list[str], dark_flat_filenames: Optional[list[str]] = None,
474progress_bar: Optional[AbstractProgressBar] = None) -> np.ndarray:
475"""
476Create a master flat frame from a list of flat frame filenames.
477
478Args:
479flat_filenames (list[str]): List of flat frame filenames.
480dark_flat_filenames (Optional[list[str]], optional): List of dark flat frame filenames. Defaults to None.
481progress_bar (Optional[AbstractProgressBar], optional): Progress bar instance. Defaults to None.
482
483Returns:
484np.ndarray: Master flat frame.
485"""
486flat_shape = (len(flat_filenames), *self.origional_shape[1:])
487flat_size = self.original_frames.itemsize
488flat_shm_name = self.__create_shm_name('flats')
489for value in flat_shape:
490flat_size *= value
491flat_shm_params = SharedMemoryParams(
492shm_name=flat_shm_name, shm_shape=flat_shape, shm_size=flat_size, shm_dtype=PIXEL_TYPE)
493flats = np.memmap(flat_shm_params.shm_name, dtype=PIXEL_TYPE, mode='w+', shape=flat_shape)
494load_images(flat_filenames, flat_shm_params, progress_bar=progress_bar, to_debayer=self.to_debayer)
495if dark_flat_filenames is not None:
496master_dark_flat = self.make_master_dark(dark_flat_filenames, progress_bar=progress_bar)
497for flat in flats:
498flat -= master_dark_flat
499master_flat = np.average(flats, axis=0)
500flats._mmap.close()
501del flats
502os.remove(flat_shm_name)
503return master_flat
504
505def load_flats(self, flat_filenames: list[str], progress_bar: Optional[AbstractProgressBar] = None) -> np.ndarray:
506"""
507Load flat frames into memory.
508
509Args:
510flat_filenames (list[str]): List of flat frame filenames.
511progress_bar (Optional[AbstractProgressBar], optional): Progress bar instance. Defaults to None.
512
513Returns:
514np.ndarray: Loaded flat frames.
515"""
516flat_shape = (len(flat_filenames), *self.origional_shape[1:])
517flat_size = self.original_frames.itemsize
518flat_shm_name = self.__create_shm_name('flats')
519for value in flat_shape:
520flat_size *= value
521flat_shm_params = SharedMemoryParams(
522shm_name=flat_shm_name, shm_shape=flat_shape, shm_size=flat_size, shm_dtype=PIXEL_TYPE)
523flats = np.memmap(flat_shm_params.shm_name, dtype=PIXEL_TYPE, mode='w+', shape=flat_shape)
524load_images(flat_filenames, flat_shm_params, progress_bar=progress_bar, to_debayer=self.to_debayer)
525res = np.copy(flats)
526flats._mmap.close()
527del flats
528os.remove(flat_shm_name)
529return res
530
531def calibrate_images(self, dark_files: Optional[list[str]] = None, flat_files: Optional[list[str]] = None,
532dark_flat_files: Optional[list[str]] = None, progress_bar: Optional[AbstractProgressBar] = None
533) -> None:
534"""
535Calibrates images by subtracting master dark frames and dividing by master flat frames.
536
537Args:
538dark_files (Optional[list[str]]): List of dark frame filenames.
539flat_files (Optional[list[str]]): List of flat frame filenames.
540dark_flat_files (Optional[list[str]]): List of dark flat frame filenames.
541progress_bar (Optional[AbstractProgressBar]): Progress bar instance.
542
543Returns:
544None
545"""
546if dark_files is not None:
547master_dark = self.make_master_dark(dark_files, progress_bar=progress_bar)
548self.original_frames -= master_dark
549if flat_files is not None:
550master_flat = self.make_master_flat(flat_files, dark_flat_files, progress_bar=progress_bar)
551self.original_frames /= master_flat
552
553def stretch_images(self, progress_bar: Optional[AbstractProgressBar] = None) -> None:
554"""
555Stretch images stored in shared memory.
556
557Args:
558progress_bar (Optional[AbstractProgressBar]): Progress bar to track the stretching progress.
559
560Returns:
561None
562"""
563logger.log.info("Stretching images...")
564shm_params = self.shm_params
565shm_params.y_slice = self.y_borders
566shm_params.x_slice = self.x_borders
567stretch_images(self.shm_params, progress_bar=progress_bar, stop_event=self.stop_event)
568
569def get_number_of_chunks(self, size: tuple[int, int] = (CHUNK_SIZE, CHUNK_SIZE),
570overlap: float = 0.5) -> tuple[np.ndarray, np.ndarray]:
571"""
572Calculate the number of image chunks based on the specified size and overlap.
573
574Args:
575size (tuple[int, int], optional): The size of the image chunks in the format (height, width).
576Defaults to (CHUNK_SIZE, CHUNK_SIZE).
577overlap (float, optional): The overlap percentage between image chunks. Defaults to 0.5.
578
579Returns:
580tuple[np.ndarray, np.ndarray]: Two arrays representing the y and x coordinates of the image chunks.
581"""
582size_y, size_x = size
583ys = np.arange(0, self.shape[1] - size_y * overlap, size_y * overlap)
584ys[-1] = self.shape[1] - size_y
585xs = np.arange(0, self.shape[2] - size_x * overlap, size_x * overlap)
586xs[-1] = self.shape[2] - size_x
587return ys, xs
588
589def generate_image_chunks(self, size: tuple[int, int] = (CHUNK_SIZE, CHUNK_SIZE), overlap: float = 0.5):
590"""
591Generate image chunks based on the specified size and overlap.
592
593Args:
594size (tuple[int, int], optional): The size of the image chunks in the format (height, width).
595Defaults to (CHUNK_SIZE, CHUNK_SIZE).
596overlap (float, optional): The overlap percentage between image chunks. Defaults to 0.5.
597
598Yields:
599tuple: A tuple containing the coordinates and prepared images of the generated image chunks.
600"""
601size_y, size_x = size
602ys, xs = self.get_number_of_chunks(size, overlap)
603coordinates = ((y, x) for y in ys for x in xs)
604for y, x in coordinates:
605y, x = int(y), int(x)
606imgs = np.copy(self.images[:, y:y + size_y, x:x + size_x])
607yield (y, x), self.prepare_images(np.copy(imgs))
608
609@staticmethod
610def generate_batch(chunk_generator: Generator, batch_size: int) -> tuple[tuple[int, int], np.ndarray]:
611"""
612Generate batches of chunks for processing with the given batch size.
613
614Args:
615chunk_generator (Generator): Generator that yields chunks and coordinates.
616batch_size (int): The size of each batch.
617
618Yields:
619tuple[tuple[int, int], np.ndarray]: A tuple containing the coordinates and batch of chunks.
620"""
621batch = []
622coords = []
623for coord, chunk in chunk_generator:
624batch.append(chunk)
625coords.append(coord)
626if len(batch) == batch_size:
627yield coords, np.array(batch)
628batch = []
629coords = []
630if len(batch) > 0:
631yield coords, np.array(batch)
632
633@staticmethod
634def estimate_image_noize_level(imgs: np.ndarray) -> float:
635"""
636Estimate the noise level of the given images.
637
638Args:
639imgs (np.ndarray): The images to estimate the noise level for.
640
641Returns:
642float: The estimated noise level.
643"""
644return np.mean(np.var(imgs, axis=0))
645
646@classmethod
647def prepare_images(cls, images: np.ndarray) -> np.ndarray:
648"""
649Prepare the given images for processing by AI model.
650
651Args:
652images (np.ndarray): The images to prepare.
653
654Returns:
655np.ndarray: The prepared images.
656"""
657# normalize images
658images -= cls.estimate_image_noize_level(images)
659images = images - np.min(images)
660images = images / np.max(images)
661images = np.reshape(images, (*images.shape[:3], 1))
662return images
663
664@staticmethod
665def adjust_chunks_to_min_len(imgs: np.ndarray, timestamps: list, min_len: int = 8) -> tuple[np.ndarray, list]:
666"""
667Adjust the given chunks to the minimum length. To be used if there are fewer images than the minimum length.
668
669Args:
670imgs (np.ndarray): The images to adjust.
671timestamps (list): The timestamps of the images.
672min_len (int, optional): The minimum length of the chunks. Defaults to 8.
673
674Returns:
675tuple[np.ndarray, list]: The adjusted images and timestamps.
676"""
677assert len(imgs) == len(timestamps), \
678f"Images and timestamp amount mismatch: len(imgs)={len(imgs)}. len(timestamps)={len(timestamps)}"
679
680if len(imgs) >= min_len:
681return imgs, timestamps
682new_imgs = []
683new_timestamps = []
684while len(new_imgs) < min_len:
685new_imgs.extend(list(imgs))
686new_timestamps.extend(timestamps)
687new_imgs = new_imgs[:8]
688new_timestamps = new_timestamps[:8]
689timestamped_images = list(zip(new_imgs, new_timestamps))
690timestamped_images.sort(key=lambda x: x[1])
691new_imgs = [item[0] for item in timestamped_images]
692new_timestamps = [item[1] for item in timestamped_images]
693new_imgs = np.array(new_imgs)
694return new_imgs, new_timestamps
695
696@staticmethod
697def make_file_paths(folder: str) -> list[str]:
698"""
699Create a list of file paths for files in the specified folder that end with specific extensions.
700
701Parameters:
702- folder (str): The folder path to search for files.
703
704Returns:
705- list[str]: A list of file paths with extensions '.xisf', '.fit', or '.fits'.
706"""
707return [os.path.join(folder, item) for item in os.listdir(folder) if item.lower().endswith(
708".xisf") or item.lower().endswith(".fit") or item.lower().endswith(".fits")]
709
710def plate_solve(self, ref_idx: int = 0, sky_coord: Optional[np.ndarray] = None) -> tuple[WCS, np.ndarray]:
711"""
712Perform plate solving on the given reference image.
713
714Args:
715ref_idx (int, optional): The index of the reference image. Defaults to 0.
716sky_coord (np.ndarray, optional): The sky coordinates of the reference image. Defaults to None.
717
718Returns:
719tuple[WCS, np.ndarray]: The plate solved WCS and the sky coordinates of the reference image.
720"""
721logger.log.info("Plate solving...")
722wcs, sky_coord = plate_solve_image(self.images[ref_idx], header=self.headers[ref_idx], sky_coord=sky_coord)
723self.__wcs = wcs
724return wcs, sky_coord
725
726def plate_solve_all(self, progress_bar: Optional[AbstractProgressBar] = None) -> None:
727"""
728Perform plate solving on all images and update corresponding headers.
729
730Args:
731progress_bar (AbstractProgressBar, optional): The progress bar to display the progress. Defaults to None.
732
733Returns:
734None
735"""
736logger.log.info("Plate solving all images...")
737res = plate_solve(self.shm_params, self.headers, progress_bar=progress_bar, stop_event=self.stop_event)
738for wcs, header in zip(res, self.headers):
739header.wcs = wcs
740
741@staticmethod
742def convert_ra(ra: Angle) -> str:
743"""
744Convert right ascension from astropy Angle format to a string representation suitable for Small Body Api.
745https://ssd-api.jpl.nasa.gov/doc/sb_ident.html
746
747Args:
748ra (Angle): The right ascension angle to convert.
749
750Returns:
751str: The string representation of the right ascension.
752"""
753minus_substr = "M" if int(ra.h) < 0 else ""
754hour = f"{minus_substr}{abs(int(ra.h)):02d}"
755return f"{hour}-{abs(int(ra.m)):02d}-{abs(int(ra.s)):02d}"
756
757@staticmethod
758def convert_dec(dec: Angle) -> str:
759"""
760Convert declination from astropy Angle format to a string representation suitable for Small Body Api.
761https://ssd-api.jpl.nasa.gov/doc/sb_ident.html
762
763Args:
764dec (Angle): The declination angle to convert.
765
766Returns:
767str: The string representation of the declination.
768"""
769minus_substr = "M" if int(dec.d) < 0 else ""
770hour = f"{minus_substr}{abs(int(dec.d)):02d}"
771return f"{hour}-{abs(int(dec.m)):02d}-{abs(int(dec.s)):02d}"
772
773def fetch_known_asteroids_for_image(self, img_idx: int, magnitude_limit: float = 18.0
774) -> tuple[list[KnownObject], list[KnownObject]]:
775"""
776Fetch known asteroids and comets within the image's field of view based on the specified magnitude limit.
777Request data from JPL Small Body Api (https://ssd-api.jpl.nasa.gov/doc/sb_ident.html).
778
779Args:
780img_idx (int): The index of the image.
781magnitude_limit (float): The magnitude limit for known asteroids.
782
783Returns:
784tuple[list[KnownObject], list[KnownObject]]: A tuple containing lists of KnownObject instances for asteroids
785and comets.
786"""
787logger.log.debug("Requesting visible targets")
788obs_time = self.headers[img_idx].timestamp
789obs_time = (f"{obs_time.year:04d}-{obs_time.month:02d}-{obs_time.day:02d}_{obs_time.hour:02d}:"
790f"{obs_time.minute:02d}:{obs_time.second:02d}")
791corner_points = [self.wcs.pixel_to_world(x, y) for x, y in (
792(0, 0), (0, self.shape[1]), (self.shape[2], 0), (self.shape[2], self.shape[1]))]
793ra_max = max([item.ra.hms for item in corner_points])
794ra_min = min([item.ra.hms for item in corner_points])
795dec_max = max([item.dec.dms for item in corner_points])
796dec_min = min([item.dec.dms for item in corner_points])
797fov_ra_lim = f"{self.convert_ra(ra_min)},{self.convert_ra(ra_max)}"
798fov_dec_lim = f"{self.convert_dec(dec_min)},{self.convert_dec(dec_max)}"
799know_asteroids = []
800know_comets = []
801for sb_kind in ('a', 'c'):
802params = {
803"sb-kind": sb_kind,
804"lat": round(self.headers[img_idx].site_location.lat, 3),
805"lon": round(self.headers[img_idx].site_location.long, 4),
806"alt": 0,
807"obs-time": obs_time,
808"mag-required": True,
809"two-pass": True,
810"suppress-first-pass": True,
811"req-elem": False,
812"vmag-lim": magnitude_limit,
813"fov-ra-lim": fov_ra_lim,
814"fov-dec-lim": fov_dec_lim,
815}
816logger.log.debug(f"Params: {params}")
817res = requests.get("https://ssd-api.jpl.nasa.gov/sb_ident.api", params)
818res = json.loads(res.content)
819potential_known_objects = [dict(zip(res["fields_second"], item)) for item in
820res.get("data_second_pass", [])]
821potential_known_objects = [KnownObject(item, wcs=self.wcs) for item in potential_known_objects]
822first_pass_objects = [dict(zip(res["fields_first"], item)) for item in res.get("data_first_pass", [])]
823potential_known_objects.extend([KnownObject(item, wcs=self.wcs) for item in first_pass_objects])
824for item in potential_known_objects:
825x, y = item.pixel_coordinates
826if 0 <= x < self.shape[2] and 0 <= y < self.shape[1]:
827if sb_kind == 'a':
828know_asteroids.append(item)
829if sb_kind == 'c':
830know_comets.append(item)
831logger.log.info(f"Found {len(know_asteroids)} known asteroids and {len(know_comets)} known comets in the FOV")
832if know_asteroids:
833logger.log.info(f"Known asteroids:")
834for item in know_asteroids:
835logger.log.info(str(item))
836if know_comets:
837logger.log.info(f"Known comets:")
838for item in know_comets:
839logger.log.info(str(item))
840return know_asteroids, know_comets
841