cubefs

Форк
0
/
libsdk.go 
1519 строк · 35.2 Кб
1
// Copyright 2020 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package main
16

17
/*
18

19
#define _GNU_SOURCE
20
#include <string.h>
21
#include <stdint.h>
22
#include <sys/types.h>
23
#include <sys/stat.h>
24
#include <dirent.h>
25
#include <fcntl.h>
26

27
struct cfs_stat_info {
28
    uint64_t ino;
29
    uint64_t size;
30
    uint64_t blocks;
31
    uint64_t atime;
32
    uint64_t mtime;
33
    uint64_t ctime;
34
    uint32_t atime_nsec;
35
    uint32_t mtime_nsec;
36
    uint32_t ctime_nsec;
37
    mode_t   mode;
38
    uint32_t nlink;
39
    uint32_t blk_size;
40
    uint32_t uid;
41
    uint32_t gid;
42
};
43

44
struct cfs_summary_info {
45
    int64_t files;
46
    int64_t subdirs;
47
    int64_t fbytes;
48
};
49

50
struct cfs_dirent {
51
    uint64_t ino;
52
    char     name[256];
53
    char     d_type;
54
    uint32_t     nameLen;
55
};
56

57
struct cfs_hdfs_stat_info {
58
    uint64_t size;
59
    uint64_t atime;
60
    uint64_t mtime;
61
    uint32_t atime_nsec;
62
    uint32_t mtime_nsec;
63
    mode_t   mode;
64
};
65

66
struct cfs_dirent_info {
67
    struct   cfs_hdfs_stat_info stat;
68
    char     d_type;
69
    char     name[256];
70
    uint32_t     nameLen;
71
};
72

73
*/
74
import "C"
75

76
import (
77
	"context"
78
	"fmt"
79
	"io"
80
	syslog "log"
81
	"os"
82
	gopath "path"
83
	"reflect"
84
	"regexp"
85
	"strconv"
86
	"strings"
87
	"sync"
88
	"sync/atomic"
89
	"syscall"
90
	"time"
91
	"unsafe"
92

93
	"github.com/bits-and-blooms/bitset"
94
	"github.com/cubefs/cubefs/blobstore/api/access"
95
	"github.com/cubefs/cubefs/blobstore/common/trace"
96
	"github.com/cubefs/cubefs/blockcache/bcache"
97
	"github.com/cubefs/cubefs/client/fs"
98
	"github.com/cubefs/cubefs/proto"
99
	"github.com/cubefs/cubefs/sdk/data/blobstore"
100
	"github.com/cubefs/cubefs/sdk/data/stream"
101
	masterSDK "github.com/cubefs/cubefs/sdk/master"
102
	"github.com/cubefs/cubefs/sdk/meta"
103
	"github.com/cubefs/cubefs/util/auditlog"
104
	"github.com/cubefs/cubefs/util/buf"
105
	"github.com/cubefs/cubefs/util/errors"
106
	"github.com/cubefs/cubefs/util/log"
107
	"github.com/cubefs/cubefs/util/stat"
108
)
109

110
const (
111
	defaultBlkSize = uint32(1) << 12
112

113
	maxFdNum uint = 10240000
114

115
	MaxSizePutOnce = int64(1) << 23
116
)
117

118
var gClientManager *clientManager
119

120
var (
121
	statusOK = C.int(0)
122
	// error status must be minus value
123
	statusEIO     = errorToStatus(syscall.EIO)
124
	statusEINVAL  = errorToStatus(syscall.EINVAL)
125
	statusEEXIST  = errorToStatus(syscall.EEXIST)
126
	statusEBADFD  = errorToStatus(syscall.EBADFD)
127
	statusEACCES  = errorToStatus(syscall.EACCES)
128
	statusEMFILE  = errorToStatus(syscall.EMFILE)
129
	statusENOTDIR = errorToStatus(syscall.ENOTDIR)
130
	statusEISDIR  = errorToStatus(syscall.EISDIR)
131
	statusENOSPC  = errorToStatus(syscall.ENOSPC)
132
)
133
var once sync.Once
134

135
func init() {
136
	gClientManager = &clientManager{
137
		clients: make(map[int64]*client),
138
	}
139
}
140

141
func errorToStatus(err error) C.int {
142
	if err == nil {
143
		return 0
144
	}
145
	if errno, is := err.(syscall.Errno); is {
146
		return -C.int(errno)
147
	}
148
	return -C.int(syscall.EIO)
149
}
150

151
type clientManager struct {
152
	nextClientID int64
153
	clients      map[int64]*client
154
	mu           sync.RWMutex
155
}
156

157
type pushConfig struct {
158
	PushAddr string `json:"pushAddr"`
159
}
160

161
func newClient() *client {
162
	id := atomic.AddInt64(&gClientManager.nextClientID, 1)
163
	c := &client{
164
		id:                  id,
165
		fdmap:               make(map[uint]*file),
166
		fdset:               bitset.New(maxFdNum),
167
		dirChildrenNumLimit: proto.DefaultDirChildrenNumLimit,
168
		cwd:                 "/",
169
		sc:                  fs.NewSummaryCache(fs.DefaultSummaryExpiration, fs.MaxSummaryCache),
170
		ic:                  fs.NewInodeCache(fs.DefaultInodeExpiration, fs.MaxInodeCache),
171
		dc:                  fs.NewDentryCache(),
172
	}
173

174
	gClientManager.mu.Lock()
175
	gClientManager.clients[id] = c
176
	gClientManager.mu.Unlock()
177

178
	return c
179
}
180

181
func getClient(id int64) (c *client, exist bool) {
182
	gClientManager.mu.RLock()
183
	defer gClientManager.mu.RUnlock()
184
	c, exist = gClientManager.clients[id]
185
	return
186
}
187

188
func removeClient(id int64) {
189
	gClientManager.mu.Lock()
190
	defer gClientManager.mu.Unlock()
191
	delete(gClientManager.clients, id)
192
}
193

194
type file struct {
195
	fd    uint
196
	ino   uint64
197
	pino  uint64
198
	flags uint32
199
	mode  uint32
200

201
	// dir only
202
	dirp *dirStream
203

204
	// rw
205
	fileWriter *blobstore.Writer
206
	fileReader *blobstore.Reader
207

208
	path string
209
}
210

211
type dirStream struct {
212
	pos     int
213
	dirents []proto.Dentry
214
}
215

