cubefs

Форк
0
/
partition_op_quota.go 
298 строк · 8.9 Кб
1
// Copyright 2023 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package metanode
16

17
import (
18
	"encoding/json"
19

20
	"github.com/cubefs/cubefs/proto"
21
	"github.com/cubefs/cubefs/util/log"
22
)
23

24
func (mp *metaPartition) batchSetInodeQuota(req *proto.BatchSetMetaserverQuotaReuqest,
25
	resp *proto.BatchSetMetaserverQuotaResponse) (err error) {
26
	if len(req.Inodes) == 0 {
27
		return nil
28
	}
29

30
	val, err := json.Marshal(req)
31
	if err != nil {
32
		log.LogErrorf("batchSetInodeQuota marshal req [%v] failed [%v]", req, err)
33
		return
34
	}
35

36
	r, err := mp.submit(opFSMSetInodeQuotaBatch, val)
37
	if err != nil {
38
		log.LogErrorf("batchSetInodeQuota submit req [%v] failed [%v]", req, err)
39
		return
40
	}
41
	resp.InodeRes = r.(*proto.BatchSetMetaserverQuotaResponse).InodeRes
42
	log.LogInfof("batchSetInodeQuota quotaId [%v] mp[%v] btreeLen [%v] resp [%v] success", req.QuotaId, mp.config.PartitionId,
43
		mp.extendTree.Len(), resp)
44
	return
45
}
46

47
func (mp *metaPartition) batchDeleteInodeQuota(req *proto.BatchDeleteMetaserverQuotaReuqest,
48
	resp *proto.BatchDeleteMetaserverQuotaResponse) (err error) {
49
	if len(req.Inodes) == 0 {
50
		return nil
51
	}
52

53
	val, err := json.Marshal(req)
54
	if err != nil {
55
		log.LogErrorf("batchDeleteInodeQuota marshal req [%v] failed [%v]", req, err)
56
		return
57
	}
58

59
	r, err := mp.submit(opFSMDeleteInodeQuotaBatch, val)
60
	if err != nil {
61
		log.LogErrorf("batchDeleteInodeQuota submit req [%v] failed [%v]", req, err)
62
		return
63
	}
64
	resp.InodeRes = r.(*proto.BatchDeleteMetaserverQuotaResponse).InodeRes
65
	log.LogInfof("batchSetInodeQuota quotaId [%v] mp[%v] btreeLen [%v] resp [%v] success", req.QuotaId, mp.config.PartitionId,
66
		mp.extendTree.Len(), resp)
67
	return
68
}
69

70
// setQuotaHbInfo forwards the quota heartbeat infos to the partition's quota
// manager (mp.mqMgr).
func (mp *metaPartition) setQuotaHbInfo(infos []*proto.QuotaHeartBeatInfo) {
	mp.mqMgr.setQuotaHbInfo(infos)
}
74

75
// getQuotaReportInfos returns whatever the partition's quota manager
// (mp.mqMgr) reports from its own getQuotaReportInfos.
func (mp *metaPartition) getQuotaReportInfos() (infos []*proto.QuotaReportInfo) {
	infos = mp.mqMgr.getQuotaReportInfos()
	return infos
}
78

79
// statisticExtendByLoad adds the inode behind extend to the per-quota usage
// totals kept in mqMgr.statisticBase.
//
// The inode is looked up through mp.getInode; a failed lookup is logged and
// ignored, and an inode with NLink == 0 is skipped silently. For every quota
// ID that isExistQuota reports for the inode, the stored QuotaUsedInfo gains
// the inode's size in UsedBytes and one in UsedFiles.
func (mp *metaPartition) statisticExtendByLoad(extend *Extend) {
	mqMgr := mp.mqMgr

	lookup := mp.getInode(NewInode(extend.GetInode(), 0), true)
	if lookup.Status != proto.OpOk {
		log.LogErrorf("statisticExtendByLoad get inode[%v] fail [%v].", extend.GetInode(), lookup.Status)
		return
	}

	ino := lookup.Msg
	if ino.NLink == 0 {
		return
	}

	quotaIds, isFind := mp.isExistQuota(extend.GetInode())
	if isFind {
		// The write lock is released by the deferred Unlock at function
		// exit, so it is still held while the final info log is emitted.
		mqMgr.rwlock.Lock()
		defer mqMgr.rwlock.Unlock()
		for _, quotaId := range quotaIds {
			// Start from the zero value when no entry exists for this quota yet.
			var used proto.QuotaUsedInfo
			if stored, ok := mqMgr.statisticBase.Load(quotaId); ok {
				used = stored.(proto.QuotaUsedInfo)
			}
			used.UsedBytes += int64(ino.Size)
			used.UsedFiles++
			mqMgr.statisticBase.Store(quotaId, used)
			log.LogDebugf("[statisticExtendByLoad] quotaId [%v] baseInfo [%v]", quotaId, used)
		}
	}
	log.LogInfof("statisticExtendByLoad ino[%v] isFind [%v].", ino.Inode, isFind)
}
111

