Dragonfly2

Форк
0
337 строк · 9.3 Кб
1
/*
2
 *     Copyright 2022 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
//go:generate mockgen -destination mocks/announcer_mock.go -source announcer.go -package mocks
18

19
package announcer
20

21
import (
22
	"context"
23
	"os"
24
	"time"
25

26
	"github.com/shirou/gopsutil/v3/cpu"
27
	"github.com/shirou/gopsutil/v3/disk"
28
	"github.com/shirou/gopsutil/v3/host"
29
	"github.com/shirou/gopsutil/v3/mem"
30
	gopsutilnet "github.com/shirou/gopsutil/v3/net"
31
	"github.com/shirou/gopsutil/v3/process"
32

33
	managerv1 "d7y.io/api/v2/pkg/apis/manager/v1"
34
	schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
35

36
	"d7y.io/dragonfly/v2/client/config"
37
	logger "d7y.io/dragonfly/v2/internal/dflog"
38
	managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client"
39
	schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
40
	"d7y.io/dragonfly/v2/pkg/types"
41
	"d7y.io/dragonfly/v2/version"
42
)
43

44
// Announcer is the interface implemented by the announce service.
type Announcer interface {
	// Serve starts the announce service.
	Serve() error

	// Stop stops the announce service.
	Stop() error
}
52

53
// announcer provides announce function.
54
type announcer struct {
55
	config                  *config.DaemonOption
56
	dynconfig               config.Dynconfig
57
	hostID                  string
58
	daemonPort              int32
59
	daemonDownloadPort      int32
60
	daemonObjectStoragePort int32
61
	schedulerClient         schedulerclient.V1
62
	managerClient           managerclient.V1
63
	done                    chan struct{}
64
}
65

66
// Option is a functional option for configuring the announcer.
67
type Option func(s *announcer)
68

69
// WithManagerClient sets the grpc client of manager.
70
func WithManagerClient(client managerclient.V1) Option {
71
	return func(a *announcer) {
72
		a.managerClient = client
73
	}
74
}
75

76
// WithObjectStoragePort sets the daemonObjectStoragePort.
77
func WithObjectStoragePort(port int32) Option {
78
	return func(a *announcer) {
79
		a.daemonObjectStoragePort = port
80
	}
81
}
82

83
// New returns a new Announcer interface.
84
func New(cfg *config.DaemonOption, dynconfig config.Dynconfig, hostID string, daemonPort int32, daemonDownloadPort int32, schedulerClient schedulerclient.V1, options ...Option) Announcer {
85
	a := &announcer{
86
		config:             cfg,
87
		dynconfig:          dynconfig,
88
		hostID:             hostID,
89
		daemonPort:         daemonPort,
90
		daemonDownloadPort: daemonDownloadPort,
91
		schedulerClient:    schedulerClient,
92
		done:               make(chan struct{}),
93
	}
94

95
	for _, opt := range options {
96
		opt(a)
97
	}
98

99
	return a
100
}
101

102
// Started announcer server.
103
func (a *announcer) Serve() error {
104
	if a.managerClient != nil {
105
		logger.Info("announce seed peer to manager")
106
		if err := a.announceToManager(); err != nil {
107
			return err
108
		}
109
	}
110

111
	logger.Info("announce peer to scheduler")
112
	if err := a.announceToScheduler(); err != nil {
113
		return err
114
	}
115

116
	return nil
117
}
118

119
// Stop announcer server.
120
func (a *announcer) Stop() error {
121
	close(a.done)
122
	return nil
123
}
124

125
// announceToScheduler announces peer information to scheduler.
126
func (a *announcer) announceToScheduler() error {
127
	req, err := a.newAnnounceHostRequest()
128
	if err != nil {
129
		return err
130
	}
131

132
	if err := a.schedulerClient.AnnounceHost(context.Background(), req); err != nil {
133
		logger.Errorf("announce for the first time failed: %s", err.Error())
134
	}
135

136
	// Announce to scheduler.
137
	tick := time.NewTicker(a.config.Announcer.SchedulerInterval)
138
	for {
139
		select {
140
		case <-tick.C:
141
			req, err := a.newAnnounceHostRequest()
142
			if err != nil {
143
				logger.Error(err)
144
				break
145
			}
146

147
			if err := a.schedulerClient.AnnounceHost(context.Background(), req); err != nil {
148
				logger.Error(err)
149
				break
150
			}
151
		case <-a.done:
152
			return nil
153
		}
154
	}
155
}
156

157
// newAnnounceHostRequest returns announce host request.
158
func (a *announcer) newAnnounceHostRequest() (*schedulerv1.AnnounceHostRequest, error) {
159
	hostType := types.HostTypeNormalName
160
	if a.config.Scheduler.Manager.SeedPeer.Enable {
161
		hostType = types.HostTypeSuperSeedName
162
	}
163

164
	var objectStoragePort int32
165
	if a.config.ObjectStorage.Enable {
166
		objectStoragePort = a.daemonObjectStoragePort
167
	}
168

169
	pid := os.Getpid()
170

171
	h, err := host.Info()
172
	if err != nil {
173
		return nil, err
174
	}
175

176
	proc, err := process.NewProcess(int32(pid))
177
	if err != nil {
178
		return nil, err
179
	}
180

181
	procCPUPercent, err := proc.CPUPercent()
182
	if err != nil {
183
		return nil, err
184
	}
185

186
	cpuPercent, err := cpu.Percent(0, false)
187
	if err != nil {
188
		return nil, err
189
	}
190

191
	cpuLogicalCount, err := cpu.Counts(true)
192
	if err != nil {
193
		return nil, err
194
	}
195

196
	cpuPhysicalCount, err := cpu.Counts(false)
197
	if err != nil {
198
		return nil, err
199
	}
200

201
	cpuTimes, err := cpu.Times(false)
202
	if err != nil {
203
		return nil, err
204
	}
205

206
	procMemoryPercent, err := proc.MemoryPercent()
207
	if err != nil {
208
		return nil, err
209
	}
210

211
	virtualMemory, err := mem.VirtualMemory()
212
	if err != nil {
213
		return nil, err
214
	}
215

216
	procTCPConnections, err := gopsutilnet.ConnectionsPid("tcp", int32(pid))
217
	if err != nil {
218
		return nil, err
219
	}
220

221
	var uploadTCPConnections []gopsutilnet.ConnectionStat
222
	for _, procTCPConnection := range procTCPConnections {
223
		if procTCPConnection.Laddr.Port == uint32(a.daemonDownloadPort) && procTCPConnection.Status == "ESTABLISHED" {
224
			uploadTCPConnections = append(uploadTCPConnections, procTCPConnection)
225
		}
226
	}
227

228
	tcpConnections, err := gopsutilnet.Connections("tcp")
229
	if err != nil {
230
		return nil, err
231
	}
232

233
	disk, err := disk.Usage(a.config.Storage.DataPath)
234
	if err != nil {
235
		return nil, err
236
	}
237

238
	return &schedulerv1.AnnounceHostRequest{
239
		Id:                a.hostID,
240
		Type:              hostType,
241
		Hostname:          a.config.Host.Hostname,
242
		Ip:                a.config.Host.AdvertiseIP.String(),
243
		Port:              a.daemonPort,
244
		DownloadPort:      a.daemonDownloadPort,
245
		ObjectStoragePort: objectStoragePort,
246
		Os:                h.OS,
247
		Platform:          h.Platform,
248
		PlatformFamily:    h.PlatformFamily,
249
		PlatformVersion:   h.PlatformVersion,
250
		KernelVersion:     h.KernelVersion,
251
		Cpu: &schedulerv1.CPU{
252
			LogicalCount:   uint32(cpuLogicalCount),
253
			PhysicalCount:  uint32(cpuPhysicalCount),
254
			Percent:        cpuPercent[0],
255
			ProcessPercent: procCPUPercent,
256
			Times: &schedulerv1.CPUTimes{
257
				User:      cpuTimes[0].User,
258
				System:    cpuTimes[0].System,
259
				Idle:      cpuTimes[0].Idle,
260
				Nice:      cpuTimes[0].Nice,
261
				Iowait:    cpuTimes[0].Iowait,
262
				Irq:       cpuTimes[0].Irq,
263
				Softirq:   cpuTimes[0].Softirq,
264
				Steal:     cpuTimes[0].Steal,
265
				Guest:     cpuTimes[0].Guest,
266
				GuestNice: cpuTimes[0].GuestNice,
267
			},
268
		},
269
		Memory: &schedulerv1.Memory{
270
			Total:              virtualMemory.Total,
271
			Available:          virtualMemory.Available,
272
			Used:               virtualMemory.Used,
273
			UsedPercent:        virtualMemory.UsedPercent,
274
			ProcessUsedPercent: float64(procMemoryPercent),
275
			Free:               virtualMemory.Free,
276
		},
277
		Network: &schedulerv1.Network{
278
			TcpConnectionCount:       uint32(len(tcpConnections)),
279
			UploadTcpConnectionCount: uint32(len(uploadTCPConnections)),
280
			Location:                 a.config.Host.Location,
281
			Idc:                      a.config.Host.IDC,
282
		},
283
		Disk: &schedulerv1.Disk{
284
			Total:             disk.Total,
285
			Free:              disk.Free,
286
			Used:              disk.Used,
287
			UsedPercent:       disk.UsedPercent,
288
			InodesTotal:       disk.InodesTotal,
289
			InodesUsed:        disk.InodesUsed,
290
			InodesFree:        disk.InodesFree,
291
			InodesUsedPercent: disk.InodesUsedPercent,
292
		},
293
		Build: &schedulerv1.Build{
294
			GitVersion: version.GitVersion,
295
			GitCommit:  version.GitCommit,
296
			GoVersion:  version.GoVersion,
297
			Platform:   version.Platform,
298
		},
299
		SchedulerClusterId: a.dynconfig.GetSchedulerClusterID(),
300
	}, nil
301
}
302

303
// announceSeedPeer announces peer information to manager.
304
func (a *announcer) announceToManager() error {
305
	// Accounce seed peer information to manager.
306
	if a.config.Scheduler.Manager.SeedPeer.Enable {
307
		var objectStoragePort int32
308
		if a.config.ObjectStorage.Enable {
309
			objectStoragePort = int32(a.config.ObjectStorage.TCPListen.PortRange.Start)
310
		}
311

312
		if _, err := a.managerClient.UpdateSeedPeer(context.Background(), &managerv1.UpdateSeedPeerRequest{
313
			SourceType:        managerv1.SourceType_SEED_PEER_SOURCE,
314
			Hostname:          a.config.Host.Hostname,
315
			Type:              a.config.Scheduler.Manager.SeedPeer.Type,
316
			Idc:               a.config.Host.IDC,
317
			Location:          a.config.Host.Location,
318
			Ip:                a.config.Host.AdvertiseIP.String(),
319
			Port:              a.daemonPort,
320
			DownloadPort:      a.daemonDownloadPort,
321
			ObjectStoragePort: objectStoragePort,
322
			SeedPeerClusterId: uint64(a.config.Scheduler.Manager.SeedPeer.ClusterID),
323
		}); err != nil {
324
			return err
325
		}
326

327
		// Start keepalive to manager.
328
		go a.managerClient.KeepAlive(a.config.Scheduler.Manager.SeedPeer.KeepAlive.Interval, &managerv1.KeepAliveRequest{
329
			SourceType: managerv1.SourceType_SEED_PEER_SOURCE,
330
			Hostname:   a.config.Host.Hostname,
331
			Ip:         a.config.Host.AdvertiseIP.String(),
332
			ClusterId:  uint64(a.config.Scheduler.Manager.SeedPeer.ClusterID),
333
		}, a.done)
334
	}
335

336
	return nil
337
}
338

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.