cubefs

Форк
0
/
fdstore.go 
390 строк · 9.0 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package main
16

17
import (
18
	"flag"
19
	"fmt"
20
	"io"
21
	"net"
22
	"net/http"
23
	"os"
24
	"strings"
25
	"unsafe"
26

27
	"github.com/cubefs/cubefs/depends/bazil.org/fuse/fs"
28
	"github.com/cubefs/cubefs/proto"
29
	"github.com/cubefs/cubefs/util"
30
)
31

32
// Command-line flags of the fdstore tool. They are registered at package
// initialization and read after flag.Parse() in main.

// optFuseHttpPort is the HTTP port of the FUSE client the requests are sent to.
var optFuseHttpPort = flag.String("p", "", "FUSE client http server port")

// optDynamicUDS selects a per-process socket file name instead of DefaultUDS.
var optDynamicUDS = flag.Bool("n", false, "use dynamic UDS file name")

// optSuspend requests the suspend operation.
var optSuspend = flag.Bool("s", false, "suspend fuse")

// optResume requests the resume operation.
var optResume = flag.Bool("r", false, "resume fuse")

// optDump holds the two comma-separated list files to dump.
var optDump = flag.String("d", "", "dump nodes/handles list files, <nodes list>,<handles list>")

// optVersion prints the version and exits.
var optVersion = flag.Bool("v", false, "show version")
40

41
// DefaultUDS is the Unix domain socket path used when the dynamic socket
// name option is not set.
const DefaultUDS string = "/tmp/CubeFS-fdstore.sock"

// DefaultIP is the host part of the URLs used for suspend/resume requests.
const DefaultIP string = "127.0.0.1"
45

46
func createUDS() (listener net.Listener, err error) {
47
	var sockAddr string
48
	var addr *net.UnixAddr
49

50
	if *optDynamicUDS {
51
		sockAddr = fmt.Sprintf("/tmp/CubeFS-fdstore-%v.sock", os.Getpid())
52
	} else {
53
		sockAddr = DefaultUDS
54
	}
55
	fmt.Printf("sockaddr: %s\n", sockAddr)
56

57
	if addr, err = net.ResolveUnixAddr("unix", sockAddr); err != nil {
58
		fmt.Printf("cannot resolve unix addr: %v\n", err)
59
		return
60
	}
61

62
	if listener, err = net.ListenUnix("unix", addr); err != nil {
63
		fmt.Printf("cannot create unix domain: %v\n", err)
64
		return
65
	}
66

67
	if err = os.Chmod(sockAddr, 0o666); err != nil {
68
		fmt.Printf("failed to chmod socket file: %v\n", err)
69
		listener.Close()
70
		return
71
	}
72

73
	return
74
}
75

76
// destroyUDS closes listener and removes the file at the listener's address.
// Both steps are best effort: their errors are deliberately ignored.
func destroyUDS(listener net.Listener) {
	// Capture the path first; it is read from the listener before closing it.
	path := listener.Addr().String()
	_ = listener.Close()
	_ = os.Remove(path)
}
81

82
func RecvFuseFdFromOldClient(udsListener net.Listener) (file *os.File, err error) {
83
	var conn net.Conn
84
	var socket *os.File
85

86
	if conn, err = udsListener.Accept(); err != nil {
87
		fmt.Fprintf(os.Stderr, "unix domain accepts fail: %v\n", err)
88
		return
89
	}
90
	defer conn.Close()
91

92
	fmt.Printf("a new connection accepted\n")
93
	unixconn := conn.(*net.UnixConn)
94
	if socket, err = unixconn.File(); err != nil {
95
		fmt.Fprintf(os.Stderr, "failed to get socket file: %v\n", err)
96
		return
97
	}
98
	defer socket.Close()
99

100
	if file, err = util.RecvFd(socket); err != nil {
101
		fmt.Fprintf(os.Stderr, "failed to receive fd: %v\n", err)
102
		return
103
	}
104

105
	fmt.Printf("Received file %s fd %v\n", file.Name(), file.Fd())
106
	return file, nil
107
}
108

