cubefs

Форк
0
/
fuse.go 
921 строка · 26.4 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package main
16

17
//
18
// Usage: ./client -c fuse.json &
19
//
20
// Default mountpoint is specified in fuse.json, which is "/mnt".
21
//
22

23
import (
24
	"flag"
25
	"fmt"
26
	"io"
27
	syslog "log"
28
	"math"
29
	"net"
30
	"net/http"
31
	"net/http/pprof"
32
	"os"
33
	"os/signal"
34
	"path"
35
	"path/filepath"
36
	"runtime"
37
	"runtime/debug"
38
	"strings"
39
	"syscall"
40
	"time"
41

42
	"github.com/cubefs/cubefs/blockcache/bcache"
43
	cfs "github.com/cubefs/cubefs/client/fs"
44
	"github.com/cubefs/cubefs/depends/bazil.org/fuse"
45
	"github.com/cubefs/cubefs/depends/bazil.org/fuse/fs"
46
	"github.com/cubefs/cubefs/proto"
47
	"github.com/cubefs/cubefs/sdk/master"
48
	"github.com/cubefs/cubefs/util"
49
	"github.com/cubefs/cubefs/util/auditlog"
50
	"github.com/cubefs/cubefs/util/buf"
51
	"github.com/cubefs/cubefs/util/config"
52
	"github.com/cubefs/cubefs/util/errors"
53
	"github.com/cubefs/cubefs/util/exporter"
54
	"github.com/cubefs/cubefs/util/log"
55
	"github.com/cubefs/cubefs/util/stat"
56
	sysutil "github.com/cubefs/cubefs/util/sys"
57
	"github.com/cubefs/cubefs/util/ump"
58
	"github.com/jacobsa/daemonize"
59
	_ "go.uber.org/automaxprocs"
60
)
61

62
// Tunables of the FUSE client.
const (
	// MaxReadAhead is the read-ahead size, in bytes, handed to
	// fuse.MaxReadahead when mounting.
	MaxReadAhead = 512 << 10

	// defaultRlimit is the RLIMIT_NOFILE value (soft and hard) requested
	// at startup through changeRlimit.
	defaultRlimit uint64 = 1024000

	// UpdateConfInterval is the period of the background refresh of the
	// volume configuration from the master.
	UpdateConfInterval = 120 * time.Second

	// MasterRetrys is the number of attempts made for master requests
	// during startup.
	MasterRetrys = 5
)
71

72
// Logging names used by the client.
const (
	LoggerDir    = "client"
	LoggerPrefix = "client"     // sub-directory / prefix of the client logs
	LoggerOutput = "output.log" // file receiving the standard logger and stderr
)

// Module identity and exporter configuration.
const (
	ModuleName            = "fuseclient"
	ConfigKeyExporterPort = "exporterKey"
	Role                  = "Client"
)

// HTTP paths of the control commands registered in mount.
const (
	ControlCommandSetRate      = "/rate/set"
	ControlCommandGetRate      = "/rate/get"
	ControlCommandFreeOSMemory = "/debug/freeosmemory"
	ControlCommandSuspend      = "/suspend"
	ControlCommandResume       = "/resume"
)

// Addresses used for the suspend/restore handshake, and the default log
// location.
const (
	// DefaultIP is the host the suspend/resume requests are sent to.
	DefaultIP = "127.0.0.1"
	// DynamicUDSNameFormat is formatted with the process id when the -n
	// flag is given.
	DynamicUDSNameFormat = "/tmp/CubeFS-fdstore-%v.sock"
	DefaultUDSName       = "/tmp/CubeFS-fdstore.sock"

	// DefaultLogPath is used when the configuration has no log directory.
	DefaultLogPath = "/var/log/cubefs"
)
93

94
// Command-line flags of the client. They are registered on the default
// flag set and become valid once main has called flag.Parse.
var (
	// -c: path of the client configuration file.
	configFile = flag.String("c", "", "FUSE client config file")
	// -v: print the version and exit.
	configVersion = flag.Bool("v", false, "show version")
	// -f: stay in the foreground instead of re-executing as a daemon.
	configForeground = flag.Bool("f", false, "run foreground")
	// -n: derive the unix domain socket name from the process id.
	configDynamicUDSName = flag.Bool("n", false, "dynamic unix domain socket filename")
	// -r: restore an existing FUSE connection instead of mounting.
	configRestoreFuse = flag.Bool("r", false, "restore FUSE instead of mounting")
	// -s: socket address used when restoring.
	configRestoreFuseUDS = flag.String("s", "", "restore socket addr")
	// -p: HTTP port of the running client that should be suspended.
	configFuseHttpPort = flag.String("p", "", "fuse http service port")
)
103

104
var GlobalMountOptions []proto.MountOption
105

106
func init() {
107
	GlobalMountOptions = proto.NewMountOptions()
108
	proto.InitMountOptions(GlobalMountOptions)
109
}
110

