Dragonfly2

Форк
0
/
piece_broker.go 
109 строк · 2.4 Кб
1
/*
2
 *     Copyright 2022 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package peer
18

19
type pieceBroker struct {
20
	stopCh    chan struct{}
21
	publishCh chan *PieceInfo
22
	subCh     chan chan *PieceInfo
23
	unsubCh   chan chan *PieceInfo
24
}
25

26
type PieceInfo struct {
27
	// Num is the current piece num
28
	Num int32
29
	// OrderedNum is the max pieces num with ordered, eg: 0 1 2 3 5 7 8, the OrderedNum is 3
30
	OrderedNum int32
31
	Finished   bool
32
}
33

34
func newPieceBroker() *pieceBroker {
35
	return &pieceBroker{
36
		stopCh:    make(chan struct{}),
37
		publishCh: make(chan *PieceInfo, 10),
38
		subCh:     make(chan chan *PieceInfo),
39
		unsubCh:   make(chan chan *PieceInfo),
40
	}
41
}
42

43
func (b *pieceBroker) Start() {
44
	var (
45
		orderedNum int32 = -1
46
		subs             = map[chan *PieceInfo]struct{}{}
47
		pieces           = map[int32]struct{}{}
48
	)
49

50
	for {
51
		select {
52
		case <-b.stopCh:
53
			//for msgCh := range subs {
54
			//	close(msgCh)
55
			//}
56
			return
57
		case msgCh := <-b.subCh:
58
			subs[msgCh] = struct{}{}
59
		case msgCh := <-b.unsubCh:
60
			delete(subs, msgCh)
61
		case msg := <-b.publishCh:
62
			pieces[msg.Num] = struct{}{}
63
			if orderedNum+1 == msg.Num {
64
				orderedNum++
65
				// search cached pieces
66
				for {
67
					if _, ok := pieces[orderedNum+1]; ok {
68
						orderedNum++
69
					} else {
70
						break
71
					}
72
				}
73
			}
74
			msg.OrderedNum = orderedNum
75
			for msgCh := range subs {
76
				// msgCh is buffered, use non-blocking send to protect the broker
77
				select {
78
				case msgCh <- msg:
79
				default:
80
				}
81
			}
82
		}
83
	}
84
}
85

86
func (b *pieceBroker) Stop() {
87
	close(b.stopCh)
88
}
89

90
func (b *pieceBroker) Subscribe() chan *PieceInfo {
91
	msgCh := make(chan *PieceInfo, 5)
92
	select {
93
	case <-b.stopCh:
94
	case b.subCh <- msgCh:
95
	}
96

97
	return msgCh
98
}
99

100
func (b *pieceBroker) Unsubscribe(msgCh chan *PieceInfo) {
101
	b.unsubCh <- msgCh
102
}
103

104
func (b *pieceBroker) Publish(msg *PieceInfo) {
105
	select {
106
	case b.publishCh <- msg:
107
	case <-b.stopCh:
108
	}
109
}
110

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.