Dragonfly2

Форк
0
/
dynconfig_manager.go 
346 строк · 8.8 Кб
1
/*
2
 *     Copyright 2022 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package config
18

19
import (
20
	"context"
21
	"errors"
22
	"fmt"
23
	"net"
24
	"os"
25
	"path/filepath"
26
	"reflect"
27
	"sync"
28
	"time"
29

30
	"google.golang.org/grpc"
31
	"google.golang.org/grpc/credentials"
32
	"google.golang.org/grpc/credentials/insecure"
33
	"google.golang.org/grpc/resolver"
34

35
	managerv1 "d7y.io/api/v2/pkg/apis/manager/v1"
36

37
	logger "d7y.io/dragonfly/v2/internal/dflog"
38
	internaldynconfig "d7y.io/dragonfly/v2/internal/dynconfig"
39
	"d7y.io/dragonfly/v2/manager/searcher"
40
	"d7y.io/dragonfly/v2/pkg/net/ip"
41
	healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client"
42
	managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client"
43
	"d7y.io/dragonfly/v2/version"
44
)
45

46
// cacheFileName is the name of the daemon dynconfig cache file; it is joined
// with the cache directory to build the cache path.
var cacheFileName = "daemon"
48

49
type dynconfigManager struct {
50
	config *DaemonOption
51
	internaldynconfig.Dynconfig[DynconfigData]
52
	observers            map[Observer]struct{}
53
	done                 chan struct{}
54
	cachePath            string
55
	transportCredentials credentials.TransportCredentials
56
	schedulerClusterID   uint64
57
	mu                   sync.Mutex
58
}
59

60
// newDynconfigManager returns a new manager dynconfig instence.
61
func newDynconfigManager(cfg *DaemonOption, rawManagerClient managerclient.V1, cacheDir string, creds credentials.TransportCredentials) (Dynconfig, error) {
62
	cachePath := filepath.Join(cacheDir, cacheFileName)
63
	d, err := internaldynconfig.New[DynconfigData](
64
		newManagerClient(rawManagerClient, cfg),
65
		cachePath,
66
		cfg.Scheduler.Manager.RefreshInterval,
67
	)
68
	if err != nil {
69
		return nil, err
70
	}
71

72
	return &dynconfigManager{
73
		config:               cfg,
74
		observers:            map[Observer]struct{}{},
75
		done:                 make(chan struct{}),
76
		cachePath:            cachePath,
77
		Dynconfig:            d,
78
		transportCredentials: creds,
79
		mu:                   sync.Mutex{},
80
	}, nil
81
}
82

83
// Get the dynamic seed peers config.
84
func (d *dynconfigManager) GetSeedPeers() ([]*managerv1.SeedPeer, error) {
85
	data, err := d.Get()
86
	if err != nil {
87
		return nil, err
88
	}
89

90
	if len(data.SeedPeers) == 0 {
91
		return nil, errors.New("seed peers not found")
92
	}
93

94
	return data.SeedPeers, nil
95
}
96

97
// Get the dynamic schedulers config from manager.
98
func (d *dynconfigManager) GetResolveSchedulerAddrs() ([]resolver.Address, error) {
99
	schedulers, err := d.GetSchedulers()
100
	if err != nil {
101
		return nil, err
102
	}
103

104
	var (
105
		addrs              = map[string]bool{}
106
		resolveAddrs       []resolver.Address
107
		schedulerClusterID uint64
108
	)
109
	for _, scheduler := range schedulers {
110
		// Check whether scheduler is in the same cluster.
111
		if schedulerClusterID != 0 && schedulerClusterID != scheduler.SchedulerClusterId {
112
			continue
113
		}
114

115
		dialOptions := []grpc.DialOption{}
116
		if d.transportCredentials != nil {
117
			dialOptions = append(dialOptions, grpc.WithTransportCredentials(d.transportCredentials))
118
		} else {
119
			dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
120
		}
121

122
		var addr string
123
		if ip, ok := ip.FormatIP(scheduler.GetIp()); ok {
124
			// Check health with ip address.
125
			target := fmt.Sprintf("%s:%d", ip, scheduler.GetPort())
126
			if err := healthclient.Check(context.Background(), target, dialOptions...); err != nil {
127
				logger.Warnf("scheduler ip address %s is unreachable: %s", target, err.Error())
128

129
				// Check health with host address.
130
				target = fmt.Sprintf("%s:%d", scheduler.GetHostname(), scheduler.GetPort())
131
				if err := healthclient.Check(context.Background(), target, dialOptions...); err != nil {
132
					logger.Warnf("scheduler host address %s is unreachable: %s", target, err.Error())
133
				} else {
134
					addr = target
135
				}
136
			} else {
137
				addr = target
138
			}
139
		}
140

141
		if addr == "" {
142
			logger.Warnf("scheduler %s %s %d has not reachable addresses",
143
				scheduler.GetIp(), scheduler.GetHostname(), scheduler.GetPort())
144
			continue
145
		}
146

147
		if addrs[addr] {
148
			continue
149
		}
150

151
		host, _, err := net.SplitHostPort(addr)
152
		if err != nil {
153
			continue
154
		}
155

156
		schedulerClusterID = scheduler.SchedulerClusterId
157
		resolveAddrs = append(resolveAddrs, resolver.Address{
158
			ServerName: host,
159
			Addr:       addr,
160
		})
161
		addrs[addr] = true
162
	}
163

164
	if len(resolveAddrs) == 0 {
165
		return nil, errors.New("can not found available scheduler addresses")
166
	}
167

168
	d.schedulerClusterID = schedulerClusterID
169
	return resolveAddrs, nil
170
}
171

172
// Get the dynamic schedulers resolve addrs.
173
func (d *dynconfigManager) GetSchedulers() ([]*managerv1.Scheduler, error) {
174
	data, err := d.Get()
175
	if err != nil {
176
		return nil, err
177
	}
178

179
	if len(data.Schedulers) == 0 {
180
		return nil, errors.New("schedulers not found")
181
	}
182

183
	return data.Schedulers, nil
184
}
185

186
// Get the dynamic schedulers cluster id.
187
func (d *dynconfigManager) GetSchedulerClusterID() uint64 {
188
	return d.schedulerClusterID
189
}
190

191
// Get the dynamic object storage config from manager.
192
func (d *dynconfigManager) GetObjectStorage() (*managerv1.ObjectStorage, error) {
193
	data, err := d.Get()
194
	if err != nil {
195
		return nil, err
196
	}
197

198
	if data.ObjectStorage == nil {
199
		return nil, errors.New("invalid object storage")
200
	}
201

202
	return data.ObjectStorage, nil
203
}
204

205
// Refresh refreshes dynconfig in cache.
206
func (d *dynconfigManager) Refresh() error {
207
	// If another load is in progress, return directly.
208
	if !d.mu.TryLock() {
209
		return nil
210
	}
211
	defer d.mu.Unlock()
212

213
	if err := d.Dynconfig.Refresh(); err != nil {
214
		return err
215
	}
216

217
	if err := d.Notify(); err != nil {
218
		return err
219
	}
220

221
	return nil
222
}
223

224
// Register allows an instance to register itself to listen/observe events.
225
func (d *dynconfigManager) Register(l Observer) {
226
	d.observers[l] = struct{}{}
227
}
228

229
// Deregister allows an instance to remove itself from the collection of observers/listeners.
230
func (d *dynconfigManager) Deregister(l Observer) {
231
	delete(d.observers, l)
232
}
233

234
// Notify publishes new events to listeners.
235
func (d *dynconfigManager) Notify() error {
236
	data, err := d.Get()
237
	if err != nil {
238
		return err
239
	}
240

241
	for o := range d.observers {
242
		o.OnNotify(data)
243
	}
244

245
	return nil
246
}
247

248
// OnNotify allows an event to be published to the dynconfig.
249
// Used for listening changes of the local configuration.
250
func (d *dynconfigManager) OnNotify(cfg *DaemonOption) {
251
	if reflect.DeepEqual(d.config, cfg) {
252
		return
253
	}
254

255
	d.config = cfg
256
}
257

258
// Serve the dynconfig listening service.
259
func (d *dynconfigManager) Serve() error {
260
	if err := d.Notify(); err != nil {
261
		return err
262
	}
263

264
	tick := time.NewTicker(watchInterval)
265
	for {
266
		select {
267
		case <-tick.C:
268
			if err := d.Notify(); err != nil {
269
				logger.Error("dynconfig notify failed", err)
270
			}
271
		case <-d.done:
272
			return nil
273
		}
274
	}
275
}
276

277
// Stop the dynconfig listening service.
278
func (d *dynconfigManager) Stop() error {
279
	close(d.done)
280
	if err := os.Remove(d.cachePath); err != nil {
281
		return err
282
	}
283

284
	return nil
285
}
286

287
type managerClient struct {
288
	managerClient managerclient.V1
289
	config        *DaemonOption
290
}
291

292
// New the manager client used by dynconfig.
293
func newManagerClient(client managerclient.V1, cfg *DaemonOption) internaldynconfig.ManagerClient {
294
	return &managerClient{
295
		managerClient: client,
296
		config:        cfg,
297
	}
298
}
299

300
func (mc *managerClient) Get() (any, error) {
301
	data := DynconfigData{}
302

303
	listSchedulersResp, err := mc.managerClient.ListSchedulers(context.Background(), &managerv1.ListSchedulersRequest{
304
		SourceType: managerv1.SourceType_PEER_SOURCE,
305
		Hostname:   mc.config.Host.Hostname,
306
		Ip:         mc.config.Host.AdvertiseIP.String(),
307
		Version:    version.GitVersion,
308
		Commit:     version.GitCommit,
309
		HostInfo: map[string]string{
310
			searcher.ConditionIDC:      mc.config.Host.IDC,
311
			searcher.ConditionLocation: mc.config.Host.Location,
312
		},
313
	})
314
	if err != nil {
315
		return nil, err
316
	}
317
	data.Schedulers = listSchedulersResp.Schedulers
318

319
	if mc.config.Scheduler.Manager.SeedPeer.Enable {
320
		listSeedPeersResp, err := mc.managerClient.ListSeedPeers(context.Background(), &managerv1.ListSeedPeersRequest{
321
			SourceType: managerv1.SourceType_PEER_SOURCE,
322
			Hostname:   mc.config.Host.Hostname,
323
			Ip:         mc.config.Host.AdvertiseIP.String(),
324
		})
325
		if err != nil {
326
			logger.Warnf("list seed peers failed: %s", err.Error())
327
		} else {
328
			data.SeedPeers = listSeedPeersResp.SeedPeers
329
		}
330
	}
331

332
	if mc.config.ObjectStorage.Enable {
333
		getObjectStorageResp, err := mc.managerClient.GetObjectStorage(context.Background(), &managerv1.GetObjectStorageRequest{
334
			SourceType: managerv1.SourceType_PEER_SOURCE,
335
			Hostname:   mc.config.Host.Hostname,
336
			Ip:         mc.config.Host.AdvertiseIP.String(),
337
		})
338
		if err != nil {
339
			return nil, err
340
		}
341

342
		data.ObjectStorage = getObjectStorageResp
343
	}
344

345
	return data, nil
346
}
347

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.