112
func (mp *metaPartition) statisticExtendByStore(extend *Extend, inodeTree *BTree) {
113
	mqMgr := mp.mqMgr
114
	ino := NewInode(extend.GetInode(), 0)
115

116
	retMsg := mp.getInode(ino, true)
117
	if retMsg.Status != proto.OpOk {
118
		log.LogErrorf("statisticExtendByStore get inode[%v] fail [%v].", extend.GetInode(), retMsg.Status)
119
		return
120
	}
121
	ino = retMsg.Msg
122
	if ino.NLink == 0 {
123
		return
124
	}
125
	value, exist := extend.Get([]byte(proto.QuotaKey))
126
	if !exist {
127
		log.LogDebugf("statisticExtendByStore get quota key failed, mp[%v] inode[%v]", mp.config.PartitionId, extend.GetInode())
128
		return
129
	}
130
	quotaInfos := &proto.MetaQuotaInfos{
131
		QuotaInfoMap: make(map[uint32]*proto.MetaQuotaInfo),
132
	}
133
	if err := json.Unmarshal(value, &quotaInfos.QuotaInfoMap); err != nil {
134
		log.LogErrorf("statisticExtendByStore inode[%v] Unmarshal quotaInfos fail [%v]", extend.GetInode(), err)
135
		return
136
	}
137
	mqMgr.rwlock.Lock()
138
	defer mqMgr.rwlock.Unlock()
139
	for quotaId := range quotaInfos.QuotaInfoMap {
140
		var baseInfo proto.QuotaUsedInfo
141
		value, isFind := mqMgr.statisticRebuildBase.Load(quotaId)
142
		if isFind {
143
			baseInfo = value.(proto.QuotaUsedInfo)
144
		}
145
		baseInfo.UsedBytes += int64(ino.Size)
146
		baseInfo.UsedFiles += 1
147
		mqMgr.statisticRebuildBase.Store(quotaId, baseInfo)
148
		log.LogDebugf("[statisticExtendByStore] mp[%v] quotaId [%v] inode[%v] baseInfo [%v]",
149
			mp.config.PartitionId, quotaId, extend.GetInode(), baseInfo)
150
	}
151
	log.LogDebugf("statisticExtendByStore mp[%v] inode[%v] success.", mp.config.PartitionId, extend.GetInode())
152
	return
153
}
154

155
// updateUsedInfo passes the size and files values to mqMgr.updateUsedInfo
// once for every quota ID that isExistQuota reports for inode ino. It does
// nothing when isExistQuota finds no quota information for the inode.
func (mp *metaPartition) updateUsedInfo(size int64, files int64, ino uint64) {
	quotaIds, found := mp.isExistQuota(ino)
	if !found {
		return
	}

	log.LogInfof("updateUsedInfo ino[%v] quotaIds [%v] size [%v] files [%v]", ino, quotaIds, size, files)
	for _, quotaId := range quotaIds {
		mp.mqMgr.updateUsedInfo(size, files, quotaId)
	}
}
165

166
func (mp *metaPartition) isExistQuota(ino uint64) (quotaIds []uint32, isFind bool) {
167
	extend := NewExtend(ino)
168
	treeItem := mp.extendTree.Get(extend)
169
	if treeItem == nil {
170
		isFind = false
171
		return
172
	}
173
	extend = treeItem.(*Extend)
174
	value, exist := extend.Get([]byte(proto.QuotaKey))
175
	if !exist {
176
		isFind = false
177
		return
178
	}
179
	quotaInfos := &proto.MetaQuotaInfos{
180
		QuotaInfoMap: make(map[uint32]*proto.MetaQuotaInfo),
181
	}
182
	if err := json.Unmarshal(value, &quotaInfos.QuotaInfoMap); err != nil {
183
		log.LogErrorf("set quota inode[%v] Unmarshal quotaInfos fail [%v]", ino, err)
184
		isFind = false
185
		return
186
	}
187
	isFind = true
188
	quotaInfos.RLock()
189
	for quotaId := range quotaInfos.QuotaInfoMap {
190
		quotaIds = append(quotaIds, quotaId)
191
	}
192
	quotaInfos.RUnlock()
193
	log.LogInfof("isExistQuota inode:[%v] quotaIds [%v] isFind[%v]", ino, quotaIds, isFind)
194
	return
195
}
196

