cubefs

Форк
0
/
meta_quota_manager.go 
350 строк · 9.9 Кб
1
// Copyright 2023 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package metanode
16

17
import (
18
	"bytes"
19
	"encoding/binary"
20
	"sync"
21

22
	"github.com/cubefs/cubefs/proto"
23
	"github.com/cubefs/cubefs/util/log"
24
)
25

26
type MetaQuotaManager struct {
27
	statisticTemp        *sync.Map // key quotaId, value proto.QuotaUsedInfo
28
	statisticBase        *sync.Map // key quotaId, value proto.QuotaUsedInfo
29
	statisticRebuildTemp *sync.Map // key quotaId, value proto.QuotaUsedInfo
30
	statisticRebuildBase *sync.Map // key quotaId, value proto.QuotaUsedInfo
31
	limitedMap           *sync.Map
32
	rbuilding            bool
33
	volName              string
34
	rwlock               sync.RWMutex
35
	mpID                 uint64
36
	enable               bool
37
}
38

39
type MetaQuotaInode struct {
40
	inode    *Inode
41
	quotaIds []uint32
42
}
43

44
type TxMetaQuotaInode struct {
45
	txinode  *TxInode
46
	quotaIds []uint32
47
}
48

49
func NewQuotaManager(volName string, mpId uint64) (mqMgr *MetaQuotaManager) {
50
	mqMgr = &MetaQuotaManager{
51
		statisticTemp:        new(sync.Map),
52
		statisticBase:        new(sync.Map),
53
		statisticRebuildTemp: new(sync.Map),
54
		statisticRebuildBase: new(sync.Map),
55
		limitedMap:           new(sync.Map),
56
		volName:              volName,
57
		mpID:                 mpId,
58
	}
59
	return
60
}
61

62
func (qInode *MetaQuotaInode) Marshal() (result []byte, err error) {
63
	var inodeBytes []byte
64
	quotaBytes := bytes.NewBuffer(make([]byte, 0, 128))
65
	buff := bytes.NewBuffer(make([]byte, 0, 128))
66
	inodeBytes, err = qInode.inode.Marshal()
67
	if err != nil {
68
		return
69
	}
70
	inodeLen := uint32(len(inodeBytes))
71
	if err = binary.Write(buff, binary.BigEndian, inodeLen); err != nil {
72
		return
73
	}
74
	buff.Write(inodeBytes)
75
	for _, quotaId := range qInode.quotaIds {
76
		if err = binary.Write(quotaBytes, binary.BigEndian, quotaId); err != nil {
77
			return
78
		}
79
	}
80
	buff.Write(quotaBytes.Bytes())
81
	result = buff.Bytes()
82
	log.LogDebugf("MetaQuotaInode Marshal inode[%v] inodeLen [%v] size [%v]", qInode.inode.Inode, inodeLen, len(result))
83
	return
84
}
85

86
func (qInode *MetaQuotaInode) Unmarshal(raw []byte) (err error) {
87
	var inodeLen uint32
88
	var quotaId uint32
89
	buff := bytes.NewBuffer(raw)
90
	if err = binary.Read(buff, binary.BigEndian, &inodeLen); err != nil {
91
		return
92
	}
93
	inodeBytes := make([]byte, inodeLen)
94
	if _, err = buff.Read(inodeBytes); err != nil {
95
		return
96
	}
97
	log.LogDebugf("MetaQuotaInode Unmarshal inodeLen [%v] size [%v]", inodeBytes, len(raw))
98
	qInode.inode = NewInode(0, 0)
99
	if err = qInode.inode.Unmarshal(inodeBytes); err != nil {
100
		return
101
	}
102
	for {
103
		if buff.Len() == 0 {
104
			break
105
		}
106
		if err = binary.Read(buff, binary.BigEndian, &quotaId); err != nil {
107
			return
108
		}
109
		qInode.quotaIds = append(qInode.quotaIds, quotaId)
110
	}
111
	return
112
}
113

114
func (qInode *TxMetaQuotaInode) Marshal() (result []byte, err error) {
115
	var inodeBytes []byte
116
	quotaBytes := bytes.NewBuffer(make([]byte, 0, 128))
117
	buff := bytes.NewBuffer(make([]byte, 0, 128))
118
	inodeBytes, err = qInode.txinode.Marshal()
119
	if err != nil {
120
		return
121
	}
122
	inodeLen := uint32(len(inodeBytes))
123
	if err = binary.Write(buff, binary.BigEndian, inodeLen); err != nil {
124
		return
125
	}
126
	buff.Write(inodeBytes)
127
	for _, quotaId := range qInode.quotaIds {
128
		if err = binary.Write(quotaBytes, binary.BigEndian, quotaId); err != nil {
129
			return
130
		}
131
	}
132
	buff.Write(quotaBytes.Bytes())
133
	result = buff.Bytes()
134
	log.LogDebugf("TxMetaQuotaInode Marshal inode[%v] inodeLen [%v] size [%v]", qInode.txinode.Inode.Inode, inodeLen, len(result))
135
	return
136
}
137