216
type client struct {
217
	// client id allocated by libsdk
218
	id int64
219

220
	// mount config
221
	volName             string
222
	masterAddr          string
223
	followerRead        bool
224
	logDir              string
225
	logLevel            string
226
	ebsEndpoint         string
227
	servicePath         string
228
	volType             int
229
	cacheAction         int
230
	ebsBlockSize        int
231
	enableBcache        bool
232
	readBlockThread     int
233
	writeBlockThread    int
234
	cacheRuleKey        string
235
	cacheThreshold      int
236
	enableSummary       bool
237
	secretKey           string
238
	accessKey           string
239
	subDir              string
240
	pushAddr            string
241
	cluster             string
242
	dirChildrenNumLimit uint32
243
	enableAudit         bool
244

245
	// runtime context
246
	cwd    string // current working directory
247
	fdmap  map[uint]*file
248
	fdset  *bitset.BitSet
249
	fdlock sync.RWMutex
250

251
	// server info
252
	mw   *meta.MetaWrapper
253
	ec   *stream.ExtentClient
254
	ic   *fs.InodeCache
255
	dc   *fs.DentryCache
256
	bc   *bcache.BcacheClient
257
	ebsc *blobstore.BlobStoreClient
258
	sc   *fs.SummaryCache
259
}
260

261
//export cfs_new_client
262
func cfs_new_client() C.int64_t {
263
	c := newClient()
264
	// Just skip fd 0, 1, 2, to avoid confusion.
265
	c.fdset.Set(0).Set(1).Set(2)
266
	return C.int64_t(c.id)
267
}
268

269
//export cfs_set_client
270
func cfs_set_client(id C.int64_t, key, val *C.char) C.int {
271
	c, exist := getClient(int64(id))
272
	if !exist {
273
		return statusEINVAL
274
	}
275
	k := C.GoString(key)
276
	v := C.GoString(val)
277

278
	switch k {
279
	case "volName":
280
		c.volName = v
281
	case "masterAddr":
282
		c.masterAddr = v
283
	case "followerRead":
284
		if v == "true" {
285
			c.followerRead = true
286
		} else {
287
			c.followerRead = false
288
		}
289
	case "logDir":
290
		c.logDir = v
291
	case "logLevel":
292
		c.logLevel = v
293
	case "enableBcache":
294
		if v == "true" {
295
			c.enableBcache = true
296
		} else {
297
			c.enableBcache = false
298
		}
299
	case "readBlockThread":
300
		rt, err := strconv.Atoi(v)
301
		if err == nil {
302
			c.readBlockThread = rt
303
		}
304
	case "writeBlockThread":
305
		wt, err := strconv.Atoi(v)
306
		if err == nil {
307
			c.writeBlockThread = wt
308
		}
309
	case "enableSummary":
310
		if v == "true" {
311
			c.enableSummary = true
312
		} else {
313
			c.enableSummary = false
314
		}
315
	case "accessKey":
316
		c.accessKey = v
317
	case "secretKey":
318
		c.secretKey = v
319
	case "pushAddr":
320
		c.pushAddr = v
321
	case "enableAudit":
322
		if v == "true" {
323
			c.enableAudit = true
324
		} else {
325
			c.enableAudit = false
326
		}
327
	default:
328
		return statusEINVAL
329
	}
330
	return statusOK
331
}
332

333
//export cfs_start_client
334
func cfs_start_client(id C.int64_t) C.int {
335
	c, exist := getClient(int64(id))
336
	if !exist {
337
		return statusEINVAL
338
	}
339

340
	err := c.start()
341
	if err != nil {
342
		syslog.Println(err)
343
		return statusEIO
344
	}
345
	return statusOK
346
}
347

348
//export cfs_close_client
349
func cfs_close_client(id C.int64_t) {
350
	if c, exist := getClient(int64(id)); exist {
351
		if c.ec != nil {
352
			_ = c.ec.Close()
353
		}
354
		if c.mw != nil {
355
			_ = c.mw.Close()
356
		}
357
		removeClient(int64(id))
358
	}
359
	auditlog.StopAudit()
360
	log.LogFlush()
361
}
362

363
//export cfs_chdir
364
func cfs_chdir(id C.int64_t, path *C.char) C.int {
365
	c, exist := getClient(int64(id))
366
	if !exist {
367
		return statusEINVAL
368
	}
369
	cwd := c.absPath(C.GoString(path))
370
	dirInfo, err := c.lookupPath(cwd)
371
	if err != nil {
372
		return errorToStatus(err)
373
	}
374
	if !proto.IsDir(dirInfo.Mode) {
375
		return statusENOTDIR
376
	}
377
	c.cwd = cwd
378
	return statusOK
379
}
380

381
//export cfs_getcwd
382
func cfs_getcwd(id C.int64_t) *C.char {
383
	c, exist := getClient(int64(id)) // client's working directory
384
	if !exist {
385
		return C.CString("")
386
	}
387
	return C.CString(c.cwd)
388
}
389

390
//export cfs_getattr
391
func cfs_getattr(id C.int64_t, path *C.char, stat *C.struct_cfs_stat_info) C.int {
392
	c, exist := getClient(int64(id))
393
	if !exist {
394
		return statusEINVAL
395
	}
396

397
	info, err := c.lookupPath(c.absPath(C.GoString(path)))
398
	if err != nil {
399
		return errorToStatus(err)
400
	}
401

402
	// fill up the stat
403
	stat.ino = C.uint64_t(info.Inode)
404
	stat.size = C.uint64_t(info.Size)
405
	stat.nlink = C.uint32_t(info.Nlink)
406
	stat.blk_size = C.uint32_t(defaultBlkSize)
407
	stat.uid = C.uint32_t(info.Uid)
408
	stat.gid = C.uint32_t(info.Gid)
409

410
	if info.Size%512 != 0 {
411
		stat.blocks = C.uint64_t(info.Size>>9) + 1
412
	} else {
413
		stat.blocks = C.uint64_t(info.Size >> 9)
414
	}
415
	// fill up the mode
416
	if proto.IsRegular(info.Mode) {
417
		stat.mode = C.uint32_t(C.S_IFREG) | C.uint32_t(info.Mode&0o777)
418
	} else if proto.IsDir(info.Mode) {
419
		stat.mode = C.uint32_t(C.S_IFDIR) | C.uint32_t(info.Mode&0o777)
420
	} else if proto.IsSymlink(info.Mode) {
421
		stat.mode = C.uint32_t(C.S_IFLNK) | C.uint32_t(info.Mode&0o777)
422
	} else {
423
		stat.mode = C.uint32_t(C.S_IFSOCK) | C.uint32_t(info.Mode&0o777)
424
	}
425

426
	// fill up the time struct
427
	t := info.AccessTime.UnixNano()
428
	stat.atime = C.uint64_t(t / 1e9)
429
	stat.atime_nsec = C.uint32_t(t % 1e9)
430

431
	t = info.ModifyTime.UnixNano()
432
	stat.mtime = C.uint64_t(t / 1e9)
433
	stat.mtime_nsec = C.uint32_t(t % 1e9)
434

435
	t = info.CreateTime.UnixNano()
436
	stat.ctime = C.uint64_t(t / 1e9)
437
	stat.ctime_nsec = C.uint32_t(t % 1e9)
438

439
	return statusOK
440
}
441

