Dragonfly2

Форк
0
/
seed_peer_client.go 
206 строк · 5.6 Кб
1
/*
2
 *     Copyright 2022 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
//go:generate mockgen -destination seed_peer_client_mock.go -source seed_peer_client.go -package resource
18

19
package resource
20

21
import (
	"context"
	"fmt"
	"net"
	reflect "reflect"
	"strconv"

	"github.com/hashicorp/go-multierror"
	"google.golang.org/grpc"

	managerv2 "d7y.io/api/v2/pkg/apis/manager/v2"

	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/pkg/dfnet"
	"d7y.io/dragonfly/v2/pkg/idgen"
	cdnsystemclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client"
	dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
	"d7y.io/dragonfly/v2/pkg/types"
	"d7y.io/dragonfly/v2/scheduler/config"
)
39

40
// SeedPeerClient is the interface used for client of seed peer.
41
type SeedPeerClient interface {
42
	// Addrs returns the addresses of seed peers.
43
	Addrs() []string
44

45
	// Client is cdnsystem grpc client interface.
46
	cdnsystemclient.Client
47

48
	// V2 is dfdaemon v2 grpc client interface.
49
	dfdaemonclient.V2
50

51
	// Observer is dynconfig observer interface.
52
	config.Observer
53
}
54

55
// seedPeerClient contains content for client of seed peer.
56
type seedPeerClient struct {
57
	// Client is cdnsystem grpc client interface.
58
	cdnsystemclient.Client
59

60
	// V2 is dfdaemon v2 grpc client interface.
61
	dfdaemonclient.V2
62

63
	// hostManager is host manager.
64
	hostManager HostManager
65

66
	// dynconfig is dynconfig interface.
67
	dynconfig config.DynconfigInterface
68

69
	// data is dynconfig data.
70
	data *config.DynconfigData
71
}
72

73
// New seed peer client interface.
74
func newSeedPeerClient(dynconfig config.DynconfigInterface, hostManager HostManager, opts ...grpc.DialOption) (SeedPeerClient, error) {
75
	config, err := dynconfig.Get()
76
	if err != nil {
77
		return nil, err
78
	}
79
	logger.Infof("initialize seed peer addresses: %#v", seedPeersToNetAddrs(config.Scheduler.SeedPeers))
80

81
	// Initialize seed peer grpc client.
82
	cdnsystemClient, err := cdnsystemclient.GetClient(context.Background(), dynconfig, opts...)
83
	if err != nil {
84
		return nil, err
85
	}
86

87
	fmt.Println("cdnsystemClient", cdnsystemClient)
88

89
	// Initialize dfdaemon v2 grpc client.
90
	dfdaemonClient, err := dfdaemonclient.GetV2(context.Background(), dynconfig, opts...)
91
	if err != nil {
92
		return nil, err
93
	}
94

95
	fmt.Println("dfdaemonClient", dfdaemonClient)
96

97
	sc := &seedPeerClient{
98
		hostManager: hostManager,
99
		Client:      cdnsystemClient,
100
		V2:          dfdaemonClient,
101
		dynconfig:   dynconfig,
102
		data:        config,
103
	}
104

105
	// Initialize seed peers for host manager.
106
	sc.updateSeedPeersForHostManager(config.Scheduler.SeedPeers)
107

108
	dynconfig.Register(sc)
109
	return sc, nil
110
}
111

112
// Close closes the seed peer client.
113
func (sc *seedPeerClient) Close() error {
114
	var errs error
115
	if err := sc.Client.Close(); err != nil {
116
		errs = multierror.Append(errs, err)
117
	}
118

119
	if err := sc.V2.Close(); err != nil {
120
		errs = multierror.Append(errs, err)
121
	}
122

123
	return errs
124
}
125

126
// Addrs returns the addresses of seed peers.
127
func (sc *seedPeerClient) Addrs() []string {
128
	var addrs []string
129
	for _, seedPeer := range sc.data.Scheduler.SeedPeers {
130
		addrs = append(addrs, fmt.Sprintf("%s:%d", seedPeer.Ip, seedPeer.Port))
131
	}
132

133
	return addrs
134
}
135

136
// Dynamic config notify function.
137
func (sc *seedPeerClient) OnNotify(data *config.DynconfigData) {
138
	if reflect.DeepEqual(sc.data, data) {
139
		return
140
	}
141

142
	// Update seed peers for host manager.
143
	sc.updateSeedPeersForHostManager(data.Scheduler.SeedPeers)
144

145
	// Update dynamic data.
146
	sc.data = data
147

148
	// Update grpc seed peer addresses.
149
	logger.Infof("addresses have been updated: %#v", seedPeersToNetAddrs(data.Scheduler.SeedPeers))
150
}
151

152
// updateSeedPeersForHostManager updates seed peers for host manager.
153
func (sc *seedPeerClient) updateSeedPeersForHostManager(seedPeers []*managerv2.SeedPeer) {
154
	for _, seedPeer := range seedPeers {
155
		var concurrentUploadLimit int32
156
		if config, err := config.GetSeedPeerClusterConfigBySeedPeer(seedPeer); err == nil {
157
			concurrentUploadLimit = int32(config.LoadLimit)
158
		}
159

160
		id := idgen.HostIDV2(seedPeer.Ip, seedPeer.Hostname)
161
		seedPeerHost, loaded := sc.hostManager.Load(id)
162
		if !loaded {
163
			options := []HostOption{WithNetwork(Network{
164
				Location: seedPeer.GetLocation(),
165
				IDC:      seedPeer.GetIdc(),
166
			})}
167
			if concurrentUploadLimit > 0 {
168
				options = append(options, WithConcurrentUploadLimit(concurrentUploadLimit))
169
			}
170

171
			host := NewHost(
172
				id, seedPeer.Ip, seedPeer.Hostname,
173
				seedPeer.Port, seedPeer.DownloadPort, types.HostTypeSuperSeed,
174
				options...,
175
			)
176

177
			sc.hostManager.Store(host)
178
			continue
179
		}
180

181
		seedPeerHost.Type = types.HostTypeSuperSeed
182
		seedPeerHost.Port = seedPeer.Port
183
		seedPeerHost.DownloadPort = seedPeer.DownloadPort
184
		seedPeerHost.Network.Location = seedPeer.GetLocation()
185
		seedPeerHost.Network.IDC = seedPeer.GetIdc()
186

187
		if concurrentUploadLimit > 0 {
188
			seedPeerHost.ConcurrentUploadLimit.Store(concurrentUploadLimit)
189
		}
190
	}
191

192
	return
193
}
194

195
// seedPeersToNetAddrs coverts []*config.SeedPeer to []dfnet.NetAddr.
196
func seedPeersToNetAddrs(seedPeers []*managerv2.SeedPeer) []dfnet.NetAddr {
197
	netAddrs := make([]dfnet.NetAddr, 0, len(seedPeers))
198
	for _, seedPeer := range seedPeers {
199
		netAddrs = append(netAddrs, dfnet.NetAddr{
200
			Type: dfnet.TCP,
201
			Addr: fmt.Sprintf("%s:%d", seedPeer.Ip, seedPeer.Port),
202
		})
203
	}
204

205
	return netAddrs
206
}
207

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.