cubefs

Форк
0
/
snapshot_scanner.go 
456 строк · 14.4 Кб
1
// Copyright 2023 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package lcnode
16

17
import (
18
	"os"
19
	"sync/atomic"
20
	"syscall"
21
	"time"
22

23
	"github.com/cubefs/cubefs/proto"
24
	"github.com/cubefs/cubefs/sdk/meta"
25
	"github.com/cubefs/cubefs/util/log"
26
	"github.com/cubefs/cubefs/util/routinepool"
27
	"github.com/cubefs/cubefs/util/unboundedchan"
28
)
29

30
// Scan modes for SnapshotScanner. Start runs one round in each mode, in order.
const (
	// SnapScanTypeOnlyFile is the first round: directories are walked and only
	// file entries are deleted (getDirJob picks depth- or breadth-first per job).
	SnapScanTypeOnlyFile int = iota + 1
	// SnapScanTypeOnlyDirAndDepth is the second round: only directory entries
	// are read, walked depth-first, and each directory is deleted after its
	// children.
	SnapScanTypeOnlyDirAndDepth
)
34

35
// SnapshotScanner executes one snapshot-version deletion task against a single
// volume. Start drives two rounds over the directory tree starting at the root:
// first deleting the task's version of files, then of directories.
type SnapshotScanner struct {
	// ID is the task ID (request.Task.Id); it is the key under which the
	// scanner is registered in LcNode.snapshotScanners.
	ID string
	// Volume is the name of the volume the task targets.
	Volume string
	// mw is the metadata client for Volume; Stop closes it.
	mw MetaWrapper
	// lcnode is the owning node, used for (un)registration and for
	// responding to the master.
	lcnode *LcNode
	// adminTask is the task received from the master; its Response is filled
	// in during the scan and sent back when the scan completes.
	adminTask *proto.AdminTask
	// verDelReq is adminTask.Request asserted to its concrete type.
	verDelReq *proto.SnapshotVerDelTaskRequest
	// inodeChan queues *proto.ScanDentry directory entries waiting to be
	// dispatched by scan.
	inodeChan *unboundedchan.UnboundedChan
	// rPoll is the worker pool that runs per-directory jobs.
	rPoll *routinepool.RoutinePool
	// currentStat holds counters updated with atomic.AddInt64 by the workers
	// and copied into the response at the end.
	currentStat *proto.SnapshotStatistics
	// scanType is the current round: SnapScanTypeOnlyFile or
	// SnapScanTypeOnlyDirAndDepth.
	scanType int
	// stopC is closed by Stop to make every loop watching it return.
	stopC chan bool
}
48

49
func NewSnapshotScanner(adminTask *proto.AdminTask, l *LcNode) (*SnapshotScanner, error) {
50
	request := adminTask.Request.(*proto.SnapshotVerDelTaskRequest)
51
	var err error
52
	metaConfig := &meta.MetaConfig{
53
		Volume:        request.Task.VolName,
54
		Masters:       l.masters,
55
		Authenticate:  false,
56
		ValidateOwner: false,
57
	}
58

59
	var metaWrapper *meta.MetaWrapper
60
	if metaWrapper, err = meta.NewMetaWrapper(metaConfig); err != nil {
61
		return nil, err
62
	}
63

64
	scanner := &SnapshotScanner{
65
		ID:          request.Task.Id,
66
		Volume:      request.Task.VolName,
67
		mw:          metaWrapper,
68
		lcnode:      l,
69
		adminTask:   adminTask,
70
		verDelReq:   request,
71
		inodeChan:   unboundedchan.NewUnboundedChan(defaultUnboundedChanInitCapacity),
72
		rPoll:       routinepool.NewRoutinePool(snapshotRoutineNumPerTask),
73
		currentStat: &proto.SnapshotStatistics{},
74
		stopC:       make(chan bool),
75
	}
76
	return scanner, nil
77
}
78

