13
"github.com/ergo-services/ergo/etf"
14
"github.com/ergo-services/ergo/gen"
15
"github.com/ergo-services/ergo/lib"
16
"github.com/ergo-services/ergo/node"
19
// CloudNode describes one cloud endpoint this client can connect to. Only the
// opening line of the declaration is visible in this excerpt; code elsewhere
// in this file reads and writes its Node, Port and SkipVerify fields.
type CloudNode struct {
25
// cloudClient is the receiver of the Init, HandleCast, HandleInfo and
// Terminate callbacks defined in this file. Its fields are not visible in
// this excerpt.
type cloudClient struct {
29
// cloudClientState is the value Init stores in process.State. Besides the
// handshake field shown here, the methods in this file also use its options,
// monitor and node fields (their declarations are not visible in this excerpt).
type cloudClientState struct {
31
// handshake is passed as the Handshake option of the static routes added in
// HandleCast. Presumably it is populated in Init from createHandshake — confirm.
handshake node.HandshakeInterface
36
// messageCloudClientConnect is the internal cast message that makes HandleCast
// attempt a connection to the cloud. It carries no data.
type messageCloudClientConnect struct{}
38
// Init prepares the cloud client process. The visible code expects args[0] to
// be a node.Cloud value, builds a handshake from it with createHandshake,
// stores a cloudClientState in process.State, registers the EventCloud event
// and casts messageCloudClientConnect to itself so that HandleCast starts
// connecting. It returns an error when the arguments are missing or of the
// wrong type, or when the handshake cannot be created. The conditions guarding
// those returns are not visible in this excerpt.
func (cc *cloudClient) Init(process *gen.ServerProcess, args ...etf.Term) error {
39
lib.Log("[%s] CLOUD_CLIENT: Init: %#v", process.NodeName(), args)
41
// presumably reached when args is empty (per the error text) — guard not visible here
return fmt.Errorf("no args to start cloud client")
44
cloudOptions, ok := args[0].(node.Cloud)
46
// presumably reached when the type assertion above fails — guard not visible here
return fmt.Errorf("wrong args for the cloud client")
49
handshake, err := createHandshake(cloudOptions)
51
return fmt.Errorf("can not create HandshakeInterface for the cloud client: %s", err)
54
process.State = &cloudClientState{
55
options: cloudOptions,
59
// a failed event registration is reported as a warning
if err := process.RegisterEvent(EventCloud, MessageEventCloud{}); err != nil {
60
lib.Warning("can't register event %q: %s", EventCloud, err)
63
// send the connect request to this very process; HandleCast picks it up
process.Cast(process.Self(), messageCloudClientConnect{})
68
// HandleCast handles messageCloudClientConnect. It resolves the cloud nodes
// with getCloudNodes and, for each of them, adds a static route that carries
// the configured cookie and the custom handshake (a TLS config may be attached
// as well) and tries to connect. After a successful connection it adds the
// proxy route "@"+Cluster, starts monitoring the node, records it in the state,
// publishes a MessageEventCloud and returns. Otherwise a new attempt is
// scheduled with CastAfter in 5 seconds. Several guarding conditions and
// closing braces are not visible in this excerpt.
func (cc *cloudClient) HandleCast(process *gen.ServerProcess, message etf.Term) gen.ServerStatus {
69
lib.Log("[%s] CLOUD_CLIENT: HandleCast: %#v", process.NodeName(), message)
70
switch message.(type) {
71
case messageCloudClientConnect:
72
state := process.State.(*cloudClientState)
74
// initiate connection with the cloud
75
cloudNodes, err := getCloudNodes()
77
lib.Warning("can't resolve cloud nodes: %s", err)
80
// add static route with custom handshake
81
thisNode := process.Env(node.EnvKeyNode).(node.Node)
83
for _, cloud := range cloudNodes {
84
routeOptions := node.RouteOptions{
85
Cookie: state.options.Cookie,
87
Handshake: state.handshake,
89
routeOptions.TLS = &tls.Config{
90
InsecureSkipVerify: cloud.SkipVerify,
92
if err := thisNode.AddStaticRoutePort(cloud.Node, cloud.Port, routeOptions); err != nil {
93
// lib.ErrTaken is handled differently from other errors (the branch body is not visible here)
if err != lib.ErrTaken {
98
lib.Log("[%s] CLOUD_CLIENT: trying to connect with: %s", process.NodeName(), cloud.Node)
99
if err := thisNode.Connect(cloud.Node); err != nil {
100
// the node name fills the "[%s]" prefix and the error needs its own verb,
// otherwise err ends up in the prefix slot and the reason stays empty
lib.Log("[%s] CLOUD_CLIENT: failed with reason: %s", process.NodeName(), err)
104
// add proxy domain route
105
proxyRoute := node.ProxyRoute{
106
Name: "@" + state.options.Cluster,
108
Cookie: state.options.Cookie,
110
thisNode.AddProxyRoute(proxyRoute)
112
// remember the monitor reference and node name; HandleInfo and Terminate use them
state.monitor = process.MonitorNode(cloud.Node)
113
state.node = cloud.Node
114
event := MessageEventCloud{
115
Cluster: proxyRoute.Name,
119
if err := process.SendEventMessage(EventCloud, event); err != nil {
120
lib.Log("[%s] CLOUD_CLIENT: failed to send event (%s) %#v: %s",
121
process.NodeName(), EventCloud, event, err)
123
return gen.ServerStatusOK
126
// cloud nodes aren't available. make another attempt in 5 seconds
127
process.CastAfter(process.Self(), messageCloudClientConnect{}, 5*time.Second)
129
return gen.ServerStatusOK
132
// HandleInfo reacts to gen.MessageNodeDown. Notifications whose Ref differs
// from state.monitor are ignored. Otherwise the routes are removed through
// state.cleanup, a MessageEventCloud is published on EventCloud and a new
// connection attempt is requested by casting messageCloudClientConnect to this
// process. The visible return paths yield gen.ServerStatusOK.
func (cc *cloudClient) HandleInfo(process *gen.ServerProcess, message etf.Term) gen.ServerStatus {
133
lib.Log("[%s] CLOUD_CLIENT: HandleInfo: %#v", process.NodeName(), message)
134
state := process.State.(*cloudClientState)
136
switch m := message.(type) {
137
case gen.MessageNodeDown:
138
// skip node-down notifications that do not belong to our monitor
if m.Ref != state.monitor {
139
return gen.ServerStatusOK
141
thisNode := process.Env(node.EnvKeyNode).(node.Node)
142
state.cleanup(thisNode)
144
event := MessageEventCloud{
147
// NOTE(review): the error returned by SendEventMessage is dropped here, while
// HandleCast logs it — confirm this is intentional
process.SendEventMessage(EventCloud, event)
148
// lost connection with the cloud node. try to connect again
149
process.Cast(process.Self(), messageCloudClientConnect{})
151
return gen.ServerStatusOK
154
// Terminate removes the proxy route "@"+Cluster, disconnects from the node
// recorded in state.node and then runs state.cleanup. The reason argument is
// not used by the visible code.
func (cc *cloudClient) Terminate(process *gen.ServerProcess, reason string) {
155
state := process.State.(*cloudClientState)
156
thisNode := process.Env(node.EnvKeyNode).(node.Node)
157
// NOTE(review): state.cleanup below removes the same proxy route again, so
// this call looks redundant — confirm before removing either one
thisNode.RemoveProxyRoute("@" + state.options.Cluster)
158
thisNode.Disconnect(state.node)
159
state.cleanup(thisNode)
162
// cleanup removes the static route to ccs.node and the proxy route
// "@"+Cluster from the given node. Note that the parameter is named node and
// therefore shadows the imported package node inside this function.
func (ccs *cloudClientState) cleanup(node node.Node) {
163
node.RemoveStaticRoute(ccs.node)
164
node.RemoveProxyRoute("@" + ccs.options.Cluster)
168
// getCloudNodes returns the cloud nodes to connect to. Whitespace-separated
// entries of the ERGO_SERVICES_CLOUD environment variable are parsed first,
// each split on runs of '@' and ':' into its name/host/port parts. Further
// down the SRV records for _cloud._dist.ergo.services are looked up and turned
// into "dist@<target>" nodes with the advertised port. How the two sources are
// combined, the defaults applied and the error paths are not fully visible in
// this excerpt.
func getCloudNodes() ([]CloudNode, error) {
169
// check if custom cloud entries have been defined via env
170
if entries := strings.Fields(os.Getenv("ERGO_SERVICES_CLOUD")); len(entries) > 0 {
171
nodes := []CloudNode{}
172
for _, entry := range entries {
173
// NOTE(review): the pattern is constant, yet it is compiled on every loop
// iteration; it could be compiled once outside the loop
re := regexp.MustCompile("[@:]+")
174
nameHostPort := re.Split(entry, -1)
178
switch len(nameHostPort) {
180
// either abc@def or abc:def
181
// a second part that parses as an integer is presumably taken as the port — confirm
if p, err := strconv.Atoi(nameHostPort[1]); err == nil {
184
name = nameHostPort[0]
185
host = nameHostPort[1]
188
if p, err := strconv.Atoi(nameHostPort[2]); err == nil {
193
name = nameHostPort[0]
194
host = nameHostPort[1]
201
Node: name + "@" + host,
205
nodes = append(nodes, node)
213
_, srv, err := net.LookupSRV("cloud", "dist", "ergo.services")
218
nodes := make([]CloudNode, len(srv))
220
// strip the trailing dot of the SRV target before building the node name
nodes[i].Node = "dist@" + strings.TrimSuffix(srv[i].Target, ".")
221
nodes[i].Port = srv[i].Port
224
// return only 3 of them
226
// NOTE(review): nodes[:3] panics when fewer than 3 records were returned; the
// guard for that is not visible in this excerpt — confirm one is present
return nodes[:3], nil