109
func SendFuseFdToNewClient(udsListener net.Listener, file *os.File) (err error) {
110
	var conn net.Conn
111
	var socket *os.File
112

113
	if conn, err = udsListener.Accept(); err != nil {
114
		fmt.Fprintf(os.Stderr, "unix domain accepts fail: %v\n", err)
115
		return
116
	}
117
	defer conn.Close()
118

119
	fmt.Printf("a new connection accepted\n")
120
	unixconn := conn.(*net.UnixConn)
121
	if socket, err = unixconn.File(); err != nil {
122
		fmt.Fprintf(os.Stderr, "failed to get socket file: %v\n", err)
123
		return
124
	}
125
	defer socket.Close()
126

127
	if file == nil {
128
		err = fmt.Errorf("no file is received")
129
		fmt.Fprintf(os.Stderr, err.Error())
130
		return
131
	} else {
132
		if err = util.SendFd(socket, file.Name(), file.Fd()); err != nil {
133
			fmt.Fprintf(os.Stderr, "failed to send fd %v: %v\n", file.Fd(), err)
134
			return
135
		}
136
	}
137

138
	fmt.Printf("Sent file %s fd %v\n", file.Name(), file.Fd())
139
	return nil
140
}
141

142
func SendSuspendRequest(port string, udsListener net.Listener) (err error) {
143
	var (
144
		req  *http.Request
145
		resp *http.Response
146
		data []byte
147
	)
148
	udsFilePath := udsListener.Addr().String()
149

150
	url := fmt.Sprintf("http://%s:%s/suspend?sock=%s", DefaultIP, port, udsFilePath)
151
	if req, err = http.NewRequest("POST", url, nil); err != nil {
152
		fmt.Fprintf(os.Stderr, "Failed to get new request: %v\n", err)
153
		return err
154
	}
155
	req.Header.Set("Content-Type", "application/text")
156

157
	client := http.DefaultClient
158
	if resp, err = client.Do(req); err != nil {
159
		fmt.Fprintf(os.Stderr, "Failed to post request: %v\n", err)
160
		return err
161
	}
162
	defer resp.Body.Close()
163

164
	if data, err = io.ReadAll(resp.Body); err != nil {
165
		fmt.Fprintf(os.Stderr, "Failed to read response: %v\n", err)
166
		return err
167
	}
168

169
	if resp.StatusCode == http.StatusOK {
170
		fmt.Printf("\n==> %s\n==> Could restore cfs-client now with -r option.\n\n", string(data))
171
	} else {
172
		fmt.Printf("\n==> %s\n==> Status: %s\n\n", string(data), resp.Status)
173
		return fmt.Errorf(resp.Status)
174
	}
175

176
	return nil
177
}
178

179
// WaitSuspendFinish blocks until a value arrives on ch and returns it.
func WaitSuspendFinish(ch chan error) error {
	return <-ch
}
183

184
func doSuspend(port string) error {
185
	var fud *os.File
186

187
	udsListener, err := createUDS()
188
	if err != nil {
189
		fmt.Fprintf(os.Stderr, "doSuspend: failed to create UDS: %v\n", err)
190
		return err
191
	}
192
	defer destroyUDS(udsListener)
193

194
	if err = SendSuspendRequest(port, udsListener); err != nil {
195
		// SendResumeRequest(port)
196
		return err
197
	}
198

199
	if fud, err = RecvFuseFdFromOldClient(udsListener); err != nil {
200
		// SendResumeRequest(port)
201
		return err
202
	}
203

204
	if err = SendFuseFdToNewClient(udsListener, fud); err != nil {
205
		// SendResumeRequest(port)
206
		return err
207
	}
208

209
	return nil
210
}
211

212
func SendResumeRequest(port string) (err error) {
213
	var (
214
		req  *http.Request
215
		resp *http.Response
216
		data []byte
217
	)
218

219
	url := fmt.Sprintf("http://%s:%s/resume", DefaultIP, port)
220
	if req, err = http.NewRequest("POST", url, nil); err != nil {
221
		fmt.Fprintf(os.Stderr, "Failed to get new request: %v\n", err)
222
		return err
223
	}
224
	req.Header.Set("Content-Type", "application/text")
225

226
	client := http.DefaultClient
227
	if resp, err = client.Do(req); err != nil {
228
		fmt.Fprintf(os.Stderr, "Failed to post request: %v\n", err)
229
		return err
230
	}
231
	defer resp.Body.Close()
232

233
	if data, err = io.ReadAll(resp.Body); err != nil {
234
		fmt.Fprintf(os.Stderr, "Failed to read response: %v\n", err)
235
		return err
236
	}
237

238
	fmt.Printf("data: %s\n", string(data))
239
	return nil
240
}
241

242
func doResume(port string) error {
243
	err := SendResumeRequest(port)
244
	return err
245
}
246

