CelestialSurveyor

Форк
0
/
training_dataset_v2.py 
767 строк · 32.6 Кб
1
import cv2
2
import datetime
3
import json
4
import numba
5
import numpy as np
6
import os
7
import random
8
import sys
9
import tifffile
10

11

12
from dataclasses import dataclass
13
from decimal import Decimal
14
from PIL import Image
15
from typing import Optional, Sequence, Generator
16

17
from backend.source_data_v2 import SourceDataV2, CHUNK_SIZE
18
from logger.logger import get_logger
19

20

21
logger = get_logger()
22

23
MIN_TOTAL_MOVEMENT = 5  # px
24

25

26
@dataclass
27
class RandomObject:
28
    """
29
    Represents a random object to be drawn on the image series.
30

31
    Attributes:
32
        start_y (int): The starting y-coordinate of the object.
33
        start_x (int): The starting x-coordinate of the object.
34
        start_frame_idx (int): The starting frame index of the object.
35
        movement_vector (np.ndarray): The movement vector of the object.
36
        brightness_above_noize (float): The brightness level above noise.
37
        star_sample (np.ndarray): The sample of the star.
38
        exposure (Decimal): The exposure value of the object.
39
        timestamps (Sequence[datetime.datetime]): The timestamps associated with the object.
40
    """
41
    start_y: int
42
    start_x: int
43
    start_frame_idx: int
44
    movement_vector: np.ndarray
45
    brightness_above_noize: float
46
    star_sample: np.ndarray
47
    exposure: Decimal
48
    timestamps: list[datetime.datetime]
49

50

51
class TrainingSourceDataV2(SourceDataV2):
52
    """
53
    Class to manage image data. One instance of this class manages one object captured.
54
    There are some methods to generate synthetic data for AI model training.
55
    """
56

57
    SAMPLES_FOLDER = os.path.join(sys.path[1], "star_samples")
58
    OBJ_TYPE_STAR = "star"
59
    OBJ_TYPE_ALL = "all"
60
    OBJ_TYPE_COMET = "comet"
61

62
    def __init__(self, to_debayer: bool = False, number_of_images: Optional[int] = None) -> None:
63
        super().__init__(to_debayer)
64
        self.number_of_images = number_of_images
65
        self.exclusion_boxes = []
66
        logger.log.info("Loading star samples")
67
        self.star_samples, self.comet_samples = self._load_star_samples()
68

69
    @classmethod
70
    def _load_star_samples(cls) -> tuple[list[np.ndarray], list[np.ndarray]]:
71
        """
72
        Loads star and comet samples from the SAMPLES_FOLDER directory.
73

74
        Returns:
75
        Tuple containing a list of star samples and a list of comet samples, both as numpy arrays.
76
        """
77
        file_list_stars = [
78
            os.path.join(cls.SAMPLES_FOLDER, item) for item in os.listdir(cls.SAMPLES_FOLDER)
79
            if ".tif" in item.lower() and "star" in item.lower()]
80
        file_list_comets = [
81
            os.path.join(cls.SAMPLES_FOLDER, item) for item in os.listdir(cls.SAMPLES_FOLDER)
82
            if ".tif" in item.lower() and "comet" in item.lower()]
83
        star_samples = [np.array(tifffile.tifffile.imread(item)) for item in file_list_stars]
84
        comet_samples = [np.array(tifffile.tifffile.imread(item)) for item in file_list_comets]
85
        for num, sample in enumerate(star_samples):
86
            if len(sample.shape) == 3:
87
                star_samples[num] = np.reshape(sample, sample.shape[:2])
88
            if sample.shape[0] % 2 == 1:
89
                star_samples[num] = np.delete(star_samples[num], 0, axis=0)
90

91
            if sample.shape[1] % 2 == 1:
92
                star_samples[num] = np.delete(star_samples[num], 0, axis=1)
93
            if sample.dtype == 'uint16':
94
                star_samples[num] = star_samples[num].astype(np.float32) / 65535
95

96
        for num, sample in enumerate(comet_samples):
97
            if len(sample.shape) == 3:
98
                comet_samples[num] = np.reshape(sample, sample.shape[:2])
99
            if sample.shape[0] % 2 == 1:
100
                comet_samples[num] = np.delete(comet_samples[num], 0, axis=0)
101

102
            if sample.shape[1] % 2 == 1:
103
                comet_samples[num] = np.delete(comet_samples[num], 0, axis=1)
