Dragonfly2
206 строк · 5.6 Кб
1/*
2* Copyright 2022 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17//go:generate mockgen -destination seed_peer_client_mock.go -source seed_peer_client.go -package resource
18
19package resource
20
21import (
22"context"
23"fmt"
24reflect "reflect"
25
26"github.com/hashicorp/go-multierror"
27"google.golang.org/grpc"
28
29managerv2 "d7y.io/api/v2/pkg/apis/manager/v2"
30
31logger "d7y.io/dragonfly/v2/internal/dflog"
32"d7y.io/dragonfly/v2/pkg/dfnet"
33"d7y.io/dragonfly/v2/pkg/idgen"
34cdnsystemclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client"
35dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
36"d7y.io/dragonfly/v2/pkg/types"
37"d7y.io/dragonfly/v2/scheduler/config"
38)
39
40// SeedPeerClient is the interface used for client of seed peer.
41type SeedPeerClient interface {
42// Addrs returns the addresses of seed peers.
43Addrs() []string
44
45// Client is cdnsystem grpc client interface.
46cdnsystemclient.Client
47
48// V2 is dfdaemon v2 grpc client interface.
49dfdaemonclient.V2
50
51// Observer is dynconfig observer interface.
52config.Observer
53}
54
55// seedPeerClient contains content for client of seed peer.
56type seedPeerClient struct {
57// Client is cdnsystem grpc client interface.
58cdnsystemclient.Client
59
60// V2 is dfdaemon v2 grpc client interface.
61dfdaemonclient.V2
62
63// hostManager is host manager.
64hostManager HostManager
65
66// dynconfig is dynconfig interface.
67dynconfig config.DynconfigInterface
68
69// data is dynconfig data.
70data *config.DynconfigData
71}
72
73// New seed peer client interface.
74func newSeedPeerClient(dynconfig config.DynconfigInterface, hostManager HostManager, opts ...grpc.DialOption) (SeedPeerClient, error) {
75config, err := dynconfig.Get()
76if err != nil {
77return nil, err
78}
79logger.Infof("initialize seed peer addresses: %#v", seedPeersToNetAddrs(config.Scheduler.SeedPeers))
80
81// Initialize seed peer grpc client.
82cdnsystemClient, err := cdnsystemclient.GetClient(context.Background(), dynconfig, opts...)
83if err != nil {
84return nil, err
85}
86
87fmt.Println("cdnsystemClient", cdnsystemClient)
88
89// Initialize dfdaemon v2 grpc client.
90dfdaemonClient, err := dfdaemonclient.GetV2(context.Background(), dynconfig, opts...)
91if err != nil {
92return nil, err
93}
94
95fmt.Println("dfdaemonClient", dfdaemonClient)
96
97sc := &seedPeerClient{
98hostManager: hostManager,
99Client: cdnsystemClient,
100V2: dfdaemonClient,
101dynconfig: dynconfig,
102data: config,
103}
104
105// Initialize seed peers for host manager.
106sc.updateSeedPeersForHostManager(config.Scheduler.SeedPeers)
107
108dynconfig.Register(sc)
109return sc, nil
110}
111
112// Close closes the seed peer client.
113func (sc *seedPeerClient) Close() error {
114var errs error
115if err := sc.Client.Close(); err != nil {
116errs = multierror.Append(errs, err)
117}
118
119if err := sc.V2.Close(); err != nil {
120errs = multierror.Append(errs, err)
121}
122
123return errs
124}
125
126// Addrs returns the addresses of seed peers.
127func (sc *seedPeerClient) Addrs() []string {
128var addrs []string
129for _, seedPeer := range sc.data.Scheduler.SeedPeers {
130addrs = append(addrs, fmt.Sprintf("%s:%d", seedPeer.Ip, seedPeer.Port))
131}
132
133return addrs
134}
135
136// Dynamic config notify function.
137func (sc *seedPeerClient) OnNotify(data *config.DynconfigData) {
138if reflect.DeepEqual(sc.data, data) {
139return
140}
141
142// Update seed peers for host manager.
143sc.updateSeedPeersForHostManager(data.Scheduler.SeedPeers)
144
145// Update dynamic data.
146sc.data = data
147
148// Update grpc seed peer addresses.
149logger.Infof("addresses have been updated: %#v", seedPeersToNetAddrs(data.Scheduler.SeedPeers))
150}
151
152// updateSeedPeersForHostManager updates seed peers for host manager.
153func (sc *seedPeerClient) updateSeedPeersForHostManager(seedPeers []*managerv2.SeedPeer) {
154for _, seedPeer := range seedPeers {
155var concurrentUploadLimit int32
156if config, err := config.GetSeedPeerClusterConfigBySeedPeer(seedPeer); err == nil {
157concurrentUploadLimit = int32(config.LoadLimit)
158}
159
160id := idgen.HostIDV2(seedPeer.Ip, seedPeer.Hostname)
161seedPeerHost, loaded := sc.hostManager.Load(id)
162if !loaded {
163options := []HostOption{WithNetwork(Network{
164Location: seedPeer.GetLocation(),
165IDC: seedPeer.GetIdc(),
166})}
167if concurrentUploadLimit > 0 {
168options = append(options, WithConcurrentUploadLimit(concurrentUploadLimit))
169}
170
171host := NewHost(
172id, seedPeer.Ip, seedPeer.Hostname,
173seedPeer.Port, seedPeer.DownloadPort, types.HostTypeSuperSeed,
174options...,
175)
176
177sc.hostManager.Store(host)
178continue
179}
180
181seedPeerHost.Type = types.HostTypeSuperSeed
182seedPeerHost.Port = seedPeer.Port
183seedPeerHost.DownloadPort = seedPeer.DownloadPort
184seedPeerHost.Network.Location = seedPeer.GetLocation()
185seedPeerHost.Network.IDC = seedPeer.GetIdc()
186
187if concurrentUploadLimit > 0 {
188seedPeerHost.ConcurrentUploadLimit.Store(concurrentUploadLimit)
189}
190}
191
192return
193}
194
195// seedPeersToNetAddrs coverts []*config.SeedPeer to []dfnet.NetAddr.
196func seedPeersToNetAddrs(seedPeers []*managerv2.SeedPeer) []dfnet.NetAddr {
197netAddrs := make([]dfnet.NetAddr, 0, len(seedPeers))
198for _, seedPeer := range seedPeers {
199netAddrs = append(netAddrs, dfnet.NetAddr{
200Type: dfnet.TCP,
201Addr: fmt.Sprintf("%s:%d", seedPeer.Ip, seedPeer.Port),
202})
203}
204
205return netAddrs
206}
207