wrapper.go

// Copyright 2018 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package wrapper

import (
	"fmt"
	syslog "log"
	"math"
	"net"
	"strings"
	"sync"
	"time"

	"github.com/cubefs/cubefs/proto"
	masterSDK "github.com/cubefs/cubefs/sdk/master"
	"github.com/cubefs/cubefs/util"
	"github.com/cubefs/cubefs/util/errors"
	"github.com/cubefs/cubefs/util/iputil"
	"github.com/cubefs/cubefs/util/log"
	"github.com/cubefs/cubefs/util/ump"
)

var (
	LocalIP                             string
	DefaultMinWriteAbleDataPartitionCnt = 10
)

type DataPartitionView struct {
	DataPartitions []*DataPartition
}

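// SimpleClientInfo is the subset of the owning client that the wrapper uses
// for flow-limit reporting and volume version (snapshot) handling.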
type SimpleClientInfo interface {
	GetFlowInfo() (*proto.ClientReportLimitInfo, bool)
	UpdateFlowInfo(limit *proto.LimitRsp2Client)
	SetClientID(id uint64) error
	UpdateLatestVer(verList *proto.VolVersionInfoList) error
	GetReadVer() uint64
	GetLatestVer() uint64
	GetVerMgr() *proto.VolVersionInfoList
}

// Wrapper TODO rename. This name does not reflect what it is doing.
type Wrapper struct {
	Lock                  sync.RWMutex
	clusterName           string
	volName               string
	volType               int
	EnablePosixAcl        bool
	masters               []string
	partitions            map[uint64]*DataPartition
	followerRead          bool
	followerReadClientCfg bool
	nearRead              bool
	dpSelectorChanged     bool
	dpSelectorName        string
	dpSelectorParm        string
	mc                    *masterSDK.MasterClient
	stopOnce              sync.Once
	stopC                 chan struct{}

	dpSelector DataPartitionSelector

	HostsStatus map[string]bool
	Uids        map[uint32]*proto.UidSimpleInfo
	UidLock     sync.RWMutex
	preload     bool
	LocalIp     string

	minWriteAbleDataPartitionCnt int
	verConfReadSeq               uint64
	verReadSeq                   uint64
	SimpleClient                 SimpleClientInfo
}

func (w *Wrapper) GetMasterClient() *masterSDK.MasterClient {
	return w.mc
}

// NewDataPartitionWrapper returns a new data partition wrapper.
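// The wrapper connects to the given masters, loads the volume view and the
// initial data partition table, and then keeps them refreshed in background
// goroutines until Stop is called.
//
// A minimal usage sketch (illustrative only; the master addresses, volume
// name, and the SimpleClientInfo implementation are assumptions, not part of
// this file):
//
//	var client SimpleClientInfo = newMyClient() // hypothetical implementation
//	w, err := NewDataPartitionWrapper(client, "testVol",
//		[]string{"192.168.0.11:17010", "192.168.0.12:17010"}, false, 10, 0)
//	if err != nil {
//		// handle error
//	}
//	defer w.Stop()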
func NewDataPartitionWrapper(client SimpleClientInfo, volName string, masters []string, preload bool, minWriteAbleDataPartitionCnt int, verReadSeq uint64) (w *Wrapper, err error) {
	log.LogInfof("action[NewDataPartitionWrapper] verReadSeq %v", verReadSeq)

	w = new(Wrapper)
	w.stopC = make(chan struct{})
	w.masters = masters
	w.mc = masterSDK.NewMasterClient(masters, false)
	w.volName = volName
	w.partitions = make(map[uint64]*DataPartition)
	w.HostsStatus = make(map[string]bool)
	w.preload = preload

	w.minWriteAbleDataPartitionCnt = minWriteAbleDataPartitionCnt
	if w.minWriteAbleDataPartitionCnt < 0 {
		w.minWriteAbleDataPartitionCnt = DefaultMinWriteAbleDataPartitionCnt
	}

	if w.LocalIp, err = ump.GetLocalIpAddr(); err != nil {
		err = errors.Trace(err, "NewDataPartitionWrapper:")
		return
	}

	if err = w.updateClusterInfo(); err != nil {
		err = errors.Trace(err, "NewDataPartitionWrapper:")
		return
	}

	if err = w.GetSimpleVolView(); err != nil {
		err = errors.Trace(err, "NewDataPartitionWrapper:")
		return
	}

	w.UploadFlowInfo(client, true)

	if err = w.initDpSelector(); err != nil {
		log.LogErrorf("NewDataPartitionWrapper: init dpSelector failed, [%v]", err)
	}
	if err = w.updateDataPartition(true); err != nil {
		err = errors.Trace(err, "NewDataPartitionWrapper:")
		return
	}
	if err = w.updateDataNodeStatus(); err != nil {
		log.LogErrorf("NewDataPartitionWrapper: init DataNodeStatus failed, [%v]", err)
	}
	w.verConfReadSeq = verReadSeq
	if verReadSeq > 0 {
		var verList *proto.VolVersionInfoList
		if verList, err = w.mc.AdminAPI().GetVerList(volName); err != nil {
			return
		}
		if verReadSeq, err = w.CheckReadVerSeq(volName, verReadSeq, verList); err != nil {
			log.LogErrorf("NewDataPartitionWrapper: init read with ver [%v] error [%v]", verReadSeq, err)
			return
		}
	}
	w.verReadSeq = verReadSeq
	w.SimpleClient = client
	go w.uploadFlowInfoByTick(client)
	go w.update(client)
	return
}

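// Stop closes the stop channel, which terminates the background flow-report
// and update goroutines. It is safe to call more than once.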
func (w *Wrapper) Stop() {
	w.stopOnce.Do(func() {
		close(w.stopC)
	})
}

func (w *Wrapper) InitFollowerRead(clientConfig bool) {
	w.followerReadClientCfg = clientConfig
	w.followerRead = w.followerReadClientCfg || w.followerRead
}

func (w *Wrapper) FollowerRead() bool {
	return w.followerRead
}

func (w *Wrapper) tryGetPartition(index uint64) (partition *DataPartition, ok bool) {
	w.Lock.RLock()
	defer w.Lock.RUnlock()
	partition, ok = w.partitions[index]
	return
}

func (w *Wrapper) updateClusterInfo() (err error) {
	var info *proto.ClusterInfo
	if info, err = w.mc.AdminAPI().GetClusterInfo(); err != nil {
		log.LogWarnf("UpdateClusterInfo: get cluster info fail: err(%v)", err)
		return
	}
	log.LogInfof("UpdateClusterInfo: get cluster info: cluster(%v) localIP(%v)", info.Cluster, info.Ip)
	w.clusterName = info.Cluster
	LocalIP = info.Ip
	return
}

func (w *Wrapper) UpdateUidsView(view *proto.SimpleVolView) {
	w.UidLock.Lock()
	defer w.UidLock.Unlock()
	w.Uids = make(map[uint32]*proto.UidSimpleInfo)
	for _, uid := range view.Uids {
		if !uid.Limited {
			continue
		}
		uid := uid // copy so the stored pointer does not alias the loop variable (pre-Go 1.22 semantics)
		w.Uids[uid.UID] = &uid
	}
	log.LogDebugf("uid info updated to %v", view.Uids)
}

func (w *Wrapper) GetSimpleVolView() (err error) {
	var view *proto.SimpleVolView

	if view, err = w.mc.AdminAPI().GetVolumeSimpleInfo(w.volName); err != nil {
		log.LogWarnf("GetSimpleVolView: get volume simple info fail: volume(%v) err(%v)", w.volName, err)
		return
	}

	if view.Status == 1 {
		log.LogWarnf("GetSimpleVolView: volume has been marked for deletion: volume(%v) status(%v - 0:normal/1:markDelete)",
			w.volName, view.Status)
		return proto.ErrVolNotExists
	}

	w.followerRead = view.FollowerRead
	w.dpSelectorName = view.DpSelectorName
	w.dpSelectorParm = view.DpSelectorParm
	w.volType = view.VolType
	w.EnablePosixAcl = view.EnablePosixAcl
	w.UpdateUidsView(view)

	log.LogDebugf("GetSimpleVolView: get volume simple info: ID(%v) name(%v) owner(%v) status(%v) capacity(%v) "+
		"metaReplicas(%v) dataReplicas(%v) mpCnt(%v) dpCnt(%v) followerRead(%v) createTime(%v) dpSelectorName(%v) "+
		"dpSelectorParm(%v) uids(%v)",
		view.ID, view.Name, view.Owner, view.Status, view.Capacity, view.MpReplicaNum, view.DpReplicaNum, view.MpCnt,
		view.DpCnt, view.FollowerRead, view.CreateTime, view.DpSelectorName, view.DpSelectorParm, view.Uids)

	return
}

func (w *Wrapper) uploadFlowInfoByTick(clientInfo SimpleClientInfo) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			w.UploadFlowInfo(clientInfo, false)
		case <-w.stopC:
			return
		}
	}
}

func (w *Wrapper) update(clientInfo SimpleClientInfo) {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	taskFunc := func() {
		w.updateSimpleVolView()
		w.updateDataPartition(false)
		w.updateDataNodeStatus()
		w.CheckPermission()
		w.updateVerlist(clientInfo)
	}
	taskFunc()
	for {
		select {
		case <-ticker.C:
			taskFunc()
		case <-w.stopC:
			return
		}
	}
}

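// UploadFlowInfo reports the client's flow statistics to the master and
// applies the limit response it gets back. When init is true the response
// must also carry a non-zero client ID, which is registered via SetClientID.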
func (w *Wrapper) UploadFlowInfo(clientInfo SimpleClientInfo, init bool) (err error) {
	var limitRsp *proto.LimitRsp2Client

	flowInfo, isNeedReport := clientInfo.GetFlowInfo()
	if !isNeedReport {
		log.LogDebugf("action[UploadFlowInfo] no need to report")
		return nil
	}

	if limitRsp, err = w.mc.AdminAPI().UploadFlowInfo(w.volName, flowInfo); err != nil {
		log.LogWarnf("UploadFlowInfo: upload flow info fail: volume(%v) err(%v)", w.volName, err)
		return
	}

	if init {
		if limitRsp.ID == 0 {
			err = fmt.Errorf("init client got id 0")
			log.LogInfof("action[UploadFlowInfo] err %v", err.Error())
			return
		}
		log.LogInfof("action[UploadFlowInfo] get id %v", limitRsp.ID)
		clientInfo.SetClientID(limitRsp.ID)
	}
	clientInfo.UpdateFlowInfo(limitRsp)
	return
}

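// CheckPermission asks the master whether the local client IP is allowed to
// access the volume and logs a fatal message if it is not.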
func (w *Wrapper) CheckPermission() {
	if info, err := w.mc.UserAPI().AclOperation(w.volName, w.LocalIp, util.AclCheckIP); err != nil {
		syslog.Println(err)
	} else if !info.OK {
		syslog.Printf("CheckPermission: client addr %v is not allowed to access volume %v\n", w.LocalIp, w.volName)
		log.LogFatal("Client Addr not allowed to access CubeFS Cluster!")
	}
}

func (w *Wrapper) updateVerlist(client SimpleClientInfo) (err error) {
	verList, err := w.mc.AdminAPI().GetVerList(w.volName)
	if err != nil {
		log.LogErrorf("updateVerlist: get ver list fail: err(%v)", err)
		return err
	}

	if verList == nil {
		msg := fmt.Sprintf("get verList nil, vol [%v] req seq [%v]", w.volName, w.verReadSeq)
		log.LogErrorf("action[updateVerlist] %v", msg)
		return fmt.Errorf("%v", msg)
	}

	if w.verReadSeq > 0 {
		if _, err = w.CheckReadVerSeq(w.volName, w.verConfReadSeq, verList); err != nil {
			log.LogFatalf("updateVerlist: readSeq abnormal %v", err)
		}
		return
	}

	log.LogDebugf("updateVerlist: try to update to verList[%v]", verList)
	if err = client.UpdateLatestVer(verList); err != nil {
		log.LogWarnf("updateVerlist: UpdateLatestVer ver %v failed, err %v", verList.GetLastVer(), err)
		return
	}
	return
}

func (w *Wrapper) updateSimpleVolView() (err error) {
	var view *proto.SimpleVolView
	if view, err = w.mc.AdminAPI().GetVolumeSimpleInfo(w.volName); err != nil {
		log.LogWarnf("updateSimpleVolView: get volume simple info fail: volume(%v) err(%v)", w.volName, err)
		return
	}

	w.UpdateUidsView(view)

	if w.followerRead != view.FollowerRead && !w.followerReadClientCfg {
		log.LogDebugf("UpdateSimpleVolView: update followerRead from old(%v) to new(%v)",
			w.followerRead, view.FollowerRead)
		w.followerRead = view.FollowerRead
	}

	if w.dpSelectorName != view.DpSelectorName || w.dpSelectorParm != view.DpSelectorParm {
		log.LogDebugf("UpdateSimpleVolView: update dpSelector from old(%v %v) to new(%v %v)",
			w.dpSelectorName, w.dpSelectorParm, view.DpSelectorName, view.DpSelectorParm)
		w.Lock.Lock()
		w.dpSelectorName = view.DpSelectorName
		w.dpSelectorParm = view.DpSelectorParm
		w.dpSelectorChanged = true
		w.Lock.Unlock()
	}

	return nil
}

func (w *Wrapper) updateDataPartitionByRsp(isInit bool, DataPartitions []*proto.DataPartitionResponse) (err error) {
	convert := func(response *proto.DataPartitionResponse) *DataPartition {
		return &DataPartition{
			DataPartitionResponse: *response,
			ClientWrapper:         w,
		}
	}

	if proto.IsCold(w.volType) {
		w.clearPartitions()
	}
	rwPartitionGroups := make([]*DataPartition, 0)
	for index, partition := range DataPartitions {
		if partition == nil {
			log.LogErrorf("action[updateDataPartitionByRsp] index [%v] is nil", index)
			continue
		}
		dp := convert(partition)
		if w.followerRead && w.nearRead {
			dp.NearHosts = w.sortHostsByDistance(dp.Hosts)
		}
		log.LogInfof("updateDataPartition: dp(%v)", dp)
		w.replaceOrInsertPartition(dp)
		// do not insert preload dp in cold vol
		if proto.IsCold(w.volType) && proto.IsPreLoadDp(dp.PartitionType) {
			continue
		}
		if dp.Status == proto.ReadWrite {
			dp.MetricsRefresh()
			rwPartitionGroups = append(rwPartitionGroups, dp)
			log.LogInfof("updateDataPartition: dp(%v) address(%p) insert to rwPartitionGroups", dp.PartitionID, dp)
		}
	}

	// isInit identifies whether this call was triggered by the mount action
	if isInit || len(rwPartitionGroups) >= w.minWriteAbleDataPartitionCnt || (proto.IsCold(w.volType) && (len(rwPartitionGroups) >= 1)) {
		log.LogInfof("updateDataPartition: refresh dpSelector of volume(%v) with %v rw partitions(%v all), isInit(%v), minWriteAbleDataPartitionCnt(%v)",
			w.volName, len(rwPartitionGroups), len(DataPartitions), isInit, w.minWriteAbleDataPartitionCnt)
		w.refreshDpSelector(rwPartitionGroups)
	} else {
		err = errors.New("updateDataPartition: no writable data partition")
		log.LogWarnf("updateDataPartition: not enough writable data partitions, volume(%v) with %v rw partitions(%v all), isInit(%v), minWriteAbleDataPartitionCnt(%v)",
			w.volName, len(rwPartitionGroups), len(DataPartitions), isInit, w.minWriteAbleDataPartitionCnt)
	}

	log.LogInfof("updateDataPartition: finish")
	return err
}

func (w *Wrapper) updateDataPartition(isInit bool) (err error) {
	if w.preload {
		return
	}
	var dpv *proto.DataPartitionsView
	if dpv, err = w.mc.ClientAPI().EncodingGzip().GetDataPartitions(w.volName); err != nil {
		log.LogErrorf("updateDataPartition: get data partitions fail: volume(%v) err(%v)", w.volName, err)
		return
	}
	log.LogInfof("updateDataPartition: get data partitions: volume(%v) partitions(%v)", w.volName, len(dpv.DataPartitions))
	return w.updateDataPartitionByRsp(isInit, dpv.DataPartitions)
}

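// UpdateDataPartition forces a refresh of the data partition view outside the
// regular update ticker.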
func (w *Wrapper) UpdateDataPartition() (err error) {
	return w.updateDataPartition(false)
}

// getDataPartitionFromMaster fetches a single data partition's info directly
// from the master. The partition may be missing from the cached view kept by
// updateDataPartition, e.g. when a proxy such as nginx sits in front of the
// master to reduce its load and the cached view has not caught up yet.
func (w *Wrapper) getDataPartitionFromMaster(isInit bool, dpId uint64) (err error) {
	var dpInfo *proto.DataPartitionInfo
	if dpInfo, err = w.mc.AdminAPI().GetDataPartition(w.volName, dpId); err != nil {
		log.LogErrorf("getDataPartitionFromMaster: get data partitions fail: volume(%v) dpId(%v) err(%v)",
			w.volName, dpId, err)
		return
	}

	log.LogInfof("getDataPartitionFromMaster: get data partitions: volume(%v), dpId(%v)", w.volName, dpId)
	var leaderAddr string
	for _, replica := range dpInfo.Replicas {
		if replica.IsLeader {
			leaderAddr = replica.Addr
		}
	}

	dpr := new(proto.DataPartitionResponse)
	dpr.PartitionID = dpId
	dpr.Status = dpInfo.Status
	dpr.ReplicaNum = dpInfo.ReplicaNum
	dpr.Hosts = make([]string, len(dpInfo.Hosts))
	copy(dpr.Hosts, dpInfo.Hosts)
	dpr.LeaderAddr = leaderAddr
	dpr.IsRecover = dpInfo.IsRecover
	dpr.IsDiscard = dpInfo.IsDiscard

	// allocate with zero length so the response is the only element (a plain
	// make(..., 1) followed by append would leave a nil first entry)
	DataPartitions := make([]*proto.DataPartitionResponse, 0, 1)
	DataPartitions = append(DataPartitions, dpr)
	return w.updateDataPartitionByRsp(isInit, DataPartitions)
}

func (w *Wrapper) clearPartitions() {
	w.Lock.Lock()
	defer w.Lock.Unlock()
	w.partitions = make(map[uint64]*DataPartition)
}

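// AllocatePreLoadDataPartition asks the master to create preload data
// partitions for a cold volume and registers them with the dp selector.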
func (w *Wrapper) AllocatePreLoadDataPartition(volName string, count int, capacity, ttl uint64, zones string) (err error) {
	var dpv *proto.DataPartitionsView

	if dpv, err = w.mc.AdminAPI().CreatePreLoadDataPartition(volName, count, capacity, ttl, zones); err != nil {
		log.LogWarnf("CreatePreLoadDataPartition fail: err(%v)", err)
		return
	}
	convert := func(response *proto.DataPartitionResponse) *DataPartition {
		return &DataPartition{
			DataPartitionResponse: *response,
			ClientWrapper:         w,
		}
	}
	rwPartitionGroups := make([]*DataPartition, 0)
	for _, partition := range dpv.DataPartitions {
		dp := convert(partition)
		if proto.IsCold(w.volType) && !proto.IsPreLoadDp(dp.PartitionType) {
			continue
		}
		log.LogInfof("AllocatePreLoadDataPartition: dp(%v)", dp)
		w.replaceOrInsertPartition(dp)
		dp.MetricsRefresh()
		rwPartitionGroups = append(rwPartitionGroups, dp)
	}

	w.refreshDpSelector(rwPartitionGroups)
	return nil
}

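// replaceOrInsertPartition updates the cached entry for dp's partition ID in
// place, or inserts dp with fresh metrics if the partition is not cached yet.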
func (w *Wrapper) replaceOrInsertPartition(dp *DataPartition) {
	var oldstatus int8
	w.Lock.Lock()
	old, ok := w.partitions[dp.PartitionID]
	if ok {
		oldstatus = old.Status

		old.Status = dp.Status
		old.ReplicaNum = dp.ReplicaNum
		old.Hosts = dp.Hosts
		old.IsDiscard = dp.IsDiscard
		old.NearHosts = dp.Hosts

		dp.Metrics = old.Metrics
	} else {
		dp.Metrics = NewDataPartitionMetrics()
		w.partitions[dp.PartitionID] = dp
	}

	w.Lock.Unlock()

	if ok && oldstatus != dp.Status {
		log.LogInfof("partition:dp[%v] address %p status change (%v) -> (%v)", dp.PartitionID, &old, oldstatus, dp.Status)
	}
}

// GetDataPartition returns the data partition based on the given partition ID.
func (w *Wrapper) GetDataPartition(partitionID uint64) (*DataPartition, error) {
	dp, ok := w.tryGetPartition(partitionID)
	if !ok && !proto.IsCold(w.volType) { // cache miss && hot volume
		err := w.getDataPartitionFromMaster(false, partitionID)
		if err == nil {
			dp, ok = w.tryGetPartition(partitionID)
			if !ok {
				return nil, fmt.Errorf("partition[%v] does not exist", partitionID)
			}
			return dp, nil
		}
		return nil, fmt.Errorf("partition[%v] does not exist", partitionID)
	}
	if !ok {
		return nil, fmt.Errorf("partition[%v] does not exist", partitionID)
	}
	return dp, nil
}

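// GetReadVerSeq returns the snapshot version sequence the client currently
// reads at.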
func (w *Wrapper) GetReadVerSeq() uint64 {
	return w.verReadSeq
}

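// CheckReadVerSeq validates the configured read version against the volume's
// version list and returns the effective sequence to read at: the last
// sequence before the next committed version that follows the requested one.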
func (w *Wrapper) CheckReadVerSeq(volName string, verReadSeq uint64, verList *proto.VolVersionInfoList) (readReadVer uint64, err error) {
	w.Lock.RLock()
	defer w.Lock.RUnlock()

	log.LogInfof("action[CheckReadVerSeq] vol [%v] req seq [%v]", volName, verReadSeq)

	readReadVer = verReadSeq
	// Whether it is version 0 or any other version, there may be uncommitted versions between the requested version
	// and the next official version. In this case, the data needs to be read.
	if verReadSeq == math.MaxUint64 {
		verReadSeq = 0
	}

	var (
		id     int
		ver    *proto.VolVersionInfo
		verLen = len(verList.VerList)
	)
	for id, ver = range verList.VerList {
		if id == verLen-1 {
			err = fmt.Errorf("action[CheckReadVerSeq] readReadVer %v not found", readReadVer)
			break
		}
		log.LogInfof("action[CheckReadVerSeq] ver %v,%v", ver.Ver, ver.Status)
		if ver.Ver == verReadSeq {
			if ver.Status != proto.VersionNormal {
				err = fmt.Errorf("action[CheckReadVerSeq] status %v is not normal", ver.Status)
				return
			}
			readReadVer = verList.VerList[id+1].Ver - 1
			log.LogInfof("action[CheckReadVerSeq] get read ver %v", readReadVer)
			return
		}
	}

	err = fmt.Errorf("read ver %v not found", verReadSeq)
	return
}

// WarningMsg returns the warning message that contains the cluster name.
func (w *Wrapper) WarningMsg() string {
	return fmt.Sprintf("%s_client_warning", w.clusterName)
}

func (w *Wrapper) updateDataNodeStatus() (err error) {
	var cv *proto.ClusterView
	cv, err = w.mc.AdminAPI().GetCluster()
	if err != nil {
		log.LogErrorf("updateDataNodeStatus: get cluster fail: err(%v)", err)
		return
	}

	newHostsStatus := make(map[string]bool)
	for _, node := range cv.DataNodes {
		newHostsStatus[node.Addr] = node.IsActive
	}
	log.LogInfof("updateDataNodeStatus: update %d hosts status", len(newHostsStatus))

	w.HostsStatus = newHostsStatus

	return
}

func (w *Wrapper) SetNearRead(nearRead bool) {
	w.nearRead = nearRead
	log.LogInfof("SetNearRead: set nearRead to %v", w.nearRead)
}

func (w *Wrapper) NearRead() bool {
	return w.nearRead
}

// sortHostsByDistance sorts the hosts by their distance from the local IP.
func (w *Wrapper) sortHostsByDistance(srcHosts []string) []string {
	hosts := make([]string, len(srcHosts))
	copy(hosts, srcHosts)

	for i := 0; i < len(hosts); i++ {
		for j := i + 1; j < len(hosts); j++ {
			if distanceFromLocal(hosts[i]) > distanceFromLocal(hosts[j]) {
				hosts[i], hosts[j] = hosts[j], hosts[i]
			}
		}
	}
	return hosts
}

func distanceFromLocal(b string) int {
	remote := strings.Split(b, ":")[0]

	return iputil.GetDistance(net.ParseIP(LocalIP), net.ParseIP(remote))
}