Emcee

Форк
0
/
StuckBucketsPoller.swift 
108 строк · 3.8 Кб
1
import BalancingBucketQueue
2
import BucketQueue
3
import BucketQueueModels
4
import DateProvider
5
import Foundation
6
import EmceeLogging
7
import Metrics
8
import MetricsExtensions
9
import QueueModels
10
import ScheduleStrategy
11
import Timer
12

13
public final class StuckBucketsPoller {
14
    private let dateProvider: DateProvider
15
    private let globalMetricRecorder: GlobalMetricRecorder
16
    private let jobStateProvider: JobStateProvider
17
    private let logger: ContextualLogger
18
    private let queueHostname: String
19
    private let specificMetricRecorderProvider: SpecificMetricRecorderProvider
20
    private let statefulBucketQueue: StatefulBucketQueue
21
    private let stuckBucketsReenqueuer: StuckBucketsReenqueuer
22
    private let stuckBucketsTrigger = DispatchBasedTimer(repeating: .seconds(5), leeway: .seconds(5))
23
    private let version: Version
24
    
25
    public init(
26
        dateProvider: DateProvider,
27
        globalMetricRecorder: GlobalMetricRecorder,
28
        jobStateProvider: JobStateProvider,
29
        logger: ContextualLogger,
30
        queueHostname: String,
31
        specificMetricRecorderProvider: SpecificMetricRecorderProvider,
32
        statefulBucketQueue: StatefulBucketQueue,
33
        stuckBucketsReenqueuer: StuckBucketsReenqueuer,
34
        version: Version
35
    ) {
36
        self.dateProvider = dateProvider
37
        self.globalMetricRecorder = globalMetricRecorder
38
        self.jobStateProvider = jobStateProvider
39
        self.logger = logger
40
        self.queueHostname = queueHostname
41
        self.specificMetricRecorderProvider = specificMetricRecorderProvider
42
        self.statefulBucketQueue = statefulBucketQueue
43
        self.stuckBucketsReenqueuer = stuckBucketsReenqueuer
44
        self.version = version
45
    }
46
    
47
    public func startTrackingStuckBuckets() {
48
        stuckBucketsTrigger.start { [weak self] _ in
49
            self?.processStuckBuckets()
50
        }
51
    }
52
    
53
    /// internal for testing
54
    func processStuckBuckets() {
55
        let stuckBuckets: [StuckBucket]
56
        do {
57
            stuckBuckets = try stuckBucketsReenqueuer.reenqueueStuckBuckets()
58
        } catch {
59
            return logger.error("Failed to reenqueue stuck buckets: \(error)")
60
        }
61
        
62
        guard !stuckBuckets.isEmpty else { return }
63
        
64
        let stuckBucketMetrics: [StuckBucketsMetric] = stuckBuckets.map {
65
            StuckBucketsMetric(
66
                workerId: $0.workerId,
67
                reason: $0.reason.metricParameterName,
68
                version: version,
69
                queueHost: queueHostname,
70
                count: 1,
71
                timestamp: dateProvider.currentDate()
72
            )
73
        }
74
        
75
        logger.warning("Detected stuck buckets:")
76
        for stuckBucket in stuckBuckets {
77
            logger.warning("-- Bucket \(stuckBucket.bucket.bucketId) is stuck with worker '\(stuckBucket.workerId)': \(stuckBucket.reason)")
78
            do {
79
                try specificMetricRecorderProvider.specificMetricRecorder(
80
                    analyticsConfiguration: stuckBucket.bucket.analyticsConfiguration
81
                ).capture(stuckBucketMetrics)
82
            } catch {
83
                logger.error("Failed to send metrics: \(error)")
84
            }
85
        }
86
        
87
        let queueStateMetricGatherer = QueueStateMetricGatherer(
88
            dateProvider: dateProvider,
89
            queueHost: queueHostname,
90
            version: version
91
        )
92
        globalMetricRecorder.capture(
93
            queueStateMetricGatherer.metrics(
94
                jobStates: jobStateProvider.allJobStates,
95
                runningQueueState: statefulBucketQueue.runningQueueState
96
            )
97
        )
98
    }
99
}
100

101
private extension StuckBucket.Reason {
102
    var metricParameterName: String {
103
        switch self {
104
        case .workerIsSilent: return "workerIsSilent"
105
        case .bucketLost: return "bucketLost"
106
        }
107
    }
108
}
109

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.