// Copyright 2023 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
22
"github.com/cubefs/cubefs/proto"
23
"github.com/cubefs/cubefs/util/log"
26
// MetaQuotaManager tracks per-quota usage statistics and limit flags for one
// meta partition.
//
// Usage deltas accumulate in statisticTemp and are folded into statisticBase
// when a report is produced; the statisticRebuild* maps collect the same data
// while a rebuild is in progress and replace the live maps when it finishes.
type MetaQuotaManager struct {
	statisticTemp        *sync.Map // key quotaId, value proto.QuotaUsedInfo
	statisticBase        *sync.Map // key quotaId, value proto.QuotaUsedInfo
	statisticRebuildTemp *sync.Map // key quotaId, value proto.QuotaUsedInfo
	statisticRebuildBase *sync.Map // key quotaId, value proto.QuotaUsedInfo
	limitedMap           *sync.Map // key quotaId, value proto.QuotaLimitedInfo
	rbuilding            bool      // true between statisticRebuildStart and statisticRebuildFin
	volName              string    // heartbeat entries for other volumes are ignored
	rwlock               sync.RWMutex
	mpID                 uint64 // meta partition id, used in log output
	enable               bool   // last Enable value received via setQuotaHbInfo
}
39
// MetaQuotaInode pairs an inode with the quota IDs that its Marshal and
// Unmarshal methods serialize alongside it.
type MetaQuotaInode struct {
44
// TxMetaQuotaInode pairs a transactional inode with the quota IDs that its
// Marshal and Unmarshal methods serialize alongside it.
type TxMetaQuotaInode struct {
49
func NewQuotaManager(volName string, mpId uint64) (mqMgr *MetaQuotaManager) {
50
mqMgr = &MetaQuotaManager{
51
statisticTemp: new(sync.Map),
52
statisticBase: new(sync.Map),
53
statisticRebuildTemp: new(sync.Map),
54
statisticRebuildBase: new(sync.Map),
55
limitedMap: new(sync.Map),
62
func (qInode *MetaQuotaInode) Marshal() (result []byte, err error) {
64
quotaBytes := bytes.NewBuffer(make([]byte, 0, 128))
65
buff := bytes.NewBuffer(make([]byte, 0, 128))
66
inodeBytes, err = qInode.inode.Marshal()
70
inodeLen := uint32(len(inodeBytes))
71
if err = binary.Write(buff, binary.BigEndian, inodeLen); err != nil {
74
buff.Write(inodeBytes)
75
for _, quotaId := range qInode.quotaIds {
76
if err = binary.Write(quotaBytes, binary.BigEndian, quotaId); err != nil {
80
buff.Write(quotaBytes.Bytes())
82
log.LogDebugf("MetaQuotaInode Marshal inode[%v] inodeLen [%v] size [%v]", qInode.inode.Inode, inodeLen, len(result))
86
func (qInode *MetaQuotaInode) Unmarshal(raw []byte) (err error) {
89
buff := bytes.NewBuffer(raw)
90
if err = binary.Read(buff, binary.BigEndian, &inodeLen); err != nil {
93
inodeBytes := make([]byte, inodeLen)
94
if _, err = buff.Read(inodeBytes); err != nil {
97
log.LogDebugf("MetaQuotaInode Unmarshal inodeLen [%v] size [%v]", inodeBytes, len(raw))
98
qInode.inode = NewInode(0, 0)
99
if err = qInode.inode.Unmarshal(inodeBytes); err != nil {
106
if err = binary.Read(buff, binary.BigEndian, "aId); err != nil {
109
qInode.quotaIds = append(qInode.quotaIds, quotaId)
114
func (qInode *TxMetaQuotaInode) Marshal() (result []byte, err error) {
115
var inodeBytes []byte
116
quotaBytes := bytes.NewBuffer(make([]byte, 0, 128))
117
buff := bytes.NewBuffer(make([]byte, 0, 128))
118
inodeBytes, err = qInode.txinode.Marshal()
122
inodeLen := uint32(len(inodeBytes))
123
if err = binary.Write(buff, binary.BigEndian, inodeLen); err != nil {
126
buff.Write(inodeBytes)
127
for _, quotaId := range qInode.quotaIds {
128
if err = binary.Write(quotaBytes, binary.BigEndian, quotaId); err != nil {
132
buff.Write(quotaBytes.Bytes())
133
result = buff.Bytes()
134
log.LogDebugf("TxMetaQuotaInode Marshal inode[%v] inodeLen [%v] size [%v]", qInode.txinode.Inode.Inode, inodeLen, len(result))
138
func (qInode *TxMetaQuotaInode) Unmarshal(raw []byte) (err error) {
141
buff := bytes.NewBuffer(raw)
142
if err = binary.Read(buff, binary.BigEndian, &inodeLen); err != nil {
145
inodeBytes := make([]byte, inodeLen)
146
if _, err = buff.Read(inodeBytes); err != nil {
149
log.LogDebugf("TxMetaQuotaInode Unmarshal inodeLen [%v] size [%v]", inodeBytes, len(raw))
150
qInode.txinode = NewTxInode(0, 0, nil)
151
if err = qInode.txinode.Unmarshal(inodeBytes); err != nil {
158
if err = binary.Read(buff, binary.BigEndian, "aId); err != nil {
161
qInode.quotaIds = append(qInode.quotaIds, quotaId)
166
func (mqMgr *MetaQuotaManager) setQuotaHbInfo(infos []*proto.QuotaHeartBeatInfo) {
168
defer mqMgr.rwlock.Unlock()
170
for _, info := range infos {
171
if mqMgr.volName != info.VolName {
174
mqMgr.enable = info.Enable
175
mqMgr.limitedMap.Store(info.QuotaId, info.LimitedInfo)
176
log.LogDebugf("mp[%v] quotaId [%v] limitedInfo [%v]", mqMgr.mpID, info.QuotaId, info.LimitedInfo)
178
mqMgr.limitedMap.Range(func(key, value interface{}) bool {
179
quotaId := key.(uint32)
182
for _, info := range infos {
183
if mqMgr.volName != info.VolName {
186
if info.QuotaId == quotaId {
193
mqMgr.limitedMap.Delete(quotaId)
200
func (mqMgr *MetaQuotaManager) getQuotaReportInfos() (infos []*proto.QuotaReportInfo) {
202
defer mqMgr.rwlock.Unlock()
203
var usedInfo proto.QuotaUsedInfo
204
mqMgr.statisticTemp.Range(func(key, value interface{}) bool {
205
usedInfo = value.(proto.QuotaUsedInfo)
206
if value, isFind := mqMgr.statisticBase.Load(key.(uint32)); isFind {
207
baseInfo := value.(proto.QuotaUsedInfo)
208
log.LogDebugf("[getQuotaReportInfos] statisticTemp mp[%v] key [%v] usedInfo [%v] baseInfo [%v]", mqMgr.mpID,
209
key.(uint32), usedInfo, baseInfo)
210
usedInfo.Add(&baseInfo)
211
if usedInfo.UsedFiles < 0 {
212
log.LogWarnf("[getQuotaReportInfos] statisticTemp mp[%v] key [%v] usedInfo [%v]", mqMgr.mpID, key.(uint32), usedInfo)
213
usedInfo.UsedFiles = 0
215
if usedInfo.UsedBytes < 0 {
216
log.LogWarnf("[getQuotaReportInfos] statisticTemp mp[%v] key [%v] usedInfo [%v]", mqMgr.mpID, key.(uint32), usedInfo)
217
usedInfo.UsedBytes = 0
220
mqMgr.statisticBase.Store(key.(uint32), usedInfo)
223
mqMgr.statisticTemp = new(sync.Map)
224
mqMgr.statisticBase.Range(func(key, value interface{}) bool {
225
quotaId := key.(uint32)
226
if _, ok := mqMgr.limitedMap.Load(quotaId); !ok {
229
usedInfo = value.(proto.QuotaUsedInfo)
230
reportInfo := &proto.QuotaReportInfo{
234
infos = append(infos, reportInfo)
235
log.LogDebugf("[getQuotaReportInfos] statisticBase mp[%v] key [%v] usedInfo [%v]", mqMgr.mpID, key.(uint32), usedInfo)
241
func (mqMgr *MetaQuotaManager) statisticRebuildStart() bool {
243
defer mqMgr.rwlock.Unlock()
251
mqMgr.rbuilding = true
255
func (mqMgr *MetaQuotaManager) statisticRebuildFin(rebuild bool) {
257
defer mqMgr.rwlock.Unlock()
258
mqMgr.rbuilding = false
260
mqMgr.statisticRebuildBase = new(sync.Map)
261
mqMgr.statisticRebuildTemp = new(sync.Map)
264
mqMgr.statisticBase = mqMgr.statisticRebuildBase
265
mqMgr.statisticTemp = mqMgr.statisticRebuildTemp
266
mqMgr.statisticRebuildBase = new(sync.Map)
267
mqMgr.statisticRebuildTemp = new(sync.Map)
269
if log.EnableInfo() {
270
mqMgr.statisticTemp.Range(func(key, value interface{}) bool {
271
quotaId := key.(uint32)
272
usedInfo := value.(proto.QuotaUsedInfo)
273
log.LogInfof("statisticRebuildFin statisticTemp mp[%v] quotaId [%v] usedInfo [%v]", mqMgr.mpID, quotaId, usedInfo)
276
mqMgr.statisticBase.Range(func(key, value interface{}) bool {
277
quotaId := key.(uint32)
278
usedInfo := value.(proto.QuotaUsedInfo)
279
log.LogInfof("statisticRebuildFin statisticBase mp[%v] quotaId [%v] usedInfo [%v]", mqMgr.mpID, quotaId, usedInfo)
285
func (mqMgr *MetaQuotaManager) IsOverQuota(size bool, files bool, quotaId uint32) (status uint8) {
286
var limitedInfo proto.QuotaLimitedInfo
288
defer mqMgr.rwlock.RUnlock()
290
log.LogInfof("IsOverQuota quota [%v] is disable.", quotaId)
293
value, isFind := mqMgr.limitedMap.Load(quotaId)
295
limitedInfo = value.(proto.QuotaLimitedInfo)
296
if size && limitedInfo.LimitedBytes {
297
status = proto.OpNoSpaceErr
300
if files && limitedInfo.LimitedFiles {
301
status = proto.OpNoSpaceErr
304
log.LogInfof("IsOverQuota quotaId [%v] limitedInfo[%v] status [%v] isFind [%v]", quotaId, limitedInfo, status, isFind)
308
func (mqMgr *MetaQuotaManager) updateUsedInfo(size int64, files int64, quotaId uint32) {
309
var baseInfo proto.QuotaUsedInfo
310
var baseTemp proto.QuotaUsedInfo
312
defer mqMgr.rwlock.Unlock()
314
value, isFind := mqMgr.statisticTemp.Load(quotaId)
316
baseInfo = value.(proto.QuotaUsedInfo)
318
baseInfo.UsedBytes += size
319
baseInfo.UsedFiles += files
320
mqMgr.statisticTemp.Store(quotaId, baseInfo)
322
value, isFind = mqMgr.statisticRebuildTemp.Load(quotaId)
324
baseTemp = value.(proto.QuotaUsedInfo)
326
baseTemp.UsedBytes = 0
327
baseTemp.UsedFiles = 0
329
baseTemp.UsedBytes += size
330
baseTemp.UsedFiles += files
331
mqMgr.statisticRebuildTemp.Store(quotaId, baseTemp)
333
log.LogDebugf("updateUsedInfo mpId [%v] quotaId [%v] baseInfo [%v] baseTemp[%v]", mqMgr.mpID, quotaId, baseInfo, baseTemp)
337
// EnableQuota returns a bool describing this manager's quota state.
// NOTE(review): presumably the enable flag recorded by setQuotaHbInfo —
// confirm against the method body.
func (mqMgr *MetaQuotaManager) EnableQuota() bool {
341
func (mqMgr *MetaQuotaManager) getUsedInfoForTest(quotaId uint32) (size int64, files int64) {
343
defer mqMgr.rwlock.Unlock()
344
var baseInfo proto.QuotaUsedInfo
345
value, isFind := mqMgr.statisticTemp.Load(quotaId)
347
baseInfo = value.(proto.QuotaUsedInfo)
349
return baseInfo.UsedBytes, baseInfo.UsedFiles