FEDOT

Форк
0
/
ts_gapfilling.py 
489 строк · 19.3 Кб
1
from copy import deepcopy
from functools import wraps
from typing import List, Union

import numpy as np
from golem.core.log import default_log
from scipy import interpolate

from fedot.core.data.data import InputData
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.repository.tasks import Task, TaskTypesEnum, TsForecastingParams
11

12

13
def series_has_gaps_check(gapfilling_method):
    """Decorator that skips gap-filling when the series contains no gaps.

    The decorated callable is invoked as ``gapfilling_method(self, input_data, ...)``
    and ``self`` must expose ``gap_value`` (marker of a missing element) and
    ``log``. NaN values are replaced with ``gap_value`` before the check, so the
    decorated method always receives the NaN-free array.

    Args:
        gapfilling_method: method that restores gaps in ``input_data``

    Returns:
        wrapper which returns the NaN-replaced input as is when it holds no
        gap markers and otherwise delegates to ``gapfilling_method``
    """

    # Keep __name__ / __doc__ of the decorated method available for introspection
    @wraps(gapfilling_method)
    def wrapper(self, input_data, *args, **kwargs):
        input_data = replace_nan_with_label(input_data, label=self.gap_value)

        if not np.any(input_data == self.gap_value):
            self.log.info(f'Array does not contain values marked as gaps {self.gap_value}')
            return input_data

        self.log.debug(f'Array contain values marked as gaps {self.gap_value}. Start gap-filling')
        return gapfilling_method(self, input_data, *args, **kwargs)

    return wrapper
28

29

