Emcee
108 строк · 3.8 Кб
1import BalancingBucketQueue
2import BucketQueue
3import BucketQueueModels
4import DateProvider
5import Foundation
6import EmceeLogging
7import Metrics
8import MetricsExtensions
9import QueueModels
10import ScheduleStrategy
11import Timer
12
/// Periodically asks a `StuckBucketsReenqueuer` to re-enqueue stuck buckets, logs every stuck
/// bucket it reports, and records stuck-bucket and queue-state metrics.
public final class StuckBucketsPoller {
    private let dateProvider: DateProvider
    private let globalMetricRecorder: GlobalMetricRecorder
    private let jobStateProvider: JobStateProvider
    private let logger: ContextualLogger
    private let queueHostname: String
    private let specificMetricRecorderProvider: SpecificMetricRecorderProvider
    private let statefulBucketQueue: StatefulBucketQueue
    private let stuckBucketsReenqueuer: StuckBucketsReenqueuer
    /// Configured to repeat every 5 seconds with 5 seconds of leeway; started by `startTrackingStuckBuckets()`.
    private let stuckBucketsTrigger = DispatchBasedTimer(repeating: .seconds(5), leeway: .seconds(5))
    private let version: Version

    public init(
        dateProvider: DateProvider,
        globalMetricRecorder: GlobalMetricRecorder,
        jobStateProvider: JobStateProvider,
        logger: ContextualLogger,
        queueHostname: String,
        specificMetricRecorderProvider: SpecificMetricRecorderProvider,
        statefulBucketQueue: StatefulBucketQueue,
        stuckBucketsReenqueuer: StuckBucketsReenqueuer,
        version: Version
    ) {
        self.dateProvider = dateProvider
        self.globalMetricRecorder = globalMetricRecorder
        self.jobStateProvider = jobStateProvider
        self.logger = logger
        self.queueHostname = queueHostname
        self.specificMetricRecorderProvider = specificMetricRecorderProvider
        self.statefulBucketQueue = statefulBucketQueue
        self.stuckBucketsReenqueuer = stuckBucketsReenqueuer
        self.version = version
    }

    /// Starts the repeating timer whose handler runs `processStuckBuckets()`.
    ///
    /// The handler captures `self` weakly, so the poller is not kept alive by its own timer;
    /// once the poller is released, ticks become no-ops.
    public func startTrackingStuckBuckets() {
        stuckBucketsTrigger.start { [weak self] _ in
            self?.processStuckBuckets()
        }
    }

    /// Runs a single polling iteration.
    ///
    /// - If re-enqueueing throws, the error is logged and nothing else happens.
    /// - If no buckets were stuck, returns without recording any metrics.
    /// - Otherwise logs each stuck bucket and sends that bucket's own `StuckBucketsMetric` to the
    ///   recorder for the bucket's analytics configuration. It then captures queue-state metrics
    ///   through the global recorder.
    ///
    /// Internal for testing.
    func processStuckBuckets() {
        let stuckBuckets: [StuckBucket]
        do {
            stuckBuckets = try stuckBucketsReenqueuer.reenqueueStuckBuckets()
        } catch {
            return logger.error("Failed to reenqueue stuck buckets: \(error)")
        }

        guard !stuckBuckets.isEmpty else { return }

        logger.warning("Detected stuck buckets:")
        for stuckBucket in stuckBuckets {
            logger.warning("-- Bucket \(stuckBucket.bucket.bucketId) is stuck with worker '\(stuckBucket.workerId)': \(stuckBucket.reason)")

            // Only this bucket's metric goes to the recorder for this bucket's analytics configuration.
            // Capturing the whole batch here (as before) reported every stuck bucket once per stuck
            // bucket (N² data points). It also routed other buckets' metrics to configurations that
            // may not belong to them.
            let metric = StuckBucketsMetric(
                workerId: stuckBucket.workerId,
                reason: stuckBucket.reason.metricParameterName,
                version: version,
                queueHost: queueHostname,
                count: 1,
                timestamp: dateProvider.currentDate()
            )
            do {
                try specificMetricRecorderProvider.specificMetricRecorder(
                    analyticsConfiguration: stuckBucket.bucket.analyticsConfiguration
                ).capture([metric])
            } catch {
                logger.error("Failed to send metrics: \(error)")
            }
        }

        // Queue-state metrics are captured only on iterations that found stuck buckets,
        // after those buckets have been re-enqueued.
        let queueStateMetricGatherer = QueueStateMetricGatherer(
            dateProvider: dateProvider,
            queueHost: queueHostname,
            version: version
        )
        globalMetricRecorder.capture(
            queueStateMetricGatherer.metrics(
                jobStates: jobStateProvider.allJobStates,
                runningQueueState: statefulBucketQueue.runningQueueState
            )
        )
    }
}
100
private extension StuckBucket.Reason {
    /// Stable name of this reason, used as the `reason` value of stuck-bucket metrics.
    var metricParameterName: String {
        let parameterName: String
        switch self {
        case .workerIsSilent:
            parameterName = "workerIsSilent"
        case .bucketLost:
            parameterName = "bucketLost"
        }
        return parameterName
    }
}
109