1
// Copyright 2020 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
44
struct cfs_summary_info {
57
struct cfs_hdfs_stat_info {
66
struct cfs_dirent_info {
67
struct cfs_hdfs_stat_info stat;
93
"github.com/bits-and-blooms/bitset"
94
"github.com/cubefs/cubefs/blobstore/api/access"
95
"github.com/cubefs/cubefs/blobstore/common/trace"
96
"github.com/cubefs/cubefs/blockcache/bcache"
97
"github.com/cubefs/cubefs/client/fs"
98
"github.com/cubefs/cubefs/proto"
99
"github.com/cubefs/cubefs/sdk/data/blobstore"
100
"github.com/cubefs/cubefs/sdk/data/stream"
101
masterSDK "github.com/cubefs/cubefs/sdk/master"
102
"github.com/cubefs/cubefs/sdk/meta"
103
"github.com/cubefs/cubefs/util/auditlog"
104
"github.com/cubefs/cubefs/util/buf"
105
"github.com/cubefs/cubefs/util/errors"
106
"github.com/cubefs/cubefs/util/log"
107
"github.com/cubefs/cubefs/util/stat"
111
defaultBlkSize = uint32(1) << 12
113
maxFdNum uint = 10240000
115
MaxSizePutOnce = int64(1) << 23
118
var gClientManager *clientManager
122
// error status must be minus value
123
statusEIO = errorToStatus(syscall.EIO)
124
statusEINVAL = errorToStatus(syscall.EINVAL)
125
statusEEXIST = errorToStatus(syscall.EEXIST)
126
statusEBADFD = errorToStatus(syscall.EBADFD)
127
statusEACCES = errorToStatus(syscall.EACCES)
128
statusEMFILE = errorToStatus(syscall.EMFILE)
129
statusENOTDIR = errorToStatus(syscall.ENOTDIR)
130
statusEISDIR = errorToStatus(syscall.EISDIR)
131
statusENOSPC = errorToStatus(syscall.ENOSPC)
136
gClientManager = &clientManager{
137
clients: make(map[int64]*client),
141
func errorToStatus(err error) C.int {
145
if errno, is := err.(syscall.Errno); is {
148
return -C.int(syscall.EIO)
151
type clientManager struct {
153
clients map[int64]*client
157
type pushConfig struct {
158
PushAddr string `json:"pushAddr"`
161
func newClient() *client {
162
id := atomic.AddInt64(&gClientManager.nextClientID, 1)
165
fdmap: make(map[uint]*file),
166
fdset: bitset.New(maxFdNum),
167
dirChildrenNumLimit: proto.DefaultDirChildrenNumLimit,
169
sc: fs.NewSummaryCache(fs.DefaultSummaryExpiration, fs.MaxSummaryCache),
170
ic: fs.NewInodeCache(fs.DefaultInodeExpiration, fs.MaxInodeCache),
171
dc: fs.NewDentryCache(),
174
gClientManager.mu.Lock()
175
gClientManager.clients[id] = c
176
gClientManager.mu.Unlock()
181
func getClient(id int64) (c *client, exist bool) {
182
gClientManager.mu.RLock()
183
defer gClientManager.mu.RUnlock()
184
c, exist = gClientManager.clients[id]
188
func removeClient(id int64) {
189
gClientManager.mu.Lock()
190
defer gClientManager.mu.Unlock()
191
delete(gClientManager.clients, id)
205
fileWriter *blobstore.Writer
206
fileReader *blobstore.Reader
211
type dirStream struct {
213
dirents []proto.Dentry
217
// client id allocated by libsdk
242
dirChildrenNumLimit uint32
246
cwd string // current working directory
253
ec *stream.ExtentClient
256
bc *bcache.BcacheClient
257
ebsc *blobstore.BlobStoreClient
261
//export cfs_new_client
262
func cfs_new_client() C.int64_t {
264
// Just skip fd 0, 1, 2, to avoid confusion.
265
c.fdset.Set(0).Set(1).Set(2)
266
return C.int64_t(c.id)
269
//export cfs_set_client
270
func cfs_set_client(id C.int64_t, key, val *C.char) C.int {
271
c, exist := getClient(int64(id))
285
c.followerRead = true
287
c.followerRead = false
295
c.enableBcache = true
297
c.enableBcache = false
299
case "readBlockThread":
300
rt, err := strconv.Atoi(v)
302
c.readBlockThread = rt
304
case "writeBlockThread":
305
wt, err := strconv.Atoi(v)
307
c.writeBlockThread = wt
309
case "enableSummary":
311
c.enableSummary = true
313
c.enableSummary = false
325
c.enableAudit = false
333
//export cfs_start_client
334
func cfs_start_client(id C.int64_t) C.int {
335
c, exist := getClient(int64(id))
348
//export cfs_close_client
349
func cfs_close_client(id C.int64_t) {
350
if c, exist := getClient(int64(id)); exist {
357
removeClient(int64(id))
364
func cfs_chdir(id C.int64_t, path *C.char) C.int {
365
c, exist := getClient(int64(id))
369
cwd := c.absPath(C.GoString(path))
370
dirInfo, err := c.lookupPath(cwd)
372
return errorToStatus(err)
374
if !proto.IsDir(dirInfo.Mode) {
382
func cfs_getcwd(id C.int64_t) *C.char {
383
c, exist := getClient(int64(id)) // client's working directory
387
return C.CString(c.cwd)
391
func cfs_getattr(id C.int64_t, path *C.char, stat *C.struct_cfs_stat_info) C.int {
392
c, exist := getClient(int64(id))
397
info, err := c.lookupPath(c.absPath(C.GoString(path)))
399
return errorToStatus(err)
403
stat.ino = C.uint64_t(info.Inode)
404
stat.size = C.uint64_t(info.Size)
405
stat.nlink = C.uint32_t(info.Nlink)
406
stat.blk_size = C.uint32_t(defaultBlkSize)
407
stat.uid = C.uint32_t(info.Uid)
408
stat.gid = C.uint32_t(info.Gid)
410
if info.Size%512 != 0 {
411
stat.blocks = C.uint64_t(info.Size>>9) + 1
413
stat.blocks = C.uint64_t(info.Size >> 9)
416
if proto.IsRegular(info.Mode) {
417
stat.mode = C.uint32_t(C.S_IFREG) | C.uint32_t(info.Mode&0o777)
418
} else if proto.IsDir(info.Mode) {
419
stat.mode = C.uint32_t(C.S_IFDIR) | C.uint32_t(info.Mode&0o777)
420
} else if proto.IsSymlink(info.Mode) {
421
stat.mode = C.uint32_t(C.S_IFLNK) | C.uint32_t(info.Mode&0o777)
423
stat.mode = C.uint32_t(C.S_IFSOCK) | C.uint32_t(info.Mode&0o777)
426
// fill up the time struct
427
t := info.AccessTime.UnixNano()
428
stat.atime = C.uint64_t(t / 1e9)
429
stat.atime_nsec = C.uint32_t(t % 1e9)
431
t = info.ModifyTime.UnixNano()
432
stat.mtime = C.uint64_t(t / 1e9)
433
stat.mtime_nsec = C.uint32_t(t % 1e9)
435
t = info.CreateTime.UnixNano()
436
stat.ctime = C.uint64_t(t / 1e9)
437
stat.ctime_nsec = C.uint32_t(t % 1e9)
443
func cfs_setattr(id C.int64_t, path *C.char, stat *C.struct_cfs_stat_info, valid C.int) C.int {
444
c, exist := getClient(int64(id))
449
info, err := c.lookupPath(c.absPath(C.GoString(path)))
451
return errorToStatus(err)
454
err = c.setattr(info, uint32(valid), uint32(stat.mode), uint32(stat.uid), uint32(stat.gid), int64(stat.atime), int64(stat.mtime))
457
return errorToStatus(err)
459
c.ic.Delete(info.Inode)
464
func cfs_open(id C.int64_t, path *C.char, flags C.int, mode C.mode_t) C.int {
465
c, exist := getClient(int64(id))
471
fuseMode := uint32(mode) & uint32(0o777)
472
fuseFlags := uint32(flags) &^ uint32(0x8000)
473
accFlags := fuseFlags & uint32(C.O_ACCMODE)
475
absPath := c.absPath(C.GoString(path))
477
var info *proto.InodeInfo
481
* Note that the rwx mode is ignored when using libsdk
484
if fuseFlags&uint32(C.O_CREAT) != 0 {
485
if accFlags != uint32(C.O_WRONLY) && accFlags != uint32(C.O_RDWR) {
488
dirpath, name := gopath.Split(absPath)
489
dirInfo, err := c.lookupPath(dirpath)
491
return errorToStatus(err)
493
parentIno = dirInfo.Inode
496
auditlog.LogClientOp("Create", dirpath, "nil", err, time.Since(start).Microseconds(), info.Inode, 0)
498
auditlog.LogClientOp("Create", dirpath, "nil", err, time.Since(start).Microseconds(), 0, 0)
501
newInfo, err := c.create(dirInfo.Inode, name, fuseMode, absPath)
503
if err != syscall.EEXIST {
504
return errorToStatus(err)
506
newInfo, err = c.lookupPath(absPath)
508
return errorToStatus(err)
513
dirpath, _ := gopath.Split(absPath)
514
dirInfo, err := c.lookupPath(dirpath)
516
return errorToStatus(err)
518
parentIno = dirInfo.Inode // parent inode
519
newInfo, err := c.lookupPath(absPath)
521
return errorToStatus(err)
526
if c.cacheRuleKey == "" {
529
fileCachePattern := fmt.Sprintf(".*%s.*", c.cacheRuleKey)
530
fileCache, _ = regexp.MatchString(fileCachePattern, absPath)
532
f := c.allocFD(info.Inode, fuseFlags, fuseMode, fileCache, info.Size, parentIno, absPath)
537
if proto.IsRegular(info.Mode) {
539
if fuseFlags&uint32(C.O_TRUNC) != 0 {
540
if accFlags != uint32(C.O_WRONLY) && accFlags != uint32(C.O_RDWR) {
545
if err := c.truncate(f, 0); err != nil {
557
func cfs_flush(id C.int64_t, fd C.int) C.int {
558
c, exist := getClient(int64(id))
563
f := c.getFile(uint(fd))
577
func cfs_close(id C.int64_t, fd C.int) {
578
c, exist := getClient(int64(id))
582
f := c.releaseFD(uint(fd))
590
func cfs_write(id C.int64_t, fd C.int, buf unsafe.Pointer, size C.size_t, off C.off_t) C.ssize_t {
591
c, exist := getClient(int64(id))
593
return C.ssize_t(statusEINVAL)
596
f := c.getFile(uint(fd))
598
return C.ssize_t(statusEBADFD)
601
accFlags := f.flags & uint32(C.O_ACCMODE)
602
if accFlags != uint32(C.O_WRONLY) && accFlags != uint32(C.O_RDWR) {
603
return C.ssize_t(statusEACCES)
608
hdr := (*reflect.SliceHeader)(unsafe.Pointer(&buffer))
609
hdr.Data = uintptr(buf)
616
if f.flags&uint32(C.O_DIRECT) != 0 || f.flags&uint32(C.O_SYNC) != 0 || f.flags&uint32(C.O_DSYNC) != 0 {
617
if proto.IsHot(c.volType) {
621
if f.flags&uint32(C.O_APPEND) != 0 || proto.IsCold(c.volType) {
622
flags |= proto.FlagsAppend
623
flags |= proto.FlagsSyncWrite
626
n, err := c.write(f, int(off), buffer, flags)
628
if err == syscall.ENOSPC {
629
return C.ssize_t(statusENOSPC)
631
return C.ssize_t(statusEIO)
635
if err = c.flush(f); err != nil {
636
return C.ssize_t(statusEIO)
644
func cfs_read(id C.int64_t, fd C.int, buf unsafe.Pointer, size C.size_t, off C.off_t) C.ssize_t {
645
c, exist := getClient(int64(id))
647
return C.ssize_t(statusEINVAL)
650
f := c.getFile(uint(fd))
652
return C.ssize_t(statusEBADFD)
655
accFlags := f.flags & uint32(C.O_ACCMODE)
656
if accFlags == uint32(C.O_WRONLY) {
657
return C.ssize_t(statusEACCES)
662
hdr := (*reflect.SliceHeader)(unsafe.Pointer(&buffer))
663
hdr.Data = uintptr(buf)
667
n, err := c.read(f, int(off), buffer)
669
return C.ssize_t(statusEIO)
675
//export cfs_batch_get_inodes
676
func cfs_batch_get_inodes(id C.int64_t, fd C.int, iids unsafe.Pointer, stats []C.struct_cfs_stat_info, count C.int) (n C.int) {
677
c, exist := getClient(int64(id))
682
f := c.getFile(uint(fd))
687
var inodeIDS []uint64
689
hdr := (*reflect.SliceHeader)(unsafe.Pointer(&inodeIDS))
690
hdr.Data = uintptr(iids)
694
infos := c.mw.BatchInodeGet(inodeIDS)
695
if len(infos) > int(count) {
699
for i := 0; i < len(infos); i++ {
701
stats[i].ino = C.uint64_t(infos[i].Inode)
702
stats[i].size = C.uint64_t(infos[i].Size)
703
stats[i].blocks = C.uint64_t(infos[i].Size >> 9)
704
stats[i].nlink = C.uint32_t(infos[i].Nlink)
705
stats[i].blk_size = C.uint32_t(defaultBlkSize)
706
stats[i].uid = C.uint32_t(infos[i].Uid)
707
stats[i].gid = C.uint32_t(infos[i].Gid)
710
if proto.IsRegular(infos[i].Mode) {
711
stats[i].mode = C.uint32_t(C.S_IFREG) | C.uint32_t(infos[i].Mode&0o777)
712
} else if proto.IsDir(infos[i].Mode) {
713
stats[i].mode = C.uint32_t(C.S_IFDIR) | C.uint32_t(infos[i].Mode&0o777)
714
} else if proto.IsSymlink(infos[i].Mode) {
715
stats[i].mode = C.uint32_t(C.S_IFLNK) | C.uint32_t(infos[i].Mode&0o777)
717
stats[i].mode = C.uint32_t(C.S_IFSOCK) | C.uint32_t(infos[i].Mode&0o777)
720
// fill up the time struct
721
t := infos[i].AccessTime.UnixNano()
722
stats[i].atime = C.uint64_t(t / 1e9)
723
stats[i].atime_nsec = C.uint32_t(t % 1e9)
725
t = infos[i].ModifyTime.UnixNano()
726
stats[i].mtime = C.uint64_t(t / 1e9)
727
stats[i].mtime_nsec = C.uint32_t(t % 1e9)
729
t = infos[i].CreateTime.UnixNano()
730
stats[i].ctime = C.uint64_t(t / 1e9)
731
stats[i].ctime_nsec = C.uint32_t(t % 1e9)
734
n = C.int(len(infos))
738
//export cfs_refreshsummary
739
func cfs_refreshsummary(id C.int64_t, path *C.char, goroutine_num C.int) C.int {
740
c, exist := getClient(int64(id))
744
if !c.enableSummary {
747
info, err := c.lookupPath(c.absPath(C.GoString(path)))
754
goroutineNum := int32(goroutine_num)
755
err = c.mw.RefreshSummary_ll(ino, goroutineNum)
757
return errorToStatus(err)
763
* Note that readdir is not thread-safe according to the POSIX spec.
767
func cfs_readdir(id C.int64_t, fd C.int, dirents []C.struct_cfs_dirent, count C.int) (n C.int) {
768
c, exist := getClient(int64(id))
773
f := c.getFile(uint(fd))
779
f.dirp = &dirStream{}
780
dentries, err := c.mw.ReadDir_ll(f.ino)
782
return errorToStatus(err)
784
f.dirp.dirents = dentries
788
for dirp.pos < len(dirp.dirents) && n < count {
790
dirents[n].ino = C.uint64_t(dirp.dirents[dirp.pos].Inode)
793
if proto.IsRegular(dirp.dirents[dirp.pos].Type) {
794
dirents[n].d_type = C.DT_REG
795
} else if proto.IsDir(dirp.dirents[dirp.pos].Type) {
796
dirents[n].d_type = C.DT_DIR
797
} else if proto.IsSymlink(dirp.dirents[dirp.pos].Type) {
798
dirents[n].d_type = C.DT_LNK
800
dirents[n].d_type = C.DT_UNKNOWN
804
nameLen := len(dirp.dirents[dirp.pos].Name)
808
hdr := (*reflect.StringHeader)(unsafe.Pointer(&dirp.dirents[dirp.pos].Name))
809
C.memcpy(unsafe.Pointer(&dirents[n].name[0]), unsafe.Pointer(hdr.Data), C.size_t(nameLen))
810
dirents[n].name[nameLen] = 0
811
dirents[n].nameLen = C.uint32_t(nameLen)
821
func cfs_lsdir(id C.int64_t, fd C.int, direntsInfo []C.struct_cfs_dirent_info, count C.int) (n C.int) {
822
c, exist := getClient(int64(id))
827
f := c.getFile(uint(fd))
833
f.dirp = &dirStream{}
834
dentries, err := c.mw.ReadDir_ll(f.ino)
836
return errorToStatus(err)
838
f.dirp.dirents = dentries
842
inodeIDS := make([]uint64, count, count)
843
inodeMap := make(map[uint64]C.int)
844
for dirp.pos < len(dirp.dirents) && n < count {
845
inodeIDS[n] = dirp.dirents[dirp.pos].Inode
846
inodeMap[dirp.dirents[dirp.pos].Inode] = n
848
if proto.IsRegular(dirp.dirents[dirp.pos].Type) {
849
direntsInfo[n].d_type = C.DT_REG
850
} else if proto.IsDir(dirp.dirents[dirp.pos].Type) {
851
direntsInfo[n].d_type = C.DT_DIR
852
} else if proto.IsSymlink(dirp.dirents[dirp.pos].Type) {
853
direntsInfo[n].d_type = C.DT_LNK
855
direntsInfo[n].d_type = C.DT_UNKNOWN
857
nameLen := len(dirp.dirents[dirp.pos].Name)
861
hdr := (*reflect.StringHeader)(unsafe.Pointer(&dirp.dirents[dirp.pos].Name))
863
C.memcpy(unsafe.Pointer(&direntsInfo[n].name[0]), unsafe.Pointer(hdr.Data), C.size_t(nameLen))
864
direntsInfo[n].name[nameLen] = 0
865
direntsInfo[n].nameLen = C.uint32_t(nameLen)
874
infos := c.mw.BatchInodeGet(inodeIDS)
875
if len(infos) != int(n) {
878
for i := 0; i < len(infos); i++ {
880
index := inodeMap[infos[i].Inode]
881
direntsInfo[index].stat.size = C.uint64_t(infos[i].Size)
884
if proto.IsRegular(infos[i].Mode) {
885
direntsInfo[index].stat.mode = C.uint32_t(C.S_IFREG) | C.uint32_t(infos[i].Mode&0o777)
886
} else if proto.IsDir(infos[i].Mode) {
887
direntsInfo[index].stat.mode = C.uint32_t(C.S_IFDIR) | C.uint32_t(infos[i].Mode&0o777)
888
} else if proto.IsSymlink(infos[i].Mode) {
889
direntsInfo[index].stat.mode = C.uint32_t(C.S_IFLNK) | C.uint32_t(infos[i].Mode&0o777)
891
direntsInfo[index].stat.mode = C.uint32_t(C.S_IFSOCK) | C.uint32_t(infos[i].Mode&0o777)
894
// fill up the time struct
895
t := infos[index].AccessTime.UnixNano()
896
direntsInfo[index].stat.atime = C.uint64_t(t / 1e9)
897
direntsInfo[index].stat.atime_nsec = C.uint32_t(t % 1e9)
899
t = infos[index].ModifyTime.UnixNano()
900
direntsInfo[index].stat.mtime = C.uint64_t(t / 1e9)
901
direntsInfo[index].stat.mtime_nsec = C.uint32_t(t % 1e9)
907
func cfs_mkdirs(id C.int64_t, path *C.char, mode C.mode_t) C.int {
908
c, exist := getClient(int64(id))
917
dirpath := c.absPath(C.GoString(path))
924
auditlog.LogClientOp("Mkdir", dirpath, "nil", gerr, time.Since(start).Microseconds(), gino, 0)
926
auditlog.LogClientOp("Mkdir", dirpath, "nil", gerr, time.Since(start).Microseconds(), 0, 0)
930
pino := proto.RootIno
931
dirs := strings.Split(dirpath, "/")
932
for _, dir := range dirs {
933
if dir == "/" || dir == "" {
936
child, _, err := c.mw.Lookup_ll(pino, dir)
938
if err == syscall.ENOENT {
939
info, err := c.mkdir(pino, dir, uint32(mode), dirpath)
942
if err != syscall.EEXIST {
944
return errorToStatus(err)
951
return errorToStatus(err)
962
func cfs_rmdir(id C.int64_t, path *C.char) C.int {
963
c, exist := getClient(int64(id))
969
var info *proto.InodeInfo
971
absPath := c.absPath(C.GoString(path))
974
auditlog.LogClientOp("Rmdir", absPath, "nil", err, time.Since(start).Microseconds(), 0, 0)
976
auditlog.LogClientOp("Rmdir", absPath, "nil", err, time.Since(start).Microseconds(), info.Inode, 0)
979
dirpath, name := gopath.Split(absPath)
980
dirInfo, err := c.lookupPath(dirpath)
982
return errorToStatus(err)
985
info, err = c.mw.Delete_ll(dirInfo.Inode, name, true, absPath)
986
c.ic.Delete(dirInfo.Inode)
988
return errorToStatus(err)
992
func cfs_unlink(id C.int64_t, path *C.char) C.int {
993
c, exist := getClient(int64(id))
1000
var info *proto.InodeInfo
1002
absPath := c.absPath(C.GoString(path))
1003
dirpath, name := gopath.Split(absPath)
1007
auditlog.LogClientOp("Unlink", absPath, "nil", err, time.Since(start).Microseconds(), 0, 0)
1009
auditlog.LogClientOp("Unlink", absPath, "nil", err, time.Since(start).Microseconds(), info.Inode, 0)
1012
dirInfo, err := c.lookupPath(dirpath)
1014
return errorToStatus(err)
1017
_, mode, err := c.mw.Lookup_ll(dirInfo.Inode, name)
1019
return errorToStatus(err)
1021
if proto.IsDir(mode) {
1025
info, err = c.mw.Delete_ll(dirInfo.Inode, name, false, absPath)
1027
return errorToStatus(err)
1031
_ = c.mw.Evict(info.Inode, absPath)
1032
c.ic.Delete(info.Inode)
1038
func cfs_rename(id C.int64_t, from *C.char, to *C.char) C.int {
1039
c, exist := getClient(int64(id))
1047
absFrom := c.absPath(C.GoString(from))
1048
absTo := c.absPath(C.GoString(to))
1051
auditlog.LogClientOp("Rename", absFrom, absTo, err, time.Since(start).Microseconds(), 0, 0)
1054
srcDirPath, srcName := gopath.Split(absFrom)
1055
dstDirPath, dstName := gopath.Split(absTo)
1057
srcDirInfo, err := c.lookupPath(srcDirPath)
1059
return errorToStatus(err)
1061
dstDirInfo, err := c.lookupPath(dstDirPath)
1063
return errorToStatus(err)
1066
err = c.mw.Rename_ll(srcDirInfo.Inode, srcName, dstDirInfo.Inode, dstName, absFrom, absTo, false)
1067
c.ic.Delete(srcDirInfo.Inode)
1068
c.ic.Delete(dstDirInfo.Inode)
1069
c.dc.Delete(absFrom)
1070
return errorToStatus(err)
1074
func cfs_fchmod(id C.int64_t, fd C.int, mode C.mode_t) C.int {
1075
c, exist := getClient(int64(id))
1080
f := c.getFile(uint(fd))
1085
info, err := c.mw.InodeGet_ll(f.ino)
1087
return errorToStatus(err)
1090
err = c.setattr(info, proto.AttrMode, uint32(mode), 0, 0, 0, 0)
1092
return errorToStatus(err)
1094
c.ic.Delete(info.Inode)
1098
//export cfs_getsummary
1099
func cfs_getsummary(id C.int64_t, path *C.char, summary *C.struct_cfs_summary_info, useCache *C.char, goroutine_num C.int) C.int {
1100
c, exist := getClient(int64(id))
1105
info, err := c.lookupPath(c.absPath(C.GoString(path)))
1107
return errorToStatus(err)
1110
if strings.ToLower(C.GoString(useCache)) == "true" {
1111
cacheSummaryInfo := c.sc.Get(info.Inode)
1112
if cacheSummaryInfo != nil {
1113
summary.files = C.int64_t(cacheSummaryInfo.Files)
1114
summary.subdirs = C.int64_t(cacheSummaryInfo.Subdirs)
1115
summary.fbytes = C.int64_t(cacheSummaryInfo.Fbytes)
1120
if !proto.IsDir(info.Mode) {
1121
return statusENOTDIR
1123
goroutineNum := int32(goroutine_num)
1124
summaryInfo, err := c.mw.GetSummary_ll(info.Inode, goroutineNum)
1126
return errorToStatus(err)
1128
if strings.ToLower(C.GoString(useCache)) != "false" {
1129
c.sc.Put(info.Inode, &summaryInfo)
1131
summary.files = C.int64_t(summaryInfo.Files)
1132
summary.subdirs = C.int64_t(summaryInfo.Subdirs)
1133
summary.fbytes = C.int64_t(summaryInfo.Fbytes)
1139
func (c *client) absPath(path string) string {
1140
p := gopath.Clean(path)
1141
if !gopath.IsAbs(p) {
1142
p = gopath.Join(c.cwd, p)
1144
return gopath.Clean(p)
1147
func (c *client) start() (err error) {
1148
masters := strings.Split(c.masterAddr, ",")
1150
if c.logLevel == "" {
1153
level := parseLogLevel(c.logLevel)
1154
log.InitLog(c.logDir, "libcfs", level, nil, log.DefaultLogLeftSpaceLimit)
1155
stat.NewStatistic(c.logDir, "libcfs", int64(stat.DefaultStatLogSize), stat.DefaultTimeOutUs, true)
1157
proto.InitBufferPool(int64(32768))
1158
if c.readBlockThread == 0 {
1159
c.readBlockThread = 10
1161
if c.writeBlockThread == 0 {
1162
c.writeBlockThread = 10
1164
if err = c.loadConfFromMaster(masters); err != nil {
1167
if err = c.checkPermission(); err != nil {
1168
err = errors.NewErrorf("check permission failed: %v", err)
1174
_, err = auditlog.InitAudit(c.logDir, "clientSdk", int64(auditlog.DefaultAuditLogSize))
1176
log.LogWarnf("Init audit log fail: %v", err)
1180
if c.enableSummary {
1181
c.sc = fs.NewSummaryCache(fs.DefaultSummaryExpiration, fs.MaxSummaryCache)
1184
c.bc = bcache.NewBcacheClient()
1186
var ebsc *blobstore.BlobStoreClient
1187
if c.ebsEndpoint != "" {
1188
if ebsc, err = blobstore.NewEbsClient(access.Config{
1189
ConnMode: access.NoLimitConnMode,
1190
Consul: access.ConsulConfig{
1191
Address: c.ebsEndpoint,
1193
MaxSizePutOnce: MaxSizePutOnce,
1194
Logger: &access.Logger{
1195
Filename: gopath.Join(c.logDir, "libcfs/ebs.log"),
1201
var mw *meta.MetaWrapper
1202
if mw, err = meta.NewMetaWrapper(&meta.MetaConfig{
1205
ValidateOwner: false,
1206
EnableSummary: c.enableSummary,
1208
log.LogErrorf("newClient NewMetaWrapper failed(%v)", err)
1211
var ec *stream.ExtentClient
1212
if ec, err = stream.NewExtentClient(&stream.ExtentConfig{
1214
VolumeType: c.volType,
1216
FollowerRead: c.followerRead,
1217
OnAppendExtentKey: mw.AppendExtentKey,
1218
OnSplitExtentKey: mw.SplitExtentKey,
1219
OnGetExtents: mw.GetExtents,
1220
OnTruncate: mw.Truncate,
1221
BcacheEnable: c.enableBcache,
1222
OnLoadBcache: c.bc.Get,
1223
OnCacheBcache: c.bc.Put,
1224
OnEvictBcache: c.bc.Evict,
1225
DisableMetaCache: true,
1227
log.LogErrorf("newClient NewExtentClient failed(%v)", err)
1237
func (c *client) checkPermission() (err error) {
1238
if c.accessKey == "" || c.secretKey == "" {
1239
err = errors.New("invalid AccessKey or SecretKey")
1244
mc := masterSDK.NewMasterClientFromString(c.masterAddr, false)
1245
var userInfo *proto.UserInfo
1246
if userInfo, err = mc.UserAPI().GetAKInfo(c.accessKey); err != nil {
1249
if userInfo.SecretKey != c.secretKey {
1250
err = proto.ErrNoPermission
1253
policy := userInfo.Policy
1254
if policy.IsOwn(c.volName) {
1258
if policy.IsAuthorized(c.volName, c.subDir, proto.POSIXWriteAction) &&
1259
policy.IsAuthorized(c.volName, c.subDir, proto.POSIXReadAction) {
1263
if policy.IsAuthorized(c.volName, c.subDir, proto.POSIXReadAction) &&
1264
!policy.IsAuthorized(c.volName, c.subDir, proto.POSIXWriteAction) {
1267
err = proto.ErrNoPermission
1271
func (c *client) allocFD(ino uint64, flags, mode uint32, fileCache bool, fileSize uint64, parentInode uint64, path string) *file {
1273
defer c.fdlock.Unlock()
1274
fd, ok := c.fdset.NextClear(0)
1275
if !ok || fd > maxFdNum {
1279
f := &file{fd: fd, ino: ino, flags: flags, mode: mode, pino: parentInode, path: path}
1280
if proto.IsCold(c.volType) {
1281
clientConf := blobstore.ClientConfig{
1284
BlockSize: c.ebsBlockSize,
1290
EnableBcache: c.enableBcache,
1291
WConcurrency: c.writeBlockThread,
1292
ReadConcurrency: c.readBlockThread,
1293
CacheAction: c.cacheAction,
1294
FileCache: fileCache,
1296
CacheThreshold: c.cacheThreshold,
1298
f.fileWriter.FreeCache()
1299
switch flags & 0xff {
1300
case syscall.O_RDONLY:
1301
f.fileReader = blobstore.NewReader(clientConf)
1303
case syscall.O_WRONLY:
1304
f.fileWriter = blobstore.NewWriter(clientConf)
1306
case syscall.O_RDWR:
1307
f.fileReader = blobstore.NewReader(clientConf)
1308
f.fileWriter = blobstore.NewWriter(clientConf)
1310
f.fileWriter = blobstore.NewWriter(clientConf)
1318
func (c *client) getFile(fd uint) *file {
1325
func (c *client) releaseFD(fd uint) *file {
1327
defer c.fdlock.Unlock()
1328
f, ok := c.fdmap[fd]
1338
func (c *client) lookupPath(path string) (*proto.InodeInfo, error) {
1339
ino, ok := c.dc.Get(gopath.Clean(path))
1341
inoInterval, err := c.mw.LookupPath(gopath.Clean(path))
1345
c.dc.Put(gopath.Clean(path), inoInterval)
1348
info := c.ic.Get(ino)
1352
info, err := c.mw.InodeGet_ll(ino)
1361
func (c *client) setattr(info *proto.InodeInfo, valid uint32, mode, uid, gid uint32, atime, mtime int64) error {
1362
// Only rwx mode bit can be set
1363
if valid&proto.AttrMode != 0 {
1364
fuseMode := mode & uint32(0o777)
1365
mode = info.Mode &^ uint32(0o777) // clear rwx mode bit
1368
return c.mw.Setattr(info.Inode, valid, mode, uid, gid, atime, mtime)
1371
func (c *client) create(pino uint64, name string, mode uint32, fullPath string) (info *proto.InodeInfo, err error) {
1372
fuseMode := mode & 0o777
1373
return c.mw.Create_ll(pino, name, fuseMode, 0, 0, nil, fullPath)
1376
func (c *client) mkdir(pino uint64, name string, mode uint32, fullPath string) (info *proto.InodeInfo, err error) {
1377
fuseMode := mode & 0o777
1378
fuseMode |= uint32(os.ModeDir)
1379
return c.mw.Create_ll(pino, name, fuseMode, 0, 0, nil, fullPath)
1382
func (c *client) openStream(f *file) {
1383
_ = c.ec.OpenStream(f.ino)
1386
func (c *client) closeStream(f *file) {
1387
_ = c.ec.CloseStream(f.ino)
1388
_ = c.ec.EvictStream(f.ino)
1389
f.fileWriter.FreeCache()
1394
func (c *client) flush(f *file) error {
1395
if proto.IsHot(c.volType) {
1396
return c.ec.Flush(f.ino)
1398
if f.fileWriter != nil {
1399
return f.fileWriter.Flush(f.ino, c.ctx(c.id, f.ino))
1405
func (c *client) truncate(f *file, size int) error {
1406
err := c.ec.Truncate(c.mw, f.pino, f.ino, size, f.path)
1413
func (c *client) write(f *file, offset int, data []byte, flags int) (n int, err error) {
1414
if proto.IsHot(c.volType) {
1415
c.ec.GetStreamer(f.ino).SetParentInode(f.pino) // set the parent inode
1416
checkFunc := func() error {
1417
if !c.mw.EnableQuota {
1421
if ok := c.ec.UidIsLimited(0); ok {
1422
return syscall.ENOSPC
1425
if c.mw.IsQuotaLimitedById(f.ino, true, false) {
1426
return syscall.ENOSPC
1430
n, err = c.ec.Write(f.ino, offset, data, flags, checkFunc)
1432
n, err = f.fileWriter.Write(c.ctx(c.id, f.ino), offset, data, flags)
1440
func (c *client) read(f *file, offset int, data []byte) (n int, err error) {
1441
if proto.IsHot(c.volType) {
1442
n, err = c.ec.Read(f.ino, data, offset, len(data))
1444
n, err = f.fileReader.Read(c.ctx(c.id, f.ino), data, offset, len(data))
1446
if err != nil && err != io.EOF {
1452
func (c *client) ctx(cid int64, ino uint64) context.Context {
1453
_, ctx := trace.StartSpanFromContextWithTraceID(context.Background(), "", fmt.Sprintf("cid=%v,ino=%v", cid, ino))
1457
func (c *client) loadConfFromMaster(masters []string) (err error) {
1458
mc := masterSDK.NewMasterClient(masters, false)
1459
var volumeInfo *proto.SimpleVolView
1460
volumeInfo, err = mc.AdminAPI().GetVolumeSimpleInfo(c.volName)
1464
c.volType = volumeInfo.VolType
1465
c.ebsBlockSize = volumeInfo.ObjBlockSize
1466
c.cacheAction = volumeInfo.CacheAction
1467
c.cacheRuleKey = volumeInfo.CacheRule
1468
c.cacheThreshold = volumeInfo.CacheThreshold
1470
var clusterInfo *proto.ClusterInfo
1471
clusterInfo, err = mc.AdminAPI().GetClusterInfo()
1475
c.ebsEndpoint = clusterInfo.EbsAddr
1476
c.servicePath = clusterInfo.ServicePath
1477
c.cluster = clusterInfo.Cluster
1478
c.dirChildrenNumLimit = clusterInfo.DirChildrenNumLimit
1479
buf.InitCachePool(c.ebsBlockSize)
1483
func parseLogLevel(loglvl string) log.Level {
1485
switch strings.ToLower(loglvl) {
1487
level = log.DebugLevel
1489
level = log.InfoLevel
1491
level = log.WarnLevel
1493
level = log.ErrorLevel
1495
level = log.ErrorLevel
1500
func (c *client) fileSize(ino uint64) (size int, gen uint64) {
1501
size, gen, valid := c.ec.FileSize(ino)
1502
log.LogDebugf("fileSize: ino(%v) fileSize(%v) gen(%v) valid(%v)", ino, size, gen, valid)
1505
info := c.ic.Get(ino)
1507
return int(info.Size), info.Generation
1509
if info, err := c.mw.InodeGet_ll(ino); err == nil {
1510
size = int(info.Size)
1511
gen = info.Generation