442
//export cfs_setattr
443
func cfs_setattr(id C.int64_t, path *C.char, stat *C.struct_cfs_stat_info, valid C.int) C.int {
444
	c, exist := getClient(int64(id))
445
	if !exist {
446
		return statusEINVAL
447
	}
448

449
	info, err := c.lookupPath(c.absPath(C.GoString(path)))
450
	if err != nil {
451
		return errorToStatus(err)
452
	}
453

454
	err = c.setattr(info, uint32(valid), uint32(stat.mode), uint32(stat.uid), uint32(stat.gid), int64(stat.atime), int64(stat.mtime))
455

456
	if err != nil {
457
		return errorToStatus(err)
458
	}
459
	c.ic.Delete(info.Inode)
460
	return statusOK
461
}
462

463
//export cfs_open
464
func cfs_open(id C.int64_t, path *C.char, flags C.int, mode C.mode_t) C.int {
465
	c, exist := getClient(int64(id))
466
	if !exist {
467
		return statusEINVAL
468
	}
469
	start := time.Now()
470

471
	fuseMode := uint32(mode) & uint32(0o777)
472
	fuseFlags := uint32(flags) &^ uint32(0x8000)
473
	accFlags := fuseFlags & uint32(C.O_ACCMODE)
474

475
	absPath := c.absPath(C.GoString(path))
476

477
	var info *proto.InodeInfo
478
	var parentIno uint64
479

480
	/*
481
	 * Note that the rwx mode is ignored when using libsdk
482
	 */
483

484
	if fuseFlags&uint32(C.O_CREAT) != 0 {
485
		if accFlags != uint32(C.O_WRONLY) && accFlags != uint32(C.O_RDWR) {
486
			return statusEACCES
487
		}
488
		dirpath, name := gopath.Split(absPath)
489
		dirInfo, err := c.lookupPath(dirpath)
490
		if err != nil {
491
			return errorToStatus(err)
492
		}
493
		parentIno = dirInfo.Inode
494
		defer func() {
495
			if info != nil {
496
				auditlog.LogClientOp("Create", dirpath, "nil", err, time.Since(start).Microseconds(), info.Inode, 0)
497
			} else {
498
				auditlog.LogClientOp("Create", dirpath, "nil", err, time.Since(start).Microseconds(), 0, 0)
499
			}
500
		}()
501
		newInfo, err := c.create(dirInfo.Inode, name, fuseMode, absPath)
502
		if err != nil {
503
			if err != syscall.EEXIST {
504
				return errorToStatus(err)
505
			}
506
			newInfo, err = c.lookupPath(absPath)
507
			if err != nil {
508
				return errorToStatus(err)
509
			}
510
		}
511
		info = newInfo
512
	} else {
513
		dirpath, _ := gopath.Split(absPath)
514
		dirInfo, err := c.lookupPath(dirpath)
515
		if err != nil {
516
			return errorToStatus(err)
517
		}
518
		parentIno = dirInfo.Inode // parent inode
519
		newInfo, err := c.lookupPath(absPath)
520
		if err != nil {
521
			return errorToStatus(err)
522
		}
523
		info = newInfo
524
	}
525
	var fileCache bool
526
	if c.cacheRuleKey == "" {
527
		fileCache = false
528
	} else {
529
		fileCachePattern := fmt.Sprintf(".*%s.*", c.cacheRuleKey)
530
		fileCache, _ = regexp.MatchString(fileCachePattern, absPath)
531
	}
532
	f := c.allocFD(info.Inode, fuseFlags, fuseMode, fileCache, info.Size, parentIno, absPath)
533
	if f == nil {
534
		return statusEMFILE
535
	}
536

537
	if proto.IsRegular(info.Mode) {
538
		c.openStream(f)
539
		if fuseFlags&uint32(C.O_TRUNC) != 0 {
540
			if accFlags != uint32(C.O_WRONLY) && accFlags != uint32(C.O_RDWR) {
541
				c.closeStream(f)
542
				c.releaseFD(f.fd)
543
				return statusEACCES
544
			}
545
			if err := c.truncate(f, 0); err != nil {
546
				c.closeStream(f)
547
				c.releaseFD(f.fd)
548
				return statusEIO
549
			}
550
		}
551
	}
552

553
	return C.int(f.fd)
554
}
555

556
//export cfs_flush
557
func cfs_flush(id C.int64_t, fd C.int) C.int {
558
	c, exist := getClient(int64(id))
559
	if !exist {
560
		return statusEINVAL
561
	}
562

563
	f := c.getFile(uint(fd))
564
	if f == nil {
565
		return statusEBADFD
566
	}
567

568
	err := c.flush(f)
569
	if err != nil {
570
		return statusEIO
571
	}
572
	c.ic.Delete(f.ino)
573
	return statusOK
574
}
575

576
//export cfs_close
577
func cfs_close(id C.int64_t, fd C.int) {
578
	c, exist := getClient(int64(id))
579
	if !exist {
580
		return
581
	}
582
	f := c.releaseFD(uint(fd))
583
	if f != nil {
584
		c.flush(f)
585
		c.closeStream(f)
586
	}
587
}
588

589
//export cfs_write
590
func cfs_write(id C.int64_t, fd C.int, buf unsafe.Pointer, size C.size_t, off C.off_t) C.ssize_t {
591
	c, exist := getClient(int64(id))
592
	if !exist {
593
		return C.ssize_t(statusEINVAL)
594
	}
595

596
	f := c.getFile(uint(fd))
597
	if f == nil {
598
		return C.ssize_t(statusEBADFD)
599
	}
600

601
	accFlags := f.flags & uint32(C.O_ACCMODE)
602
	if accFlags != uint32(C.O_WRONLY) && accFlags != uint32(C.O_RDWR) {
603
		return C.ssize_t(statusEACCES)
604
	}
605

606
	var buffer []byte
607

608
	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&buffer))
609
	hdr.Data = uintptr(buf)
610
	hdr.Len = int(size)
611
	hdr.Cap = int(size)
612

613
	var flags int
614
	var wait bool
615

616
	if f.flags&uint32(C.O_DIRECT) != 0 || f.flags&uint32(C.O_SYNC) != 0 || f.flags&uint32(C.O_DSYNC) != 0 {
617
		if proto.IsHot(c.volType) {
618
			wait = true
619
		}
620
	}
621
	if f.flags&uint32(C.O_APPEND) != 0 || proto.IsCold(c.volType) {
622
		flags |= proto.FlagsAppend
623
		flags |= proto.FlagsSyncWrite
624
	}
625

626
	n, err := c.write(f, int(off), buffer, flags)
627
	if err != nil {
628
		if err == syscall.ENOSPC {
629
			return C.ssize_t(statusENOSPC)
630
		}
631
		return C.ssize_t(statusEIO)
632
	}
633

634
	if wait {
635
		if err = c.flush(f); err != nil {
636
			return C.ssize_t(statusEIO)
637
		}
638
	}
639

640
	return C.ssize_t(n)
641
}
642