79
// startSnapshotScan handles a snapshot-version deletion task from the master.
//
// It installs an empty SnapshotVerDelTaskResponse on adminTask. Then, under
// scannerMutex, it either ignores the task because a scanner with the same
// task ID is already registered, or creates and registers a new scanner.
// A newly registered scanner is started on its own goroutine after the lock
// is released. If the scanner cannot be created, the response is marked
// TaskFailed with the error text and the error is returned.
func (l *LcNode) startSnapshotScan(adminTask *proto.AdminTask) (err error) {
	req := adminTask.Request.(*proto.SnapshotVerDelTaskRequest)
	log.LogInfof("startSnapshotScan: scan task(%v) received!", req.Task)
	resp := &proto.SnapshotVerDelTaskResponse{}
	adminTask.Response = resp

	var started *SnapshotScanner
	func() {
		l.scannerMutex.Lock()
		defer l.scannerMutex.Unlock()

		if _, running := l.snapshotScanners[req.Task.Id]; running {
			log.LogInfof("startSnapshotScan: scan task(%v) is already running!", req.Task)
			return
		}

		var created *SnapshotScanner
		if created, err = NewSnapshotScanner(adminTask, l); err != nil {
			log.LogErrorf("startSnapshotScan: NewSnapshotScanner err(%v)", err)
			resp.Status = proto.TaskFailed
			resp.Result = err.Error()
			return
		}
		l.snapshotScanners[created.ID] = created
		started = created
	}()

	if started != nil {
		go started.Start()
	}
	return err
}
107

108
// getTaskVerSeq returns the volume version sequence targeted by this task,
// read from the request's VolVersionInfo.
func (s *SnapshotScanner) getTaskVerSeq() uint64 {
	verSeq := s.verDelReq.Task.VolVersionInfo.Ver
	return verSeq
}
111

112
// Stop shuts the scanner down. The statement order matters:
//  1. close stopC so scan, checkScanning and firstIn stop taking new work;
//  2. WaitAndClose the routine pool (presumably waiting for running jobs);
//  3. only then close inodeChan.In, because handlVerDelBreadthFirst jobs send
//     on it and a send on a closed channel panics;
//  4. close the metadata client.
//
// NOTE(review): stopC is closed unconditionally, so calling Stop twice panics
// with "close of closed channel". In this file it is only called from
// checkScanning, under lcnode.scannerMutex, right before the scanner is
// removed from snapshotScanners. Confirm that no other caller can call it
// a second time.
func (s *SnapshotScanner) Stop() {
	close(s.stopC)
	s.rPoll.WaitAndClose()
	close(s.inodeChan.In)
	s.mw.Close()
	log.LogDebugf("snapshot scanner(%v) stopped", s.ID)
}
119

120
func (s *SnapshotScanner) Start() {
121
	response := s.adminTask.Response.(*proto.SnapshotVerDelTaskResponse)
122
	t := time.Now()
123
	response.StartTime = &t
124

125
	// 1. delete all files
126
	log.LogInfof("snapshot startScan(%v): first round files start!", s.ID)
127
	s.scanType = SnapScanTypeOnlyFile
128
	go s.scan()
129
	firstDentry := &proto.ScanDentry{
130
		Inode: proto.RootIno,
131
		Type:  proto.Mode(os.ModeDir),
132
	}
133
	s.firstIn(firstDentry)
134
	s.checkScanning(false)
135

136
	// 2. delete all dirs
137
	log.LogInfof("snapshot startScan(%v): second round dirs start!", s.ID)
138
	s.scanType = SnapScanTypeOnlyDirAndDepth
139
	s.firstIn(firstDentry)
140
	s.checkScanning(true)
141
}
142

143
func (s *SnapshotScanner) firstIn(d *proto.ScanDentry) {
144
	select {
145
	case <-s.stopC:
146
		log.LogDebugf("snapshot firstIn(%v): stopC!", s.ID)
147
		return
148
	default:
149
		s.inodeChan.In <- d
150
		log.LogDebugf("snapshot startScan(%v): scan type(%v), first dir dentry(%v) in!", s.ID, s.scanType, d)
151
	}
152
}
153

