CommandLineToolkit
83 строки · 1.7 Кб
1import AtomicModels
2import Dispatch
3import Foundation
4
5public final class BlockingArrayBasedJSONStream: AppendableJSONStream {
6private let readLock = NSLock()
7private let writeLock = DispatchSemaphore(value: 0)
8
9private let storage = AtomicValue<[UInt8]>([])
10
11private var willProvideMoreData = true
12
13public init() {}
14
15public func append(bytes: [UInt8]) {
16storage.withExclusiveAccess {
17$0.insert(contentsOf: bytes.reversed(), at: 0)
18}
19onNewData()
20}
21
22// MARK: - JSONStream
23
24public func touch() -> UInt8? {
25return lastByte(delete: false)
26}
27
28public func read() -> UInt8? {
29return lastByte(delete: true)
30}
31
32public func close() {
33willProvideMoreData = false
34onStreamClose()
35}
36
37private func lastByte(delete: Bool) -> UInt8? {
38readLock.lock()
39defer {
40readLock.unlock()
41}
42
43if storage.currentValue().isEmpty {
44if willProvideMoreData {
45waitForNewDataOrStreamCloseEvent()
46} else {
47return nil
48}
49}
50
51return storage.withExclusiveAccess {
52if delete {
53return $0.popLast()
54} else {
55return $0.last
56}
57}
58}
59
60private func waitForNewDataOrStreamCloseEvent() {
61writeLock.waitForUnblocking()
62}
63
64private func onNewData() {
65writeLock.unblock()
66}
67
68private func onStreamClose() {
69writeLock.unblock()
70}
71}
72
73extension DispatchSemaphore {
74func waitForUnblocking() {
75wait()
76signal()
77}
78
79func unblock() {
80signal()
81wait()
82}
83}
84