138
func (qInode *TxMetaQuotaInode) Unmarshal(raw []byte) (err error) {
139
	var inodeLen uint32
140
	var quotaId uint32
141
	buff := bytes.NewBuffer(raw)
142
	if err = binary.Read(buff, binary.BigEndian, &inodeLen); err != nil {
143
		return
144
	}
145
	inodeBytes := make([]byte, inodeLen)
146
	if _, err = buff.Read(inodeBytes); err != nil {
147
		return
148
	}
149
	log.LogDebugf("TxMetaQuotaInode Unmarshal inodeLen [%v] size [%v]", inodeBytes, len(raw))
150
	qInode.txinode = NewTxInode(0, 0, nil)
151
	if err = qInode.txinode.Unmarshal(inodeBytes); err != nil {
152
		return
153
	}
154
	for {
155
		if buff.Len() == 0 {
156
			break
157
		}
158
		if err = binary.Read(buff, binary.BigEndian, &quotaId); err != nil {
159
			return
160
		}
161
		qInode.quotaIds = append(qInode.quotaIds, quotaId)
162
	}
163
	return
164
}
165

166
func (mqMgr *MetaQuotaManager) setQuotaHbInfo(infos []*proto.QuotaHeartBeatInfo) {
167
	mqMgr.rwlock.Lock()
168
	defer mqMgr.rwlock.Unlock()
169

170
	for _, info := range infos {
171
		if mqMgr.volName != info.VolName {
172
			continue
173
		}
174
		mqMgr.enable = info.Enable
175
		mqMgr.limitedMap.Store(info.QuotaId, info.LimitedInfo)
176
		log.LogDebugf("mp[%v] quotaId [%v] limitedInfo [%v]", mqMgr.mpID, info.QuotaId, info.LimitedInfo)
177
	}
178
	mqMgr.limitedMap.Range(func(key, value interface{}) bool {
179
		quotaId := key.(uint32)
180
		found := false
181

182
		for _, info := range infos {
183
			if mqMgr.volName != info.VolName {
184
				continue
185
			}
186
			if info.QuotaId == quotaId {
187
				found = true
188
				break
189
			}
190
		}
191

192
		if !found {
193
			mqMgr.limitedMap.Delete(quotaId)
194
		}
195
		return true
196
	})
197
	return
198
}
199

200
func (mqMgr *MetaQuotaManager) getQuotaReportInfos() (infos []*proto.QuotaReportInfo) {
201
	mqMgr.rwlock.Lock()
202
	defer mqMgr.rwlock.Unlock()
203
	var usedInfo proto.QuotaUsedInfo
204
	mqMgr.statisticTemp.Range(func(key, value interface{}) bool {
205
		usedInfo = value.(proto.QuotaUsedInfo)
206
		if value, isFind := mqMgr.statisticBase.Load(key.(uint32)); isFind {
207
			baseInfo := value.(proto.QuotaUsedInfo)
208
			log.LogDebugf("[getQuotaReportInfos] statisticTemp mp[%v] key [%v] usedInfo [%v] baseInfo [%v]", mqMgr.mpID,
209
				key.(uint32), usedInfo, baseInfo)
210
			usedInfo.Add(&baseInfo)
211
			if usedInfo.UsedFiles < 0 {
212
				log.LogWarnf("[getQuotaReportInfos] statisticTemp mp[%v] key [%v] usedInfo [%v]", mqMgr.mpID, key.(uint32), usedInfo)
213
				usedInfo.UsedFiles = 0
214
			}
215
			if usedInfo.UsedBytes < 0 {
216
				log.LogWarnf("[getQuotaReportInfos] statisticTemp mp[%v] key [%v] usedInfo [%v]", mqMgr.mpID, key.(uint32), usedInfo)
217
				usedInfo.UsedBytes = 0
218
			}
219
		}
220
		mqMgr.statisticBase.Store(key.(uint32), usedInfo)
221
		return true
222
	})
223
	mqMgr.statisticTemp = new(sync.Map)
224
	mqMgr.statisticBase.Range(func(key, value interface{}) bool {
225
		quotaId := key.(uint32)
226
		if _, ok := mqMgr.limitedMap.Load(quotaId); !ok {
227
			return true
228
		}
229
		usedInfo = value.(proto.QuotaUsedInfo)
230
		reportInfo := &proto.QuotaReportInfo{
231
			QuotaId:  quotaId,
232
			UsedInfo: usedInfo,
233
		}
234
		infos = append(infos, reportInfo)
235
		log.LogDebugf("[getQuotaReportInfos] statisticBase mp[%v] key [%v] usedInfo [%v]", mqMgr.mpID, key.(uint32), usedInfo)
236
		return true
237
	})
238
	return
239
}
240

241
func (mqMgr *MetaQuotaManager) statisticRebuildStart() bool {
242
	mqMgr.rwlock.Lock()
243
	defer mqMgr.rwlock.Unlock()
244
	if !mqMgr.enable {
245
		return false
246
	}
247

248
	if mqMgr.rbuilding {
249
		return false
250
	}
251
	mqMgr.rbuilding = true
252
	return true
253
}
254