104
            if sample.dtype == 'uint16':
105
                comet_samples[num] = comet_samples[num].astype(np.float32) / 65535
106

107
        return star_samples, comet_samples
108

109
    def __get_exclusion_boxes_paths(self) -> list[str]:
110
        """
111
        Get the paths of exclusion boxes files based on the directories of the headers.
112

113
        Returns:
114
        List of paths to exclusion boxes files.
115
        """
116
        folders = {os.path.dirname(header.file_name) for header in self.headers}
117
        exclusion_boxes_files = []
118
        for folder in folders:
119
            if "exclusion_boxes.json" in os.listdir(folder):
120
                exclusion_boxes_files.append(os.path.join(folder, "exclusion_boxes.json"))
121
        return exclusion_boxes_files
122

123
    def load_exclusion_boxes(self, force_rebuild: bool = False, magnitude_limit: float = 18.0) -> None:
124
        """
125
        Loads exclusion. Exclusion boxes contain real asteroids in the training data.
126
        These boxes shouldn't be used for training due to we cannot say if there was synthetic object drawn.
127

128
        Parameters:
129
        - force_rebuild: bool, optional, default is False
130
            If True, forces a rebuild of the exclusion boxes.
131
        - magnitude_limit: float, optional, default is 18.0
132
            The magnitude limit for exclusion boxes.
133

134
        Returns:
135
        None
136
        """
137
        all_boxes = []
138
        file_paths = self.__get_exclusion_boxes_paths()
139
        if len(file_paths) > 0 and not force_rebuild:
140
            logger.log.info("Loading exclusion boxes...")
141
            for fp in file_paths:
142
                with open(fp, 'r') as fileo:
143
                    exclusion_boxes = json.load(fileo)
144
                    all_boxes.extend(exclusion_boxes)
145
            all_boxes = np.array(all_boxes)
146
            self.exclusion_boxes = all_boxes
147
        else:
148
            logger.log.info("Making exclusion boxes...")
149
            self.make_exclusion_boxes(magnitude_limit=magnitude_limit)
150

151
    def make_exclusion_boxes(self, magnitude_limit: float = 18.0) -> None:
152
        """
153
        Create exclusion boxes. It requests the list of known objects for the first image of each imaging session.
154
        Creates JSON file with the exclusion boxes not to request them again.
155

156
        Parameters:
157
        - magnitude_limit: float, optional, default is 18.0
158
            The magnitude limit for exclusion boxes.
159

160
        Returns:
161
        None
162
        """
163
        start_session_idx = 0
164
        session_timestamps = []
165
        exclusion_boxes = []
166
        for num, header in enumerate(self.headers):
167
            if header.timestamp - self.headers[start_session_idx].timestamp > datetime.timedelta(hours=14):
168
                session_timestamps.append((start_session_idx, num - 1))
169
                start_session_idx = num
170
        else:
171
            session_timestamps.append((start_session_idx, len(self.headers) - 1))
172

173
        for start, end in session_timestamps:
174
            start_asteroids, start_comets = self.fetch_known_asteroids_for_image(start, magnitude_limit=magnitude_limit)
175
            end_asteroids, end_comets = self.fetch_known_asteroids_for_image(end, magnitude_limit=magnitude_limit)
176
            start_objects = start_asteroids + start_comets
177
            end_objects = end_asteroids + end_comets
178
            for start_object in start_objects:
179
                for end_object in end_objects:
180
                    if start_object.name == end_object.name:
181
                        start_x, start_y = start_object.pixel_coordinates
182
                        end_x, end_y = end_object.pixel_coordinates
183
                        start_x = int(start_x)
184
                        start_y = int(start_y)
185
                        end_x = int(end_x)
186
                        end_y = int(end_y)
187
                        exclusion_boxes.append((
188
                            max(min(start_x, end_x) - 50, 0),
189
                            max(min(start_y, end_y) - 50, 0),
190
                            min(max(start_x, end_x) + 50, self.shape[2] - 1),
191
                            min(max(start_y, end_y) + 50, self.shape[1] - 1),
192
                        ))
193
        folder = os.path.dirname(self.headers[0].file_name)
194
        file_name = os.path.join(folder, "exclusion_boxes.json")
195
        with open(file_name, 'w') as fileo:
196
            json.dump(exclusion_boxes, fileo)
197
        exclusion_boxes = np.array(exclusion_boxes)
198
        self.exclusion_boxes = exclusion_boxes