111
func createUDS(sockAddr string) (listener net.Listener, err error) {
112
	var addr *net.UnixAddr
113

114
	log.LogInfof("sockaddr: %s\n", sockAddr)
115

116
	os.Remove(sockAddr)
117
	if addr, err = net.ResolveUnixAddr("unix", sockAddr); err != nil {
118
		log.LogErrorf("cannot resolve unix addr: %v\n", err)
119
		return
120
	}
121

122
	if listener, err = net.ListenUnix("unix", addr); err != nil {
123
		log.LogErrorf("cannot create unix domain: %v\n", err)
124
		return
125
	}
126

127
	if err = os.Chmod(sockAddr, 0o666); err != nil {
128
		log.LogErrorf("failed to chmod socket file: %v\n", err)
129
		listener.Close()
130
		return
131
	}
132

133
	return
134
}
135

136
// destroyUDS closes listener and then removes the file named by its
// address. Errors from either step are ignored.
func destroyUDS(listener net.Listener) {
	socketPath := listener.Addr().String()
	// Deferred so that the removal still happens after the close.
	defer os.Remove(socketPath)
	listener.Close()
}
141

142
func recvFuseFdFromOldClient(udsListener net.Listener) (file *os.File, err error) {
143
	var conn net.Conn
144
	var socket *os.File
145

146
	if conn, err = udsListener.Accept(); err != nil {
147
		log.LogErrorf("unix domain accepts fail: %v\n", err)
148
		return
149
	}
150
	defer conn.Close()
151

152
	log.LogInfof("a new connection accepted\n")
153
	unixconn := conn.(*net.UnixConn)
154
	if socket, err = unixconn.File(); err != nil {
155
		log.LogErrorf("failed to get socket file: %v\n", err)
156
		return
157
	}
158
	defer socket.Close()
159

160
	if file, err = util.RecvFd(socket); err != nil {
161
		log.LogErrorf("failed to receive fd: %v\n", err)
162
		return
163
	}
164

165
	log.LogInfof("Received file %s fd %v\n", file.Name(), file.Fd())
166
	return
167
}
168

169
func sendSuspendRequest(port string, udsListener net.Listener) (err error) {
170
	var (
171
		req  *http.Request
172
		resp *http.Response
173
		data []byte
174
	)
175
	udsFilePath := udsListener.Addr().String()
176

177
	url := fmt.Sprintf("http://%s:%s/suspend?sock=%s", DefaultIP, port, udsFilePath)
178
	if req, err = http.NewRequest("POST", url, nil); err != nil {
179
		log.LogErrorf("Failed to get new request: %v\n", err)
180
		return err
181
	}
182
	req.Header.Set("Content-Type", "application/text")
183

184
	client := http.DefaultClient
185
	client.Timeout = 120 * time.Second
186
	if resp, err = client.Do(req); err != nil {
187
		log.LogErrorf("Failed to post request: %v\n", err)
188
		return err
189
	}
190
	defer resp.Body.Close()
191

192
	if data, err = io.ReadAll(resp.Body); err != nil {
193
		log.LogErrorf("Failed to read response: %v\n", err)
194
		return err
195
	}
196

197
	if resp.StatusCode == http.StatusOK {
198
		log.LogInfof("\n==> %s\n==> Could restore cfs-client now with -r option.\n\n", string(data))
199
	} else {
200
		log.LogErrorf("\n==> %s\n==> Status: %s\n\n", string(data), resp.Status)
201
		return fmt.Errorf(resp.Status)
202
	}
203

204
	return nil
205
}
206

207
func sendResumeRequest(port string) (err error) {
208
	var (
209
		req  *http.Request
210
		resp *http.Response
211
		data []byte
212
	)
213

214
	url := fmt.Sprintf("http://%s:%s/resume", DefaultIP, port)
215
	if req, err = http.NewRequest("POST", url, nil); err != nil {
216
		log.LogErrorf("Failed to get new request: %v\n", err)
217
		return err
218
	}
219
	req.Header.Set("Content-Type", "application/text")
220

221
	client := http.DefaultClient
222
	if resp, err = client.Do(req); err != nil {
223
		log.LogErrorf("Failed to post request: %v\n", err)
224
		return err
225
	}
226
	defer resp.Body.Close()
227

228
	if data, err = io.ReadAll(resp.Body); err != nil {
229
		log.LogErrorf("Failed to read response: %v\n", err)
230
		return err
231
	}
232

233
	log.LogInfof("data: %s\n", string(data))
234
	return nil
235
}
236

237
func doSuspend(uds string, port string) (*os.File, error) {
238
	var fud *os.File
239

240
	udsListener, err := createUDS(uds)
241
	if err != nil {
242
		log.LogErrorf("doSuspend: failed to create UDS: %v\n", err)
243
		return nil, err
244
	}
245
	defer destroyUDS(udsListener)
246

247
	if err = sendSuspendRequest(port, udsListener); err != nil {
248
		sendResumeRequest(port)
249
		return nil, err
250
	}
251

252
	if fud, err = recvFuseFdFromOldClient(udsListener); err != nil {
253
		sendResumeRequest(port)
254
		return nil, err
255
	}
256

257
	return fud, nil
258
}
259