255
func (mqMgr *MetaQuotaManager) statisticRebuildFin(rebuild bool) {
256
	mqMgr.rwlock.Lock()
257
	defer mqMgr.rwlock.Unlock()
258
	mqMgr.rbuilding = false
259
	if !rebuild {
260
		mqMgr.statisticRebuildBase = new(sync.Map)
261
		mqMgr.statisticRebuildTemp = new(sync.Map)
262
		return
263
	}
264
	mqMgr.statisticBase = mqMgr.statisticRebuildBase
265
	mqMgr.statisticTemp = mqMgr.statisticRebuildTemp
266
	mqMgr.statisticRebuildBase = new(sync.Map)
267
	mqMgr.statisticRebuildTemp = new(sync.Map)
268

269
	if log.EnableInfo() {
270
		mqMgr.statisticTemp.Range(func(key, value interface{}) bool {
271
			quotaId := key.(uint32)
272
			usedInfo := value.(proto.QuotaUsedInfo)
273
			log.LogInfof("statisticRebuildFin statisticTemp  mp[%v] quotaId [%v] usedInfo [%v]", mqMgr.mpID, quotaId, usedInfo)
274
			return true
275
		})
276
		mqMgr.statisticBase.Range(func(key, value interface{}) bool {
277
			quotaId := key.(uint32)
278
			usedInfo := value.(proto.QuotaUsedInfo)
279
			log.LogInfof("statisticRebuildFin statisticBase  mp[%v] quotaId [%v] usedInfo [%v]", mqMgr.mpID, quotaId, usedInfo)
280
			return true
281
		})
282
	}
283
}
284

285
func (mqMgr *MetaQuotaManager) IsOverQuota(size bool, files bool, quotaId uint32) (status uint8) {
286
	var limitedInfo proto.QuotaLimitedInfo
287
	mqMgr.rwlock.RLock()
288
	defer mqMgr.rwlock.RUnlock()
289
	if !mqMgr.enable {
290
		log.LogInfof("IsOverQuota quota [%v] is disable.", quotaId)
291
		return
292
	}
293
	value, isFind := mqMgr.limitedMap.Load(quotaId)
294
	if isFind {
295
		limitedInfo = value.(proto.QuotaLimitedInfo)
296
		if size && limitedInfo.LimitedBytes {
297
			status = proto.OpNoSpaceErr
298
		}
299

300
		if files && limitedInfo.LimitedFiles {
301
			status = proto.OpNoSpaceErr
302
		}
303
	}
304
	log.LogInfof("IsOverQuota quotaId [%v] limitedInfo[%v] status [%v] isFind [%v]", quotaId, limitedInfo, status, isFind)
305
	return
306
}
307

308
func (mqMgr *MetaQuotaManager) updateUsedInfo(size int64, files int64, quotaId uint32) {
309
	var baseInfo proto.QuotaUsedInfo
310
	var baseTemp proto.QuotaUsedInfo
311
	mqMgr.rwlock.Lock()
312
	defer mqMgr.rwlock.Unlock()
313

314
	value, isFind := mqMgr.statisticTemp.Load(quotaId)
315
	if isFind {
316
		baseInfo = value.(proto.QuotaUsedInfo)
317
	}
318
	baseInfo.UsedBytes += size
319
	baseInfo.UsedFiles += files
320
	mqMgr.statisticTemp.Store(quotaId, baseInfo)
321
	if mqMgr.rbuilding {
322
		value, isFind = mqMgr.statisticRebuildTemp.Load(quotaId)
323
		if isFind {
324
			baseTemp = value.(proto.QuotaUsedInfo)
325
		} else {
326
			baseTemp.UsedBytes = 0
327
			baseTemp.UsedFiles = 0
328
		}
329
		baseTemp.UsedBytes += size
330
		baseTemp.UsedFiles += files
331
		mqMgr.statisticRebuildTemp.Store(quotaId, baseTemp)
332
	}
333
	log.LogDebugf("updateUsedInfo mpId [%v] quotaId [%v] baseInfo [%v] baseTemp[%v]", mqMgr.mpID, quotaId, baseInfo, baseTemp)
334
	return
335
}
336

337
func (mqMgr *MetaQuotaManager) EnableQuota() bool {
338
	return mqMgr.enable
339
}
340

341
func (mqMgr *MetaQuotaManager) getUsedInfoForTest(quotaId uint32) (size int64, files int64) {
342
	mqMgr.rwlock.Lock()
343
	defer mqMgr.rwlock.Unlock()
344
	var baseInfo proto.QuotaUsedInfo
345
	value, isFind := mqMgr.statisticTemp.Load(quotaId)
346
	if isFind {
347
		baseInfo = value.(proto.QuotaUsedInfo)
348
	}
349
	return baseInfo.UsedBytes, baseInfo.UsedFiles
350
}
351

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.