154
func (s *SnapshotScanner) getDirJob(dentry *proto.ScanDentry) (job func()) {
155
	switch s.scanType {
156
	case SnapScanTypeOnlyDirAndDepth:
157
		log.LogDebug("getDirJob: SnapScanTypeOnlyDirAndDepth")
158
		job = func() {
159
			s.handlVerDelDepthFirst(dentry)
160
		}
161
	case SnapScanTypeOnlyFile:
162
		if s.inodeChan.Len() > maxDirChanNum {
163
			log.LogDebug("getDirJob: SnapScanTypeOnlyFile DepthFirst")
164
			job = func() {
165
				s.handlVerDelDepthFirst(dentry)
166
			}
167
		} else {
168
			log.LogDebug("getDirJob: SnapScanTypeOnlyFile BreadthFirst")
169
			job = func() {
170
				s.handlVerDelBreadthFirst(dentry)
171
			}
172
		}
173
	default:
174
		log.LogErrorf("getDirJob: invalid scanType: %v", s.scanType)
175
	}
176
	return
177
}
178

179
func (s *SnapshotScanner) scan() {
180
	log.LogDebugf("SnapshotScanner Enter scan")
181
	defer func() {
182
		log.LogDebugf("SnapshotScanner Exit scan")
183
	}()
184
	for {
185
		select {
186
		case <-s.stopC:
187
			return
188
		case val, ok := <-s.inodeChan.Out:
189
			if !ok {
190
				log.LogWarnf("inodeChan closed")
191
			} else {
192
				dentry := val.(*proto.ScanDentry)
193
				job := s.getDirJob(dentry)
194
				_, err := s.rPoll.Submit(job)
195
				if err != nil {
196
					log.LogErrorf("handlVerDel failed, err(%v)", err)
197
				}
198
			}
199
		}
200
	}
201
}
202