30
class SimpleGapFiller:
    """
    Base class used for filling in the gaps in time series with simple methods.
    Methods from the SimpleGapFiller class can be used for comparison with more
    complex models in class ModelGapFiller

    Args:
        gap_value: value, which identify gap elements in array
    """

    def __init__(self, gap_value: float = -100.0):
        self.gap_value = gap_value
        self.log = default_log(self)

    @series_has_gaps_check
    def linear_interpolation(self, input_data: np.array):
        """
        Method allows to restore missing values in an array
        using linear interpolation

        Gaps in the first and the last positions are filled with the nearest
        known value beforehand, because they cannot be interpolated.

        Args:
            input_data: array with gaps

        Returns:
            array without gaps
        """

        output_data = self._prepare_series(input_data)

        # Process first and last elements in time series
        output_data = self._fill_first_and_last_gaps(input_data, output_data)

        # The indices of the known elements and the known elements themselves
        known_ids = np.ravel(np.argwhere(output_data != self.gap_value))
        f_interpolate = interpolate.interp1d(known_ids, output_data[known_ids])

        # Evaluate the interpolant at every position of the series
        return f_interpolate(np.arange(0, len(output_data)))

    @series_has_gaps_check
    def local_poly_approximation(self, input_data, degree: int = 2,
                                 n_neighbors: int = 5):
        """Method allows to restore missing values in an array
        using local polynomial approximation

        Every gap element is estimated separately with a polynomial fitted to
        the nearest known elements. Gaps filled at previous iterations are
        treated as known elements for the following ones.

        Args:
            input_data: array with gaps
            degree: degree of a polynomial function
            n_neighbors: number of neighboring known elements of the time
            series that the approximation is based on

        Returns:
            array without gaps
        """

        output_data = self._prepare_series(input_data)
        i_gaps = np.ravel(np.argwhere(output_data == self.gap_value))

        # Iteratively fill in the gaps in the time series
        for gap_index in i_gaps:
            nearest_indices, nearest_values = self._nearest_known(output_data,
                                                                  gap_index,
                                                                  n_neighbors)
            local_coefs = np.polyfit(nearest_indices, nearest_values, degree)
            output_data[gap_index] = np.polyval(local_coefs, gap_index)

        return output_data

    @series_has_gaps_check
    def batch_poly_approximation(self, input_data, degree: int = 3,
                                 n_neighbors: int = 10):
        """Method allows to restore missing values in an array using
        batch polynomial approximations.
        Approximation is applied not for individual omissions, but for
        intervals of omitted values

        Args:
            input_data: array with gaps
            degree: degree of a polynomial function
            n_neighbors: the number of neighboring known elements of
            time series that the approximation is based on

        Returns:
            array without gaps
        """

        output_data = self._prepare_series(input_data)

        # Gap indices grouped into continuous intervals
        gap_list = np.ravel(np.argwhere(output_data == self.gap_value))
        new_gap_list = self._parse_gap_ids(gap_list)

        # Iteratively fill in the gaps in the time series
        for gap in new_gap_list:
            # Neighbors are searched relative to the center point of the gap
            center_index = int((gap[0] + gap[-1]) / 2)
            nearest_indices, nearest_values = self._nearest_known(output_data,
                                                                  center_index,
                                                                  n_neighbors)

            # Local approximation by an n-th degree polynomial
            local_coefs = np.polyfit(nearest_indices, nearest_values, degree)

            # Estimate the whole interval according to the selected coefficients
            output_data[gap] = np.polyval(local_coefs, gap)

        return output_data

    def _prepare_series(self, input_data) -> np.ndarray:
        """Return a float copy of the series with NaN replaced by the gap marker

        The copy is always of float dtype: otherwise estimates written into an
        integer array would be silently truncated.
        """
        output_data = np.array(input_data, dtype=float)
        return replace_nan_with_label(output_data, label=self.gap_value)

    def _nearest_known(self, output_data: np.ndarray, target_index: int,
                       n_neighbors: int):
        """Find known elements which are the closest to the target index

        Args:
            output_data: series, where gaps are marked with ``gap_value``
            target_index: index the distances are calculated from
            n_neighbors: maximal number of known elements to return

        Returns:
            indices of the nearest known elements and their values
        """
        i_known = np.ravel(np.argwhere(output_data != self.gap_value))

        # Order known indices by their distance from the target index
        id_distances = np.abs(i_known - target_index)
        nearest_indices = i_known[np.argsort(id_distances)[:n_neighbors]]
        return nearest_indices, output_data[nearest_indices]

    def _parse_gap_ids(self, gap_list: Union[List, np.ndarray]) -> list:
        """Method allows parsing source array with gaps indexes

        Args:
            gap_list: array with indexes of gaps in array

        Returns:
            a list with separated gaps in continuous intervals
            (``[[]]`` if ``gap_list`` is empty)
        """

        new_gap_list = []
        local_gaps = []
        for gap in gap_list:
            # There is a "gap" between gaps - close the current interval
            if local_gaps and gap - local_gaps[-1] > 1:
                new_gap_list.append(local_gaps)
                local_gaps = []
            local_gaps.append(gap)
        new_gap_list.append(local_gaps)

        return new_gap_list

    def _fill_first_and_last_gaps(self, input_data: np.array, output_data: np.array):
        """ Eliminate gaps, which place first or last index in time series

        ``output_data`` is modified in place and returned. If the array has no
        known values at all, indexing the empty array of known values raises
        IndexError.
        """
        non_nan_ids = np.ravel(np.argwhere(output_data != self.gap_value))
        non_nan = output_data[non_nan_ids]
        if np.isclose(input_data[0], self.gap_value):
            # First element is a gap - replace with first known value
            self.log.info('First element in the array were replaced by first known value')
            output_data[0] = non_nan[0]
        if np.isclose(input_data[-1], self.gap_value):
            # Last element is a gap - last known value
            self.log.info('Last element in the array were replaced by last known value')
            output_data[-1] = non_nan[-1]

        return output_data
228

229

