2
import AutomaticTermination
9
import LocalQueueServerRunner
10
import MetricsExtensions
12
import QueueCommunication
15
import QueueServerConfiguration
16
import QueueServerPortProvider
17
import RemotePortDeterminer
18
import ScheduleStrategy
20
import UniqueIdentifierGenerator
21
import WorkerAlivenessProvider
22
import WorkerCapabilities
24
/// Command named `startLocalQueueServer` that starts a queue server on the local machine.
public final class StartQueueServerCommand: Command {
25
/// Name of the command.
public let name = "startLocalQueueServer"
26
/// Human-readable summary of what the command does.
public let description = "Starts queue server on local machine. This mode waits for jobs to be scheduled via REST API."
27
/// Arguments accepted by the command: the Emcee version is optional, while the
/// queue server configuration location and the hostname are required.
public let arguments: Arguments = [
28
ArgumentDescriptions.emceeVersion.asOptional,
29
ArgumentDescriptions.queueServerConfigurationLocation.asRequired,
30
ArgumentDescriptions.hostname.asRequired,
33
/// Operation queue created with up to 20 concurrent operations and default quality of service.
/// It is passed to `LocalQueueServerRunner` as its `deployQueue` argument.
private let deployQueue = OperationQueue.create(
34
name: "StartQueueServerCommand.deployQueue",
35
maxConcurrentOperationCount: 20,
36
qualityOfService: .default
41
/// Creates the command from a dependency container.
/// NOTE(review): the initializer body is not visible in this excerpt — presumably it stores `di`
/// for the `di.get()` lookups performed by the methods of this class; confirm.
public init(di: DI) throws {
45
/// Entry point of the command: reads the hostname, Emcee version and queue server
/// configuration from `payload`, then applies the analytics and logging settings
/// found in that configuration.
///
/// - Parameter payload: Parsed command arguments; `hostname` and the queue server
///   configuration location are expected, the Emcee version is optional.
/// - Throws: Any error thrown while reading arguments, resolving dependencies from `di`,
///   or by the calls made below.
public func run(payload: CommandPayload) throws {
46
// The hostname argument is required and is applied through `HostnameSetup` first.
let hostname: String = try payload.expectedSingleTypedValue(argumentName: ArgumentDescriptions.hostname.name)
47
try HostnameSetup.update(hostname: hostname, di: di)
49
// Use the explicitly supplied Emcee version, falling back to `EmceeVersion.version` when the argument is absent.
let emceeVersion: Version = try payload.optionalSingleTypedValue(argumentName: ArgumentDescriptions.emceeVersion.name) ?? EmceeVersion.version
50
// Read the queue server configuration from the location given in the arguments.
let queueServerConfiguration = try ArgumentsReader.queueServerConfiguration(
51
location: try payload.expectedSingleTypedValue(argumentName: ArgumentDescriptions.queueServerConfigurationLocation.name),
52
resourceLocationResolver: try di.get()
55
// Hand the global analytics configuration to the `GlobalMetricRecorder` obtained from `di`.
try di.get(GlobalMetricRecorder.self).set(
56
analyticsConfiguration: queueServerConfiguration.globalAnalyticsConfiguration
58
// Kibana logging is configured only when the analytics configuration carries a Kibana configuration.
if let kibanaConfiguration = queueServerConfiguration.globalAnalyticsConfiguration.kibanaConfiguration {
59
try di.get(LoggingSetup.self).set(kibanaConfiguration: kibanaConfiguration)
62
// NOTE(review): the statement that consumes the result of this `with(analyticsConfiguration:)` call
// is not visible in this excerpt.
try di.get(ContextualLogger.self).with(
63
analyticsConfiguration: queueServerConfiguration.globalAnalyticsConfiguration
68
// NOTE(review): the enclosing call is not visible here; the `for:` label suggests this instance
// is registered for the `BucketGenerator` type — confirm.
BucketGeneratorImpl(uniqueIdentifierGenerator: try di.get()),
69
for: BucketGenerator.self
73
// NOTE(review): these look like the arguments of the `startQueueServer` call, whose opening
// line is not visible in this excerpt — confirm.
emceeVersion: emceeVersion,
75
queueServerConfiguration: queueServerConfiguration,
76
workerDestinations: queueServerConfiguration.workerDeploymentDestinations,
81
/// Builds the collaborators of the queue server, creates the `QueueServerImpl`, and
/// starts it through `LocalQueueServerRunner`.
///
/// - Parameters:
///   - emceeVersion: Version passed to the components created below and to the runner's `start`.
///   - queueServerConfiguration: Source of the termination policy, port range, deployment
///     destinations, worker start mode and other settings read below.
///   - workerDestinations: Worker deployment destinations; their worker ids are passed to the
///     permission provider, the queue server and the runner.
///   - logger: Logger used for tracing and handed to the components that take one.
/// - Throws: Any error thrown by dependency lookups via `di`, by the helper methods of this
///   class, or by `LocalQueueServerRunner.start`.
private func startQueueServer(
82
emceeVersion: Version,
84
queueServerConfiguration: QueueServerConfiguration,
85
workerDestinations: [DeploymentDestination],
86
logger: ContextualLogger
89
// NOTE(review): the call that receives this freshly generated payload signature is not visible
// in this excerpt; the trace below reads a `PayloadSignature` back from `di`.
PayloadSignature(value: try di.get(UniqueIdentifierGenerator.self).generate())
91
logger.trace("Generated payload signature: \(try di.get(PayloadSignature.self))")
93
// The same termination policy also feeds the termination waiter and the runner further down.
let automaticTerminationController = AutomaticTerminationControllerFactory(
94
automaticTerminationPolicy: queueServerConfiguration.queueServerTerminationPolicy
95
).createAutomaticTerminationController()
97
// Created without a source; `source` is assigned once the queue server exists (see below).
let queueServerPortProvider = SourcableQueueServerPortProvider()
99
// Configured with the hosts of the queue server deployment destinations and the configured port range.
let remotePortDeterminer = RemoteQueuePortScanner(
100
hosts: queueServerConfiguration.queueServerDeploymentDestinations.map(\.host),
102
portRange: queueServerConfiguration.portRange.closedRange,
103
requestSenderProvider: try di.get()
105
let queueCommunicationService = DefaultQueueCommunicationService(
107
remoteQueueDetector: DefaultRemoteQueueDetector(
108
emceeVersion: emceeVersion,
110
remotePortDeterminer: remotePortDeterminer
112
requestSenderProvider: try di.get(),
115
// Initialised with the ids of all worker destinations.
let autoupdatingWorkerPermissionProvider = AutoupdatingWorkerPermissionProviderImpl(
116
communicationService: queueCommunicationService,
117
initialWorkerIds: Set(workerDestinations.map { $0.workerId }),
118
emceeVersion: emceeVersion,
120
globalMetricRecorder: try di.get(),
122
queueServerPortProvider: queueServerPortProvider
125
let workersToUtilizeService = DefaultWorkersToUtilizeService(
126
cache: DefaultWorkersMappingCache(
127
// NOTE(review): 300 is presumably a duration in seconds — confirm against `DefaultWorkersMappingCache`.
cacheIvalidationTime: 300,
128
dateProvider: try di.get(),
131
calculator: DefaultWorkersToUtilizeCalculator(logger: logger),
132
communicationService: queueCommunicationService,
134
portDeterminer: remotePortDeterminer
137
let remoteWorkerStarterProvider = try createRemoteWorkerStarterProvider(
139
emceeVersion: emceeVersion,
141
workerStartMode: queueServerConfiguration.workerStartMode,
142
workerDestinations: workerDestinations
144
let workerConfigurations = try createWorkerConfigurations(
145
queueServerConfiguration: queueServerConfiguration
148
// Total number of simulators across all worker destinations. Each destination contributes its own
// `numberOfSimulators`, else the default worker configuration's value, else 0. The accumulator
// starts at 0 so the result is exactly that sum (a seed of 1 would over-count by one bucket).
let numberOfParallelBuckets = queueServerConfiguration.workerDeploymentDestinations.reduce(into: 0, { result, keyValue in
149
result += keyValue.configuration?.numberOfSimulators ?? queueServerConfiguration.defaultWorkerConfiguration?.numberOfSimulators ?? 0
152
let queueServer = QueueServerImpl(
153
automaticTerminationController: automaticTerminationController,
154
autoupdatingWorkerPermissionProvider: autoupdatingWorkerPermissionProvider,
155
bucketGenerator: try di.get(),
156
bucketSplitInfo: BucketSplitInfo(
157
numberOfWorkers: UInt(queueServerConfiguration.workerDeploymentDestinations.count),
158
numberOfParallelBuckets: UInt(numberOfParallelBuckets)
160
checkAgainTimeInterval: queueServerConfiguration.checkAgainTimeInterval,
161
dateProvider: try di.get(),
162
emceeVersion: emceeVersion,
164
localPortDeterminer: LocalPortDeterminer(
166
portRange: queueServerConfiguration.portRange.closedRange
169
globalMetricRecorder: try di.get(),
170
specificMetricRecorderProvider: try di.get(),
171
onDemandWorkerStarter: OnDemandWorkerStarterViaDeployer(
173
queueServerPortProvider: queueServerPortProvider,
174
remoteWorkerStarterProvider: remoteWorkerStarterProvider
176
payloadSignature: try di.get(),
177
queueServerLock: AutomaticTerminationControllerAwareQueueServerLock(
178
automaticTerminationController: automaticTerminationController
180
requestSenderProvider: try di.get(),
181
uniqueIdentifierGenerator: try di.get(),
182
workerAlivenessProvider: WorkerAlivenessProviderImpl(
184
workerPermissionProvider: autoupdatingWorkerPermissionProvider
186
workerCapabilitiesStorage: WorkerCapabilitiesStorageImpl(),
187
workerConfigurations: workerConfigurations,
188
workerIds: Set(workerDestinations.map { $0.workerId }),
189
workersToUtilizeService: workersToUtilizeService,
190
useOnlyIPv4: queueServerConfiguration.useOnlyIPv4
192
// The port provider was created before the queue server and handed to earlier components;
// now that the server exists, point the provider at the server's own port provider.
queueServerPortProvider.source = queueServer.queueServerPortProvider
194
// One poll period is shared by the termination waiter and the runner.
let pollPeriod: TimeInterval = 5.0
195
let queueServerTerminationWaiter = QueueServerTerminationWaiterImpl(
197
pollInterval: pollPeriod,
198
queueServerTerminationPolicy: queueServerConfiguration.queueServerTerminationPolicy
201
let localQueueServerRunner = LocalQueueServerRunner(
202
automaticTerminationController: automaticTerminationController,
203
deployQueue: deployQueue,
206
newWorkerRegistrationTimeAllowance: 360.0,
207
pollPeriod: pollPeriod,
208
queueServer: queueServer,
209
queueServerTerminationPolicy: queueServerConfiguration.queueServerTerminationPolicy,
210
queueServerTerminationWaiter: queueServerTerminationWaiter,
211
remotePortDeterminer: remotePortDeterminer,
212
remoteWorkerStarterProvider: remoteWorkerStarterProvider,
213
workerIds: Set(workerDestinations.map { $0.workerId }),
214
autoupdatingWorkerPermissionProvider: autoupdatingWorkerPermissionProvider
216
try localQueueServerRunner.start(emceeVersion: emceeVersion)
219
/// Selects the `RemoteWorkerStarterProvider` implementation that matches `workerStartMode`.
///
/// - Parameters:
///   - emceeVersion: Version passed to `DefaultRemoteWorkerStarterProvider`.
///   - logger: Logger passed to `NoOpRemoteWorkerStarterProvider`.
///   - workerStartMode: Mode that decides which provider is returned.
///   - workerDestinations: Destinations passed to `DefaultRemoteWorkerStarterProvider`
///     as `workerDeploymentDestinations`.
/// - Returns: `DefaultRemoteWorkerStarterProvider` for `.queueStartsItsWorkersOverSshAndLaunchd`,
///   `NoOpRemoteWorkerStarterProvider` for `.unknownWayOfStartingWorkers`.
/// - Throws: Any error thrown by `di.get()` lookups or by creating the `TemporaryFolder`.
private func createRemoteWorkerStarterProvider(
221
emceeVersion: Version,
222
logger: ContextualLogger,
223
workerStartMode: WorkerStartMode,
224
workerDestinations: [DeploymentDestination]
225
) throws -> RemoteWorkerStarterProvider {
226
switch workerStartMode {
227
case .queueStartsItsWorkersOverSshAndLaunchd:
228
// Dependencies are resolved from `di`; a new `TemporaryFolder` is created for this provider.
return DefaultRemoteWorkerStarterProvider(
229
emceeVersion: emceeVersion,
230
fileSystem: try di.get(),
232
sshClientProvider: try di.get(),
233
tempFolder: try TemporaryFolder(),
234
uniqueIdentifierGenerator: try di.get(),
235
workerDeploymentDestinations: workerDestinations,
236
zipCompressor: try di.get()
238
case .unknownWayOfStartingWorkers:
239
// This provider receives only the logger.
return NoOpRemoteWorkerStarterProvider(logger: logger)
243
/// Builds the `WorkerConfigurations` described by `queueServerConfiguration`.
///
/// - Parameter queueServerConfiguration: Supplies the optional default worker configuration
///   and the worker deployment destinations with their optional per-worker configurations.
/// - Returns: The assembled worker configurations.
/// - Throws: Any error thrown while resolving the payload signature from `di`.
private func createWorkerConfigurations(
244
queueServerConfiguration: QueueServerConfiguration
245
) throws -> WorkerConfigurations {
246
// Start from `FixedWorkerConfigurations`; it is wrapped below when a default configuration exists.
var configurations: WorkerConfigurations = FixedWorkerConfigurations()
247
if let defaultWorkerConfiguration = queueServerConfiguration.defaultWorkerConfiguration {
248
// Wrap the current configurations together with a default built from `defaultWorkerConfiguration`.
configurations = WorkerConfigurationsWithDefaultConfiguration(
249
defaultConfiguration: queueServerConfiguration.workerConfiguration(
250
workerSpecificConfiguration: defaultWorkerConfiguration,
251
payloadSignature: try di.get()
253
wrapped: configurations
257
// Only destinations that carry their own configuration are handled inside this loop.
for deploymentDestination in queueServerConfiguration.workerDeploymentDestinations {
258
if let workerSpecificConfiguration = deploymentDestination.configuration {
260
// NOTE(review): the call receiving `workerId:`/`configuration:` is not visible in this excerpt;
// presumably it records the per-worker configuration on `configurations` — confirm.
workerId: deploymentDestination.workerId,
261
configuration: queueServerConfiguration.workerConfiguration(
262
workerSpecificConfiguration: workerSpecificConfiguration,
263
payloadSignature: try di.get()
268
return configurations