Dragonfly2
337 строк · 9.3 Кб
1/*
2* Copyright 2022 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17//go:generate mockgen -destination mocks/announcer_mock.go -source announcer.go -package mocks
18
19package announcer20
21import (22"context"23"os"24"time"25
26"github.com/shirou/gopsutil/v3/cpu"27"github.com/shirou/gopsutil/v3/disk"28"github.com/shirou/gopsutil/v3/host"29"github.com/shirou/gopsutil/v3/mem"30gopsutilnet "github.com/shirou/gopsutil/v3/net"31"github.com/shirou/gopsutil/v3/process"32
33managerv1 "d7y.io/api/v2/pkg/apis/manager/v1"34schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"35
36"d7y.io/dragonfly/v2/client/config"37logger "d7y.io/dragonfly/v2/internal/dflog"38managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client"39schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"40"d7y.io/dragonfly/v2/pkg/types"41"d7y.io/dragonfly/v2/version"42)
43
// Announcer is the interface implemented by the announce service.
type Announcer interface {
	// Serve starts the announcer server.
	Serve() error

	// Stop stops the announcer server.
	Stop() error
}
52
53// announcer provides announce function.
54type announcer struct {55config *config.DaemonOption56dynconfig config.Dynconfig57hostID string58daemonPort int3259daemonDownloadPort int3260daemonObjectStoragePort int3261schedulerClient schedulerclient.V162managerClient managerclient.V163done chan struct{}64}
65
66// Option is a functional option for configuring the announcer.
67type Option func(s *announcer)68
69// WithManagerClient sets the grpc client of manager.
70func WithManagerClient(client managerclient.V1) Option {71return func(a *announcer) {72a.managerClient = client73}74}
75
76// WithObjectStoragePort sets the daemonObjectStoragePort.
77func WithObjectStoragePort(port int32) Option {78return func(a *announcer) {79a.daemonObjectStoragePort = port80}81}
82
83// New returns a new Announcer interface.
84func New(cfg *config.DaemonOption, dynconfig config.Dynconfig, hostID string, daemonPort int32, daemonDownloadPort int32, schedulerClient schedulerclient.V1, options ...Option) Announcer {85a := &announcer{86config: cfg,87dynconfig: dynconfig,88hostID: hostID,89daemonPort: daemonPort,90daemonDownloadPort: daemonDownloadPort,91schedulerClient: schedulerClient,92done: make(chan struct{}),93}94
95for _, opt := range options {96opt(a)97}98
99return a100}
101
102// Started announcer server.
103func (a *announcer) Serve() error {104if a.managerClient != nil {105logger.Info("announce seed peer to manager")106if err := a.announceToManager(); err != nil {107return err108}109}110
111logger.Info("announce peer to scheduler")112if err := a.announceToScheduler(); err != nil {113return err114}115
116return nil117}
118
119// Stop announcer server.
120func (a *announcer) Stop() error {121close(a.done)122return nil123}
124
125// announceToScheduler announces peer information to scheduler.
126func (a *announcer) announceToScheduler() error {127req, err := a.newAnnounceHostRequest()128if err != nil {129return err130}131
132if err := a.schedulerClient.AnnounceHost(context.Background(), req); err != nil {133logger.Errorf("announce for the first time failed: %s", err.Error())134}135
136// Announce to scheduler.137tick := time.NewTicker(a.config.Announcer.SchedulerInterval)138for {139select {140case <-tick.C:141req, err := a.newAnnounceHostRequest()142if err != nil {143logger.Error(err)144break145}146
147if err := a.schedulerClient.AnnounceHost(context.Background(), req); err != nil {148logger.Error(err)149break150}151case <-a.done:152return nil153}154}155}
156
157// newAnnounceHostRequest returns announce host request.
158func (a *announcer) newAnnounceHostRequest() (*schedulerv1.AnnounceHostRequest, error) {159hostType := types.HostTypeNormalName160if a.config.Scheduler.Manager.SeedPeer.Enable {161hostType = types.HostTypeSuperSeedName162}163
164var objectStoragePort int32165if a.config.ObjectStorage.Enable {166objectStoragePort = a.daemonObjectStoragePort167}168
169pid := os.Getpid()170
171h, err := host.Info()172if err != nil {173return nil, err174}175
176proc, err := process.NewProcess(int32(pid))177if err != nil {178return nil, err179}180
181procCPUPercent, err := proc.CPUPercent()182if err != nil {183return nil, err184}185
186cpuPercent, err := cpu.Percent(0, false)187if err != nil {188return nil, err189}190
191cpuLogicalCount, err := cpu.Counts(true)192if err != nil {193return nil, err194}195
196cpuPhysicalCount, err := cpu.Counts(false)197if err != nil {198return nil, err199}200
201cpuTimes, err := cpu.Times(false)202if err != nil {203return nil, err204}205
206procMemoryPercent, err := proc.MemoryPercent()207if err != nil {208return nil, err209}210
211virtualMemory, err := mem.VirtualMemory()212if err != nil {213return nil, err214}215
216procTCPConnections, err := gopsutilnet.ConnectionsPid("tcp", int32(pid))217if err != nil {218return nil, err219}220
221var uploadTCPConnections []gopsutilnet.ConnectionStat222for _, procTCPConnection := range procTCPConnections {223if procTCPConnection.Laddr.Port == uint32(a.daemonDownloadPort) && procTCPConnection.Status == "ESTABLISHED" {224uploadTCPConnections = append(uploadTCPConnections, procTCPConnection)225}226}227
228tcpConnections, err := gopsutilnet.Connections("tcp")229if err != nil {230return nil, err231}232
233disk, err := disk.Usage(a.config.Storage.DataPath)234if err != nil {235return nil, err236}237
238return &schedulerv1.AnnounceHostRequest{239Id: a.hostID,240Type: hostType,241Hostname: a.config.Host.Hostname,242Ip: a.config.Host.AdvertiseIP.String(),243Port: a.daemonPort,244DownloadPort: a.daemonDownloadPort,245ObjectStoragePort: objectStoragePort,246Os: h.OS,247Platform: h.Platform,248PlatformFamily: h.PlatformFamily,249PlatformVersion: h.PlatformVersion,250KernelVersion: h.KernelVersion,251Cpu: &schedulerv1.CPU{252LogicalCount: uint32(cpuLogicalCount),253PhysicalCount: uint32(cpuPhysicalCount),254Percent: cpuPercent[0],255ProcessPercent: procCPUPercent,256Times: &schedulerv1.CPUTimes{257User: cpuTimes[0].User,258System: cpuTimes[0].System,259Idle: cpuTimes[0].Idle,260Nice: cpuTimes[0].Nice,261Iowait: cpuTimes[0].Iowait,262Irq: cpuTimes[0].Irq,263Softirq: cpuTimes[0].Softirq,264Steal: cpuTimes[0].Steal,265Guest: cpuTimes[0].Guest,266GuestNice: cpuTimes[0].GuestNice,267},268},269Memory: &schedulerv1.Memory{270Total: virtualMemory.Total,271Available: virtualMemory.Available,272Used: virtualMemory.Used,273UsedPercent: virtualMemory.UsedPercent,274ProcessUsedPercent: float64(procMemoryPercent),275Free: virtualMemory.Free,276},277Network: &schedulerv1.Network{278TcpConnectionCount: uint32(len(tcpConnections)),279UploadTcpConnectionCount: uint32(len(uploadTCPConnections)),280Location: a.config.Host.Location,281Idc: a.config.Host.IDC,282},283Disk: &schedulerv1.Disk{284Total: disk.Total,285Free: disk.Free,286Used: disk.Used,287UsedPercent: disk.UsedPercent,288InodesTotal: disk.InodesTotal,289InodesUsed: disk.InodesUsed,290InodesFree: disk.InodesFree,291InodesUsedPercent: disk.InodesUsedPercent,292},293Build: &schedulerv1.Build{294GitVersion: version.GitVersion,295GitCommit: version.GitCommit,296GoVersion: version.GoVersion,297Platform: version.Platform,298},299SchedulerClusterId: a.dynconfig.GetSchedulerClusterID(),300}, nil301}
302
303// announceSeedPeer announces peer information to manager.
304func (a *announcer) announceToManager() error {305// Accounce seed peer information to manager.306if a.config.Scheduler.Manager.SeedPeer.Enable {307var objectStoragePort int32308if a.config.ObjectStorage.Enable {309objectStoragePort = int32(a.config.ObjectStorage.TCPListen.PortRange.Start)310}311
312if _, err := a.managerClient.UpdateSeedPeer(context.Background(), &managerv1.UpdateSeedPeerRequest{313SourceType: managerv1.SourceType_SEED_PEER_SOURCE,314Hostname: a.config.Host.Hostname,315Type: a.config.Scheduler.Manager.SeedPeer.Type,316Idc: a.config.Host.IDC,317Location: a.config.Host.Location,318Ip: a.config.Host.AdvertiseIP.String(),319Port: a.daemonPort,320DownloadPort: a.daemonDownloadPort,321ObjectStoragePort: objectStoragePort,322SeedPeerClusterId: uint64(a.config.Scheduler.Manager.SeedPeer.ClusterID),323}); err != nil {324return err325}326
327// Start keepalive to manager.328go a.managerClient.KeepAlive(a.config.Scheduler.Manager.SeedPeer.KeepAlive.Interval, &managerv1.KeepAliveRequest{329SourceType: managerv1.SourceType_SEED_PEER_SOURCE,330Hostname: a.config.Host.Hostname,331Ip: a.config.Host.AdvertiseIP.String(),332ClusterId: uint64(a.config.Scheduler.Manager.SeedPeer.ClusterID),333}, a.done)334}335
336return nil337}
338