643
//export cfs_read
644
func cfs_read(id C.int64_t, fd C.int, buf unsafe.Pointer, size C.size_t, off C.off_t) C.ssize_t {
645
	c, exist := getClient(int64(id))
646
	if !exist {
647
		return C.ssize_t(statusEINVAL)
648
	}
649

650
	f := c.getFile(uint(fd))
651
	if f == nil {
652
		return C.ssize_t(statusEBADFD)
653
	}
654

655
	accFlags := f.flags & uint32(C.O_ACCMODE)
656
	if accFlags == uint32(C.O_WRONLY) {
657
		return C.ssize_t(statusEACCES)
658
	}
659

660
	var buffer []byte
661

662
	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&buffer))
663
	hdr.Data = uintptr(buf)
664
	hdr.Len = int(size)
665
	hdr.Cap = int(size)
666

667
	n, err := c.read(f, int(off), buffer)
668
	if err != nil {
669
		return C.ssize_t(statusEIO)
670
	}
671

672
	return C.ssize_t(n)
673
}
674

675
//export cfs_batch_get_inodes
676
func cfs_batch_get_inodes(id C.int64_t, fd C.int, iids unsafe.Pointer, stats []C.struct_cfs_stat_info, count C.int) (n C.int) {
677
	c, exist := getClient(int64(id))
678
	if !exist {
679
		return statusEINVAL
680
	}
681

682
	f := c.getFile(uint(fd))
683
	if f == nil {
684
		return statusEBADFD
685
	}
686

687
	var inodeIDS []uint64
688

689
	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&inodeIDS))
690
	hdr.Data = uintptr(iids)
691
	hdr.Len = int(count)
692
	hdr.Cap = int(count)
693

694
	infos := c.mw.BatchInodeGet(inodeIDS)
695
	if len(infos) > int(count) {
696
		return statusEINVAL
697
	}
698

699
	for i := 0; i < len(infos); i++ {
700
		// fill up the stat
701
		stats[i].ino = C.uint64_t(infos[i].Inode)
702
		stats[i].size = C.uint64_t(infos[i].Size)
703
		stats[i].blocks = C.uint64_t(infos[i].Size >> 9)
704
		stats[i].nlink = C.uint32_t(infos[i].Nlink)
705
		stats[i].blk_size = C.uint32_t(defaultBlkSize)
706
		stats[i].uid = C.uint32_t(infos[i].Uid)
707
		stats[i].gid = C.uint32_t(infos[i].Gid)
708

709
		// fill up the mode
710
		if proto.IsRegular(infos[i].Mode) {
711
			stats[i].mode = C.uint32_t(C.S_IFREG) | C.uint32_t(infos[i].Mode&0o777)
712
		} else if proto.IsDir(infos[i].Mode) {
713
			stats[i].mode = C.uint32_t(C.S_IFDIR) | C.uint32_t(infos[i].Mode&0o777)
714
		} else if proto.IsSymlink(infos[i].Mode) {
715
			stats[i].mode = C.uint32_t(C.S_IFLNK) | C.uint32_t(infos[i].Mode&0o777)
716
		} else {
717
			stats[i].mode = C.uint32_t(C.S_IFSOCK) | C.uint32_t(infos[i].Mode&0o777)
718
		}
719

720
		// fill up the time struct
721
		t := infos[i].AccessTime.UnixNano()
722
		stats[i].atime = C.uint64_t(t / 1e9)
723
		stats[i].atime_nsec = C.uint32_t(t % 1e9)
724

725
		t = infos[i].ModifyTime.UnixNano()
726
		stats[i].mtime = C.uint64_t(t / 1e9)
727
		stats[i].mtime_nsec = C.uint32_t(t % 1e9)
728

729
		t = infos[i].CreateTime.UnixNano()
730
		stats[i].ctime = C.uint64_t(t / 1e9)
731
		stats[i].ctime_nsec = C.uint32_t(t % 1e9)
732
	}
733

734
	n = C.int(len(infos))
735
	return
736
}
737

738
//export cfs_refreshsummary
739
func cfs_refreshsummary(id C.int64_t, path *C.char, goroutine_num C.int) C.int {
740
	c, exist := getClient(int64(id))
741
	if !exist {
742
		return statusEINVAL
743
	}
744
	if !c.enableSummary {
745
		return statusEINVAL
746
	}
747
	info, err := c.lookupPath(c.absPath(C.GoString(path)))
748
	var ino uint64
749
	if err != nil {
750
		ino = proto.RootIno
751
	} else {
752
		ino = info.Inode
753
	}
754
	goroutineNum := int32(goroutine_num)
755
	err = c.mw.RefreshSummary_ll(ino, goroutineNum)
756
	if err != nil {
757
		return errorToStatus(err)
758
	}
759
	return statusOK
760
}
761

762
/*
763
 * Note that readdir is not thread-safe according to the POSIX spec.
764
 */
765

766
//export cfs_readdir
767
func cfs_readdir(id C.int64_t, fd C.int, dirents []C.struct_cfs_dirent, count C.int) (n C.int) {
768
	c, exist := getClient(int64(id))
769
	if !exist {
770
		return statusEINVAL
771
	}
772

773
	f := c.getFile(uint(fd))
774
	if f == nil {
775
		return statusEBADFD
776
	}
777

778
	if f.dirp == nil {
779
		f.dirp = &dirStream{}
780
		dentries, err := c.mw.ReadDir_ll(f.ino)
781
		if err != nil {
782
			return errorToStatus(err)
783
		}
784
		f.dirp.dirents = dentries
785
	}
786

787
	dirp := f.dirp
788
	for dirp.pos < len(dirp.dirents) && n < count {
789
		// fill up ino
790
		dirents[n].ino = C.uint64_t(dirp.dirents[dirp.pos].Inode)
791

792
		// fill up d_type
793
		if proto.IsRegular(dirp.dirents[dirp.pos].Type) {
794
			dirents[n].d_type = C.DT_REG
795
		} else if proto.IsDir(dirp.dirents[dirp.pos].Type) {
796
			dirents[n].d_type = C.DT_DIR
797
		} else if proto.IsSymlink(dirp.dirents[dirp.pos].Type) {
798
			dirents[n].d_type = C.DT_LNK
799
		} else {
800
			dirents[n].d_type = C.DT_UNKNOWN
801
		}
802

803
		// fill up name
804
		nameLen := len(dirp.dirents[dirp.pos].Name)
805
		if nameLen >= 256 {
806
			nameLen = 255
807
		}
808
		hdr := (*reflect.StringHeader)(unsafe.Pointer(&dirp.dirents[dirp.pos].Name))
809
		C.memcpy(unsafe.Pointer(&dirents[n].name[0]), unsafe.Pointer(hdr.Data), C.size_t(nameLen))
810
		dirents[n].name[nameLen] = 0
811
		dirents[n].nameLen = C.uint32_t(nameLen)
812
		// advance cursor
813
		dirp.pos++
814
		n++
815
	}
816

817
	return n
818
}
819

