1
// Copyright 2023 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
23
"github.com/cubefs/cubefs/proto"
24
"github.com/cubefs/cubefs/sdk/meta"
25
"github.com/cubefs/cubefs/util/log"
26
"github.com/cubefs/cubefs/util/routinepool"
27
"github.com/cubefs/cubefs/util/unboundedchan"
31
// SnapScanTypeOnlyFile is the scan type Start selects for its first round
// ("delete all files").
SnapScanTypeOnlyFile int = 1
32
// SnapScanTypeOnlyDirAndDepth is the scan type Start selects for its second
// round ("delete all dirs"); getDirJob always dispatches to the depth-first
// handler in this mode.
SnapScanTypeOnlyDirAndDepth int = 2
35
// SnapshotScanner carries the state of one snapshot-version deletion task.
// NOTE(review): only part of the field list is visible in this view.
type SnapshotScanner struct {
40
// adminTask is the task being executed; checkScanning fills in its Response
// and hands it to respondToMaster.
adminTask *proto.AdminTask
41
// verDelReq is the deletion request; getTaskVerSeq reads the version
// sequence from verDelReq.Task.VolVersionInfo.Ver.
verDelReq *proto.SnapshotVerDelTaskRequest
42
// inodeChan queues directory dentries: handlVerDelBreadthFirst pushes to In,
// scan consumes from Out.
inodeChan *unboundedchan.UnboundedChan
43
// rPoll is the routine pool that scan submits per-directory jobs to.
rPoll *routinepool.RoutinePool
44
// currentStat holds the task counters (FileNum, DirNum, TotalInodeNum,
// ErrorSkippedNum) that the handlers update with atomic.AddInt64.
currentStat *proto.SnapshotStatistics
49
// NewSnapshotScanner builds a scanner for the snapshot deletion task carried
// by adminTask. It creates a meta wrapper for the task's volume and
// initialises the directory queue, the routine pool, the statistics and the
// stop channel.
//
// adminTask.Request must hold a *proto.SnapshotVerDelTaskRequest: the type
// assertion below is unchecked and panics otherwise.
func NewSnapshotScanner(adminTask *proto.AdminTask, l *LcNode) (*SnapshotScanner, error) {
50
request := adminTask.Request.(*proto.SnapshotVerDelTaskRequest)
52
metaConfig := &meta.MetaConfig{
53
Volume: request.Task.VolName,
59
var metaWrapper *meta.MetaWrapper
60
if metaWrapper, err = meta.NewMetaWrapper(metaConfig); err != nil {
64
scanner := &SnapshotScanner{
66
Volume: request.Task.VolName,
71
inodeChan: unboundedchan.NewUnboundedChan(defaultUnboundedChanInitCapacity),
72
rPoll: routinepool.NewRoutinePool(snapshotRoutineNumPerTask),
73
currentStat: &proto.SnapshotStatistics{},
74
stopC: make(chan bool),
79
// startSnapshotScan handles a snapshot deletion task received by the node.
// It attaches a fresh response to adminTask, ignores the task if a scanner
// with the same task ID is already registered, and otherwise creates a new
// scanner and records it in l.snapshotScanners.
//
// If the scanner cannot be created the response is marked TaskFailed with the
// error text as Result.
//
// NOTE(review): each exit path visible here releases l.scannerMutex
// explicitly; the matching Lock call is not visible in this view.
func (l *LcNode) startSnapshotScan(adminTask *proto.AdminTask) (err error) {
80
// Unchecked assertion: panics if the request is of another type.
request := adminTask.Request.(*proto.SnapshotVerDelTaskRequest)
81
log.LogInfof("startSnapshotScan: scan task(%v) received!", request.Task)
82
response := &proto.SnapshotVerDelTaskResponse{}
83
adminTask.Response = response
86
// A scanner already registered under this task ID means the task is running.
if _, ok := l.snapshotScanners[request.Task.Id]; ok {
87
log.LogInfof("startSnapshotScan: scan task(%v) is already running!", request.Task)
88
l.scannerMutex.Unlock()
92
var scanner *SnapshotScanner
93
scanner, err = NewSnapshotScanner(adminTask, l)
95
log.LogErrorf("startSnapshotScan: NewSnapshotScanner err(%v)", err)
96
response.Status = proto.TaskFailed
97
response.Result = err.Error()
98
l.scannerMutex.Unlock()
101
// Register the scanner; checkScanning removes it again once the task is reported.
l.snapshotScanners[scanner.ID] = scanner
102
l.scannerMutex.Unlock()
108
// getTaskVerSeq returns the version sequence of the snapshot this task
// deletes, as recorded in the request (Task.VolVersionInfo.Ver).
func (s *SnapshotScanner) getTaskVerSeq() uint64 {
109
return s.verDelReq.Task.VolVersionInfo.Ver
112
// Stop shuts the scanner down: it calls WaitAndClose on the routine pool and
// then closes the input side of the directory queue.
// NOTE(review): closing inodeChan.In means any later send to it would panic;
// confirm no handler can still be running at this point.
func (s *SnapshotScanner) Stop() {
114
s.rPoll.WaitAndClose()
115
close(s.inodeChan.In)
117
log.LogDebugf("snapshot scanner(%v) stopped", s.ID)
120
// Start runs the deletion task in two rounds over the tree rooted at
// proto.RootIno:
//
//  1. scan type SnapScanTypeOnlyFile, followed by checkScanning(false);
//  2. scan type SnapScanTypeOnlyDirAndDepth, followed by checkScanning(true),
//     which fills in the response and reports it.
//
// The same root dentry is fed to firstIn at the start of each round.
func (s *SnapshotScanner) Start() {
121
response := s.adminTask.Response.(*proto.SnapshotVerDelTaskResponse)
123
response.StartTime = &t
125
// 1. delete all files
126
log.LogInfof("snapshot startScan(%v): first round files start!", s.ID)
127
s.scanType = SnapScanTypeOnlyFile
129
// Synthetic dentry for the volume root; only Inode and Type are visible as
// being set here.
firstDentry := &proto.ScanDentry{
130
Inode: proto.RootIno,
131
Type: proto.Mode(os.ModeDir),
133
s.firstIn(firstDentry)
134
// Returns once the round is finished; false = do not report yet.
s.checkScanning(false)
136
// 2. delete all dirs
137
log.LogInfof("snapshot startScan(%v): second round dirs start!", s.ID)
138
s.scanType = SnapScanTypeOnlyDirAndDepth
139
s.firstIn(firstDentry)
140
s.checkScanning(true)
143
// firstIn seeds a scan round with the starting dentry d.
// NOTE(review): only the log statements are visible in this view; judging by
// them, the function either observes stopC and gives up, or enqueues d —
// confirm against the full source.
func (s *SnapshotScanner) firstIn(d *proto.ScanDentry) {
146
log.LogDebugf("snapshot firstIn(%v): stopC!", s.ID)
150
log.LogDebugf("snapshot startScan(%v): scan type(%v), first dir dentry(%v) in!", s.ID, s.scanType, d)
154
// getDirJob returns the job that processes directory dentry, chosen by the
// current scan type:
//
//   - SnapScanTypeOnlyDirAndDepth: always depth-first.
//   - SnapScanTypeOnlyFile: depth-first when the directory queue already holds
//     more than maxDirChanNum entries, breadth-first otherwise.
//
// Any other scan type is logged as an error.
func (s *SnapshotScanner) getDirJob(dentry *proto.ScanDentry) (job func()) {
156
case SnapScanTypeOnlyDirAndDepth:
157
log.LogDebug("getDirJob: SnapScanTypeOnlyDirAndDepth")
159
s.handlVerDelDepthFirst(dentry)
161
case SnapScanTypeOnlyFile:
162
// Breadth-first pushes child directories into inodeChan, so with a long
// queue the depth-first handler is used instead — presumably to stop the
// queue from growing further.
if s.inodeChan.Len() > maxDirChanNum {
163
log.LogDebug("getDirJob: SnapScanTypeOnlyFile DepthFirst")
165
s.handlVerDelDepthFirst(dentry)
168
log.LogDebug("getDirJob: SnapScanTypeOnlyFile BreadthFirst")
170
s.handlVerDelBreadthFirst(dentry)
174
log.LogErrorf("getDirJob: invalid scanType: %v", s.scanType)
179
// scan is the dispatch loop: it takes directory dentries from inodeChan.Out,
// builds a job for each with getDirJob and submits it to the routine pool.
// A closed channel is logged as a warning; a failed Submit is logged as an
// error.
func (s *SnapshotScanner) scan() {
180
log.LogDebugf("SnapshotScanner Enter scan")
182
log.LogDebugf("SnapshotScanner Exit scan")
188
case val, ok := <-s.inodeChan.Out:
190
log.LogWarnf("inodeChan closed")
192
// Unchecked assertion: the queue is expected to carry only *proto.ScanDentry.
dentry := val.(*proto.ScanDentry)
193
job := s.getDirJob(dentry)
194
_, err := s.rPoll.Submit(job)
196
log.LogErrorf("handlVerDel failed, err(%v)", err)
203
// handlVerDelDepthFirst removes the task's snapshot version from dentry and,
// when dentry is a directory, from everything beneath it.
//
// For a directory the children are read page by page (defaultReadDirLimit
// entries per call, continued from marker). Within each page the files are
// deleted first, then the function recurses into each child directory. Once
// the listing is exhausted the dentry itself is deleted.
//
// Failures are logged and counted in currentStat.ErrorSkippedNum; successful
// deletions increment FileNum or DirNum together with TotalInodeNum.
func (s *SnapshotScanner) handlVerDelDepthFirst(dentry *proto.ScanDentry) {
205
children []proto.Dentry
209
// In the second (directory) round only directories are requested from the
// meta layer.
onlyDir := s.scanType == SnapScanTypeOnlyDirAndDepth
211
if os.FileMode(dentry.Type).IsDir() {
216
children, err = s.mw.ReadDirLimitForSnapShotClean(dentry.Inode, marker, uint64(defaultReadDirLimit), s.getTaskVerSeq(), onlyDir)
217
// ENOENT is handled separately below; any other error counts as skipped.
if err != nil && err != syscall.ENOENT {
218
log.LogErrorf("action[handlVerDelDepthFirst] ReadDirLimitForSnapShotClean failed, parent[%v] maker[%v] verSeq[%v] err[%v]",
219
dentry.Inode, marker, s.getTaskVerSeq(), err)
220
atomic.AddInt64(&s.currentStat.ErrorSkippedNum, 1)
223
log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimitForSnapShotClean parent[%v] maker[%v] verSeq[%v] children[%v]",
224
dentry.Inode, marker, s.getTaskVerSeq(), len(children))
226
if err == syscall.ENOENT {
227
log.LogErrorf("action[handlVerDelDepthFirst] ReadDirLimitForSnapShotClean failed, parent[%v] maker[%v] verSeq[%v] err[%v]",
228
dentry.Inode, marker, s.getTaskVerSeq(), err)
233
// A page that starts with the marker repeats the last entry of the previous
// page: drop it. If it was the only entry, the listing is finished.
if len(children) >= 1 && marker == children[0].Name {
234
if len(children) <= 1 {
235
log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimit_ll done, parent[%v] maker[%v] verSeq[%v] children[%v]",
236
dentry.Inode, marker, s.getTaskVerSeq(), children)
239
skippedChild := children[0]
240
children = children[1:]
241
log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimit_ll skip last marker[%v], parent[%v] verSeq[%v] skippedName[%v]",
242
marker, dentry.Inode, s.getTaskVerSeq(), skippedChild.Name)
247
// Split the page into files (deleted right away) and directories (recursed into).
files := make([]*proto.ScanDentry, 0)
248
dirs := make([]*proto.ScanDentry, 0)
250
for _, child := range children {
251
childDentry := &proto.ScanDentry{
252
ParentId: dentry.Inode,
257
if os.FileMode(childDentry.Type).IsDir() {
258
dirs = append(dirs, childDentry)
260
files = append(files, childDentry)
264
for _, file := range files {
265
if ino, err = s.mw.Delete_Ver_ll(file.ParentId, file.Name, false, s.getTaskVerSeq(), file.Path); err != nil {
266
log.LogErrorf("action[handlVerDelDepthFirst] Delete_Ver_ll failed, file(parent[%v] child name[%v]) verSeq[%v] err[%v]",
267
file.ParentId, file.Name, s.getTaskVerSeq(), err)
268
atomic.AddInt64(&s.currentStat.ErrorSkippedNum, 1)
271
log.LogDebugf("action[handlVerDelDepthFirst] Delete_Ver_ll success, file(parent[%v] child name[%v]) verSeq[%v] ino[%v]",
272
file.ParentId, file.Name, s.getTaskVerSeq(), ino)
273
atomic.AddInt64(&s.currentStat.FileNum, 1)
274
atomic.AddInt64(&s.currentStat.TotalInodeNum, 1)
278
for _, dir := range dirs {
279
s.handlVerDelDepthFirst(dir)
282
childrenNr := len(children)
283
// A short page means the listing is exhausted. On continuation pages the
// marker entry was dropped above, hence the +1 when comparing with the limit.
if (marker == "" && childrenNr < defaultReadDirLimit) || (marker != "" && childrenNr+1 < defaultReadDirLimit) {
284
log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimit_ll done, parent[%v]",
288
// Continue the next page from the last entry seen.
marker = children[childrenNr-1].Name
289
log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimit_ll next marker[%v] parent[%v]", marker, dentry.Inode)
296
// Finally delete the dentry itself.
if ino, err = s.mw.Delete_Ver_ll(dentry.ParentId, dentry.Name, os.FileMode(dentry.Type).IsDir(), s.getTaskVerSeq(), dentry.Path); err != nil {
297
// Only dentries with a real parent are reported as failures; presumably
// this exempts the synthetic root dentry built in Start — TODO confirm.
if dentry.ParentId >= 1 {
298
log.LogErrorf("action[handlVerDelDepthFirst] Delete_Ver_ll failed, dir(parent[%v] child name[%v]) verSeq[%v] err[%v]",
299
dentry.ParentId, dentry.Name, s.getTaskVerSeq(), err)
300
atomic.AddInt64(&s.currentStat.ErrorSkippedNum, 1)
304
log.LogDebugf("action[handlVerDelDepthFirst] Delete_Ver_ll success, dir(parent[%v] child name[%v]) verSeq[%v] ino[%v]",
305
dentry.ParentId, dentry.Name, s.getTaskVerSeq(), ino)
306
atomic.AddInt64(&s.currentStat.DirNum, 1)
307
atomic.AddInt64(&s.currentStat.TotalInodeNum, 1)
312
// handlVerDelBreadthFirst processes one directory level. The children of
// dentry are read page by page (defaultReadDirLimit entries per call,
// continued from marker); child directories are pushed onto inodeChan for
// later processing and child files have the task's snapshot version deleted
// immediately.
//
// Failures are logged and counted in currentStat.ErrorSkippedNum; successful
// file deletions increment FileNum and TotalInodeNum.
func (s *SnapshotScanner) handlVerDelBreadthFirst(dentry *proto.ScanDentry) {
314
children []proto.Dentry
319
// Guard for dentries that are not directories.
if !os.FileMode(dentry.Type).IsDir() {
323
// scanDentries collects the files of the current page; it is reset after
// each page.
scanDentries := make([]*proto.ScanDentry, 0)
324
totalChildDirNum := 0
325
totalChildFileNum := 0
330
// Files and directories are both requested (the last argument is false).
children, err = s.mw.ReadDirLimitForSnapShotClean(dentry.Inode, marker, uint64(defaultReadDirLimit), s.getTaskVerSeq(), false)
331
// ENOENT is handled separately below; any other error counts as skipped.
if err != nil && err != syscall.ENOENT {
332
log.LogErrorf("action[handlVerDelBreadthFirst] ReadDirLimitForSnapShotClean failed, parent[%v] maker[%v] verSeq[%v] err[%v]",
333
dentry.Inode, marker, s.getTaskVerSeq(), err)
334
atomic.AddInt64(&s.currentStat.ErrorSkippedNum, 1)
337
log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimitForSnapShotClean parent[%v] maker[%v] verSeq[%v] children[%v]",
338
dentry.Inode, marker, s.getTaskVerSeq(), len(children))
340
if err == syscall.ENOENT {
341
log.LogErrorf("action[handlVerDelBreadthFirst] ReadDirLimitForSnapShotClean failed, parent[%v] maker[%v] verSeq[%v] err[%v]",
342
dentry.Inode, marker, s.getTaskVerSeq(), err)
347
// A page that starts with the marker repeats the last entry of the previous
// page: drop it. If it was the only entry, the listing is finished.
if len(children) >= 1 && marker == children[0].Name {
348
if len(children) <= 1 {
349
log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimit_ll done, parent[%v] maker[%v] verSeq[%v] children[%v]",
350
dentry.Inode, marker, s.getTaskVerSeq(), children)
353
skippedChild := children[0]
354
children = children[1:]
355
log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimit_ll skip last marker[%v], parent[%v] verSeq[%v] skippedName[%v]",
356
marker, dentry.Inode, s.getTaskVerSeq(), skippedChild.Name)
361
for _, child := range children {
362
childDentry := &proto.ScanDentry{
363
ParentId: dentry.Inode,
368
if os.FileMode(childDentry.Type).IsDir() {
369
// Directories are deferred: scan picks them up from the queue later.
s.inodeChan.In <- childDentry
371
log.LogDebugf("action[handlVerDelBreadthFirst] push dir(parent[%v] child name[%v] ino[%v]) in channel",
372
childDentry.ParentId, childDentry.Name, childDentry.Inode)
374
scanDentries = append(scanDentries, childDentry)
378
for _, file := range scanDentries {
379
// NOTE(review): the path argument here is dentry.Path (the parent
// directory), whereas handlVerDelDepthFirst passes file.Path for the same
// call — confirm which one is intended.
if ino, err = s.mw.Delete_Ver_ll(file.ParentId, file.Name, false, s.getTaskVerSeq(), dentry.Path); err != nil {
380
log.LogErrorf("action[handlVerDelBreadthFirst] Delete_Ver_ll failed, file(parent[%v] child name[%v]) verSeq[%v] err[%v]",
381
file.ParentId, file.Name, s.getTaskVerSeq(), err)
382
atomic.AddInt64(&s.currentStat.ErrorSkippedNum, 1)
386
log.LogDebugf("action[handlVerDelBreadthFirst] Delete_Ver_ll success, file(parent[%v] child name[%v]) verSeq[%v] ino[%v]",
387
file.ParentId, file.Name, s.getTaskVerSeq(), ino)
388
atomic.AddInt64(&s.currentStat.FileNum, 1)
389
atomic.AddInt64(&s.currentStat.TotalInodeNum, 1)
392
// Keep the backing array, drop this page's files.
scanDentries = scanDentries[:0]
393
childrenNr := len(children)
394
// A short page means the listing is exhausted. On continuation pages the
// marker entry was dropped above, hence the +1 when comparing with the limit.
if (marker == "" && childrenNr < defaultReadDirLimit) || (marker != "" && childrenNr+1 < defaultReadDirLimit) {
395
log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimit_ll done, parent[%v] total childrenNr[%v] marker[%v]",
396
dentry.Inode, totalChildFileNum+totalChildDirNum, marker)
399
// Continue the next page from the last entry seen.
marker = children[childrenNr-1].Name
400
log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimit_ll next marker[%v] parent[%v]", marker, dentry.Inode)
405
// DoneScanning reports whether the current round has no work left: the
// directory queue is empty and the routine pool reports no running jobs.
// NOTE(review): the two values are sampled separately, so this is a snapshot
// rather than an atomic check.
func (s *SnapshotScanner) DoneScanning() bool {
406
log.LogDebugf("inodeChan.Len(%v) rPoll.RunningNum(%v)", s.inodeChan.Len(), s.rPoll.RunningNum())
407
return s.inodeChan.Len() == 0 && s.rPoll.RunningNum() == 0
410
// checkScanning polls DoneScanning every scanCheckInterval seconds until the
// current round has finished.
//
// When the round is done, the visible code fills in the task response (status,
// end time, node address, counters), removes the scanner from the node's
// snapshotScanners map under scannerMutex and sends the response with
// respondToMaster. The status is TaskFailed if any error was counted in
// ErrorSkippedNum, TaskSucceeds otherwise.
//
// NOTE(review): judging by the "without report" log line, the reporting part
// runs only when report is true; the branching itself is not visible in this
// view — confirm.
func (s *SnapshotScanner) checkScanning(report bool) {
411
dur := time.Second * time.Duration(scanCheckInterval)
412
taskCheckTimer := time.NewTimer(dur)
416
log.LogDebugf("stop checking scan")
418
case <-taskCheckTimer.C:
419
if s.DoneScanning() {
420
taskCheckTimer.Stop()
423
response := s.adminTask.Response.(*proto.SnapshotVerDelTaskResponse)
424
// NOTE(review): the counters are read without atomic loads here and below,
// while the handlers update them with atomic.AddInt64; this relies on no
// job still running once DoneScanning returned true.
if s.currentStat.ErrorSkippedNum > 0 {
425
response.Status = proto.TaskFailed
427
response.Status = proto.TaskSucceeds
429
response.EndTime = &t
432
response.LcNode = s.lcnode.localServerAddr
433
response.SnapshotVerDelTask = s.verDelReq.Task
434
response.VolName = s.Volume
435
response.VerSeq = s.getTaskVerSeq()
436
response.FileNum = s.currentStat.FileNum
437
response.DirNum = s.currentStat.DirNum
438
response.TotalInodeNum = s.currentStat.TotalInodeNum
439
response.ErrorSkippedNum = s.currentStat.ErrorSkippedNum
440
// Unregister the scanner from the node.
s.lcnode.scannerMutex.Lock()
442
delete(s.lcnode.snapshotScanners, s.ID)
443
s.lcnode.scannerMutex.Unlock()
445
s.lcnode.respondToMaster(s.adminTask)
446
log.LogInfof("scan completed for task(%v)", s.adminTask)
448
log.LogInfof("first round scan completed for task(%v) without report", s.adminTask)
453
// Not finished yet: re-arm the timer for the next poll.
taskCheckTimer.Reset(dur)