Emcee

Форк
0
/
EventDistributor.swift 
104 строки · 3.6 Кб
1
import Foundation
2
import EmceeLogging
3
import PluginSupport
4
import Swifter
5
import SynchronousWaiter
6

7
public final class EventDistributor {
8
    private final class WeakWebSocketSession {
9
        weak var webSocketSession: WebSocketSession?
10

11
        public init(webSocketSession: WebSocketSession?) {
12
            self.webSocketSession = webSocketSession
13
        }
14
    }
15
    
16
    private let logger: ContextualLogger
17
    private var pluginIdentifiers = Set<String>()
18
    private var connectedPluginIdentifiers = Set<String>()
19
    private let server = HttpServer()
20
    private let hostname: String
21
    private var webSocketSessions = [WeakWebSocketSession]()
22
    private let queue = DispatchQueue(label: "EventDistributor.queue")
23
    private let encoder = JSONEncoder()
24
    private let decoder = JSONDecoder()
25
    
26
    public init(
27
        hostname: String,
28
        logger: ContextualLogger
29
    ) {
30
        self.hostname = hostname
31
        self.logger = logger
32
    }
33
    
34
    public func start() throws {
35
        try queue.sync {
36
            logger.trace("Starting web socket server")
37
            server["/"] = websocket(text: nil, binary: onBinary, pong: nil, connected: nil, disconnected: nil)
38
            try server.start(0, forceIPv4: false, priority: .default)
39
        }
40
        logger.trace("Web socket server is available at: \(try webSocketAddress())")
41
    }
42
    
43
    public func stop() {
44
        queue.sync {
45
            logger.trace("Stopping web socket server")
46
            server.stop()
47
        }
48
    }
49
    
50
    public func add(pluginIdentifier: String) {
51
        pluginIdentifiers.insert(pluginIdentifier)
52
    }
53
    
54
    public func waitForPluginsToConnect(timeout: TimeInterval) throws {
55
        try SynchronousWaiter().waitWhile(pollPeriod: 0.5, timeout: timeout, description: "Waiting for \(pluginIdentifiers.count) plugins to connect") {
56
            connectedPluginIdentifiers != pluginIdentifiers
57
        }
58
    }
59
    
60
    public func webSocketAddress() throws -> String {
61
        return try queue.sync {
62
            let port = try server.port()
63
            let address = "ws://\(hostname):\(port)/"
64
            return address
65
        }
66
    }
67
    
68
    public func send(data: Data) {
69
        queue.sync {
70
            let bytes = [UInt8](data)
71
            for session in webSocketSessions {
72
                session.webSocketSession?.writeBinary(bytes)
73
            }
74
            webSocketSessions.removeAll { $0.webSocketSession == nil }
75
        }
76
    }
77

78
    private func onBinary(session: WebSocketSession, incomingData: [UInt8]) -> Void {
79
        let data = Data(incomingData)
80
        
81
        let acknowledgement: PluginHandshakeAcknowledgement
82
        do {
83
            let handshakeRequest = try decoder.decode(PluginHandshakeRequest.self, from: data)
84
            if pluginIdentifiers.contains(handshakeRequest.pluginIdentifier) {
85
                connectedPluginIdentifiers.insert(handshakeRequest.pluginIdentifier)
86
                webSocketSessions.append(WeakWebSocketSession(webSocketSession: session))
87
                acknowledgement = .successful
88
            } else {
89
                acknowledgement = .error("Unknown plugin identifier: '\(handshakeRequest.pluginIdentifier)'")
90
            }
91
        } catch {
92
            acknowledgement = .error("Internal error: '\(error)'")
93
        }
94
        
95
        logger.trace("New connection from plugin with acknowledgement: '\(acknowledgement)'")
96
        
97
        do {
98
            let data = try encoder.encode(acknowledgement)
99
            session.writeBinary([UInt8](data))
100
        } catch {
101
            logger.error("Failed to send acknowledgement: \(error)")
102
        }
103
    }
104
}
105

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.