// Copyright 2018 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package metanode

import (
    "encoding/json"
    "fmt"
    "os"
    "sort"
    "time"

    "github.com/cubefs/cubefs/proto"
    "github.com/cubefs/cubefs/util/auditlog"
    "github.com/cubefs/cubefs/util/errors"
    "github.com/cubefs/cubefs/util/exporter"
    "github.com/cubefs/cubefs/util/log"
)
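// CheckQuota verifies the directory quota and the per-uid space quota for the
// given inode before an extent operation is applied. On failure it replies to
// the client through p and returns a non-nil error.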
func (mp *metaPartition) CheckQuota(inodeId uint64, p *Packet) (iParm *Inode, inode *Inode, err error) {
    iParm = NewInode(inodeId, 0)
    status := mp.isOverQuota(inodeId, true, false)
    if status != 0 {
        log.LogErrorf("CheckQuota dir quota fail inode[%v] status [%v]", inodeId, status)
        err = errors.New("CheckQuota dir quota is over quota")
        reply := []byte(err.Error())
        p.PacketErrorWithBody(status, reply)
        return
    }

    item := mp.inodeTree.Get(iParm)
    if item == nil {
        err = fmt.Errorf("inode[%v] not exist", iParm)
        p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
        return
    }
    inode = item.(*Inode)

    mp.uidManager.acLock.Lock()
    if mp.uidManager.getUidAcl(inode.Uid) {
        log.LogWarnf("CheckQuota UidSpace.volname [%v] mp[%v] uid %v be set full", mp.uidManager.volName, mp.uidManager.mpID, inode.Uid)
        mp.uidManager.acLock.Unlock()
        status = proto.OpNoSpaceErr
        err = errors.New("CheckQuota UidSpace is over quota")
        reply := []byte(err.Error())
        p.PacketErrorWithBody(status, reply)
        return
    }
    mp.uidManager.acLock.Unlock()
    return
}

// ExtentAppend appends an extent.
func (mp *metaPartition) ExtentAppend(req *proto.AppendExtentKeyRequest, p *Packet) (err error) {
    if !proto.IsHot(mp.volType) {
        err = fmt.Errorf("only support hot vol")
        p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
        return
    }
    ino := NewInode(req.Inode, 0)
    if _, _, err = mp.CheckQuota(req.Inode, p); err != nil {
        log.LogErrorf("ExtentAppend fail status [%v]", err)
        return
    }
    ext := req.Extent
    ino.Extents.Append(ext)
    val, err := ino.Marshal()
    if err != nil {
        p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
        return
    }
    resp, err := mp.submit(opFSMExtentsAdd, val)
    if err != nil {
        p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
        return
    }
    p.PacketErrorWithBody(resp.(uint8), nil)
    return
}

// ExtentAppendWithCheck appends an extent with discard extents check.
// Format: one valid extent key followed by none or several discard keys.
func (mp *metaPartition) ExtentAppendWithCheck(req *proto.AppendExtentKeyWithCheckRequest, p *Packet) (err error) {
    status := mp.isOverQuota(req.Inode, true, false)
    if status != 0 {
        log.LogErrorf("ExtentAppendWithCheck fail status [%v]", status)
        err = errors.New("ExtentAppendWithCheck is over quota")
        reply := []byte(err.Error())
        p.PacketErrorWithBody(status, reply)
        return
    }

    var (
        inoParm *Inode
        i       *Inode
    )
    if inoParm, i, err = mp.CheckQuota(req.Inode, p); err != nil {
        log.LogErrorf("ExtentAppendWithCheck CheckQuota fail err [%v]", err)
        return
    }

    // check volume's type: if the volume is cold, cbfs' extent can be modified/added only when the objextent exists
    if proto.IsCold(mp.volType) {
        exist, idx := i.ObjExtents.FindOffsetExist(req.Extent.FileOffset)
        if !exist {
            err = fmt.Errorf("ebs's objextent not exist with offset[%v]", req.Extent.FileOffset)
            p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
            return
        }
        if i.ObjExtents.eks[idx].Size != uint64(req.Extent.Size) {
            err = fmt.Errorf("ebs's objextent size[%v] isn't equal to the append size[%v]", i.ObjExtents.eks[idx].Size, req.Extent.Size)
            p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
            return
        }
    }

    ext := req.Extent
    // extent key verSeq is not set here since marshal will not include verSeq;
    // use the inode verSeq instead
    inoParm.setVer(mp.verSeq)
    inoParm.Extents.Append(ext)
    log.LogDebugf("ExtentAppendWithCheck: ino(%v) mp[%v] verSeq (%v)", req.Inode, req.PartitionID, mp.verSeq)

    // Store discard extents right after the append extent key.
    if len(req.DiscardExtents) != 0 {
        inoParm.Extents.eks = append(inoParm.Extents.eks, req.DiscardExtents...)
    }
    val, err := inoParm.Marshal()
    if err != nil {
        p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
        return
    }
    var opFlag uint32 = opFSMExtentsAddWithCheck
    if req.IsSplit {
        // assumption: split requests carry a split flag and are routed to the extent-split FSM op
        opFlag = opFSMExtentSplit
    }
    resp, err := mp.submit(opFlag, val)
    if err != nil {
        p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
        return
    }
    log.LogDebugf("ExtentAppendWithCheck: ino(%v) mp[%v] verSeq (%v) req.VerSeq(%v) rspcode(%v)", req.Inode, req.PartitionID, mp.verSeq, req.VerSeq, resp.(uint8))

    if mp.verSeq > req.VerSeq {
        // reuse ExtentType to flag a version mismatch between metanode and client;
        // the response makes the client refresh all streamer extents and their verSeq
        p.ExtentType |= proto.MultiVersionFlag
    }
    p.PacketErrorWithBody(resp.(uint8), nil)
    return
}

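// SetTxInfo applies transaction settings (op mask and op limit) pushed by the
// master to this meta partition; entries for other volumes are skipped.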
func (mp *metaPartition) SetTxInfo(info []*proto.TxInfo) {
    for _, txInfo := range info {
        if txInfo.Volume != mp.config.VolName {
            continue
        }
        mp.txProcessor.mask = txInfo.Mask
        mp.txProcessor.txManager.setLimit(txInfo.OpLimitVal)
        log.LogInfof("SetTxInfo mp[%v] mask %v limit %v", mp.config.PartitionId, proto.GetMaskString(txInfo.Mask), txInfo.OpLimitVal)
    }
}

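// VerOpData is the payload submitted to the FSM for a version operation.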
type VerOpData struct {
    Op      uint8
    VerSeq  uint64
    VerList []*proto.VolVersionInfo
}

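// checkByMasterVerlist reconciles the partition's version list against the
// list reported by the master: versions unknown to the master are parked in
// TemporaryVerMap and removed from the working VerList.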
func (mp *metaPartition) checkByMasterVerlist(mpVerList *proto.VolVersionInfoList, masterVerList *proto.VolVersionInfoList) (err error) {
    currMasterSeq := masterVerList.GetLastVer()
    verMapMaster := make(map[uint64]*proto.VolVersionInfo)
    for _, ver := range masterVerList.VerList {
        verMapMaster[ver.Ver] = ver
    }
    log.LogDebugf("checkVerList. volname [%v] mp[%v] masterVerList %v mpVerList.VerList %v", mp.config.VolName, mp.config.PartitionId, masterVerList, mpVerList.VerList)
    mp.multiVersionList.RWLock.Lock()
    defer mp.multiVersionList.RWLock.Unlock()
    vlen := len(mpVerList.VerList)
    for id, info2 := range mpVerList.VerList {
        if id == vlen-1 {
            break
        }
        log.LogDebugf("checkVerList. volname [%v] mp[%v] ver info %v currMasterseq [%v]", mp.config.VolName, mp.config.PartitionId, info2, currMasterSeq)
        _, exist := verMapMaster[info2.Ver]
        if !exist {
            if _, ok := mp.multiVersionList.TemporaryVerMap[info2.Ver]; !ok {
                log.LogInfof("checkVerList. volname [%v] mp[%v] ver info %v be considered as TemporaryVer", mp.config.VolName, mp.config.PartitionId, info2)
                mp.multiVersionList.TemporaryVerMap[info2.Ver] = info2
            }
        }
    }

    for verSeq := range mp.multiVersionList.TemporaryVerMap {
        for index, verInfo := range mp.multiVersionList.VerList {
            if verInfo.Ver == verSeq {
                log.LogInfof("checkVerList.updateVerList volname [%v] mp[%v] ver info %v be considered as TemporaryVer and do deletion verlist %v",
                    mp.config.VolName, mp.config.PartitionId, verInfo, mp.multiVersionList.VerList)
                if index == len(mp.multiVersionList.VerList)-1 {
                    log.LogInfof("checkVerList.updateVerList volname [%v] mp[%v] last ver info %v should not be considered as TemporaryVer and do deletion verlist %v",
                        mp.config.VolName, mp.config.PartitionId, verInfo, mp.multiVersionList.VerList)
                    break
                }
                mp.multiVersionList.VerList = append(mp.multiVersionList.VerList[:index], mp.multiVersionList.VerList[index+1:]...)
                log.LogInfof("checkVerList.updateVerList volname [%v] mp[%v] verlist %v", mp.config.VolName, mp.config.PartitionId, mp.multiVersionList.VerList)
                break
            }
        }
    }
    return
}

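// checkVerList merges the version list pushed by the master into the local
// multi-version list and, when anything changed, persists the merged list
// through HandleVersionOp.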
func (mp *metaPartition) checkVerList(reqVerListInfo *proto.VolVersionInfoList, sync bool) (needUpdate bool, err error) {
    mp.multiVersionList.RWLock.RLock()
    verMapLocal := make(map[uint64]*proto.VolVersionInfo)
    verMapReq := make(map[uint64]*proto.VolVersionInfo)
    for _, ver := range reqVerListInfo.VerList {
        verMapReq[ver.Ver] = ver
    }

    var VerList []*proto.VolVersionInfo

    for _, info2 := range mp.multiVersionList.VerList {
        log.LogDebugf("checkVerList. volname [%v] mp[%v] ver info %v", mp.config.VolName, mp.config.PartitionId, info2)
        vms, exist := verMapReq[info2.Ver]
        if !exist {
            log.LogWarnf("checkVerList. volname [%v] mp[%v] version info(%v) not exist in master (%v)",
                mp.config.VolName, mp.config.PartitionId, info2, reqVerListInfo.VerList)
        } else if info2.Status != proto.VersionNormal && info2.Status != vms.Status {
            log.LogWarnf("checkVerList. volname [%v] mp[%v] ver [%v] status abnormal %v", mp.config.VolName, mp.config.PartitionId, info2.Ver, info2.Status)
            info2.Status = vms.Status
            needUpdate = true
        }

        if _, ok := verMapLocal[info2.Ver]; !ok {
            verMapLocal[info2.Ver] = info2
            VerList = append(VerList, info2)
        }
    }
    mp.multiVersionList.RWLock.RUnlock()

    for _, vInfo := range reqVerListInfo.VerList {
        if vInfo.Status != proto.VersionNormal && vInfo.Status != proto.VersionPrepare {
            log.LogDebugf("checkVerList. volname [%v] mp[%v] master info %v", mp.config.VolName, mp.config.PartitionId, vInfo)
            continue
        }
        ver, exist := verMapLocal[vInfo.Ver]
        if !exist {
            expStr := fmt.Sprintf("checkVerList.volname [%v] mp[%v] not found %v in mp list and append version %v",
                mp.config.VolName, mp.config.PartitionId, vInfo.Ver, vInfo)
            log.LogWarnf("[checkVerList] volname [%v]", expStr)
            if vInfo.Ver < mp.multiVersionList.GetLastVer() {
                continue
            }
            exporter.Warning(expStr)
            VerList = append(VerList, vInfo)
            verMapLocal[vInfo.Ver] = vInfo
            needUpdate = true
            continue
        }
        if ver.Status != vInfo.Status {
            warn := fmt.Sprintf("checkVerList.volname [%v] mp[%v] ver [%v] local status [%v] update to %v",
                mp.config.VolName, mp.config.PartitionId, vInfo.Ver, ver.Status, vInfo.Status)
            exporter.Warning(warn)
            ver.Status = vInfo.Status
            needUpdate = true
        }
    }

    var lastSeq uint64
    sort.SliceStable(VerList, func(i, j int) bool {
        if VerList[i].Ver < VerList[j].Ver {
            lastSeq = VerList[j].Ver
            return true
        }
        lastSeq = VerList[i].Ver
        return false
    })

    if err = mp.HandleVersionOp(proto.SyncBatchVersionList, lastSeq, VerList, sync); err != nil {
        return
    }
    return
}

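// HandleVersionOp submits a version operation to the FSM, either synchronously
// or by queueing it on the partition's version-update channel.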
func (mp *metaPartition) HandleVersionOp(op uint8, verSeq uint64, verList []*proto.VolVersionInfo, sync bool) (err error) {
    verData := &VerOpData{
        Op:      op,
        VerSeq:  verSeq,
        VerList: verList,
    }
    data, _ := json.Marshal(verData)
    if sync {
        _, err = mp.submit(opFSMVersionOp, data)
        return
    }
    select {
    case mp.verUpdateChan <- data:
        log.LogDebugf("mp[%v] verseq [%v] op [%v] be pushed to queue", mp.config.PartitionId, verSeq, op)
    default:
        err = fmt.Errorf("mp[%v] version update channel full, verdata %v not be executed", mp.config.PartitionId, string(data))
    }
    return
}

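// GetAllVersionInfo handles a client request for the partition's multi-version information.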
func (mp *metaPartition) GetAllVersionInfo(req *proto.MultiVersionOpRequest, p *Packet) (err error) {
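// GetSpecVersionInfo handles a client request for the information of one specific version.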
func (mp *metaPartition) GetSpecVersionInfo(req *proto.MultiVersionOpRequest, p *Packet) (err error) {
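// GetExtentByVer collects into rsp the extent keys visible at the requested
// version: keys from the top layer plus keys from the snapshot layers whose
// sequence is not newer than the requested one.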
func (mp *metaPartition) GetExtentByVer(ino *Inode, req *proto.GetExtentsRequest, rsp *proto.GetExtentsResponse) {
    log.LogInfof("action[GetExtentByVer] read ino[%v] readseq [%v] ino seq [%v] hist len %v", ino.Inode, req.VerSeq, ino.getVer(), ino.getLayerLen())
    reqVer := req.VerSeq
    if isInitSnapVer(req.VerSeq) {
        reqVer = 0
    }
    ino.DoReadFunc(func() {
        ino.Extents.Range(func(_ int, ek proto.ExtentKey) bool {
            if ek.GetSeq() <= reqVer {
                rsp.Extents = append(rsp.Extents, ek)
                log.LogInfof("action[GetExtentByVer] fresh layer.read ino[%v] readseq [%v] ino seq [%v] include ek [%v]", ino.Inode, reqVer, ino.getVer(), ek)
            } else {
                log.LogInfof("action[GetExtentByVer] fresh layer.read ino[%v] readseq [%v] ino seq [%v] exclude ek [%v]", ino.Inode, reqVer, ino.getVer(), ek)
            }
            return true
        })
        ino.RangeMultiVer(func(idx int, snapIno *Inode) bool {
            log.LogInfof("action[GetExtentByVer] read ino[%v] readseq [%v] snapIno ino seq [%v]", ino.Inode, reqVer, snapIno.getVer())
            for _, ek := range snapIno.Extents.eks {
                if reqVer >= ek.GetSeq() {
                    log.LogInfof("action[GetExtentByVer] get extent ino[%v] readseq [%v] snapIno ino seq [%v], include ek (%v)", ino.Inode, reqVer, snapIno.getVer(), ek.String())
                    rsp.Extents = append(rsp.Extents, ek)
                } else {
                    log.LogInfof("action[GetExtentByVer] not get extent ino[%v] readseq [%v] snapIno ino seq [%v], exclude ek (%v)", ino.Inode, reqVer, snapIno.getVer(), ek.String())
                }
            }
            if reqVer >= snapIno.getVer() {
                log.LogInfof("action[GetExtentByVer] finish read ino[%v] readseq [%v] snapIno ino seq [%v]", ino.Inode, reqVer, snapIno.getVer())
                return false
            }
            return true
        })
    })
    sort.SliceStable(rsp.Extents, func(i, j int) bool {
        return rsp.Extents[i].FileOffset < rsp.Extents[j].FileOffset
    })
}

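// SetUidLimit applies the per-uid space limits pushed by the master to the
// partition's uid manager.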
func (mp *metaPartition) SetUidLimit(info []*proto.UidSpaceInfo) {
    mp.uidManager.volName = mp.config.VolName
    mp.uidManager.setUidAcl(info)
}

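// GetUidInfo reports the space currently used by every uid on this partition.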
func (mp *metaPartition) GetUidInfo() (info []*proto.UidReportSpaceInfo) {
    return mp.uidManager.getAllUidSpace()
}

// ExtentsList returns the list of extents.
func (mp *metaPartition) ExtentsList(req *proto.GetExtentsRequest, p *Packet) (err error) {
    log.LogDebugf("action[ExtentsList] inode[%v] verseq [%v]", req.Inode, req.VerSeq)

    // note: no need to set the request seq here; extent filtering is done in the next step
    ino := NewInode(req.Inode, 0)
    retMsg := mp.getInodeTopLayer(ino)

    // notice: getInode should not set verSeq because extents need to be filtered from the newest layer down to req.VerSeq
    ino = retMsg.Msg
    var (
        reply  []byte
        status uint8
    )
    status = retMsg.Status
    if status == proto.OpOk {
        resp := &proto.GetExtentsResponse{}
        log.LogInfof("action[ExtentsList] inode[%v] request verseq [%v] ino ver [%v] extent size %v ino.Size %v ino[%v] hist len %v",
            req.Inode, req.VerSeq, ino.getVer(), len(ino.Extents.eks), ino.Size, ino, ino.getLayerLen())

        if req.VerSeq > 0 && ino.getVer() > 0 && (req.VerSeq < ino.getVer() || isInitSnapVer(req.VerSeq)) {
            mp.GetExtentByVer(ino, req, resp)
            vIno := ino.Copy().(*Inode)
            vIno.setVerNoCheck(req.VerSeq)
            if vIno = mp.getInodeByVer(vIno); vIno != nil {
                resp.Generation = vIno.Generation
                resp.Size = vIno.Size
            }
        } else {
            ino.DoReadFunc(func() {
                resp.Generation = ino.Generation
                resp.Size = ino.Size
                ino.Extents.Range(func(_ int, ek proto.ExtentKey) bool {
                    resp.Extents = append(resp.Extents, ek)
                    log.LogInfof("action[ExtentsList] append ek [%v]", ek)
                    return true
                })
            })
        }
        resp.LayerInfo = retMsg.Msg.getAllLayerEks()

        reply, err = json.Marshal(resp)
        if err != nil {
            status = proto.OpErr
            reply = []byte(err.Error())
        }
    }
    p.PacketErrorWithBody(status, reply)
    return
}

// ObjExtentsList returns the list of obj extents and extents.
func (mp *metaPartition) ObjExtentsList(req *proto.GetExtentsRequest, p *Packet) (err error) {
    ino := NewInode(req.Inode, 0)
    ino.setVer(req.VerSeq)
    retMsg := mp.getInode(ino, false)
    ino = retMsg.Msg
    var (
        reply  []byte
        status uint8
    )
    status = retMsg.Status
    if status == proto.OpOk {
        resp := &proto.GetObjExtentsResponse{}
        ino.DoReadFunc(func() {
            resp.Generation = ino.Generation
            resp.Size = ino.Size
            ino.Extents.Range(func(_ int, ek proto.ExtentKey) bool {
                resp.Extents = append(resp.Extents, ek)
                return true
            })
            ino.ObjExtents.Range(func(ek proto.ObjExtentKey) bool {
                resp.ObjExtents = append(resp.ObjExtents, ek)
                return true
            })
        })
        reply, err = json.Marshal(resp)
        if err != nil {
            status = proto.OpErr
            reply = []byte(err.Error())
        }
    }
    p.PacketErrorWithBody(status, reply)
    return
}

// ExtentsTruncate truncates an extent.
func (mp *metaPartition) ExtentsTruncate(req *ExtentsTruncateReq, p *Packet, remoteAddr string) (err error) {
    if !proto.IsHot(mp.volType) {
        err = fmt.Errorf("only support hot vol")
        p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
        return
    }
    fileSize := uint64(0)
    start := time.Now()
    if mp.IsEnableAuditLog() {
        defer func() {
            auditlog.LogInodeOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.GetFullPath(), err, time.Since(start).Milliseconds(), req.Inode, fileSize)
        }()
    }
    ino := NewInode(req.Inode, proto.Mode(os.ModePerm))
    item := mp.inodeTree.CopyGet(ino)
    if item == nil {
        err = fmt.Errorf("inode[%v] is not exist", req.Inode)
        p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
        return
    }
    i := item.(*Inode)
    status := mp.isOverQuota(req.Inode, req.Size > i.Size, false)
    if status != 0 {
        log.LogErrorf("ExtentsTruncate fail status [%v]", status)
        err = errors.New("ExtentsTruncate is over quota")
        reply := []byte(err.Error())
        p.PacketErrorWithBody(status, reply)
        return
    }

    ino.Size = req.Size
    fileSize = ino.Size
    ino.setVer(mp.verSeq)
    val, err := ino.Marshal()
    if err != nil {
        p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
        return
    }
    resp, err := mp.submit(opFSMExtentTruncate, val)
    if err != nil {
        p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
        return
    }
    msg := resp.(*InodeResponse)
    p.PacketErrorWithBody(msg.Status, nil)
    return
}

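// BatchExtentAppend appends a batch of extent keys to an inode.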
func (mp *metaPartition) BatchExtentAppend(req *proto.AppendExtentKeysRequest, p *Packet) (err error) {
    if !proto.IsHot(mp.volType) {
        err = fmt.Errorf("only support hot vol")
        p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
        return
    }

    var ino *Inode
    if ino, _, err = mp.CheckQuota(req.Inode, p); err != nil {
        log.LogErrorf("BatchExtentAppend fail err [%v]", err)
        return
    }

    extents := req.Extents
    for _, extent := range extents {
        ino.Extents.Append(extent)
    }
    val, err := ino.Marshal()
    if err != nil {
        p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
        return
    }
    resp, err := mp.submit(opFSMExtentsAdd, val)
    if err != nil {
        p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
        return
    }
    p.PacketErrorWithBody(resp.(uint8), nil)
    return
}

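// BatchObjExtentAppend appends a batch of object extent keys to an inode.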
func (mp *metaPartition) BatchObjExtentAppend(req *proto.AppendObjExtentKeysRequest, p *Packet) (err error) {
    var ino *Inode
    if ino, _, err = mp.CheckQuota(req.Inode, p); err != nil {
        log.LogErrorf("BatchObjExtentAppend fail status [%v]", err)
        return
    }

    objExtents := req.Extents
    for _, objExtent := range objExtents {
        err = ino.ObjExtents.Append(objExtent)
        if err != nil {
            p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
            return
        }
    }

    val, err := ino.Marshal()
    if err != nil {
        p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
        return
    }
    resp, err := mp.submit(opFSMObjExtentsAdd, val)
    if err != nil {
        p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
        return
    }
    p.PacketErrorWithBody(resp.(uint8), nil)
    return
}

// func (mp *metaPartition) ExtentsDelete(req *proto.DelExtentKeyRequest, p *Packet) (err error) {
// 	ino := NewInode(req.Inode, 0)
// 	inode := mp.inodeTree.Get(ino).(*Inode)
// 	inode.Extents.Delete(req.Extents)
// 	curTime := timeutil.GetCurrentTimeUnix()
// 	if inode.ModifyTime < curTime {
// 		inode.ModifyTime = curTime
// 	}
// 	val, err := inode.Marshal()
// 	if err != nil {
// 		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
// 		return
// 	}
// 	resp, err := mp.submit(opFSMExtentsDel, val)
// 	if err != nil {
// 		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
// 		return
// 	}
// 	p.PacketErrorWithBody(resp.(uint8), nil)
// 	return
// }

// ExtentsEmpty is only used in the datalake situation.
func (mp *metaPartition) ExtentsOp(p *Packet, ino *Inode, op uint32) (err error) {
    val, err := ino.Marshal()
    if err != nil {
        p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
        return
    }
    resp, err := mp.submit(op, val)
    if err != nil {
        p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
        return
    }
    p.PacketErrorWithBody(resp.(uint8), nil)
    return
}

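// sendExtentsToChan marshals the given extent keys and submits them through
// opFSMSentToChan so the partition can queue them for deletion.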
func (mp *metaPartition) sendExtentsToChan(eks []proto.ExtentKey) (err error) {
    sortExts := NewSortedExtentsFromEks(eks)
    val, err := sortExts.MarshalBinary(true)
    if err != nil {
        return fmt.Errorf("[delExtents] marshal binary fail, %s", err.Error())
    }
    _, err = mp.submit(opFSMSentToChan, val)
    return
}