260
func main() {
261
	flag.Parse()
262

263
	if *configVersion {
264
		fmt.Print(proto.DumpVersion(Role))
265
		os.Exit(0)
266
	}
267

268
	if !*configForeground {
269
		if err := startDaemon(); err != nil {
270
			fmt.Printf("Mount failed: %v\n", err)
271
			os.Exit(1)
272
		}
273
		os.Exit(0)
274
	}
275

276
	/*
277
	 * We are in daemon from here.
278
	 * Must notify the parent process through SignalOutcome anyway.
279
	 */
280

281
	cfg, _ := config.LoadConfigFile(*configFile)
282
	opt, err := parseMountOption(cfg)
283
	if err != nil {
284
		err = errors.NewErrorf("parse mount opt failed: %v\n", err)
285
		fmt.Println(err)
286
		daemonize.SignalOutcome(err)
287
		os.Exit(1)
288
	}
289
	// load  conf from master
290
	for retry := 0; retry < MasterRetrys; retry++ {
291
		err = loadConfFromMaster(opt)
292
		if err != nil {
293
			time.Sleep(5 * time.Second * time.Duration(retry+1))
294
		} else {
295
			break
296
		}
297
	}
298
	if err != nil {
299
		err = errors.NewErrorf("parse mount opt from master failed: %v\n", err)
300
		fmt.Println(err)
301
		daemonize.SignalOutcome(err)
302
		os.Exit(1)
303
	}
304

305
	if opt.MaxCPUs > 0 {
306
		runtime.GOMAXPROCS(int(opt.MaxCPUs))
307
	}
308
	// use uber automaxprocs: get real cpu number to k8s pod"
309

310
	level := parseLogLevel(opt.Loglvl)
311
	_, err = log.InitLog(opt.Logpath, opt.Volname, level, nil, log.DefaultLogLeftSpaceLimit)
312
	if err != nil {
313
		err = errors.NewErrorf("Init log dir fail: %v\n", err)
314
		fmt.Println(err)
315
		daemonize.SignalOutcome(err)
316
		os.Exit(1)
317
	}
318
	defer log.LogFlush()
319

320
	if _, err = os.Stat(opt.MountPoint); err != nil {
321
		if err = os.Mkdir(opt.MountPoint, os.ModePerm); err != nil {
322
			err = errors.NewErrorf("Init.MountPoint mkdir failed error %v\n", err)
323
			fmt.Println(err)
324
			os.Exit(1)
325
		}
326
	}
327

328
	_, err = stat.NewStatistic(opt.Logpath, LoggerPrefix, int64(stat.DefaultStatLogSize),
329
		stat.DefaultTimeOutUs, true)
330
	if err != nil {
331
		err = errors.NewErrorf("Init stat log fail: %v\n", err)
332
		fmt.Println(err)
333
		daemonize.SignalOutcome(err)
334
		os.Exit(1)
335
	}
336
	stat.ClearStat()
337

338
	if opt.EnableAudit {
339
		_, err = auditlog.InitAuditWithPrefix(opt.Logpath, LoggerPrefix, int64(auditlog.DefaultAuditLogSize),
340
			auditlog.NewAuditPrefix(opt.Master, opt.Volname, opt.SubDir, opt.MountPoint))
341
		if err != nil {
342
			err = errors.NewErrorf("Init audit log fail: %v\n", err)
343
			fmt.Println(err)
344
			daemonize.SignalOutcome(err)
345
			os.Exit(1)
346
		}
347
	}
348

349
	proto.InitBufferPool(opt.BuffersTotalLimit)
350
	if proto.IsCold(opt.VolType) {
351
		buf.InitCachePool(opt.EbsBlockSize)
352
	}
353
	if opt.EnableBcache {
354
		buf.InitbCachePool(bcache.MaxBlockSize)
355
	}
356
	outputFilePath := path.Join(opt.Logpath, LoggerPrefix, LoggerOutput)
357
	outputFile, err := os.OpenFile(outputFilePath, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o666)
358
	if err != nil {
359
		err = errors.NewErrorf("Open output file failed: %v\n", err)
360
		fmt.Println(err)
361
		daemonize.SignalOutcome(err)
362
		os.Exit(1)
363
	}
364
	defer func() {
365
		outputFile.Sync()
366
		outputFile.Close()
367
	}()
368
	syslog.SetOutput(outputFile)
369

370
	if *configRestoreFuse {
371
		syslog.Println("NeedAfterAlloc restore fuse")
372
		opt.NeedRestoreFuse = true
373
	}
374

375
	syslog.Println(proto.DumpVersion(Role))
376
	syslog.Println("*** Final Mount Options ***")
377
	for _, o := range GlobalMountOptions {
378
		syslog.Println(o)
379
	}
380
	syslog.Println("*** End ***")
381

382
	changeRlimit(defaultRlimit)
383

384
	if err = sysutil.RedirectFD(int(outputFile.Fd()), int(os.Stderr.Fd())); err != nil {
385
		err = errors.NewErrorf("Redirect fd failed: %v\n", err)
386
		syslog.Println(err)
387
		daemonize.SignalOutcome(err)
388
		os.Exit(1)
389
	}
390

391
	registerInterceptedSignal(opt.MountPoint)
392
	for retry := 0; retry < MasterRetrys; retry++ {
393
		err = checkPermission(opt)
394
		if err != nil {
395
			time.Sleep(5 * time.Second * time.Duration(retry+1))
396
		} else {
397
			break
398
		}
399
	}
400
	if err != nil {
401
		err = errors.NewErrorf("check permission failed: %v", err)
402
		syslog.Println(err)
403
		log.LogFlush()
404
		_ = daemonize.SignalOutcome(err)
405
		os.Exit(1)
406
	}
407

408
	var fud *os.File
409
	if opt.NeedRestoreFuse && *configFuseHttpPort != "" {
410
		log.LogInfof("Suspend/Restore by self\n")
411
		var udsName string
412
		if *configDynamicUDSName {
413
			udsName = fmt.Sprintf(DynamicUDSNameFormat, os.Getpid())
414
		} else {
415
			udsName = DefaultUDSName
416
		}
417

418
		// Tell old cfs-client to suspend first. This should be done
419
		// before mount() to avoid pprof port conflict between old and
420
		// new cfs-clients.
421
		if fud, err = doSuspend(udsName, *configFuseHttpPort); err != nil {
422
			log.LogErrorf("Failed to tell old cfs-client to suspend: %v\n", err)
423
			syslog.Printf("Error: Failed to tell old cfs-client to suspend: %v\n", err)
424
			log.LogFlush()
425
			_ = daemonize.SignalOutcome(err)
426
			os.Exit(1)
427
		}
428
	}
429

430
	fsConn, super, err := mount(opt)
431
	if err != nil {
432
		err = errors.NewErrorf("mount failed: %v", err)
433
		syslog.Println(err)
434
		log.LogFlush()
435
		_ = daemonize.SignalOutcome(err)
436
		os.Exit(1)
437
	} else {
438
		_ = daemonize.SignalOutcome(nil)
439
	}
440
	defer fsConn.Close()
441
	defer super.Close()
442

443
	syslog.Printf("enable bcache %v", opt.EnableBcache)
444

445
	if cfg.GetString(exporter.ConfigKeyPushAddr) == "" {
446
		pushAddr, err := getPushAddrFromMaster(opt.Master)
447
		if err == nil && pushAddr != "" {
448
			syslog.Printf("use remote push addr %v", pushAddr)
449
			cfg.SetString(exporter.ConfigKeyPushAddr, pushAddr)
450
		}
451
	}
452

453
	exporter.Init(ModuleName, cfg)
454
	exporter.RegistConsul(super.ClusterName(), ModuleName, cfg)
455

456
	err = log.OutputPid(opt.Logpath, ModuleName)
457
	if err != nil {
458
		log.LogFlush()
459
		syslog.Printf("output pid err(%v)", err)
460
		os.Exit(1)
461
	}
462

463
	if opt.NeedRestoreFuse {
464
		if fud == nil {
465
			if *configRestoreFuseUDS == "" {
466
				super.SetSockAddr(DefaultUDSName)
467
			} else {
468
				super.SetSockAddr(*configRestoreFuseUDS)
469
			}
470
		} else {
471
			fsConn.SetFuseDevFile(fud)
472
		}
473
	}
474

475
	if err = fs.Serve(fsConn, super, opt); err != nil {
476
		log.LogFlush()
477
		syslog.Printf("fs Serve returns err(%v)", err)
478
		os.Exit(1)
479
	}
480

481
	<-fsConn.Ready
482
	if fsConn.MountError != nil {
483
		log.LogFlush()
484
		syslog.Printf("fs Serve returns err(%v)\n", err)
485
		os.Exit(1)
486
	}
487
}
488

