CelestialSurveyor
767 строк · 32.6 Кб
1import cv2
2import datetime
3import json
4import numba
5import numpy as np
6import os
7import random
8import sys
9import tifffile
10
11
12from dataclasses import dataclass
13from decimal import Decimal
14from PIL import Image
15from typing import Optional, Sequence, Generator
16
17from backend.source_data_v2 import SourceDataV2, CHUNK_SIZE
18from logger.logger import get_logger
19
20
21logger = get_logger()
22
23MIN_TOTAL_MOVEMENT = 5 # px
24
25
26@dataclass
27class RandomObject:
28"""
29Represents a random object to be drawn on the image series.
30
31Attributes:
32start_y (int): The starting y-coordinate of the object.
33start_x (int): The starting x-coordinate of the object.
34start_frame_idx (int): The starting frame index of the object.
35movement_vector (np.ndarray): The movement vector of the object.
36brightness_above_noize (float): The brightness level above noise.
37star_sample (np.ndarray): The sample of the star.
38exposure (Decimal): The exposure value of the object.
39timestamps (Sequence[datetime.datetime]): The timestamps associated with the object.
40"""
41start_y: int
42start_x: int
43start_frame_idx: int
44movement_vector: np.ndarray
45brightness_above_noize: float
46star_sample: np.ndarray
47exposure: Decimal
48timestamps: list[datetime.datetime]
49
50
51class TrainingSourceDataV2(SourceDataV2):
52"""
53Class to manage image data. One instance of this class manages one object captured.
54There are some methods to generate synthetic data for AI model training.
55"""
56
57SAMPLES_FOLDER = os.path.join(sys.path[1], "star_samples")
58OBJ_TYPE_STAR = "star"
59OBJ_TYPE_ALL = "all"
60OBJ_TYPE_COMET = "comet"
61
62def __init__(self, to_debayer: bool = False, number_of_images: Optional[int] = None) -> None:
63super().__init__(to_debayer)
64self.number_of_images = number_of_images
65self.exclusion_boxes = []
66logger.log.info("Loading star samples")
67self.star_samples, self.comet_samples = self._load_star_samples()
68
69@classmethod
70def _load_star_samples(cls) -> tuple[list[np.ndarray], list[np.ndarray]]:
71"""
72Loads star and comet samples from the SAMPLES_FOLDER directory.
73
74Returns:
75Tuple containing a list of star samples and a list of comet samples, both as numpy arrays.
76"""
77file_list_stars = [
78os.path.join(cls.SAMPLES_FOLDER, item) for item in os.listdir(cls.SAMPLES_FOLDER)
79if ".tif" in item.lower() and "star" in item.lower()]
80file_list_comets = [
81os.path.join(cls.SAMPLES_FOLDER, item) for item in os.listdir(cls.SAMPLES_FOLDER)
82if ".tif" in item.lower() and "comet" in item.lower()]
83star_samples = [np.array(tifffile.tifffile.imread(item)) for item in file_list_stars]
84comet_samples = [np.array(tifffile.tifffile.imread(item)) for item in file_list_comets]
85for num, sample in enumerate(star_samples):
86if len(sample.shape) == 3:
87star_samples[num] = np.reshape(sample, sample.shape[:2])
88if sample.shape[0] % 2 == 1:
89star_samples[num] = np.delete(star_samples[num], 0, axis=0)
90
91if sample.shape[1] % 2 == 1:
92star_samples[num] = np.delete(star_samples[num], 0, axis=1)
93if sample.dtype == 'uint16':
94star_samples[num] = star_samples[num].astype(np.float32) / 65535
95
96for num, sample in enumerate(comet_samples):
97if len(sample.shape) == 3:
98comet_samples[num] = np.reshape(sample, sample.shape[:2])
99if sample.shape[0] % 2 == 1:
100comet_samples[num] = np.delete(comet_samples[num], 0, axis=0)
101
102if sample.shape[1] % 2 == 1:
103comet_samples[num] = np.delete(comet_samples[num], 0, axis=1)
104if sample.dtype == 'uint16':
105comet_samples[num] = comet_samples[num].astype(np.float32) / 65535
106
107return star_samples, comet_samples
108
109def __get_exclusion_boxes_paths(self) -> list[str]:
110"""
111Get the paths of exclusion boxes files based on the directories of the headers.
112
113Returns:
114List of paths to exclusion boxes files.
115"""
116folders = {os.path.dirname(header.file_name) for header in self.headers}
117exclusion_boxes_files = []
118for folder in folders:
119if "exclusion_boxes.json" in os.listdir(folder):
120exclusion_boxes_files.append(os.path.join(folder, "exclusion_boxes.json"))
121return exclusion_boxes_files
122
123def load_exclusion_boxes(self, force_rebuild: bool = False, magnitude_limit: float = 18.0) -> None:
124"""
125Loads exclusion. Exclusion boxes contain real asteroids in the training data.
126These boxes shouldn't be used for training due to we cannot say if there was synthetic object drawn.
127
128Parameters:
129- force_rebuild: bool, optional, default is False
130If True, forces a rebuild of the exclusion boxes.
131- magnitude_limit: float, optional, default is 18.0
132The magnitude limit for exclusion boxes.
133
134Returns:
135None
136"""
137all_boxes = []
138file_paths = self.__get_exclusion_boxes_paths()
139if len(file_paths) > 0 and not force_rebuild:
140logger.log.info("Loading exclusion boxes...")
141for fp in file_paths:
142with open(fp, 'r') as fileo:
143exclusion_boxes = json.load(fileo)
144all_boxes.extend(exclusion_boxes)
145all_boxes = np.array(all_boxes)
146self.exclusion_boxes = all_boxes
147else:
148logger.log.info("Making exclusion boxes...")
149self.make_exclusion_boxes(magnitude_limit=magnitude_limit)
150
151def make_exclusion_boxes(self, magnitude_limit: float = 18.0) -> None:
152"""
153Create exclusion boxes. It requests the list of known objects for the first image of each imaging session.
154Creates JSON file with the exclusion boxes not to request them again.
155
156Parameters:
157- magnitude_limit: float, optional, default is 18.0
158The magnitude limit for exclusion boxes.
159
160Returns:
161None
162"""
163start_session_idx = 0
164session_timestamps = []
165exclusion_boxes = []
166for num, header in enumerate(self.headers):
167if header.timestamp - self.headers[start_session_idx].timestamp > datetime.timedelta(hours=14):
168session_timestamps.append((start_session_idx, num - 1))
169start_session_idx = num
170else:
171session_timestamps.append((start_session_idx, len(self.headers) - 1))
172
173for start, end in session_timestamps:
174start_asteroids, start_comets = self.fetch_known_asteroids_for_image(start, magnitude_limit=magnitude_limit)
175end_asteroids, end_comets = self.fetch_known_asteroids_for_image(end, magnitude_limit=magnitude_limit)
176start_objects = start_asteroids + start_comets
177end_objects = end_asteroids + end_comets
178for start_object in start_objects:
179for end_object in end_objects:
180if start_object.name == end_object.name:
181start_x, start_y = start_object.pixel_coordinates
182end_x, end_y = end_object.pixel_coordinates
183start_x = int(start_x)
184start_y = int(start_y)
185end_x = int(end_x)
186end_y = int(end_y)
187exclusion_boxes.append((
188max(min(start_x, end_x) - 50, 0),
189max(min(start_y, end_y) - 50, 0),
190min(max(start_x, end_x) + 50, self.shape[2] - 1),
191min(max(start_y, end_y) + 50, self.shape[1] - 1),
192))
193folder = os.path.dirname(self.headers[0].file_name)
194file_name = os.path.join(folder, "exclusion_boxes.json")
195with open(file_name, 'w') as fileo:
196json.dump(exclusion_boxes, fileo)
197exclusion_boxes = np.array(exclusion_boxes)
198self.exclusion_boxes = exclusion_boxes
199
200def show_exclusion_boxes(self) -> None:
201"""
202Display exclusion boxes on an image. (Debug method)
203"""
204max_image = np.copy(self.max_image)
205max_image = cv2.cvtColor(max_image, cv2.COLOR_GRAY2BGR)
206for xlt, ylt, xbr, ybr in self.exclusion_boxes:
207max_image = cv2.rectangle(max_image, (xlt, ylt), (xbr, ybr), (255, 0, 0), 3)
208small = cv2.resize(max_image, (0, 0), fx=0.4, fy=0.4)
209cv2.imshow("Exclusion boxes", small)
210cv2.waitKey(0)
211
212def get_random_shrink(self) -> np.ndarray:
213"""
214Generate a random image chunk that does not overlap with exclusion boxes.
215
216Returns:
217Numpy array representing the random image chunk.
218"""
219generated = False
220while not generated:
221y = random.randint(0, self.shape[1] - CHUNK_SIZE)
222x = random.randint(0, self.shape[2] - CHUNK_SIZE)
223for box in self.exclusion_boxes:
224x1, y1, x2, y2 = box
225if (x1 - CHUNK_SIZE <= x <= x2 + CHUNK_SIZE) and (y1 - CHUNK_SIZE <= y <= y2 + CHUNK_SIZE):
226break
227else:
228res = np.copy(self.images[:, y:y + CHUNK_SIZE, x:x + CHUNK_SIZE])
229res = np.reshape(res, res.shape[:3])
230return res
231
232@staticmethod
233def insert_star_by_coords(image: np.ndarray, star: np.ndarray, coords: tuple[int, int]) -> np.ndarray:
234"""
235Inserts a sample of the star onto another image at specified coordinates.
236
237Args:
238image (np.ndarray): The image to insert the star image into.
239star (np.ndarray): The star image to be inserted.
240coords (tuple[int, int]): The coordinates to place the star image.
241
242Returns:
243np.ndarray: The image with the star image inserted.
244"""
245return insert_star_by_coords(image, star, coords)
246
247@classmethod
248def calculate_star_form_on_single_image(
249cls, image: np.ndarray, star: np.ndarray, start_coords: tuple[int, int],
250movement_vector: np.ndarray, exposure_time: Optional[Decimal] = None) -> np.ndarray:
251"""
252Calculate the form of a star sample on a single image based on the star sample, coordinates, movement vector,
253and exposure time. The idea is to emulate object sample movement during the single exposure.
254
255Args:
256image (np.ndarray): The image to insert the star form into.
257star (np.ndarray): The star sample image to be inserted.
258start_coords (Tuple[int, int]): The starting coordinates to place the star image.
259movement_vector (np.ndarray): The movement vector of the star image.
260exposure_time (Optional[float], optional): The exposure time of the image. Defaults to None.
261
262Returns:
263np.ndarray: The image with the star form inserted.
264"""
265return calculate_star_form_on_single_image(image, star, start_coords, movement_vector, exposure_time)
266
267def generate_timestamps(self) -> tuple[list[datetime.datetime], Decimal]:
268"""
269Generate random timestamps emulating timestamps of observation sessions.
270
271Returns:
272Tuple: A tuple containing a list of generated timestamps and a random exposure time.
273"""
274timestamps, exposure = generate_timestamps(len(self.images))
275exposure = Decimal(exposure)
276timestamps = [datetime.datetime.utcfromtimestamp(ts) for ts in timestamps]
277return timestamps, exposure
278
279def generate_random_objects(self, obj_type: str = "star") -> RandomObject:
280"""
281Generates random objects based on the provided object type.
282
283Parameters:
284obj_type (str): The type of object to generate ("star" or "comet"). Default is "star".
285
286Returns:
287RandomObject: An object containing the generated random properties.
288"""
289if obj_type == self.OBJ_TYPE_STAR:
290samples = self.star_samples
291elif obj_type == self.OBJ_TYPE_COMET:
292samples = self.comet_samples
293elif obj_type == self.OBJ_TYPE_ALL:
294samples = self.comet_samples + self.star_samples
295else:
296raise ValueError(f"Unknown object type: {obj_type}")
297
298(start_y, start_x, start_frame_idx, movement_vector, brightness_above_noize, star_sample, exposure, timestamps
299) = generate_random_objects(len(self.images), samples)
300timestamps = [datetime.datetime.utcfromtimestamp(ts) for ts in timestamps]
301return RandomObject(start_y, start_x, start_frame_idx, movement_vector,
302brightness_above_noize, star_sample, Decimal(exposure), timestamps)
303
304def draw_object_on_image_series_numpy(self, rand_obg: RandomObject) -> tuple[np.ndarray, int]:
305"""
306Draws a random object on a random image series. The idea is to emulate object sample movement on the image
307series.
308We choose random object sample, random movement vector (in pixels per hour), and random exposure time.
309Then we need to draw the object on the image series taking in account timestamps and exposure time.
310
311Args:
312rand_obg (RandomObject): The random object to draw on the image series.
313
314Returns:
315np.ndarray: The image series with the random object drawn on it.
316"""
317imgs = self.get_random_shrink()
318imgs = np.reshape(np.copy(imgs), (imgs.shape[:3]))
319old_images = np.copy(imgs)
320drawn = 0
321while not drawn:
322noise_level = self.estimate_image_noize_level(imgs)
323signal_space = 1 - noise_level
324expected_star_max = signal_space * rand_obg.brightness_above_noize + noise_level
325star_max = np.amax(rand_obg.star_sample)
326multiplier = expected_star_max / star_max
327star_img = rand_obg.star_sample * multiplier
328
329movement_vector = rand_obg.movement_vector
330
331# Here we solve the problem object coordinates and movement vector selection to guarantee that the object
332# will appear in the series. Instead of choosing coordinates on the first image and calculating possible
333# movement vector - we choose coordinates on the image in the middle of the series, choose random movement
334# vector and insert it on each emage from the middle to the beginning and from the middle to the end.
335
336# Draw the object on the image series moving backwards from the start frame to the beginning of the series.
337to_beginning_slice = slice(None, rand_obg.start_frame_idx)
338start_ts = rand_obg.timestamps[rand_obg.start_frame_idx]
339for img, timestamp in zip(
340imgs[to_beginning_slice][::-1],
341rand_obg.timestamps[to_beginning_slice][::-1]
342):
343inter_image_movement_vector = np.array(movement_vector) * (timestamp - start_ts).total_seconds() / 3600
344y, x = inter_image_movement_vector + np.array([rand_obg.start_y, rand_obg.start_x])
345if y + star_img.shape[0] < 0 or y - star_img.shape[0] > img.shape[0]:
346break
347if x + star_img.shape[1] < 0 or x - star_img.shape[1] > img.shape[1]:
348break
349
350new_img = self.calculate_star_form_on_single_image(
351img, star_img, (y, x), movement_vector, rand_obg.exposure)
352img[:] = new_img
353
354# Draw the object on the image series moving from the start frame to the end of the series.
355to_end_slice = slice(rand_obg.start_frame_idx, None, None)
356for img, timestamp in zip(
357imgs[to_end_slice],
358rand_obg.timestamps[to_end_slice]
359):
360inter_image_movement_vector = np.array(movement_vector) * (timestamp - start_ts).total_seconds() / 3600
361y, x = inter_image_movement_vector + np.array([rand_obg.start_y, rand_obg.start_x])
362if y + star_img.shape[0] < 0 or y - star_img.shape[0] > img.shape[0]:
363break
364if x + star_img.shape[1] < 0 or x - star_img.shape[1] > img.shape[1]:
365break
366new_img = self.calculate_star_form_on_single_image(
367img, star_img, (y, x), movement_vector, rand_obg.exposure)
368img[:] = new_img
369result = imgs
370
371drawn = 1
372if (result == old_images).all():
373drawn = 0
374rand_obg = self.generate_random_objects(self.OBJ_TYPE_ALL)
375return result, drawn
376
377def draw_variable_star(self, rand_obj: RandomObject) -> tuple[np.ndarray, int]:
378"""
379Emulates variable stars on image series.
380
381Note: For future debugging. Is not used in the current implementation.
382"""
383raise NotImplementedError("This function needs to be reviewed and refactored.")
384# TODO: Refactor
385imgs = np.copy(self.get_random_shrink())
386old_images = np.copy(imgs)
387
388original_timestamps = rand_obj.timestamps
389timestamps = [0.] + [(item - original_timestamps[0]).total_seconds(
390) for num, item in enumerate(original_timestamps)]
391period = 1.5 * timestamps[-1]
392max_brightness = random.randrange(80, 101) / 100
393min_brightness = max_brightness - random.randrange(30, 61) / 100
394starting_phase = (random.randrange(0, 201) / 100) * np.pi
395y_shape, x_shape = imgs[0].shape[:2]
396y = random.randint(0, y_shape - 1)
397x = random.randint(0, x_shape - 1)
398star_img = self.star_samples[-1]
399star_brightness = np.max(star_img)
400for num, (img, ts) in enumerate(zip(imgs, timestamps)):
401new_phaze = 2 * np.pi * ts / period + starting_phase
402new_brightness = np.sin(new_phaze) * (max_brightness - min_brightness) / 2 + (
403max_brightness + min_brightness) / 2
404brightness_multiplier = new_brightness / star_brightness
405new_star_image = star_img * brightness_multiplier
406new_img = self.calculate_star_form_on_single_image(img, new_star_image, (y, x), (0, 0), 10000)
407imgs[num] = new_img
408drawn = 1
409if (imgs == old_images).all():
410drawn = 0
411return imgs, drawn
412
413# @measure_execution_time
414def draw_one_image_artefact(self, imgs: np.ndarray) -> np.ndarray:
415"""
416Draws one image artefact. Something like cosmic rays or satellite/airplane tracks.
417
418Args:
419imgs (np.ndarray): The input images.
420
421Returns:
422np.ndarray: The modified images.
423"""
424number_of_artefacts = random.choice(list(range(1, 5)) + [0] * 10)
425for _ in range(number_of_artefacts):
426y_shape, x_shape = imgs[0].shape[:2]
427star_img = random.choice(self.star_samples)
428start_image_idx = random.randint(0, len(imgs) - 1)
429y = random.randint(0, y_shape - 1)
430x = random.randint(0, x_shape - 1)
431object_factor = random.randrange(120, 300) / 300
432star_max = np.amax(star_img)
433expected_max = np.average(imgs) + (np.max(imgs) - np.average(imgs)) * object_factor
434if star_max == 0:
435multiplier = 1
436else:
437multiplier = expected_max / star_max
438star_img = star_img * multiplier
439
440# 0 means point like object which appears on the single image
441# 1 means satellite like object which appears on the single image
442is_satellite_like = random.randrange(0, 2)
443if not is_satellite_like:
444movement_vector = np.array([0, 0])
445else:
446movement_vector = np.array([random.randrange(1, 300) * 100, random.randrange(1, 300) * 100])
447
448imgs[start_image_idx] = self.calculate_star_form_on_single_image(
449imgs[start_image_idx], star_img, (y, x), movement_vector, 10000)
450imgs[start_image_idx] = self.calculate_star_form_on_single_image(
451imgs[start_image_idx], star_img, (y, x), - movement_vector, 10000)
452return imgs
453
454@staticmethod
455def draw_hot_pixels(imgs: np.ndarray, dead: bool = False) -> np.ndarray:
456"""
457Draws hot pixels on the images.
458
459Args:
460imgs: The images to draw hot pixels on.
461dead (bool): If True, sets the pixel value to 0, otherwise adjusts brightness.
462
463Returns:
464np.ndarray: The images with hot pixels drawn on them.
465"""
466imgs = np.copy(imgs)
467probablity = random.randrange(10, 51)
468brightness = random.randrange(90, 101) / 100.
469result = []
470for img in imgs:
471if random.randrange(1, 101) < probablity:
472y_shape, x_shape = imgs[0].shape[:2]
473y = random.randint(0, y_shape - 1)
474x = random.randint(0, x_shape - 1)
475img[y, x] = 0 if dead else brightness
476result.append(img)
477result = np.array(result)
478return result
479
480def draw_hot_stars(self, imgs: np.ndarray) -> np.ndarray:
481"""
482Draws hot stars on the images. Like it's done in draw_hot_pixels function, but for object samples,
483not for single pixels.
484
485Args:
486imgs: The images to draw hot stars on.
487
488Returns:
489np.ndarray: The images with hot stars drawn on them.
490"""
491imgs = np.copy(imgs)
492probability = random.randrange(10, 51)
493brightness = random.randrange(80, 101) / 100.
494star_img = random.choice(self.star_samples)
495star_img *= brightness / np.amax(star_img)
496result = []
497for img in imgs:
498if random.randrange(1, 101) < probability:
499y_shape, x_shape = imgs[0].shape[:2]
500y = random.randint(0, y_shape - 1)
501x = random.randint(0, x_shape - 1)
502img = self.insert_star_by_coords(img, star_img, (y, x))
503result.append(img)
504result = np.array(result)
505return result
506
507
508class TrainingDatasetV2:
509"""
510Represents all the data used for training the model.
511"""
512def __init__(self, source_datas: Sequence[TrainingSourceDataV2]):
513self.source_datas = source_datas
514
515@staticmethod
516def make_series(source_data: TrainingSourceDataV2) -> tuple[np.ndarray, int]:
517"""
518Creates a series of images with random objects for training purposes.
519
520Args:
521source_data (TrainingSourceDataV2): The data source for generating random objects.
522
523Returns:
524Tuple containing the generated image series and a result value.
525"""
526rand_obg = source_data.generate_random_objects(obj_type=source_data.OBJ_TYPE_ALL)
527if random.randint(1, 101) > 50:
528what_to_draw = random.randrange(0, 100)
529if what_to_draw < 200:
530imgs, res = source_data.draw_object_on_image_series_numpy(rand_obg)
531else:
532imgs, drawn = source_data.draw_variable_star(rand_obg)
533res = drawn
534else:
535res = 0
536imgs = source_data.get_random_shrink()
537
538if random.randint(0, 100) >= 90:
539imgs = source_data.draw_one_image_artefact(imgs)
540if random.randint(0, 100) >= 90:
541if random.randint(0, 100) >= 50:
542imgs = source_data.draw_hot_stars(imgs)
543else:
544imgs = source_data.draw_hot_pixels(imgs, bool(random.randrange(0, 2)))
545imgs = source_data.prepare_images(imgs)
546imgs, timestamps = source_data.adjust_chunks_to_min_len(imgs, rand_obg.timestamps, min_len=5)
547return imgs, res
548
549def make_batch(self, batch_size: int, save: bool = False) -> tuple[np.ndarray, np.ndarray]:
550"""
551Creates a batch of images with random objects for training purposes.
552
553Args:
554batch_size (int): The size of the batch.
555save (bool): If True, saves the generated images as GIFs.
556
557Returns:
558Tuple containing the generated image series and a result value.
559"""
560source_data = random.choice(self.source_datas)
561batch = [self.make_series(source_data) for _ in range(batch_size)]
562x_fast_batch = np.array([item[0] for item in batch])
563y_batch = np.array([item[1] for item in batch])
564
565# for debug purposes - it's possible to review some samples of synthetic data used for training
566if save:
567for num, (bla_imgs, res) in enumerate(zip(x_fast_batch, y_batch)):
568bla_imgs.shape = bla_imgs.shape[:3]
569bla_imgs = bla_imgs * 256
570new_frames = [Image.fromarray(frame).convert('L').convert('P') for frame in bla_imgs]
571new_frames[0].save(
572f"{num}_{res[0]}.gif",
573save_all=True,
574append_images=new_frames[1:],
575duration=200,
576loop=0)
577return x_fast_batch, y_batch
578
579def batch_generator(self, batch_size: int) -> Generator[tuple[np.ndarray, np.ndarray]]:
580"""
581Generator function that yields batches of image data and corresponding labels.
582
583Args:
584batch_size (int): The size of each batch.
585
586Yields:
587tuple[np.ndarray, np.ndarray]: A tuple containing the batch of images and their corresponding labels.
588"""
589bla = False
590i = 0
591while True:
592# for debug purposes - it's possible to review some samples of synthetic data used for training
593if i == 450:
594bla = True
595yield self.make_batch(batch_size, bla)
596i += 1
597bla = False
598
599
600@numba.jit(nopython=True, fastmath=True)
601def insert_star_by_coords(image, star, coords):
602"""
603Inserts a sample of the star onto another image at specified coordinates.
604
605Note: speed up with Numba
606
607Args:
608image (np.ndarray): The image to insert the star image into.
609star (np.ndarray): The star image to be inserted.
610coords (Tuple[int, int]): The coordinates to place the star image.
611
612Returns:
613np.ndarray: The image with the star image inserted.
614"""
615star_y_size, star_x_size = star.shape[:2]
616image_y_size, image_x_size = image.shape[:2]
617x, y = coords
618x = round(x)
619y = round(y)
620if x + star_x_size // 2 < 0 or x - star_x_size // 2 > image_x_size:
621return image
622if y + star_y_size // 2 < 0 or y - star_y_size // 2 > image_y_size:
623return image
624
625cut_top = y - star_y_size // 2
626cut_top = -cut_top if cut_top < 0 else 0
627cut_bottom = image_y_size - y - star_y_size // 2
628cut_bottom = -cut_bottom if cut_bottom < 0 else 0
629cut_left = x - star_x_size // 2
630cut_left = -cut_left if cut_left < 0 else 0
631cut_right = image_x_size - x - star_x_size // 2
632cut_right = -cut_right if cut_right < 0 else 0
633
634y_slice = slice(y + cut_top - star_y_size // 2, y - cut_bottom + star_y_size // 2)
635x_slice = slice(x + cut_left - star_x_size // 2, x - cut_right + star_x_size // 2)
636image_to_correct = image[y_slice, x_slice]
637image[y_slice, x_slice] = np.maximum(
638star[int(cut_top):int(star_y_size - cut_bottom), int(cut_left):int(star_x_size - cut_right)],
639image_to_correct)
640return image
641
642
643@numba.jit(nopython=True, fastmath=True)
644def calculate_star_form_on_single_image(image: np.ndarray, star: np.ndarray, start_coords: tuple[int, int],
645movement_vector: np.ndarray, exposure_time: Optional[Decimal] = None
646) -> np.ndarray:
647"""
648Calculate the form of a star sample on a single image based on the star sample, coordinates, movement vector,
649and exposure time. The idea is to emulate object sample movement during the single exposure.
650
651Note: speed up with Numba
652
653Args:
654image (np.ndarray): The image to insert the star form into.
655star (np.ndarray): The star sample image to be inserted.
656start_coords (Tuple[int, int]): The starting coordinates to place the star image.
657movement_vector (np.ndarray): The movement vector of the star image.
658exposure_time (Optional[Decimal], optional): The exposure time of the image. Defaults to None.
659
660Returns:
661np.ndarray: The image with the star form inserted.
662"""
663per_image_movement_vector = movement_vector * exposure_time / 3600
664y_move, x_move = per_image_movement_vector
665start_y, start_x = start_coords
666dx = 0
667if x_move == y_move == 0:
668image = insert_star_by_coords(image, star, (start_y, start_x))
669return image
670x_moves_per_y_moves = x_move / y_move
671for dy in range(round(y_move + 1)):
672if dy * x_moves_per_y_moves // 1 > dx:
673dx += 1
674elif dy * x_moves_per_y_moves // 1 < -dx:
675dx -= 1
676image = insert_star_by_coords(image, star, (start_y + dy, start_x + dx))
677if (start_y + dy + star.shape[0] < 0 or start_y + dy - star.shape[0] > image.shape[0]
678or start_x + dx + star.shape[1] < 0 or start_x + dx - star.shape[1] > image.shape[1]):
679break
680return image
681
682
683@numba.jit(nopython=True, fastmath=True)
684def generate_timestamps(ts_num: int) -> tuple[list[int], float]:
685"""
686Generate random timestamps emulating timestamps of images. It's needed to emulate different number of sessions and
687different exposure times within each session.
688
689Args:
690ts_num (int): The number of timestamps to generate.
691
692Returns:
693Tuple: A tuple containing a list of generated timestamps and a numpy array of random exposure times.
694"""
695# generate random timestamps emulating timestamps of observations sessions staring from now
696# each session should have at least 5 timestamps. If there are less than 5 timestamps - only one session will be
697# generated. number of sessions is random. Interval between timestamps is equal to the same random exposure time
698# applicable for all intervals plus some random offset
699min_frames_per_session = 8
700
701max_sessions_num = min((ts_num - 1) // 8, 4)
702sessions_num = 0 if max_sessions_num == 0 else random.randrange(0, max_sessions_num)
703exposures = np.array([0.25, 0.5, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
704exposure = float(exposures[random.randrange(len(exposures))])
705
706if sessions_num == 0:
707frames_per_session = [ts_num]
708else:
709frames_per_session = []
710remaining_sum = ts_num
711for i in range(sessions_num):
712num = random.randint(min_frames_per_session, remaining_sum - min_frames_per_session * (sessions_num - i))
713frames_per_session.append(num)
714remaining_sum -= num
715frames_per_session.append(remaining_sum)
716
717new_session_starts = []
718for i in range(len(frames_per_session) - 1):
719new_session_starts.append(sum(frames_per_session[:i + 1]))
720start_ts = 0
721timestamps = []
722max_inter_exposure = 20 * 60 # 20 minutes
723added_days = 0
724for num in range(ts_num):
725if num == 0:
726next_timestamp = 0
727elif num in new_session_starts:
728added_days += 1
729next_timestamp = start_ts + added_days * 24 * 3600 + random.randrange(
7300, 3) * 60 + random.randrange(0, 3600)
731else:
732next_timestamp = timestamps[-1] + exposure + random.randrange(1, max_inter_exposure + 1)
733timestamps.append(next_timestamp)
734return timestamps, exposure
735
736
737@numba.jit(nopython=True, fastmath=True)
738def generate_random_objects(imgs_num: int, star_samples: list[np.ndarray]
739) -> tuple[int, int, int, np.ndarray, float, np.ndarray, float, list[int]]:
740"""
741Generates random objects based on the provided image number and star samples.
742
743Args:
744imgs_num (int): The number of images.
745star_samples: The samples of stars for object generation.
746
747Returns:
748Tuple: A tuple containing random object properties.
749"""
750start_y = random.randrange(0, CHUNK_SIZE)
751start_x = random.randrange(0, CHUNK_SIZE)
752start_frame_idx = random.randrange(0, imgs_num)
753timestamps, exposure = generate_timestamps(imgs_num)
754brightness_above_noize = float(random.randrange(500, 1001)) / 1000
755star_sample = star_samples[random.randrange(len(star_samples))]
756total_time = timestamps[-1] - timestamps[0]
757total_time /= 3600
758min_vector = max(MIN_TOTAL_MOVEMENT / total_time, 0.5)
759max_vector = 30. # pixels/hour
760vector_len = random.uniform(min_vector, max_vector)
761movement_angle = random.uniform(0., 2 * np.pi)
762movement_vector = np.array([np.sin(movement_angle), np.cos(movement_angle)], dtype=np.float32) * vector_len
763return start_y, start_x, start_frame_idx, movement_vector, brightness_above_noize, star_sample, exposure, timestamps
764
765
766if __name__ == '__main__':
767pass
768