ergo

Форк
0
/
client.go 
229 строк · 5.7 Кб
1
package cloud
2

3
import (
4
	"crypto/tls"
5
	"fmt"
6
	"net"
7
	"os"
8
	"regexp"
9
	"strconv"
10
	"strings"
11
	"time"
12

13
	"github.com/ergo-services/ergo/etf"
14
	"github.com/ergo-services/ergo/gen"
15
	"github.com/ergo-services/ergo/lib"
16
	"github.com/ergo-services/ergo/node"
17
)
18

19
// CloudNode describes a single cloud node the cloud client may connect to.
type CloudNode struct {
	// Node is the node name in "name@host" form.
	Node string
	// Port is the port used for the static route to the node.
	Port uint16
	// SkipVerify is copied into tls.Config.InsecureSkipVerify for the
	// connection to this node.
	SkipVerify bool
}
24

25
type cloudClient struct {
26
	gen.Server
27
}
28

29
type cloudClientState struct {
30
	options   node.Cloud
31
	handshake node.HandshakeInterface
32
	monitor   etf.Ref
33
	node      string
34
}
35

36
// messageCloudClientConnect is the cast message the cloud client sends to
// itself to start (or retry) connecting to a cloud node.
type messageCloudClientConnect struct{}
37

38
func (cc *cloudClient) Init(process *gen.ServerProcess, args ...etf.Term) error {
39
	lib.Log("[%s] CLOUD_CLIENT: Init: %#v", process.NodeName(), args)
40
	if len(args) == 0 {
41
		return fmt.Errorf("no args to start cloud client")
42
	}
43

44
	cloudOptions, ok := args[0].(node.Cloud)
45
	if ok == false {
46
		return fmt.Errorf("wrong args for the cloud client")
47
	}
48

49
	handshake, err := createHandshake(cloudOptions)
50
	if err != nil {
51
		return fmt.Errorf("can not create HandshakeInterface for the cloud client: %s", err)
52
	}
53

54
	process.State = &cloudClientState{
55
		options:   cloudOptions,
56
		handshake: handshake,
57
	}
58

59
	if err := process.RegisterEvent(EventCloud, MessageEventCloud{}); err != nil {
60
		lib.Warning("can't register event %q: %s", EventCloud, err)
61
	}
62

63
	process.Cast(process.Self(), messageCloudClientConnect{})
64

65
	return nil
66
}
67

68
func (cc *cloudClient) HandleCast(process *gen.ServerProcess, message etf.Term) gen.ServerStatus {
69
	lib.Log("[%s] CLOUD_CLIENT: HandleCast: %#v", process.NodeName(), message)
70
	switch message.(type) {
71
	case messageCloudClientConnect:
72
		state := process.State.(*cloudClientState)
73

74
		// initiate connection with the cloud
75
		cloudNodes, err := getCloudNodes()
76
		if err != nil {
77
			lib.Warning("can't resolve cloud nodes: %s", err)
78
		}
79

80
		// add static route with custom handshake
81
		thisNode := process.Env(node.EnvKeyNode).(node.Node)
82

83
		for _, cloud := range cloudNodes {
84
			routeOptions := node.RouteOptions{
85
				Cookie:    state.options.Cookie,
86
				IsErgo:    true,
87
				Handshake: state.handshake,
88
			}
89
			routeOptions.TLS = &tls.Config{
90
				InsecureSkipVerify: cloud.SkipVerify,
91
			}
92
			if err := thisNode.AddStaticRoutePort(cloud.Node, cloud.Port, routeOptions); err != nil {
93
				if err != lib.ErrTaken {
94
					continue
95
				}
96
			}
97

98
			lib.Log("[%s] CLOUD_CLIENT: trying to connect with: %s", process.NodeName(), cloud.Node)
99
			if err := thisNode.Connect(cloud.Node); err != nil {
100
				lib.Log("[%s] CLOUD_CLIENT: failed with reason: ", err)
101
				continue
102
			}
103

104
			// add proxy domain route
105
			proxyRoute := node.ProxyRoute{
106
				Name:   "@" + state.options.Cluster,
107
				Proxy:  cloud.Node,
108
				Cookie: state.options.Cookie,
109
			}
110
			thisNode.AddProxyRoute(proxyRoute)
111

112
			state.monitor = process.MonitorNode(cloud.Node)
113
			state.node = cloud.Node
114
			event := MessageEventCloud{
115
				Cluster: proxyRoute.Name,
116
				Online:  true,
117
				Proxy:   cloud.Node,
118
			}
119
			if err := process.SendEventMessage(EventCloud, event); err != nil {
120
				lib.Log("[%s] CLOUD_CLIENT: failed to send event (%s) %#v: %s",
121
					process.NodeName(), EventCloud, event, err)
122
			}
123
			return gen.ServerStatusOK
124
		}
125

126
		// cloud nodes aren't available. make another attempt in 3 seconds
127
		process.CastAfter(process.Self(), messageCloudClientConnect{}, 5*time.Second)
128
	}
129
	return gen.ServerStatusOK
130
}
131