199

200
    def show_exclusion_boxes(self) -> None:
201
        """
202
        Display exclusion boxes on an image. (Debug method)
203
        """
204
        max_image = np.copy(self.max_image)
205
        max_image = cv2.cvtColor(max_image, cv2.COLOR_GRAY2BGR)
206
        for xlt, ylt, xbr, ybr in self.exclusion_boxes:
207
            max_image = cv2.rectangle(max_image, (xlt, ylt), (xbr, ybr), (255, 0, 0), 3)
208
        small = cv2.resize(max_image, (0, 0), fx=0.4, fy=0.4)
209
        cv2.imshow("Exclusion boxes", small)
210
        cv2.waitKey(0)
211

212
    def get_random_shrink(self) -> np.ndarray:
213
        """
214
        Generate a random image chunk that does not overlap with exclusion boxes.
215

216
        Returns:
217
        Numpy array representing the random image chunk.
218
        """
219
        generated = False
220
        while not generated:
221
            y = random.randint(0, self.shape[1] - CHUNK_SIZE)
222
            x = random.randint(0, self.shape[2] - CHUNK_SIZE)
223
            for box in self.exclusion_boxes:
224
                x1, y1, x2, y2 = box
225
                if (x1 - CHUNK_SIZE <= x <= x2 + CHUNK_SIZE) and (y1 - CHUNK_SIZE <= y <= y2 + CHUNK_SIZE):
226
                    break
227
            else:
228
                res = np.copy(self.images[:, y:y + CHUNK_SIZE, x:x + CHUNK_SIZE])
229
                res = np.reshape(res, res.shape[:3])
230
                return res
231

232
    @staticmethod
233
    def insert_star_by_coords(image: np.ndarray, star: np.ndarray, coords: tuple[int, int]) -> np.ndarray:
234
        """
235
        Inserts a sample of the star onto another image at specified coordinates.
236

237
        Args:
238
        image (np.ndarray): The image to insert the star image into.
239
        star (np.ndarray): The star image to be inserted.
240
        coords (tuple[int, int]): The coordinates to place the star image.
241

242
        Returns:
243
        np.ndarray: The image with the star image inserted.
244
        """
245
        return insert_star_by_coords(image, star, coords)
246

247
    @classmethod
248
    def calculate_star_form_on_single_image(
249
            cls, image: np.ndarray, star: np.ndarray, start_coords: tuple[int, int],
250
            movement_vector: np.ndarray, exposure_time: Optional[Decimal] = None) -> np.ndarray:
251
        """
252
        Calculate the form of a star sample on a single image based on the star sample, coordinates, movement vector,
253
        and exposure time. The idea is to emulate object sample movement during the single exposure.
254

255
        Args:
256
            image (np.ndarray): The image to insert the star form into.
257
            star (np.ndarray): The star sample image to be inserted.
258
            start_coords (Tuple[int, int]): The starting coordinates to place the star image.
259
            movement_vector (np.ndarray): The movement vector of the star image.
260
            exposure_time (Optional[float], optional): The exposure time of the image. Defaults to None.
261

262
        Returns:
263
            np.ndarray: The image with the star form inserted.
264
        """
265
        return calculate_star_form_on_single_image(image, star, start_coords, movement_vector, exposure_time)
266

267
    def generate_timestamps(self) -> tuple[list[datetime.datetime], Decimal]:
268
        """
269
        Generate random timestamps emulating timestamps of observation sessions.
270

271
        Returns:
272
            Tuple: A tuple containing a list of generated timestamps and a random exposure time.
273
        """
274
        timestamps, exposure = generate_timestamps(len(self.images))
275
        exposure = Decimal(exposure)
276
        timestamps = [datetime.datetime.utcfromtimestamp(ts) for ts in timestamps]
277
        return timestamps, exposure
278

279
    def generate_random_objects(self, obj_type: str = "star") -> RandomObject:
280
        """
281
        Generates random objects based on the provided object type.
282

283
        Parameters:
284
            obj_type (str): The type of object to generate ("star" or "comet"). Default is "star".
285

286
        Returns:
287
            RandomObject: An object containing the generated random properties.
288
        """
289
        if obj_type == self.OBJ_TYPE_STAR:
290
            samples = self.star_samples
291
        elif obj_type == self.OBJ_TYPE_COMET:
292
            samples = self.comet_samples
293
        elif obj_type == self.OBJ_TYPE_ALL:
