Dragonfly2

Форк
0
/
network_topology.go 
203 строки · 5.3 Кб
1
/*
2
 *     Copyright 2023 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
//go:generate mockgen -destination mocks/network_topology_mock.go -source network_topology.go -package mocks
18

19
package networktopology
20

21
import (
22
	"context"
23
	"io"
24
	"sync"
25
	"time"
26

27
	"google.golang.org/protobuf/types/known/durationpb"
28
	"google.golang.org/protobuf/types/known/timestamppb"
29

30
	v1 "d7y.io/api/v2/pkg/apis/common/v1"
31
	schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
32

33
	"d7y.io/dragonfly/v2/client/config"
34
	logger "d7y.io/dragonfly/v2/internal/dflog"
35
	"d7y.io/dragonfly/v2/pkg/net/ping"
36
	schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
37
)
38

39
type NetworkTopology interface {
40
	// Serve starts network topology server.
41
	Serve()
42

43
	// Stop stops network topology server.
44
	Stop()
45
}
46

47
// networkTopology implements NetworkTopology.
48
type networkTopology struct {
49
	config             *config.DaemonOption
50
	hostID             string
51
	daemonPort         int32
52
	daemonDownloadPort int32
53
	schedulerClient    schedulerclient.V1
54
	done               chan struct{}
55
}
56

57
// NewNetworkTopology returns a new NetworkTopology interface.
58
func NewNetworkTopology(cfg *config.DaemonOption, hostID string, daemonPort int32, daemonDownloadPort int32,
59
	schedulerClient schedulerclient.V1) (NetworkTopology, error) {
60
	return &networkTopology{
61
		config:             cfg,
62
		hostID:             hostID,
63
		daemonPort:         daemonPort,
64
		daemonDownloadPort: daemonDownloadPort,
65
		schedulerClient:    schedulerClient,
66
		done:               make(chan struct{}),
67
	}, nil
68
}
69

70
// Serve starts network topology server.
71
func (nt *networkTopology) Serve() {
72
	tick := time.NewTicker(nt.config.NetworkTopology.Probe.Interval)
73
	for {
74
		select {
75
		case <-tick.C:
76
			if err := nt.syncProbes(); err != nil {
77
				logger.Error(err)
78
			}
79
		case <-nt.done:
80
			return
81
		}
82
	}
83
}
84

85
// Stop stops network topology server.
86
func (nt *networkTopology) Stop() {
87
	close(nt.done)
88
}
89

90
// syncProbes syncs probes to scheduler.
91
func (nt *networkTopology) syncProbes() error {
92
	host := &v1.Host{
93
		Id:           nt.hostID,
94
		Ip:           nt.config.Host.AdvertiseIP.String(),
95
		Hostname:     nt.config.Host.Hostname,
96
		Port:         nt.daemonPort,
97
		DownloadPort: nt.daemonDownloadPort,
98
		Location:     nt.config.Host.Location,
99
		Idc:          nt.config.Host.IDC,
100
	}
101

102
	stream, err := nt.schedulerClient.SyncProbes(context.Background(), &schedulerv1.SyncProbesRequest{
103
		Host: host,
104
		Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
105
			ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
106
		},
107
	})
108
	if err != nil {
109
		return err
110
	}
111

112
	resp, err := stream.Recv()
113
	if err != nil {
114
		if err == io.EOF {
115
			return nil
116
		}
117

118
		return err
119
	}
120

121
	// Ping the destination host with the ICMP protocol.
122
	probes, failedProbes := nt.pingHosts(resp.Hosts)
123
	if len(probes) > 0 {
124
		if err := stream.Send(&schedulerv1.SyncProbesRequest{
125
			Host: host,
126
			Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{
127
				ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{
128
					Probes: probes,
129
				},
130
			},
131
		}); err != nil {
132
			return err
133
		}
134
	}
135

136
	if len(failedProbes) > 0 {
137
		if err := stream.Send(&schedulerv1.SyncProbesRequest{
138
			Host: host,
139
			Request: &schedulerv1.SyncProbesRequest_ProbeFailedRequest{
140
				ProbeFailedRequest: &schedulerv1.ProbeFailedRequest{
141
					Probes: failedProbes,
142
				},
143
			},
144
		}); err != nil {
145
			return err
146
		}
147
	}
148

149
	return nil
150
}
151

152
// Ping the destination host with the ICMP protocol. If the host is unreachable,
153
// we will send the failed probe result to the scheduler. If the host is reachable,
154
// we will send the probe result to the scheduler.
155
func (nt *networkTopology) pingHosts(destHosts []*v1.Host) ([]*schedulerv1.Probe, []*schedulerv1.FailedProbe) {
156
	var (
157
		probes       []*schedulerv1.Probe
158
		failedProbes []*schedulerv1.FailedProbe
159
	)
160

161
	wg := &sync.WaitGroup{}
162
	wg.Add(len(destHosts))
163
	for _, destHost := range destHosts {
164
		go func(destHost *v1.Host) {
165
			defer wg.Done()
166

167
			stats, err := ping.Ping(destHost.Ip)
168
			if err != nil {
169
				failedProbes = append(failedProbes, &schedulerv1.FailedProbe{
170
					Host: &v1.Host{
171
						Id:           destHost.Id,
172
						Ip:           destHost.Ip,
173
						Hostname:     destHost.Hostname,
174
						Port:         destHost.Port,
175
						DownloadPort: destHost.DownloadPort,
176
						Location:     destHost.Location,
177
						Idc:          destHost.Idc,
178
					},
179
					Description: err.Error(),
180
				})
181

182
				return
183
			}
184

185
			probes = append(probes, &schedulerv1.Probe{
186
				Host: &v1.Host{
187
					Id:           destHost.Id,
188
					Ip:           destHost.Ip,
189
					Hostname:     destHost.Hostname,
190
					Port:         destHost.Port,
191
					DownloadPort: destHost.DownloadPort,
192
					Location:     destHost.Location,
193
					Idc:          destHost.Idc,
194
				},
195
				Rtt:       durationpb.New(stats.AvgRtt),
196
				CreatedAt: timestamppb.New(time.Now()),
197
			})
198
		}(destHost)
199
	}
200

201
	wg.Wait()
202
	return probes, failedProbes
203
}
204

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.