Dragonfly2
109 строк · 2.4 Кб
1/*
2* Copyright 2022 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17package peer
18
19type pieceBroker struct {
20stopCh chan struct{}
21publishCh chan *PieceInfo
22subCh chan chan *PieceInfo
23unsubCh chan chan *PieceInfo
24}
25
26type PieceInfo struct {
27// Num is the current piece num
28Num int32
29// OrderedNum is the max pieces num with ordered, eg: 0 1 2 3 5 7 8, the OrderedNum is 3
30OrderedNum int32
31Finished bool
32}
33
34func newPieceBroker() *pieceBroker {
35return &pieceBroker{
36stopCh: make(chan struct{}),
37publishCh: make(chan *PieceInfo, 10),
38subCh: make(chan chan *PieceInfo),
39unsubCh: make(chan chan *PieceInfo),
40}
41}
42
43func (b *pieceBroker) Start() {
44var (
45orderedNum int32 = -1
46subs = map[chan *PieceInfo]struct{}{}
47pieces = map[int32]struct{}{}
48)
49
50for {
51select {
52case <-b.stopCh:
53//for msgCh := range subs {
54// close(msgCh)
55//}
56return
57case msgCh := <-b.subCh:
58subs[msgCh] = struct{}{}
59case msgCh := <-b.unsubCh:
60delete(subs, msgCh)
61case msg := <-b.publishCh:
62pieces[msg.Num] = struct{}{}
63if orderedNum+1 == msg.Num {
64orderedNum++
65// search cached pieces
66for {
67if _, ok := pieces[orderedNum+1]; ok {
68orderedNum++
69} else {
70break
71}
72}
73}
74msg.OrderedNum = orderedNum
75for msgCh := range subs {
76// msgCh is buffered, use non-blocking send to protect the broker
77select {
78case msgCh <- msg:
79default:
80}
81}
82}
83}
84}
85
86func (b *pieceBroker) Stop() {
87close(b.stopCh)
88}
89
90func (b *pieceBroker) Subscribe() chan *PieceInfo {
91msgCh := make(chan *PieceInfo, 5)
92select {
93case <-b.stopCh:
94case b.subCh <- msgCh:
95}
96
97return msgCh
98}
99
100func (b *pieceBroker) Unsubscribe(msgCh chan *PieceInfo) {
101b.unsubCh <- msgCh
102}
103
104func (b *pieceBroker) Publish(msg *PieceInfo) {
105select {
106case b.publishCh <- msg:
107case <-b.stopCh:
108}
109}
110