132
func (cc *cloudClient) HandleInfo(process *gen.ServerProcess, message etf.Term) gen.ServerStatus {
133
	lib.Log("[%s] CLOUD_CLIENT: HandleInfo: %#v", process.NodeName(), message)
134
	state := process.State.(*cloudClientState)
135

136
	switch m := message.(type) {
137
	case gen.MessageNodeDown:
138
		if m.Ref != state.monitor {
139
			return gen.ServerStatusOK
140
		}
141
		thisNode := process.Env(node.EnvKeyNode).(node.Node)
142
		state.cleanup(thisNode)
143

144
		event := MessageEventCloud{
145
			Online: false,
146
		}
147
		process.SendEventMessage(EventCloud, event)
148
		// lost connection with the cloud node. try to connect again
149
		process.Cast(process.Self(), messageCloudClientConnect{})
150
	}
151
	return gen.ServerStatusOK
152
}
153

154
// Terminate tears down the cloud connection when the client process stops:
// it removes the "@<cluster>" proxy route, disconnects from the cloud node
// stored in the state and then runs the state cleanup.
func (cc *cloudClient) Terminate(process *gen.ServerProcess, reason string) {
	var (
		clientState = process.State.(*cloudClientState)
		localNode   = process.Env(node.EnvKeyNode).(node.Node)
	)

	clusterRoute := "@" + clientState.options.Cluster
	localNode.RemoveProxyRoute(clusterRoute)
	localNode.Disconnect(clientState.node)
	clientState.cleanup(localNode)
}
161

162
// cleanup removes the static route to the remembered cloud node and the
// "@<cluster>" proxy route from n, then forgets the cloud node name.
func (ccs *cloudClientState) cleanup(n node.Node) {
	cloudNode := ccs.node
	proxyName := "@" + ccs.options.Cluster

	n.RemoveStaticRoute(cloudNode)
	n.RemoveProxyRoute(proxyName)
	ccs.node = ""
}
167

168
func getCloudNodes() ([]CloudNode, error) {
169
	// check if custom cloud entries have been defined via env
170
	if entries := strings.Fields(os.Getenv("ERGO_SERVICES_CLOUD")); len(entries) > 0 {
171
		nodes := []CloudNode{}
172
		for _, entry := range entries {
173
			re := regexp.MustCompile("[@:]+")
174
			nameHostPort := re.Split(entry, -1)
175
			name := "dist"
176
			host := "localhost"
177
			port := 4411
178
			switch len(nameHostPort) {
179
			case 2:
180
				// either abc@def or abc:def
181
				if p, err := strconv.Atoi(nameHostPort[1]); err == nil {
182
					port = p
183
				} else {
184
					name = nameHostPort[0]
185
					host = nameHostPort[1]
186
				}
187
			case 3:
188
				if p, err := strconv.Atoi(nameHostPort[2]); err == nil {
189
					port = p
190
				} else {
191
					continue
192
				}
193
				name = nameHostPort[0]
194
				host = nameHostPort[1]
195

196
			default:
197
				continue
198
			}
199

200
			node := CloudNode{
201
				Node:       name + "@" + host,
202
				Port:       uint16(port),
203
				SkipVerify: true,
204
			}
205
			nodes = append(nodes, node)
206

207
		}
208

209
		if len(nodes) > 0 {
210
			return nodes, nil
211
		}
212
	}
213
	_, srv, err := net.LookupSRV("cloud", "dist", "ergo.services")
214
	if err != nil {
215
		return nil, err
216
	}
217

218
	nodes := make([]CloudNode, len(srv))
219
	for i := range srv {
220
		nodes[i].Node = "dist@" + strings.TrimSuffix(srv[i].Target, ".")
221
		nodes[i].Port = srv[i].Port
222
	}
223

224
	// return only 3 of them
225
	if len(nodes) > 3 {
226
		return nodes[:3], nil
227
	}
228
	return nodes, nil
229
}
230

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.