Emcee
104 строки · 3.6 Кб
1import Foundation
2import EmceeLogging
3import PluginSupport
4import Swifter
5import SynchronousWaiter
6
7public final class EventDistributor {
8private final class WeakWebSocketSession {
9weak var webSocketSession: WebSocketSession?
10
11public init(webSocketSession: WebSocketSession?) {
12self.webSocketSession = webSocketSession
13}
14}
15
16private let logger: ContextualLogger
17private var pluginIdentifiers = Set<String>()
18private var connectedPluginIdentifiers = Set<String>()
19private let server = HttpServer()
20private let hostname: String
21private var webSocketSessions = [WeakWebSocketSession]()
22private let queue = DispatchQueue(label: "EventDistributor.queue")
23private let encoder = JSONEncoder()
24private let decoder = JSONDecoder()
25
26public init(
27hostname: String,
28logger: ContextualLogger
29) {
30self.hostname = hostname
31self.logger = logger
32}
33
34public func start() throws {
35try queue.sync {
36logger.trace("Starting web socket server")
37server["/"] = websocket(text: nil, binary: onBinary, pong: nil, connected: nil, disconnected: nil)
38try server.start(0, forceIPv4: false, priority: .default)
39}
40logger.trace("Web socket server is available at: \(try webSocketAddress())")
41}
42
43public func stop() {
44queue.sync {
45logger.trace("Stopping web socket server")
46server.stop()
47}
48}
49
50public func add(pluginIdentifier: String) {
51pluginIdentifiers.insert(pluginIdentifier)
52}
53
54public func waitForPluginsToConnect(timeout: TimeInterval) throws {
55try SynchronousWaiter().waitWhile(pollPeriod: 0.5, timeout: timeout, description: "Waiting for \(pluginIdentifiers.count) plugins to connect") {
56connectedPluginIdentifiers != pluginIdentifiers
57}
58}
59
60public func webSocketAddress() throws -> String {
61return try queue.sync {
62let port = try server.port()
63let address = "ws://\(hostname):\(port)/"
64return address
65}
66}
67
68public func send(data: Data) {
69queue.sync {
70let bytes = [UInt8](data)
71for session in webSocketSessions {
72session.webSocketSession?.writeBinary(bytes)
73}
74webSocketSessions.removeAll { $0.webSocketSession == nil }
75}
76}
77
78private func onBinary(session: WebSocketSession, incomingData: [UInt8]) -> Void {
79let data = Data(incomingData)
80
81let acknowledgement: PluginHandshakeAcknowledgement
82do {
83let handshakeRequest = try decoder.decode(PluginHandshakeRequest.self, from: data)
84if pluginIdentifiers.contains(handshakeRequest.pluginIdentifier) {
85connectedPluginIdentifiers.insert(handshakeRequest.pluginIdentifier)
86webSocketSessions.append(WeakWebSocketSession(webSocketSession: session))
87acknowledgement = .successful
88} else {
89acknowledgement = .error("Unknown plugin identifier: '\(handshakeRequest.pluginIdentifier)'")
90}
91} catch {
92acknowledgement = .error("Internal error: '\(error)'")
93}
94
95logger.trace("New connection from plugin with acknowledgement: '\(acknowledgement)'")
96
97do {
98let data = try encoder.encode(acknowledgement)
99session.writeBinary([UInt8](data))
100} catch {
101logger.error("Failed to send acknowledgement: \(error)")
102}
103}
104}
105