cubefs

Форк
0
/
partition_op_extend.go 
229 строк · 6.7 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package metanode
16

17
import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"

	"github.com/cubefs/cubefs/proto"
)
24

25
// UpdateXAttr applies an incremental update to a counter-style extended
// attribute of an inode.
//
// req.Value must contain at least three comma-separated base-10 integers,
// read as the "files,dirs,bytes" increments; any further fields are ignored.
// If the inode already stores a value under req.Key, the increments are added
// to the stored counters and the sums are written back. Otherwise req.Value
// is stored as given.
//
// A malformed req.Value or stored value is reported to the client with
// proto.OpErr and returned as an error instead of panicking on a short field
// list or silently counting an unparsable field as zero. The whole
// read-modify-submit sequence runs under mp.xattrLock.
func (mp *metaPartition) UpdateXAttr(req *proto.UpdateXAttrRequest, p *Packet) (err error) {
	filesInc, dirsInc, bytesInc, err := parseXAttrCounters(req.Value)
	if err != nil {
		err = fmt.Errorf("invalid xattr update value %q: %w", req.Value, err)
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return err
	}

	mp.xattrLock.Lock()
	defer mp.xattrLock.Unlock()

	// Default to storing the request value verbatim; it is replaced by the
	// summed counters only when the key already has a stored value.
	newValue := req.Value
	if treeItem := mp.extendTree.Get(NewExtend(req.Inode)); treeItem != nil {
		if stored, exist := treeItem.(*Extend).Get([]byte(req.Key)); exist {
			oldFiles, oldDirs, oldBytes, perr := parseXAttrCounters(string(stored))
			if perr != nil {
				err = fmt.Errorf("invalid stored xattr value %q: %w", string(stored), perr)
				p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
				return err
			}
			newValue = strconv.FormatInt(oldFiles+filesInc, 10) + "," +
				strconv.FormatInt(oldDirs+dirsInc, 10) + "," +
				strconv.FormatInt(oldBytes+bytesInc, 10)
		}
	}

	// Submit a fresh Extend that carries only the updated key, the same way
	// SetXAttr does, rather than calling Put on the item fetched from
	// extendTree before the operation has been submitted.
	// NOTE(review): assumes the opFSMUpdateXAttr apply path merges the
	// submitted key into the existing item, as the existing-key path already
	// relied on — confirm.
	extend := NewExtend(req.Inode)
	extend.Put([]byte(req.Key), []byte(newValue), mp.verSeq)
	if _, err = mp.putExtend(opFSMUpdateXAttr, extend); err != nil {
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return err
	}
	p.PacketOkReply()
	return nil
}

// parseXAttrCounters splits value on "," and parses its first three fields as
// base-10 int64 values. It returns an error when fewer than three fields are
// present or when any of the first three fields is not a valid integer.
func parseXAttrCounters(value string) (files, dirs, size int64, err error) {
	fields := strings.Split(value, ",")
	if len(fields) < 3 {
		return 0, 0, 0, fmt.Errorf("expected 3 comma-separated integers, got %d field(s)", len(fields))
	}
	if files, err = strconv.ParseInt(fields[0], 10, 64); err != nil {
		return 0, 0, 0, err
	}
	if dirs, err = strconv.ParseInt(fields[1], 10, 64); err != nil {
		return 0, 0, 0, err
	}
	if size, err = strconv.ParseInt(fields[2], 10, 64); err != nil {
		return 0, 0, 0, err
	}
	return files, dirs, size, nil
}
75

76
// SetXAttr stores a single extended attribute (req.Key -> req.Value) for the
// inode named in the request.
//
// The attribute is wrapped in a new Extend tagged with mp.verSeq and submitted
// with opFSMSetXAttr. On failure the packet is filled with proto.OpErr and the
// error text, and the error is returned; on success the packet gets an OK reply.
func (mp *metaPartition) SetXAttr(req *proto.SetXAttrRequest, p *Packet) (err error) {
	attr := NewExtend(req.Inode)
	attr.Put([]byte(req.Key), []byte(req.Value), mp.verSeq)

	_, err = mp.putExtend(opFSMSetXAttr, attr)
	if err != nil {
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return err
	}

	p.PacketOkReply()
	return nil
}
86

87
// BatchSetXAttr stores every key/value pair of req.Attrs as extended
// attributes of the inode named in the request.
//
// All pairs are collected into one new Extend tagged with mp.verSeq and
// submitted in a single opFSMSetXAttr operation. On failure the packet is
// filled with proto.OpErr and the error text, and the error is returned; on
// success the packet gets an OK reply.
func (mp *metaPartition) BatchSetXAttr(req *proto.BatchSetXAttrRequest, p *Packet) (err error) {
	attrs := NewExtend(req.Inode)
	for name, value := range req.Attrs {
		attrs.Put([]byte(name), []byte(value), mp.verSeq)
	}

	_, err = mp.putExtend(opFSMSetXAttr, attrs)
	if err != nil {
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return err
	}

	p.PacketOkReply()
	return nil
}
100