820
//export cfs_lsdir
821
func cfs_lsdir(id C.int64_t, fd C.int, direntsInfo []C.struct_cfs_dirent_info, count C.int) (n C.int) {
822
	c, exist := getClient(int64(id))
823
	if !exist {
824
		return statusEINVAL
825
	}
826

827
	f := c.getFile(uint(fd))
828
	if f == nil {
829
		return statusEBADFD
830
	}
831

832
	if f.dirp == nil {
833
		f.dirp = &dirStream{}
834
		dentries, err := c.mw.ReadDir_ll(f.ino)
835
		if err != nil {
836
			return errorToStatus(err)
837
		}
838
		f.dirp.dirents = dentries
839
	}
840

841
	dirp := f.dirp
842
	inodeIDS := make([]uint64, count, count)
843
	inodeMap := make(map[uint64]C.int)
844
	for dirp.pos < len(dirp.dirents) && n < count {
845
		inodeIDS[n] = dirp.dirents[dirp.pos].Inode
846
		inodeMap[dirp.dirents[dirp.pos].Inode] = n
847
		// fill up d_type
848
		if proto.IsRegular(dirp.dirents[dirp.pos].Type) {
849
			direntsInfo[n].d_type = C.DT_REG
850
		} else if proto.IsDir(dirp.dirents[dirp.pos].Type) {
851
			direntsInfo[n].d_type = C.DT_DIR
852
		} else if proto.IsSymlink(dirp.dirents[dirp.pos].Type) {
853
			direntsInfo[n].d_type = C.DT_LNK
854
		} else {
855
			direntsInfo[n].d_type = C.DT_UNKNOWN
856
		}
857
		nameLen := len(dirp.dirents[dirp.pos].Name)
858
		if nameLen >= 256 {
859
			nameLen = 255
860
		}
861
		hdr := (*reflect.StringHeader)(unsafe.Pointer(&dirp.dirents[dirp.pos].Name))
862

863
		C.memcpy(unsafe.Pointer(&direntsInfo[n].name[0]), unsafe.Pointer(hdr.Data), C.size_t(nameLen))
864
		direntsInfo[n].name[nameLen] = 0
865
		direntsInfo[n].nameLen = C.uint32_t(nameLen)
866

867
		// advance cursor
868
		dirp.pos++
869
		n++
870
	}
871
	if n == 0 {
872
		return n
873
	}
874
	infos := c.mw.BatchInodeGet(inodeIDS)
875
	if len(infos) != int(n) {
876
		return statusEIO
877
	}
878
	for i := 0; i < len(infos); i++ {
879
		// fill up the stat
880
		index := inodeMap[infos[i].Inode]
881
		direntsInfo[index].stat.size = C.uint64_t(infos[i].Size)
882

883
		// fill up the mode
884
		if proto.IsRegular(infos[i].Mode) {
885
			direntsInfo[index].stat.mode = C.uint32_t(C.S_IFREG) | C.uint32_t(infos[i].Mode&0o777)
886
		} else if proto.IsDir(infos[i].Mode) {
887
			direntsInfo[index].stat.mode = C.uint32_t(C.S_IFDIR) | C.uint32_t(infos[i].Mode&0o777)
888
		} else if proto.IsSymlink(infos[i].Mode) {
889
			direntsInfo[index].stat.mode = C.uint32_t(C.S_IFLNK) | C.uint32_t(infos[i].Mode&0o777)
890
		} else {
891
			direntsInfo[index].stat.mode = C.uint32_t(C.S_IFSOCK) | C.uint32_t(infos[i].Mode&0o777)
892
		}
893

894
		// fill up the time struct
895
		t := infos[index].AccessTime.UnixNano()
896
		direntsInfo[index].stat.atime = C.uint64_t(t / 1e9)
897
		direntsInfo[index].stat.atime_nsec = C.uint32_t(t % 1e9)
898

899
		t = infos[index].ModifyTime.UnixNano()
900
		direntsInfo[index].stat.mtime = C.uint64_t(t / 1e9)
901
		direntsInfo[index].stat.mtime_nsec = C.uint32_t(t % 1e9)
902
	}
903
	return n
904
}
905

906
//export cfs_mkdirs
907
func cfs_mkdirs(id C.int64_t, path *C.char, mode C.mode_t) C.int {
908
	c, exist := getClient(int64(id))
909
	if !exist {
910
		return statusEINVAL
911
	}
912

913
	start := time.Now()
914
	var gerr error
915
	var gino uint64
916

917
	dirpath := c.absPath(C.GoString(path))
918
	if dirpath == "/" {
919
		return statusEEXIST
920
	}
921

922
	defer func() {
923
		if gerr == nil {
924
			auditlog.LogClientOp("Mkdir", dirpath, "nil", gerr, time.Since(start).Microseconds(), gino, 0)
925
		} else {
926
			auditlog.LogClientOp("Mkdir", dirpath, "nil", gerr, time.Since(start).Microseconds(), 0, 0)
927
		}
928
	}()
929

930
	pino := proto.RootIno
931
	dirs := strings.Split(dirpath, "/")
932
	for _, dir := range dirs {
933
		if dir == "/" || dir == "" {
934
			continue
935
		}
936
		child, _, err := c.mw.Lookup_ll(pino, dir)
937
		if err != nil {
938
			if err == syscall.ENOENT {
939
				info, err := c.mkdir(pino, dir, uint32(mode), dirpath)
940

941
				if err != nil {
942
					if err != syscall.EEXIST {
943
						gerr = err
944
						return errorToStatus(err)
945
					}
946
				} else {
947
					child = info.Inode
948
				}
949
			} else {
950
				gerr = err
951
				return errorToStatus(err)
952
			}
953
		}
954
		pino = child
955
		gino = child
956
	}
957

958
	return 0
959
}
960

961
//export cfs_rmdir
962
func cfs_rmdir(id C.int64_t, path *C.char) C.int {
963
	c, exist := getClient(int64(id))
964
	if !exist {
965
		return statusEINVAL
966
	}
967
	start := time.Now()
968
	var err error
969
	var info *proto.InodeInfo
970

971
	absPath := c.absPath(C.GoString(path))
972
	defer func() {
973
		if info == nil {
974
			auditlog.LogClientOp("Rmdir", absPath, "nil", err, time.Since(start).Microseconds(), 0, 0)
975
		} else {
976
			auditlog.LogClientOp("Rmdir", absPath, "nil", err, time.Since(start).Microseconds(), info.Inode, 0)
977
		}
978
	}()
979
	dirpath, name := gopath.Split(absPath)
980
	dirInfo, err := c.lookupPath(dirpath)
981
	if err != nil {
982
		return errorToStatus(err)
983
	}
984

985
	info, err = c.mw.Delete_ll(dirInfo.Inode, name, true, absPath)
986
	c.ic.Delete(dirInfo.Inode)
987
	c.dc.Delete(absPath)
988
	return errorToStatus(err)
989
}
990