489
func getPushAddrFromMaster(masterAddr string) (addr string, err error) {
490
	mc := master.NewMasterClientFromString(masterAddr, false)
491
	addr, err = mc.AdminAPI().GetMonitorPushAddr()
492
	return
493
}
494

495
func startDaemon() error {
496
	cmdPath, err := os.Executable()
497
	if err != nil {
498
		return fmt.Errorf("startDaemon failed: cannot get absolute command path, err(%v)", err)
499
	}
500

501
	if len(os.Args) <= 1 {
502
		return fmt.Errorf("startDaemon failed: cannot use null arguments")
503
	}
504

505
	args := []string{"-f"}
506
	args = append(args, os.Args[1:]...)
507

508
	if *configFile != "" {
509
		configPath, err := filepath.Abs(*configFile)
510
		if err != nil {
511
			return fmt.Errorf("startDaemon failed: cannot get absolute command path of config file(%v) , err(%v)", *configFile, err)
512
		}
513
		for i := 0; i < len(args); i++ {
514
			if args[i] == "-c" {
515
				// Since *configFile is not "", the (i+1)th argument must be the config file path
516
				args[i+1] = configPath
517
				break
518
			}
519
		}
520
	}
521

522
	env := os.Environ()
523

524
	// add GODEBUG=madvdontneed=1 environ, to make sysUnused uses madvise(MADV_DONTNEED) to signal the kernel that a
525
	// range of allocated memory contains unneeded data.
526
	env = append(env, "GODEBUG=madvdontneed=1")
527
	err = daemonize.Run(cmdPath, args, env, os.Stdout)
528
	if err != nil {
529
		return fmt.Errorf("startDaemon failed: daemon start failed, cmd(%v) args(%v) env(%v) err(%v)\n", cmdPath, args, env, err)
530
	}
531

532
	return nil
533
}
534

