Emcee

Форк
0
/
StartQueueServerCommand.swift 
270 строк · 11.5 Кб
1
import ArgLib
2
import AutomaticTermination
3
import EmceeDI
4
import Deployer
5
import DistWorkerModels
6
import EmceeLogging
7
import EmceeVersion
8
import Foundation
9
import LocalQueueServerRunner
10
import MetricsExtensions
11
import PortDeterminer
12
import QueueCommunication
13
import QueueModels
14
import QueueServer
15
import QueueServerConfiguration
16
import QueueServerPortProvider
17
import RemotePortDeterminer
18
import ScheduleStrategy
19
import Tmp
20
import UniqueIdentifierGenerator
21
import WorkerAlivenessProvider
22
import WorkerCapabilities
23

24
public final class StartQueueServerCommand: Command {
25
    public let name = "startLocalQueueServer"
26
    public let description = "Starts queue server on local machine. This mode waits for jobs to be scheduled via REST API."
27
    public let arguments: Arguments = [
28
        ArgumentDescriptions.emceeVersion.asOptional,
29
        ArgumentDescriptions.queueServerConfigurationLocation.asRequired,
30
        ArgumentDescriptions.hostname.asRequired,
31
    ]
32

33
    private let deployQueue = OperationQueue.create(
34
        name: "StartQueueServerCommand.deployQueue",
35
        maxConcurrentOperationCount: 20,
36
        qualityOfService: .default
37
    )
38
    
39
    private let di: DI
40

41
    public init(di: DI) throws {
42
        self.di = di
43
    }
44
    
45
    public func run(payload: CommandPayload) throws {
46
        let hostname: String = try payload.expectedSingleTypedValue(argumentName: ArgumentDescriptions.hostname.name)
47
        try HostnameSetup.update(hostname: hostname, di: di)
48
    
49
        let emceeVersion: Version = try payload.optionalSingleTypedValue(argumentName: ArgumentDescriptions.emceeVersion.name) ?? EmceeVersion.version
50
        let queueServerConfiguration = try ArgumentsReader.queueServerConfiguration(
51
            location: try payload.expectedSingleTypedValue(argumentName: ArgumentDescriptions.queueServerConfigurationLocation.name),
52
            resourceLocationResolver: try di.get()
53
        )
54
        
55
        try di.get(GlobalMetricRecorder.self).set(
56
            analyticsConfiguration: queueServerConfiguration.globalAnalyticsConfiguration
57
        )
58
        if let kibanaConfiguration = queueServerConfiguration.globalAnalyticsConfiguration.kibanaConfiguration {
59
            try di.get(LoggingSetup.self).set(kibanaConfiguration: kibanaConfiguration)
60
        }
61
        di.set(
62
            try di.get(ContextualLogger.self).with(
63
                analyticsConfiguration: queueServerConfiguration.globalAnalyticsConfiguration
64
            )
65
        )
66
        
67
        di.set(
68
            BucketGeneratorImpl(uniqueIdentifierGenerator: try di.get()),
69
            for: BucketGenerator.self
70
        )
71

72
        try startQueueServer(
73
            emceeVersion: emceeVersion,
74
            hostname: hostname,
75
            queueServerConfiguration: queueServerConfiguration,
76
            workerDestinations: queueServerConfiguration.workerDeploymentDestinations,
77
            logger: try di.get()
78
        )
79
    }
80
    
81
    private func startQueueServer(
82
        emceeVersion: Version,
83
        hostname: String,
84
        queueServerConfiguration: QueueServerConfiguration,
85
        workerDestinations: [DeploymentDestination],
86
        logger: ContextualLogger
87
    ) throws {
88
        di.set(
89
            PayloadSignature(value: try di.get(UniqueIdentifierGenerator.self).generate())
90
        )
91
        logger.trace("Generated payload signature: \(try di.get(PayloadSignature.self))")
92
        
93
        let automaticTerminationController = AutomaticTerminationControllerFactory(
94
            automaticTerminationPolicy: queueServerConfiguration.queueServerTerminationPolicy
95
        ).createAutomaticTerminationController()
96
        
97
        let queueServerPortProvider = SourcableQueueServerPortProvider()
98
        
99
        let remotePortDeterminer = RemoteQueuePortScanner(
100
            hosts: queueServerConfiguration.queueServerDeploymentDestinations.map(\.host),
101
            logger: logger,
102
            portRange: queueServerConfiguration.portRange.closedRange,
103
            requestSenderProvider: try di.get()
104
        )
105
        let queueCommunicationService = DefaultQueueCommunicationService(
106
            logger: logger,
107
            remoteQueueDetector: DefaultRemoteQueueDetector(
108
                emceeVersion: emceeVersion,
109
                logger: logger,
110
                remotePortDeterminer: remotePortDeterminer
111
            ),
112
            requestSenderProvider: try di.get(),
113
            requestTimeout: 10
114
        )
115
        let autoupdatingWorkerPermissionProvider = AutoupdatingWorkerPermissionProviderImpl(
116
            communicationService: queueCommunicationService,
117
            initialWorkerIds: Set(workerDestinations.map { $0.workerId }),
118
            emceeVersion: emceeVersion,
119
            logger: logger,
120
            globalMetricRecorder: try di.get(),
121
            queueHost: hostname,
122
            queueServerPortProvider: queueServerPortProvider
123
        )
124
        
125
        let workersToUtilizeService = DefaultWorkersToUtilizeService(
126
            cache: DefaultWorkersMappingCache(
127
                cacheIvalidationTime: 300,
128
                dateProvider: try di.get(),
129
                logger: logger
130
            ),
131
            calculator: DefaultWorkersToUtilizeCalculator(logger: logger),
132
            communicationService: queueCommunicationService,
133
            logger: logger,
134
            portDeterminer: remotePortDeterminer
135
        )
136
        
137
        let remoteWorkerStarterProvider = try createRemoteWorkerStarterProvider(
138
            di: di,
139
            emceeVersion: emceeVersion,
140
            logger: logger,
141
            workerStartMode: queueServerConfiguration.workerStartMode,
142
            workerDestinations: workerDestinations
143
        )
144
        let workerConfigurations = try createWorkerConfigurations(
145
            queueServerConfiguration: queueServerConfiguration
146
        )
147
        
148
        let numberOfParallelBuckets = queueServerConfiguration.workerDeploymentDestinations.reduce(into: 1, { result, keyValue in
149
            result += keyValue.configuration?.numberOfSimulators ?? queueServerConfiguration.defaultWorkerConfiguration?.numberOfSimulators ?? 0
150
        })
151
        
152
        let queueServer = QueueServerImpl(
153
            automaticTerminationController: automaticTerminationController,
154
            autoupdatingWorkerPermissionProvider: autoupdatingWorkerPermissionProvider,
155
            bucketGenerator: try di.get(),
156
            bucketSplitInfo: BucketSplitInfo(
157
                numberOfWorkers: UInt(queueServerConfiguration.workerDeploymentDestinations.count),
158
                numberOfParallelBuckets: UInt(numberOfParallelBuckets)
159
            ),
160
            checkAgainTimeInterval: queueServerConfiguration.checkAgainTimeInterval,
161
            dateProvider: try di.get(),
162
            emceeVersion: emceeVersion,
163
            hostname: hostname,
164
            localPortDeterminer: LocalPortDeterminer(
165
                logger: logger,
166
                portRange: queueServerConfiguration.portRange.closedRange
167
            ),
168
            logger: logger,
169
            globalMetricRecorder: try di.get(),
170
            specificMetricRecorderProvider: try di.get(),
171
            onDemandWorkerStarter: OnDemandWorkerStarterViaDeployer(
172
                hostname: hostname,
173
                queueServerPortProvider: queueServerPortProvider,
174
                remoteWorkerStarterProvider: remoteWorkerStarterProvider
175
            ),
176
            payloadSignature: try di.get(),
177
            queueServerLock: AutomaticTerminationControllerAwareQueueServerLock(
178
                automaticTerminationController: automaticTerminationController
179
            ),
180
            requestSenderProvider: try di.get(),
181
            uniqueIdentifierGenerator: try di.get(),
182
            workerAlivenessProvider: WorkerAlivenessProviderImpl(
183
                logger: logger,
184
                workerPermissionProvider: autoupdatingWorkerPermissionProvider
185
            ),
186
            workerCapabilitiesStorage: WorkerCapabilitiesStorageImpl(),
187
            workerConfigurations: workerConfigurations,
188
            workerIds: Set(workerDestinations.map { $0.workerId }),
189
            workersToUtilizeService: workersToUtilizeService,
190
            useOnlyIPv4: queueServerConfiguration.useOnlyIPv4
191
        )
192
        queueServerPortProvider.source = queueServer.queueServerPortProvider
193
        
194
        let pollPeriod: TimeInterval = 5.0
195
        let queueServerTerminationWaiter = QueueServerTerminationWaiterImpl(
196
            logger: logger,
197
            pollInterval: pollPeriod,
198
            queueServerTerminationPolicy: queueServerConfiguration.queueServerTerminationPolicy
199
        )
200
        
201
        let localQueueServerRunner = LocalQueueServerRunner(
202
            automaticTerminationController: automaticTerminationController,
203
            deployQueue: deployQueue,
204
            hostname: hostname,
205
            logger: logger,
206
            newWorkerRegistrationTimeAllowance: 360.0,
207
            pollPeriod: pollPeriod,
208
            queueServer: queueServer,
209
            queueServerTerminationPolicy: queueServerConfiguration.queueServerTerminationPolicy,
210
            queueServerTerminationWaiter: queueServerTerminationWaiter,
211
            remotePortDeterminer: remotePortDeterminer,
212
            remoteWorkerStarterProvider: remoteWorkerStarterProvider,
213
            workerIds: Set(workerDestinations.map { $0.workerId }),
214
            autoupdatingWorkerPermissionProvider: autoupdatingWorkerPermissionProvider
215
        )
216
        try localQueueServerRunner.start(emceeVersion: emceeVersion)
217
    }
218
    
219
    private func createRemoteWorkerStarterProvider(
220
        di: DI,
221
        emceeVersion: Version,
222
        logger: ContextualLogger,
223
        workerStartMode: WorkerStartMode,
224
        workerDestinations: [DeploymentDestination]
225
    ) throws -> RemoteWorkerStarterProvider {
226
        switch workerStartMode {
227
        case .queueStartsItsWorkersOverSshAndLaunchd:
228
            return DefaultRemoteWorkerStarterProvider(
229
                emceeVersion: emceeVersion,
230
                fileSystem: try di.get(),
231
                logger: logger,
232
                sshClientProvider: try di.get(),
233
                tempFolder: try TemporaryFolder(),
234
                uniqueIdentifierGenerator: try di.get(),
235
                workerDeploymentDestinations: workerDestinations,
236
                zipCompressor: try di.get()
237
            )
238
        case .unknownWayOfStartingWorkers:
239
            return NoOpRemoteWorkerStarterProvider(logger: logger)
240
        }
241
    }
242
    
243
    private func createWorkerConfigurations(
244
        queueServerConfiguration: QueueServerConfiguration
245
    ) throws -> WorkerConfigurations {
246
        var configurations: WorkerConfigurations = FixedWorkerConfigurations()
247
        if let defaultWorkerConfiguration = queueServerConfiguration.defaultWorkerConfiguration {
248
            configurations = WorkerConfigurationsWithDefaultConfiguration(
249
                defaultConfiguration: queueServerConfiguration.workerConfiguration(
250
                    workerSpecificConfiguration: defaultWorkerConfiguration,
251
                    payloadSignature: try di.get()
252
                ),
253
                wrapped: configurations
254
            )
255
        }
256
        
257
        for deploymentDestination in queueServerConfiguration.workerDeploymentDestinations {
258
            if let workerSpecificConfiguration = deploymentDestination.configuration {
259
                configurations.add(
260
                    workerId: deploymentDestination.workerId,
261
                    configuration: queueServerConfiguration.workerConfiguration(
262
                        workerSpecificConfiguration: workerSpecificConfiguration,
263
                        payloadSignature: try di.get()
264
                    )
265
                )
266
            }
267
        }
268
        return configurations
269
    }
270
}
271

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.