from copy import deepcopy
from functools import wraps
from typing import List, Union

import numpy as np
from golem.core.log import default_log
from scipy import interpolate

from fedot.core.data.data import InputData
from fedot.core.repository.dataset_types import DataTypesEnum
from fedot.core.repository.tasks import Task, TaskTypesEnum, TsForecastingParams


def series_has_gaps_check(gapfilling_method):
    """Decorator that runs a gap-filling method only when gaps are present.

    NaN values of ``input_data`` are first replaced with ``self.gap_value``.
    If no element equals ``self.gap_value`` afterwards, that array is
    returned without calling ``gapfilling_method``; otherwise the wrapped
    method is called with the NaN-replaced array and its result is returned.

    Args:
        gapfilling_method: method of an object exposing ``gap_value`` and
            ``log`` attributes, with signature
            ``(self, input_data, *args, **kwargs)``

    Returns:
        wrapped method with the same signature
    """

    # wraps keeps the name and docstring of the decorated method
    @wraps(gapfilling_method)
    def wrapper(self, input_data, *args, **kwargs):
        input_data = replace_nan_with_label(input_data, label=self.gap_value)
        gap_ids = np.ravel(np.argwhere(input_data == self.gap_value))
        if len(gap_ids) == 0:
            self.log.info(f'Array does not contain values marked as gaps {self.gap_value}')
            return input_data

        self.log.debug(f'Array contain values marked as gaps {self.gap_value}. Start gap-filling')
        return gapfilling_method(self, input_data, *args, **kwargs)

    return wrapper


