    "github.com/cubefs/cubefs/proto"
    masterSDK "github.com/cubefs/cubefs/sdk/master"
    "github.com/cubefs/cubefs/util"
    "github.com/cubefs/cubefs/util/errors"
    "github.com/cubefs/cubefs/util/iputil"
    "github.com/cubefs/cubefs/util/log"
    "github.com/cubefs/cubefs/util/ump"
    DefaultMinWriteAbleDataPartitionCnt = 10
type DataPartitionView struct {
    DataPartitions []*DataPartition
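// SimpleClientInfo is the interface through which the wrapper exchanges
// flow-limit reports and volume version information with the owning client.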
type SimpleClientInfo interface {
    GetFlowInfo() (*proto.ClientReportLimitInfo, bool)
    UpdateFlowInfo(limit *proto.LimitRsp2Client)
    SetClientID(id uint64) error
    UpdateLatestVer(verList *proto.VolVersionInfoList) error
    GetVerMgr() *proto.VolVersionInfoList
    partitions map[uint64]*DataPartition
    followerReadClientCfg bool
    dpSelectorChanged bool
    mc *masterSDK.MasterClient
    dpSelector DataPartitionSelector
    HostsStatus map[string]bool
    Uids map[uint32]*proto.UidSimpleInfo
    minWriteAbleDataPartitionCnt int
    SimpleClient SimpleClientInfo
func (w *Wrapper) GetMasterClient() *masterSDK.MasterClient {
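// NewDataPartitionWrapper builds the volume's data partition wrapper: it sets
// up the master client and partition map, loads the simple volume view,
// uploads the initial flow report, initializes the dp selector, fetches the
// data partitions and data node status, resolves the read version sequence,
// and starts the periodic flow-info reporter.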
func NewDataPartitionWrapper(client SimpleClientInfo, volName string, masters []string, preload bool, minWriteAbleDataPartitionCnt int, verReadSeq uint64) (w *Wrapper, err error) {
    log.LogInfof("action[NewDataPartitionWrapper] verReadSeq %v", verReadSeq)
    w.stopC = make(chan struct{})
    w.mc = masterSDK.NewMasterClient(masters, false)
    w.partitions = make(map[uint64]*DataPartition)
    w.HostsStatus = make(map[string]bool)
    w.minWriteAbleDataPartitionCnt = minWriteAbleDataPartitionCnt
    if w.minWriteAbleDataPartitionCnt < 0 {
        w.minWriteAbleDataPartitionCnt = DefaultMinWriteAbleDataPartitionCnt
    if w.LocalIp, err = ump.GetLocalIpAddr(); err != nil {
        err = errors.Trace(err, "NewDataPartitionWrapper:")
    if err = w.updateClusterInfo(); err != nil {
        err = errors.Trace(err, "NewDataPartitionWrapper:")
    if err = w.GetSimpleVolView(); err != nil {
        err = errors.Trace(err, "NewDataPartitionWrapper:")
    w.UploadFlowInfo(client, true)
    if err = w.initDpSelector(); err != nil {
        log.LogErrorf("NewDataPartitionWrapper: initDpSelector failed, [%v]", err)
    if err = w.updateDataPartition(true); err != nil {
        err = errors.Trace(err, "NewDataPartitionWrapper:")
    if err = w.updateDataNodeStatus(); err != nil {
        log.LogErrorf("NewDataPartitionWrapper: init DataNodeStatus failed, [%v]", err)
    w.verConfReadSeq = verReadSeq
    var verList *proto.VolVersionInfoList
    if verList, err = w.mc.AdminAPI().GetVerList(volName); err != nil {
    if verReadSeq, err = w.CheckReadVerSeq(volName, verReadSeq, verList); err != nil {
        log.LogErrorf("NewDataPartitionWrapper: init read with ver [%v] error [%v]", verReadSeq, err)
    w.verReadSeq = verReadSeq
    w.SimpleClient = client
    go w.uploadFlowInfoByTick(client)
func (w *Wrapper) Stop() {
    w.stopOnce.Do(func() {
func (w *Wrapper) InitFollowerRead(clientConfig bool) {
    w.followerReadClientCfg = clientConfig
    w.followerRead = w.followerReadClientCfg || w.followerRead

func (w *Wrapper) FollowerRead() bool {
    return w.followerRead
func (w *Wrapper) tryGetPartition(index uint64) (partition *DataPartition, ok bool) {
    defer w.Lock.RUnlock()
    partition, ok = w.partitions[index]
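// updateClusterInfo queries the master for cluster info and records the
// cluster name.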
func (w *Wrapper) updateClusterInfo() (err error) {
    var info *proto.ClusterInfo
    if info, err = w.mc.AdminAPI().GetClusterInfo(); err != nil {
        log.LogWarnf("UpdateClusterInfo: get cluster info fail: err(%v)", err)
    log.LogInfof("UpdateClusterInfo: get cluster info: cluster(%v) localIP(%v)", info.Cluster, info.Ip)
    w.clusterName = info.Cluster
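// UpdateUidsView rebuilds the UID info map from the given simple volume view.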
func (w *Wrapper) UpdateUidsView(view *proto.SimpleVolView) {
    defer w.UidLock.Unlock()
    w.Uids = make(map[uint32]*proto.UidSimpleInfo)
    for _, uid := range view.Uids {
        uid := uid // copy the loop variable so each map entry points at its own value
        w.Uids[uid.UID] = &uid
    log.LogDebugf("uid info updated to %v", view.Uids)
func (w *Wrapper) GetSimpleVolView() (err error) {
    var view *proto.SimpleVolView
    if view, err = w.mc.AdminAPI().GetVolumeSimpleInfo(w.volName); err != nil {
        log.LogWarnf("GetSimpleVolView: get volume simple info fail: volume(%v) err(%v)", w.volName, err)
    if view.Status == 1 {
        log.LogWarnf("GetSimpleVolView: volume has been marked for deletion: volume(%v) status(%v - 0:normal/1:markDelete)",
            w.volName, view.Status)
        return proto.ErrVolNotExists
    w.followerRead = view.FollowerRead
    w.dpSelectorName = view.DpSelectorName
    w.dpSelectorParm = view.DpSelectorParm
    w.volType = view.VolType
    w.EnablePosixAcl = view.EnablePosixAcl
    w.UpdateUidsView(view)
    log.LogDebugf("GetSimpleVolView: get volume simple info: ID(%v) name(%v) owner(%v) status(%v) capacity(%v) "+
        "metaReplicas(%v) dataReplicas(%v) mpCnt(%v) dpCnt(%v) followerRead(%v) createTime(%v) dpSelectorName(%v) "+
        "dpSelectorParm(%v) uids(%v)",
        view.ID, view.Name, view.Owner, view.Status, view.Capacity, view.MpReplicaNum, view.DpReplicaNum, view.MpCnt,
        view.DpCnt, view.FollowerRead, view.CreateTime, view.DpSelectorName, view.DpSelectorParm, view.Uids)
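// uploadFlowInfoByTick reports the client's flow info to the master on a
// 5-second ticker.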
func (w *Wrapper) uploadFlowInfoByTick(clientInfo SimpleClientInfo) {
    ticker := time.NewTicker(5 * time.Second)
        w.UploadFlowInfo(clientInfo, false)
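// update is the periodic refresh loop: once a minute it refreshes the simple
// volume view, the data partition view, the data node status, and the volume
// version list.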
func (w *Wrapper) update(clientInfo SimpleClientInfo) {
    ticker := time.NewTicker(time.Minute)
        w.updateSimpleVolView()
        w.updateDataPartition(false)
        w.updateDataNodeStatus()
        w.updateVerlist(clientInfo)
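// UploadFlowInfo uploads the client's flow report to the master, records the
// returned client ID via SetClientID, and applies the returned limit response.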
func (w *Wrapper) UploadFlowInfo(clientInfo SimpleClientInfo, init bool) (err error) {
    var limitRsp *proto.LimitRsp2Client
    flowInfo, isNeedReport := clientInfo.GetFlowInfo()
        log.LogDebugf("action[UploadFlowInfo] no need report!")
    if limitRsp, err = w.mc.AdminAPI().UploadFlowInfo(w.volName, flowInfo); err != nil {
        log.LogWarnf("UploadFlowInfo: upload flow info fail: volume(%v) err(%v)", w.volName, err)
    if limitRsp.ID == 0 {
        err = fmt.Errorf("init client get id 0")
        log.LogInfof("action[UploadFlowInfo] err %v", err.Error())
    log.LogInfof("action[UploadFlowInfo] get id %v", limitRsp.ID)
    clientInfo.SetClientID(limitRsp.ID)
    clientInfo.UpdateFlowInfo(limitRsp)
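// CheckPermission asks the master's user API whether the local client address
// is allowed to access the volume; a disallowed address is fatal.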
func (w *Wrapper) CheckPermission() {
    if info, err := w.mc.UserAPI().AclOperation(w.volName, w.LocalIp, util.AclCheckIP); err != nil {
        log.LogFatal("Client Addr not allowed to access CubeFS Cluster!")
func (w *Wrapper) updateVerlist(client SimpleClientInfo) (err error) {
    verList, err := w.mc.AdminAPI().GetVerList(w.volName)
        log.LogErrorf("updateVerlist: get ver list fail: err(%v)", err)
        msg := fmt.Sprintf("get verList nil, vol [%v] req seq [%v]", w.volName, w.verReadSeq)
        log.LogErrorf("action[updateVerlist] %v", msg)
        return fmt.Errorf("%v", msg)
    if w.verReadSeq > 0 {
        if _, err = w.CheckReadVerSeq(w.volName, w.verConfReadSeq, verList); err != nil {
            log.LogFatalf("updateVerlist: readSeq abnormal %v", err)
    log.LogDebugf("updateVerlist: UpdateLatestVer try update to verlist[%v]", verList)
    if err = client.UpdateLatestVer(verList); err != nil {
        log.LogWarnf("updateVerlist: UpdateLatestVer ver %v failed, err %v", verList.GetLastVer(), err)
func (w *Wrapper) updateSimpleVolView() (err error) {
    var view *proto.SimpleVolView
    if view, err = w.mc.AdminAPI().GetVolumeSimpleInfo(w.volName); err != nil {
        log.LogWarnf("updateSimpleVolView: get volume simple info fail: volume(%v) err(%v)", w.volName, err)
    w.UpdateUidsView(view)
    if w.followerRead != view.FollowerRead && !w.followerReadClientCfg {
        log.LogDebugf("UpdateSimpleVolView: update followerRead from old(%v) to new(%v)",
            w.followerRead, view.FollowerRead)
        w.followerRead = view.FollowerRead
    if w.dpSelectorName != view.DpSelectorName || w.dpSelectorParm != view.DpSelectorParm {
        log.LogDebugf("UpdateSimpleVolView: update dpSelector from old(%v %v) to new(%v %v)",
            w.dpSelectorName, w.dpSelectorParm, view.DpSelectorName, view.DpSelectorParm)
        w.dpSelectorName = view.DpSelectorName
        w.dpSelectorParm = view.DpSelectorParm
        w.dpSelectorChanged = true
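// updateDataPartitionByRsp converts the master's data partition responses into
// DataPartition objects, inserts or refreshes them in the partition map, and
// rebuilds the dp selector from the read-write partitions once enough of them
// are available (or on init, or for cold volumes with at least one).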
func (w *Wrapper) updateDataPartitionByRsp(isInit bool, DataPartitions []*proto.DataPartitionResponse) (err error) {
    convert := func(response *proto.DataPartitionResponse) *DataPartition {
        return &DataPartition{
            DataPartitionResponse: *response,
    if proto.IsCold(w.volType) {
    rwPartitionGroups := make([]*DataPartition, 0)
    for index, partition := range DataPartitions {
        if partition == nil {
            log.LogErrorf("action[updateDataPartitionByRsp] partition at index [%v] is nil", index)
        dp := convert(partition)
        if w.followerRead && w.nearRead {
            dp.NearHosts = w.sortHostsByDistance(dp.Hosts)
        log.LogInfof("updateDataPartition: dp(%v)", dp)
        w.replaceOrInsertPartition(dp)
        if proto.IsCold(w.volType) && proto.IsPreLoadDp(dp.PartitionType) {
        if dp.Status == proto.ReadWrite {
            rwPartitionGroups = append(rwPartitionGroups, dp)
            log.LogInfof("updateDataPartition: dp(%v) address(%p) insert to rwPartitionGroups", dp.PartitionID, dp)
    if isInit || len(rwPartitionGroups) >= w.minWriteAbleDataPartitionCnt || (proto.IsCold(w.volType) && (len(rwPartitionGroups) >= 1)) {
        log.LogInfof("updateDataPartition: refresh dpSelector of volume(%v) with %v rw partitions(%v all), isInit(%v), minWriteAbleDataPartitionCnt(%v)",
            w.volName, len(rwPartitionGroups), len(DataPartitions), isInit, w.minWriteAbleDataPartitionCnt)
        w.refreshDpSelector(rwPartitionGroups)
        err = errors.New("updateDataPartition: no writable data partition")
        log.LogWarnf("updateDataPartition: not enough writable data partitions, volume(%v) with %v rw partitions(%v all), isInit(%v), minWriteAbleDataPartitionCnt(%v)",
            w.volName, len(rwPartitionGroups), len(DataPartitions), isInit, w.minWriteAbleDataPartitionCnt)
    log.LogInfof("updateDataPartition: finish")
func (w *Wrapper) updateDataPartition(isInit bool) (err error) {
    var dpv *proto.DataPartitionsView
    if dpv, err = w.mc.ClientAPI().EncodingGzip().GetDataPartitions(w.volName); err != nil {
        log.LogErrorf("updateDataPartition: get data partitions fail: volume(%v) err(%v)", w.volName, err)
    log.LogInfof("updateDataPartition: get data partitions: volume(%v) partitions(%v)", w.volName, len(dpv.DataPartitions))
    return w.updateDataPartitionByRsp(isInit, dpv.DataPartitions)
func (w *Wrapper) UpdateDataPartition() (err error) {
    return w.updateDataPartition(false)
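// getDataPartitionFromMaster fetches a single data partition from the master,
// converts it into a DataPartitionResponse (status, replicas, hosts, leader,
// recover/discard flags), and merges it via updateDataPartitionByRsp.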
func (w *Wrapper) getDataPartitionFromMaster(isInit bool, dpId uint64) (err error) {
    var dpInfo *proto.DataPartitionInfo
    if dpInfo, err = w.mc.AdminAPI().GetDataPartition(w.volName, dpId); err != nil {
        log.LogErrorf("getDataPartitionFromMaster: get data partitions fail: volume(%v) dpId(%v) err(%v)",
            w.volName, dpId, err)
    log.LogInfof("getDataPartitionFromMaster: get data partitions: volume(%v), dpId(%v)", w.volName, dpId)
    var leaderAddr string
    for _, replica := range dpInfo.Replicas {
        if replica.IsLeader {
            leaderAddr = replica.Addr
    dpr := new(proto.DataPartitionResponse)
    dpr.PartitionID = dpId
    dpr.Status = dpInfo.Status
    dpr.ReplicaNum = dpInfo.ReplicaNum
    dpr.Hosts = make([]string, len(dpInfo.Hosts))
    copy(dpr.Hosts, dpInfo.Hosts)
    dpr.LeaderAddr = leaderAddr
    dpr.IsRecover = dpInfo.IsRecover
    dpr.IsDiscard = dpInfo.IsDiscard
    // allocate with length 0 so append does not leave a nil element at index 0
    DataPartitions := make([]*proto.DataPartitionResponse, 0, 1)
    DataPartitions = append(DataPartitions, dpr)
    return w.updateDataPartitionByRsp(isInit, DataPartitions)
func (w *Wrapper) clearPartitions() {
    defer w.Lock.Unlock()
    w.partitions = make(map[uint64]*DataPartition)
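// AllocatePreLoadDataPartition asks the master to create preload data
// partitions, registers the returned partitions locally, and refreshes the dp
// selector with them.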
func (w *Wrapper) AllocatePreLoadDataPartition(volName string, count int, capacity, ttl uint64, zones string) (err error) {
    var dpv *proto.DataPartitionsView
    if dpv, err = w.mc.AdminAPI().CreatePreLoadDataPartition(volName, count, capacity, ttl, zones); err != nil {
        log.LogWarnf("CreatePreLoadDataPartition fail: err(%v)", err)
    convert := func(response *proto.DataPartitionResponse) *DataPartition {
        return &DataPartition{
            DataPartitionResponse: *response,
    rwPartitionGroups := make([]*DataPartition, 0)
    for _, partition := range dpv.DataPartitions {
        dp := convert(partition)
        if proto.IsCold(w.volType) && !proto.IsPreLoadDp(dp.PartitionType) {
        log.LogInfof("AllocatePreLoadDataPartition: dp(%v)", dp)
        w.replaceOrInsertPartition(dp)
        rwPartitionGroups = append(rwPartitionGroups, dp)
    w.refreshDpSelector(rwPartitionGroups)
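// replaceOrInsertPartition updates an existing entry in place (status, replica
// count, discard flag, hosts) so its metrics are preserved, or inserts a new
// entry with fresh metrics; a status change is logged.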
func (w *Wrapper) replaceOrInsertPartition(dp *DataPartition) {
    old, ok := w.partitions[dp.PartitionID]
        oldstatus = old.Status
        old.Status = dp.Status
        old.ReplicaNum = dp.ReplicaNum
        old.IsDiscard = dp.IsDiscard
        old.NearHosts = dp.Hosts
        dp.Metrics = old.Metrics
        dp.Metrics = NewDataPartitionMetrics()
        w.partitions[dp.PartitionID] = dp
    if ok && oldstatus != dp.Status {
        log.LogInfof("partition:dp[%v] address %p status change (%v) -> (%v)", dp.PartitionID, &old, oldstatus, dp.Status)
func (w *Wrapper) GetDataPartition(partitionID uint64) (*DataPartition, error) {
    dp, ok := w.tryGetPartition(partitionID)
    if !ok && !proto.IsCold(w.volType) {
        err := w.getDataPartitionFromMaster(false, partitionID)
        dp, ok = w.tryGetPartition(partitionID)
            return nil, fmt.Errorf("partition[%v] not exist", partitionID)
        return nil, fmt.Errorf("partition[%v] not exist", partitionID)
    return nil, fmt.Errorf("partition[%v] not exist", partitionID)
func (w *Wrapper) GetReadVerSeq() uint64 {
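// CheckReadVerSeq validates the requested read version sequence against the
// volume's version list and returns the effective sequence to read with.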
func (w *Wrapper) CheckReadVerSeq(volName string, verReadSeq uint64, verList *proto.VolVersionInfoList) (readReadVer uint64, err error) {
    defer w.Lock.RUnlock()
    log.LogInfof("action[CheckReadVerSeq] vol [%v] req seq [%v]", volName, verReadSeq)
    readReadVer = verReadSeq
    if verReadSeq == math.MaxUint64 {
        ver *proto.VolVersionInfo
        verLen = len(verList.VerList)
    for id, ver = range verList.VerList {
            err = fmt.Errorf("action[CheckReadVerSeq] readReadVer %v not found", readReadVer)
        log.LogInfof("action[CheckReadVerSeq] ver %v,%v", ver.Ver, ver.Status)
        if ver.Ver == verReadSeq {
            if ver.Status != proto.VersionNormal {
                err = fmt.Errorf("action[CheckReadVerSeq] status %v not right", ver.Status)
            readReadVer = verList.VerList[id+1].Ver - 1
            log.LogInfof("action[CheckReadVerSeq] get read ver %v", readReadVer)
    err = fmt.Errorf("not found read ver %v", verReadSeq)
func (w *Wrapper) WarningMsg() string {
    return fmt.Sprintf("%s_client_warning", w.clusterName)
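// updateDataNodeStatus pulls the cluster view from the master and rebuilds the
// data node address -> active map (HostsStatus).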
func (w *Wrapper) updateDataNodeStatus() (err error) {
    var cv *proto.ClusterView
    cv, err = w.mc.AdminAPI().GetCluster()
        log.LogErrorf("updateDataNodeStatus: get cluster fail: err(%v)", err)
    newHostsStatus := make(map[string]bool)
    for _, node := range cv.DataNodes {
        newHostsStatus[node.Addr] = node.IsActive
    log.LogInfof("updateDataNodeStatus: update %d hosts status", len(newHostsStatus))
    w.HostsStatus = newHostsStatus
func (w *Wrapper) SetNearRead(nearRead bool) {
    w.nearRead = nearRead
    log.LogInfof("SetNearRead: set nearRead to %v", w.nearRead)

func (w *Wrapper) NearRead() bool {
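// sortHostsByDistance returns a copy of the host list ordered by network
// distance from the local IP, nearest first.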
func (w *Wrapper) sortHostsByDistance(srcHosts []string) []string {
    hosts := make([]string, len(srcHosts))
    copy(hosts, srcHosts)
    for i := 0; i < len(hosts); i++ {
        for j := i + 1; j < len(hosts); j++ {
            if distanceFromLocal(hosts[i]) > distanceFromLocal(hosts[j]) {
                hosts[i], hosts[j] = hosts[j], hosts[i]
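// distanceFromLocal strips the port from the given address and returns the
// network distance between the local IP and the remote IP.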
func distanceFromLocal(b string) int {
    remote := strings.Split(b, ":")[0]
    return iputil.GetDistance(net.ParseIP(LocalIP), net.ParseIP(remote))