partition_op_extent.go
// Copyright 2018 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package metanode

import (
	"encoding/json"
	"fmt"
	"os"
	"sort"
	"time"

	"github.com/cubefs/cubefs/proto"
	"github.com/cubefs/cubefs/util/auditlog"
	"github.com/cubefs/cubefs/util/errors"
	"github.com/cubefs/cubefs/util/exporter"
	"github.com/cubefs/cubefs/util/log"
)

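// CheckQuota rejects the request if the inode is over its directory quota or its
// owner uid is marked full, writing the error into the packet; otherwise it
// returns a parameter inode for inodeId and the stored inode.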
func (mp *metaPartition) CheckQuota(inodeId uint64, p *Packet) (iParm *Inode, inode *Inode, err error) {
	iParm = NewInode(inodeId, 0)
	status := mp.isOverQuota(inodeId, true, false)
	if status != 0 {
		log.LogErrorf("CheckQuota: directory quota check failed, inode[%v] status [%v]", inodeId, status)
		err = errors.New("CheckQuota: directory quota exceeded")
		reply := []byte(err.Error())
		p.PacketErrorWithBody(status, reply)
		return
	}

	item := mp.inodeTree.Get(iParm)
	if item == nil {
		err = fmt.Errorf("inode[%v] does not exist", iParm)
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return
	}
	inode = item.(*Inode)
	mp.uidManager.acLock.Lock()
	if mp.uidManager.getUidAcl(inode.Uid) {
		log.LogWarnf("CheckQuota: UidSpace volname [%v] mp[%v] uid %v is marked full", mp.uidManager.volName, mp.uidManager.mpID, inode.Uid)
		mp.uidManager.acLock.Unlock()
		status = proto.OpNoSpaceErr
		err = errors.New("CheckQuota: uid space quota exceeded")
		reply := []byte(err.Error())
		p.PacketErrorWithBody(status, reply)
		return
	}
	mp.uidManager.acLock.Unlock()
	return
}

// ExtentAppend appends an extent.
func (mp *metaPartition) ExtentAppend(req *proto.AppendExtentKeyRequest, p *Packet) (err error) {
	if !proto.IsHot(mp.volType) {
		err = fmt.Errorf("only support hot vol")
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return
	}
	ino := NewInode(req.Inode, 0)
	if _, _, err = mp.CheckQuota(req.Inode, p); err != nil {
		log.LogErrorf("ExtentAppend fail status [%v]", err)
		return
	}
	ext := req.Extent
	ino.Extents.Append(ext)
	val, err := ino.Marshal()
	if err != nil {
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return
	}
	resp, err := mp.submit(opFSMExtentsAdd, val)
	if err != nil {
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
		return
	}
	p.PacketErrorWithBody(resp.(uint8), nil)
	return
}

// ExtentAppendWithCheck appends an extent with a discard-extents check.
// Format: one valid extent key followed by zero or more discard keys.
func (mp *metaPartition) ExtentAppendWithCheck(req *proto.AppendExtentKeyWithCheckRequest, p *Packet) (err error) {
	status := mp.isOverQuota(req.Inode, true, false)
	if status != 0 {
		log.LogErrorf("ExtentAppendWithCheck fail status [%v]", status)
		err = errors.New("ExtentAppendWithCheck is over quota")
		reply := []byte(err.Error())
		p.PacketErrorWithBody(status, reply)
		return
	}
	var (
		inoParm *Inode
		i       *Inode
	)
	if inoParm, i, err = mp.CheckQuota(req.Inode, p); err != nil {
		log.LogErrorf("ExtentAppendWithCheck CheckQuota fail err [%v]", err)
		return
	}

	// check the volume type: on a cold volume an extent can be modified/added only
	// when the corresponding objextent already exists
	if proto.IsCold(mp.volType) {
		i.RLock()
		exist, idx := i.ObjExtents.FindOffsetExist(req.Extent.FileOffset)
		if !exist {
			i.RUnlock()
			err = fmt.Errorf("ebs's objextent does not exist at offset[%v]", req.Extent.FileOffset)
			p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
			return
		}
		if i.ObjExtents.eks[idx].Size != uint64(req.Extent.Size) {
			err = fmt.Errorf("ebs's objextent size[%v] isn't equal to the append size[%v]", i.ObjExtents.eks[idx].Size, req.Extent.Size)
			p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
			i.RUnlock()
			return
		}
		i.RUnlock()
	}

	ext := req.Extent

	// the extent key's verSeq is not set here because marshalling does not include it;
	// use the inode's verSeq instead
	inoParm.setVer(mp.verSeq)
	inoParm.Extents.Append(ext)
	log.LogDebugf("ExtentAppendWithCheck: ino(%v) mp[%v] verSeq (%v)", req.Inode, req.PartitionID, mp.verSeq)

	// Store discard extents right after the appended extent key.
	if len(req.DiscardExtents) != 0 {
		inoParm.Extents.eks = append(inoParm.Extents.eks, req.DiscardExtents...)
	}
	val, err := inoParm.Marshal()
	if err != nil {
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return
	}
	var opFlag uint32 = opFSMExtentsAddWithCheck
	if req.IsSplit {
		opFlag = opFSMExtentSplit
	}
	resp, err := mp.submit(opFlag, val)
	if err != nil {
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
		return
	}

	log.LogDebugf("ExtentAppendWithCheck: ino(%v) mp[%v] verSeq (%v) req.VerSeq(%v) rspcode(%v)", req.Inode, req.PartitionID, mp.verSeq, req.VerSeq, resp.(uint8))

	if mp.verSeq > req.VerSeq {
		// reuse ExtentType to flag a version mismatch between metanode and client;
		// the response makes the client refresh all streamer extents and their verSeq
		p.ExtentType |= proto.MultiVersionFlag
		p.VerSeq = mp.verSeq
	}
	p.PacketErrorWithBody(resp.(uint8), nil)
	return
}

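// SetTxInfo applies the transaction settings (op mask and op rate limit) of the
// entry matching this partition's volume to the partition's tx processor.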
func (mp *metaPartition) SetTxInfo(info []*proto.TxInfo) {
	for _, txInfo := range info {
		if txInfo.Volume != mp.config.VolName {
			continue
		}
		mp.txProcessor.mask = txInfo.Mask
		mp.txProcessor.txManager.setLimit(txInfo.OpLimitVal)
		log.LogInfof("SetTxInfo mp[%v] mask %v limit %v", mp.config.PartitionId, proto.GetMaskString(txInfo.Mask), txInfo.OpLimitVal)
	}
}

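// VerOpData is the JSON-encoded payload submitted to the FSM for a version operation.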
type VerOpData struct {
	Op      uint8
	VerSeq  uint64
	VerList []*proto.VolVersionInfo
}

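// checkByMasterVerlist compares the partition's version list with the master's:
// local versions (except the newest) that are unknown to the master are recorded
// in TemporaryVerMap and dropped from the local VerList.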
func (mp *metaPartition) checkByMasterVerlist(mpVerList *proto.VolVersionInfoList, masterVerList *proto.VolVersionInfoList) (err error) {
	currMasterSeq := masterVerList.GetLastVer()
	verMapMaster := make(map[uint64]*proto.VolVersionInfo)
	for _, ver := range masterVerList.VerList {
		verMapMaster[ver.Ver] = ver
	}
	log.LogDebugf("checkVerList. volname [%v] mp[%v] masterVerList %v mpVerList.VerList %v", mp.config.VolName, mp.config.PartitionId, masterVerList, mpVerList.VerList)
	mp.multiVersionList.RWLock.Lock()
	defer mp.multiVersionList.RWLock.Unlock()
	vlen := len(mpVerList.VerList)
	for id, info2 := range mpVerList.VerList {
		if id == vlen-1 {
			break
		}
		log.LogDebugf("checkVerList. volname [%v] mp[%v] ver info %v currMasterseq [%v]", mp.config.VolName, mp.config.PartitionId, info2, currMasterSeq)
		_, exist := verMapMaster[info2.Ver]
		if !exist {
			if _, ok := mp.multiVersionList.TemporaryVerMap[info2.Ver]; !ok {
				log.LogInfof("checkVerList. volname [%v] mp[%v] ver info %v be consider as TemporaryVer", mp.config.VolName, mp.config.PartitionId, info2)
				mp.multiVersionList.TemporaryVerMap[info2.Ver] = info2
			}
		}
	}

	for verSeq := range mp.multiVersionList.TemporaryVerMap {
		for index, verInfo := range mp.multiVersionList.VerList {
			if verInfo.Ver == verSeq {
				log.LogInfof("checkVerList.updateVerList volname [%v] mp[%v] ver info %v be consider as TemporaryVer and do deletion verlist %v",
					mp.config.VolName, mp.config.PartitionId, verInfo, mp.multiVersionList.VerList)
				if index == len(mp.multiVersionList.VerList)-1 {
					log.LogInfof("checkVerList.updateVerList volname [%v] mp[%v] last ver info %v should not be consider as TemporaryVer and do deletion verlist %v",
						mp.config.VolName, mp.config.PartitionId, verInfo, mp.multiVersionList.VerList)
					return
				} else {
					mp.multiVersionList.VerList = append(mp.multiVersionList.VerList[:index], mp.multiVersionList.VerList[index+1:]...)
				}

				log.LogInfof("checkVerList.updateVerList volname [%v] mp[%v] verlist %v", mp.config.VolName, mp.config.PartitionId, mp.multiVersionList.VerList)
				break
			}
		}
	}
	return
}

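// checkVerList reconciles the local version list with the requested one: statuses
// are synced from the request, versions missing locally are appended, and if
// anything changed the merged list is re-submitted via HandleVersionOp.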
func (mp *metaPartition) checkVerList(reqVerListInfo *proto.VolVersionInfoList, sync bool) (needUpdate bool, err error) {
	mp.multiVersionList.RWLock.RLock()
	verMapLocal := make(map[uint64]*proto.VolVersionInfo)
	verMapReq := make(map[uint64]*proto.VolVersionInfo)
	for _, ver := range reqVerListInfo.VerList {
		verMapReq[ver.Ver] = ver
	}

	var VerList []*proto.VolVersionInfo

	for _, info2 := range mp.multiVersionList.VerList {
		log.LogDebugf("checkVerList. volname [%v] mp[%v] ver info %v", mp.config.VolName, mp.config.PartitionId, info2)
		vms, exist := verMapReq[info2.Ver]
		if !exist {
			log.LogWarnf("checkVerList. volname [%v] mp[%v] version info(%v) not exist in master (%v)",
				mp.config.VolName, mp.config.PartitionId, info2, reqVerListInfo.VerList)
		} else if info2.Status != proto.VersionNormal && info2.Status != vms.Status {
			log.LogWarnf("checkVerList. volname [%v] mp[%v] ver [%v] status abnormal %v", mp.config.VolName, mp.config.PartitionId, info2.Ver, info2.Status)
			info2.Status = vms.Status
			needUpdate = true
		}

		if _, ok := verMapLocal[info2.Ver]; !ok {
			verMapLocal[info2.Ver] = info2
			VerList = append(VerList, info2)
		}
	}
	mp.multiVersionList.RWLock.RUnlock()

	for _, vInfo := range reqVerListInfo.VerList {
		if vInfo.Status != proto.VersionNormal && vInfo.Status != proto.VersionPrepare {
			log.LogDebugf("checkVerList. volname [%v] mp[%v] master info %v", mp.config.VolName, mp.config.PartitionId, vInfo)
			continue
		}
		ver, exist := verMapLocal[vInfo.Ver]
		if !exist {
			expStr := fmt.Sprintf("checkVerList.volname [%v] mp[%v] not found %v in mp list and append version %v",
				mp.config.VolName, mp.config.PartitionId, vInfo.Ver, vInfo)
			log.LogWarnf("[checkVerList] volname [%v]", expStr)
			if vInfo.Ver < mp.multiVersionList.GetLastVer() {
				continue
			}
			exporter.Warning(expStr)
			VerList = append(VerList, vInfo)
			needUpdate = true
			verMapLocal[vInfo.Ver] = vInfo
			continue
		}
		if ver.Status != vInfo.Status {
			warn := fmt.Sprintf("checkVerList.volname [%v] mp[%v] ver [%v] abnormal. local status [%v] update to %v",
				mp.config.VolName, mp.config.PartitionId, vInfo.Ver, ver.Status, vInfo.Status)
			log.LogWarn(warn)
			ver.Status = vInfo.Status
		}
	}
	if needUpdate {
		var lastSeq uint64
		sort.SliceStable(VerList, func(i, j int) bool {
			if VerList[i].Ver < VerList[j].Ver {
				lastSeq = VerList[j].Ver
				return true
			}
			lastSeq = VerList[i].Ver
			return false
		})
		if err = mp.HandleVersionOp(proto.SyncBatchVersionList, lastSeq, VerList, sync); err != nil {
			return
		}
	}
	return
}

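// HandleVersionOp packs a version operation into a VerOpData record and either
// submits it synchronously or pushes it onto verUpdateChan for asynchronous
// handling; the push fails when the channel is full.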
func (mp *metaPartition) HandleVersionOp(op uint8, verSeq uint64, verList []*proto.VolVersionInfo, sync bool) (err error) {
	verData := &VerOpData{
		Op:      op,
		VerSeq:  verSeq,
		VerList: verList,
	}
	data, _ := json.Marshal(verData)
	if sync {
		_, err = mp.submit(opFSMVersionOp, data)
		return
	}
	select {
	case mp.verUpdateChan <- data:
		log.LogDebugf("mp[%v] verseq [%v] op [%v] pushed to queue", mp.config.PartitionId, verSeq, op)
	default:
		err = fmt.Errorf("mp[%v] version update channel full, verdata %v will not be executed", mp.config.PartitionId, string(data))
	}
	return
}

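// GetAllVersionInfo is currently an unimplemented stub; it returns nil without touching the packet.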
func (mp *metaPartition) GetAllVersionInfo(req *proto.MultiVersionOpRequest, p *Packet) (err error) {
	return
}

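// GetSpecVersionInfo is currently an unimplemented stub; it returns nil without touching the packet.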
func (mp *metaPartition) GetSpecVersionInfo(req *proto.MultiVersionOpRequest, p *Packet) (err error) {
	return
}

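// GetExtentByVer collects into rsp all extent keys visible at the requested
// version: top-layer keys whose seq is not newer than the request, plus keys from
// snapshot layers down to the first layer at or below the requested version,
// sorted by file offset.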
func (mp *metaPartition) GetExtentByVer(ino *Inode, req *proto.GetExtentsRequest, rsp *proto.GetExtentsResponse) {
	log.LogInfof("action[GetExtentByVer] read ino[%v] readseq [%v] ino seq [%v] hist len %v", ino.Inode, req.VerSeq, ino.getVer(), ino.getLayerLen())
	reqVer := req.VerSeq
	if isInitSnapVer(req.VerSeq) {
		reqVer = 0
	}
	ino.DoReadFunc(func() {
		ino.Extents.Range(func(_ int, ek proto.ExtentKey) bool {
			if ek.GetSeq() <= reqVer {
				rsp.Extents = append(rsp.Extents, ek)
				log.LogInfof("action[GetExtentByVer] fresh layer.read ino[%v] readseq [%v] ino seq [%v] include ek [%v]", ino.Inode, reqVer, ino.getVer(), ek)
			} else {
				log.LogInfof("action[GetExtentByVer] fresh layer.read ino[%v] readseq [%v] ino seq [%v] exclude ek [%v]", ino.Inode, reqVer, ino.getVer(), ek)
			}
			return true
		})
		ino.RangeMultiVer(func(idx int, snapIno *Inode) bool {
			log.LogInfof("action[GetExtentByVer] read ino[%v] readseq [%v] snapIno ino seq [%v]", ino.Inode, reqVer, snapIno.getVer())
			for _, ek := range snapIno.Extents.eks {
				if reqVer >= ek.GetSeq() {
					log.LogInfof("action[GetExtentByVer] get extent ino[%v] readseq [%v] snapIno ino seq [%v], include ek (%v)", ino.Inode, reqVer, snapIno.getVer(), ek.String())
					rsp.Extents = append(rsp.Extents, ek)
				} else {
					log.LogInfof("action[GetExtentByVer] not get extent ino[%v] readseq [%v] snapIno ino seq [%v], exclude ek (%v)", ino.Inode, reqVer, snapIno.getVer(), ek.String())
				}
			}
			if reqVer >= snapIno.getVer() {
				log.LogInfof("action[GetExtentByVer] finish read ino[%v] readseq [%v] snapIno ino seq [%v]", ino.Inode, reqVer, snapIno.getVer())
				return false
			}
			return true
		})
		sort.SliceStable(rsp.Extents, func(i, j int) bool {
			return rsp.Extents[i].FileOffset < rsp.Extents[j].FileOffset
		})
	})

	return
}

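// SetUidLimit refreshes the per-uid space ACLs of this partition from the given uid space info.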
func (mp *metaPartition) SetUidLimit(info []*proto.UidSpaceInfo) {
	mp.uidManager.volName = mp.config.VolName
	mp.uidManager.setUidAcl(info)
}

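// GetUidInfo reports the space usage of all uids tracked by this partition.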
func (mp *metaPartition) GetUidInfo() (info []*proto.UidReportSpaceInfo) {
	return mp.uidManager.getAllUidSpace()
}

// ExtentsList returns the list of extents.
func (mp *metaPartition) ExtentsList(req *proto.GetExtentsRequest, p *Packet) (err error) {
	log.LogDebugf("action[ExtentsList] inode[%v] verseq [%v]", req.Inode, req.VerSeq)

	// note: no need to set reqSeq here; extents are filtered in the next step
	ino := NewInode(req.Inode, 0)
	retMsg := mp.getInodeTopLayer(ino)

	// note: getInode should not set verSeq, because extents must be filtered from the newest layer down to req.VerSeq
	ino = retMsg.Msg
	var (
		reply  []byte
		status = retMsg.Status
	)

	if status == proto.OpOk {
		resp := &proto.GetExtentsResponse{}
		log.LogInfof("action[ExtentsList] inode[%v] request verseq [%v] ino ver [%v] extent size %v ino.Size %v ino[%v] hist len %v",
			req.Inode, req.VerSeq, ino.getVer(), len(ino.Extents.eks), ino.Size, ino, ino.getLayerLen())

		if req.VerSeq > 0 && ino.getVer() > 0 && (req.VerSeq < ino.getVer() || isInitSnapVer(req.VerSeq)) {
			mp.GetExtentByVer(ino, req, resp)
			vIno := ino.Copy().(*Inode)
			vIno.setVerNoCheck(req.VerSeq)
			if vIno = mp.getInodeByVer(vIno); vIno != nil {
				resp.Generation = vIno.Generation
				resp.Size = vIno.Size
			}
		} else {
			ino.DoReadFunc(func() {
				resp.Generation = ino.Generation
				resp.Size = ino.Size
				ino.Extents.Range(func(_ int, ek proto.ExtentKey) bool {
					resp.Extents = append(resp.Extents, ek)
					log.LogInfof("action[ExtentsList] append ek [%v]", ek)
					return true
				})
			})
		}
		if req.VerAll {
			resp.LayerInfo = retMsg.Msg.getAllLayerEks()
		}
		reply, err = json.Marshal(resp)
		if err != nil {
			status = proto.OpErr
			reply = []byte(err.Error())
		}
	}
	p.PacketErrorWithBody(status, reply)
	return
}

// ObjExtentsList returns the list of obj extents and extents.
func (mp *metaPartition) ObjExtentsList(req *proto.GetExtentsRequest, p *Packet) (err error) {
	ino := NewInode(req.Inode, 0)
	ino.setVer(req.VerSeq)
	retMsg := mp.getInode(ino, false)
	ino = retMsg.Msg
	var (
		reply  []byte
		status = retMsg.Status
	)
	if status == proto.OpOk {
		resp := &proto.GetObjExtentsResponse{}
		ino.DoReadFunc(func() {
			resp.Generation = ino.Generation
			resp.Size = ino.Size
			ino.Extents.Range(func(_ int, ek proto.ExtentKey) bool {
				resp.Extents = append(resp.Extents, ek)
				return true
			})
			ino.ObjExtents.Range(func(ek proto.ObjExtentKey) bool {
				resp.ObjExtents = append(resp.ObjExtents, ek)
				return true
			})
		})

		reply, err = json.Marshal(resp)
		if err != nil {
			status = proto.OpErr
			reply = []byte(err.Error())
		}
	}
	p.PacketErrorWithBody(status, reply)
	return
}

// ExtentsTruncate truncates an extent.
func (mp *metaPartition) ExtentsTruncate(req *ExtentsTruncateReq, p *Packet, remoteAddr string) (err error) {
	if !proto.IsHot(mp.volType) {
		err = fmt.Errorf("only support hot vol")
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return
	}
	fileSize := uint64(0)
	start := time.Now()
	if mp.IsEnableAuditLog() {
		defer func() {
			auditlog.LogInodeOp(remoteAddr, mp.GetVolName(), p.GetOpMsg(), req.GetFullPath(), err, time.Since(start).Milliseconds(), req.Inode, fileSize)
		}()
	}
	ino := NewInode(req.Inode, proto.Mode(os.ModePerm))
	item := mp.inodeTree.CopyGet(ino)
	if item == nil {
		err = fmt.Errorf("inode[%v] does not exist", req.Inode)
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return
	}
	i := item.(*Inode)
	status := mp.isOverQuota(req.Inode, req.Size > i.Size, false)
	if status != 0 {
		log.LogErrorf("ExtentsTruncate fail status [%v]", status)
		err = errors.New("ExtentsTruncate is over quota")
		reply := []byte(err.Error())
		p.PacketErrorWithBody(status, reply)
		return
	}

	ino.Size = req.Size
	fileSize = ino.Size
	ino.setVer(mp.verSeq)
	val, err := ino.Marshal()
	if err != nil {
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return
	}
	resp, err := mp.submit(opFSMExtentTruncate, val)
	if err != nil {
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
		return
	}
	msg := resp.(*InodeResponse)
	p.PacketErrorWithBody(msg.Status, nil)
	return
}

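// BatchExtentAppend appends multiple extent keys to an inode in one submit; hot volumes only.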
func (mp *metaPartition) BatchExtentAppend(req *proto.AppendExtentKeysRequest, p *Packet) (err error) {
	if !proto.IsHot(mp.volType) {
		err = fmt.Errorf("only support hot vol")
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return
	}

	var ino *Inode
	if ino, _, err = mp.CheckQuota(req.Inode, p); err != nil {
		log.LogErrorf("BatchExtentAppend fail err [%v]", err)
		return
	}

	extents := req.Extents
	for _, extent := range extents {
		ino.Extents.Append(extent)
	}
	val, err := ino.Marshal()
	if err != nil {
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return
	}
	resp, err := mp.submit(opFSMExtentsAdd, val)
	if err != nil {
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
		return
	}
	p.PacketErrorWithBody(resp.(uint8), nil)
	return
}

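// BatchObjExtentAppend appends multiple obj extent keys to an inode in one submit.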
func (mp *metaPartition) BatchObjExtentAppend(req *proto.AppendObjExtentKeysRequest, p *Packet) (err error) {
	var ino *Inode
	if ino, _, err = mp.CheckQuota(req.Inode, p); err != nil {
		log.LogErrorf("BatchObjExtentAppend fail status [%v]", err)
		return
	}

	objExtents := req.Extents
	for _, objExtent := range objExtents {
		err = ino.ObjExtents.Append(objExtent)
		if err != nil {
			p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
			return
		}
	}
	val, err := ino.Marshal()
	if err != nil {
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return
	}
	resp, err := mp.submit(opFSMObjExtentsAdd, val)
	if err != nil {
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
		return
	}
	p.PacketErrorWithBody(resp.(uint8), nil)
	return
}

// func (mp *metaPartition) ExtentsDelete(req *proto.DelExtentKeyRequest, p *Packet) (err error) {
// 	ino := NewInode(req.Inode, 0)
// 	inode := mp.inodeTree.Get(ino).(*Inode)
// 	inode.Extents.Delete(req.Extents)
// 	curTime := timeutil.GetCurrentTimeUnix()
// 	if inode.ModifyTime < curTime {
// 		inode.ModifyTime = curTime
// 	}
// 	val, err := inode.Marshal()
// 	if err != nil {
// 		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
// 		return
// 	}
// 	resp, err := mp.submit(opFSMExtentsDel, val)
// 	if err != nil {
// 		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
// 		return
// 	}
// 	p.PacketErrorWithBody(resp.(uint8), nil)
// 	return
// }

// ExtentsOp is only used in the datalake situation.
func (mp *metaPartition) ExtentsOp(p *Packet, ino *Inode, op uint32) (err error) {
	val, err := ino.Marshal()
	if err != nil {
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return
	}
	resp, err := mp.submit(op, val)
	if err != nil {
		p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
		return
	}
	p.PacketErrorWithBody(resp.(uint8), nil)
	return
}

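// sendExtentsToChan marshals the given extent keys as a sorted extent list and
// submits them with opFSMSentToChan (used on the extent deletion path).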
func (mp *metaPartition) sendExtentsToChan(eks []proto.ExtentKey) (err error) {
	if len(eks) == 0 {
		return
	}

	sortExts := NewSortedExtentsFromEks(eks)
	val, err := sortExts.MarshalBinary(true)
	if err != nil {
		return fmt.Errorf("[delExtents] marshal binary fail, %s", err.Error())
	}

	_, err = mp.submit(opFSMSentToChan, val)

	return
}