197
// isOverQuota asks mqMgr.IsOverQuota, with the given size and files flags,
// about each quota ID that isExistQuota reports for inode ino. It returns
// the first non-zero status (after logging a warning), or zero when the
// inode has no quota information or no quota reports a non-zero status.
func (mp *metaPartition) isOverQuota(ino uint64, size bool, files bool) (status uint8) {
	quotaIds, found := mp.isExistQuota(ino)
	if !found {
		return status
	}

	for _, quotaId := range quotaIds {
		if status = mp.mqMgr.IsOverQuota(size, files, quotaId); status != 0 {
			log.LogWarnf("isOverQuota ino[%v] quotaId [%v] size [%v] files[%v] status[%v]", ino, quotaId, size, files, status)
			return status
		}
	}
	return status
}
210

211
func (mp *metaPartition) getInodeQuota(inode uint64, p *Packet) (err error) {
212
	extend := NewExtend(inode)
213
	quotaInfos := &proto.MetaQuotaInfos{
214
		QuotaInfoMap: make(map[uint32]*proto.MetaQuotaInfo),
215
	}
216
	var (
217
		value []byte
218
		exist bool
219
	)
220
	treeItem := mp.extendTree.CopyGet(extend)
221
	if treeItem == nil {
222
		goto handleRsp
223
	}
224
	extend = treeItem.(*Extend)
225

226
	value, exist = extend.Get([]byte(proto.QuotaKey))
227
	if exist {
228
		if err = json.Unmarshal(value, &quotaInfos.QuotaInfoMap); err != nil {
229
			log.LogErrorf("getInodeQuota inode[%v] Unmarshal quotaInfos fail [%v]", inode, err)
230
			p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
231
			return
232
		}
233
	}
234
handleRsp:
235
	response := &proto.GetInodeQuotaResponse{}
236
	log.LogInfof("getInodeQuota indoe %v ,map %v", inode, quotaInfos.QuotaInfoMap)
237
	response.MetaQuotaInfoMap = quotaInfos.QuotaInfoMap
238

239
	encoded, err := json.Marshal(response)
240
	if err != nil {
241
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
242
		return
243
	}
244
	p.PacketOkWithBody(encoded)
245
	return
246
}
247

248
func (mp *metaPartition) getInodeQuotaInfos(inode uint64) (quotaInfos map[uint32]*proto.MetaQuotaInfo, err error) {
249
	log.LogInfof("getInodeQuotaInfos mp[%v] treeLen[%v]", mp.config.PartitionId, mp.extendTree.Len())
250
	treeItem := mp.extendTree.Get(NewExtend(inode))
251
	if treeItem == nil {
252
		return
253
	}
254
	extend := treeItem.(*Extend)
255
	info := &proto.MetaQuotaInfos{
256
		QuotaInfoMap: make(map[uint32]*proto.MetaQuotaInfo),
257
	}
258
	value, exist := extend.Get([]byte(proto.QuotaKey))
259
	if exist {
260
		if err = json.Unmarshal(value, &info.QuotaInfoMap); err != nil {
261
			log.LogErrorf("getInodeQuota inode[%v] Unmarshal quotaInfos fail [%v]", inode, err)
262
			return
263
		}
264
		quotaInfos = info.QuotaInfoMap
265
	}
266
	log.LogInfof("getInodeQuotaInfos inode[%v] quotaInfos [%v] exist [%v]", inode, quotaInfos, exist)
267
	return
268
}
269

270
func (mp *metaPartition) setInodeQuota(quotaIds []uint32, inode uint64) {
271
	extend := NewExtend(inode)
272
	quotaInfos := &proto.MetaQuotaInfos{
273
		QuotaInfoMap: make(map[uint32]*proto.MetaQuotaInfo),
274
	}
275
	for _, quotaId := range quotaIds {
276
		quotaInfo := &proto.MetaQuotaInfo{
277
			RootInode: false,
278
		}
279
		quotaInfos.QuotaInfoMap[quotaId] = quotaInfo
280
	}
281
	value, err := json.Marshal(quotaInfos.QuotaInfoMap)
282
	if err != nil {
283
		log.LogErrorf("setInodeQuota marsha1 quotaInfos [%v] fail [%v]", quotaInfos, err)
284
		return
285
	}
286
	extend.Put([]byte(proto.QuotaKey), value, mp.verSeq)
287
	treeItem := mp.extendTree.CopyGet(extend)
288
	var e *Extend
289
	if treeItem == nil {
290
		mp.extendTree.ReplaceOrInsert(extend, true)
291
	} else {
292
		e = treeItem.(*Extend)
293
		e.Merge(extend, true)
294
	}
295

296
	log.LogInfof("setInodeQuota inode[%v] quota [%v] success.", inode, quotaIds)
297
	return
298
}
299

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.