294
            samples = self.comet_samples + self.star_samples
295
        else:
296
            raise ValueError(f"Unknown object type: {obj_type}")
297

298
        (start_y, start_x, start_frame_idx, movement_vector, brightness_above_noize, star_sample, exposure, timestamps
299
         ) = generate_random_objects(len(self.images), samples)
300
        timestamps = [datetime.datetime.utcfromtimestamp(ts) for ts in timestamps]
301
        return RandomObject(start_y, start_x, start_frame_idx, movement_vector,
302
                            brightness_above_noize, star_sample, Decimal(exposure), timestamps)
303

304
    def draw_object_on_image_series_numpy(self, rand_obg: RandomObject) -> tuple[np.ndarray, int]:
305
        """
306
        Draws a random object on a random image series. The idea is to emulate object sample movement on the image
307
            series.
308
        We choose random object sample, random movement vector (in pixels per hour), and random exposure time.
309
        Then we need to draw the object on the image series taking in account timestamps and exposure time.
310

311
        Args:
312
            rand_obg (RandomObject): The random object to draw on the image series.
313

314
        Returns:
315
            np.ndarray: The image series with the random object drawn on it.
316
        """
317
        imgs = self.get_random_shrink()
318
        imgs = np.reshape(np.copy(imgs), (imgs.shape[:3]))
319
        old_images = np.copy(imgs)
320
        drawn = 0
321
        while not drawn:
322
            noise_level = self.estimate_image_noize_level(imgs)
323
            signal_space = 1 - noise_level
324
            expected_star_max = signal_space * rand_obg.brightness_above_noize + noise_level
325
            star_max = np.amax(rand_obg.star_sample)
326
            multiplier = expected_star_max / star_max
327
            star_img = rand_obg.star_sample * multiplier
328

329
            movement_vector = rand_obg.movement_vector
330

331
            # Here we solve the problem object coordinates and movement vector selection to guarantee that the object
332
            # will appear in the series. Instead of choosing coordinates on the first image and calculating possible
333
            # movement vector - we choose coordinates on the image in the middle of the series, choose random movement
334
            # vector and insert it on each emage from the middle to the beginning and from the middle to the end.
335

336
            # Draw the object on the image series moving backwards from the start frame to the beginning of the series.
337
            to_beginning_slice = slice(None, rand_obg.start_frame_idx)
338
            start_ts = rand_obg.timestamps[rand_obg.start_frame_idx]
339
            for img, timestamp in zip(
340
                    imgs[to_beginning_slice][::-1],
341
                    rand_obg.timestamps[to_beginning_slice][::-1]
342
            ):
343
                inter_image_movement_vector = np.array(movement_vector) * (timestamp - start_ts).total_seconds() / 3600
344
                y, x = inter_image_movement_vector + np.array([rand_obg.start_y, rand_obg.start_x])
345
                if y + star_img.shape[0] < 0 or y - star_img.shape[0] > img.shape[0]:
346
                    break
347
                if x + star_img.shape[1] < 0 or x - star_img.shape[1] > img.shape[1]:
348
                    break
349

350
                new_img = self.calculate_star_form_on_single_image(
351
                    img, star_img, (y, x), movement_vector, rand_obg.exposure)
352
                img[:] = new_img
353

354
            # Draw the object on the image series moving from the start frame to the end of the series.
355
            to_end_slice = slice(rand_obg.start_frame_idx, None, None)
356
            for img, timestamp in zip(
357
                    imgs[to_end_slice],
358
                    rand_obg.timestamps[to_end_slice]
359
            ):
360
                inter_image_movement_vector = np.array(movement_vector) * (timestamp - start_ts).total_seconds() / 3600
361
                y, x = inter_image_movement_vector + np.array([rand_obg.start_y, rand_obg.start_x])
362
                if y + star_img.shape[0] < 0 or y - star_img.shape[0] > img.shape[0]:
363
                    break
364
                if x + star_img.shape[1] < 0 or x - star_img.shape[1] > img.shape[1]:
365
                    break
366
                new_img = self.calculate_star_form_on_single_image(
367
                    img, star_img, (y, x), movement_vector, rand_obg.exposure)
368
                img[:] = new_img
369
            result = imgs
370

371
            drawn = 1
372
            if (result == old_images).all():
373
                drawn = 0
374
                rand_obg = self.generate_random_objects(self.OBJ_TYPE_ALL)