535
func waitListenAndServe(statusCh chan error, addr string, handler http.Handler) {
536
	var err error
537
	var loop int = 0
538
	var interval int = (1 << 17) - 1
539
	var listener net.Listener
540
	var dynamicPort bool
541

542
	if addr == ":" {
543
		addr = ":0"
544
	}
545

546
	// FIXME: 1 min timeout?
547
	timeout := time.Now().Add(time.Minute)
548
	for {
549
		if listener, err = net.Listen("tcp", addr); err == nil {
550
			break
551
		}
552

553
		// addr is not released for use
554
		if strings.Contains(err.Error(), "bind: address already in use") {
555
			if loop&interval == 0 {
556
				syslog.Printf("address %v is still in use\n", addr)
557
			}
558
			runtime.Gosched()
559
		} else {
560
			break
561
		}
562
		if time.Now().After(timeout) {
563
			msg := fmt.Sprintf("address %v is still in use after "+
564
				"timeout, choose port automatically\n", addr)
565
			syslog.Print(msg)
566
			msg = "Warning: " + msg
567
			daemonize.StatusWriter.Write([]byte(msg))
568
			dynamicPort = true
569
			break
570
		}
571
		loop++
572
	}
573
	syslog.Printf("address %v wait loop %v\n", addr, loop)
574

575
	if dynamicPort {
576
		ipport := strings.Split(addr, ":")
577
		addr = ipport[0] + ":0"
578
		listener, err = net.Listen("tcp", addr)
579
	}
580

581
	if err != nil {
582
		statusCh <- err
583
		return
584
	}
585

586
	statusCh <- nil
587
	msg := fmt.Sprintf("Start pprof with port: %v\n",
588
		listener.Addr().(*net.TCPAddr).Port)
589
	syslog.Print(msg)
590
	if dynamicPort {
591
		msg = "Warning: " + msg
592
		daemonize.StatusWriter.Write([]byte(msg))
593
	}
594
	http.Serve(listener, handler)
595
	// unreachable
596
}
597

598
func mount(opt *proto.MountOptions) (fsConn *fuse.Conn, super *cfs.Super, err error) {
599
	super, err = cfs.NewSuper(opt)
600
	if err != nil {
601
		log.LogError(errors.Stack(err))
602
		return
603
	}
604

605
	http.HandleFunc(ControlCommandSetRate, super.SetRate)
606
	http.HandleFunc(ControlCommandGetRate, super.GetRate)
607
	http.HandleFunc(log.SetLogLevelPath, log.SetLogLevel)
608
	http.HandleFunc(ControlCommandFreeOSMemory, freeOSMemory)
609
	http.HandleFunc(log.GetLogPath, log.GetLog)
610
	http.HandleFunc(ControlCommandSuspend, super.SetSuspend)
611
	http.HandleFunc(ControlCommandResume, super.SetResume)
612
	// auditlog
613
	http.HandleFunc(auditlog.EnableAuditLogReqPath, super.EnableAuditLog)
614
	http.HandleFunc(auditlog.DisableAuditLogReqPath, auditlog.DisableAuditLog)
615
	http.HandleFunc(auditlog.SetAuditLogBufSizeReqPath, auditlog.ResetWriterBuffSize)
616

617
	statusCh := make(chan error)
618
	pprofAddr := ":" + opt.Profport
619
	if opt.LocallyProf {
620
		pprofAddr = "127.0.0.1:" + opt.Profport
621
	}
622
	mainMux := http.NewServeMux()
623
	mux := http.NewServeMux()
624
	mux.Handle("/debug/pprof", http.HandlerFunc(pprof.Index))
625
	mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
626
	mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
627
	mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
628
	mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
629
	mux.Handle("/debug/", http.HandlerFunc(pprof.Index))
630
	mainHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
631
		if strings.HasPrefix(req.URL.Path, "/debug/") {
632
			mux.ServeHTTP(w, req)
633
		} else {
634
			http.DefaultServeMux.ServeHTTP(w, req)
635
		}
636
	})
637
	mainMux.Handle("/", mainHandler)
638

639
	go waitListenAndServe(statusCh, pprofAddr, mainMux)
640
	if err = <-statusCh; err != nil {
641
		daemonize.SignalOutcome(err)
642
		return
643
	}
644

