cubefs

Форк
0
/
server.go 
217 строк · 4.7 Кб
1
// Copyright 2018 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package metanode
16

17
import (
18
	"fmt"
19
	"io"
20
	"net"
21

22
	"github.com/xtaci/smux"
23

24
	"github.com/cubefs/cubefs/proto"
25
	"github.com/cubefs/cubefs/util"
26
	"github.com/cubefs/cubefs/util/log"
27
)
28

29
// StartTcpService binds and listens to the specified port.
30
func (m *MetaNode) startServer() (err error) {
31
	// initialize and start the server.
32
	m.httpStopC = make(chan uint8)
33

34
	addr := fmt.Sprintf(":%s", m.listen)
35
	if m.bindIp {
36
		addr = fmt.Sprintf("%s:%s", m.localAddr, m.listen)
37
	}
38

39
	ln, err := net.Listen("tcp", addr)
40
	if err != nil {
41
		return
42
	}
43
	go func(stopC chan uint8) {
44
		defer ln.Close()
45
		for {
46
			conn, err := ln.Accept()
47
			select {
48
			case <-stopC:
49
				return
50
			default:
51
			}
52
			if err != nil {
53
				continue
54
			}
55
			go m.serveConn(conn, stopC)
56
		}
57
	}(m.httpStopC)
58
	log.LogInfof("start server over...")
59
	return
60
}
61

62
// stopServer signals the plain TCP service to shut down by closing
// m.httpStopC. It does nothing when the channel is nil. A panic raised by
// close (for example when the channel is already closed) is recovered and
// logged instead of propagating to the caller.
func (m *MetaNode) stopServer() {
	if m.httpStopC == nil {
		return
	}
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		log.LogErrorf("action[StopTcpServer],err:%v", r)
	}()
	close(m.httpStopC)
}
72

73
// Read data from the specified tcp connection until the connection is closed by the remote or the tcp service is down.
74
func (m *MetaNode) serveConn(conn net.Conn, stopC chan uint8) {
75
	defer func() {
76
		conn.Close()
77
		m.RemoveConnection()
78
	}()
79
	m.AddConnection()
80
	c := conn.(*net.TCPConn)
81
	c.SetKeepAlive(true)
82
	c.SetNoDelay(true)
83
	remoteAddr := conn.RemoteAddr().String()
84
	for {
85
		select {
86
		case <-stopC:
87
			return
88
		default:
89
		}
90
		p := &Packet{}
91
		if err := p.ReadFromConnWithVer(conn, proto.NoReadDeadlineTime); err != nil {
92
			if err != io.EOF {
93
				log.LogError("serve MetaNode: ", err.Error())
94
			}
95
			return
96
		}
97
		if err := m.handlePacket(conn, p, remoteAddr); err != nil {
98
			log.LogErrorf("serve handlePacket fail: %v", err)
99
		}
100
	}
101
}
102

103
func (m *MetaNode) handlePacket(conn net.Conn, p *Packet,
104
	remoteAddr string) (err error) {
105
	// Handle request
106
	err = m.metadataManager.HandleMetadataOperation(conn, p, remoteAddr)
107
	return
108
}
109

110
func (m *MetaNode) startSmuxServer() (err error) {
111
	// initialize and start the server.
112
	m.smuxStopC = make(chan uint8)
113

114
	ipPort := fmt.Sprintf(":%s", m.listen)
115
	if m.bindIp {
116
		ipPort = fmt.Sprintf("%s:%s", m.localAddr, m.listen)
117
	}
118
	addr := util.ShiftAddrPort(ipPort, smuxPortShift)
119
	ln, err := net.Listen("tcp", addr)
120
	if err != nil {
121
		return
122
	}
123
	go func(stopC chan uint8) {
124
		defer ln.Close()
125
		for {
126
			conn, err := ln.Accept()
127
			select {
128
			case <-stopC:
129
				return
130
			default:
131
			}
132
			if err != nil {
133
				continue
134
			}
135
			go m.serveSmuxConn(conn, stopC)
136
		}
137
	}(m.smuxStopC)
138
	log.LogInfof("start Smux Server over...")
139
	return
140
}
141

142
// stopSmuxServer shuts the smux side of the meta node down. It first closes
// the package-level smuxPool when one is set, then closes m.smuxStopC to
// signal the smux serving goroutines. A panic raised by close (for example
// when the channel is already closed) is recovered and logged.
func (m *MetaNode) stopSmuxServer() {
	if smuxPool != nil {
		smuxPool.Close()
		log.LogDebugf("action[stopSmuxServer] stop smux conn pool")
	}

	if m.smuxStopC == nil {
		return
	}
	// Registered only after the pool has been closed, so a panic from the
	// pool itself is not swallowed here.
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		log.LogErrorf("action[stopSmuxServer],err:%v", r)
	}()
	close(m.smuxStopC)
}
157

158
func (m *MetaNode) serveSmuxConn(conn net.Conn, stopC chan uint8) {
159
	defer func() {
160
		conn.Close()
161
		m.RemoveConnection()
162
	}()
163
	m.AddConnection()
164
	c := conn.(*net.TCPConn)
165
	c.SetKeepAlive(true)
166
	c.SetNoDelay(true)
167
	remoteAddr := conn.RemoteAddr().String()
168

169
	var sess *smux.Session
170
	var err error
171
	sess, err = smux.Server(conn, smuxPoolCfg.Config)
172
	if err != nil {
173
		log.LogErrorf("action[serveSmuxConn] failed to serve smux connection, err(%v)", err)
174
		return
175
	}
176
	defer sess.Close()
177

178
	for {
179
		select {
180
		case <-stopC:
181
			return
182
		default:
183
		}
184

185
		stream, err := sess.AcceptStream()
186
		if err != nil {
187
			if util.FilterSmuxAcceptError(err) != nil {
188
				log.LogErrorf("action[startSmuxService] failed to accept, err: %s", err)
189
			} else {
190
				log.LogInfof("action[startSmuxService] accept done, err: %s", err)
191
			}
192
			break
193
		}
194
		go m.serveSmuxStream(stream, remoteAddr, stopC)
195
	}
196
}
197

198
func (m *MetaNode) serveSmuxStream(stream *smux.Stream, remoteAddr string, stopC chan uint8) {
199
	for {
200
		select {
201
		case <-stopC:
202
			return
203
		default:
204
		}
205

206
		p := &Packet{}
207
		if err := p.ReadFromConnWithVer(stream, proto.NoReadDeadlineTime); err != nil {
208
			if err != io.EOF {
209
				log.LogError("serve MetaNode: ", err.Error())
210
			}
211
			return
212
		}
213
		if err := m.handlePacket(stream, p, remoteAddr); err != nil {
214
			log.LogErrorf("serve handlePacket fail: %v", err)
215
		}
216
	}
217
}
218

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.