375
        return result, drawn
376

377
    def draw_variable_star(self, rand_obj: RandomObject) -> tuple[np.ndarray, int]:
378
        """
379
        Emulates variable stars on image series.
380

381
        Note: For future debugging. Is not used in the current implementation.
382
        """
383
        raise NotImplementedError("This function needs to be reviewed and refactored.")
384
        # TODO: Refactor
385
        imgs = np.copy(self.get_random_shrink())
386
        old_images = np.copy(imgs)
387

388
        original_timestamps = rand_obj.timestamps
389
        timestamps = [0.] + [(item - original_timestamps[0]).total_seconds(
390
        ) for num, item in enumerate(original_timestamps)]
391
        period = 1.5 * timestamps[-1]
392
        max_brightness = random.randrange(80, 101) / 100
393
        min_brightness = max_brightness - random.randrange(30, 61) / 100
394
        starting_phase = (random.randrange(0, 201) / 100) * np.pi
395
        y_shape, x_shape = imgs[0].shape[:2]
396
        y = random.randint(0, y_shape - 1)
397
        x = random.randint(0, x_shape - 1)
398
        star_img = self.star_samples[-1]
399
        star_brightness = np.max(star_img)
400
        for num, (img, ts) in enumerate(zip(imgs, timestamps)):
401
            new_phaze = 2 * np.pi * ts / period + starting_phase
402
            new_brightness = np.sin(new_phaze) * (max_brightness - min_brightness) / 2 + (
403
                        max_brightness + min_brightness) / 2
404
            brightness_multiplier = new_brightness / star_brightness
405
            new_star_image = star_img * brightness_multiplier
406
            new_img = self.calculate_star_form_on_single_image(img, new_star_image, (y, x), (0, 0), 10000)
407
            imgs[num] = new_img
408
        drawn = 1
409
        if (imgs == old_images).all():
410
            drawn = 0
411
        return imgs, drawn
412

413
    # @measure_execution_time
414
    def draw_one_image_artefact(self, imgs: np.ndarray) -> np.ndarray:
415
        """
416
        Draws one image artefact. Something like cosmic rays or satellite/airplane tracks.
417

418
        Args:
419
            imgs (np.ndarray): The input images.
420

421
        Returns:
422
            np.ndarray: The modified images.
423
        """
424
        number_of_artefacts = random.choice(list(range(1, 5)) + [0] * 10)
425
        for _ in range(number_of_artefacts):
426
            y_shape, x_shape = imgs[0].shape[:2]
427
            star_img = random.choice(self.star_samples)
428
            start_image_idx = random.randint(0, len(imgs) - 1)
429
            y = random.randint(0, y_shape - 1)
430
            x = random.randint(0, x_shape - 1)
431
            object_factor = random.randrange(120, 300) / 300
432
            star_max = np.amax(star_img)
433
            expected_max = np.average(imgs) + (np.max(imgs) - np.average(imgs)) * object_factor
434
            if star_max == 0:
435
                multiplier = 1
436
            else:
437
                multiplier = expected_max / star_max
438
            star_img = star_img * multiplier
439

440
            # 0 means point like object which appears on the single image
441
            # 1 means satellite like object which appears on the single image
442
            is_satellite_like = random.randrange(0, 2)
443
            if not is_satellite_like:
444
                movement_vector = np.array([0, 0])
445
            else:
446
                movement_vector = np.array([random.randrange(1, 300) * 100, random.randrange(1, 300) * 100])
447

448
            imgs[start_image_idx] = self.calculate_star_form_on_single_image(
449
                imgs[start_image_idx], star_img, (y, x), movement_vector, 10000)
450
            imgs[start_image_idx] = self.calculate_star_form_on_single_image(
451
                imgs[start_image_idx], star_img, (y, x), - movement_vector, 10000)
452
        return imgs
453

454
    @staticmethod
455
    def draw_hot_pixels(imgs: np.ndarray, dead: bool = False) -> np.ndarray:
456
        """
457
        Draws hot pixels on the images.
458

459
        Args:
460
            imgs: The images to draw hot pixels on.
461
            dead (bool): If True, sets the pixel value to 0, otherwise adjusts brightness.
462

463
        Returns:
464
            np.ndarray: The images with hot pixels drawn on them.
465
        """
466
        imgs = np.copy(imgs)
467
        probablity = random.randrange(10, 51)
468
        brightness = random.randrange(90, 101) / 100.
469
        result = []
