ClickHouse

Форк
0
/
AggregateFunctionSequenceMatch.cpp 
766 строк · 27.0 Кб
1
#include <AggregateFunctions/Helpers.h>
2
#include <AggregateFunctions/AggregateFunctionFactory.h>
3

4
#include <DataTypes/DataTypeDate.h>
5
#include <DataTypes/DataTypeDateTime.h>
6

7
#include <AggregateFunctions/IAggregateFunction.h>
8
#include <DataTypes/DataTypesNumber.h>
9
#include <Columns/ColumnsNumber.h>
10
#include <Common/assert_cast.h>
11
#include <IO/ReadHelpers.h>
12
#include <IO/WriteHelpers.h>
13
#include <base/range.h>
14

15
#include <bitset>
16
#include <stack>
17

18

19
namespace DB
20
{
21

22
struct Settings;
23

24
namespace ErrorCodes
25
{
26
    extern const int ILLEGAL_TYPE_OF_ARGUMENT;
27
    extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
28
    extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION;
29
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
30
    extern const int TOO_SLOW;
31
    extern const int SYNTAX_ERROR;
32
    extern const int BAD_ARGUMENTS;
33
    extern const int LOGICAL_ERROR;
34
}
35

36
namespace
37
{
38

39
/// Orders `std::pair` values by their `.first` member only; `.second` is ignored.
/// `Comparator` is a comparison functor template such as `std::less`.
template <template <typename> class Comparator>
struct ComparePairFirst final
{
    template <typename T1, typename T2>
    bool operator()(const std::pair<T1, T2> & lhs, const std::pair<T1, T2> & rhs) const
    {
        const Comparator<T1> compare_keys{};
        return compare_keys(lhs.first, rhs.first);
    }
};
49

50
/// Width of the per-row event bitset, i.e. the maximum number of event condition arguments.
constexpr size_t max_events = 32;
51

52
template <typename T>
53
struct AggregateFunctionSequenceMatchData final
54
{
55
    using Timestamp = T;
56
    using Events = std::bitset<max_events>;
57
    using TimestampEvents = std::pair<Timestamp, Events>;
58
    using Comparator = ComparePairFirst<std::less>;
59

60
    bool sorted = true;
61
    PODArrayWithStackMemory<TimestampEvents, 64> events_list;
62
    /// sequenceMatch conditions met at least once in events_list
63
    Events conditions_met;
64

65
    void add(const Timestamp timestamp, const Events & events)
66
    {
67
        /// store information exclusively for rows with at least one event
68
        if (events.any())
69
        {
70
            events_list.emplace_back(timestamp, events);
71
            sorted = false;
72
            conditions_met |= events;
73
        }
74
    }
75

76
    void merge(const AggregateFunctionSequenceMatchData & other)
77
    {
78
        if (other.events_list.empty())
79
            return;
80

81
        events_list.insert(std::begin(other.events_list), std::end(other.events_list));
82
        sorted = false;
83
        conditions_met |= other.conditions_met;
84
    }
85

86
    void sort()
87
    {
88
        if (sorted)
89
            return;
90

91
        ::sort(std::begin(events_list), std::end(events_list), Comparator{});
92
        sorted = true;
93
    }
94

95
    void serialize(WriteBuffer & buf) const
96
    {
97
        writeBinary(sorted, buf);
98
        writeBinary(events_list.size(), buf);
99

100
        for (const auto & events : events_list)
101
        {
102
            writeBinary(events.first, buf);
103
            writeBinary(events.second.to_ulong(), buf);
104
        }
105
    }
106

107
    void deserialize(ReadBuffer & buf)
108
    {
109
        readBinary(sorted, buf);
110

111
        size_t size;
112
        readBinary(size, buf);
113

114
        /// If we lose these flags, functionality is broken
115
        /// If we serialize/deserialize these flags, we have compatibility issues
116
        /// If we set these flags to 1, we have a minor performance penalty, which seems acceptable
117
        conditions_met.set();
118

119
        events_list.clear();
120
        events_list.reserve(size);
121

122
        for (size_t i = 0; i < size; ++i)
123
        {
124
            Timestamp timestamp;
125
            readBinary(timestamp, buf);
126

127
            UInt64 events;
128
            readBinary(events, buf);
129

130
            events_list.emplace_back(timestamp, Events{events});
131
        }
132
    }
133
};
134

135

136
/// Max number of iterations to match the pattern against a sequence, exception thrown when exceeded
137
/// Compared against the step counters of backtrackingMatch and couldMatchDeterministicParts; exceeding it raises TOO_SLOW.
constexpr auto sequence_match_max_iterations = 1000000;
138

139

140
template <typename T, typename Data, typename Derived>
141
class AggregateFunctionSequenceBase : public IAggregateFunctionDataHelper<Data, Derived>
142
{
143
public:
144
    AggregateFunctionSequenceBase(const DataTypes & arguments, const Array & params, const String & pattern_, const DataTypePtr & result_type_)
145
        : IAggregateFunctionDataHelper<Data, Derived>(arguments, params, result_type_)
146
        , pattern(pattern_)
147
    {
148
        arg_count = arguments.size();
149
        parsePattern();
150
    }
151

152
    /// Reads one row: column 0 is the timestamp, columns 1..arg_count-1 are UInt8 event conditions.
    /// Condition column i is stored as bit i - 1 of the row's event bitmask.
    void add(AggregateDataPtr __restrict place, const IColumn ** columns, const size_t row_num, Arena *) const override
    {
        typename Data::Events row_events;
        for (size_t arg = 1; arg < arg_count; ++arg)
        {
            const auto * condition_column = assert_cast<const ColumnUInt8 *>(columns[arg]);
            row_events.set(arg - 1, condition_column->getData()[row_num]);
        }

        const auto * time_column = assert_cast<const ColumnVector<T> *>(columns[0]);
        this->data(place).add(time_column->getData()[row_num], row_events);
    }
165

166
    /// Folds the state at `rhs` into the state at `place`.
    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
    {
        const auto & source_state = this->data(rhs);
        this->data(place).merge(source_state);
    }
170

171
    /// Writes the state at `place` to `buf`; the version argument is ignored.
    void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
    {
        const auto & state = this->data(place);
        state.serialize(buf);
    }
175

176
    /// Reads the state at `place` from `buf`; the version argument is ignored.
    void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
    {
        auto & state = this->data(place);
        state.deserialize(buf);
    }
180

181
    bool haveSameStateRepresentationImpl(const IAggregateFunction & rhs) const override
182
    {
183
        return this->getName() == rhs.getName() && this->haveEqualArgumentTypes(rhs);
184
    }
185

186
private:
187
    /// Kinds of actions a parsed pattern consists of.
    enum class PatternActionType
    {
        SpecificEvent,      /// (?N)    - the event with the given condition number
        AnyEvent,           /// .       - any single event
        KleeneStar,         /// .*      - any number of events
        TimeLessOrEqual,    /// (?t<=D)
        TimeLess,           /// (?t<D)
        TimeGreaterOrEqual, /// (?t>=D)
        TimeGreater,        /// (?t>D)
        TimeEqual           /// (?t==D)
    };
198

199
    struct PatternAction final
200
    {
201
        PatternActionType type;
202
        std::uint64_t extra;
203

204
        PatternAction() = default;
205
        explicit PatternAction(const PatternActionType type_, const std::uint64_t extra_ = 0) : type{type_}, extra{extra_} {}
206
    };
207

208
    /// Container holding the parsed pattern actions (filled by parsePattern).
    using PatternActions = PODArrayWithStackMemory<PatternAction, 64>;
209

210
    /// Returns *this statically cast to the Derived template parameter.
    Derived & derived() { return static_cast<Derived &>(*this); }
211

212
    void parsePattern()
213
    {
214
        actions.clear();
215
        actions.emplace_back(PatternActionType::KleeneStar);
216

217
        dfa_states.clear();
218
        dfa_states.emplace_back(true);
219

220
        pattern_has_time = false;
221

222
        const char * pos = pattern.data();
223
        const char * begin = pos;
224
        const char * end = pos + pattern.size();
225

226
        auto throw_exception = [&](const std::string & msg)
227
        {
228
            throw Exception(ErrorCodes::SYNTAX_ERROR, "{} '{}' at position {}", msg, std::string(pos, end), toString(pos - begin));
229
        };
230

231
        auto match = [&pos, end](const char * str) mutable
232
        {
233
            size_t length = strlen(str);
234
            if (pos + length <= end && 0 == memcmp(pos, str, length))
235
            {
236
                pos += length;
237
                return true;
238
            }
239
            return false;
240
        };
241

242
        while (pos < end)
243
        {
244
            if (match("(?"))
245
            {
246
                if (match("t"))
247
                {
248
                    PatternActionType type;
249

250
                    if (match("<="))
251
                        type = PatternActionType::TimeLessOrEqual;
252
                    else if (match("<"))
253
                        type = PatternActionType::TimeLess;
254
                    else if (match(">="))
255
                        type = PatternActionType::TimeGreaterOrEqual;
256
                    else if (match(">"))
257
                        type = PatternActionType::TimeGreater;
258
                    else if (match("=="))
259
                        type = PatternActionType::TimeEqual;
260
                    else
261
                        throw_exception("Unknown time condition");
262

263
                    UInt64 duration = 0;
264
                    const auto * prev_pos = pos;
265
                    pos = tryReadIntText(duration, pos, end);
266
                    if (pos == prev_pos)
267
                        throw_exception("Could not parse number");
268

269
                    if (actions.back().type != PatternActionType::SpecificEvent &&
270
                        actions.back().type != PatternActionType::AnyEvent &&
271
                        actions.back().type != PatternActionType::KleeneStar)
272
                        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Temporal condition should be preceded by an event condition");
273

274
                    pattern_has_time = true;
275
                    actions.emplace_back(type, duration);
276
                }
277
                else
278
                {
279
                    UInt64 event_number = 0;
280
                    const auto * prev_pos = pos;
281
                    pos = tryReadIntText(event_number, pos, end);
282
                    if (pos == prev_pos)
283
                        throw_exception("Could not parse number");
284

285
                    if (event_number > arg_count - 1)
286
                        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Event number {} is out of range", event_number);
287

288
                    actions.emplace_back(PatternActionType::SpecificEvent, event_number - 1);
289
                    dfa_states.back().transition = DFATransition::SpecificEvent;
290
                    dfa_states.back().event = static_cast<uint32_t>(event_number - 1);
291
                    dfa_states.emplace_back();
292
                    conditions_in_pattern.set(event_number - 1);
293
                }
294

295
                if (!match(")"))
296
                    throw_exception("Expected closing parenthesis, found");
297

298
            }
299
            else if (match(".*"))
300
            {
301
                actions.emplace_back(PatternActionType::KleeneStar);
302
                dfa_states.back().has_kleene = true;
303
            }
304
            else if (match("."))
305
            {
306
                actions.emplace_back(PatternActionType::AnyEvent);
307
                dfa_states.back().transition = DFATransition::AnyEvent;
308
                dfa_states.emplace_back();
309
            }
310
            else
311
                throw_exception("Could not parse pattern, unexpected starting symbol");
312
        }
313
    }
314

315
protected:
316
    /// Uses a DFA based approach in order to better handle patterns without
317
    /// time assertions.
318
    ///
319
    /// NOTE: This implementation relies on the assumption that the pattern is *small*.
320
    ///
321
    /// This algorithm performs in O(mn) (with m the number of DFA states and N the number
322
    /// of events) with a memory consumption and memory allocations in O(m). It means that
323
    /// if n >>> m (which is expected to be the case), this algorithm can be considered linear.
324
    template <typename EventEntry>
325
    bool dfaMatch(EventEntry & events_it, const EventEntry events_end) const
326
    {
327
        using ActiveStates = std::vector<bool>;
328

329
        /// Those two vectors keep track of which states should be considered for the current
330
        /// event as well as the states which should be considered for the next event.
331
        ActiveStates active_states(dfa_states.size(), false);
332
        ActiveStates next_active_states(dfa_states.size(), false);
333
        active_states[0] = true;
334

335
        /// Keeps track of dead-ends in order not to iterate over all the events to realize that
336
        /// the match failed.
337
        size_t n_active = 1;
338

339
        for (/* empty */; events_it != events_end && n_active > 0 && !active_states.back(); ++events_it)
340
        {
341
            n_active = 0;
342
            next_active_states.assign(dfa_states.size(), false);
343

344
            for (size_t state = 0; state < dfa_states.size(); ++state)
345
            {
346
                if (!active_states[state])
347
                {
348
                    continue;
349
                }
350

351
                switch (dfa_states[state].transition)
352
                {
353
                    case DFATransition::None:
354
                        break;
355
                    case DFATransition::AnyEvent:
356
                        next_active_states[state + 1] = true;
357
                        ++n_active;
358
                        break;
359
                    case DFATransition::SpecificEvent:
360
                        if (events_it->second.test(dfa_states[state].event))
361
                        {
362
                            next_active_states[state + 1] = true;
363
                            ++n_active;
364
                        }
365
                        break;
366
                }
367

368
                if (dfa_states[state].has_kleene)
369
                {
370
                    next_active_states[state] = true;
371
                    ++n_active;
372
                }
373
            }
374
            swap(active_states, next_active_states);
375
        }
376

377
        return active_states.back();
378
    }
379

380
    template <typename EventEntry>
381
    bool backtrackingMatch(EventEntry & events_it, const EventEntry events_end) const
382
    {
383
        const auto action_begin = std::begin(actions);
384
        const auto action_end = std::end(actions);
385
        auto action_it = action_begin;
386

387
        const auto events_begin = events_it;
388
        auto base_it = events_it;
389

390
        /// an iterator to action plus an iterator to row in events list plus timestamp at the start of sequence
391
        using backtrack_info = std::tuple<decltype(action_it), EventEntry, EventEntry>;
392
        std::stack<backtrack_info> back_stack;
393

394
        /// backtrack if possible
395
        const auto do_backtrack = [&]
396
        {
397
            while (!back_stack.empty())
398
            {
399
                auto & top = back_stack.top();
400

401
                action_it = std::get<0>(top);
402
                events_it = std::next(std::get<1>(top));
403
                base_it = std::get<2>(top);
404

405
                back_stack.pop();
406

407
                if (events_it != events_end)
408
                    return true;
409
            }
410

411
            return false;
412
        };
413

414
        size_t i = 0;
415
        while (action_it != action_end && events_it != events_end)
416
        {
417
            if (action_it->type == PatternActionType::SpecificEvent)
418
            {
419
                if (events_it->second.test(action_it->extra))
420
                {
421
                    /// move to the next action and events
422
                    base_it = events_it;
423
                    ++action_it, ++events_it;
424
                }
425
                else if (!do_backtrack())
426
                    /// backtracking failed, bail out
427
                    break;
428
            }
429
            else if (action_it->type == PatternActionType::AnyEvent)
430
            {
431
                base_it = events_it;
432
                ++action_it, ++events_it;
433
            }
434
            else if (action_it->type == PatternActionType::KleeneStar)
435
            {
436
                back_stack.emplace(action_it, events_it, base_it);
437
                base_it = events_it;
438
                ++action_it;
439
            }
440
            else if (action_it->type == PatternActionType::TimeLessOrEqual)
441
            {
442
                if (events_it->first <= base_it->first + action_it->extra)
443
                {
444
                    /// condition satisfied, move onto next action
445
                    back_stack.emplace(action_it, events_it, base_it);
446
                    base_it = events_it;
447
                    ++action_it;
448
                }
449
                else if (!do_backtrack())
450
                    break;
451
            }
452
            else if (action_it->type == PatternActionType::TimeLess)
453
            {
454
                if (events_it->first < base_it->first + action_it->extra)
455
                {
456
                    back_stack.emplace(action_it, events_it, base_it);
457
                    base_it = events_it;
458
                    ++action_it;
459
                }
460
                else if (!do_backtrack())
461
                    break;
462
            }
463
            else if (action_it->type == PatternActionType::TimeGreaterOrEqual)
464
            {
465
                if (events_it->first >= base_it->first + action_it->extra)
466
                {
467
                    back_stack.emplace(action_it, events_it, base_it);
468
                    base_it = events_it;
469
                    ++action_it;
470
                }
471
                else if (++events_it == events_end && !do_backtrack())
472
                    break;
473
            }
474
            else if (action_it->type == PatternActionType::TimeGreater)
475
            {
476
                if (events_it->first > base_it->first + action_it->extra)
477
                {
478
                    back_stack.emplace(action_it, events_it, base_it);
479
                    base_it = events_it;
480
                    ++action_it;
481
                }
482
                else if (++events_it == events_end && !do_backtrack())
483
                    break;
484
            }
485
            else if (action_it->type == PatternActionType::TimeEqual)
486
            {
487
                if (events_it->first == base_it->first + action_it->extra)
488
                {
489
                    back_stack.emplace(action_it, events_it, base_it);
490
                    base_it = events_it;
491
                    ++action_it;
492
                }
493
                else if (++events_it == events_end && !do_backtrack())
494
                    break;
495
            }
496
            else
497
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown PatternActionType");
498

499
            if (++i > sequence_match_max_iterations)
500
                throw Exception(ErrorCodes::TOO_SLOW, "Pattern application proves too difficult, exceeding max iterations ({})",
501
                    sequence_match_max_iterations);
502
        }
503

504
        /// if there are some actions remaining
505
        if (action_it != action_end)
506
        {
507
            /// match multiple empty strings at end
508
            while (action_it->type == PatternActionType::KleeneStar ||
509
                   action_it->type == PatternActionType::TimeLessOrEqual ||
510
                   action_it->type == PatternActionType::TimeLess ||
511
                   (action_it->type == PatternActionType::TimeGreaterOrEqual && action_it->extra == 0))
512
                ++action_it;
513
        }
514

515
        if (events_it == events_begin)
516
            ++events_it;
517

518
        return action_it == action_end;
519
    }
520

521
    /// Splits the pattern into deterministic parts separated by non-deterministic fragments
522
    /// (time constraints and Kleene stars), and tries to match the deterministic parts in their specified order,
523
    /// ignoring the non-deterministic fragments.
524
    /// This function can quickly check that a full match is not possible if some deterministic fragment is missing.
525
    template <typename EventEntry>
526
    bool couldMatchDeterministicParts(const EventEntry events_begin, const EventEntry events_end, bool limit_iterations = true) const
527
    {
528
        size_t events_processed = 0;
529
        auto events_it = events_begin;
530

531
        const auto actions_end = std::end(actions);
532
        auto actions_it = std::begin(actions);
533
        auto det_part_begin = actions_it;
534

535
        auto match_deterministic_part = [&events_it, events_end, &events_processed, det_part_begin, actions_it, limit_iterations]()
536
        {
537
            auto events_it_init = events_it;
538
            auto det_part_it = det_part_begin;
539

540
            while (det_part_it != actions_it && events_it != events_end)
541
            {
542
                /// matching any event
543
                if (det_part_it->type == PatternActionType::AnyEvent)
544
                    ++events_it, ++det_part_it;
545

546
                /// matching specific event
547
                else
548
                {
549
                    if (events_it->second.test(det_part_it->extra))
550
                        ++events_it, ++det_part_it;
551

552
                    /// abandon current matching, try to match the deterministic fragment further in the list
553
                    else
554
                    {
555
                        events_it = ++events_it_init;
556
                        det_part_it = det_part_begin;
557
                    }
558
                }
559

560
                if (limit_iterations && ++events_processed > sequence_match_max_iterations)
561
                    throw Exception(ErrorCodes::TOO_SLOW, "Pattern application proves too difficult, exceeding max iterations ({})",
562
                        sequence_match_max_iterations);
563
            }
564

565
            return det_part_it == actions_it;
566
        };
567

568
        for (; actions_it != actions_end; ++actions_it)
569
            if (actions_it->type != PatternActionType::SpecificEvent && actions_it->type != PatternActionType::AnyEvent)
570
            {
571
                if (!match_deterministic_part())
572
                    return false;
573
                det_part_begin = std::next(actions_it);
574
            }
575

576
        return match_deterministic_part();
577
    }
578

579
private:
580
    /// The kind of edge leading from a DFA state to the next state.
    enum class DFATransition : char
    {
        /// No outgoing edge.
        None,
        /// Edge taken when the event tested by the state is set: (?N)
        SpecificEvent,
        /// Edge taken on any event: .
        AnyEvent,
    };
595

596
    struct DFAState
597
    {
598
        explicit DFAState(bool has_kleene_ = false)
599
            : has_kleene{has_kleene_}, event{0}, transition{DFATransition::None}
600
        {}
601

602
        ///   .-------.
603
        ///   |       | - - -
604
        ///   `-------'
605
        ///     |_^
606
        bool has_kleene;
607
        /// In the case of a state transitions with a `SpecificEvent`,
608
        /// `event` contains the value of the event.
609
        uint32_t event;
610
        /// The kind of transition out of this state.
611
        DFATransition transition;
612
    };
613

614
    using DFAStates = std::vector<DFAState>;
615

616
protected:
617
    /// `True` if the parsed pattern contains time assertions (?t...), `false` otherwise.
618
    bool pattern_has_time;
619
    /// sequenceMatch conditions met at least once in the pattern
620
    std::bitset<max_events> conditions_in_pattern;
621

622
private:
623
    std::string pattern;
624
    size_t arg_count;
625
    PatternActions actions;
626

627
    DFAStates dfa_states;
628
};
629

630
template <typename T, typename Data>
631
class AggregateFunctionSequenceMatch final : public AggregateFunctionSequenceBase<T, Data, AggregateFunctionSequenceMatch<T, Data>>
632
{
633
public:
634
    AggregateFunctionSequenceMatch(const DataTypes & arguments, const Array & params, const String & pattern_)
635
        : AggregateFunctionSequenceBase<T, Data, AggregateFunctionSequenceMatch<T, Data>>(arguments, params, pattern_, std::make_shared<DataTypeUInt8>()) {}
636

637
    using AggregateFunctionSequenceBase<T, Data, AggregateFunctionSequenceMatch<T, Data>>::AggregateFunctionSequenceBase;
638

639
    String getName() const override { return "sequenceMatch"; }
640

641
    bool allocatesMemoryInArena() const override { return false; }
642

643
    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
644
    {
645
        auto & output = assert_cast<ColumnUInt8 &>(to).getData();
646
        if ((this->conditions_in_pattern & this->data(place).conditions_met) != this->conditions_in_pattern)
647
        {
648
            output.push_back(false);
649
            return;
650
        }
651
        this->data(place).sort();
652

653
        const auto & data_ref = this->data(place);
654

655
        const auto events_begin = std::begin(data_ref.events_list);
656
        const auto events_end = std::end(data_ref.events_list);
657
        auto events_it = events_begin;
658

659
        bool match = (this->pattern_has_time ?
660
            (this->couldMatchDeterministicParts(events_begin, events_end) && this->backtrackingMatch(events_it, events_end)) :
661
            this->dfaMatch(events_it, events_end));
662
        output.push_back(match);
663
    }
664
};
665

666
template <typename T, typename Data>
667
class AggregateFunctionSequenceCount final : public AggregateFunctionSequenceBase<T, Data, AggregateFunctionSequenceCount<T, Data>>
668
{
669
public:
670
    AggregateFunctionSequenceCount(const DataTypes & arguments, const Array & params, const String & pattern_)
671
        : AggregateFunctionSequenceBase<T, Data, AggregateFunctionSequenceCount<T, Data>>(arguments, params, pattern_, std::make_shared<DataTypeUInt64>()) {}
672

673
    using AggregateFunctionSequenceBase<T, Data, AggregateFunctionSequenceCount<T, Data>>::AggregateFunctionSequenceBase;
674

675
    String getName() const override { return "sequenceCount"; }
676

677
    bool allocatesMemoryInArena() const override { return false; }
678

679
    void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
680
    {
681
        auto & output = assert_cast<ColumnUInt64 &>(to).getData();
682
        if ((this->conditions_in_pattern & this->data(place).conditions_met) != this->conditions_in_pattern)
683
        {
684
            output.push_back(0);
685
            return;
686
        }
687
        this->data(place).sort();
688
        output.push_back(count(place));
689
    }
690

691
private:
692
    UInt64 count(ConstAggregateDataPtr __restrict place) const
693
    {
694
        const auto & data_ref = this->data(place);
695

696
        const auto events_begin = std::begin(data_ref.events_list);
697
        const auto events_end = std::end(data_ref.events_list);
698
        auto events_it = events_begin;
699

700
        size_t count = 0;
701
        // check if there is a chance of matching the sequence at least once
702
        if (this->couldMatchDeterministicParts(events_begin, events_end))
703
        {
704
            while (events_it != events_end && this->backtrackingMatch(events_it, events_end))
705
                ++count;
706
        }
707

708
        return count;
709
    }
710
};
711

712

713
template <template <typename, typename> typename AggregateFunction, template <typename> typename Data>
714
AggregateFunctionPtr createAggregateFunctionSequenceBase(
715
    const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
716
{
717
    if (params.size() != 1)
718
        throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Aggregate function {} requires exactly one parameter.",
719
            name);
720

721
    const auto arg_count = argument_types.size();
722

723
    if (arg_count < 3)
724
        throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION, "Aggregate function {} requires at least 3 arguments.",
725
            name);
726

727
    if (arg_count - 1 > max_events)
728
        throw Exception(ErrorCodes::TOO_MANY_ARGUMENTS_FOR_FUNCTION, "Aggregate function {} supports up to {} event arguments.", name, max_events);
729

730
    const auto * time_arg = argument_types.front().get();
731

732
    for (const auto i : collections::range(1, arg_count))
733
    {
734
        const auto * cond_arg = argument_types[i].get();
735
        if (!isUInt8(cond_arg))
736
            throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
737
                            "Illegal type {} of argument {} of aggregate function {}, must be UInt8",
738
                            cond_arg->getName(), toString(i + 1), name);
739
    }
740

741
    String pattern = params.front().safeGet<std::string>();
742

743
    AggregateFunctionPtr res(createWithUnsignedIntegerType<AggregateFunction, Data>(*argument_types[0], argument_types, params, pattern));
744
    if (res)
745
        return res;
746

747
    WhichDataType which(argument_types.front().get());
748
    if (which.isDateTime())
749
        return std::make_shared<AggregateFunction<DataTypeDateTime::FieldType, Data<DataTypeDateTime::FieldType>>>(argument_types, params, pattern);
750
    else if (which.isDate())
751
        return std::make_shared<AggregateFunction<DataTypeDate::FieldType, Data<DataTypeDate::FieldType>>>(argument_types, params, pattern);
752

753
    throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
754
                    "Illegal type {} of first argument of aggregate function {}, must be DateTime",
755
                    time_arg->getName(), name);
756
}
757

758
}
759

760
/// Registers sequenceMatch and sequenceCount; both use the same creator template and state type.
void registerAggregateFunctionsSequenceMatch(AggregateFunctionFactory & factory)
{
    factory.registerFunction(
        "sequenceMatch",
        createAggregateFunctionSequenceBase<AggregateFunctionSequenceMatch, AggregateFunctionSequenceMatchData>);

    factory.registerFunction(
        "sequenceCount",
        createAggregateFunctionSequenceBase<AggregateFunctionSequenceCount, AggregateFunctionSequenceMatchData>);
}
765

766
}
767

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.