991
//export cfs_unlink
992
func cfs_unlink(id C.int64_t, path *C.char) C.int {
993
	c, exist := getClient(int64(id))
994
	if !exist {
995
		return statusEINVAL
996
	}
997

998
	start := time.Now()
999
	var err error
1000
	var info *proto.InodeInfo
1001

1002
	absPath := c.absPath(C.GoString(path))
1003
	dirpath, name := gopath.Split(absPath)
1004

1005
	defer func() {
1006
		if info == nil {
1007
			auditlog.LogClientOp("Unlink", absPath, "nil", err, time.Since(start).Microseconds(), 0, 0)
1008
		} else {
1009
			auditlog.LogClientOp("Unlink", absPath, "nil", err, time.Since(start).Microseconds(), info.Inode, 0)
1010
		}
1011
	}()
1012
	dirInfo, err := c.lookupPath(dirpath)
1013
	if err != nil {
1014
		return errorToStatus(err)
1015
	}
1016

1017
	_, mode, err := c.mw.Lookup_ll(dirInfo.Inode, name)
1018
	if err != nil {
1019
		return errorToStatus(err)
1020
	}
1021
	if proto.IsDir(mode) {
1022
		return statusEISDIR
1023
	}
1024

1025
	info, err = c.mw.Delete_ll(dirInfo.Inode, name, false, absPath)
1026
	if err != nil {
1027
		return errorToStatus(err)
1028
	}
1029

1030
	if info != nil {
1031
		_ = c.mw.Evict(info.Inode, absPath)
1032
		c.ic.Delete(info.Inode)
1033
	}
1034
	return 0
1035
}
1036

1037
//export cfs_rename
1038
func cfs_rename(id C.int64_t, from *C.char, to *C.char) C.int {
1039
	c, exist := getClient(int64(id))
1040
	if !exist {
1041
		return statusEINVAL
1042
	}
1043

1044
	start := time.Now()
1045
	var err error
1046

1047
	absFrom := c.absPath(C.GoString(from))
1048
	absTo := c.absPath(C.GoString(to))
1049

1050
	defer func() {
1051
		auditlog.LogClientOp("Rename", absFrom, absTo, err, time.Since(start).Microseconds(), 0, 0)
1052
	}()
1053

1054
	srcDirPath, srcName := gopath.Split(absFrom)
1055
	dstDirPath, dstName := gopath.Split(absTo)
1056

1057
	srcDirInfo, err := c.lookupPath(srcDirPath)
1058
	if err != nil {
1059
		return errorToStatus(err)
1060
	}
1061
	dstDirInfo, err := c.lookupPath(dstDirPath)
1062
	if err != nil {
1063
		return errorToStatus(err)
1064
	}
1065

1066
	err = c.mw.Rename_ll(srcDirInfo.Inode, srcName, dstDirInfo.Inode, dstName, absFrom, absTo, false)
1067
	c.ic.Delete(srcDirInfo.Inode)
1068
	c.ic.Delete(dstDirInfo.Inode)
1069
	c.dc.Delete(absFrom)
1070
	return errorToStatus(err)
1071
}
1072

1073
//export cfs_fchmod
1074
func cfs_fchmod(id C.int64_t, fd C.int, mode C.mode_t) C.int {
1075
	c, exist := getClient(int64(id))
1076
	if !exist {
1077
		return statusEINVAL
1078
	}
1079

1080
	f := c.getFile(uint(fd))
1081
	if f == nil {
1082
		return statusEBADFD
1083
	}
1084

1085
	info, err := c.mw.InodeGet_ll(f.ino)
1086
	if err != nil {
1087
		return errorToStatus(err)
1088
	}
1089

1090
	err = c.setattr(info, proto.AttrMode, uint32(mode), 0, 0, 0, 0)
1091
	if err != nil {
1092
		return errorToStatus(err)
1093
	}
1094
	c.ic.Delete(info.Inode)
1095
	return statusOK
1096
}
1097

1098
//export cfs_getsummary
1099
func cfs_getsummary(id C.int64_t, path *C.char, summary *C.struct_cfs_summary_info, useCache *C.char, goroutine_num C.int) C.int {
1100
	c, exist := getClient(int64(id))
1101
	if !exist {
1102
		return statusEINVAL
1103
	}
1104

1105
	info, err := c.lookupPath(c.absPath(C.GoString(path)))
1106
	if err != nil {
1107
		return errorToStatus(err)
1108
	}
1109

1110
	if strings.ToLower(C.GoString(useCache)) == "true" {
1111
		cacheSummaryInfo := c.sc.Get(info.Inode)
1112
		if cacheSummaryInfo != nil {
1113
			summary.files = C.int64_t(cacheSummaryInfo.Files)
1114
			summary.subdirs = C.int64_t(cacheSummaryInfo.Subdirs)
1115
			summary.fbytes = C.int64_t(cacheSummaryInfo.Fbytes)
1116
			return statusOK
1117
		}
1118
	}
1119

1120
	if !proto.IsDir(info.Mode) {
1121
		return statusENOTDIR
1122
	}
1123
	goroutineNum := int32(goroutine_num)
1124
	summaryInfo, err := c.mw.GetSummary_ll(info.Inode, goroutineNum)
1125
	if err != nil {
1126
		return errorToStatus(err)
1127
	}
1128
	if strings.ToLower(C.GoString(useCache)) != "false" {
1129
		c.sc.Put(info.Inode, &summaryInfo)
1130
	}
1131
	summary.files = C.int64_t(summaryInfo.Files)
1132
	summary.subdirs = C.int64_t(summaryInfo.Subdirs)
1133
	summary.fbytes = C.int64_t(summaryInfo.Fbytes)
1134
	return statusOK
1135
}
1136

1137
// internals
1138

1139
func (c *client) absPath(path string) string {
1140
	p := gopath.Clean(path)
1141
	if !gopath.IsAbs(p) {
1142
		p = gopath.Join(c.cwd, p)
1143
	}
1144
	return gopath.Clean(p)
1145
}
1146