203
// handlVerDelDepthFirst processes dentry and its whole subtree on the calling
// goroutine, recursing into child directories.
//
// If dentry is a directory, its children are read page by page with
// ReadDirLimitForSnapShotClean (defaultReadDirLimit per page, resuming after
// the last name seen). Within each page, file children get the task's version
// deleted via Delete_Ver_ll, then each child directory is recursed into.
//
// In the SnapScanTypeOnlyDirAndDepth round (onlyDir), the reads request
// directories only. After its children are handled, dentry itself is deleted
// via Delete_Ver_ll.
//
// Error handling:
//   - A read error other than ENOENT counts ErrorSkippedNum and returns. The
//     dentry itself is then not deleted.
//   - ENOENT (presumably the directory is gone) ends paging. In onlyDir mode
//     the self-delete still runs.
//   - A file deletion error counts ErrorSkippedNum and returns immediately.
//     The remaining files, the subdirectories and the self-delete are skipped.
//   - A self-delete error is only counted when ParentId >= 1. The root dentry
//     seeded by Start has no ParentId set, so a root failure is ignored.
func (s *SnapshotScanner) handlVerDelDepthFirst(dentry *proto.ScanDentry) {
	var (
		children []proto.Dentry
		ino      *proto.InodeInfo
		err      error
	)
	onlyDir := s.scanType == SnapScanTypeOnlyDirAndDepth

	if os.FileMode(dentry.Type).IsDir() {
		marker := ""
		done := false

		for !done {
			children, err = s.mw.ReadDirLimitForSnapShotClean(dentry.Inode, marker, uint64(defaultReadDirLimit), s.getTaskVerSeq(), onlyDir)
			if err != nil && err != syscall.ENOENT {
				log.LogErrorf("action[handlVerDelDepthFirst] ReadDirLimitForSnapShotClean failed, parent[%v] maker[%v] verSeq[%v] err[%v]",
					dentry.Inode, marker, s.getTaskVerSeq(), err)
				atomic.AddInt64(&s.currentStat.ErrorSkippedNum, 1)
				return
			}
			log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimitForSnapShotClean parent[%v] maker[%v] verSeq[%v] children[%v]",
				dentry.Inode, marker, s.getTaskVerSeq(), len(children))

			if err == syscall.ENOENT {
				log.LogErrorf("action[handlVerDelDepthFirst] ReadDirLimitForSnapShotClean failed, parent[%v] maker[%v] verSeq[%v] err[%v]",
					dentry.Inode, marker, s.getTaskVerSeq(), err)
				break
			}

			// A page read from marker may start with the marker entry itself,
			// which was already handled on the previous page. Drop it; if it is
			// the only entry, the directory has been fully read.
			if marker != "" {
				if len(children) >= 1 && marker == children[0].Name {
					if len(children) <= 1 {
						log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimit_ll done, parent[%v] maker[%v] verSeq[%v] children[%v]",
							dentry.Inode, marker, s.getTaskVerSeq(), children)
						break
					} else {
						skippedChild := children[0]
						children = children[1:]
						log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimit_ll skip last marker[%v], parent[%v] verSeq[%v] skippedName[%v]",
							marker, dentry.Inode, s.getTaskVerSeq(), skippedChild.Name)
					}
				}
			}

			files := make([]*proto.ScanDentry, 0)
			dirs := make([]*proto.ScanDentry, 0)

			// Split the page into files and directories. Note: Path is not set
			// on these child dentries, so the Path passed to Delete_Ver_ll
			// below is empty.
			for _, child := range children {
				childDentry := &proto.ScanDentry{
					ParentId: dentry.Inode,
					Name:     child.Name,
					Inode:    child.Inode,
					Type:     child.Type,
				}
				if os.FileMode(childDentry.Type).IsDir() {
					dirs = append(dirs, childDentry)
				} else {
					files = append(files, childDentry)
				}
			}

			for _, file := range files {
				if ino, err = s.mw.Delete_Ver_ll(file.ParentId, file.Name, false, s.getTaskVerSeq(), file.Path); err != nil {
					log.LogErrorf("action[handlVerDelDepthFirst] Delete_Ver_ll failed, file(parent[%v] child name[%v]) verSeq[%v] err[%v]",
						file.ParentId, file.Name, s.getTaskVerSeq(), err)
					atomic.AddInt64(&s.currentStat.ErrorSkippedNum, 1)
					return
				} else {
					log.LogDebugf("action[handlVerDelDepthFirst] Delete_Ver_ll success, file(parent[%v] child name[%v]) verSeq[%v] ino[%v]",
						file.ParentId, file.Name, s.getTaskVerSeq(), ino)
					atomic.AddInt64(&s.currentStat.FileNum, 1)
					atomic.AddInt64(&s.currentStat.TotalInodeNum, 1)
				}
			}

			// Recursion depth equals the directory depth below dentry.
			for _, dir := range dirs {
				s.handlVerDelDepthFirst(dir)
			}

			// A short page means the directory is exhausted. When a marker was
			// used, one entry (the marker) was stripped above, hence the +1.
			// NOTE(review): if the marker entry was absent from the page, the +1
			// over-counts and may cost one extra read; that read is harmless.
			childrenNr := len(children)
			if (marker == "" && childrenNr < defaultReadDirLimit) || (marker != "" && childrenNr+1 < defaultReadDirLimit) {
				log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimit_ll done, parent[%v]",
					dentry.Inode)
				done = true
			} else {
				marker = children[childrenNr-1].Name
				log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimit_ll next marker[%v] parent[%v]", marker, dentry.Inode)
			}
		}

	}

	// Second round only: delete the directory itself after its children.
	if onlyDir {
		if ino, err = s.mw.Delete_Ver_ll(dentry.ParentId, dentry.Name, os.FileMode(dentry.Type).IsDir(), s.getTaskVerSeq(), dentry.Path); err != nil {
			if dentry.ParentId >= 1 {
				log.LogErrorf("action[handlVerDelDepthFirst] Delete_Ver_ll failed, dir(parent[%v] child name[%v]) verSeq[%v] err[%v]",
					dentry.ParentId, dentry.Name, s.getTaskVerSeq(), err)
				atomic.AddInt64(&s.currentStat.ErrorSkippedNum, 1)
				return
			}
		} else {
			log.LogDebugf("action[handlVerDelDepthFirst] Delete_Ver_ll success, dir(parent[%v] child name[%v]) verSeq[%v] ino[%v]",
				dentry.ParentId, dentry.Name, s.getTaskVerSeq(), ino)
			atomic.AddInt64(&s.currentStat.DirNum, 1)
			atomic.AddInt64(&s.currentStat.TotalInodeNum, 1)
		}
	}
}
311