101
func (mp *metaPartition) GetXAttr(req *proto.GetXAttrRequest, p *Packet) (err error) {
102
	response := &proto.GetXAttrResponse{
103
		VolName:     req.VolName,
104
		PartitionId: req.PartitionId,
105
		Inode:       req.Inode,
106
		Key:         req.Key,
107
	}
108
	treeItem := mp.extendTree.Get(NewExtend(req.Inode))
109
	if treeItem != nil {
110
		if extend := treeItem.(*Extend).GetExtentByVersion(req.VerSeq); extend != nil {
111
			if value, exist := extend.Get([]byte(req.Key)); exist {
112
				response.Value = string(value)
113
			}
114
		}
115
	}
116
	var encoded []byte
117
	encoded, err = json.Marshal(response)
118
	if err != nil {
119
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
120
		return
121
	}
122
	p.PacketOkWithBody(encoded)
123
	return
124
}
125

126
func (mp *metaPartition) GetAllXAttr(req *proto.GetAllXAttrRequest, p *Packet) (err error) {
127
	response := &proto.GetAllXAttrResponse{
128
		VolName:     req.VolName,
129
		PartitionId: req.PartitionId,
130
		Inode:       req.Inode,
131
		Attrs:       make(map[string]string),
132
	}
133
	treeItem := mp.extendTree.Get(NewExtend(req.Inode))
134
	if treeItem != nil {
135
		if extend := treeItem.(*Extend).GetExtentByVersion(req.VerSeq); extend != nil {
136
			for key, val := range extend.dataMap {
137
				response.Attrs[key] = string(val)
138
			}
139
		}
140
	}
141
	var encoded []byte
142
	encoded, err = json.Marshal(response)
143
	if err != nil {
144
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
145
		return
146
	}
147
	p.PacketOkWithBody(encoded)
148
	return
149
}
150

151
func (mp *metaPartition) BatchGetXAttr(req *proto.BatchGetXAttrRequest, p *Packet) (err error) {
152
	response := &proto.BatchGetXAttrResponse{
153
		VolName:     req.VolName,
154
		PartitionId: req.PartitionId,
155
		XAttrs:      make([]*proto.XAttrInfo, 0, len(req.Inodes)),
156
	}
157
	for _, inode := range req.Inodes {
158
		treeItem := mp.extendTree.Get(NewExtend(inode))
159
		if treeItem != nil {
160
			info := &proto.XAttrInfo{
161
				Inode:  inode,
162
				XAttrs: make(map[string]string),
163
			}
164

165
			var extend *Extend
166
			if extend = treeItem.(*Extend).GetExtentByVersion(req.VerSeq); extend != nil {
167
				for _, key := range req.Keys {
168
					if val, exist := extend.Get([]byte(key)); exist {
169
						info.XAttrs[key] = string(val)
170
					}
171
				}
172
			}
173
			response.XAttrs = append(response.XAttrs, info)
174
		}
175
	}
176
	var encoded []byte
177
	if encoded, err = json.Marshal(response); err != nil {
178
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
179
		return
180
	}
181
	p.PacketOkWithBody(encoded)
182
	return
183
}
184

185
// RemoveXAttr submits the removal of one extended attribute of an inode.
//
// The key is placed in a new Extend with a nil value, tagged with req.VerSeq,
// and submitted with opFSMRemoveXAttr. On failure the packet is filled with
// proto.OpErr and the error text, and the error is returned; on success the
// packet gets an OK reply.
func (mp *metaPartition) RemoveXAttr(req *proto.RemoveXAttrRequest, p *Packet) (err error) {
	target := NewExtend(req.Inode)
	target.Put([]byte(req.Key), nil, req.VerSeq)

	_, err = mp.putExtend(opFSMRemoveXAttr, target)
	if err != nil {
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
		return err
	}

	p.PacketOkReply()
	return nil
}
195

196
func (mp *metaPartition) ListXAttr(req *proto.ListXAttrRequest, p *Packet) (err error) {
197
	response := &proto.ListXAttrResponse{
198
		VolName:     req.VolName,
199
		PartitionId: req.PartitionId,
200
		Inode:       req.Inode,
201
		XAttrs:      make([]string, 0),
202
	}
203
	treeItem := mp.extendTree.Get(NewExtend(req.Inode))
204
	if treeItem != nil {
205
		if extend := treeItem.(*Extend).GetExtentByVersion(req.VerSeq); extend != nil {
206
			extend.Range(func(key, value []byte) bool {
207
				response.XAttrs = append(response.XAttrs, string(key))
208
				return true
209
			})
210
		}
211
	}
212
	var encoded []byte
213
	encoded, err = json.Marshal(response)
214
	if err != nil {
215
		p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
216
		return
217
	}
218
	p.PacketOkWithBody(encoded)
219
	return
220
}
221

222
// putExtend serializes extend with its Bytes method and submits the result
// under the given operation code via mp.submit.
//
// It returns a nil response and the serialization error when Bytes fails;
// otherwise it returns whatever mp.submit returns.
func (mp *metaPartition) putExtend(op uint32, extend *Extend) (resp interface{}, err error) {
	payload, err := extend.Bytes()
	if err != nil {
		return nil, err
	}

	resp, err = mp.submit(op, payload)
	return resp, err
}
230

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.