247
func doDump(filePathes string) {
248
	pathes := strings.Split(filePathes, ",")
249
	if len(pathes) != 2 {
250
		fmt.Fprintf(os.Stderr, "Invalid dump parameter '%s'\n", filePathes)
251
		return
252
	}
253

254
	nodes := make([]*fs.ContextNode, 0)
255
	handles := make([]*fs.ContextHandle, 0)
256

257
	nodeListFile, err := os.OpenFile(pathes[0], os.O_RDONLY, 0o644)
258
	if err != nil {
259
		fmt.Fprintf(os.Stderr, "failed to open nodes list file: %v\n", err)
260
		return
261
	}
262
	defer nodeListFile.Close()
263

264
	cnVer, err := fs.ReadVersion(nodeListFile)
265
	if err != nil {
266
		fmt.Fprintf(os.Stderr, "failed to read nodes version: %v\n", err)
267
		return
268
	}
269

270
	i := 0
271
	for {
272
		var (
273
			data    []byte = make([]byte, unsafe.Sizeof(fs.ContextNode{}))
274
			rsize   int
275
			skipped int
276
		)
277

278
		rsize, err = nodeListFile.Read(data)
279
		if rsize == 0 || err == io.EOF {
280
			break
281
		}
282

283
		if cnVer == fs.ContextNodeVersionV1 {
284
			cn := fs.ContextNodeFromBytes(data)
285
			for uint64(len(nodes)) < cn.NodeID {
286
				nodes = append(nodes, nil)
287
				skipped++
288
				i++
289
			}
290
			if skipped > 0 {
291
				fmt.Printf("[... skipped %d]\n", skipped)
292
			}
293

294
			fmt.Printf("[%d] snode(%s)\n", i, cn)
295
			nodes = append(nodes, cn)
296
		}
297
		i++
298
	}
299

300
	handleListFile, err := os.OpenFile(pathes[1], os.O_RDONLY, 0o644)
301
	if err != nil {
302
		fmt.Fprintf(os.Stderr, "failed to open handles list file: %v\n", err)
303
		return
304
	}
305
	defer handleListFile.Close()
306

307
	chVer, err := fs.ReadVersion(handleListFile)
308
	if err != nil {
309
		fmt.Fprintf(os.Stderr, "failed to read handles version: %v\n", err)
310
		return
311
	}
312

313
	i = 0
314
	for {
315
		var (
316
			data    []byte = make([]byte, unsafe.Sizeof(fs.ContextHandle{}))
317
			rsize   int
318
			skipped int
319
		)
320
		rsize, err = handleListFile.Read(data)
321
		if rsize == 0 || err == io.EOF {
322
			err = nil
323
			break
324
		}
325

326
		if chVer == fs.ContextHandleVersionV1 {
327
			ch := fs.ContextHandleFromBytes(data)
328

329
			for uint64(len(handles)) < ch.HandleID {
330
				handles = append(handles, nil)
331
				skipped++
332
				i++
333
			}
334
			if skipped > 0 {
335
				fmt.Printf("[... skipped %d]\n", skipped)
336
			}
337

338
			if ch.NodeID > uint64(len(nodes)) {
339
				fmt.Printf("[%d] shandle(handleid:%v nodeid:%v) [out of nodes range]\n",
340
					i, ch.HandleID, ch.NodeID)
341
			} else if nodes[ch.NodeID] == nil {
342
				fmt.Printf("[%d] shandle(handleid:%v nodeid:%v) [no associated node]\n",
343
					i, ch.HandleID, ch.NodeID)
344
			} else {
345
				fmt.Printf("[%d] shandle(handleid:%v nodeid:%v)\n", i, ch.HandleID, ch.NodeID)
346
			}
347

348
			handles = append(handles, ch)
349
		}
350
		i++
351
	}
352
}
353

354
func main() {
355
	var err error
356

357
	flag.Parse()
358

359
	if *optVersion {
360
		fmt.Printf(proto.DumpVersion("fdstore"))
361
		os.Exit(0)
362
	}
363

364
	if *optDump == "" {
365
		if *optFuseHttpPort == "" || (!*optSuspend && !*optResume) {
366
			flag.Usage()
367
			os.Exit(-1)
368
		}
369
	}
370

371
	fmt.Printf("Fuse address: %s:%s\n", DefaultIP, *optFuseHttpPort)
372

373
	if *optDump != "" {
374
		doDump(*optDump)
375
	} else if *optSuspend {
376
		fmt.Printf("Do suspend ...\n")
377
		err = doSuspend(*optFuseHttpPort)
378
	} else if *optResume {
379
		fmt.Printf("Do Resume ...\n")
380
		err = doResume(*optFuseHttpPort)
381
	}
382

383
	if err != nil {
384
		fmt.Printf("Done FAILED\n")
385
		os.Exit(-1)
386
	}
387

388
	fmt.Printf("Done Successfully\n")
389
	return
390
}
391

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.