1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
22
"github.com/cubefs/cubefs/proto"
25
// UpdateXAttr applies an incremental update to the extended attribute req.Key
// of inode req.Inode.
//
// req.Value is split on "," and its first three fields are parsed as base-10
// int64 increments (files, dirs, bytes). When the Extend stored for the inode
// already holds req.Key, the stored value is split and parsed the same way,
// the increments are added field by field, and the resulting
// "files,dirs,bytes" string is submitted through putExtend with
// opFSMUpdateXAttr. The other visible paths submit req.Value unchanged.
// A putExtend failure is reported on p with proto.OpErr and the error text
// as the body.
//
// NOTE(review): this excerpt has gaps (branch headers, returns and closing
// braces are not visible), so the exact branch structure and the success
// reply should be confirmed against the full file.
func (mp *metaPartition) UpdateXAttr(req *proto.UpdateXAttrRequest, p *Packet) (err error) {
26
// NOTE(review): the ParseInt errors below are discarded and indexes 0..2 are
// read without a length check; a value with fewer than three comma-separated
// fields would panic with an index-out-of-range here.
newValueList := strings.Split(req.Value, ",")
27
filesInc, _ := strconv.ParseInt(newValueList[0], 10, 64)
28
dirsInc, _ := strconv.ParseInt(newValueList[1], 10, 64)
29
bytesInc, _ := strconv.ParseInt(newValueList[2], 10, 64)
32
// Releases mp.xattrLock when the function returns.
// NOTE(review): the matching Lock call is not visible in this excerpt —
// confirm it precedes this line.
defer mp.xattrLock.Unlock()
33
treeItem := mp.extendTree.Get(NewExtend(req.Inode))
35
extend := treeItem.(*Extend)
36
if value, exist := extend.Get([]byte(req.Key)); exist {
37
// The stored value is decoded with the same three-field layout as req.Value;
// parse errors are discarded here as well.
oldValueList := strings.Split(string(value), ",")
38
oldFiles, _ := strconv.ParseInt(oldValueList[0], 10, 64)
39
oldDirs, _ := strconv.ParseInt(oldValueList[1], 10, 64)
40
oldBytes, _ := strconv.ParseInt(oldValueList[2], 10, 64)
41
newFiles := oldFiles + filesInc
42
newDirs := oldDirs + dirsInc
43
newBytes := oldBytes + bytesInc
44
newValue := strconv.FormatInt(int64(newFiles), 10) + "," +
45
strconv.FormatInt(int64(newDirs), 10) + "," +
46
strconv.FormatInt(int64(newBytes), 10)
47
// A fresh Extend holding just this key is built for submission; it shadows
// the Extend read from the tree above.
extend := NewExtend(req.Inode)
48
extend.Put([]byte(req.Key), []byte(newValue), mp.verSeq)
49
if _, err = mp.putExtend(opFSMUpdateXAttr, extend); err != nil {
50
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
56
// NOTE(review): no new Extend is visibly created before this Put, so it
// appears to write req.Value into the Extend obtained from the tree before
// submitting it — confirm this is intended.
extend.Put([]byte(req.Key), []byte(req.Value), mp.verSeq)
57
if _, err = mp.putExtend(opFSMUpdateXAttr, extend); err != nil {
58
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
65
// Here a new Extend is created and req.Value is stored as-is
// (presumably the path where the tree has no Extend for the inode — verify).
extend := NewExtend(req.Inode)
66
extend.Put([]byte(req.Key), []byte(req.Value), mp.verSeq)
67
if _, err = mp.putExtend(opFSMUpdateXAttr, extend); err != nil {
68
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
76
// SetXAttr stores req.Value under req.Key for inode req.Inode. It builds a new
// Extend for the inode, puts the key/value pair at mp.verSeq, and submits it
// through putExtend with opFSMSetXAttr. A putExtend failure is reported on p
// with proto.OpErr and the error text as the body.
func (mp *metaPartition) SetXAttr(req *proto.SetXAttrRequest, p *Packet) (err error) {
77
extend := NewExtend(req.Inode)
78
extend.Put([]byte(req.Key), []byte(req.Value), mp.verSeq)
79
if _, err = mp.putExtend(opFSMSetXAttr, extend); err != nil {
80
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
87
// BatchSetXAttr stores every key/value pair of req.Attrs for inode req.Inode.
// All pairs are put into a single new Extend at mp.verSeq, which is then
// submitted once through putExtend with opFSMSetXAttr. A putExtend failure is
// reported on p with proto.OpErr and the error text as the body.
func (mp *metaPartition) BatchSetXAttr(req *proto.BatchSetXAttrRequest, p *Packet) (err error) {
88
extend := NewExtend(req.Inode)
89
for key, val := range req.Attrs {
90
extend.Put([]byte(key), []byte(val), mp.verSeq)
93
// One submission carries the whole batch.
if _, err = mp.putExtend(opFSMSetXAttr, extend); err != nil {
94
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
101
// GetXAttr looks up the value of req.Key for inode req.Inode. The response
// echoes req.VolName and req.PartitionId; when the Extend selected by
// GetExtentByVersion(req.VerSeq) contains the key, its value is returned as
// a string in response.Value. The response is JSON-encoded and sent with
// PacketOkWithBody.
//
// NOTE(review): the remaining response fields, the guard around the type
// assertion and the condition for the error reply are not visible in this
// excerpt.
func (mp *metaPartition) GetXAttr(req *proto.GetXAttrRequest, p *Packet) (err error) {
102
response := &proto.GetXAttrResponse{
103
VolName: req.VolName,
104
PartitionId: req.PartitionId,
108
treeItem := mp.extendTree.Get(NewExtend(req.Inode))
110
// Only the Extend version selected by req.VerSeq is consulted.
if extend := treeItem.(*Extend).GetExtentByVersion(req.VerSeq); extend != nil {
111
if value, exist := extend.Get([]byte(req.Key)); exist {
112
response.Value = string(value)
117
encoded, err = json.Marshal(response)
119
// Presumably reached only when json.Marshal failed — the guarding condition
// is not visible here.
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
122
p.PacketOkWithBody(encoded)
126
// GetAllXAttr returns every extended attribute of inode req.Inode. The
// response echoes req.VolName and req.PartitionId, and each entry of the
// dataMap of the Extend selected by GetExtentByVersion(req.VerSeq) is copied
// into response.Attrs with its value converted to a string. The response is
// JSON-encoded and sent with PacketOkWithBody.
//
// NOTE(review): the guard around the type assertion and the condition for the
// error reply are not visible in this excerpt.
func (mp *metaPartition) GetAllXAttr(req *proto.GetAllXAttrRequest, p *Packet) (err error) {
127
response := &proto.GetAllXAttrResponse{
128
VolName: req.VolName,
129
PartitionId: req.PartitionId,
131
// Allocated up front so the map can be written to below.
Attrs: make(map[string]string),
133
treeItem := mp.extendTree.Get(NewExtend(req.Inode))
135
if extend := treeItem.(*Extend).GetExtentByVersion(req.VerSeq); extend != nil {
136
for key, val := range extend.dataMap {
137
response.Attrs[key] = string(val)
142
encoded, err = json.Marshal(response)
144
// Presumably reached only when json.Marshal failed — the guarding condition
// is not visible here.
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
147
p.PacketOkWithBody(encoded)
151
// BatchGetXAttr collects the attributes named in req.Keys for each inode in
// req.Inodes. For every inode a proto.XAttrInfo is built; keys present in the
// Extend selected by GetExtentByVersion(req.VerSeq) are copied into its XAttrs
// map as strings, and the info is appended to response.XAttrs. The response is
// JSON-encoded; a marshal failure is reported on p with proto.OpErr, otherwise
// the encoded body is sent with PacketOkWithBody.
//
// NOTE(review): the guard around the type assertion, the remaining XAttrInfo
// fields and the declaration of `extend` are not visible in this excerpt.
func (mp *metaPartition) BatchGetXAttr(req *proto.BatchGetXAttrRequest, p *Packet) (err error) {
152
response := &proto.BatchGetXAttrResponse{
153
VolName: req.VolName,
154
PartitionId: req.PartitionId,
155
// Capacity is pre-sized to the number of requested inodes.
XAttrs: make([]*proto.XAttrInfo, 0, len(req.Inodes)),
157
for _, inode := range req.Inodes {
158
treeItem := mp.extendTree.Get(NewExtend(inode))
160
info := &proto.XAttrInfo{
162
XAttrs: make(map[string]string),
166
if extend = treeItem.(*Extend).GetExtentByVersion(req.VerSeq); extend != nil {
167
for _, key := range req.Keys {
168
// Keys absent from the Extend are left out of the result map.
if val, exist := extend.Get([]byte(key)); exist {
169
info.XAttrs[key] = string(val)
173
response.XAttrs = append(response.XAttrs, info)
177
if encoded, err = json.Marshal(response); err != nil {
178
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
181
p.PacketOkWithBody(encoded)
185
// RemoveXAttr requests removal of req.Key from inode req.Inode. It builds a
// new Extend for the inode, puts the key with a nil value at req.VerSeq, and
// submits it through putExtend with opFSMRemoveXAttr. A putExtend failure is
// reported on p with proto.OpErr and the error text as the body.
func (mp *metaPartition) RemoveXAttr(req *proto.RemoveXAttrRequest, p *Packet) (err error) {
186
extend := NewExtend(req.Inode)
187
// NOTE(review): this path uses req.VerSeq, whereas the set/update paths in
// this file pass mp.verSeq — confirm the difference is intentional.
extend.Put([]byte(req.Key), nil, req.VerSeq)
188
if _, err = mp.putExtend(opFSMRemoveXAttr, extend); err != nil {
189
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
196
// ListXAttr returns the attribute keys of inode req.Inode. The response echoes
// req.VolName and req.PartitionId, and every key visited by Range on the
// Extend selected by GetExtentByVersion(req.VerSeq) is appended to
// response.XAttrs; values are not returned. The response is JSON-encoded and
// sent with PacketOkWithBody.
//
// NOTE(review): the guard around the type assertion, the callback's return
// value and the condition for the error reply are not visible in this excerpt.
func (mp *metaPartition) ListXAttr(req *proto.ListXAttrRequest, p *Packet) (err error) {
197
response := &proto.ListXAttrResponse{
198
VolName: req.VolName,
199
PartitionId: req.PartitionId,
201
// Starts as a non-nil empty slice.
XAttrs: make([]string, 0),
203
treeItem := mp.extendTree.Get(NewExtend(req.Inode))
205
if extend := treeItem.(*Extend).GetExtentByVersion(req.VerSeq); extend != nil {
206
extend.Range(func(key, value []byte) bool {
207
response.XAttrs = append(response.XAttrs, string(key))
213
encoded, err = json.Marshal(response)
215
// Presumably reached only when json.Marshal failed — the guarding condition
// is not visible here.
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
218
p.PacketOkWithBody(encoded)
222
// putExtend serializes extend with extend.Bytes() and passes the bytes to
// mp.submit together with the operation code op. The values returned by
// mp.submit are assigned to the named results resp and err.
//
// NOTE(review): the declaration of `marshaled` and the handling of a
// serialization error are not visible in this excerpt.
func (mp *metaPartition) putExtend(op uint32, extend *Extend) (resp interface{}, err error) {
224
if marshaled, err = extend.Bytes(); err != nil {
227
resp, err = mp.submit(op, marshaled)