Dragonfly2
203 строки · 5.3 Кб
1/*
2* Copyright 2023 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17//go:generate mockgen -destination mocks/network_topology_mock.go -source network_topology.go -package mocks
18
19package networktopology
20
21import (
22"context"
23"io"
24"sync"
25"time"
26
27"google.golang.org/protobuf/types/known/durationpb"
28"google.golang.org/protobuf/types/known/timestamppb"
29
30v1 "d7y.io/api/v2/pkg/apis/common/v1"
31schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
32
33"d7y.io/dragonfly/v2/client/config"
34logger "d7y.io/dragonfly/v2/internal/dflog"
35"d7y.io/dragonfly/v2/pkg/net/ping"
36schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
37)
38
// NetworkTopology is the lifecycle contract of the daemon-side network
// topology prober: it is started with Serve and shut down with Stop.
type NetworkTopology interface {
	// Serve runs the probing loop. The implementation in this file blocks
	// the calling goroutine until Stop is invoked.
	Serve()

	// Stop signals the loop started by Serve to exit.
	Stop()
}
46
47// networkTopology implements NetworkTopology.
48type networkTopology struct {
49config *config.DaemonOption
50hostID string
51daemonPort int32
52daemonDownloadPort int32
53schedulerClient schedulerclient.V1
54done chan struct{}
55}
56
57// NewNetworkTopology returns a new NetworkTopology interface.
58func NewNetworkTopology(cfg *config.DaemonOption, hostID string, daemonPort int32, daemonDownloadPort int32,
59schedulerClient schedulerclient.V1) (NetworkTopology, error) {
60return &networkTopology{
61config: cfg,
62hostID: hostID,
63daemonPort: daemonPort,
64daemonDownloadPort: daemonDownloadPort,
65schedulerClient: schedulerClient,
66done: make(chan struct{}),
67}, nil
68}
69
70// Serve starts network topology server.
71func (nt *networkTopology) Serve() {
72tick := time.NewTicker(nt.config.NetworkTopology.Probe.Interval)
73for {
74select {
75case <-tick.C:
76if err := nt.syncProbes(); err != nil {
77logger.Error(err)
78}
79case <-nt.done:
80return
81}
82}
83}
84
85// Stop stops network topology server.
86func (nt *networkTopology) Stop() {
87close(nt.done)
88}
89
90// syncProbes syncs probes to scheduler.
91func (nt *networkTopology) syncProbes() error {
92host := &v1.Host{
93Id: nt.hostID,
94Ip: nt.config.Host.AdvertiseIP.String(),
95Hostname: nt.config.Host.Hostname,
96Port: nt.daemonPort,
97DownloadPort: nt.daemonDownloadPort,
98Location: nt.config.Host.Location,
99Idc: nt.config.Host.IDC,
100}
101
102stream, err := nt.schedulerClient.SyncProbes(context.Background(), &schedulerv1.SyncProbesRequest{
103Host: host,
104Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
105ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
106},
107})
108if err != nil {
109return err
110}
111
112resp, err := stream.Recv()
113if err != nil {
114if err == io.EOF {
115return nil
116}
117
118return err
119}
120
121// Ping the destination host with the ICMP protocol.
122probes, failedProbes := nt.pingHosts(resp.Hosts)
123if len(probes) > 0 {
124if err := stream.Send(&schedulerv1.SyncProbesRequest{
125Host: host,
126Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{
127ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{
128Probes: probes,
129},
130},
131}); err != nil {
132return err
133}
134}
135
136if len(failedProbes) > 0 {
137if err := stream.Send(&schedulerv1.SyncProbesRequest{
138Host: host,
139Request: &schedulerv1.SyncProbesRequest_ProbeFailedRequest{
140ProbeFailedRequest: &schedulerv1.ProbeFailedRequest{
141Probes: failedProbes,
142},
143},
144}); err != nil {
145return err
146}
147}
148
149return nil
150}
151
152// Ping the destination host with the ICMP protocol. If the host is unreachable,
153// we will send the failed probe result to the scheduler. If the host is reachable,
154// we will send the probe result to the scheduler.
155func (nt *networkTopology) pingHosts(destHosts []*v1.Host) ([]*schedulerv1.Probe, []*schedulerv1.FailedProbe) {
156var (
157probes []*schedulerv1.Probe
158failedProbes []*schedulerv1.FailedProbe
159)
160
161wg := &sync.WaitGroup{}
162wg.Add(len(destHosts))
163for _, destHost := range destHosts {
164go func(destHost *v1.Host) {
165defer wg.Done()
166
167stats, err := ping.Ping(destHost.Ip)
168if err != nil {
169failedProbes = append(failedProbes, &schedulerv1.FailedProbe{
170Host: &v1.Host{
171Id: destHost.Id,
172Ip: destHost.Ip,
173Hostname: destHost.Hostname,
174Port: destHost.Port,
175DownloadPort: destHost.DownloadPort,
176Location: destHost.Location,
177Idc: destHost.Idc,
178},
179Description: err.Error(),
180})
181
182return
183}
184
185probes = append(probes, &schedulerv1.Probe{
186Host: &v1.Host{
187Id: destHost.Id,
188Ip: destHost.Ip,
189Hostname: destHost.Hostname,
190Port: destHost.Port,
191DownloadPort: destHost.DownloadPort,
192Location: destHost.Location,
193Idc: destHost.Idc,
194},
195Rtt: durationpb.New(stats.AvgRtt),
196CreatedAt: timestamppb.New(time.Now()),
197})
198}(destHost)
199}
200
201wg.Wait()
202return probes, failedProbes
203}
204