470
        for img in imgs:
471
            if random.randrange(1, 101) < probablity:
472
                y_shape, x_shape = imgs[0].shape[:2]
473
                y = random.randint(0, y_shape - 1)
474
                x = random.randint(0, x_shape - 1)
475
                img[y, x] = 0 if dead else brightness
476
            result.append(img)
477
        result = np.array(result)
478
        return result
479

480
    def draw_hot_stars(self, imgs: np.ndarray) -> np.ndarray:
481
        """
482
        Draws hot stars on the images. Like it's done in draw_hot_pixels function, but for object samples,
483
        not for single pixels.
484

485
        Args:
486
            imgs: The images to draw hot stars on.
487

488
        Returns:
489
            np.ndarray: The images with hot stars drawn on them.
490
        """
491
        imgs = np.copy(imgs)
492
        probability = random.randrange(10, 51)
493
        brightness = random.randrange(80, 101) / 100.
494
        star_img = random.choice(self.star_samples)
495
        star_img *= brightness / np.amax(star_img)
496
        result = []
497
        for img in imgs:
498
            if random.randrange(1, 101) < probability:
499
                y_shape, x_shape = imgs[0].shape[:2]
500
                y = random.randint(0, y_shape - 1)
501
                x = random.randint(0, x_shape - 1)
502
                img = self.insert_star_by_coords(img, star_img, (y, x))
503
            result.append(img)
504
        result = np.array(result)
505
        return result
506

507

508
class TrainingDatasetV2:
509
    """
510
    Represents all the data used for training the model.
511
    """
512
    def __init__(self, source_datas: Sequence[TrainingSourceDataV2]):
513
        self.source_datas = source_datas
514

515
    @staticmethod
516
    def make_series(source_data: TrainingSourceDataV2) -> tuple[np.ndarray, int]:
517
        """
518
        Creates a series of images with random objects for training purposes.
519

520
        Args:
521
            source_data (TrainingSourceDataV2): The data source for generating random objects.
522

523
        Returns:
524
            Tuple containing the generated image series and a result value.
525
        """
526
        rand_obg = source_data.generate_random_objects(obj_type=source_data.OBJ_TYPE_ALL)
527
        if random.randint(1, 101) > 50:
528
            what_to_draw = random.randrange(0, 100)
529
            if what_to_draw < 200:
530
                imgs, res = source_data.draw_object_on_image_series_numpy(rand_obg)
531
            else:
532
                imgs, drawn = source_data.draw_variable_star(rand_obg)
533
                res = drawn
534
        else:
535
            res = 0
536
            imgs = source_data.get_random_shrink()
537

538
        if random.randint(0, 100) >= 90:
539
            imgs = source_data.draw_one_image_artefact(imgs)
540
        if random.randint(0, 100) >= 90:
541
            if random.randint(0, 100) >= 50:
542
                imgs = source_data.draw_hot_stars(imgs)
543
            else:
544
                imgs = source_data.draw_hot_pixels(imgs, bool(random.randrange(0, 2)))
545
        imgs = source_data.prepare_images(imgs)
546
        imgs, timestamps = source_data.adjust_chunks_to_min_len(imgs, rand_obg.timestamps, min_len=5)
547
        return imgs, res
548

549
    def make_batch(self, batch_size: int, save: bool = False) -> tuple[np.ndarray, np.ndarray]:
550
        """
551
        Creates a batch of images with random objects for training purposes.
552

553
        Args:
554
            batch_size (int): The size of the batch.
555
            save (bool): If True, saves the generated images as GIFs.
556

557
        Returns:
558
            Tuple containing the generated image series and a result value.
559
        """
560
        source_data = random.choice(self.source_datas)
561
        batch = [self.make_series(source_data) for _ in range(batch_size)]
562
        x_fast_batch = np.array([item[0] for item in batch])
563
        y_batch = np.array([item[1] for item in batch])
564

565
        # for debug purposes - it's possible to review some samples of synthetic data used for training
566
        if save:
567
            for num, (bla_imgs, res) in enumerate(zip(x_fast_batch, y_batch)):
568
                bla_imgs.shape = bla_imgs.shape[:3]
569
                bla_imgs = bla_imgs * 256
570
                new_frames = [Image.fromarray(frame).convert('L').convert('P') for frame in bla_imgs]
571
                new_frames[0].save(
572
                    f"{num}_{res[0]}.gif",
573
                    save_all=True,
574
                    append_images=new_frames[1:],
575
                    duration=200,
576
                    loop=0)