312
// handlVerDelBreadthFirst processes one directory level. It is used only in
// the SnapScanTypeOnlyFile round (see getDirJob).
//
// Non-directory dentries are ignored. For a directory, all children are read
// page by page (the same marker paging as handlVerDelDepthFirst, with
// onlyDir=false). Child directories are pushed into s.inodeChan for the scan
// goroutine to dispatch later. File children get the task's version deleted
// via Delete_Ver_ll. The directory itself is never deleted here.
//
// A read error other than ENOENT, or any file deletion error, counts
// ErrorSkippedNum and returns immediately, abandoning the rest of the
// directory. Directories already pushed stay queued. ENOENT just ends paging.
func (s *SnapshotScanner) handlVerDelBreadthFirst(dentry *proto.ScanDentry) {
	var (
		children []proto.Dentry
		ino      *proto.InodeInfo
		err      error
	)

	if !os.FileMode(dentry.Type).IsDir() {
		return
	}

	// scanDentries collects the file children of the current page and is
	// reused (truncated) between pages.
	scanDentries := make([]*proto.ScanDentry, 0)
	totalChildDirNum := 0
	totalChildFileNum := 0
	marker := ""
	done := false

	for !done {
		children, err = s.mw.ReadDirLimitForSnapShotClean(dentry.Inode, marker, uint64(defaultReadDirLimit), s.getTaskVerSeq(), false)
		if err != nil && err != syscall.ENOENT {
			log.LogErrorf("action[handlVerDelBreadthFirst] ReadDirLimitForSnapShotClean failed, parent[%v] maker[%v] verSeq[%v] err[%v]",
				dentry.Inode, marker, s.getTaskVerSeq(), err)
			atomic.AddInt64(&s.currentStat.ErrorSkippedNum, 1)
			return
		}
		log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimitForSnapShotClean parent[%v] maker[%v] verSeq[%v] children[%v]",
			dentry.Inode, marker, s.getTaskVerSeq(), len(children))

		if err == syscall.ENOENT {
			log.LogErrorf("action[handlVerDelBreadthFirst] ReadDirLimitForSnapShotClean failed, parent[%v] maker[%v] verSeq[%v] err[%v]",
				dentry.Inode, marker, s.getTaskVerSeq(), err)
			break
		}

		// Drop the marker entry that a continued read may return first. If it
		// is the only entry, the directory has been fully read.
		if marker != "" {
			if len(children) >= 1 && marker == children[0].Name {
				if len(children) <= 1 {
					log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimit_ll done, parent[%v] maker[%v] verSeq[%v] children[%v]",
						dentry.Inode, marker, s.getTaskVerSeq(), children)
					break
				} else {
					skippedChild := children[0]
					children = children[1:]
					log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimit_ll skip last marker[%v], parent[%v] verSeq[%v] skippedName[%v]",
						marker, dentry.Inode, s.getTaskVerSeq(), skippedChild.Name)
				}
			}
		}

		// Queue subdirectories for later dispatch and collect files for
		// deletion below.
		for _, child := range children {
			childDentry := &proto.ScanDentry{
				ParentId: dentry.Inode,
				Name:     child.Name,
				Inode:    child.Inode,
				Type:     child.Type,
			}
			if os.FileMode(childDentry.Type).IsDir() {
				s.inodeChan.In <- childDentry
				totalChildDirNum++
				log.LogDebugf("action[handlVerDelBreadthFirst] push dir(parent[%v] child name[%v] ino[%v]) in channel",
					childDentry.ParentId, childDentry.Name, childDentry.Inode)
			} else {
				scanDentries = append(scanDentries, childDentry)
			}
		}

		// NOTE(review): this passes the parent's dentry.Path, while
		// handlVerDelDepthFirst passes file.Path for the same call. Confirm
		// which path Delete_Ver_ll expects.
		for _, file := range scanDentries {
			if ino, err = s.mw.Delete_Ver_ll(file.ParentId, file.Name, false, s.getTaskVerSeq(), dentry.Path); err != nil {
				log.LogErrorf("action[handlVerDelBreadthFirst] Delete_Ver_ll failed, file(parent[%v] child name[%v]) verSeq[%v] err[%v]",
					file.ParentId, file.Name, s.getTaskVerSeq(), err)
				atomic.AddInt64(&s.currentStat.ErrorSkippedNum, 1)
				return
			} else {
				totalChildFileNum++
				log.LogDebugf("action[handlVerDelBreadthFirst] Delete_Ver_ll success, file(parent[%v] child name[%v]) verSeq[%v] ino[%v]",
					file.ParentId, file.Name, s.getTaskVerSeq(), ino)
				atomic.AddInt64(&s.currentStat.FileNum, 1)
				atomic.AddInt64(&s.currentStat.TotalInodeNum, 1)
			}
		}
		scanDentries = scanDentries[:0]
		// A short page ends paging. The +1 accounts for the marker entry
		// stripped above.
		childrenNr := len(children)
		if (marker == "" && childrenNr < defaultReadDirLimit) || (marker != "" && childrenNr+1 < defaultReadDirLimit) {
			log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimit_ll done, parent[%v] total childrenNr[%v] marker[%v]",
				dentry.Inode, totalChildFileNum+totalChildDirNum, marker)
			done = true
		} else {
			marker = children[childrenNr-1].Name
			log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimit_ll next marker[%v] parent[%v]", marker, dentry.Inode)
		}
	}
}
404

