Dragonfly2
346 строк · 8.8 Кб
/*
 *     Copyright 2022 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package config18
19import (20"context"21"errors"22"fmt"23"net"24"os"25"path/filepath"26"reflect"27"sync"28"time"29
30"google.golang.org/grpc"31"google.golang.org/grpc/credentials"32"google.golang.org/grpc/credentials/insecure"33"google.golang.org/grpc/resolver"34
35managerv1 "d7y.io/api/v2/pkg/apis/manager/v1"36
37logger "d7y.io/dragonfly/v2/internal/dflog"38internaldynconfig "d7y.io/dragonfly/v2/internal/dynconfig"39"d7y.io/dragonfly/v2/manager/searcher"40"d7y.io/dragonfly/v2/pkg/net/ip"41healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client"42managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client"43"d7y.io/dragonfly/v2/version"44)
45
// cacheFileName is the base name of the daemon's dynconfig cache file; it is
// joined with the cache directory in newDynconfigManager.
var cacheFileName = "daemon"
// dynconfigManager is the Dynconfig implementation whose data is loaded
// through the manager client.
type dynconfigManager struct {
	// config is the daemon configuration; OnNotify replaces it when a
	// different configuration is published.
	config *DaemonOption
	// Dynconfig is the embedded generic dynconfig used for Get and Refresh.
	internaldynconfig.Dynconfig[DynconfigData]
	// observers is the set of registered observers that Notify calls.
	observers map[Observer]struct{}
	// done is closed by Stop to end the Serve loop.
	done chan struct{}
	// cachePath is the path of the cache file; Stop removes it.
	cachePath string
	// transportCredentials, when non-nil, is used for scheduler health
	// checks; otherwise insecure credentials are used.
	transportCredentials credentials.TransportCredentials
	// schedulerClusterID is the cluster id recorded by the last successful
	// GetResolveSchedulerAddrs call.
	schedulerClusterID uint64
	// mu lets Refresh skip a call while another one is still running.
	mu sync.Mutex
}
59
60// newDynconfigManager returns a new manager dynconfig instence.
61func newDynconfigManager(cfg *DaemonOption, rawManagerClient managerclient.V1, cacheDir string, creds credentials.TransportCredentials) (Dynconfig, error) {62cachePath := filepath.Join(cacheDir, cacheFileName)63d, err := internaldynconfig.New[DynconfigData](64newManagerClient(rawManagerClient, cfg),65cachePath,66cfg.Scheduler.Manager.RefreshInterval,67)68if err != nil {69return nil, err70}71
72return &dynconfigManager{73config: cfg,74observers: map[Observer]struct{}{},75done: make(chan struct{}),76cachePath: cachePath,77Dynconfig: d,78transportCredentials: creds,79mu: sync.Mutex{},80}, nil81}
82
83// Get the dynamic seed peers config.
84func (d *dynconfigManager) GetSeedPeers() ([]*managerv1.SeedPeer, error) {85data, err := d.Get()86if err != nil {87return nil, err88}89
90if len(data.SeedPeers) == 0 {91return nil, errors.New("seed peers not found")92}93
94return data.SeedPeers, nil95}
96
97// Get the dynamic schedulers config from manager.
98func (d *dynconfigManager) GetResolveSchedulerAddrs() ([]resolver.Address, error) {99schedulers, err := d.GetSchedulers()100if err != nil {101return nil, err102}103
104var (105addrs = map[string]bool{}106resolveAddrs []resolver.Address107schedulerClusterID uint64108)109for _, scheduler := range schedulers {110// Check whether scheduler is in the same cluster.111if schedulerClusterID != 0 && schedulerClusterID != scheduler.SchedulerClusterId {112continue113}114
115dialOptions := []grpc.DialOption{}116if d.transportCredentials != nil {117dialOptions = append(dialOptions, grpc.WithTransportCredentials(d.transportCredentials))118} else {119dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))120}121
122var addr string123if ip, ok := ip.FormatIP(scheduler.GetIp()); ok {124// Check health with ip address.125target := fmt.Sprintf("%s:%d", ip, scheduler.GetPort())126if err := healthclient.Check(context.Background(), target, dialOptions...); err != nil {127logger.Warnf("scheduler ip address %s is unreachable: %s", target, err.Error())128
129// Check health with host address.130target = fmt.Sprintf("%s:%d", scheduler.GetHostname(), scheduler.GetPort())131if err := healthclient.Check(context.Background(), target, dialOptions...); err != nil {132logger.Warnf("scheduler host address %s is unreachable: %s", target, err.Error())133} else {134addr = target135}136} else {137addr = target138}139}140
141if addr == "" {142logger.Warnf("scheduler %s %s %d has not reachable addresses",143scheduler.GetIp(), scheduler.GetHostname(), scheduler.GetPort())144continue145}146
147if addrs[addr] {148continue149}150
151host, _, err := net.SplitHostPort(addr)152if err != nil {153continue154}155
156schedulerClusterID = scheduler.SchedulerClusterId157resolveAddrs = append(resolveAddrs, resolver.Address{158ServerName: host,159Addr: addr,160})161addrs[addr] = true162}163
164if len(resolveAddrs) == 0 {165return nil, errors.New("can not found available scheduler addresses")166}167
168d.schedulerClusterID = schedulerClusterID169return resolveAddrs, nil170}
171
172// Get the dynamic schedulers resolve addrs.
173func (d *dynconfigManager) GetSchedulers() ([]*managerv1.Scheduler, error) {174data, err := d.Get()175if err != nil {176return nil, err177}178
179if len(data.Schedulers) == 0 {180return nil, errors.New("schedulers not found")181}182
183return data.Schedulers, nil184}
185
// GetSchedulerClusterID returns the scheduler cluster id recorded by the most
// recent successful GetResolveSchedulerAddrs call.
// NOTE(review): the field is read here without any locking — confirm callers
// cannot race with GetResolveSchedulerAddrs, which writes it.
func (d *dynconfigManager) GetSchedulerClusterID() uint64 {
	return d.schedulerClusterID
}
190
191// Get the dynamic object storage config from manager.
192func (d *dynconfigManager) GetObjectStorage() (*managerv1.ObjectStorage, error) {193data, err := d.Get()194if err != nil {195return nil, err196}197
198if data.ObjectStorage == nil {199return nil, errors.New("invalid object storage")200}201
202return data.ObjectStorage, nil203}
204
205// Refresh refreshes dynconfig in cache.
206func (d *dynconfigManager) Refresh() error {207// If another load is in progress, return directly.208if !d.mu.TryLock() {209return nil210}211defer d.mu.Unlock()212
213if err := d.Dynconfig.Refresh(); err != nil {214return err215}216
217if err := d.Notify(); err != nil {218return err219}220
221return nil222}
223
// Register adds l to the set of observers that Notify calls.
// Registering the same observer again has no additional effect.
// NOTE(review): the observers map is modified without locking — confirm
// registration cannot run concurrently with Notify.
func (d *dynconfigManager) Register(l Observer) {
	d.observers[l] = struct{}{}
}
228
// Deregister removes l from the set of observers that Notify calls.
// Removing an observer that is not registered is a no-op.
// NOTE(review): the observers map is modified without locking — confirm
// deregistration cannot run concurrently with Notify.
func (d *dynconfigManager) Deregister(l Observer) {
	delete(d.observers, l)
}
233
234// Notify publishes new events to listeners.
235func (d *dynconfigManager) Notify() error {236data, err := d.Get()237if err != nil {238return err239}240
241for o := range d.observers {242o.OnNotify(data)243}244
245return nil246}
247
248// OnNotify allows an event to be published to the dynconfig.
249// Used for listening changes of the local configuration.
250func (d *dynconfigManager) OnNotify(cfg *DaemonOption) {251if reflect.DeepEqual(d.config, cfg) {252return253}254
255d.config = cfg256}
257
258// Serve the dynconfig listening service.
259func (d *dynconfigManager) Serve() error {260if err := d.Notify(); err != nil {261return err262}263
264tick := time.NewTicker(watchInterval)265for {266select {267case <-tick.C:268if err := d.Notify(); err != nil {269logger.Error("dynconfig notify failed", err)270}271case <-d.done:272return nil273}274}275}
276
277// Stop the dynconfig listening service.
278func (d *dynconfigManager) Stop() error {279close(d.done)280if err := os.Remove(d.cachePath); err != nil {281return err282}283
284return nil285}
286
// managerClient fetches the dynconfig data from the manager service.
type managerClient struct {
	// managerClient is the manager RPC client used to issue the requests.
	managerClient managerclient.V1
	// config is the daemon configuration that supplies the host details and
	// feature switches read by Get.
	config *DaemonOption
}
291
292// New the manager client used by dynconfig.
293func newManagerClient(client managerclient.V1, cfg *DaemonOption) internaldynconfig.ManagerClient {294return &managerClient{295managerClient: client,296config: cfg,297}298}
299
300func (mc *managerClient) Get() (any, error) {301data := DynconfigData{}302
303listSchedulersResp, err := mc.managerClient.ListSchedulers(context.Background(), &managerv1.ListSchedulersRequest{304SourceType: managerv1.SourceType_PEER_SOURCE,305Hostname: mc.config.Host.Hostname,306Ip: mc.config.Host.AdvertiseIP.String(),307Version: version.GitVersion,308Commit: version.GitCommit,309HostInfo: map[string]string{310searcher.ConditionIDC: mc.config.Host.IDC,311searcher.ConditionLocation: mc.config.Host.Location,312},313})314if err != nil {315return nil, err316}317data.Schedulers = listSchedulersResp.Schedulers318
319if mc.config.Scheduler.Manager.SeedPeer.Enable {320listSeedPeersResp, err := mc.managerClient.ListSeedPeers(context.Background(), &managerv1.ListSeedPeersRequest{321SourceType: managerv1.SourceType_PEER_SOURCE,322Hostname: mc.config.Host.Hostname,323Ip: mc.config.Host.AdvertiseIP.String(),324})325if err != nil {326logger.Warnf("list seed peers failed: %s", err.Error())327} else {328data.SeedPeers = listSeedPeersResp.SeedPeers329}330}331
332if mc.config.ObjectStorage.Enable {333getObjectStorageResp, err := mc.managerClient.GetObjectStorage(context.Background(), &managerv1.GetObjectStorageRequest{334SourceType: managerv1.SourceType_PEER_SOURCE,335Hostname: mc.config.Host.Hostname,336Ip: mc.config.Host.AdvertiseIP.String(),337})338if err != nil {339return nil, err340}341
342data.ObjectStorage = getObjectStorageResp343}344
345return data, nil346}
347