1
// Copyright 2018 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
22
"github.com/xtaci/smux"
24
"github.com/cubefs/cubefs/proto"
25
"github.com/cubefs/cubefs/util"
26
"github.com/cubefs/cubefs/util/log"
29
// StartTcpService binds and listens to the specified port.
30
func (m *MetaNode) startServer() (err error) {
31
// initialize and start the server.
32
m.httpStopC = make(chan uint8)
34
addr := fmt.Sprintf(":%s", m.listen)
36
addr = fmt.Sprintf("%s:%s", m.localAddr, m.listen)
39
ln, err := net.Listen("tcp", addr)
43
go func(stopC chan uint8) {
46
conn, err := ln.Accept()
55
go m.serveConn(conn, stopC)
58
log.LogInfof("start server over...")
62
func (m *MetaNode) stopServer() {
63
if m.httpStopC != nil {
65
if r := recover(); r != nil {
66
log.LogErrorf("action[StopTcpServer],err:%v", r)
73
// Read data from the specified tcp connection until the connection is closed by the remote or the tcp service is down.
74
func (m *MetaNode) serveConn(conn net.Conn, stopC chan uint8) {
80
c := conn.(*net.TCPConn)
83
remoteAddr := conn.RemoteAddr().String()
91
if err := p.ReadFromConnWithVer(conn, proto.NoReadDeadlineTime); err != nil {
93
log.LogError("serve MetaNode: ", err.Error())
97
if err := m.handlePacket(conn, p, remoteAddr); err != nil {
98
log.LogErrorf("serve handlePacket fail: %v", err)
103
func (m *MetaNode) handlePacket(conn net.Conn, p *Packet,
104
remoteAddr string) (err error) {
106
err = m.metadataManager.HandleMetadataOperation(conn, p, remoteAddr)
110
func (m *MetaNode) startSmuxServer() (err error) {
111
// initialize and start the server.
112
m.smuxStopC = make(chan uint8)
114
ipPort := fmt.Sprintf(":%s", m.listen)
116
ipPort = fmt.Sprintf("%s:%s", m.localAddr, m.listen)
118
addr := util.ShiftAddrPort(ipPort, smuxPortShift)
119
ln, err := net.Listen("tcp", addr)
123
go func(stopC chan uint8) {
126
conn, err := ln.Accept()
135
go m.serveSmuxConn(conn, stopC)
138
log.LogInfof("start Smux Server over...")
142
func (m *MetaNode) stopSmuxServer() {
145
log.LogDebugf("action[stopSmuxServer] stop smux conn pool")
148
if m.smuxStopC != nil {
150
if r := recover(); r != nil {
151
log.LogErrorf("action[stopSmuxServer],err:%v", r)
158
func (m *MetaNode) serveSmuxConn(conn net.Conn, stopC chan uint8) {
164
c := conn.(*net.TCPConn)
167
remoteAddr := conn.RemoteAddr().String()
169
var sess *smux.Session
171
sess, err = smux.Server(conn, smuxPoolCfg.Config)
173
log.LogErrorf("action[serveSmuxConn] failed to serve smux connection, err(%v)", err)
185
stream, err := sess.AcceptStream()
187
if util.FilterSmuxAcceptError(err) != nil {
188
log.LogErrorf("action[startSmuxService] failed to accept, err: %s", err)
190
log.LogInfof("action[startSmuxService] accept done, err: %s", err)
194
go m.serveSmuxStream(stream, remoteAddr, stopC)
198
func (m *MetaNode) serveSmuxStream(stream *smux.Stream, remoteAddr string, stopC chan uint8) {
207
if err := p.ReadFromConnWithVer(stream, proto.NoReadDeadlineTime); err != nil {
209
log.LogError("serve MetaNode: ", err.Error())
213
if err := m.handlePacket(stream, p, remoteAddr); err != nil {
214
log.LogErrorf("serve handlePacket fail: %v", err)