13
"github.com/ergo-services/ergo/etf"
14
"github.com/ergo-services/ergo/lib"
15
"github.com/ergo-services/ergo/node"
19
defaultHandshakeTimeout = 5 * time.Second
20
clusterNameLengthMax = 128
23
type Handshake struct {
31
type handshakeDetails struct {
34
details node.HandshakeDetails
39
func createHandshake(options node.Cloud) (node.HandshakeInterface, error) {
40
if options.Timeout == 0 {
41
options.Timeout = defaultHandshakeTimeout
44
if err := RegisterTypes(); err != nil {
53
func (ch *Handshake) Init(nodename string, creation uint32, flags node.Flags) error {
54
if flags.EnableProxy == false {
55
s := "proxy feature must be enabled for the cloud connection"
59
if ch.options.Cluster == "" {
60
s := "option Cloud.Cluster can not be empty"
64
if len(ch.options.Cluster) > clusterNameLengthMax {
65
s := "option Cloud.Cluster has too long name"
69
ch.nodename = nodename
70
ch.creation = creation
72
if ch.options.Flags.Enable == false {
76
ch.flags.EnableRemoteSpawn = ch.options.Flags.EnableRemoteSpawn
80
func (ch *Handshake) Start(remote net.Addr, conn lib.NetReadWriter, tls bool, cookie string) (node.HandshakeDetails, error) {
82
handshake := &handshakeDetails{
83
cookieHash: hash.Sum([]byte(cookie)),
86
handshake.details.Flags = ch.flags
91
timer := time.NewTimer(ch.options.Timeout)
95
defer lib.ReleaseBuffer(b)
97
asyncReadChannel := make(chan error, 2)
99
_, err := b.ReadDataFrom(conn, 1024)
100
asyncReadChannel <- err
104
await := []byte{ProtoHandshakeV1AuthReply, ProtoHandshakeV1Error}
111
return handshake.details, fmt.Errorf("timeout")
112
case err := <-asyncReadChannel:
114
return handshake.details, err
117
if b.Len() < expectingBytes {
121
if b.B[0] != ProtoHandshakeV1 {
122
return handshake.details, fmt.Errorf("malformed handshake proto")
125
l := int(binary.BigEndian.Uint16(b.B[2:4]))
126
buffer := b.B[4 : l+4]
128
if len(buffer) != l {
129
return handshake.details, fmt.Errorf("malformed handshake (wrong packet length)")
133
if bytes.Count(await, b.B[1:2]) == 0 {
134
return handshake.details, fmt.Errorf("malformed handshake sequence")
137
await, rest, err = ch.handle(conn, b.B[1], buffer, handshake)
139
return handshake.details, err
142
if await == nil && rest != nil {
144
handshake.details.Buffer = lib.TakeBuffer()
145
handshake.details.Buffer.Set(rest)
157
return handshake.details, nil
160
func (ch *Handshake) handle(socket io.Writer, messageType byte, buffer []byte, details *handshakeDetails) ([]byte, []byte, error) {
162
case ProtoHandshakeV1AuthReply:
163
if err := ch.handleV1AuthReply(buffer, details); err != nil {
166
if err := ch.sendV1Challenge(socket, details); err != nil {
169
return []byte{ProtoHandshakeV1ChallengeAccept, ProtoHandshakeV1Error}, nil, nil
171
case ProtoHandshakeV1ChallengeAccept:
172
rest, err := ch.handleV1ChallegeAccept(buffer, details)
176
return nil, rest, err
178
case ProtoHandshakeV1Error:
179
return nil, nil, ch.handleV1Error(buffer)
182
return nil, nil, fmt.Errorf("unknown message type")
186
func (ch *Handshake) sendV1Auth(socket io.Writer) error {
187
b := lib.TakeBuffer()
188
defer lib.ReleaseBuffer(b)
190
message := MessageHandshakeV1Auth{
192
Cluster: ch.options.Cluster,
193
Creation: ch.creation,
194
Flags: ch.options.Flags,
196
b.Allocate(1 + 1 + 2)
197
b.B[0] = ProtoHandshakeV1
198
b.B[1] = ProtoHandshakeV1Auth
199
if err := etf.Encode(message, b, etf.EncodeOptions{}); err != nil {
202
binary.BigEndian.PutUint16(b.B[2:4], uint16(b.Len()-4))
203
if err := b.WriteDataTo(socket); err != nil {
210
func (ch *Handshake) sendV1Challenge(socket io.Writer, handshake *handshakeDetails) error {
211
b := lib.TakeBuffer()
212
defer lib.ReleaseBuffer(b)
214
digest := GenDigest(handshake.hash, []byte(ch.nodename), handshake.digestRemote, handshake.cookieHash)
215
message := MessageHandshakeV1Challenge{
218
b.Allocate(1 + 1 + 2)
219
b.B[0] = ProtoHandshakeV1
220
b.B[1] = ProtoHandshakeV1Challenge
221
if err := etf.Encode(message, b, etf.EncodeOptions{}); err != nil {
224
binary.BigEndian.PutUint16(b.B[2:4], uint16(b.Len()-4))
225
if err := b.WriteDataTo(socket); err != nil {
233
func (ch *Handshake) handleV1AuthReply(buffer []byte, handshake *handshakeDetails) error {
234
m, _, err := etf.Decode(buffer, nil, etf.DecodeOptions{})
236
return fmt.Errorf("malformed MessageHandshakeV1AuthReply message: %s", err)
238
message, ok := m.(MessageHandshakeV1AuthReply)
240
return fmt.Errorf("malformed MessageHandshakeV1AuthReply message: %#v", m)
243
digest := GenDigest(handshake.hash, []byte(message.Node), []byte(ch.options.Cluster), handshake.cookieHash)
244
if bytes.Compare(message.Digest, digest) != 0 {
245
return fmt.Errorf("authorization failed")
247
handshake.digestRemote = digest
248
handshake.details.Name = message.Node
249
handshake.details.Creation = message.Creation
254
func (ch *Handshake) handleV1ChallegeAccept(buffer []byte, handshake *handshakeDetails) ([]byte, error) {
255
m, rest, err := etf.Decode(buffer, nil, etf.DecodeOptions{})
257
return nil, fmt.Errorf("malformed MessageHandshakeV1ChallengeAccept message: %s", err)
259
message, ok := m.(MessageHandshakeV1ChallengeAccept)
261
return nil, fmt.Errorf("malformed MessageHandshakeV1ChallengeAccept message: %#v", m)
264
mapping := etf.NewAtomMapping()
265
mapping.In[etf.Atom(message.Node)] = etf.Atom(ch.nodename)
266
mapping.Out[etf.Atom(ch.nodename)] = etf.Atom(message.Node)
267
handshake.details.AtomMapping = mapping
268
handshake.mapName = message.Node
272
func (ch *Handshake) handleV1Error(buffer []byte) error {
273
m, _, err := etf.Decode(buffer, nil, etf.DecodeOptions{})
275
return fmt.Errorf("malformed MessageHandshakeV1Error message: %s", err)
277
message, ok := m.(MessageHandshakeV1Error)
279
return fmt.Errorf("malformed MessageHandshakeV1Error message: %#v", m)
281
return fmt.Errorf(message.Reason)