class SimpleGapFiller:
    """
    Base class used for filling in the gaps in time series with simple methods.
    Methods from the SimpleGapFiller class can be used for comparison with more
    complex models in class ModelGapFiller

    Args:
        gap_value: value, which identify gap elements in array
    """

    def __init__(self, gap_value: float = -100.0):
        self.gap_value = gap_value
        self.log = default_log(self)

    @series_has_gaps_check
    def linear_interpolation(self, input_data: np.array):
        """
        Method allows to restore missing values in an array
        using linear interpolation

        Args:
            input_data: array with gaps

        Returns:
            array without gaps
        """
        output_data = np.array(input_data)
        output_data = replace_nan_with_label(output_data, label=self.gap_value)

        # interp1d cannot extrapolate by default, so gaps on the borders
        # are replaced by the closest known value beforehand
        output_data = self._fill_first_and_last_gaps(input_data, output_data)

        # Indices of the known elements and the known elements themselves
        non_nan = np.ravel(np.argwhere(output_data != self.gap_value))
        masked_array = output_data[non_nan]

        f_interpolate = interpolate.interp1d(non_nan, masked_array)
        x = np.arange(0, len(output_data))
        return f_interpolate(x)

    @series_has_gaps_check
    def local_poly_approximation(self, input_data, degree: int = 2,
                                 n_neighbors: int = 5):
        """Method allows to restore missing values in an array
        using local polynomial approximation: every gap element is estimated
        by a polynomial fitted to the nearest known elements

        Args:
            input_data: array with gaps
            degree: degree of a polynomial function
            n_neighbors: number of neighboring known elements of the time
                series that the approximation is based on

        Returns:
            array without gaps
        """
        output_data = np.array(input_data)
        output_data = replace_nan_with_label(output_data, label=self.gap_value)

        i_gaps = np.ravel(np.argwhere(output_data == self.gap_value))

        # Gaps are filled one by one, so already restored elements
        # are treated as known on the following iterations
        for gap_index in i_gaps:
            nearest_indices, nearest_values = self._nearest_known(output_data,
                                                                  gap_index,
                                                                  n_neighbors)

            local_coefs = np.polyfit(nearest_indices, nearest_values, degree)
            est_value = np.polyval(local_coefs, gap_index)
            output_data[gap_index] = est_value

        return output_data

    @series_has_gaps_check
    def batch_poly_approximation(self, input_data, degree: int = 3,
                                 n_neighbors: int = 10):
        """Method allows to restore missing values in an array using
        batch polynomial approximations.
        Approximation is applied not for individual omissions, but for
        intervals of omitted values

        Args:
            input_data: array with gaps
            degree: degree of a polynomial function
            n_neighbors: the number of neighboring known elements of
                time series that the approximation is based on

        Returns:
            array without gaps
        """
        output_data = np.array(input_data)
        output_data = replace_nan_with_label(output_data, label=self.gap_value)

        # Gap indices grouped into continuous intervals
        gap_list = np.ravel(np.argwhere(output_data == self.gap_value))
        new_gap_list = self._parse_gap_ids(gap_list)

        for gap in new_gap_list:
            # Neighbors are searched around the center point of the interval
            center_index = int((gap[0] + gap[-1]) / 2)
            nearest_indices, nearest_values = self._nearest_known(output_data,
                                                                  center_index,
                                                                  n_neighbors)

            # One polynomial is fitted and evaluated for the whole interval
            local_coefs = np.polyfit(nearest_indices, nearest_values, degree)
            est_value = np.polyval(local_coefs, gap)
            output_data[gap] = est_value

        return output_data

    def _nearest_known(self, output_data: np.ndarray, center_index: int,
                       n_neighbors: int):
        """Find known (non-gap) elements closest to the given index

        Args:
            output_data: array where gaps are marked with ``self.gap_value``
            center_index: index around which neighbors are searched
            n_neighbors: maximal number of neighbors to return

        Returns:
            tuple of arrays: indices of the nearest known elements and
            their values
        """
        i_known = np.ravel(np.argwhere(output_data != self.gap_value))

        # Sort the known indices by their distance to the requested index
        id_distances = np.abs(i_known - center_index)
        sorted_idx = np.argsort(id_distances)

        nearest_indices = i_known[sorted_idx[:n_neighbors]]
        return nearest_indices, output_data[nearest_indices]

    def _parse_gap_ids(self, gap_list: Union[List, np.ndarray]) -> list:
        """Method allows parsing source array with gaps indexes

        Args:
            gap_list: array with indexes of gaps in array

        Returns:
            a list with separated gaps in continuous intervals
        """
        new_gap_list = []
        local_gaps = []
        for index, gap in enumerate(gap_list):
            # A jump larger than one index closes the current interval
            if index > 0 and gap - gap_list[index - 1] > 1:
                new_gap_list.append(local_gaps)
                local_gaps = []
            local_gaps.append(gap)
        new_gap_list.append(local_gaps)

        return new_gap_list

    def _fill_first_and_last_gaps(self, input_data: np.array, output_data: np.array):
        """ Eliminate gaps, which place first or last index in time series

        Args:
            input_data: array whose border elements are checked for gaps
            output_data: array which is modified in place

        Returns:
            ``output_data`` with border gaps replaced by the first / last
            known value
        """
        non_nan_ids = np.ravel(np.argwhere(output_data != self.gap_value))
        non_nan = output_data[non_nan_ids]
        if np.isclose(input_data[0], self.gap_value):
            self.log.info('First element in the array were replaced by first known value')
            output_data[0] = non_nan[0]
        if np.isclose(input_data[-1], self.gap_value):
            self.log.info('Last element in the array were replaced by last known value')
            output_data[-1] = non_nan[-1]

        return output_data