1147
func (c *client) start() (err error) {
1148
	masters := strings.Split(c.masterAddr, ",")
1149
	if c.logDir != "" {
1150
		if c.logLevel == "" {
1151
			c.logLevel = "WARN"
1152
		}
1153
		level := parseLogLevel(c.logLevel)
1154
		log.InitLog(c.logDir, "libcfs", level, nil, log.DefaultLogLeftSpaceLimit)
1155
		stat.NewStatistic(c.logDir, "libcfs", int64(stat.DefaultStatLogSize), stat.DefaultTimeOutUs, true)
1156
	}
1157
	proto.InitBufferPool(int64(32768))
1158
	if c.readBlockThread == 0 {
1159
		c.readBlockThread = 10
1160
	}
1161
	if c.writeBlockThread == 0 {
1162
		c.writeBlockThread = 10
1163
	}
1164
	if err = c.loadConfFromMaster(masters); err != nil {
1165
		return
1166
	}
1167
	if err = c.checkPermission(); err != nil {
1168
		err = errors.NewErrorf("check permission failed: %v", err)
1169
		syslog.Println(err)
1170
		return
1171
	}
1172

1173
	if c.enableAudit {
1174
		_, err = auditlog.InitAudit(c.logDir, "clientSdk", int64(auditlog.DefaultAuditLogSize))
1175
		if err != nil {
1176
			log.LogWarnf("Init audit log fail: %v", err)
1177
		}
1178
	}
1179

1180
	if c.enableSummary {
1181
		c.sc = fs.NewSummaryCache(fs.DefaultSummaryExpiration, fs.MaxSummaryCache)
1182
	}
1183
	if c.enableBcache {
1184
		c.bc = bcache.NewBcacheClient()
1185
	}
1186
	var ebsc *blobstore.BlobStoreClient
1187
	if c.ebsEndpoint != "" {
1188
		if ebsc, err = blobstore.NewEbsClient(access.Config{
1189
			ConnMode: access.NoLimitConnMode,
1190
			Consul: access.ConsulConfig{
1191
				Address: c.ebsEndpoint,
1192
			},
1193
			MaxSizePutOnce: MaxSizePutOnce,
1194
			Logger: &access.Logger{
1195
				Filename: gopath.Join(c.logDir, "libcfs/ebs.log"),
1196
			},
1197
		}); err != nil {
1198
			return
1199
		}
1200
	}
1201
	var mw *meta.MetaWrapper
1202
	if mw, err = meta.NewMetaWrapper(&meta.MetaConfig{
1203
		Volume:        c.volName,
1204
		Masters:       masters,
1205
		ValidateOwner: false,
1206
		EnableSummary: c.enableSummary,
1207
	}); err != nil {
1208
		log.LogErrorf("newClient NewMetaWrapper failed(%v)", err)
1209
		return err
1210
	}
1211
	var ec *stream.ExtentClient
1212
	if ec, err = stream.NewExtentClient(&stream.ExtentConfig{
1213
		Volume:            c.volName,
1214
		VolumeType:        c.volType,
1215
		Masters:           masters,
1216
		FollowerRead:      c.followerRead,
1217
		OnAppendExtentKey: mw.AppendExtentKey,
1218
		OnSplitExtentKey:  mw.SplitExtentKey,
1219
		OnGetExtents:      mw.GetExtents,
1220
		OnTruncate:        mw.Truncate,
1221
		BcacheEnable:      c.enableBcache,
1222
		OnLoadBcache:      c.bc.Get,
1223
		OnCacheBcache:     c.bc.Put,
1224
		OnEvictBcache:     c.bc.Evict,
1225
		DisableMetaCache:  true,
1226
	}); err != nil {
1227
		log.LogErrorf("newClient NewExtentClient failed(%v)", err)
1228
		return
1229
	}
1230

1231
	c.mw = mw
1232
	c.ec = ec
1233
	c.ebsc = ebsc
1234
	return nil
1235
}
1236

1237
func (c *client) checkPermission() (err error) {
1238
	if c.accessKey == "" || c.secretKey == "" {
1239
		err = errors.New("invalid AccessKey or SecretKey")
1240
		return
1241
	}
1242

1243
	// checkPermission
1244
	mc := masterSDK.NewMasterClientFromString(c.masterAddr, false)
1245
	var userInfo *proto.UserInfo
1246
	if userInfo, err = mc.UserAPI().GetAKInfo(c.accessKey); err != nil {
1247
		return
1248
	}
1249
	if userInfo.SecretKey != c.secretKey {
1250
		err = proto.ErrNoPermission
1251
		return
1252
	}
1253
	policy := userInfo.Policy
1254
	if policy.IsOwn(c.volName) {
1255
		return
1256
	}
1257
	// read write
1258
	if policy.IsAuthorized(c.volName, c.subDir, proto.POSIXWriteAction) &&
1259
		policy.IsAuthorized(c.volName, c.subDir, proto.POSIXReadAction) {
1260
		return
1261
	}
1262
	// read only
1263
	if policy.IsAuthorized(c.volName, c.subDir, proto.POSIXReadAction) &&
1264
		!policy.IsAuthorized(c.volName, c.subDir, proto.POSIXWriteAction) {
1265
		return
1266
	}
1267
	err = proto.ErrNoPermission
1268
	return
1269
}
1270

1271
func (c *client) allocFD(ino uint64, flags, mode uint32, fileCache bool, fileSize uint64, parentInode uint64, path string) *file {
1272
	c.fdlock.Lock()
1273
	defer c.fdlock.Unlock()
1274
	fd, ok := c.fdset.NextClear(0)
1275
	if !ok || fd > maxFdNum {
1276
		return nil
1277
	}
1278
	c.fdset.Set(fd)
1279
	f := &file{fd: fd, ino: ino, flags: flags, mode: mode, pino: parentInode, path: path}
1280
	if proto.IsCold(c.volType) {
1281
		clientConf := blobstore.ClientConfig{
1282
			VolName:         c.volName,
1283
			VolType:         c.volType,
1284
			BlockSize:       c.ebsBlockSize,
1285
			Ino:             ino,
1286
			Bc:              c.bc,
1287
			Mw:              c.mw,
1288
			Ec:              c.ec,
1289
			Ebsc:            c.ebsc,
1290
			EnableBcache:    c.enableBcache,
1291
			WConcurrency:    c.writeBlockThread,
1292
			ReadConcurrency: c.readBlockThread,
1293
			CacheAction:     c.cacheAction,
1294
			FileCache:       fileCache,
1295
			FileSize:        fileSize,
1296
			CacheThreshold:  c.cacheThreshold,
1297
		}
1298
		f.fileWriter.FreeCache()
1299
		switch flags & 0xff {
1300
		case syscall.O_RDONLY:
1301
			f.fileReader = blobstore.NewReader(clientConf)
1302
			f.fileWriter = nil
1303
		case syscall.O_WRONLY:
1304
			f.fileWriter = blobstore.NewWriter(clientConf)
1305
			f.fileReader = nil
1306
		case syscall.O_RDWR:
1307
			f.fileReader = blobstore.NewReader(clientConf)
1308
			f.fileWriter = blobstore.NewWriter(clientConf)
1309
		default:
1310
			f.fileWriter = blobstore.NewWriter(clientConf)
1311
			f.fileReader = nil
1312
		}
1313
	}
1314
	c.fdmap[fd] = f
1315
	return f
1316
}
1317

1318
func (c *client) getFile(fd uint) *file {
1319
	c.fdlock.Lock()
1320
	f := c.fdmap[fd]
1321
	c.fdlock.Unlock()
1322
	return f
1323
}
1324

1325
func (c *client) releaseFD(fd uint) *file {
1326
	c.fdlock.Lock()
1327
	defer c.fdlock.Unlock()
1328
	f, ok := c.fdmap[fd]
1329
	if !ok {
1330
		return nil
1331
	}
1332
	delete(c.fdmap, fd)
1333
	c.fdset.Clear(fd)
1334
	c.ic.Delete(f.ino)
1335
	return f
1336
}
1337