577
        return x_fast_batch, y_batch
578

579
    def batch_generator(self, batch_size: int) -> Generator[tuple[np.ndarray, np.ndarray]]:
580
        """
581
        Generator function that yields batches of image data and corresponding labels.
582

583
        Args:
584
            batch_size (int): The size of each batch.
585

586
        Yields:
587
            tuple[np.ndarray, np.ndarray]: A tuple containing the batch of images and their corresponding labels.
588
        """
589
        bla = False
590
        i = 0
591
        while True:
592
            # for debug purposes - it's possible to review some samples of synthetic data used for training
593
            if i == 450:
594
                bla = True
595
            yield self.make_batch(batch_size, bla)
596
            i += 1
597
            bla = False
598

599

600
@numba.jit(nopython=True, fastmath=True)
601
def insert_star_by_coords(image, star, coords):
602
    """
603
    Inserts a sample of the star onto another image at specified coordinates.
604

605
    Note: speed up with Numba
606

607
    Args:
608
    image (np.ndarray): The image to insert the star image into.
609
    star (np.ndarray): The star image to be inserted.
610
    coords (Tuple[int, int]): The coordinates to place the star image.
611

612
    Returns:
613
    np.ndarray: The image with the star image inserted.
614
    """
615
    star_y_size, star_x_size = star.shape[:2]
616
    image_y_size, image_x_size = image.shape[:2]
617
    x, y = coords
618
    x = round(x)
619
    y = round(y)
620
    if x + star_x_size // 2 < 0 or x - star_x_size // 2 > image_x_size:
621
        return image
622
    if y + star_y_size // 2 < 0 or y - star_y_size // 2 > image_y_size:
623
        return image
624

625
    cut_top = y - star_y_size // 2
626
    cut_top = -cut_top if cut_top < 0 else 0
627
    cut_bottom = image_y_size - y - star_y_size // 2
628
    cut_bottom = -cut_bottom if cut_bottom < 0 else 0
629
    cut_left = x - star_x_size // 2
630
    cut_left = -cut_left if cut_left < 0 else 0
631
    cut_right = image_x_size - x - star_x_size // 2
632
    cut_right = -cut_right if cut_right < 0 else 0
633