230
class ModelGapFiller(SimpleGapFiller):
    """
    Class used for filling in the gaps in time series

    Args:
        gap_value: value, which mask gap elements in array
        pipeline: TsForecastingPipeline object for filling in the gaps
    """

    def __init__(self, gap_value, pipeline):
        super().__init__(gap_value)
        self.pipeline = pipeline

        # At least 6 elements needed to train pipeline with lagged transformation
        self.min_train_ts_length = 6

    @series_has_gaps_check
    def forward_inverse_filling(self, input_data):
        """Method fills in the gaps in the input array using forward and inverse
        directions of predictions

        Both predictions are combined with a weighted average: forward forecast
        has the biggest weight at the beginning of the gap, inverse forecast -
        at the end of it.

        Args:
            input_data: data with gaps to filling in the gaps in it

        Returns:
            array without gaps
        """
        # Float copy - predictions written into an integer array would be truncated
        output_data = np.array(input_data, dtype=float)
        output_data = replace_nan_with_label(output_data, label=self.gap_value)
        # Gap indices
        gap_list = np.ravel(np.argwhere(output_data == self.gap_value))
        new_gap_list = self._parse_gap_ids(gap_list)

        # Iteratively fill in the gaps in the time series
        for batch_index, gap in enumerate(new_gap_list):
            preds = []
            weights = []
            # Two predictions are generated for each gap - forward and backward
            for direction_function in (self._forward, self._inverse):
                weights_list, predicted_list = direction_function(output_data,
                                                                  batch_index,
                                                                  new_gap_list)
                weights.append(weights_list)
                preds.append(predicted_list)

            result = np.average(np.array(preds), axis=0, weights=np.array(weights))

            # Replace gaps in an array with prediction values
            output_data[gap] = result

        return output_data

    @series_has_gaps_check
    def forward_filling(self, input_data: Union[List, np.ndarray]):
        """ Method fills in the gaps in the input array using graph with only
        forward direction (i.e. time series forecasting)

        Args:
            input_data: data with gaps to filling in the gaps in it

        Returns:
            array without gaps
        """
        # Float copy - predictions written into an integer array would be truncated
        output_data = np.array(input_data, dtype=float)
        output_data = replace_nan_with_label(output_data, label=self.gap_value)

        # Gap indices
        gap_list = np.ravel(np.argwhere(output_data == self.gap_value))
        new_gap_list = self._parse_gap_ids(gap_list)

        # Iteratively fill in the gaps in the time series
        for gap in new_gap_list:
            # The entire time series is used for training until the gap
            first_gap_element_id = gap[0]
            timeseries_train_part = output_data[:first_gap_element_id]

            # Make forecast in the gap
            predicted = self.__forecast_in_gap(self.pipeline,
                                               timeseries_train_part,
                                               output_data, gap)

            # Replace gaps in an array with prediction values
            output_data[gap] = predicted
        return output_data

    def _forward(self, output_data, batch_index, new_gap_list):
        """The time series method makes a forward forecast based on the part
        of the time series that is located to the left of the gap.

        Args:
            output_data: one-dimensional array of a time series
            batch_index: index of the interval (batch) with a gap
            new_gap_list: array with nested lists of gap indexes

        Returns:
            weights_list: numpy array with prediction weights for averaging
            predicted: numpy array with prediction values in the gap
        """

        gap = new_gap_list[batch_index]
        first_gap_element_id = gap[0]
        timeseries_train_part = output_data[:first_gap_element_id]

        # Adaptive prediction interval length
        len_gap = len(gap)
        predicted = self.__forecast_in_gap(self.pipeline,
                                           timeseries_train_part,
                                           output_data, gap)
        # Weights decrease with the distance from the left border of the gap
        weights_list = np.arange(len_gap, 0, -1)
        return weights_list, predicted

    def _inverse(self, output_data, batch_index, new_gap_list):
        """The time series method makes an inverse forecast based on the part
        of the time series that is located to the right of the gap.

        Args:
            output_data: one-dimensional array of a time series
            batch_index: index of the interval (batch) with a gap
            new_gap_list: array with nested lists of gap indexes

        Returns:
            weights_list: numpy array with prediction weights for averaging
            predicted_values: prediction values in the gap (numpy array, or
            list of repeated value when the gap touches a border of the series)
        """

        gap = new_gap_list[batch_index]
        # Adaptive prediction interval length
        len_gap = len(gap)
        # Weights increase towards the right border of the gap
        weights_list = np.arange(1, (len_gap + 1), 1)

        first_gap_element_id = gap[0]
        latest_gap_element_id = gap[-1]
        if batch_index == len(new_gap_list) - 1:
            # If the interval with a gap is the last one in the array
            timeseries_train_part = output_data[(latest_gap_element_id + 1):]

            is_gap_in_end_time_series = len(timeseries_train_part) == 0
            is_series_size_not_enough = (len(timeseries_train_part) - len_gap) < self.min_train_ts_length
            if is_gap_in_end_time_series:
                # The gap is last element - take last observed value as predicted
                last_known_value = output_data[first_gap_element_id - 1]
                return weights_list, [last_known_value] * len_gap
            elif is_series_size_not_enough:
                # Number of elements in time series after gap is not enough for
                # model training - interpolation is required
                last_known_value_id = first_gap_element_id - 1 if first_gap_element_id > 0 else 0
                extended_part = output_data[last_known_value_id:]
        else:
            # Next gap interval is exist
            next_gap = new_gap_list[batch_index + 1]
            timeseries_train_part = output_data[(latest_gap_element_id + 1): next_gap[0]]

            # Take part with known values to the left from the gap
            extended_part = output_data[(first_gap_element_id - 1): next_gap[0]]

            if first_gap_element_id == 0:
                # Gap in the first part of time series - take first observed value
                first_known_value = timeseries_train_part[0]
                return weights_list, [first_known_value] * len_gap

        # The model is trained on the reversed series to forecast "into the past"
        timeseries_train_part = np.flip(timeseries_train_part)

        train_ts_len = len(timeseries_train_part) - len_gap
        if train_ts_len < self.min_train_ts_length:
            # extended_part is always defined here: the condition matches
            # is_series_size_not_enough for the last gap interval
            interpolated_part = self.linear_interpolation(extended_part)
            # Clip pre-history
            interpolated_part = interpolated_part[1:]
            # Clip parts after gap interval
            predicted = interpolated_part[:len_gap]
        else:
            predicted = self.__pipeline_fit_predict(self.pipeline,
                                                    timeseries_train_part,
                                                    len_gap)

            # Restore the chronological order of predicted values
            predicted = np.flip(predicted)
        return weights_list, predicted

    def __pipeline_fit_predict(self, pipeline, timeseries_train: np.array, len_gap: int):
        """The method makes a prediction as a sequence of elements based on a
        training sample. There are two main parts: fit model and predict.

        The source pipeline is not modified - its deep copy is fitted.

        Args:
            pipeline: pipeline for forecasting
            timeseries_train: part of the time series for training the model
            len_gap: number of elements in the gap

        Returns:
            flattened array with predicted values
        """
        pipeline_for_forecast = deepcopy(pipeline)

        task = Task(TaskTypesEnum.ts_forecasting,
                    TsForecastingParams(forecast_length=len_gap))

        input_data = InputData(idx=np.arange(0, len(timeseries_train)),
                               features=timeseries_train,
                               target=timeseries_train,
                               task=task,
                               data_type=DataTypesEnum.ts)

        forecast_length = input_data.task.task_params.forecast_length
        data_length = input_data.features.shape[0]
        # Reduce window of 'lagged' nodes when it does not fit into the train sample
        for node in pipeline_for_forecast.nodes:
            if node.name == 'lagged':
                if node.parameters['window_size'] + forecast_length >= data_length:
                    node.parameters = {'window_size': max(data_length - forecast_length - 10, 2)}

        # Making predictions for the missing part in the time series
        pipeline_for_forecast.fit_from_scratch(input_data)

        # "Test data" for making prediction for a specific length
        start_forecast = len(timeseries_train)
        end_forecast = start_forecast + len_gap
        idx_test = np.arange(start_forecast, end_forecast)
        test_data = InputData(idx=idx_test,
                              features=timeseries_train,
                              target=None,
                              task=task,
                              data_type=DataTypesEnum.ts)

        predicted_values = pipeline_for_forecast.predict(test_data)
        predicted_values = np.ravel(np.array(predicted_values.predict))
        return predicted_values

    def __forecast_in_gap(self, pipeline, timeseries_train_part, output_data, gap):
        """ Make forecast for desired part of time series with gap

        Linear interpolation is used instead of the pipeline when the train
        part is too short in comparison with ``min_train_ts_length``.

        Args:
            pipeline: pipeline for forecasting
            timeseries_train_part: part of time series without gaps to fit pipeline
            output_data: array with gaps (some of them may be filled previously)
            gap: indices of continuous batch (gap)

        Returns:
            predicted values
        """

        train_ts_len = len(timeseries_train_part) - len(gap)
        if train_ts_len < self.min_train_ts_length:
            # Take part with gap [..., gap, gap, known_value]
            gap_part = output_data[:gap[-1] + 2]

            # Use linear interpolation - get full time series
            interpolated_part = self.linear_interpolation(gap_part)
            predicted = interpolated_part[gap]
        else:
            # Pipeline for the task of filling in gaps
            predicted = self.__pipeline_fit_predict(pipeline,
                                                    timeseries_train_part,
                                                    len(gap))

        return predicted
485

486

487
def replace_nan_with_label(time_series: np.ndarray, label: Union[int, float]):
    """ Substitute every np.nan in the array with the desired label

    Args:
        time_series: array, which may contain np.nan values
        label: value to put instead of np.nan

    Returns:
        result of ``np.nan_to_num`` for the series (infinite values are
        processed with numpy defaults of that function)
    """
    labeled_series = np.nan_to_num(time_series, nan=label)
    return labeled_series
490

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.