645
	go func() {
646
		mc := master.NewMasterClientFromString(opt.Master, false)
647
		t := time.NewTicker(UpdateConfInterval)
648
		defer t.Stop()
649
		for range t.C {
650
			log.LogDebugf("UpdateVolConf: load conf from master")
651
			var volumeInfo *proto.SimpleVolView
652
			volumeInfo, err = mc.AdminAPI().GetVolumeSimpleInfo(opt.Volname)
653
			if err != nil {
654
				log.LogErrorf("UpdateVolConf: get vol info from master failed, err %s", err.Error())
655
				if err == proto.ErrVolNotExists {
656
					log.LogFlush()
657
					daemonize.SignalOutcome(err)
658
					os.Exit(1)
659
				}
660
				continue
661
			}
662
			if volumeInfo.Status == proto.VolStatusMarkDelete {
663
				err = fmt.Errorf("vol [%s] has been deleted, stop client", volumeInfo.Name)
664
				log.LogError(err)
665
				log.LogFlush()
666
				daemonize.SignalOutcome(err)
667
				os.Exit(1)
668
			}
669
			super.SetTransaction(volumeInfo.EnableTransaction, volumeInfo.TxTimeout, volumeInfo.TxConflictRetryNum, volumeInfo.TxConflictRetryInterval)
670
			if proto.IsCold(opt.VolType) {
671
				super.CacheAction = volumeInfo.CacheAction
672
				super.CacheThreshold = volumeInfo.CacheThreshold
673
				super.EbsBlockSize = volumeInfo.ObjBlockSize
674
			}
675
		}
676
	}()
677

678
	if err = ump.InitUmp(fmt.Sprintf("%v_%v", super.ClusterName(), ModuleName), opt.UmpDatadir); err != nil {
679
		return
680
	}
681

682
	options := []fuse.MountOption{
683
		fuse.AllowOther(),
684
		fuse.MaxReadahead(MaxReadAhead),
685
		fuse.AsyncRead(),
686
		fuse.AutoInvalData(opt.AutoInvalData),
687
		fuse.FSName(opt.FileSystemName),
688
		fuse.Subtype("cubefs"),
689
		fuse.LocalVolume(),
690
		fuse.VolumeName(opt.FileSystemName),
691
		fuse.RequestTimeout(opt.RequestTimeout),
692
	}
693

694
	if opt.Rdonly {
695
		options = append(options, fuse.ReadOnly())
696
	}
697

698
	if opt.WriteCache {
699
		options = append(options, fuse.WritebackCache())
700
	}
701

702
	if opt.EnablePosixACL {
703
		options = append(options, fuse.PosixACL())
704
		options = append(options, fuse.DefaultPermissions())
705
	}
706

707
	if opt.EnableUnixPermission {
708
		options = append(options, fuse.DefaultPermissions())
709
	}
710

711
	fsConn, err = fuse.Mount(opt.MountPoint, opt.NeedRestoreFuse, options...)
712
	return
713
}
714

715
func registerInterceptedSignal(mnt string) {
716
	sigC := make(chan os.Signal, 1)
717
	signal.Notify(sigC, syscall.SIGINT, syscall.SIGTERM)
718
	go func() {
719
		sig := <-sigC
720
		syslog.Printf("Killed due to a received signal (%v)\n", sig)
721
		auditlog.StopAudit()
722
		os.Exit(1)
723
	}()
724
}
725