1338
func (c *client) lookupPath(path string) (*proto.InodeInfo, error) {
1339
	ino, ok := c.dc.Get(gopath.Clean(path))
1340
	if !ok {
1341
		inoInterval, err := c.mw.LookupPath(gopath.Clean(path))
1342
		if err != nil {
1343
			return nil, err
1344
		}
1345
		c.dc.Put(gopath.Clean(path), inoInterval)
1346
		ino = inoInterval
1347
	}
1348
	info := c.ic.Get(ino)
1349
	if info != nil {
1350
		return info, nil
1351
	}
1352
	info, err := c.mw.InodeGet_ll(ino)
1353
	if err != nil {
1354
		return nil, err
1355
	}
1356
	c.ic.Put(info)
1357

1358
	return info, nil
1359
}
1360

1361
func (c *client) setattr(info *proto.InodeInfo, valid uint32, mode, uid, gid uint32, atime, mtime int64) error {
1362
	// Only rwx mode bit can be set
1363
	if valid&proto.AttrMode != 0 {
1364
		fuseMode := mode & uint32(0o777)
1365
		mode = info.Mode &^ uint32(0o777) // clear rwx mode bit
1366
		mode |= fuseMode
1367
	}
1368
	return c.mw.Setattr(info.Inode, valid, mode, uid, gid, atime, mtime)
1369
}
1370

1371
func (c *client) create(pino uint64, name string, mode uint32, fullPath string) (info *proto.InodeInfo, err error) {
1372
	fuseMode := mode & 0o777
1373
	return c.mw.Create_ll(pino, name, fuseMode, 0, 0, nil, fullPath)
1374
}
1375

1376
func (c *client) mkdir(pino uint64, name string, mode uint32, fullPath string) (info *proto.InodeInfo, err error) {
1377
	fuseMode := mode & 0o777
1378
	fuseMode |= uint32(os.ModeDir)
1379
	return c.mw.Create_ll(pino, name, fuseMode, 0, 0, nil, fullPath)
1380
}
1381

1382
func (c *client) openStream(f *file) {
1383
	_ = c.ec.OpenStream(f.ino)
1384
}
1385

1386
func (c *client) closeStream(f *file) {
1387
	_ = c.ec.CloseStream(f.ino)
1388
	_ = c.ec.EvictStream(f.ino)
1389
	f.fileWriter.FreeCache()
1390
	f.fileWriter = nil
1391
	f.fileReader = nil
1392
}
1393

1394
func (c *client) flush(f *file) error {
1395
	if proto.IsHot(c.volType) {
1396
		return c.ec.Flush(f.ino)
1397
	} else {
1398
		if f.fileWriter != nil {
1399
			return f.fileWriter.Flush(f.ino, c.ctx(c.id, f.ino))
1400
		}
1401
	}
1402
	return nil
1403
}
1404

1405
func (c *client) truncate(f *file, size int) error {
1406
	err := c.ec.Truncate(c.mw, f.pino, f.ino, size, f.path)
1407
	if err != nil {
1408
		return err
1409
	}
1410
	return nil
1411
}
1412

1413
func (c *client) write(f *file, offset int, data []byte, flags int) (n int, err error) {
1414
	if proto.IsHot(c.volType) {
1415
		c.ec.GetStreamer(f.ino).SetParentInode(f.pino) // set the parent inode
1416
		checkFunc := func() error {
1417
			if !c.mw.EnableQuota {
1418
				return nil
1419
			}
1420

1421
			if ok := c.ec.UidIsLimited(0); ok {
1422
				return syscall.ENOSPC
1423
			}
1424

1425
			if c.mw.IsQuotaLimitedById(f.ino, true, false) {
1426
				return syscall.ENOSPC
1427
			}
1428
			return nil
1429
		}
1430
		n, err = c.ec.Write(f.ino, offset, data, flags, checkFunc)
1431
	} else {
1432
		n, err = f.fileWriter.Write(c.ctx(c.id, f.ino), offset, data, flags)
1433
	}
1434
	if err != nil {
1435
		return 0, err
1436
	}
1437
	return n, nil
1438
}
1439

1440
func (c *client) read(f *file, offset int, data []byte) (n int, err error) {
1441
	if proto.IsHot(c.volType) {
1442
		n, err = c.ec.Read(f.ino, data, offset, len(data))
1443
	} else {
1444
		n, err = f.fileReader.Read(c.ctx(c.id, f.ino), data, offset, len(data))
1445
	}
1446
	if err != nil && err != io.EOF {
1447
		return 0, err
1448
	}
1449
	return n, nil
1450
}
1451

1452
func (c *client) ctx(cid int64, ino uint64) context.Context {
1453
	_, ctx := trace.StartSpanFromContextWithTraceID(context.Background(), "", fmt.Sprintf("cid=%v,ino=%v", cid, ino))
1454
	return ctx
1455
}
1456

1457
func (c *client) loadConfFromMaster(masters []string) (err error) {
1458
	mc := masterSDK.NewMasterClient(masters, false)
1459
	var volumeInfo *proto.SimpleVolView
1460
	volumeInfo, err = mc.AdminAPI().GetVolumeSimpleInfo(c.volName)
1461
	if err != nil {
1462
		return
1463
	}
1464
	c.volType = volumeInfo.VolType
1465
	c.ebsBlockSize = volumeInfo.ObjBlockSize
1466
	c.cacheAction = volumeInfo.CacheAction
1467
	c.cacheRuleKey = volumeInfo.CacheRule
1468
	c.cacheThreshold = volumeInfo.CacheThreshold
1469

1470
	var clusterInfo *proto.ClusterInfo
1471
	clusterInfo, err = mc.AdminAPI().GetClusterInfo()
1472
	if err != nil {
1473
		return
1474
	}
1475
	c.ebsEndpoint = clusterInfo.EbsAddr
1476
	c.servicePath = clusterInfo.ServicePath
1477
	c.cluster = clusterInfo.Cluster
1478
	c.dirChildrenNumLimit = clusterInfo.DirChildrenNumLimit
1479
	buf.InitCachePool(c.ebsBlockSize)
1480
	return
1481
}
1482

1483
func parseLogLevel(loglvl string) log.Level {
1484
	var level log.Level
1485
	switch strings.ToLower(loglvl) {
1486
	case "debug":
1487
		level = log.DebugLevel
1488
	case "info":
1489
		level = log.InfoLevel
1490
	case "warn":
1491
		level = log.WarnLevel
1492
	case "error":
1493
		level = log.ErrorLevel
1494
	default:
1495
		level = log.ErrorLevel
1496
	}
1497
	return level
1498
}
1499

1500
func (c *client) fileSize(ino uint64) (size int, gen uint64) {
1501
	size, gen, valid := c.ec.FileSize(ino)
1502
	log.LogDebugf("fileSize: ino(%v) fileSize(%v) gen(%v) valid(%v)", ino, size, gen, valid)
1503

1504
	if !valid {
1505
		info := c.ic.Get(ino)
1506
		if info != nil {
1507
			return int(info.Size), info.Generation
1508
		}
1509
		if info, err := c.mw.InodeGet_ll(ino); err == nil {
1510
			size = int(info.Size)
1511
			gen = info.Generation
1512
		}
1513
	}
1514
	return
1515
}
1516

1517
func main() {
1518
	// do nothing
1519
}
1520

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.