10
import SynchronousWaiter
13
public final class EnableWorkerCommand: Command {
14
public let name = "enableWorker"
16
public let description = "Enables the provided worker: queue will let it execute further jobs"
18
public var arguments: Arguments = [
19
ArgumentDescriptions.queueServer.asRequired,
20
ArgumentDescriptions.workerId.asRequired,
23
private let callbackQueue = DispatchQueue(label: "EnableWorkerCommand.callbackQueue")
26
public init(di: DI) throws {
30
public func run(payload: CommandPayload) throws {
31
let queueServerAddress: SocketAddress = try payload.expectedSingleTypedValue(argumentName: ArgumentDescriptions.queueServer.name)
32
let workerId: WorkerId = try payload.expectedSingleTypedValue(argumentName: ArgumentDescriptions.workerId.name)
34
let logger = try di.get(ContextualLogger.self)
36
let workerEnabler = WorkerEnablerImpl(
37
requestSender: try di.get(RequestSenderProvider.self).requestSender(
38
socketAddress: queueServerAddress
42
let callbackWaiter: CallbackWaiter<Either<WorkerId, Error>> = try di.get(Waiter.self).createCallbackWaiter()
44
workerEnabler.enableWorker(
46
callbackQueue: callbackQueue
47
) { (result: Either<WorkerId, Error>) in
48
callbackWaiter.set(result: result)
51
let enabledWorkerId = try callbackWaiter.wait(timeout: 15, description: "Request to enable worker \(workerId) on queue")
53
logger.info("Successfully enabled worker \(try enabledWorkerId.dematerialize()) on queue \(queueServerAddress)")
55
logger.error("Failed to enable worker \(workerId) on queue \(queueServerAddress): \(error)")