726
// parseMountOption fills GlobalMountOptions from cfg and copies the
// values into a new proto.MountOptions.
//
// It returns an error when the mount point cannot be made absolute, when
// one of the mandatory fields (mount point, volume name, owner, master
// address) is empty, or when BuffersTotalLimit is negative. An empty log
// directory falls back to DefaultLogPath and an empty file system name to
// "cubefs-" followed by the volume name.
func parseMountOption(cfg *config.Config) (*proto.MountOptions, error) {
	proto.ParseMountOptions(GlobalMountOptions, cfg)

	// Short alias for the (already parsed) option table.
	mo := GlobalMountOptions
	opt := new(proto.MountOptions)

	rawmnt := mo[proto.MountPoint].GetString()
	absMnt, err := filepath.Abs(rawmnt)
	opt.MountPoint = absMnt
	if err != nil {
		return nil, errors.Trace(err, "invalide mount point (%v) ", rawmnt)
	}

	// Identity of the volume and location of the logs.
	opt.Volname = mo[proto.VolName].GetString()
	opt.Owner = mo[proto.Owner].GetString()
	opt.Master = mo[proto.Master].GetString()
	logPath := mo[proto.LogDir].GetString()
	if len(logPath) == 0 {
		logPath = DefaultLogPath
	}
	opt.Logpath = path.Join(logPath, LoggerPrefix)
	opt.Loglvl = mo[proto.LogLevel].GetString()
	opt.Profport = mo[proto.ProfPort].GetString()
	opt.LocallyProf = mo[proto.LocallyProf].GetBool()

	// Cache validity, rate limits and I/O behaviour.
	opt.IcacheTimeout = mo[proto.IcacheTimeout].GetInt64()
	opt.LookupValid = mo[proto.LookupValid].GetInt64()
	opt.AttrValid = mo[proto.AttrValid].GetInt64()
	opt.ReadRate = mo[proto.ReadRate].GetInt64()
	opt.WriteRate = mo[proto.WriteRate].GetInt64()
	opt.EnSyncWrite = mo[proto.EnSyncWrite].GetInt64()
	opt.AutoInvalData = mo[proto.AutoInvalData].GetInt64()
	opt.UmpDatadir = mo[proto.WarnLogDir].GetString()
	opt.Rdonly = mo[proto.Rdonly].GetBool()
	opt.WriteCache = mo[proto.WriteCache].GetBool()
	opt.KeepCache = mo[proto.KeepCache].GetBool()
	opt.FollowerRead = mo[proto.FollowerRead].GetBool()

	// Ticket settings are only read when authentication is enabled.
	opt.Authenticate = mo[proto.Authenticate].GetBool()
	if opt.Authenticate {
		opt.TicketMess.ClientKey = mo[proto.ClientKey].GetString()
		opt.TicketMess.TicketHosts = strings.Split(mo[proto.TicketHost].GetString(), ",")
		opt.TicketMess.EnableHTTPS = mo[proto.EnableHTTPS].GetBool()
		if opt.TicketMess.EnableHTTPS {
			opt.TicketMess.CertFile = mo[proto.CertFile].GetString()
		}
	}

	opt.AccessKey = mo[proto.AccessKey].GetString()
	opt.SecretKey = mo[proto.SecretKey].GetString()
	opt.DisableDcache = mo[proto.DisableDcache].GetBool()
	opt.SubDir = mo[proto.SubDir].GetString()
	opt.FsyncOnClose = mo[proto.FsyncOnClose].GetBool()
	opt.MaxCPUs = mo[proto.MaxCPUs].GetInt64()
	opt.EnableXattr = mo[proto.EnableXattr].GetBool()
	opt.NearRead = mo[proto.NearRead].GetBool()
	opt.EnablePosixACL = mo[proto.EnablePosixACL].GetBool()
	opt.EnableSummary = mo[proto.EnableSummary].GetBool()
	opt.EnableUnixPermission = mo[proto.EnableUnixPermission].GetBool()
	opt.ReadThreads = mo[proto.ReadThreads].GetInt64()
	opt.WriteThreads = mo[proto.WriteThreads].GetInt64()

	// Block cache settings.
	opt.BcacheDir = mo[proto.BcacheDir].GetString()
	// opt.EnableBcache = GlobalMountOptions[proto.EnableBcache].GetBool()
	opt.BcacheFilterFiles = mo[proto.BcacheFilterFiles].GetString()
	opt.BcacheBatchCnt = mo[proto.BcacheBatchCnt].GetInt64()
	opt.BcacheCheckIntervalS = mo[proto.BcacheCheckIntervalS].GetInt64()
	if _, statErr := os.Stat(bcache.UnixSocketPath); statErr == nil && opt.BcacheDir != "" {
		opt.EnableBcache = true
	}

	// NOTE(review): this assignment always overwrites the value derived
	// from the socket probe just above, so only the configured option
	// takes effect — confirm which of the two is intended.
	opt.EnableBcache = mo[proto.EnableBcache].GetBool()

	// The snapshot read sequence is only considered for read-only mounts;
	// -1 is mapped to the maximum sequence number.
	if opt.Rdonly {
		verReadSeq := mo[proto.SnapshotReadVerSeq].GetInt64()
		switch verReadSeq {
		case -1:
			opt.VerReadSeq = math.MaxUint64
		default:
			opt.VerReadSeq = uint64(verReadSeq)
		}
		log.LogDebugf("oonfig.verReadSeq %v opt.VerReadSeq %v", verReadSeq, opt.VerReadSeq)
	}
	opt.MetaSendTimeout = mo[proto.MetaSendTimeout].GetInt64()

	opt.BuffersTotalLimit = mo[proto.BuffersTotalLimit].GetInt64()
	// NOTE(review): MetaSendTimeout is read a second time here, as in the
	// original code; the repeat looks redundant.
	opt.MetaSendTimeout = mo[proto.MetaSendTimeout].GetInt64()
	opt.MaxStreamerLimit = mo[proto.MaxStreamerLimit].GetInt64()
	opt.EnableAudit = mo[proto.EnableAudit].GetBool()
	opt.RequestTimeout = mo[proto.RequestTimeout].GetInt64()
	opt.MinWriteAbleDataPartitionCnt = int(mo[proto.MinWriteAbleDataPartitionCnt].GetInt64())
	opt.FileSystemName = mo[proto.FileSystemName].GetString()

	// Validation.
	if opt.MountPoint == "" || opt.Volname == "" || opt.Owner == "" || opt.Master == "" {
		return nil, errors.New(fmt.Sprintf("invalid config file: lack of mandatory fields, mountPoint(%v), volName(%v), owner(%v), masterAddr(%v)", opt.MountPoint, opt.Volname, opt.Owner, opt.Master))
	}

	if opt.BuffersTotalLimit < 0 {
		return nil, errors.New(fmt.Sprintf("invalid fields, BuffersTotalLimit(%v) must larger or equal than 0", opt.BuffersTotalLimit))
	}

	if opt.FileSystemName == "" {
		opt.FileSystemName = "cubefs-" + opt.Volname
	}

	return opt, nil
}
828