405
// DoneScanning reports whether the current round appears finished: no
// dentries are queued in inodeChan and no job is running in the routine pool.
//
// The running count is sampled before the queue length. A breadth-first job
// pushes child directories into inodeChan and then returns. Reading the queue
// first could observe it empty just before such a push, then observe zero
// running jobs just after that job exits, and declare the round done while
// work is still queued. Sampling running first closes that window, assuming
// RunningNum drops only after a job returns.
//
// NOTE(review): a window remains in which scan has taken a dentry off
// inodeChan.Out but has not yet submitted it to rPoll. Closing it needs an
// explicit pending-work counter.
func (s *SnapshotScanner) DoneScanning() bool {
	running := s.rPoll.RunningNum()
	queued := s.inodeChan.Len()
	log.LogDebugf("inodeChan.Len(%v) rPoll.RunningNum(%v)", queued, running)
	return queued == 0 && running == 0
}
409

410
func (s *SnapshotScanner) checkScanning(report bool) {
411
	dur := time.Second * time.Duration(scanCheckInterval)
412
	taskCheckTimer := time.NewTimer(dur)
413
	for {
414
		select {
415
		case <-s.stopC:
416
			log.LogDebugf("stop checking scan")
417
			return
418
		case <-taskCheckTimer.C:
419
			if s.DoneScanning() {
420
				taskCheckTimer.Stop()
421
				if report {
422
					t := time.Now()
423
					response := s.adminTask.Response.(*proto.SnapshotVerDelTaskResponse)
424
					if s.currentStat.ErrorSkippedNum > 0 {
425
						response.Status = proto.TaskFailed
426
					} else {
427
						response.Status = proto.TaskSucceeds
428
					}
429
					response.EndTime = &t
430
					response.Done = true
431
					response.ID = s.ID
432
					response.LcNode = s.lcnode.localServerAddr
433
					response.SnapshotVerDelTask = s.verDelReq.Task
434
					response.VolName = s.Volume
435
					response.VerSeq = s.getTaskVerSeq()
436
					response.FileNum = s.currentStat.FileNum
437
					response.DirNum = s.currentStat.DirNum
438
					response.TotalInodeNum = s.currentStat.TotalInodeNum
439
					response.ErrorSkippedNum = s.currentStat.ErrorSkippedNum
440
					s.lcnode.scannerMutex.Lock()
441
					s.Stop()
442
					delete(s.lcnode.snapshotScanners, s.ID)
443
					s.lcnode.scannerMutex.Unlock()
444

445
					s.lcnode.respondToMaster(s.adminTask)
446
					log.LogInfof("scan completed for task(%v)", s.adminTask)
447
				} else {
448
					log.LogInfof("first round scan completed for task(%v) without report", s.adminTask)
449
				}
450

451
				return
452
			}
453
			taskCheckTimer.Reset(dur)
454
		}
455
	}
456
}
457

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.