634
    y_slice = slice(y + cut_top - star_y_size // 2, y - cut_bottom + star_y_size // 2)
635
    x_slice = slice(x + cut_left - star_x_size // 2, x - cut_right + star_x_size // 2)
636
    image_to_correct = image[y_slice, x_slice]
637
    image[y_slice, x_slice] = np.maximum(
638
        star[int(cut_top):int(star_y_size - cut_bottom), int(cut_left):int(star_x_size - cut_right)],
639
        image_to_correct)
640
    return image
641

642

643
@numba.jit(nopython=True, fastmath=True)
644
def calculate_star_form_on_single_image(image: np.ndarray, star: np.ndarray, start_coords: tuple[int, int],
645
                                        movement_vector: np.ndarray, exposure_time: Optional[Decimal] = None
646
                                        ) -> np.ndarray:
647
    """
648
    Calculate the form of a star sample on a single image based on the star sample, coordinates, movement vector,
649
    and exposure time. The idea is to emulate object sample movement during the single exposure.
650

651
    Note: speed up with Numba
652

653
    Args:
654
        image (np.ndarray): The image to insert the star form into.
655
        star (np.ndarray): The star sample image to be inserted.
656
        start_coords (Tuple[int, int]): The starting coordinates to place the star image.
657
        movement_vector (np.ndarray): The movement vector of the star image.
658
        exposure_time (Optional[Decimal], optional): The exposure time of the image. Defaults to None.
659

660
    Returns:
661
        np.ndarray: The image with the star form inserted.
662
    """
663
    per_image_movement_vector = movement_vector * exposure_time / 3600
664
    y_move, x_move = per_image_movement_vector
665
    start_y, start_x = start_coords
666
    dx = 0
667
    if x_move == y_move == 0:
668
        image = insert_star_by_coords(image, star, (start_y, start_x))
669
        return image
670
    x_moves_per_y_moves = x_move / y_move
671
    for dy in range(round(y_move + 1)):
672
        if dy * x_moves_per_y_moves // 1 > dx:
673
            dx += 1
674
        elif dy * x_moves_per_y_moves // 1 < -dx:
675
            dx -= 1
676
        image = insert_star_by_coords(image, star, (start_y + dy, start_x + dx))
677
        if (start_y + dy + star.shape[0] < 0 or start_y + dy - star.shape[0] > image.shape[0]
678
                or start_x + dx + star.shape[1] < 0 or start_x + dx - star.shape[1] > image.shape[1]):
679
            break
680
    return image
681

682

683
@numba.jit(nopython=True, fastmath=True)
684
def generate_timestamps(ts_num: int) -> tuple[list[int], float]:
685
    """
686
    Generate random timestamps emulating timestamps of images. It's needed to emulate different number of sessions and
687
    different exposure times within each session.
688

689
    Args:
690
        ts_num (int): The number of timestamps to generate.
691

692
    Returns:
693
        Tuple: A tuple containing a list of generated timestamps and a numpy array of random exposure times.
694
    """
695
    # generate random timestamps emulating timestamps of observations sessions staring from now
696
    # each session should have at least 5 timestamps. If there are less than 5 timestamps - only one session will be
697
    # generated. number of sessions is random. Interval between timestamps is equal to the same random exposure time
698
    # applicable for all intervals plus some random offset
699
    min_frames_per_session = 8
700

701
    max_sessions_num = min((ts_num - 1) // 8, 4)
702
    sessions_num = 0 if max_sessions_num == 0 else random.randrange(0, max_sessions_num)
703
    exposures = np.array([0.25, 0.5, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
704
    exposure = float(exposures[random.randrange(len(exposures))])
705

706
    if sessions_num == 0:
707
        frames_per_session = [ts_num]
708
    else:
709
        frames_per_session = []
710
        remaining_sum = ts_num
711
        for i in range(sessions_num):
712
            num = random.randint(min_frames_per_session, remaining_sum - min_frames_per_session * (sessions_num - i))
713
            frames_per_session.append(num)
714
            remaining_sum -= num
715
        frames_per_session.append(remaining_sum)
716

717
    new_session_starts = []
718
    for i in range(len(frames_per_session) - 1):
719
        new_session_starts.append(sum(frames_per_session[:i + 1]))
720
    start_ts = 0
721
    timestamps = []
722
    max_inter_exposure = 20 * 60  # 20 minutes
723
    added_days = 0
724
    for num in range(ts_num):
725
        if num == 0:
726
            next_timestamp = 0
727
        elif num in new_session_starts:
728
            added_days += 1
729
            next_timestamp = start_ts + added_days * 24 * 3600 + random.randrange(
730
                0, 3) * 60 + random.randrange(0, 3600)
731
        else:
732
            next_timestamp = timestamps[-1] + exposure + random.randrange(1, max_inter_exposure + 1)
733
        timestamps.append(next_timestamp)
734
    return timestamps, exposure
735

736

737
@numba.jit(nopython=True, fastmath=True)
738
def generate_random_objects(imgs_num: int, star_samples: list[np.ndarray]
739
                            ) -> tuple[int, int, int, np.ndarray, float, np.ndarray, float, list[int]]:
740
    """
741
    Generates random objects based on the provided image number and star samples.
742

743
    Args:
744
        imgs_num (int): The number of images.
745
        star_samples: The samples of stars for object generation.
746

747
    Returns:
748
        Tuple: A tuple containing random object properties.
749
    """
750
    start_y = random.randrange(0, CHUNK_SIZE)
751
    start_x = random.randrange(0, CHUNK_SIZE)
752
    start_frame_idx = random.randrange(0, imgs_num)
753
    timestamps, exposure = generate_timestamps(imgs_num)
754
    brightness_above_noize = float(random.randrange(500, 1001)) / 1000
755
    star_sample = star_samples[random.randrange(len(star_samples))]
756
    total_time = timestamps[-1] - timestamps[0]
757
    total_time /= 3600
758
    min_vector = max(MIN_TOTAL_MOVEMENT / total_time, 0.5)
759
    max_vector = 30.  # pixels/hour
760
    vector_len = random.uniform(min_vector, max_vector)
761
    movement_angle = random.uniform(0., 2 * np.pi)
762
    movement_vector = np.array([np.sin(movement_angle), np.cos(movement_angle)], dtype=np.float32) * vector_len
763
    return start_y, start_x, start_frame_idx, movement_vector, brightness_above_noize, star_sample, exposure, timestamps
764

765

766
if __name__ == '__main__':
767
    pass
768

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.