class ModelGapFiller(SimpleGapFiller):
    """
    Class used for filling in the gaps in time series

    Args:
        gap_value: value, which mask gap elements in array
        pipeline: TsForecastingPipeline object for filling in the gaps
    """

    def __init__(self, gap_value, pipeline):
        super().__init__(gap_value)
        self.pipeline = pipeline

        # Shorter training parts are not passed to the pipeline;
        # linear interpolation is used for them instead
        self.min_train_ts_length = 6

    @series_has_gaps_check
    def forward_inverse_filling(self, input_data):
        """Method fills in the gaps in the input array using forward and inverse
        directions of predictions

        Args:
            input_data: data with gaps to filling in the gaps in it

        Returns:
            array without gaps
        """
        output_data = np.array(input_data)
        output_data = replace_nan_with_label(output_data, label=self.gap_value)

        # Gap indices grouped into continuous intervals
        gap_list = np.ravel(np.argwhere(output_data == self.gap_value))
        new_gap_list = self._parse_gap_ids(gap_list)

        for batch_index, gap in enumerate(new_gap_list):
            preds = []
            weights = []
            # Two predictions are generated for each gap - forward and inverse
            for direction_function in [self._forward, self._inverse]:
                weights_list, predicted_list = direction_function(output_data,
                                                                  batch_index,
                                                                  new_gap_list)
                weights.append(weights_list)
                preds.append(predicted_list)

            # Element-wise weighted average of the two predictions
            preds = np.array(preds)
            weights = np.array(weights)
            result = np.average(preds, axis=0, weights=weights)

            output_data[gap] = result

        return output_data

    @series_has_gaps_check
    def forward_filling(self, input_data: Union[List, np.ndarray]):
        """ Method fills in the gaps in the input array using graph with only
        forward direction (i.e. time series forecasting)

        Args:
            input_data: data with gaps to filling in the gaps in it

        Returns:
            array without gaps
        """
        output_data = np.array(input_data)
        output_data = replace_nan_with_label(output_data, label=self.gap_value)

        # Gap indices grouped into continuous intervals
        gap_list = np.ravel(np.argwhere(output_data == self.gap_value))
        new_gap_list = self._parse_gap_ids(gap_list)

        for gap in new_gap_list:
            # Everything located before the gap is used for training
            first_gap_element_id = gap[0]
            timeseries_train_part = output_data[:first_gap_element_id]

            predicted = self.__forecast_in_gap(self.pipeline,
                                               timeseries_train_part,
                                               output_data, gap)

            output_data[gap] = predicted

        return output_data

    def _forward(self, output_data, batch_index, new_gap_list):
        """The time series method makes a forward forecast based on the part
        of the time series that is located to the left of the gap.

        Args:
            output_data: one-dimensional array of a time series
            batch_index: index of the interval (batch) with a gap
            new_gap_list: array with nested lists of gap indexes

        Returns:
            weights_list: numpy array with prediction weights for averaging
            predicted: numpy array with prediction values in the gap
        """
        gap = new_gap_list[batch_index]
        first_gap_element_id = gap[0]
        timeseries_train_part = output_data[:first_gap_element_id]

        len_gap = len(gap)
        predicted = self.__forecast_in_gap(self.pipeline,
                                           timeseries_train_part,
                                           output_data, gap)
        # Weights decrease from the left border of the gap to the right one
        weights_list = np.arange(len_gap, 0, -1)
        return weights_list, predicted

    def _inverse(self, output_data, batch_index, new_gap_list):
        """The time series method makes an inverse forecast based on the part
        of the time series that is located to the right of the gap.

        Args:
            output_data: one-dimensional array of a time series
            batch_index: index of the interval (batch) with a gap
            new_gap_list: array with nested lists of gap indexes

        Returns:
            weights_list: numpy array with prediction weights for averaging
            predicted_values: numpy array with prediction values in the gap
        """
        gap = new_gap_list[batch_index]

        len_gap = len(gap)
        # Weights increase from the left border of the gap to the right one
        weights_list = np.arange(1, (len_gap + 1), 1)

        first_gap_element_id = gap[0]
        latest_gap_element_id = gap[-1]
        if batch_index == len(new_gap_list) - 1:
            # The gap is the last one: train on everything after it
            timeseries_train_part = output_data[(latest_gap_element_id + 1):]

            is_gap_in_end_time_series = len(timeseries_train_part) == 0
            is_series_size_not_enough = (len(timeseries_train_part) - len_gap) < self.min_train_ts_length
            if is_gap_in_end_time_series:
                # Nothing is known after the gap - repeat last known value
                last_known_value = output_data[first_gap_element_id - 1]
                return weights_list, [last_known_value] * len_gap
            elif is_series_size_not_enough:
                # Part for interpolation: from the last known value
                # before the gap up to the end of the series
                last_known_value_id = first_gap_element_id - 1 if first_gap_element_id > 0 else 0
                extended_part = output_data[last_known_value_id:]
        else:
            # The gap is not the last one: train on the part between
            # this gap and the next one
            next_gap = new_gap_list[batch_index + 1]
            timeseries_train_part = output_data[(latest_gap_element_id + 1): next_gap[0]]

            # Part for interpolation: the gap with its known neighbors
            extended_part = output_data[(first_gap_element_id - 1): next_gap[0]]

        if first_gap_element_id == 0:
            # Nothing is known before the gap - repeat first known value
            first_known_value = timeseries_train_part[0]
            return weights_list, [first_known_value] * len_gap

        # Reverse the series so that a forward forecast goes into the gap
        timeseries_train_part = np.flip(timeseries_train_part)

        train_ts_len = len(timeseries_train_part) - len_gap
        if train_ts_len < self.min_train_ts_length:
            interpolated_part = self.linear_interpolation(extended_part)
            # Drop the leading known element, the gap starts right after it
            interpolated_part = interpolated_part[1:]

            predicted = interpolated_part[:len_gap]
        else:
            predicted = self.__pipeline_fit_predict(self.pipeline,
                                                    timeseries_train_part,
                                                    len_gap)

            # Forecast was made for the reversed series - restore the order
            predicted = np.flip(predicted)
        return weights_list, predicted

    def __pipeline_fit_predict(self, pipeline, timeseries_train: np.array, len_gap: int):
        """The method makes a prediction as a sequence of elements based on a
        training sample. There are two main parts: fit model and predict.

        Args:
            pipeline: pipeline for forecasting
            timeseries_train: part of the time series for training the model
            len_gap: number of elements in the gap

        Returns:
            array without gaps
        """
        # The source pipeline is left untouched; a copy is fitted instead
        pipeline_for_forecast = deepcopy(pipeline)

        task = Task(TaskTypesEnum.ts_forecasting,
                    TsForecastingParams(forecast_length=len_gap))

        input_data = InputData(idx=np.arange(0, len(timeseries_train)),
                               features=timeseries_train,
                               target=timeseries_train,
                               task=task,
                               data_type=DataTypesEnum.ts)

        # Shrink the window of 'lagged' nodes when it does not fit
        # into the training part together with the forecast horizon
        forecast_length = input_data.task.task_params.forecast_length
        data_length = input_data.features.shape[0]
        for node in pipeline_for_forecast.nodes:
            if node.name == 'lagged':
                if node.parameters['window_size'] + forecast_length >= data_length:
                    node.parameters = {'window_size': max(data_length - forecast_length - 10, 2)}

        pipeline_for_forecast.fit_from_scratch(input_data)

        # Indices of the elements following the training part
        start_forecast = len(timeseries_train)
        end_forecast = start_forecast + len_gap
        idx_test = np.arange(start_forecast, end_forecast)
        test_data = InputData(idx=idx_test,
                              features=timeseries_train,
                              target=None,
                              task=task,
                              data_type=DataTypesEnum.ts)

        predicted_values = pipeline_for_forecast.predict(test_data)
        predicted_values = np.ravel(np.array(predicted_values.predict))
        return predicted_values

    def __forecast_in_gap(self, pipeline, timeseries_train_part, output_data, gap):
        """ Make forecast for desired part of time series with gap

        Args:
            pipeline: pipeline for forecasting
            timeseries_train_part: part of time series without gaps to fit pipeline
            output_data: array with gaps (some of them may be filled previously)
            gap: indices of continuous batch (gap)

        Returns:
            values for the elements of the gap
        """
        train_ts_len = len(timeseries_train_part) - len(gap)
        if train_ts_len < self.min_train_ts_length:
            # Training part is too short for the pipeline: interpolate the
            # series from its start up to the element right after the gap
            gap_part = output_data[:gap[-1] + 2]

            interpolated_part = self.linear_interpolation(gap_part)
            predicted = interpolated_part[gap]
        else:
            predicted = self.__pipeline_fit_predict(pipeline,
                                                    timeseries_train_part,
                                                    len(gap))
        return predicted


def replace_nan_with_label(time_series: np.ndarray, label: Union[int, float]):
    """ Replace np.nan in the array with desired label

    Args:
        time_series: array (or array-like) which may contain np.nan
        label: value to put instead of every np.nan

    Returns:
        array where np.nan values are replaced with ``label``

    Note:
        ``np.nan_to_num`` additionally replaces positive / negative infinity
        with very large finite numbers.
    """
    return np.nan_to_num(time_series, nan=label)