829
// checkPermission verifies with the master that this client may mount
// the volume.
//
// First the local IP is checked against the volume ACL; a failed request
// and a negative answer both yield proto.ErrNoAclPermission. Then, only
// when an access key is configured, the user policy is evaluated:
//   - the secret key must match, otherwise proto.ErrNoPermission;
//   - the owner of the volume is always accepted;
//   - read and write permission on the sub-directory is accepted;
//   - read permission alone is accepted and sets opt.Rdonly;
//   - anything else yields proto.ErrNoPermission.
func checkPermission(opt *proto.MountOptions) (err error) {
	mc := master.NewMasterClientFromString(opt.Master, false)
	localIP, _ := ump.GetLocalIpAddr()

	info, aclErr := mc.UserAPI().AclOperation(opt.Volname, localIP, util.AclCheckIP)
	if aclErr != nil {
		syslog.Printf("acl check of ip(%v) on vol(%v) failed: %v\n", localIP, opt.Volname, aclErr)
		return proto.ErrNoAclPermission
	}
	if !info.OK {
		// There is no error value on this path; log what was refused
		// instead of printing a nil error.
		syslog.Printf("acl check of ip(%v) on vol(%v) was refused\n", localIP, opt.Volname)
		return proto.ErrNoAclPermission
	}

	// The user access policy is only checked when an access key is set.
	if opt.AccessKey == "" {
		return nil
	}

	userInfo, err := mc.UserAPI().GetAKInfo(opt.AccessKey)
	if err != nil {
		return err
	}
	if userInfo.SecretKey != opt.SecretKey {
		return proto.ErrNoPermission
	}

	policy := userInfo.Policy
	if policy.IsOwn(opt.Volname) {
		return nil
	}

	// Assumes IsAuthorized has no side effects, so both answers can be
	// computed up front.
	canWrite := policy.IsAuthorized(opt.Volname, opt.SubDir, proto.POSIXWriteAction)
	canRead := policy.IsAuthorized(opt.Volname, opt.SubDir, proto.POSIXReadAction)
	switch {
	case !canRead:
		return proto.ErrNoPermission
	case !canWrite:
		// Read access only: downgrade the mount to read-only.
		opt.Rdonly = true
	}
	return nil
}
864

865
func parseLogLevel(loglvl string) log.Level {
866
	var level log.Level
867
	switch strings.ToLower(loglvl) {
868
	case "debug":
869
		level = log.DebugLevel
870
	case "info":
871
		level = log.InfoLevel
872
	case "warn":
873
		level = log.WarnLevel
874
	case "error":
875
		level = log.ErrorLevel
876
	default:
877
		level = log.ErrorLevel
878
	}
879
	return level
880
}
881

882
// changeRlimit sets both the soft and the hard RLIMIT_NOFILE limit of the
// process to val. The outcome is only logged: a failure, including the
// reason reported by the system, does not stop the client.
func changeRlimit(val uint64) {
	rlimit := &syscall.Rlimit{Max: val, Cur: val}
	if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, rlimit); err != nil {
		// Include the cause; without it the failure cannot be diagnosed.
		syslog.Printf("Failed to set rlimit to %v: %v \n", val, err)
		return
	}
	syslog.Printf("Successfully set rlimit to %v \n", val)
}
891

892
// freeOSMemory is the HTTP handler registered under
// ControlCommandFreeOSMemory. It calls debug.FreeOSMemory and writes
// nothing to the response; neither argument is used.
func freeOSMemory(w http.ResponseWriter, r *http.Request) {
	debug.FreeOSMemory()
}
895

896
// loadConfFromMaster completes opt with settings kept by the master: the
// volume-level values from the volume's simple info, then the EBS
// endpoint and service path from the cluster info.
//
// The first failing request is returned. When the cluster request fails,
// the volume-level fields have already been written to opt.
func loadConfFromMaster(opt *proto.MountOptions) (err error) {
	var (
		vol     *proto.SimpleVolView
		cluster *proto.ClusterInfo
	)
	mc := master.NewMasterClientFromString(opt.Master, false)

	if vol, err = mc.AdminAPI().GetVolumeSimpleInfo(opt.Volname); err != nil {
		return err
	}
	opt.VolType = vol.VolType
	opt.EbsBlockSize = vol.ObjBlockSize
	opt.CacheAction = vol.CacheAction
	opt.CacheThreshold = vol.CacheThreshold
	opt.EnableQuota = vol.EnableQuota
	opt.EnableTransaction = vol.EnableTransaction
	opt.TxTimeout = vol.TxTimeout
	opt.TxConflictRetryNum = vol.TxConflictRetryNum
	opt.TxConflictRetryInterval = vol.TxConflictRetryInterval

	if cluster, err = mc.AdminAPI().GetClusterInfo(); err != nil {
		return err
	}
	opt.EbsEndpoint = cluster.EbsAddr
	opt.EbsServicePath = cluster.ServicePath
	return nil
}
922

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.